Coverage for model/auth.py: 100%
204 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1# Standard library
2from functools import wraps
3from random import SystemRandom
4from typing import Optional
5import csv
6import io
7# Related third party imports
8from flask import Blueprint, request, current_app, url_for
9# Local application
10from mongo import *
11from mongo import engine
12from mongo.utils import hash_id
13from .utils import *
15import string
# Names exported by `from <this module> import *`.
__all__ = (
    'auth_api',
    'login_required',
    'identity_verify',
    'get_verify_link',
)

# Blueprint on which every route in this module is registered.
auth_api = Blueprint('auth_api', __name__)
# Plain-text body of the verification e-mail. The `{url}` placeholder is
# filled in with `str.format` by the code that sends the message.
VERIFY_TEXT = (
    "Welcome! you've signed up successfully!\n"
    'Enter Normal OJ to active your account via this link:\n'
    '{url}\n'
)
32VERIFY_HTML = '''\
33<!DOCTYPE html><html lang="en"><head><title>template</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0 "><meta name="format-detection" content="telephone=no"><link href="https://fonts.googleapis.com/css?family=Lato:300,400,600,700,800" rel="stylesheet"><style>.em_body {{margin: 0px;padding: 0px;background-color: #efefef;}}.em_full_wrap {{vertical-align: top;width: 100%;border-spacing: 0px;border-collapse: separate;border: 0px;background-color: #efefef;margin-left: auto; margin-right: auto;}}.em_main_table {{width: 700px;border-spacing: 0px;border-collapse: separate;align-self: center;margin-left:auto; margin-right:auto;}}.em_full_wrap td, .em_main_table td {{padding: 0px;vertical-align: top;text-align: center;}}</style></head><body class="em_body"><table class="em_full_wrap"><tbody><tr><td><table class="em_main_table"><tr><td style="padding:35px 70px 30px; background-color: #003865"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:30px; color:#fff; vertical-align: top; text-align: center;">Normal Online Judge Email Verification</td></tr><tr><td><hr></td></tr><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:20px; line-height:22px; color:#fff; padding:12px; vertical-align: top; text-align: center;">Welcome! 
you've signed up successfully!<br><br>Enter Normal OJ to active your account via this link.</td></tr><tr><td class="em_h20" style="font-size:0px; line-height:0px; height:25px;"> </td></tr><tr><td style="vertical-align: top; text-align: center;"><form target="_blank" action="{url}"><button type="submit" style="background:#A6DAEF; border-color: #fff; border-radius: 5px; font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:22px; box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2), 0 6px 20px 0 rgba(0,0,0,0.19); cursor: pointer;">Active Account</button></form></td></tr></tbody></table></td></tr><tr><td style="padding:18px 30px; background-color: #f6f7f8"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:11px; line-height:18px; color:#999999; vertical-align: top; text-align: center;">© 2020 Normal Online Judge. All Rights Reserved.</td></tr></tbody></table></td></tr></table></td></tr></tbody></table></body></html>
34'''
def login_required(func):
    '''Decorator: only run `func` for a logged-in, active user.

    The wrapper receives `token` from `Request.cookies` (mapped from
    `piann`), decodes it with `jwt_decode`, loads the `User` named in the
    token and passes it to `func` as the keyword argument `user`.

    Returns:
        - A wrapped function
        - 403 Not Logged In
        - 403 Invalid Token
        - 403 Authorization Expired
        - 403 Inactive User
    '''
    @wraps(func)
    @Request.cookies(vars_dict={'token': 'piann'})
    def wrapper(token, *args, **kwargs):
        if token is None:
            return HTTPError('Not Logged In', 403)

        payload = jwt_decode(token)
        if payload is None or not payload.get('secret'):
            return HTTPError('Invalid Token', 403)

        claims = payload['data']
        user = User(claims['username'])
        # The id stored in the token must still match the user's current id.
        if claims.get('userId') != user.user_id:
            return HTTPError('Authorization Expired', 403)
        if not user.active:
            return HTTPError('Inactive User', 403)

        kwargs.update(user=user)
        return func(*args, **kwargs)

    return wrapper
