Coverage for mongo/user.py: 100%

139 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-11 18:37 +0000

1from __future__ import annotations 

2from datetime import datetime, timedelta 

3from hmac import compare_digest 

4from typing import Any, Dict, List, TYPE_CHECKING, Optional 

5 

6from . import engine, course 

7from .utils import * 

8from .base import * 

9 

10import hashlib 

11import jwt 

12import os 

13import re 

14 

15if TYPE_CHECKING: 

16 from .course import Course # pragma: no cover 

17 

18__all__ = ['User', 'jwt_decode'] 

19 

20JWT_EXP = timedelta(days=int(os.environ.get('JWT_EXP', '30'))) 

21JWT_ISS = os.environ.get('JWT_ISS', 'test.test') 

22JWT_SECRET = os.environ.get('JWT_SECRET', 'SuperSecretString') 

23 

24 

25class User(MongoBase, engine=engine.User): 

26 

27 @classmethod 

28 def signup( 

29 cls, 

30 username: str, 

31 password: str, 

32 email: str, 

33 ): 

34 if re.match(r'^[a-zA-Z0-9_\-]+$', username) is None: 

35 raise ValueError(f'Invalid username [username={username}]') 

36 user_id = hash_id(username, password) 

37 email = email.lower().strip() 

38 user = cls.engine( 

39 user_id=user_id, 

40 user_id2=user_id, 

41 username=username, 

42 email=email, 

43 md5=hashlib.md5(email.encode()).hexdigest(), 

44 active=False, 

45 ).save(force_insert=True) 

46 return cls(user).reload() 

47 

48 @classmethod 

49 def batch_signup( 

50 cls, 

51 new_users: List[Dict[str, str]], 

52 course: Optional['Course'] = None, 

53 force: bool = False, 

54 ): 

55 ''' 

56 Register multiple students with course 

57 ''' 

58 # Validate 

59 keys = {'username', 'password', 'email'} 

60 if not all(({*u.keys()} >= keys) for u in new_users): 

61 raise ValueError('The input of batch_signup has invalid keys') 

62 for u in new_users: 

63 if (role := u.get('role')) is not None: 

64 try: 

65 role = int(role) 

66 u['role'] = role 

67 except ValueError: 

68 username = u['username'] 

69 raise ValueError( 

70 'Got invalid role in batch signup ' 

71 f'[username={username}, role={role}]', ) 

72 # Register 

73 registered_users = [] 

74 for u in new_users: 

75 try: 

76 new_user = cls.signup( 

77 username=u['username'], 

78 password=u['password'], 

79 email=u['email'], 

80 ) 

81 activate_payload = drop_none({ 

82 'displayedName': 

83 u.get('displayedName'), 

84 }) 

85 new_user.activate(activate_payload) 

86 if (role := u.get('role')) is not None: 

87 new_user.update(role=role) 

88 new_user.reload('role') 

89 except engine.NotUniqueError: 

90 try: 

91 new_user = cls.get_by_username(u['username']) 

92 except engine.DoesNotExist: 

93 new_user = cls.get_by_email(u['email']) 

94 if force: 

95 new_user.force_update(u, course) 

96 registered_users.append(new_user) 

97 if course is not None: 

98 new_student_nicknames = { 

99 **course.student_nicknames, 

100 **{ 

101 u.username: u.username 

102 for u in registered_users 

103 } 

104 } 

105 course.update_student_namelist(new_student_nicknames) 

106 return new_users 

107 

108 def force_update(self, new_user: Dict[str, Any], course: Optional[Course]): 

109 ''' 

110 Force update an existent user in batch update procedure 

111 ''' 

112 if (displayed_name := new_user.get('displayedName')) is not None: 

113 self.update(profile__displayed_name=displayed_name) 

114 if (role := new_user.get('role')) is not None: 

115 self.update(role=role) 

116 if (password := new_user.get('password')) is not None: 

117 self.change_password(password) 

118 if (email := new_user.get('email')) is not None: 

119 self.update(email=email, 

120 md5=hashlib.md5(email.encode()).hexdigest()) 

121 if course is not None: 

122 self.update(add_to_set__courses=course.id) 

123 self.reload() 

124 

125 @classmethod 

126 def login(cls, username, password, ip_addr): 

