Coverage for model/auth.py: 100%

204 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1# Standard library 

2from functools import wraps 

3from random import SystemRandom 

4from typing import Optional 

5import csv 

6import io 

7# Related third party imports 

8from flask import Blueprint, request, current_app, url_for 

9# Local application 

10from mongo import * 

11from mongo import engine 

12from mongo.utils import hash_id 

13from .utils import * 

14 

15import string 

16 

# Public names exported by this module.
__all__ = (
    'auth_api',
    'login_required',
    'identity_verify',
    'get_verify_link',
)

# Blueprint on which every route defined in this module is registered.
auth_api = Blueprint('auth_api', __name__)

# Plain-text body of the verification e-mail. `{url}` is the only
# placeholder and is filled in with `str.format(url=...)`.
VERIFY_TEXT = '''\
Welcome! you've signed up successfully!
Enter Normal OJ to active your account via this link:
{url}
'''

31 

# HTML body of the verification e-mail. The template is rendered with
# `str.format(url=...)`, so literal CSS braces are doubled (`{{` / `}}`)
# and `{url}` (the form's action) is the only placeholder.
VERIFY_HTML = '''\
<!DOCTYPE html><html lang="en"><head><title>template</title><meta http-equiv="Content-Type" content="text/html; charset=utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0 "><meta name="format-detection" content="telephone=no"><link href="https://fonts.googleapis.com/css?family=Lato:300,400,600,700,800" rel="stylesheet"><style>.em_body {{margin: 0px;padding: 0px;background-color: #efefef;}}.em_full_wrap {{vertical-align: top;width: 100%;border-spacing: 0px;border-collapse: separate;border: 0px;background-color: #efefef;margin-left: auto; margin-right: auto;}}.em_main_table {{width: 700px;border-spacing: 0px;border-collapse: separate;align-self: center;margin-left:auto; margin-right:auto;}}.em_full_wrap td, .em_main_table td {{padding: 0px;vertical-align: top;text-align: center;}}</style></head><body class="em_body"><table class="em_full_wrap"><tbody><tr><td><table class="em_main_table"><tr><td style="padding:35px 70px 30px; background-color: #003865"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:30px; color:#fff; vertical-align: top; text-align: center;">Normal Online Judge Email Verification</td></tr><tr><td><hr></td></tr><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:20px; line-height:22px; color:#fff; padding:12px; vertical-align: top; text-align: center;">Welcome!
you've signed up successfully!<br><br>Enter Normal OJ to active your account via this link.</td></tr><tr><td class="em_h20" style="font-size:0px; line-height:0px; height:25px;">&nbsp;</td></tr><tr><td style="vertical-align: top; text-align: center;"><form target="_blank" action="{url}"><button type="submit" style="background:#A6DAEF; border-color: #fff; border-radius: 5px; font-family:'Lato', Arial, sans-serif; font-size:16px; line-height:22px; box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2), 0 6px 20px 0 rgba(0,0,0,0.19); cursor: pointer;">Active Account</button></form></td></tr></tbody></table></td></tr><tr><td style="padding:18px 30px; background-color: #f6f7f8"><table style="width: 100%; border-spacing: 0px; border-collapse: separate; border: 0px; margin-left: auto; margin-right: auto;"><tbody><tr><td style="font-family:'Lato', Arial, sans-serif; font-size:11px; line-height:18px; color:#999999; vertical-align: top; text-align: center;">© 2020 Normal Online Judge. All Rights Reserved.</td></tr></tbody></table></td></tr></table></td></tr></tbody></table></body></html>
'''

35 

36 

def login_required(func):
    '''Decorator that only lets a logged-in, active user reach `func`.

    The token is taken from the `piann` cookie (through
    `Request.cookies`) and decoded with `jwt_decode`. On success the
    resolved `User` is passed to `func` as the `user` keyword argument.

    Returns:
        - A wrapped function
        - 403 Not Logged In
        - 403 Invalid Token
        - 403 Authorization Expired
        - 403 Inactive User
    '''

    @wraps(func)
    @Request.cookies(vars_dict={'token': 'piann'})
    def wrapper(token, *args, **kwargs):
        # No cookie at all means the client never logged in.
        if token is None:
            return HTTPError('Not Logged In', 403)
        payload = jwt_decode(token)
        # Only tokens flagged as `secret` are accepted here.
        if payload is None or not payload.get('secret'):
            return HTTPError('Invalid Token', 403)
        claims = payload['data']
        user = User(claims['username'])
        # The id stored in the token must still match the user's current id.
        if claims.get('userId') != user.user_id:
            return HTTPError('Authorization Expired', 403)
        if not user.active:
            return HTTPError('Inactive User', 403)
        return func(*args, **{**kwargs, 'user': user})

    return wrapper

64 

65 

def identity_verify(*roles):
    '''Restrict an endpoint to logged-in users whose role is in `roles`.

    Stacks on top of `login_required`; a user whose `role` is not one
    of `roles` receives 403 Insufficient Permissions. The verified user
    is forwarded to the wrapped function as the `user` keyword argument.

    You can find an example in `model/test.py`
    '''

    def verify(func):

        @wraps(func)
        @login_required
        def wrapper(user, *args, **kwargs):
            if user.role in roles:
                return func(*args, user=user, **kwargs)
            return HTTPError('Insufficient Permissions', 403)

        return wrapper

    return verify

85 

86 

def get_verify_link(user: User) -> str:
    '''Return the external URL of the `auth_api.active` endpoint for `user`.

    The user's `cookie` value is embedded in the link as its `token`.
    '''
    link_params = {'_external': True, 'token': user.cookie}
    return url_for('auth_api.active', **link_params)

93 

94 

@auth_api.route('/session', methods=['GET', 'POST'])
def session():
    '''Create a session or remove a session.

    Request methods:
        GET: Logout
        POST: Login
    '''

    def logout():
        '''Logout a user by resetting the `jwt` and `piann` cookies.

        Returns:
            - 200 Logout Success
        '''
        return HTTPResponse('Goodbye', cookies={'jwt': None, 'piann': None})

    @Request.json('username: str', 'password: str')
    def login(username, password):
        '''Login a user.

        Returns:
            - 400 Incomplete Data
            - 403 Login Failed
            - 403 Invalid User
        '''
        try:
            user = User.login(username, password, request.remote_addr)
        except DoesNotExist:
            return HTTPError('Login Failed', 403)
        if user.active:
            session_cookies = {
                'piann_httponly': user.secret,
                'jwt': user.cookie,
            }
            return HTTPResponse('Login Success', cookies=session_cookies)
        return HTTPError('Invalid User', 403)

    # NOTE(review): a request method missing from this table raises KeyError.
    handlers = dict(GET=logout, POST=login)
    return handlers[request.method]()

130 

131 

@auth_api.route('/signup', methods=['POST'])
@Request.json('username: str', 'password: str', 'email: str')
def signup(username, password, email):
    '''Register a new user and mail them a verification link.

    Returns:
        - Signup Success
        - 400 Signup Failed (validation errors are attached as data)
        - 400 User Exists
        - 400 Not Allowed Name
    '''
    try:
        user = User.signup(username, password, email)
    except ValidationError as ve:
        return HTTPError('Signup Failed', 400, data=ve.to_dict())
    except NotUniqueError:
        return HTTPError('User Exists', 400)
    except ValueError:
        return HTTPError('Not Allowed Name', 400)
    verify_link = get_verify_link(user)
    # Render the plain-text and the HTML body from the same link.
    bodies = [
        template.format(url=verify_link)
        for template in (VERIFY_TEXT, VERIFY_HTML)
    ]
    send_noreply([email], '[N-OJ] Varify Your Email', *bodies)
    return HTTPResponse('Signup Success')

148 

149 

@auth_api.route('/change-password', methods=['POST'])
@login_required
@Request.json('old_password: str', 'new_password: str')
def change_password(user, old_password, new_password):
    '''Change the logged-in user's password.

    The old password is checked by attempting `User.login` with it.
    On success the response sets a new `piann_httponly` cookie taken
    from `user.secret`.

    Returns:
        - Password Has Been Changed
        - 403 Wrong Password
    '''
    try:
        User.login(user.username, old_password, request.remote_addr)
    except DoesNotExist:
        return HTTPError('Wrong Password', 403)
    else:
        user.change_password(new_password)
        return HTTPResponse(
            'Password Has Been Changed',
            cookies={'piann_httponly': user.secret},
        )

161 

162 

@auth_api.route('/check/<item>', methods=['POST'])
def check(item):
    '''Tell a registering user whether a username or an e-mail is free.

    `item` selects the check: `username` or `email`. Any other value
    yields 400 Ivalid Checking Type. The response data carries
    `valid: 1` when the value can be used and `valid: 0` otherwise.
    '''

    @Request.json('username: str')
    def check_username(username):
        try:
            User.get_by_username(username)
        except DoesNotExist:
            return HTTPResponse('Username Can Be Used', data={'valid': 1})
        else:
            return HTTPResponse('User Exists', data={'valid': 0})

    @Request.json('email: str')
    def check_email(email):
        try:
            User.get_by_email(email)
        except DoesNotExist:
            return HTTPResponse('Email Can Be Used', data={'valid': 1})
        else:
            return HTTPResponse('Email Has Been Used', data={'valid': 0})

    checkers = {'username': check_username, 'email': check_email}
    if item not in checkers:
        return HTTPError('Ivalid Checking Type', 400)
    return checkers[item]()

186 

187 

@auth_api.route('/resend-email', methods=['POST'])
@Request.json('email: str')
def resend_email(email):
    '''Send the verification e-mail again to a not-yet-activated user.

    The message is rendered from `VERIFY_TEXT` and `VERIFY_HTML`, the
    same templates `signup` uses, so the resent mail has the full text
    and HTML body rather than only the bare link.

    Returns:
        - Email Has Been Resent
        - 400 User Not Exists
        - 400 User Has Been Actived
    '''
    try:
        user = User.get_by_email(email)
    except DoesNotExist:
        return HTTPError('User Not Exists', 400)
    if user.active:
        return HTTPError('User Has Been Actived', 400)
    verify_link = get_verify_link(user)
    text = VERIFY_TEXT.format(url=verify_link)
    html = VERIFY_HTML.format(url=verify_link)
    send_noreply([email], '[N-OJ] Varify Your Email', text, html)
    return HTTPResponse('Email Has Been Resent')

200 

201 

@auth_api.route('/active', methods=['POST'])
@auth_api.route('/active/<token>', methods=['GET'])
def active(token=None):
    '''Activate a user.

    Request methods:
        GET (`/active/<token>`): set the session cookies and redirect
            to the activation page.
        POST (`/active`): activate the account with the posted profile.
    '''

    @Request.json('profile: dict', 'agreement: bool')
    @Request.cookies(vars_dict={'token': 'piann'})
    def update(profile, agreement, token):
        '''User: active: false -> true

        Here `token` comes from the `piann` cookie, not from the URL.
        '''
        # The agreement has to be exactly `True`.
        if agreement is not True:
            return HTTPError('Not Confirm the Agreement', 403)
        payload = jwt_decode(token)
        if payload is None or not payload.get('secret'):
            return HTTPError('Invalid Token.', 403)
        user = User(payload['data']['username'])
        if not user:
            return HTTPError('User Not Exists', 400)
        if user.active:
            return HTTPError('User Has Been Actived', 400)
        try:
            user.activate(profile)
        except engine.DoesNotExist as e:
            return HTTPError(str(e), 404)
        return HTTPResponse(
            'User Is Now Active',
            cookies={'jwt': user.cookie},
        )

    def redir():
        '''Redirect user to active page.

        Uses the `token` from the URL of the enclosing request.
        '''
        payload = jwt_decode(token)
        if payload is None:
            return HTTPError('Invalid Token', 403)
        user = User(payload['data']['username'])
        session_cookies = {
            'piann_httponly': user.secret,
            'jwt': user.cookie,
        }
        return HTTPRedirect('/email_verify', cookies=session_cookies)

    # NOTE(review): a request method missing from this table raises KeyError.
    handlers = dict(GET=redir, POST=update)
    return handlers[request.method]()

