Coverage for model/auth.py: 100%

206 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-11 18:37 +0000

1# Standard library 

2from functools import wraps 

3from random import SystemRandom 

4from typing import Optional 

5import csv 

6import io 

7# Related third party imports 

8from flask import Blueprint, request, current_app, url_for 

9# Local application 

10from mongo import * 

11from mongo import engine 

12from mongo.utils import hash_id 

13from .utils import * 

14 

15import string 

16 

17__all__ = ( 

18 'auth_api', 

19 'login_required', 

20 'identity_verify', 

21 'get_verify_link', 

22) 

23 

24auth_api = Blueprint('auth_api', __name__) 

25 

26VERIFY_TEXT = '''\ 

27Welcome! you've signed up successfully! 

28Enter Normal OJ to active your account via this link: 

29{url} 

30''' 

31 

32VERIFY_HTML = '''\ 

33<!DOCTYPE html><html lang="en"><head><title>template</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0 "><meta name="format-detection" content="telephone=no"><link href="https://fonts.googleapis.com/css?family=Lato:300,400,600,700,800" rel="stylesheet"><style>.em_body {{margin: 0px;padding: 0px;background-color: #efefef;}}.em_full_wrap {{vertical-align: top;width: 100%;border-spacing: 0px;border-collapse: separate;border: 0px;background-color: #efefef;margin-left: auto; margin-right: auto;}}.em_main_table {{width: 700px;border-spacing: 0px;border-collapse: separate;align-self: center;margin-left:auto; margin-right:auto;}}.em_full_wrap td, .em_main_table td {{padding: 0px;vertical-align: top;text-align: center;}}</style></head><body class="em_body"><table class="em_full_wrap"><tbody><tr><td><table class="em_main_table"><tr><td style="padding:35px 70px 30px; background-color: #003865"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:30px; color:#fff; vertical-align: top; text-align: center;">Normal Online Judge Email Verification</td></tr><tr><td><hr></td></tr><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:20px; line-height:22px; color:#fff; padding:12px; vertical-align: top; text-align: center;">Welcome! you've signed up successfully!<br><br>Enter Normal OJ to active your account via this link.</td></tr><tr><td class="em_h20" style="font-size:0px; line-height:0px; height:25px;">&nbsp;</td></tr><tr><td style="vertical-align: top; text-align: center;"><form target="_blank" action="{url}"><button type="submit" style="background:#A6DAEF; border-color: #fff; border-radius: 5px; font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:22px; box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2), 0 6px 20px 0 rgba(0,0,0,0.19); cursor: pointer;">Active Account</button></form></td></tr></tbody></table></td></tr><tr><td style="padding:18px 30px; background-color: #f6f7f8"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:11px; line-height:18px; color:#999999; vertical-align: top; text-align: center;">© 2020 Normal Online Judge. All Rights Reserved.</td></tr></tbody></table></td></tr></table></td></tr></tbody></table></body></html> 

34''' 

35 

36 

37def login_required(func): 

38 '''Check if the user is login 

39 

40 Returns: 

41 - A wrapped function 

42 - 403 Not Logged In 

43 - 403 Invalid Token 

44 - 403 Inactive User 

45 ''' 

46 

47 @wraps(func) 

48 @Request.cookies(vars_dict={'token': 'piann'}) 

49 def wrapper(token, *args, **kwargs): 

50 if token is None: 

51 return HTTPError('Not Logged In', 403) 

52 json = jwt_decode(token) 

53 if json is None or not json.get('secret'): 

54 return HTTPError('Invalid Token', 403) 

55 user = User(json['data']['username']) 

56 if json['data'].get('userId') != user.user_id: 

57 return HTTPError('Authorization Expired', 403) 

58 if not user.active: 

59 return HTTPError('Inactive User', 403) 

60 kwargs['user'] = user 

61 return func(*args, **kwargs) 

62 

63 return wrapper 

64 

65 

66def identity_verify(*roles): 

67 '''Verify a logged in user's identity 

68 

69 You can find an example in `model/test.py` 

70 ''' 

71 

72 def verify(func): 

73 

74 @wraps(func) 

75 @login_required 

76 def wrapper(user, *args, **kwargs): 

77 if user.role not in roles: 

78 return HTTPError('Insufficient Permissions', 403) 

79 kwargs['user'] = user 

80 return func(*args, **kwargs) 

81 

82 return wrapper 

83 

84 return verify 

85 

86 

87def get_verify_link(user: User) -> str: 

