Coverage for model/auth.py: 100%
206 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
1# Standard library
2from functools import wraps
3from random import SystemRandom
4from typing import Optional
5import csv
6import io
7# Related third party imports
8from flask import Blueprint, request, current_app, url_for
9# Local application
10from mongo import *
11from mongo import engine
12from mongo.utils import hash_id
13from .utils import *
15import string
17__all__ = (
18 'auth_api',
19 'login_required',
20 'identity_verify',
21 'get_verify_link',
22)
24auth_api = Blueprint('auth_api', __name__)
26VERIFY_TEXT = '''\
27Welcome! you've signed up successfully!
28Enter Normal OJ to active your account via this link:
29{url}
30'''
32VERIFY_HTML = '''\
33<!DOCTYPE html><html lang="en"><head><title>template</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0 "><meta name="format-detection" content="telephone=no"><link href="https://fonts.googleapis.com/css?family=Lato:300,400,600,700,800" rel="stylesheet"><style>.em_body {{margin: 0px;padding: 0px;background-color: #efefef;}}.em_full_wrap {{vertical-align: top;width: 100%;border-spacing: 0px;border-collapse: separate;border: 0px;background-color: #efefef;margin-left: auto; margin-right: auto;}}.em_main_table {{width: 700px;border-spacing: 0px;border-collapse: separate;align-self: center;margin-left:auto; margin-right:auto;}}.em_full_wrap td, .em_main_table td {{padding: 0px;vertical-align: top;text-align: center;}}</style></head><body class="em_body"><table class="em_full_wrap"><tbody><tr><td><table class="em_main_table"><tr><td style="padding:35px 70px 30px; background-color: #003865"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:30px; color:#fff; vertical-align: top; text-align: center;">Normal Online Judge Email Verification</td></tr><tr><td><hr></td></tr><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:20px; line-height:22px; color:#fff; padding:12px; vertical-align: top; text-align: center;">Welcome! you've signed up successfully!<br><br>Enter Normal OJ to active your account via this link.</td></tr><tr><td class="em_h20" style="font-size:0px; line-height:0px; height:25px;"> </td></tr><tr><td style="vertical-align: top; text-align: center;"><form target="_blank" action="{url}"><button type="submit" style="background:#A6DAEF; border-color: #fff; border-radius: 5px; font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:22px; box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2), 0 6px 20px 0 rgba(0,0,0,0.19); cursor: pointer;">Active Account</button></form></td></tr></tbody></table></td></tr><tr><td style="padding:18px 30px; background-color: #f6f7f8"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:11px; line-height:18px; color:#999999; vertical-align: top; text-align: center;">© 2020 Normal Online Judge. All Rights Reserved.</td></tr></tbody></table></td></tr></table></td></tr></tbody></table></body></html>
34'''
37def login_required(func):
38 '''Check if the user is login
40 Returns:
41 - A wrapped function
42 - 403 Not Logged In
43 - 403 Invalid Token
44 - 403 Inactive User
45 '''
47 @wraps(func)
48 @Request.cookies(vars_dict={'token': 'piann'})
49 def wrapper(token, *args, **kwargs):
50 if token is None:
51 return HTTPError('Not Logged In', 403)
52 json = jwt_decode(token)
53 if json is None or not json.get('secret'):
54 return HTTPError('Invalid Token', 403)
55 user = User(json['data']['username'])
56 if json['data'].get('userId') != user.user_id:
57 return HTTPError('Authorization Expired', 403)
58 if not user.active:
59 return HTTPError('Inactive User', 403)
60 kwargs['user'] = user
61 return func(*args, **kwargs)
63 return wrapper
66def identity_verify(*roles):
67 '''Verify a logged in user's identity
69 You can find an example in `model/test.py`
70 '''
72 def verify(func):
74 @wraps(func)
75 @login_required
76 def wrapper(user, *args, **kwargs):
77 if user.role not in roles:
78 return HTTPError('Insufficient Permissions', 403)
79 kwargs['user'] = user
80 return func(*args, **kwargs)
82 return wrapper
84 return verify
87def get_verify_link(user: User) -> str:
88 return url_for(
89 'auth_api.active',
90 _external=True,
91 token=user.cookie,
92 )
95@auth_api.route('/session', methods=['GET', 'POST'])
96def session():
97 '''Create a session or remove a session.
98 Request methods:
99 GET: Logout
100 POST: Login
101 '''
103 def logout():
104 '''Logout a user.
105 Returns:
106 - 200 Logout Success
107 '''
108 cookies = {'jwt': None, 'piann': None}
109 return HTTPResponse('Goodbye', cookies=cookies)
111 @Request.json('username: str', 'password: str')
112 def login(username, password):
113 '''Login a user.
114 Returns:
115 - 400 Incomplete Data
116 - 403 Login Failed
117 '''
118 ip_addr = request.headers.get('cf-connecting-ip', request.remote_addr)
119 try:
120 user = User.login(username, password, ip_addr)
121 except DoesNotExist:
122 return HTTPError('Login Failed', 403)
123 if not user.active:
124 return HTTPError('Invalid User', 403)
125 cookies = {'piann_httponly': user.secret, 'jwt': user.cookie}
126 return HTTPResponse('Login Success', cookies=cookies)
128 methods = {'GET': logout, 'POST': login}
130 return methods[request.method]()
133@auth_api.route('/signup', methods=['POST'])
134@Request.json('username: str', 'password: str', 'email: str')
135def signup(username, password, email):
136 try:
137 user = User.signup(username, password, email)
138 except ValidationError as ve:
139 return HTTPError('Signup Failed', 400, data=ve.to_dict())
140 except NotUniqueError:
141 return HTTPError('User Exists', 400)
142 except ValueError:
143 return HTTPError('Not Allowed Name', 400)
144 verify_link = get_verify_link(user)
145 text = VERIFY_TEXT.format(url=verify_link)
146 html = VERIFY_HTML.format(url=verify_link)
147 send_noreply([email], '[N-OJ] Varify Your Email', text, html)
148 return HTTPResponse('Signup Success')
151@auth_api.route('/change-password', methods=['POST'])
152@login_required
153@Request.json('old_password: str', 'new_password: str')
154def change_password(user, old_password, new_password):
155 ip_addr = request.headers.get('cf-connecting-ip', request.remote_addr)
156 try:
157 User.login(user.username, old_password, ip_addr)
158 except DoesNotExist:
159 return HTTPError('Wrong Password', 403)
160 user.change_password(new_password)
161 cookies = {'piann_httponly': user.secret}
162 return HTTPResponse('Password Has Been Changed', cookies=cookies)
165@auth_api.route('/check/<item>', methods=['POST'])
166def check(item):
167 '''Checking when the user is registing.
168 '''
170 @Request.json('username: str')
171 def check_username(username):
172 try:
173 User.get_by_username(username)
174 except DoesNotExist:
175 return HTTPResponse('Username Can Be Used', data={'valid': 1})
176 return HTTPResponse('User Exists', data={'valid': 0})
178 @Request.json('email: str')
179 def check_email(email):
180 try:
181 User.get_by_email(email)
182 except DoesNotExist:
183 return HTTPResponse('Email Can Be Used', data={'valid': 1})
184 return HTTPResponse('Email Has Been Used', data={'valid': 0})
186 method = {'username': check_username, 'email': check_email}.get(item)
187 return method() if method else HTTPError('Ivalid Checking Type', 400)
190@auth_api.route('/resend-email', methods=['POST'])
191@Request.json('email: str')
192def resend_email(email):
193 try:
194 user = User.get_by_email(email)
195 except DoesNotExist:
196 return HTTPError('User Not Exists', 400)
197 if user.active:
198 return HTTPError('User Has Been Actived', 400)
199 verify_link = get_verify_link(user)
200 send_noreply([email], '[N-OJ] Varify Your Email', verify_link)
201 return HTTPResponse('Email Has Been Resent')
204@auth_api.route('/active', methods=['POST'])
205@auth_api.route('/active/<token>', methods=['GET'])
206def active(token=None):
207 '''Activate a user.
208 '''
210 @Request.json('profile: dict', 'agreement: bool')
211 @Request.cookies(vars_dict={'token': 'piann'})
212 def update(profile, agreement, token):
213 '''User: active: false -> true
214 '''
215 if agreement is not True:
216 return HTTPError('Not Confirm the Agreement', 403)
217 json = jwt_decode(token)
218 if json is None or not json.get('secret'):
219 return HTTPError('Invalid Token.', 403)
220 user = User(json['data']['username'])
221 if not user:
222 return HTTPError('User Not Exists', 400)
223 if user.active:
224 return HTTPError('User Has Been Actived', 400)
225 try:
226 user.activate(profile)
227 except engine.DoesNotExist as e:
228 return HTTPError(str(e), 404)
229 cookies = {'jwt': user.cookie}
230 return HTTPResponse('User Is Now Active', cookies=cookies)
232 def redir():
233 '''Redirect user to active page.
234 '''
235 json = jwt_decode(token)
236 if json is None:
237 return HTTPError('Invalid Token', 403)
238 user = User(json['data']['username'])
239 cookies = {'piann_httponly': user.secret, 'jwt': user.cookie}
240 return HTTPRedirect('/email_verify', cookies=cookies)
242 methods = {'GET': redir, 'POST': update}
243 return methods[request.method]()
246@auth_api.route('/password-recovery', methods=['POST'])
247@Request.json('email: str')
248def password_recovery(email):
249 try:
250 user = User.get_by_email(email)
251 except DoesNotExist:
252 return HTTPError('User Not Exists', 400)
253 new_password = (lambda r: ''.join(
254 r.choice(string.hexdigits)
255 for i in range(r.randint(12, 24))))(SystemRandom())
256 user_id2 = hash_id(user.username, new_password)
257 user.update(user_id2=user_id2)
258 send_noreply(
259 [email], '[N-OJ] Password Recovery',
260 f'Your alternative password is {new_password}.\nPlease login and change your password.'
261 )
262 return HTTPResponse('Recovery Email Has Been Sent')
265@auth_api.route('/user', methods=['POST'])
266@Request.json('username: str', 'password: str', 'email: str')
267@identity_verify(0)
268def add_user(
269 user,
270 username: str,
271 password: str,
272 email: str,
273):
274 '''
275 Directly add a user without activation required.
276 This operation only allow admin to use.
277 '''
278 try:
279 User.signup(
280 username,
281 password,
282 email,
283 ).activate()
284 except ValidationError as ve:
285 return HTTPError('Signup Failed', 400, data=ve.to_dict())
286 except NotUniqueError:
287 return HTTPError('User Exists', 400)
288 except ValueError:
289 return HTTPError('Not Allowed Name', 400)
290 return HTTPResponse()
293@auth_api.route('/batch-signup', methods=['POST'])
294@Request.json('new_users: str', 'course', 'force')
295@Request.doc('course', 'course', Course, src_none_allowed=True)
296@identity_verify(0)
297def batch_signup(
298 user,
299 new_users: str,
300 course: Optional[Course],
301 force: Optional[bool],
302):
303 try:
304 new_users = [*csv.DictReader(io.StringIO(new_users))]
305 except csv.Error as e:
306 current_app.logger.info(f'Error parse csv file [err={e}]')
307 return HTTPError('Invalid file content', 400)
308 if force is None:
309 force = False
310 try:
311 new_users = User.batch_signup(
312 new_users=new_users,
313 course=course,
314 force=force,
315 )
316 except ValueError as e:
317 return HTTPError(str(e), 400)
318 return HTTPResponse()
321@auth_api.route('/me', methods=['GET'])
322@Request.args('fields')
323@login_required
324def get_me(user: User, fields: Optional[str]):
325 default = [
326 'username',
327 'email',
328 'md5',
329 'active',
330 'role',
331 'editorConfig',
332 'displayedName',
333 'bio',
334 ]
335 if fields is None:
336 fields = default
337 else:
338 fields = fields.split(',')
339 try:
340 return HTTPResponse(data=user.properties(*fields))
341 except ValueError as e:
342 return HTTPError(str(e), 400)