127 try: 

128 user = cls.get_by_username(username) 

129 except engine.DoesNotExist: 

130 user = cls.get_by_email(username) 

131 user_id = hash_id(user.username, password) 

132 if (compare_digest(user.user_id, user_id) 

133 or compare_digest(user.user_id2, user_id)): 

134 engine.LoginRecords( 

135 user_id=user.id, 

136 ip_addr=ip_addr, 

137 success=True, 

138 ).save(force_insert=True) 

139 return user 

140 engine.LoginRecords( 

141 user_id=user.id, 

142 ip_addr=ip_addr, 

143 success=False, 

144 ).save(force_insert=True) 

145 raise engine.DoesNotExist 

146 

147 @classmethod 

148 def get_by_username(cls, username): 

149 obj = cls.engine.objects.get(username=username) 

150 return cls(obj) 

151 

152 @classmethod 

153 def get_by_email(cls, email): 

154 obj = cls.engine.objects.get(email=email.lower()) 

155 return cls(obj) 

156 

157 @property 

158 def displayedName(self): 

159 return self.profile.displayed_name 

160 

161 @property 

162 def bio(self): 

163 return self.profile.bio 

164 

165 @property 

166 def cookie(self): 

167 keys = ( 

168 'username', 

169 'email', 

170 'md5', 

171 'active', 

172 'role', 

173 'profile', 

174 'editorConfig', 

175 ) 

176 return self.jwt(*keys) 

177 

178 @property 

179 def secret(self): 

180 keys = ( 

181 'username', 

182 'userId', 

183 ) 

184 return self.jwt(*keys, secret=True) 

185 

186 def jwt(self, *keys, secret=False, **kwargs): 

187 if not self: 

188 return '' 

189 data = self.properties(*keys) 

190 data.update(kwargs) 

191 payload = { 

192 'iss': JWT_ISS, 

193 'exp': datetime.now() + JWT_EXP, 

194 'secret': secret, 

195 'data': data 

196 } 

197 return jwt.encode(payload, JWT_SECRET, algorithm='HS256') 

198 

199 def properties(self, *keys) -> Dict[str, Any]: 

200 ''' 

201 Extract proeprties from user and serialize it to a dictionary 

202 ''' 

203 whiltelists = { 

204 'username', 

205 'userId', 

206 'email', 

207 'md5', 

208 'active', 

209 'role', 

210 'profile', 

211 'editorConfig', 

212 'bio', 

213 'displayedName', 

214 } 

215 if any((k not in whiltelists) for k in keys): 

216 raise ValueError('Found unallowed key') 

217 user = self.to_mongo() 

218 user['username'] = user.get('_id') 

219 return {k: user.get(k, getattr(self, k, None)) for k in keys} 

220 

221 def change_password(self, password): 

222 user_id = hash_id(self.username, password) 

223 self.update(user_id=user_id, user_id2=user_id) 

224 self.reload() 

225 

226 def activate(self, profile={}) -> 'User': 

227 ''' 

228 activate a user 

229 

230 raises: 

231 ValidationError: when user field in db is wrong or data isn't valid 

232 engine.DoesNotExist 

233 ''' 

234 # check whether `Public` is exists 

235 pub_course = course.Course('Public').obj 

236 # update user data 

237 self.update( 

238 active=True, 

239 profile={ 

240 'displayed_name': profile.get('displayedName'), 

241 'bio': profile.get('bio'), 

242 }, 

243 push__courses=pub_course, 

244 ) 

245 # update `Public` 

246 pub_course.student_nicknames.update({ 

247 self.username: self.username, 

248 }) 

249 return self.reload() 

250 

251 def add_submission(self, submission: engine.Submission): 

252 if submission.score == 100: 

253 self.update( 

254 add_to_set__AC_problem_ids=submission.problem_id, 

255 inc__AC_submission=1, 

256 ) 

257 self.submission += 1 

258 self.save() 

259 

260 

261def jwt_decode(token): 

262 try: 

263 json = jwt.decode( 

264 token, 

265 JWT_SECRET, 

266 issuer=JWT_ISS, 

267 algorithms='HS256', 

268 ) 

269 except jwt.exceptions.PyJWTError: 

270 return None 

271 return json