Coverage for model/user.py: 100%
71 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
1from urllib import parse
2from typing import Optional
3from flask import Blueprint, current_app, request
4from mongo import engine
5from mongo.utils import drop_none
6from mongo import *
7from .utils import *
8from .auth import identity_verify, login_required
10__all__ = ['user_api']
12user_api = Blueprint('user_api', __name__)
15@identity_verify(0)
16def check_admin(user):
17 '''
18 an empty wrapper to check whether client is admin
19 '''
20 return None
23@user_api.before_request
24def before_request():
25 '''
26 we only allow admins to call user APIs, but the CORS preflight
27 request won't contain credentials, so we skip the check for that.
28 '''
29 if request.method.lower() == 'options':
30 return None
31 return check_admin()
34@user_api.get('/')
35@Request.args('offset', 'count', 'course', 'role')
36def get_user_list(
37 offset: Optional[str],
38 count: Optional[str],
39 course: Optional[str],
40 role: Optional[str],
41):
42 try:
43 if offset is not None:
44 offset = int(offset)
45 if count is not None:
46 count = int(count)
47 if role is not None:
48 role = int(role)
49 except (TypeError, ValueError):
50 return HTTPError(
51 'offset, count and role must be integer',
52 400,
53 )
54 if course is not None:
55 course = parse.unquote(course)
57 # filter
58 query = drop_none({
59 'courses': course,
60 'role': role,
61 })
62 user_list = engine.User.objects(**query)
63 # truncate
64 if offset is not None:
65 user_list = user_list[offset:]
66 if count is not None:
67 user_list = user_list[:count]
69 user_list = [User(u).info for u in user_list]
70 return HTTPResponse(data=user_list)
73@user_api.get('/summary')
74def get_user_summary():
75 user_count = engine.User.objects.count()
76 breakdown = [{
77 "role": role.name.lower(),
78 "count": engine.User.objects(role=role.value).count()
79 } for role in engine.User.Role]
80 return HTTPResponse(data={
81 "userCount": user_count,
82 "breakdown": breakdown,
83 })
86@user_api.post('/')
87@Request.json('username: str', 'password: str', 'email: str')
88def add_user(
89 username: str,
90 password: str,
91 email: str,
92):
93 '''
94 Directly add a user without activation required.
95 This operation only allow admin to use.
96 '''
97 try:
98 User.signup(
99 username,
100 password,
101 email,
102 ).activate()
103 except ValidationError as ve:
104 return HTTPError('Signup Failed', 400, data=ve.to_dict())
105 except NotUniqueError:
106 return HTTPError('User Exists', 400)
107 except ValueError as ve:
108 return HTTPError('Not Allowed Name', 400)
109 return HTTPResponse()
112@user_api.patch('/<username>')
113@login_required
114@Request.doc('username', 'target_user', User)
115@Request.json('password', 'displayed_name', 'role')
116def update_user(
117 user: User,
118 target_user: User,
119 password,
120 displayed_name,
121 role,
122):
123 # TODO: notify admin & user (by email, SMS, etc.)
124 if password is not None:
125 target_user.change_password(password)
126 current_app.logger.info(
127 'admin changed user password '
128 f'[actor={user.username}, user={target_user.username}]', )
129 payload = drop_none({
130 'profile__displayed_name': displayed_name,
131 'role': role,
132 })
133 if len(payload):
134 fields = [*payload.keys()]
135 target_user.update(**payload)
136 current_app.logger.info(
137 'admin changed user info '
138 f'[actor={user.username}, user={target_user.username}, fields={fields}]',
139 )
140 return HTTPResponse()