Coverage for mongo/utils.py: 100%

87 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-14 03:01 +0000

1import abc 

2import hashlib 

3import os 

4from functools import wraps 

5from typing import Dict, Optional, Any, TYPE_CHECKING 

6from flask import current_app 

7from minio import Minio 

8import redis 

9from . import engine 

10from .config import MINIO_HOST, MINIO_SECRET_KEY, MINIO_ACCESS_KEY, MINIO_BUCKET 

11 

12if TYPE_CHECKING: 

13 from .user import User # pragma: no cover 

14 from .problem import Problem # pragma: no cover 

15 

16__all__ = ( 

17 'hash_id', 

18 'perm', 

19 'RedisCache', 

20 'doc_required', 

21 'drop_none', 

22) 

23 

24 

def hash_id(salt, text):
    '''
    build a short hexadecimal id from `salt` and `text`.
    a falsy `salt` or `text` is treated as an empty string; the two parts
    are concatenated (salt first), hashed with SHA3-512, and the first
    24 characters of the hex digest are returned
    '''
    prefix = salt if salt else ''
    body = text if text else ''
    payload = (prefix + body).encode()
    digest = hashlib.sha3_512(payload).hexdigest()
    return digest[:24]

29 

30 

def perm(course, user):
    '''
    permission level of `user` in `course`.
    4: admin, 3: teacher, 2: TA, 1: student, 0: not found
    the highest matching level wins
    '''
    # every check is evaluated up front, ordered from highest level down
    is_admin = user.role == 0
    is_teacher = user == course.teacher
    is_ta = user in course.tas
    is_student = user.username in course.student_nicknames.keys()
    # the trailing `True` is the "not found" fallback, so `index` never fails
    ranked = (is_admin, is_teacher, is_ta, is_student, True)
    return 4 - ranked.index(True)

38 

39 

class Cache(abc.ABC):
    '''
    abstract key-value cache interface.
    concrete caches must implement `exists`, `get`, `set` and `delete`
    '''

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        '''
        tell whether a value is stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def get(self, key: str):
        '''
        fetch the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        store `value` under `key`; `ex` is the expire time in seconds
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def delete(self, key: str):
        '''
        remove the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

69 

70 

class RedisCache(Cache):
    '''
    `Cache` implementation backed by a redis client.
    the connection pool is created once, on first instantiation, from the
    `REDIS_HOST` / `REDIS_PORT` environment variables and stored on the
    class. if `REDIS_PORT` is not set, a `fakeredis.FakeStrictRedis`
    client is used instead of a real connection
    '''
    # connection pool shared by all instances, built lazily in `__new__`
    POOL = None

    def __new__(cls) -> Any:
        # only the first instantiation reads the environment
        if cls.POOL is None:
            cls.HOST = os.getenv('REDIS_HOST')
            cls.PORT = os.getenv('REDIS_PORT')
            cls.POOL = redis.ConnectionPool(
                host=cls.HOST,
                port=cls.PORT,
                db=0,
            )

        return super().__new__(cls)

    def __init__(self) -> None:
        # the client is created on first access of `client`
        self._client = None

    @property
    def client(self):
        '''
        the underlying client, created on first access
        '''
        if self._client is None:
            if self.PORT is None:
                # imported here so it is only needed when no port is set
                import fakeredis
                self._client = fakeredis.FakeStrictRedis()
            else:
                self._client = redis.Redis(connection_pool=self.POOL)
        return self._client

    def exists(self, key: str) -> bool:
        '''
        check whether a value exists
        '''
        # the client reports how many of the given keys exist (an int);
        # coerce it so the result matches the declared `bool` return type
        return bool(self.client.exists(key))

    def get(self, key: str):
        '''
        get value by key
        '''
        return self.client.get(key)

    def delete(self, key: str):
        '''
        delete a value by key
        '''
        return self.client.delete(key)

    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        set a value and set expire time in seconds
        '''
        return self.client.set(key, value, ex=ex)

110 

111 

def doc_required(
    src,
    des,
    cls=None,
    src_none_allowed=False,
):
    '''
    query db to inject document into functions.
    the keyword argument named `src` is converted with `cls(value)`
    (unless it already is a `cls` instance) and passed to the decorated
    function as the keyword argument `des`.
    if the document does not exist in db (the converted document is
    falsy), raise `engine.DoesNotExist`.
    if `src` is not in the keyword arguments, or `cls` is not a class,
    this function will raise `TypeError`.
    if the `src` value is `None`, raise `ValueError` unless
    `src_none_allowed` is true, in which case `None` is passed through.
    `doc_required` will check the existence of `des` in `func` parameters,
    if `des` exists, this function will override it, so `src == des`
    is acceptable
    '''
    # use the same name for `src` and `des`
    # e.g. `doc_required('user', User)` will replace parameter `user`
    if cls is None:
        cls = des
        des = src

    def deco(func):

        @wraps(func)
        def wrapper(*args, **ks):
            # the source param must be passed by keyword
            if src not in ks:
                raise TypeError(f'{src} not found in function argument')
            # TODO: add type checking, whether the cls is a subclass of `MongoBase`
            # or maybe it is not needed
            # `isinstance` also accepts classes built with a custom metaclass,
            # which an exact `type(cls) == type` comparison would reject
            if not isinstance(cls, type):
                raise TypeError('cls must be a type')
            # take the source param out, it is replaced by `des` below
            src_param = ks.pop(src)
            # process `None`
            if src_param is None:
                if not src_none_allowed:
                    raise ValueError('src can not be None')
                doc = None
            # or, it is already target class instance
            elif isinstance(src_param, cls):
                doc = src_param
            # convert it to document
            else:
                doc = cls(src_param)
            # not None and non-existent
            if doc is not None and not doc:
                raise engine.DoesNotExist(f'{doc} not found!')
            # replace original parameters
            if des in ks:
                current_app.logger.warning(
                    f'replace a existed argument in {func}')
            ks[des] = doc
            return func(*args, **ks)

        return wrapper

    return deco

169 

170 

def drop_none(d: Dict):
    '''
    return a new dict with every entry of `d` whose value is not `None`.
    other falsy values (`0`, `''`, `False`, ...) are kept and `d` itself
    is left untouched
    '''
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept

173 

174 

class MinioClient:
    '''
    bundle a `Minio` client with the configured bucket name.
    `client` is built from `MINIO_HOST`, `MINIO_ACCESS_KEY` and
    `MINIO_SECRET_KEY`; `bucket` holds `MINIO_BUCKET`
    '''

    def __init__(self):
        credentials = {
            'access_key': MINIO_ACCESS_KEY,
            'secret_key': MINIO_SECRET_KEY,
        }
        self.client = Minio(MINIO_HOST, **credentials)
        self.bucket = MINIO_BUCKET