def identity_verify(*roles):
    '''Build a decorator that admits only users whose role is in `roles`.

    The produced decorator applies `login_required` first, so the user
    must already pass that check; the user is then forwarded to the
    wrapped function as the keyword argument `user`.

    You can find an example in `model/test.py`

    Returns:
        - 403 Insufficient Permissions
    '''
    def verify(func):

        @wraps(func)
        @login_required
        def wrapper(user, *args, **kwargs):
            if user.role in roles:
                return func(*args, user=user, **kwargs)
            return HTTPError('Insufficient Permissions', 403)

        return wrapper

    return verify
def get_verify_link(user: User) -> str:
    '''Return the external URL of the `auth_api.active` endpoint for `user`.

    The user's `cookie` value is passed as the `token` URL argument.
    '''
    link_args = {'_external': True, 'token': user.cookie}
    return url_for('auth_api.active', **link_args)
@auth_api.route('/session', methods=['GET', 'POST'])
def session():
    '''Create a session or remove a session.
    Request methods:
        GET: Logout
        POST: Login

    Flask also routes HEAD requests to any view that accepts GET. The
    previous dict lookup on `request.method` raised KeyError for HEAD,
    so HEAD is now handled like GET.
    '''
    def logout():
        '''Logout a user.
        Returns:
            - 200 Logout Success
        '''
        # Both cookies are sent back with a `None` value.
        cookies = {'jwt': None, 'piann': None}
        return HTTPResponse('Goodbye', cookies=cookies)

    @Request.json('username: str', 'password: str')
    def login(username, password):
        '''Login a user.
        Returns:
            - 400 Incomplete Data
            - 403 Login Failed
            - 403 Invalid User (the account is not active)
        '''
        try:
            user = User.login(username, password, request.remote_addr)
        except DoesNotExist:
            return HTTPError('Login Failed', 403)
        if not user.active:
            return HTTPError('Invalid User', 403)
        cookies = {'piann_httponly': user.secret, 'jwt': user.cookie}
        return HTTPResponse('Login Success', cookies=cookies)

    if request.method == 'POST':
        return login()
    # GET, plus the HEAD variant Flask adds automatically.
    return logout()
@auth_api.route('/signup', methods=['POST'])
@Request.json('username: str', 'password: str', 'email: str')
def signup(username, password, email):
    '''Register a new user and mail the verification link.

    Returns:
        - Signup Success
        - 400 Signup Failed (validation error; details in `data`)
        - 400 User Exists
        - 400 Not Allowed Name
    '''
    try:
        user = User.signup(username, password, email)
    except ValidationError as ve:
        return HTTPError('Signup Failed', 400, data=ve.to_dict())
    except NotUniqueError:
        return HTTPError('User Exists', 400)
    except ValueError:
        return HTTPError('Not Allowed Name', 400)

    # The same link goes into the plain-text and the HTML body.
    link = get_verify_link(user)
    send_noreply(
        [email],
        '[N-OJ] Varify Your Email',
        VERIFY_TEXT.format(url=link),
        VERIFY_HTML.format(url=link),
    )
    return HTTPResponse('Signup Success')
@auth_api.route('/change-password', methods=['POST'])
@login_required
@Request.json('old_password: str', 'new_password: str')
def change_password(user, old_password, new_password):
    '''Change the password of the logged-in user.

    The old password is checked by attempting `User.login` with it.

    Returns:
        - Password Has Been Changed
        - 403 Wrong Password
    '''
    try:
        User.login(user.username, old_password, request.remote_addr)
    except DoesNotExist:
        return HTTPError('Wrong Password', 403)
    else:
        user.change_password(new_password)
        # `user.secret` is read after the change and sent back as a cookie.
        return HTTPResponse(
            'Password Has Been Changed',
            cookies={'piann_httponly': user.secret},
        )
