Coverage for mongo/utils.py: 100%

88 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-11 18:37 +0000

1import abc 

2import hashlib 

3import os 

4from functools import wraps 

5from typing import Dict, Optional, Any, TYPE_CHECKING 

6from flask import current_app 

7from minio import Minio 

8import redis 

9from . import engine 

10from . import config 

11from .config import FLASK_DEBUG, MINIO_HOST, MINIO_SECRET_KEY, MINIO_ACCESS_KEY, MINIO_BUCKET 

12 

13if TYPE_CHECKING: 

14 from .user import User # pragma: no cover 

15 from .problem import Problem # pragma: no cover 

16 

17__all__ = ( 

18 'hash_id', 

19 'perm', 

20 'RedisCache', 

21 'doc_required', 

22 'drop_none', 

23) 

24 

25 

def hash_id(salt, text):
    """Return a 24-character hexadecimal id derived from ``salt`` and ``text``.

    Falsy arguments (for example ``None`` or ``''``) are replaced by an empty
    string. The two parts are concatenated, encoded with ``str.encode()`` and
    hashed with SHA3-512; only the first 24 hex digits are returned.
    """
    salt_part = salt if salt else ''
    text_part = text if text else ''
    payload = (salt_part + text_part).encode()
    digest = hashlib.sha3_512(payload).hexdigest()
    return digest[:24]

30 

31 

def perm(course, user):
    '''Return the permission level of ``user`` in ``course``.

    4: admin, 3: teacher, 2: TA, 1: student, 0: not found
    '''
    # Every check is evaluated up front, in priority order; the first one
    # that holds decides the level.
    is_admin = user.role == 0
    is_teacher = user == course.teacher
    is_ta = user in course.tas
    is_student = user.username in course.student_nicknames.keys()
    # The trailing True guarantees a match, which maps to level 0.
    ranked = (is_admin, is_teacher, is_ta, is_student, True)
    return 4 - ranked.index(True)

39 

40 

class Cache(abc.ABC):
    """Abstract interface for a key-value cache.

    Subclasses have to implement every method declared here before they can
    be instantiated.
    """

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        """Tell whether a value is stored under ``key``."""
        raise NotImplementedError()  # pragma: no cover

    @abc.abstractmethod
    def get(self, key: str):
        """Fetch the value stored under ``key``."""
        raise NotImplementedError()  # pragma: no cover

    @abc.abstractmethod
    def set(self, key: str, value, ex: Optional[int] = None):
        """Store ``value`` under ``key``; ``ex`` is the expire time in seconds."""
        raise NotImplementedError()  # pragma: no cover

    @abc.abstractmethod
    def delete(self, key: str):
        """Remove the value stored under ``key``."""
        raise NotImplementedError()  # pragma: no cover

70 

71 

