Coverage for mongo/utils.py: 100%
87 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
1import abc
2import hashlib
3import os
4from functools import wraps
5from typing import Dict, Optional, Any, TYPE_CHECKING
6from flask import current_app
7from minio import Minio
8import redis
9from . import engine
10from .config import MINIO_HOST, MINIO_SECRET_KEY, MINIO_ACCESS_KEY, MINIO_BUCKET
12if TYPE_CHECKING:
13 from .user import User # pragma: no cover
14 from .problem import Problem # pragma: no cover
16__all__ = (
17 'hash_id',
18 'perm',
19 'RedisCache',
20 'doc_required',
21 'drop_none',
22)
25def hash_id(salt, text):
26 text = ((salt or '') + (text or '')).encode()
27 sha = hashlib.sha3_512(text)
28 return sha.hexdigest()[:24]
31def perm(course, user):
32 '''4: admin, 3: teacher, 2: TA, 1: student, 0: not found
33 '''
34 return 4 - [
35 user.role == 0, user == course.teacher, user in course.tas,
36 user.username in course.student_nicknames.keys(), True
37 ].index(True)
40class Cache(abc.ABC):
42 @abc.abstractmethod
43 def exists(self, key: str) -> bool:
44 '''
45 check whether a value exists
46 '''
47 raise NotImplementedError # pragma: no cover
49 @abc.abstractmethod
50 def get(self, key: str):
51 '''
52 get value by key
53 '''
54 raise NotImplementedError # pragma: no cover
56 @abc.abstractmethod
57 def set(self, key: str, value, ex: Optional[int] = None):
58 '''
59 set a value and set expire time in seconds
60 '''
61 raise NotImplementedError # pragma: no cover
63 @abc.abstractmethod
64 def delete(self, key: str):
65 '''
66 delete a value by key
67 '''
68 raise NotImplementedError # pragma: no cover
71class RedisCache(Cache):
72 POOL = None
74 def __new__(cls) -> Any:
75 if cls.POOL is None:
76 cls.HOST = os.getenv('REDIS_HOST')
77 cls.PORT = os.getenv('REDIS_PORT')
78 cls.POOL = redis.ConnectionPool(
79 host=cls.HOST,
80 port=cls.PORT,
81 db=0,
82 )
84 return super().__new__(cls)
86 def __init__(self) -> None:
87 self._client = None
89 @property
90 def client(self):
91 if self._client is None:
92 if self.PORT is None:
93 import fakeredis
94 self._client = fakeredis.FakeStrictRedis()
95 else:
96 self._client = redis.Redis(connection_pool=self.POOL)
97 return self._client
99 def exists(self, key: str) -> bool:
100 return self.client.exists(key)
102 def get(self, key: str):
103 return self.client.get(key)
105 def delete(self, key: str):
106 return self.client.delete(key)
108 def set(self, key: str, value, ex: Optional[int] = None):
109 return self.client.set(key, value, ex=ex)
112def doc_required(
113 src,
114 des,
115 cls=None,
116 src_none_allowed=False,
117):
118 '''
119 query db to inject document into functions.
120 if the document does not exist in db, raise `engine.DoesNotExist`.
121 if `src` not in parameters, this funtcion will raise `TypeError`
122 `doc_required` will check the existence of `des` in `func` parameters,
123 if `des` is exist, this function will override it, so `src == des`
124 are acceptable
125 '''
126 # user the same name for `src` and `des`
127 # e.g. `doc_required('user', User)` will replace parameter `user`
128 if cls is None:
129 cls = des
130 des = src
132 def deco(func):
134 @wraps(func)
135 def wrapper(*args, **ks):
136 # try get source param
137 if src not in ks:
138 raise TypeError(f'{src} not found in function argument')
139 src_param = ks.get(src)
140 # convert it to document
141 # TODO: add type checking, whether the cls is a subclass of `MongoBase`
142 # or maybe it is not need
143 if type(cls) != type:
144 raise TypeError('cls must be a type')
145 # process `None`
146 if src_param is None:
147 if not src_none_allowed:
148 raise ValueError('src can not be None')
149 doc = None
150 elif not isinstance(src_param, cls):
151 doc = cls(src_param)
152 # or, it is already target class instance
153 else:
154 doc = src_param
155 # not None and non-existent
156 if doc is not None and not doc:
157 raise engine.DoesNotExist(f'{doc} not found!')
158 # replace original paramters
159 del ks[src]
160 if des in ks:
161 current_app.logger.warning(
162 f'replace a existed argument in {func}')
163 ks[des] = doc
164 return func(*args, **ks)
166 return wrapper
168 return deco
171def drop_none(d: Dict):
172 return {k: v for k, v in d.items() if v is not None}
175class MinioClient:
177 def __init__(self):
178 self.client = Minio(
179 MINIO_HOST,
180 access_key=MINIO_ACCESS_KEY,
181 secret_key=MINIO_SECRET_KEY,
182 )
183 self.bucket = MINIO_BUCKET