@auth_api.route('/check/<item>', methods=['POST'])
def check(item):
    '''Tell a registering user whether a username or e-mail is still free.

    `item` comes from the URL and selects the check: `username` or
    `email`. Any other value yields a 400 error. The response's `data`
    holds `valid`: 1 when the value can be used, 0 when it is taken.
    '''
    @Request.json('username: str')
    def check_username(username):
        try:
            User.get_by_username(username)
        except DoesNotExist:
            return HTTPResponse('Username Can Be Used', data={'valid': 1})
        else:
            return HTTPResponse('User Exists', data={'valid': 0})

    @Request.json('email: str')
    def check_email(email):
        try:
            User.get_by_email(email)
        except DoesNotExist:
            return HTTPResponse('Email Can Be Used', data={'valid': 1})
        else:
            return HTTPResponse('Email Has Been Used', data={'valid': 0})

    if item == 'username':
        return check_username()
    if item == 'email':
        return check_email()
    return HTTPError('Ivalid Checking Type', 400)
@auth_api.route('/resend-email', methods=['POST'])
@Request.json('email: str')
def resend_email(email):
    '''Send the verification e-mail again to a not-yet-active account.

    The message is built from `VERIFY_TEXT` and `VERIFY_HTML`, the same
    templates `signup` uses. Previously only the bare link was sent as
    the text body, with no HTML part.

    Returns:
        - Email Has Been Resent
        - 400 User Not Exists
        - 400 User Has Been Actived
    '''
    try:
        user = User.get_by_email(email)
    except DoesNotExist:
        return HTTPError('User Not Exists', 400)
    if user.active:
        return HTTPError('User Has Been Actived', 400)

    verify_link = get_verify_link(user)
    send_noreply(
        [email],
        '[N-OJ] Varify Your Email',
        VERIFY_TEXT.format(url=verify_link),
        VERIFY_HTML.format(url=verify_link),
    )
    return HTTPResponse('Email Has Been Resent')
@auth_api.route('/active', methods=['POST'])
@auth_api.route('/active/<token>', methods=['GET'])
def active(token=None):
    '''Activate a user.

    Request methods:
        GET  /active/<token>: set the login cookies and redirect to the
            activation page.
        POST /active: activate the account with the submitted profile.

    Flask also routes HEAD requests to the GET rule. The previous dict
    lookup on `request.method` raised KeyError for HEAD, so HEAD is now
    handled like GET.
    '''
    @Request.json('profile: dict', 'agreement: bool')
    @Request.cookies(vars_dict={'token': 'piann'})
    def update(profile, agreement, token):
        '''User: active: false -> true

        Here `token` is the value injected by `Request.cookies`, not the
        URL argument of the enclosing view.

        Returns:
            - User Is Now Active
            - 403 Not Confirm the Agreement
            - 403 Invalid Token.
            - 400 User Not Exists
            - 400 User Has Been Actived
            - 404 when `user.activate` raises `engine.DoesNotExist`
        '''
        if agreement is not True:
            return HTTPError('Not Confirm the Agreement', 403)
        payload = jwt_decode(token)
        if payload is None or not payload.get('secret'):
            return HTTPError('Invalid Token.', 403)
        user = User(payload['data']['username'])
        if not user:
            return HTTPError('User Not Exists', 400)
        if user.active:
            return HTTPError('User Has Been Actived', 400)
        try:
            user.activate(profile)
        except engine.DoesNotExist as e:
            return HTTPError(str(e), 404)
        cookies = {'jwt': user.cookie}
        return HTTPResponse('User Is Now Active', cookies=cookies)

    def redir():
        '''Redirect user to active page.

        Uses the `token` taken from the URL.

        Returns:
            - a redirect to `/email_verify` carrying the login cookies
            - 403 Invalid Token
        '''
        payload = jwt_decode(token)
        if payload is None:
            return HTTPError('Invalid Token', 403)
        user = User(payload['data']['username'])
        cookies = {'piann_httponly': user.secret, 'jwt': user.cookie}
        return HTTPRedirect('/email_verify', cookies=cookies)

    if request.method == 'POST':
        return update()
    # GET, plus the HEAD variant Flask adds automatically.
    return redir()
