Coverage for mongo/utils.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1import abc 

2import hashlib 

3import os 

4from flask import current_app 

5import redis 

6from functools import wraps 

7from typing import Dict, Optional, Any, TYPE_CHECKING 

8from . import engine 

9 

10if TYPE_CHECKING: 

11 from .user import User # pragma: no cover 

12 from .problem import Problem # pragma: no cover 

13 

14__all__ = ( 

15 'hash_id', 

16 'perm', 

17 'RedisCache', 

18 'doc_required', 

19 'drop_none', 

20) 

21 

22 

def hash_id(salt, text):
    '''
    Build a 24-character hexadecimal id from `salt` and `text`.

    A falsy argument (`None`, `''`) is treated as the empty string. The two
    parts are concatenated, encoded with `str.encode()` and hashed with
    SHA3-512; only the first 24 hex digits of the digest are returned.
    '''
    salt_part = salt if salt else ''
    text_part = text if text else ''
    payload = (salt_part + text_part).encode()
    digest = hashlib.sha3_512(payload).hexdigest()
    return digest[:24]

27 

28 

def perm(course, user):
    '''
    Return the permission level of `user` within `course`.

    4: admin, 3: teacher, 2: TA, 1: student, 0: not found

    Every check below is evaluated up front; the first one that holds
    decides the level, and the trailing `True` is the "not found" fallback.
    '''
    checks = (
        user.role == 0,
        user == course.teacher,
        user in course.tas,
        user.username in course.student_nicknames.keys(),
        True,
    )
    first_hit = checks.index(True)
    return 4 - first_hit

36 

37 

class Cache(abc.ABC):
    '''
    Abstract key/value cache interface.

    Concrete caches must implement `exists`, `get`, `set` and `delete`;
    the class cannot be instantiated until all four are overridden.
    '''

    @abc.abstractmethod
    def exists(self, key: str) -> bool:
        '''
        tell whether a value is stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def get(self, key: str):
        '''
        fetch the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        store `value` under `key`; `ex` is the expire time in seconds
        '''
        raise NotImplementedError  # pragma: no cover

    @abc.abstractmethod
    def delete(self, key: str):
        '''
        remove the value stored under `key`
        '''
        raise NotImplementedError  # pragma: no cover

67 

68 

class RedisCache(Cache):
    '''
    `Cache` implementation that talks to redis.

    The connection pool is created once, on the first instantiation, from
    the `REDIS_HOST` and `REDIS_PORT` environment variables and is then
    shared through the class attribute `POOL`. When `REDIS_PORT` is not
    set, the client falls back to an in-process `fakeredis` instance.
    '''
    # shared connection pool; stays `None` until the first instance is built
    POOL = None

    def __new__(cls) -> Any:
        # read the environment and build the pool only once per class
        if cls.POOL is None:
            cls.HOST = os.getenv('REDIS_HOST')
            cls.PORT = os.getenv('REDIS_PORT')
            cls.POOL = redis.ConnectionPool(
                host=cls.HOST,
                port=cls.PORT,
                db=0,
            )

        return super().__new__(cls)

    def __init__(self) -> None:
        # the client is created lazily by the `client` property
        self._client = None

    @property
    def client(self):
        '''
        the underlying redis client, created on first access
        '''
        if self._client is None:
            if self.PORT is None:
                # no port configured: use the fake, in-process redis
                import fakeredis
                self._client = fakeredis.FakeStrictRedis()
            else:
                self._client = redis.Redis(connection_pool=self.POOL)
        return self._client

    def exists(self, key: str) -> bool:
        '''
        check whether a value exists
        '''
        # the redis client reports a count of existing keys; coerce it so
        # the result matches the declared `bool` return type
        return bool(self.client.exists(key))

    def get(self, key: str):
        '''
        get value by key
        '''
        return self.client.get(key)

    def delete(self, key: str):
        '''
        delete a value by key
        '''
        return self.client.delete(key)

    def set(self, key: str, value, ex: Optional[int] = None):
        '''
        set a value and set expire time in seconds
        '''
        return self.client.set(key, value, ex=ex)

108 

109 

def doc_required(
    src,
    des,
    cls=None,
    src_none_allowed=False,
):
    '''
    query db to inject document into functions.
    if the document does not exist in db, raise `engine.DoesNotExist`.
    if `src` not in parameters, this function will raise `TypeError`
    `doc_required` will check the existence of `des` in `func` parameters,
    if `des` exists, this function will override it, so `src == des`
    is acceptable

    src: name of the keyword argument that carries the raw value
    des: name of the keyword argument that receives the document
    cls: class used to build the document as `cls(value)`
    src_none_allowed: pass `None` through instead of raising `ValueError`

    raises `TypeError` if `cls` is not a class, `ValueError` if the source
    value is `None` and `src_none_allowed` is false
    '''
    # use the same name for `src` and `des`
    # e.g. `doc_required('user', User)` will replace parameter `user`
    if cls is None:
        cls = des
        des = src

    def deco(func):

        @wraps(func)
        def wrapper(*args, **ks):
            # the source value must be passed as a keyword argument
            if src not in ks:
                raise TypeError(f'{src} not found in function argument')
            # `isinstance` also accepts classes built by a custom metaclass,
            # which a `type(cls) != type` comparison would wrongly reject
            if not isinstance(cls, type):
                raise TypeError('cls must be a type')
            # `ks` is this call's own dict, so popping never leaks outward
            src_param = ks.pop(src)
            # process `None`
            if src_param is None:
                if not src_none_allowed:
                    raise ValueError('src can not be None')
                doc = None
            # convert a raw value to a document
            elif not isinstance(src_param, cls):
                doc = cls(src_param)
            # or, it is already target class instance
            else:
                doc = src_param
            # not None and non-existent (a falsy document)
            if doc is not None and not doc:
                raise engine.DoesNotExist(f'{doc} not found!')
            # replace original parameters
            if des in ks:
                current_app.logger.warning(
                    f'replace a existed argument in {func}')
            ks[des] = doc
            return func(*args, **ks)

        return wrapper

    return deco

167 

168 

def drop_none(d: Dict):
    '''
    Return a new dict holding the items of `d` whose value is not `None`.

    Other falsy values (`0`, `''`, `[]`, ...) are kept; `d` is not modified.
    '''
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept