Coverage for model/user.py: 100%
66 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1from urllib import parse
2from typing import Optional
3from flask import Blueprint, current_app, request
4from mongo import engine
5from mongo.utils import drop_none
6from mongo import *
7from .utils import *
8from .auth import identity_verify, login_required
10__all__ = ['user_api']
12user_api = Blueprint('user_api', __name__)
15@identity_verify(0)
16def check_admin(user):
17 '''
18 an empty wrapper to check whether client is admin
19 '''
20 return None
23@user_api.before_request
24def before_request():
25 '''
26 we only allow admins to call user APIs, but the CORS preflight
27 request won't contain credentials, so we skip the check for that.
28 '''
29 if request.method.lower() == 'options':
30 return None
31 return check_admin()
34@user_api.get('/')
35@Request.args('offset', 'count', 'course', 'role')
36def get_user_list(
37 offset: Optional[str],
38 count: Optional[str],
39 course: Optional[str],
40 role: Optional[str],
41):
42 try:
43 if offset is not None:
44 offset = int(offset)
45 if count is not None:
46 count = int(count)
47 if role is not None:
48 role = int(role)
49 except (TypeError, ValueError):
50 return HTTPError(
51 'offset, count and role must be integer',
52 400,
53 )
54 if course is not None:
55 course = parse.unquote(course)
57 # filter
58 query = drop_none({
59 'courses': course,
60 'role': role,
61 })
62 user_list = engine.User.objects(**query)
63 # truncate
64 if offset is not None:
65 user_list = user_list[offset:]
66 if count is not None:
67 user_list = user_list[:count]
69 user_list = [User(u).info for u in user_list]
70 return HTTPResponse(data=user_list)
73@user_api.post('/')
74@Request.json('username: str', 'password: str', 'email: str')
75def add_user(
76 username: str,
77 password: str,
78 email: str,
79):
80 '''
81 Directly add a user without activation required.
82 This operation only allow admin to use.
83 '''
84 try:
85 User.signup(
86 username,
87 password,
88 email,
89 ).activate()
90 except ValidationError as ve:
91 return HTTPError('Signup Failed', 400, data=ve.to_dict())
92 except NotUniqueError:
93 return HTTPError('User Exists', 400)
94 except ValueError as ve:
95 return HTTPError('Not Allowed Name', 400)
96 return HTTPResponse()
99@user_api.patch('/<username>')
100@login_required
101@Request.doc('username', 'target_user', User)
102@Request.json('password', 'displayed_name', 'role')
103def update_user(
104 user: User,
105 target_user: User,
106 password,
107 displayed_name,
108 role,
109):
110 # TODO: notify admin & user (by email, SMS, etc.)
111 if password is not None:
112 target_user.change_password(password)
113 current_app.logger.info(
114 'admin changed user password '
115 f'[actor={user.username}, user={target_user.username}]', )
116 payload = drop_none({
117 'profile__displayed_name': displayed_name,
118 'role': role,
119 })
120 if len(payload):
121 fields = [*payload.keys()]
122 target_user.update(**payload)
123 current_app.logger.info(
124 'admin changed user info '
125 f'[actor={user.username}, user={target_user.username}, fields={fields}]',
126 )
127 return HTTPResponse()