Coverage for mongo/utils.py: 100%
81 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1import abc
2import hashlib
3import os
4from flask import current_app
5import redis
6from functools import wraps
7from typing import Dict, Optional, Any, TYPE_CHECKING
8from . import engine
10if TYPE_CHECKING:
11 from .user import User # pragma: no cover
12 from .problem import Problem # pragma: no cover
# Names re-exported by `from <this module> import *`.
# NOTE(review): the abstract `Cache` base class is not listed here -- confirm
# that leaving it out of the public surface is intentional.
__all__ = (
    'hash_id',
    'perm',
    'RedisCache',
    'doc_required',
    'drop_none',
)
def hash_id(salt, text):
    '''
    build a short deterministic id from `salt` and `text`.
    a falsy argument (`None` or empty) contributes nothing to the hashed
    input. the two parts are concatenated, encoded with the default
    `str.encode` codec, digested with SHA3-512 and the first 24 hex
    characters of the digest are returned.
    '''
    salt_part = salt if salt else ''
    text_part = text if text else ''
    payload = (salt_part + text_part).encode()
    hasher = hashlib.sha3_512()
    hasher.update(payload)
    # keep only a 24-character prefix of the hex digest
    return hasher.hexdigest()[:24]
def perm(course, user):
    '''
    permission level of `user` inside `course`.
    4: admin, 3: teacher, 2: TA, 1: student, 0: not found
    the highest matching level wins.
    '''
    # every check is evaluated up front, highest privilege first
    is_admin = user.role == 0
    is_teacher = user == course.teacher
    is_ta = user in course.tas
    is_student = user.username in course.student_nicknames.keys()
    # the trailing `True` is a sentinel so that `index` always finds a
    # match; it maps to level 0 (not found)
    ladder = (is_admin, is_teacher, is_ta, is_student, True)
    return 4 - ladder.index(True)
class Cache(abc.ABC):
    '''
    abstract key-value cache interface.
    concrete caches must implement `exists`, `get`, `set` and `delete`.
    '''

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        '''
        tell whether `key` currently has a value
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def get(self, key: str):
        '''
        fetch the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        store `value` under `key`; `ex` is the expire time in seconds
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def delete(self, key: str):
        '''
        remove the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover
class RedisCache(Cache):
    '''
    `Cache` implementation backed by redis.
    the connection pool is created once, on first instantiation, from the
    `REDIS_HOST` / `REDIS_PORT` environment variables and then shared by
    every instance through the class attribute `POOL`.
    if `REDIS_PORT` is not set, the client falls back to an in-memory
    `fakeredis.FakeStrictRedis` instead of using the pool.
    '''
    # shared connection pool, built lazily in `__new__`
    POOL = None

    def __new__(cls) -> Any:
        if cls.POOL is None:
            # read connection settings once and cache them on the class
            cls.HOST = os.getenv('REDIS_HOST')
            cls.PORT = os.getenv('REDIS_PORT')
            cls.POOL = redis.ConnectionPool(
                host=cls.HOST,
                port=cls.PORT,
                db=0,
            )

        return super().__new__(cls)

    def __init__(self) -> None:
        # the client is created on first access of `client`
        self._client = None

    @property
    def client(self):
        '''
        lazily created redis client for this instance
        '''
        if self._client is None:
            if self.PORT is None:
                # no redis configured: use the fake in-memory client
                import fakeredis
                self._client = fakeredis.FakeStrictRedis()
            else:
                self._client = redis.Redis(connection_pool=self.POOL)
        return self._client

    def exists(self, key: str) -> bool:
        '''
        check whether `key` exists
        '''
        # the redis client reports the number of existing keys as an
        # integer; coerce it so the declared `bool` contract holds
        return bool(self.client.exists(key))

    def get(self, key: str):
        '''
        get value by key, as returned by the redis client
        '''
        return self.client.get(key)

    def delete(self, key: str):
        '''
        delete a value by key, returning the redis client's result
        '''
        return self.client.delete(key)

    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        set a value and its expire time in seconds (`ex`)
        '''
        return self.client.set(key, value, ex=ex)
def doc_required(
    src,
    des,
    cls=None,
    src_none_allowed=False,
):
    '''
    decorator factory: query db to inject a document into a function.

    the keyword argument named `src` is converted to an instance of `cls`
    and passed on to the wrapped function as keyword argument `des`.

    call forms:
        `doc_required(src, des, cls)`: read `src`, inject as `des`
        `doc_required(src, cls)`: use the same name for both, e.g.
            `doc_required('user', User)` will replace parameter `user`

    `src` must be passed by keyword; positional arguments are not inspected.
    if `des` already exists in the keyword arguments it is overridden (a
    warning is logged), so `src == des` is acceptable.

    raises (all at call time of the decorated function):
        TypeError: `src` is missing from the keyword arguments, or `cls`
            is not a class
        ValueError: `src` value is `None` and `src_none_allowed` is false
        engine.DoesNotExist: the resulting document is falsy, i.e. it
            does not exist in db
    '''
    # use the same name for `src` and `des`
    # e.g. `doc_required('user', User)` will replace parameter `user`
    if cls is None:
        cls = des
        des = src

    def deco(func):

        @wraps(func)
        def wrapper(*args, **ks):
            # try get source param
            if src not in ks:
                raise TypeError(f'{src} not found in function argument')
            src_param = ks[src]
            # convert it to document
            # TODO: add type checking, whether the cls is a subclass of `MongoBase`
            #       or maybe it is not need
            # `isinstance` (not `type(cls) != type`) so that classes built
            # by a custom metaclass are accepted as well
            if not isinstance(cls, type):
                raise TypeError('cls must be a type')
            # process `None`
            if src_param is None:
                if not src_none_allowed:
                    raise ValueError('src can not be None')
                doc = None
            elif not isinstance(src_param, cls):
                doc = cls(src_param)
            # or, it is already target class instance
            else:
                doc = src_param
            # not None and non-existent
            if doc is not None and not doc:
                raise engine.DoesNotExist(f'{doc} not found!')
            # replace original parameters
            del ks[src]
            if des in ks:
                current_app.logger.warning(
                    f'replace a existed argument in {func}')
            ks[des] = doc
            return func(*args, **ks)

        return wrapper

    return deco
def drop_none(d: Dict):
    '''
    return a new dict holding the items of `d` whose value is not `None`.
    other falsy values (`0`, `''`, `[]`, ...) are kept; `d` is not modified.
    '''
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept