Coverage for mongo/utils.py: 100%
88 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
1import abc
2import hashlib
3import os
4from functools import wraps
5from typing import Dict, Optional, Any, TYPE_CHECKING
6from flask import current_app
7from minio import Minio
8import redis
9from . import engine
10from . import config
11from .config import FLASK_DEBUG, MINIO_HOST, MINIO_SECRET_KEY, MINIO_ACCESS_KEY, MINIO_BUCKET
13if TYPE_CHECKING:
14 from .user import User # pragma: no cover
15 from .problem import Problem # pragma: no cover
# Names exported by `from <this module> import *`.
__all__ = (
    'hash_id', 'perm', 'RedisCache', 'doc_required', 'drop_none',
)
def hash_id(salt, text):
    '''
    Derive a short, deterministic hex identifier from `salt` and `text`.

    Falsy arguments (e.g. `None`) are treated as empty strings. The two
    parts are concatenated, encoded and hashed with SHA3-512; only the
    first 24 hexadecimal characters of the digest are returned.
    '''
    salt_part = salt or ''
    text_part = text or ''
    payload = (salt_part + text_part).encode()
    digest = hashlib.sha3_512(payload).hexdigest()
    return digest[:24]
def perm(course, user):
    '''
    Return the permission level of `user` in `course`.

    4: admin, 3: teacher, 2: TA, 1: student, 0: not found
    '''
    # All checks are evaluated up front, ordered from the highest to the
    # lowest privilege; the first one that holds decides the level.
    is_admin = user.role == 0
    is_teacher = user == course.teacher
    is_ta = user in course.tas
    is_student = user.username in course.student_nicknames.keys()
    # The trailing `True` is a sentinel: a user matching nothing lands on
    # index 4, i.e. level 0.
    checks = (is_admin, is_teacher, is_ta, is_student, True)
    return 4 - checks.index(True)
class Cache(abc.ABC):
    '''
    Abstract key-value cache interface.

    Concrete caches must implement `exists`, `get`, `set` and `delete`.
    '''

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        '''
        tell whether `key` currently has a value
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def get(self, key: str):
        '''
        fetch the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        store `value` under `key`; `ex` is the expire time in seconds
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def delete(self, key: str):
        '''
        remove the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover
class RedisCache(Cache):
    '''
    `Cache` implementation backed by Redis.

    Connection settings are read from the `REDIS_HOST` and `REDIS_PORT`
    environment variables the first time an instance is created, and the
    resulting connection pool is stored on the class so later instances
    reuse it. When `REDIS_PORT` is unset, each instance falls back to a
    `fakeredis` client instead of using the pool.
    '''
    # Filled in by `__new__` on first instantiation.
    POOL = None
    HOST = None
    PORT = None

    def __new__(cls) -> Any:
        # Build the shared pool once; subsequent instances skip this.
        if cls.POOL is None:
            cls.HOST = os.getenv('REDIS_HOST')
            cls.PORT = os.getenv('REDIS_PORT')
            cls.POOL = redis.ConnectionPool(
                host=cls.HOST,
                port=cls.PORT,
                db=0,
            )

        return super().__new__(cls)

    def __init__(self) -> None:
        # The client is created lazily by the `client` property.
        self._client = None

    @property
    def client(self):
        '''
        The underlying client, created on first access.
        '''
        if self._client is None:
            if self.PORT is None:
                # No port configured: use fakeredis (imported lazily so it
                # is only needed in this situation).
                import fakeredis
                self._client = fakeredis.FakeStrictRedis()
            else:
                self._client = redis.Redis(connection_pool=self.POOL)
        return self._client

    def exists(self, key: str) -> bool:
        '''
        check whether a value exists
        '''
        # The client reports a count of matching keys; coerce it so the
        # result matches the `bool` contract declared by `Cache.exists`.
        return bool(self.client.exists(key))

    def get(self, key: str):
        '''
        get value by key
        '''
        return self.client.get(key)

    def delete(self, key: str):
        '''
        delete a value by key
        '''
        return self.client.delete(key)

    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        set a value and set expire time in seconds
        '''
        return self.client.set(key, value, ex=ex)
def doc_required(
    src,
    des,
    cls=None,
    src_none_allowed=False,
):
    '''
    Decorator factory that injects a document into the wrapped function.

    The wrapper takes the keyword argument named `src`, turns it into an
    instance of `cls` by calling `cls(value)` (unless it already is one)
    and passes the result to the wrapped function as the keyword argument
    `des`. The `src` keyword itself is removed. If `des` is already among
    the keyword arguments it is overridden (a warning is logged), so
    `src == des` is acceptable.

    The short form `doc_required('user', User)` is equivalent to
    `doc_required('user', 'user', User)`.

    Args:
        src: name of the keyword argument holding the source value. It
            must be passed by keyword; positional arguments are not
            inspected.
        des: name of the keyword argument that receives the document, or
            the document class itself when `cls` is omitted.
        cls: class used to build the document from the source value.
        src_none_allowed: when true, a `None` source value is forwarded
            as `None` instead of raising `ValueError`.

    Raises (by the wrapper, at call time):
        TypeError: `src` is missing from the keyword arguments, or `cls`
            is not a class.
        ValueError: the source value is `None` and `src_none_allowed` is
            false.
        engine.DoesNotExist: the resulting document is falsy.
    '''
    # use the same name for `src` and `des`
    # e.g. `doc_required('user', User)` will replace parameter `user`
    if cls is None:
        cls = des
        des = src

    def deco(func):

        @wraps(func)
        def wrapper(*args, **ks):
            # try get source param
            if src not in ks:
                raise TypeError(f'{src} not found in function argument')
            src_param = ks[src]
            # convert it to document
            # TODO: add type checking, whether the cls is a subclass of `MongoBase`
            #       or maybe it is not need
            # `isinstance` (rather than `type(cls) != type`) also accepts
            # classes created by a custom metaclass.
            if not isinstance(cls, type):
                raise TypeError('cls must be a type')
            # process `None`
            if src_param is None:
                if not src_none_allowed:
                    raise ValueError('src can not be None')
                doc = None
            elif not isinstance(src_param, cls):
                doc = cls(src_param)
            # or, it is already target class instance
            else:
                doc = src_param
            # not None and non-existent
            if doc is not None and not doc:
                raise engine.DoesNotExist(f'{doc} not found!')
            # replace original parameters
            del ks[src]
            if des in ks:
                current_app.logger.warning(
                    f'replace a existed argument in {func}')
            ks[des] = doc
            return func(*args, **ks)

        return wrapper

    return deco
def drop_none(d: Dict):
    '''
    Return a new dict with the entries of `d` whose value is not `None`.

    Only `None` is filtered out; other falsy values (0, '', [], ...) are
    kept. `d` itself is left untouched.
    '''
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept
class MinioClient:
    '''
    Bundles a `Minio` client with the bucket name taken from `config`.

    Every setting is looked up on the `config` module when an instance is
    created.
    '''

    def __init__(self):
        endpoint = config.MINIO_HOST
        access_key = config.MINIO_ACCESS_KEY
        secret_key = config.MINIO_SECRET_KEY
        # `secure` is the negation of the debug flag.
        secure = not config.FLASK_DEBUG
        self.client = Minio(
            endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )
        self.bucket = config.MINIO_BUCKET