88 return url_for( 

89 'auth_api.active', 

90 _external=True, 

91 token=user.cookie, 

92 ) 

93 

94 

95@auth_api.route('/session', methods=['GET', 'POST']) 

96def session(): 

97 '''Create a session or remove a session. 

98 Request methods: 

99 GET: Logout 

100 POST: Login 

101 ''' 

102 

103 def logout(): 

104 '''Logout a user. 

105 Returns: 

106 - 200 Logout Success 

107 ''' 

108 cookies = {'jwt': None, 'piann': None} 

109 return HTTPResponse('Goodbye', cookies=cookies) 

110 

111 @Request.json('username: str', 'password: str') 

112 def login(username, password): 

113 '''Login a user. 

114 Returns: 

115 - 400 Incomplete Data 

116 - 403 Login Failed 

117 ''' 

118 ip_addr = request.headers.get('cf-connecting-ip', request.remote_addr) 

119 try: 

120 user = User.login(username, password, ip_addr) 

121 except DoesNotExist: 

122 return HTTPError('Login Failed', 403) 

123 if not user.active: 

124 return HTTPError('Invalid User', 403) 

125 cookies = {'piann_httponly': user.secret, 'jwt': user.cookie} 

126 return HTTPResponse('Login Success', cookies=cookies) 

127 

128 methods = {'GET': logout, 'POST': login} 

129 

130 return methods[request.method]() 

131 

132 

133@auth_api.route('/signup', methods=['POST']) 

134@Request.json('username: str', 'password: str', 'email: str') 

135def signup(username, password, email): 

136 try: 

137 user = User.signup(username, password, email) 

138 except ValidationError as ve: 

139 return HTTPError('Signup Failed', 400, data=ve.to_dict()) 

140 except NotUniqueError: 

141 return HTTPError('User Exists', 400) 

142 except ValueError: 

143 return HTTPError('Not Allowed Name', 400) 

144 verify_link = get_verify_link(user) 

145 text = VERIFY_TEXT.format(url=verify_link) 

146 html = VERIFY_HTML.format(url=verify_link) 

147 send_noreply([email], '[N-OJ] Varify Your Email', text, html) 

148 return HTTPResponse('Signup Success') 

149 

150 

151@auth_api.route('/change-password', methods=['POST']) 

152@login_required 

153@Request.json('old_password: str', 'new_password: str') 

154def change_password(user, old_password, new_password): 

155 ip_addr = request.headers.get('cf-connecting-ip', request.remote_addr) 

156 try: 

157 User.login(user.username, old_password, ip_addr) 

158 except DoesNotExist: 

159 return HTTPError('Wrong Password', 403) 

160 user.change_password(new_password) 

161 cookies = {'piann_httponly': user.secret} 

162 return HTTPResponse('Password Has Been Changed', cookies=cookies) 

163 

164 

165@auth_api.route('/check/<item>', methods=['POST']) 

166def check(item): 

167 '''Checking when the user is registing. 

168 ''' 

169 

170 @Request.json('username: str') 

171 def check_username(username): 

172 try: 

173 User.get_by_username(username) 

174 except DoesNotExist: 

175 return HTTPResponse('Username Can Be Used', data={'valid': 1}) 

176 return HTTPResponse('User Exists', data={'valid': 0}) 

177 

178 @Request.json('email: str') 

179 def check_email(email): 

180 try: 

181 User.get_by_email(email) 

182 except DoesNotExist: 

183 return HTTPResponse('Email Can Be Used', data={'valid': 1}) 

184 return HTTPResponse('Email Has Been Used', data={'valid': 0}) 

185 

186 method = {'username': check_username, 'email': check_email}.get(item) 

187 return method() if method else HTTPError('Ivalid Checking Type', 400) 

188 

189 

190@auth_api.route('/resend-email', methods=['POST']) 

191@Request.json('email: str') 

192def resend_email(email): 

193 try: 

194 user = User.get_by_email(email) 

195 except DoesNotExist: 

196 return HTTPError('User Not Exists', 400) 

197 if user.active: 

198 return HTTPError('User Has Been Actived', 400) 

199 verify_link = get_verify_link(user) 

200 send_noreply([email], '[N-OJ] Varify Your Email', verify_link) 

201 return HTTPResponse('Email Has Been Resent') 

202 

203 

204@auth_api.route('/active', methods=['POST']) 

205@auth_api.route('/active/<token>', methods=['GET']) 

206def active(token=None): 

207 '''Activate a user. 

208 ''' 

209 

210 @Request.json('profile: dict', 'agreement: bool') 