@auth_api.route('/password-recovery', methods=['POST'])
@Request.json('email: str')
def password_recovery(email):
    '''Mail a randomly generated alternative password to the user.

    The password is 12 to 24 characters drawn from `string.hexdigits`
    using `SystemRandom`. Its `hash_id` is stored on the user as
    `user_id2`.

    Returns:
        - Recovery Email Has Been Sent
        - 400 User Not Exists
    '''
    try:
        user = User.get_by_email(email)
    except DoesNotExist:
        return HTTPError('User Not Exists', 400)

    rng = SystemRandom()
    # The length is drawn first, then one character per position.
    length = rng.randint(12, 24)
    new_password = ''.join(rng.choice(string.hexdigits) for _ in range(length))

    user.update(user_id2=hash_id(user.username, new_password))
    send_noreply(
        [email],
        '[N-OJ] Password Recovery',
        f'Your alternative password is {new_password}.\nPlease login and change your password.',
    )
    return HTTPResponse('Recovery Email Has Been Sent')
@auth_api.route('/user', methods=['POST'])
@Request.json('username: str', 'password: str', 'email: str')
@identity_verify(0)
def add_user(
    user,
    username: str,
    password: str,
    email: str,
):
    '''
    Directly add a user without activation required.
    This operation only allow admin to use.

    The account is signed up and activated straight away; no
    verification e-mail is sent.

    Returns:
        - an empty success response
        - 400 Signup Failed (validation error; details in `data`)
        - 400 User Exists
        - 400 Not Allowed Name
    '''
    try:
        created = User.signup(username, password, email)
        created.activate()
    except ValidationError as ve:
        return HTTPError('Signup Failed', 400, data=ve.to_dict())
    except NotUniqueError:
        return HTTPError('User Exists', 400)
    except ValueError:
        return HTTPError('Not Allowed Name', 400)
    return HTTPResponse()
@auth_api.route('/batch-signup', methods=['POST'])
@Request.json('new_users: str', 'course', 'force')
@Request.doc('course', 'course', Course, src_none_allowed=True)
@identity_verify(0)
def batch_signup(
    user,
    new_users: str,
    course: Optional[Course],
    force: Optional[bool],
):
    '''Sign up many users at once from CSV text.

    `new_users` is parsed with `csv.DictReader`, so its first line is the
    header row. The parsed rows are passed to `User.batch_signup`
    together with `course` and `force`; a missing `force` counts as False.

    Returns:
        - an empty success response
        - 400 Invalid file content (the CSV could not be parsed)
        - 400 with the message of a `ValueError` from `User.batch_signup`
    '''
    try:
        rows = list(csv.DictReader(io.StringIO(new_users)))
    except csv.Error as e:
        current_app.logger.info(f'Error parse csv file [err={e}]')
        return HTTPError('Invalid file content', 400)

    force = False if force is None else force
    try:
        User.batch_signup(new_users=rows, course=course, force=force)
    except ValueError as e:
        return HTTPError(str(e), 400)
    return HTTPResponse()
@auth_api.route('/me', methods=['GET'])
@Request.args('fields')
@login_required
def get_me(user: User, fields: Optional[str]):
    '''Return selected properties of the logged-in user.

    `fields` is a comma-separated list of property names. When it is
    absent, a default set of properties is returned.

    Returns:
        - a response whose `data` is `user.properties(*fields)`
        - 400 with the message of a `ValueError` from `user.properties`
    '''
    if fields is None:
        wanted = [
            'username',
            'email',
            'md5',
            'active',
            'role',
            'editorConfig',
            'displayedName',
            'bio',
        ]
    else:
        wanted = fields.split(',')

    try:
        props = user.properties(*wanted)
        return HTTPResponse(data=props)
    except ValueError as e:
        return HTTPError(str(e), 400)