Coverage for mongo/user.py: 100%
139 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1from __future__ import annotations
2from datetime import datetime, timedelta
3from hmac import compare_digest
4from typing import Any, Dict, List, TYPE_CHECKING, Optional
6from . import engine, course
7from .utils import *
8from .base import *
10import hashlib
11import jwt
12import os
13import re
15if TYPE_CHECKING:
16 from .course import Course # pragma: no cover
18__all__ = ['User', 'jwt_decode']
# JWT settings, each read once from the environment at import time.
# Token lifetime in whole days (defaults to 30).
JWT_EXP = timedelta(days=int(os.environ.get('JWT_EXP', '30')))
# Value written to / required in the `iss` claim.
JWT_ISS = os.environ.get('JWT_ISS', 'test.test')
# HS256 signing key.
# NOTE(review): the fallback is a hard-coded, publicly known string; make sure
# JWT_SECRET is always set in any real deployment.
JWT_SECRET = os.environ.get('JWT_SECRET', 'SuperSecretString')
class User(MongoBase, engine=engine.User):
    '''
    Wrapper around the `engine.User` document.

    Provides account creation (single and batch), login, JWT generation and
    a few profile/submission helpers.
    '''

    @classmethod
    def signup(
        cls,
        username: str,
        password: str,
        email: str,
    ):
        '''
        Create a new, inactive user and return its wrapper.

        The username may only contain ASCII letters, digits, `_` and `-`.
        The email is lower-cased and stripped before being stored, and its
        md5 hex digest is stored alongside it.

        raises:
            ValueError: when the username contains a disallowed character
        '''
        # fullmatch instead of match + `$`: `$` also matches right before a
        # trailing newline, which would let 'name\n' through.
        if re.fullmatch(r'[a-zA-Z0-9_\-]+', username) is None:
            raise ValueError(f'Invalid username [username={username}]')
        user_id = hash_id(username, password)
        email = email.lower().strip()
        user = cls.engine(
            user_id=user_id,
            user_id2=user_id,
            username=username,
            email=email,
            md5=hashlib.md5(email.encode()).hexdigest(),
            active=False,
        ).save(force_insert=True)
        return cls(user).reload()

    @classmethod
    def batch_signup(
        cls,
        new_users: List[Dict[str, str]],
        course: Optional['Course'] = None,
        force: bool = False,
    ):
        '''
        Register multiple students with course

        Every entry of `new_users` must contain `username`, `password` and
        `email`; `role` and `displayedName` are optional. A `role` value is
        converted to `int` in place (the input dicts are mutated).

        When signing up hits `engine.NotUniqueError`, the existing user is
        looked up by username, then by email, and is overwritten with the
        new data only if `force` is true.

        If `course` is given, every registered/found user is merged into the
        course's student nicknames.

        raises:
            ValueError: missing required keys, or a role that is not an int
        '''
        # Validate
        required = {'username', 'password', 'email'}
        if not all(u.keys() >= required for u in new_users):
            raise ValueError('The input of batch_signup has invalid keys')
        for u in new_users:
            if (role := u.get('role')) is not None:
                try:
                    u['role'] = int(role)
                # TypeError covers non-string/non-number roles (e.g. a list)
                # so every bad role surfaces as the same ValueError.
                except (TypeError, ValueError) as err:
                    username = u['username']
                    raise ValueError(
                        'Got invalid role in batch signup '
                        f'[username={username}, role={role}]') from err
        # Register
        registered_users = []
        for u in new_users:
            try:
                new_user = cls.signup(
                    username=u['username'],
                    password=u['password'],
                    email=u['email'],
                )
                activate_payload = drop_none({
                    'displayedName': u.get('displayedName'),
                })
                new_user.activate(activate_payload)
                if (role := u.get('role')) is not None:
                    new_user.update(role=role)
                    new_user.reload('role')
            except engine.NotUniqueError:
                # The account already exists: find it by username first,
                # falling back to email.
                try:
                    new_user = cls.get_by_username(u['username'])
                except engine.DoesNotExist:
                    new_user = cls.get_by_email(u['email'])
                if force:
                    new_user.force_update(u, course)
            registered_users.append(new_user)
        if course is not None:
            new_student_nicknames = {
                **course.student_nicknames,
                **{u.username: u.username
                   for u in registered_users},
            }
            course.update_student_namelist(new_student_nicknames)
        # NOTE(review): this returns the input dicts, not `registered_users`;
        # kept as-is for callers -- confirm this is intended.
        return new_users

    def force_update(self, new_user: Dict[str, Any], course: Optional[Course]):
        '''
        Force update an existent user in batch update procedure

        Only the keys present (and not None) in `new_user` are applied:
        `displayedName`, `role`, `password` and `email`. When `course` is
        given, its id is added to the user's course set.
        '''
        if (displayed_name := new_user.get('displayedName')) is not None:
            self.update(profile__displayed_name=displayed_name)
        if (role := new_user.get('role')) is not None:
            self.update(role=role)
        if (password := new_user.get('password')) is not None:
            self.change_password(password)
        if (email := new_user.get('email')) is not None:
            # Normalise exactly like `signup` does, so `get_by_email` (which
            # lower-cases its argument) can still find the user afterwards.
            email = email.lower().strip()
            self.update(
                email=email,
                md5=hashlib.md5(email.encode()).hexdigest(),
            )
        if course is not None:
            self.update(add_to_set__courses=course.id)
        self.reload()

    @classmethod
    def login(cls, username, password, ip_addr):
        '''
        Authenticate by username (or email) and password.

        A `engine.LoginRecords` entry is stored for every attempt made
        against an existing user, successful or not.

        raises:
            engine.DoesNotExist: unknown user, or the password doesn't match
        '''
        try:
            user = cls.get_by_username(username)
        except engine.DoesNotExist:
            # `username` may actually be an email address
            user = cls.get_by_email(username)
        user_id = hash_id(user.username, password)
        success = (compare_digest(user.user_id, user_id)
                   or compare_digest(user.user_id2, user_id))
        engine.LoginRecords(
            user_id=user.id,
            ip_addr=ip_addr,
            success=success,
        ).save(force_insert=True)
        if not success:
            raise engine.DoesNotExist
        return user

    @classmethod
    def get_by_username(cls, username):
        '''
        Fetch the user whose username is exactly `username`.
        '''
        obj = cls.engine.objects.get(username=username)
        return cls(obj)

    @classmethod
    def get_by_email(cls, email):
        '''
        Fetch the user by email; the argument is lower-cased before lookup.
        '''
        obj = cls.engine.objects.get(email=email.lower())
        return cls(obj)

    @property
    def displayedName(self):
        '''
        Shortcut for `profile.displayed_name`.
        '''
        return self.profile.displayed_name

    @property
    def bio(self):
        '''
        Shortcut for `profile.bio`.
        '''
        return self.profile.bio

    @property
    def cookie(self):
        '''
        JWT (non-secret) carrying the user's public-facing fields.
        '''
        keys = (
            'username',
            'email',
            'md5',
            'active',
            'role',
            'profile',
            'editorConfig',
        )
        return self.jwt(*keys)

    @property
    def secret(self):
        '''
        JWT flagged as `secret`, carrying only `username` and `userId`.
        '''
        keys = (
            'username',
            'userId',
        )
        return self.jwt(*keys, secret=True)

    def jwt(self, *keys, secret=False, **kwargs):
        '''
        Encode the selected user properties into an HS256-signed JWT.

        `keys` are resolved through `properties`; `kwargs` are merged into
        the token's `data` on top of them. Returns an empty string when the
        user object is falsy.

        raises:
            ValueError: when a key is not allowed by `properties`
        '''
        # Local import: only `datetime`/`timedelta` are imported at file level.
        from datetime import timezone
        if not self:
            return ''
        data = self.properties(*keys)
        data.update(kwargs)
        payload = {
            'iss': JWT_ISS,
            # Must be timezone-aware: PyJWT interprets a naive datetime as
            # UTC, so a naive local `now()` skews the expiry by the server's
            # UTC offset.
            'exp': datetime.now(tz=timezone.utc) + JWT_EXP,
            'secret': secret,
            'data': data,
        }
        return jwt.encode(payload, JWT_SECRET, algorithm='HS256')

    def properties(self, *keys) -> Dict[str, Any]:
        '''
        Extract properties from user and serialize it to a dictionary

        Each key is read from the `to_mongo()` document first and falls back
        to the attribute of the same name (None if neither exists).

        raises:
            ValueError: when any requested key is not in the whitelist
        '''
        whitelist = {
            'username',
            'userId',
            'email',
            'md5',
            'active',
            'role',
            'profile',
            'editorConfig',
            'bio',
            'displayedName',
        }
        if any((k not in whitelist) for k in keys):
            raise ValueError('Found unallowed key')
        user = self.to_mongo()
        # expose the document's `_id` under the `username` key
        user['username'] = user.get('_id')
        return {k: user.get(k, getattr(self, k, None)) for k in keys}

    def change_password(self, password):
        '''
        Re-hash with the new password and store it in both id fields.
        '''
        user_id = hash_id(self.username, password)
        self.update(user_id=user_id, user_id2=user_id)
        self.reload()

    def activate(self, profile: Optional[Dict[str, Any]] = None) -> 'User':
        '''
        activate a user

        `profile` may provide `displayedName` and `bio`; missing entries are
        stored as None. The user is also pushed into the `Public` course.

        raises:
            ValidationError: when user field in db is wrong or data isn't valid
            engine.DoesNotExist
        '''
        # None instead of a shared mutable `{}` default
        if profile is None:
            profile = {}
        # check whether `Public` is exists
        pub_course = course.Course('Public').obj
        # update user data
        self.update(
            active=True,
            profile={
                'displayed_name': profile.get('displayedName'),
                'bio': profile.get('bio'),
            },
            push__courses=pub_course,
        )
        # update `Public`
        # NOTE(review): this only mutates the in-memory document; no save()
        # is visible here -- confirm the nickname is persisted elsewhere.
        pub_course.student_nicknames.update({
            self.username: self.username,
        })
        return self.reload()

    def add_submission(self, submission: engine.Submission):
        '''
        Count a new submission; a score of exactly 100 also records the
        problem as accepted and bumps the AC submission counter.
        '''
        if submission.score == 100:
            self.update(
                add_to_set__AC_problem_ids=submission.problem_id,
                inc__AC_submission=1,
            )
        self.submission += 1
        self.save()
def jwt_decode(token):
    '''
    Verify `token` and return its decoded payload.

    The token must be signed with `JWT_SECRET` using HS256 and carry
    `JWT_ISS` as issuer. Returns None when PyJWT rejects it for any reason.
    '''
    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            issuer=JWT_ISS,
            # A list, not a bare string: PyJWT checks `alg in algorithms`,
            # which on a string degrades to a substring test.
            algorithms=['HS256'],
        )
    except jwt.exceptions.PyJWTError:
        return None
    return payload