Coverage for model/user.py: 100%

71 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-11 18:37 +0000

1from urllib import parse 

2from typing import Optional 

3from flask import Blueprint, current_app, request 

4from mongo import engine 

5from mongo.utils import drop_none 

6from mongo import * 

7from .utils import * 

8from .auth import identity_verify, login_required 

9 

# The blueprint is the only name this module exports.
__all__ = ['user_api']

# Blueprint on which every route defined in this module is registered.
user_api = Blueprint('user_api', __name__)

13 

14 

@identity_verify(0)
def check_admin(user):
    '''
    Admin gate with no logic of its own.

    All of the checking is delegated to the `identity_verify(0)`
    decorator; the body just returns `None`. `user` is accepted but
    never read here (callers in this module invoke the function with
    no arguments, so it is presumably injected by the decorator --
    confirm against `identity_verify`).
    '''
    return None

21 

22 

@user_api.before_request
def before_request():
    '''
    Run the admin check ahead of every request on this blueprint.

    CORS preflight (OPTIONS) requests carry no credentials, so they are
    let through without the check; anything else gets whatever
    `check_admin()` returns.
    '''
    is_preflight = request.method.lower() == 'options'
    if not is_preflight:
        return check_admin()
    return None

32 

33 

@user_api.get('/')
@Request.args('offset', 'count', 'course', 'role')
def get_user_list(
    offset: Optional[str],
    count: Optional[str],
    course: Optional[str],
    role: Optional[str],
):
    '''
    List users, optionally filtered by course / role and paginated.

    Args:
        offset: number of leading users to skip, as a decimal string.
        count: maximum number of users to return, as a decimal string.
        course: URL-quoted value matched against the `courses` field.
        role: integer role value (as a string) matched against `role`.

    Returns:
        `HTTPResponse` whose data is the list of `User(u).info` values
        for the selected users, or a 400 `HTTPError` when `offset`,
        `count` or `role` is not an integer, or when `offset` / `count`
        is negative.
    '''
    try:
        if offset is not None:
            offset = int(offset)
        if count is not None:
            count = int(count)
        if role is not None:
            role = int(role)
    except (TypeError, ValueError):
        return HTTPError(
            'offset, count and role must be integer',
            400,
        )
    # Negative bounds are not meaningful for pagination and are rejected
    # by the underlying cursor, so answer 400 rather than letting the
    # slice below raise.
    if any(bound is not None and bound < 0 for bound in (offset, count)):
        return HTTPError(
            'offset and count must be non-negative',
            400,
        )
    if course is not None:
        course = parse.unquote(course)

    # filter: only the criteria that were actually supplied
    query = drop_none({
        'courses': course,
        'role': role,
    })
    user_list = engine.User.objects(**query)
    # truncate: use one slice so the result does not depend on how
    # chained queryset slices compose their skip / limit values
    if offset is not None or count is not None:
        start = 0 if offset is None else offset
        stop = None if count is None else start + count
        user_list = user_list[start:stop]

    user_list = [User(u).info for u in user_list]
    return HTTPResponse(data=user_list)

71 

72 

@user_api.get('/summary')
def get_user_summary():
    '''
    Report the total number of users and a per-role head count.

    The response data holds `userCount` (count of all users) and
    `breakdown`, one `{"role", "count"}` entry for each member of
    `engine.User.Role`, with the role name lower-cased.
    '''
    total = engine.User.objects.count()

    per_role = []
    for role in engine.User.Role:
        label = role.name.lower()
        per_role.append({
            "role": label,
            "count": engine.User.objects(role=role.value).count(),
        })

    summary = {
        "userCount": total,
        "breakdown": per_role,
    }
    return HTTPResponse(data=summary)

84 

85 

@user_api.post('/')
@Request.json('username: str', 'password: str', 'email: str')
def add_user(
    username: str,
    password: str,
    email: str,
):
    '''
    Create a user and activate it immediately (no activation step).
    Only admins are allowed to use this operation.

    Returns an empty `HTTPResponse` on success, or a 400 `HTTPError`:
    'Signup Failed' (with the validation details) on `ValidationError`,
    'User Exists' on `NotUniqueError`, and 'Not Allowed Name' on
    `ValueError`.
    '''
    try:
        created = User.signup(username, password, email)
        created.activate()
    except ValidationError as err:
        return HTTPError('Signup Failed', 400, data=err.to_dict())
    except NotUniqueError:
        return HTTPError('User Exists', 400)
    except ValueError:
        return HTTPError('Not Allowed Name', 400)
    return HTTPResponse()

110 

111 

@user_api.patch('/<username>')
@login_required
@Request.doc('username', 'target_user', User)
@Request.json('password', 'displayed_name', 'role')
def update_user(
    user: User,
    target_user: User,
    password,
    displayed_name,
    role,
):
    '''
    Change another user's password, displayed name and/or role.

    Fields left as `None` are not touched. Each kind of change is
    written to the application log together with the acting user and
    the target user. Always returns an empty `HTTPResponse`.
    '''
    # TODO: notify admin & user (by email, SMS, etc.)
    if password is not None:
        target_user.change_password(password)
        password_msg = (
            'admin changed user password '
            f'[actor={user.username}, user={target_user.username}]'
        )
        current_app.logger.info(password_msg)

    # Only the fields that were actually supplied are sent to update().
    payload = drop_none({
        'profile__displayed_name': displayed_name,
        'role': role,
    })
    if payload:
        fields = list(payload.keys())
        target_user.update(**payload)
        info_msg = (
            'admin changed user info '
            f'[actor={user.username}, user={target_user.username}, fields={fields}]'
        )
        current_app.logger.info(info_msg)
    return HTTPResponse()