class RedisCache(Cache):
    '''
    `Cache` implementation that talks to Redis.

    Connection settings are read from the `REDIS_HOST` and `REDIS_PORT`
    environment variables the first time an instance is created and are
    kept on the class together with the connection pool.
    If `REDIS_PORT` is not set, a `fakeredis.FakeStrictRedis` client is
    used instead of a real connection.
    '''
    # Connection pool kept on the class; built on first instantiation.
    POOL = None

    def __new__(cls) -> Any:
        # Read the environment and build the pool only once.
        if cls.POOL is None:
            cls.HOST = os.getenv('REDIS_HOST')
            cls.PORT = os.getenv('REDIS_PORT')
            cls.POOL = redis.ConnectionPool(
                host=cls.HOST,
                port=cls.PORT,
                db=0,
            )

        return super().__new__(cls)

    def __init__(self) -> None:
        # The client is created lazily by the `client` property.
        self._client = None

    @property
    def client(self):
        '''
        Return the client of this instance, creating it on first access.
        '''
        if self._client is None:
            if self.PORT is None:
                # No port configured: fall back to fakeredis. Imported here
                # so the package is only needed when this branch runs.
                import fakeredis
                self._client = fakeredis.FakeStrictRedis()
            else:
                self._client = redis.Redis(connection_pool=self.POOL)
        return self._client

    def exists(self, key: str) -> bool:
        '''
        check whether a value exists
        '''
        # The Redis EXISTS command reports a count of existing keys;
        # coerce it so the declared `bool` return type is honoured.
        return bool(self.client.exists(key))

    def get(self, key: str):
        '''
        get value by key
        '''
        return self.client.get(key)

    def delete(self, key: str):
        '''
        delete a value by key
        '''
        return self.client.delete(key)

    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        set a value and set expire time in seconds
        '''
        return self.client.set(key, value, ex=ex)

111 

112 

def doc_required(
    src,
    des,
    cls=None,
    src_none_allowed=False,
):
    '''
    Decorator factory that converts the keyword argument `src` into an
    instance of `cls` and passes it to the function as `des`.

    Call forms:
        `doc_required(src, des, cls)`: read `src`, inject as `des`.
        `doc_required(src, cls)`: read `src` and inject under the same name.

    Behaviour of the wrapped function:
        - `src` must be passed as a keyword argument, otherwise `TypeError`
          is raised (positional arguments are not inspected).
        - `cls` must be a class, otherwise `TypeError` is raised. Classes
          created by a custom metaclass are accepted as well.
        - if the value is `None`, `ValueError` is raised unless
          `src_none_allowed` is true, in which case `None` is injected.
        - a value that is not already an instance of `cls` is converted
          with `cls(value)`.
        - if the resulting document is falsy, `engine.DoesNotExist` is
          raised (presumably the document wrappers are falsy when nothing
          matches in the db -- confirm against the wrapper classes).
        - `src` is removed from the keyword arguments and the document is
          stored under `des`; an existing `des` argument is overridden
          after logging a warning, so `src == des` is acceptable.
    '''
    # use the same name for `src` and `des`
    # e.g. `doc_required('user', User)` will replace parameter `user`
    if cls is None:
        cls = des
        des = src

    def deco(func):

        @wraps(func)
        def wrapper(*args, **ks):
            # try get source param
            if src not in ks:
                raise TypeError(f'{src} not found in function argument')
            src_param = ks.get(src)
            # `isinstance` also accepts classes whose metaclass is a
            # subclass of `type`, which `type(cls) != type` rejected.
            if not isinstance(cls, type):
                raise TypeError('cls must be a type')
            # process `None`
            if src_param is None:
                if not src_none_allowed:
                    raise ValueError('src can not be None')
                doc = None
            elif not isinstance(src_param, cls):
                # convert it to document
                doc = cls(src_param)
            else:
                # it is already a target class instance
                doc = src_param
            # not None and non-existent
            if doc is not None and not doc:
                raise engine.DoesNotExist(f'{doc} not found!')
            # replace original parameters
            del ks[src]
            if des in ks:
                current_app.logger.warning(
                    f'replace a existed argument in {func}')
            ks[des] = doc
            return func(*args, **ks)

        return wrapper

    return deco

170 

171 

def drop_none(d: Dict):
    """Return a new dict with the entries of ``d`` whose value is not ``None``.

    Only ``None`` is filtered out; other falsy values such as ``0`` or ``''``
    are kept. ``d`` itself is left unmodified.
    """
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept

174 

175 

class MinioClient:
    """Bundle a ``Minio`` client with the bucket name taken from ``config``."""

    def __init__(self):
        # `secure` is the inverse of the Flask debug flag.
        connection_options = {
            'access_key': config.MINIO_ACCESS_KEY,
            'secret_key': config.MINIO_SECRET_KEY,
            'secure': not config.FLASK_DEBUG,
        }
        self.client = Minio(config.MINIO_HOST, **connection_options)
        self.bucket = config.MINIO_BUCKET