Coverage for mongo/user.py: 100%
139 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
1from __future__ import annotations
2from datetime import datetime, timedelta
3from hmac import compare_digest
4from typing import Any, Dict, List, TYPE_CHECKING, Optional
6from . import engine, course
7from .utils import *
8from .base import *
10import hashlib
11import jwt
12import os
13import re
15if TYPE_CHECKING:
16 from .course import Course # pragma: no cover
18__all__ = ['User', 'jwt_decode']
20JWT_EXP = timedelta(days=int(os.environ.get('JWT_EXP', '30')))
21JWT_ISS = os.environ.get('JWT_ISS', 'test.test')
22JWT_SECRET = os.environ.get('JWT_SECRET', 'SuperSecretString')
25class User(MongoBase, engine=engine.User):
27 @classmethod
28 def signup(
29 cls,
30 username: str,
31 password: str,
32 email: str,
33 ):
34 if re.match(r'^[a-zA-Z0-9_\-]+$', username) is None:
35 raise ValueError(f'Invalid username [username={username}]')
36 user_id = hash_id(username, password)
37 email = email.lower().strip()
38 user = cls.engine(
39 user_id=user_id,
40 user_id2=user_id,
41 username=username,
42 email=email,
43 md5=hashlib.md5(email.encode()).hexdigest(),
44 active=False,
45 ).save(force_insert=True)
46 return cls(user).reload()
48 @classmethod
49 def batch_signup(
50 cls,
51 new_users: List[Dict[str, str]],
52 course: Optional['Course'] = None,
53 force: bool = False,
54 ):
55 '''
56 Register multiple students with course
57 '''
58 # Validate
59 keys = {'username', 'password', 'email'}
60 if not all(({*u.keys()} >= keys) for u in new_users):
61 raise ValueError('The input of batch_signup has invalid keys')
62 for u in new_users:
63 if (role := u.get('role')) is not None:
64 try:
65 role = int(role)
66 u['role'] = role
67 except ValueError:
68 username = u['username']
69 raise ValueError(
70 'Got invalid role in batch signup '
71 f'[username={username}, role={role}]', )
72 # Register
73 registered_users = []
74 for u in new_users:
75 try:
76 new_user = cls.signup(
77 username=u['username'],
78 password=u['password'],
79 email=u['email'],
80 )
81 activate_payload = drop_none({
82 'displayedName':
83 u.get('displayedName'),
84 })
85 new_user.activate(activate_payload)
86 if (role := u.get('role')) is not None:
87 new_user.update(role=role)
88 new_user.reload('role')
89 except engine.NotUniqueError:
90 try:
91 new_user = cls.get_by_username(u['username'])
92 except engine.DoesNotExist:
93 new_user = cls.get_by_email(u['email'])
94 if force:
95 new_user.force_update(u, course)
96 registered_users.append(new_user)
97 if course is not None:
98 new_student_nicknames = {
99 **course.student_nicknames,
100 **{
101 u.username: u.username
102 for u in registered_users
103 }
104 }
105 course.update_student_namelist(new_student_nicknames)
106 return new_users
108 def force_update(self, new_user: Dict[str, Any], course: Optional[Course]):
109 '''
110 Force update an existent user in batch update procedure
111 '''
112 if (displayed_name := new_user.get('displayedName')) is not None:
113 self.update(profile__displayed_name=displayed_name)
114 if (role := new_user.get('role')) is not None:
115 self.update(role=role)
116 if (password := new_user.get('password')) is not None:
117 self.change_password(password)
118 if (email := new_user.get('email')) is not None:
119 self.update(email=email,
120 md5=hashlib.md5(email.encode()).hexdigest())
121 if course is not None:
122 self.update(add_to_set__courses=course.id)
123 self.reload()
125 @classmethod
126 def login(cls, username, password, ip_addr):
127 try:
128 user = cls.get_by_username(username)
129 except engine.DoesNotExist:
130 user = cls.get_by_email(username)
131 user_id = hash_id(user.username, password)
132 if (compare_digest(user.user_id, user_id)
133 or compare_digest(user.user_id2, user_id)):
134 engine.LoginRecords(
135 user_id=user.id,
136 ip_addr=ip_addr,
137 success=True,
138 ).save(force_insert=True)
139 return user
140 engine.LoginRecords(
141 user_id=user.id,
142 ip_addr=ip_addr,
143 success=False,
144 ).save(force_insert=True)
145 raise engine.DoesNotExist
147 @classmethod
148 def get_by_username(cls, username):
149 obj = cls.engine.objects.get(username=username)
150 return cls(obj)
152 @classmethod
153 def get_by_email(cls, email):
154 obj = cls.engine.objects.get(email=email.lower())
155 return cls(obj)
157 @property
158 def displayedName(self):
159 return self.profile.displayed_name
161 @property
162 def bio(self):
163 return self.profile.bio
165 @property
166 def cookie(self):
167 keys = (
168 'username',
169 'email',
170 'md5',
171 'active',
172 'role',
173 'profile',
174 'editorConfig',
175 )
176 return self.jwt(*keys)
178 @property
179 def secret(self):
180 keys = (
181 'username',
182 'userId',
183 )
184 return self.jwt(*keys, secret=True)
186 def jwt(self, *keys, secret=False, **kwargs):
187 if not self:
188 return ''
189 data = self.properties(*keys)
190 data.update(kwargs)
191 payload = {
192 'iss': JWT_ISS,
193 'exp': datetime.now() + JWT_EXP,
194 'secret': secret,
195 'data': data
196 }
197 return jwt.encode(payload, JWT_SECRET, algorithm='HS256')
199 def properties(self, *keys) -> Dict[str, Any]:
200 '''
201 Extract proeprties from user and serialize it to a dictionary
202 '''
203 whiltelists = {
204 'username',
205 'userId',
206 'email',
207 'md5',
208 'active',
209 'role',
210 'profile',
211 'editorConfig',
212 'bio',
213 'displayedName',
214 }
215 if any((k not in whiltelists) for k in keys):
216 raise ValueError('Found unallowed key')
217 user = self.to_mongo()
218 user['username'] = user.get('_id')
219 return {k: user.get(k, getattr(self, k, None)) for k in keys}
221 def change_password(self, password):
222 user_id = hash_id(self.username, password)
223 self.update(user_id=user_id, user_id2=user_id)
224 self.reload()
226 def activate(self, profile={}) -> 'User':
227 '''
228 activate a user
230 raises:
231 ValidationError: when user field in db is wrong or data isn't valid
232 engine.DoesNotExist
233 '''
234 # check whether `Public` is exists
235 pub_course = course.Course('Public').obj
236 # update user data
237 self.update(
238 active=True,
239 profile={
240 'displayed_name': profile.get('displayedName'),
241 'bio': profile.get('bio'),
242 },
243 push__courses=pub_course,
244 )
245 # update `Public`
246 pub_course.student_nicknames.update({
247 self.username: self.username,
248 })
249 return self.reload()
251 def add_submission(self, submission: engine.Submission):
252 if submission.score == 100:
253 self.update(
254 add_to_set__AC_problem_ids=submission.problem_id,
255 inc__AC_submission=1,
256 )
257 self.submission += 1
258 self.save()
261def jwt_decode(token):
262 try:
263 json = jwt.decode(
264 token,
265 JWT_SECRET,
266 issuer=JWT_ISS,
267 algorithms='HS256',
268 )
269 except jwt.exceptions.PyJWTError:
270 return None
271 return json