242 

243 

@auth_api.route('/password-recovery', methods=['POST'])
@Request.json('email: str')
def password_recovery(email):
    '''Mail a randomly generated alternative password to a user.

    The password is 12 to 24 characters drawn from `string.hexdigits`
    using `SystemRandom`. Its hash is stored in the user's `user_id2`
    field and the plain value is sent to `email`.

    Returns:
        - Recovery Email Has Been Sent
        - 400 User Not Exists
    '''
    try:
        user = User.get_by_email(email)
    except DoesNotExist:
        return HTTPError('User Not Exists', 400)
    rng = SystemRandom()
    # Pick the length first, then the characters.
    length = rng.randint(12, 24)
    new_password = ''.join(
        rng.choice(string.hexdigits) for _ in range(length))
    user.update(user_id2=hash_id(user.username, new_password))
    message = f'Your alternative password is {new_password}.\nPlease login and change your password.'
    send_noreply([email], '[N-OJ] Password Recovery', message)
    return HTTPResponse('Recovery Email Has Been Sent')

261 

262 

@auth_api.route('/user', methods=['POST'])
@Request.json('username: str', 'password: str', 'email: str')
@identity_verify(0)
def add_user(
    user,
    username: str,
    password: str,
    email: str,
):
    '''
    Directly add a user without activation required.
    This operation only allow admin to use.

    Returns:
        - An empty success response
        - 400 Signup Failed (validation errors are attached as data)
        - 400 User Exists
        - 400 Not Allowed Name
    '''
    try:
        # Sign up and activate in one go; both steps share the handlers below.
        created = User.signup(username, password, email)
        created.activate()
    except ValidationError as ve:
        return HTTPError('Signup Failed', 400, data=ve.to_dict())
    except NotUniqueError:
        return HTTPError('User Exists', 400)
    except ValueError:
        return HTTPError('Not Allowed Name', 400)
    return HTTPResponse()

289 

290 

@auth_api.route('/batch-signup', methods=['POST'])
@Request.json('new_users: str', 'course', 'force')
@Request.doc('course', 'course', Course, src_none_allowed=True)
@identity_verify(0)
def batch_signup(
    user,
    new_users: str,
    course: Optional[Course],
    force: Optional[bool],
):
    '''Sign up many users at once from CSV text.

    `new_users` is parsed with `csv.DictReader` and the rows are handed
    to `User.batch_signup` with `course` and `force`. A missing `force`
    is treated as `False`.

    Returns:
        - An empty success response
        - 400 Invalid file content
        - 400 with the message of a `ValueError` from `User.batch_signup`
    '''
    try:
        rows = list(csv.DictReader(io.StringIO(new_users)))
    except csv.Error as e:
        current_app.logger.info(f'Error parse csv file [err={e}]')
        return HTTPError('Invalid file content', 400)
    try:
        User.batch_signup(
            new_users=rows,
            course=course,
            force=False if force is None else force,
        )
    except ValueError as e:
        return HTTPError(str(e), 400)
    return HTTPResponse()

317 

318 

@auth_api.route('/me', methods=['GET'])
@Request.args('fields')
@login_required
def get_me(user: User, fields: Optional[str]):
    '''Return selected properties of the logged-in user.

    `fields` is an optional comma-separated list of property names.
    When it is absent a default set is returned. A `ValueError` raised
    while collecting the properties becomes a 400 with its message.
    '''
    default = [
        'username',
        'email',
        'md5',
        'active',
        'role',
        'editorConfig',
        'displayedName',
        'bio',
    ]
    requested = default if fields is None else fields.split(',')
    try:
        return HTTPResponse(data=user.properties(*requested))
    except ValueError as e:
        return HTTPError(str(e), 400)