Coverage for mongo/user.py: 100%

139 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1from __future__ import annotations 

2from datetime import datetime, timedelta 

3from hmac import compare_digest 

4from typing import Any, Dict, List, TYPE_CHECKING, Optional 

5 

6from . import engine, course 

7from .utils import * 

8from .base import * 

9 

10import hashlib 

11import jwt 

12import os 

13import re 

14 

15if TYPE_CHECKING: 

16 from .course import Course # pragma: no cover 

17 

# Public names exported by a star-import of this module.
__all__ = ['User', 'jwt_decode']

# JWT settings; each one can be overridden by the environment variable
# of the same name.
# Token lifetime, expressed in days.
JWT_EXP = timedelta(days=int(os.getenv('JWT_EXP', '30')))
# Value of the `iss` claim written into (and required from) tokens.
JWT_ISS = os.getenv('JWT_ISS', 'test.test')
# HMAC signing key.
# NOTE(review): the fallback is a fixed string kept in source; presumably
# every real deployment sets JWT_SECRET -- confirm.
JWT_SECRET = os.getenv('JWT_SECRET', 'SuperSecretString')

23 

24 

class User(MongoBase, engine=engine.User):
    '''
    Wrapper around the `engine.User` document.

    Provides account creation, login, JWT generation and profile helpers.
    Attribute access such as `self.username` or `self.update(...)` is
    presumably forwarded to the wrapped document by `MongoBase`
    (defined elsewhere -- confirm there).
    '''

    @classmethod
    def signup(
        cls,
        username: str,
        password: str,
        email: str,
    ):
        '''
        Create a new, inactive user and return its wrapper.

        The username may only contain ASCII letters, digits, `_` and `-`.
        The email is lower-cased and stripped before it is stored.

        raises:
            ValueError: when the username contains other characters
        '''
        # `fullmatch` instead of `match` + `$`: `$` also matches right
        # before a trailing newline, which would let 'name\n' through.
        if re.fullmatch(r'[a-zA-Z0-9_\-]+', username) is None:
            raise ValueError(f'Invalid username [username={username}]')
        user_id = hash_id(username, password)
        email = email.lower().strip()
        user = cls.engine(
            user_id=user_id,
            user_id2=user_id,
            username=username,
            email=email,
            md5=hashlib.md5(email.encode()).hexdigest(),
            active=False,
        ).save(force_insert=True)
        return cls(user).reload()

    @classmethod
    def batch_signup(
        cls,
        new_users: List[Dict[str, str]],
        course: Optional['Course'] = None,
        force: bool = False,
    ):
        '''
        Register multiple students with course

        Every entry of `new_users` must provide `username`, `password`
        and `email`; `role` and `displayedName` are optional. A given
        `role` is converted to `int` in place.

        Users that already exist (by username, then by email) are reused;
        they are only modified when `force` is true. When `course` is
        given, every registered or reused user is added to its student
        nicknames.

        raises:
            ValueError: on missing keys or a role that is not an integer
        '''
        # Validate
        keys = {'username', 'password', 'email'}
        if not all(({*u.keys()} >= keys) for u in new_users):
            raise ValueError('The input of batch_signup has invalid keys')
        for u in new_users:
            if (role := u.get('role')) is not None:
                try:
                    role = int(role)
                    u['role'] = role
                except ValueError:
                    username = u['username']
                    raise ValueError(
                        'Got invalid role in batch signup '
                        f'[username={username}, role={role}]', )
        # Register
        registered_users = []
        for u in new_users:
            try:
                new_user = cls.signup(
                    username=u['username'],
                    password=u['password'],
                    email=u['email'],
                )
                activate_payload = drop_none({
                    'displayedName':
                    u.get('displayedName'),
                })
                new_user.activate(activate_payload)
                if (role := u.get('role')) is not None:
                    new_user.update(role=role)
                    new_user.reload('role')
            except engine.NotUniqueError:
                # The account already exists: look it up instead
                try:
                    new_user = cls.get_by_username(u['username'])
                except engine.DoesNotExist:
                    new_user = cls.get_by_email(u['email'])
                if force:
                    new_user.force_update(u, course)
            registered_users.append(new_user)
        if course is not None:
            new_student_nicknames = {
                **course.student_nicknames,
                **{u.username: u.username
                   for u in registered_users}
            }
            course.update_student_namelist(new_student_nicknames)
        # NOTE(review): this returns the input dicts, not
        # `registered_users`; kept as is because callers may rely on it
        # -- confirm which one is intended.
        return new_users

    def force_update(self, new_user: Dict[str, Any], course: Optional[Course]):
        '''
        Force update an existent user in batch update procedure

        Only the keys present in `new_user` (`displayedName`, `role`,
        `password`, `email`) are applied. When `course` is given, its id
        is added to the user's courses.
        '''
        if (displayed_name := new_user.get('displayedName')) is not None:
            self.update(profile__displayed_name=displayed_name)
        if (role := new_user.get('role')) is not None:
            self.update(role=role)
        if (password := new_user.get('password')) is not None:
            self.change_password(password)
        if (email := new_user.get('email')) is not None:
            # Normalize exactly like `signup` does; `get_by_email` looks
            # up the lower-cased address, so a mixed-case value stored
            # here could never be found again.
            email = email.lower().strip()
            self.update(
                email=email,
                md5=hashlib.md5(email.encode()).hexdigest(),
            )
        if course is not None:
            self.update(add_to_set__courses=course.id)
        self.reload()

    @classmethod
    def login(cls, username, password, ip_addr):
        '''
        Authenticate by username (or email) and password.

        A `LoginRecords` entry holding `ip_addr` and the outcome is
        stored for every attempt made against an existing user.

        raises:
            engine.DoesNotExist: when no user matches or the password
                is wrong
        '''
        try:
            user = cls.get_by_username(username)
        except engine.DoesNotExist:
            user = cls.get_by_email(username)
        user_id = hash_id(user.username, password)
        success = (compare_digest(user.user_id, user_id)
                   or compare_digest(user.user_id2, user_id))
        engine.LoginRecords(
            user_id=user.id,
            ip_addr=ip_addr,
            success=success,
        ).save(force_insert=True)
        if not success:
            raise engine.DoesNotExist
        return user

    @classmethod
    def get_by_username(cls, username):
        '''
        Return the user whose username is `username`.
        '''
        obj = cls.engine.objects.get(username=username)
        return cls(obj)

    @classmethod
    def get_by_email(cls, email):
        '''
        Return the user whose email is `email` (compared lower-cased).
        '''
        obj = cls.engine.objects.get(email=email.lower())
        return cls(obj)

    @property
    def displayedName(self):
        '''
        Displayed name stored in the user's profile.
        '''
        return self.profile.displayed_name

    @property
    def bio(self):
        '''
        Bio stored in the user's profile.
        '''
        return self.profile.bio

    @property
    def cookie(self):
        '''
        JWT carrying the user's general information.
        '''
        keys = (
            'username',
            'email',
            'md5',
            'active',
            'role',
            'profile',
            'editorConfig',
        )
        return self.jwt(*keys)

    @property
    def secret(self):
        '''
        JWT flagged as secret, carrying `username` and `userId`.
        '''
        keys = (
            'username',
            'userId',
        )
        return self.jwt(*keys, secret=True)

    def jwt(self, *keys, secret=False, **kwargs):
        '''
        Encode the selected properties into a signed JWT.

        `keys` are passed to `properties`; `kwargs` are merged into the
        token data afterwards and may override those values. Returns an
        empty string when the user object is falsy.
        '''
        # Imported locally so the fix does not depend on the module's
        # import block.
        from datetime import timezone

        if not self:
            return ''
        data = self.properties(*keys)
        data.update(kwargs)
        payload = {
            'iss': JWT_ISS,
            # Must be a UTC-based time: PyJWT reads a naive datetime as
            # UTC, so a naive local `datetime.now()` shifts the expiry
            # by the server's UTC offset.
            'exp': datetime.now(timezone.utc) + JWT_EXP,
            'secret': secret,
            'data': data
        }
        return jwt.encode(payload, JWT_SECRET, algorithm='HS256')

    def properties(self, *keys) -> Dict[str, Any]:
        '''
        Extract properties from user and serialize it to a dictionary

        Values come from the document's mongo representation first and
        fall back to attributes of this object, then to `None`.

        raises:
            ValueError: when a requested key is not in the whitelist
        '''
        whitelist = {
            'username',
            'userId',
            'email',
            'md5',
            'active',
            'role',
            'profile',
            'editorConfig',
            'bio',
            'displayedName',
        }
        if any((k not in whitelist) for k in keys):
            raise ValueError('Found unallowed key')
        user = self.to_mongo()
        # The mongo representation keeps the username under `_id`
        user['username'] = user.get('_id')
        return {k: user.get(k, getattr(self, k, None)) for k in keys}

    def change_password(self, password):
        '''
        Replace both stored password hashes with the one of `password`.
        '''
        user_id = hash_id(self.username, password)
        self.update(user_id=user_id, user_id2=user_id)
        self.reload()

    def activate(self, profile=None) -> 'User':
        '''
        activate a user

        `profile` may provide `displayedName` and `bio`; missing values
        are stored as `None`.

        raises:
            ValidationError: when user field in db is wrong or data isn't valid
            engine.DoesNotExist
        '''
        # Avoid a shared mutable default argument
        if profile is None:
            profile = {}
        # check whether `Public` is exists
        pub_course = course.Course('Public').obj
        # update user data
        self.update(
            active=True,
            profile={
                'displayed_name': profile.get('displayedName'),
                'bio': profile.get('bio'),
            },
            push__courses=pub_course,
        )
        # update `Public`
        # NOTE(review): the nickname is only added to the in-memory
        # document here; no save of `pub_course` is visible in this
        # method -- confirm it is persisted elsewhere.
        pub_course.student_nicknames.update({
            self.username: self.username,
        })
        return self.reload()

    def add_submission(self, submission: engine.Submission):
        '''
        Count a new submission for this user.

        A submission scoring 100 additionally records its problem id in
        `AC_problem_ids` and increments `AC_submission`.
        '''
        if submission.score == 100:
            self.update(
                add_to_set__AC_problem_ids=submission.problem_id,
                inc__AC_submission=1,
            )
        self.submission += 1
        self.save()

257 

258 

def jwt_decode(token):
    '''
    Decode and verify a JWT signed with `JWT_SECRET`.

    Returns the decoded payload, or `None` when PyJWT rejects the token
    for any reason (any `PyJWTError`).
    '''
    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            issuer=JWT_ISS,
            # Must be a list: with a bare string the allow-list check
            # degrades to a substring test.
            algorithms=['HS256'],
        )
    except jwt.exceptions.PyJWTError:
        return None
    return payload