211 @Request.cookies(vars_dict={'token': 'piann'}) 

212 def update(profile, agreement, token): 

213 '''User: active: false -> true 

214 ''' 

215 if agreement is not True: 

216 return HTTPError('Not Confirm the Agreement', 403) 

217 json = jwt_decode(token) 

218 if json is None or not json.get('secret'): 

219 return HTTPError('Invalid Token.', 403) 

220 user = User(json['data']['username']) 

221 if not user: 

222 return HTTPError('User Not Exists', 400) 

223 if user.active: 

224 return HTTPError('User Has Been Actived', 400) 

225 try: 

226 user.activate(profile) 

227 except engine.DoesNotExist as e: 

228 return HTTPError(str(e), 404) 

229 cookies = {'jwt': user.cookie} 

230 return HTTPResponse('User Is Now Active', cookies=cookies) 

231 

232 def redir(): 

233 '''Redirect user to active page. 

234 ''' 

235 json = jwt_decode(token) 

236 if json is None: 

237 return HTTPError('Invalid Token', 403) 

238 user = User(json['data']['username']) 

239 cookies = {'piann_httponly': user.secret, 'jwt': user.cookie} 

240 return HTTPRedirect('/email_verify', cookies=cookies) 

241 

242 methods = {'GET': redir, 'POST': update} 

243 return methods[request.method]() 

244 

245 

246@auth_api.route('/password-recovery', methods=['POST']) 

247@Request.json('email: str') 

248def password_recovery(email): 

249 try: 

250 user = User.get_by_email(email) 

251 except DoesNotExist: 

252 return HTTPError('User Not Exists', 400) 

253 new_password = (lambda r: ''.join( 

254 r.choice(string.hexdigits) 

255 for i in range(r.randint(12, 24))))(SystemRandom()) 

256 user_id2 = hash_id(user.username, new_password) 

257 user.update(user_id2=user_id2) 

258 send_noreply( 

259 [email], '[N-OJ] Password Recovery', 

260 f'Your alternative password is {new_password}.\nPlease login and change your password.' 

261 ) 

262 return HTTPResponse('Recovery Email Has Been Sent') 

263 

264 

265@auth_api.route('/user', methods=['POST']) 

266@Request.json('username: str', 'password: str', 'email: str') 

267@identity_verify(0) 

268def add_user( 

269 user, 

270 username: str, 

271 password: str, 

272 email: str, 

273): 

274 ''' 

275 Directly add a user without activation required. 

276 This operation only allow admin to use. 

277 ''' 

278 try: 

279 User.signup( 

280 username, 

281 password, 

282 email, 

283 ).activate() 

284 except ValidationError as ve: 

285 return HTTPError('Signup Failed', 400, data=ve.to_dict()) 

286 except NotUniqueError: 

287 return HTTPError('User Exists', 400) 

288 except ValueError: 

289 return HTTPError('Not Allowed Name', 400) 

290 return HTTPResponse() 

291 

292 

293@auth_api.route('/batch-signup', methods=['POST']) 

294@Request.json('new_users: str', 'course', 'force') 

295@Request.doc('course', 'course', Course, src_none_allowed=True) 

296@identity_verify(0) 

297def batch_signup( 

298 user, 

299 new_users: str, 

300 course: Optional[Course], 

301 force: Optional[bool], 

302): 

303 try: 

304 new_users = [*csv.DictReader(io.StringIO(new_users))] 

305 except csv.Error as e: 

306 current_app.logger.info(f'Error parse csv file [err={e}]') 

307 return HTTPError('Invalid file content', 400) 

308 if force is None: 

309 force = False 

310 try: 

311 new_users = User.batch_signup( 

312 new_users=new_users, 

313 course=course, 

314 force=force, 

315 ) 

316 except ValueError as e: 

317 return HTTPError(str(e), 400) 

318 return HTTPResponse() 

319 

320 

321@auth_api.route('/me', methods=['GET']) 

322@Request.args('fields') 

323@login_required 

324def get_me(user: User, fields: Optional[str]): 

325 default = [ 

326 'username', 

327 'email', 

328 'md5', 

329 'active', 

330 'role', 

331 'editorConfig', 

332 'displayedName', 

333 'bio', 

334 ] 

335 if fields is None: 

336 fields = default 

337 else: 

338 fields = fields.split(',') 

339 try: 

340 return HTTPResponse(data=user.properties(*fields)) 

341 except ValueError as e: 

342 return HTTPError(str(e), 400)