Coverage for mongo/course.py: 99%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-14 03:01 +0000

1from . import engine 

2from .user import * 

3from .utils import * 

4import re 

5import enum 

6from typing import Dict, List, Optional 

7from .base import MongoBase 

8from datetime import datetime 

9 

10__all__ = [ 

11 'Course', 

12] 

13 

14 

15class Course(MongoBase, engine=engine.Course): 

16 

class Permission(enum.IntFlag):
    """Bit flags describing what a user may do inside a course.

    Flags can be combined with ``|`` and tested with ``&``.
    """

    VIEW = 1 << 0  # view the course's basic info
    SCORE = 1 << 1  # can only view one's own score
    MODIFY = 1 << 2  # manage the course
    GRADE = 1 << 3  # grade students' scores

22 

def __new__(cls, course_name, *args, **kwargs):
    """Build a wrapper from either a primary key or a course name.

    The argument is first handed to the parent ``__new__`` unchanged.
    If that raises ``engine.ValidationError`` the argument is treated as
    a course name and looked up through ``Course.engine.objects``.  When
    no such course exists, the wrapper is built from the all-zero
    24-character placeholder ``'0' * 24`` instead.

    ``*args`` and ``**kwargs`` are accepted but not forwarded.
    """
    try:
        return super().__new__(cls, course_name)
    except engine.ValidationError:
        # Not accepted as-is by the parent; fall through to a name lookup.
        pass

    try:
        document = Course.engine.objects(course_name=course_name).get()
        return super().__new__(cls, document)
    except engine.DoesNotExist:
        # Unknown course name: fall back to the all-zero placeholder id.
        return super().__new__(cls, '0' * 24)

33 

def update_student_namelist(
    self,
    student_nicknames: Dict[str, str],
):
    """Replace the course's student roster with ``student_nicknames``.

    Users present in the current roster but missing from the new mapping
    are removed from the course; users only present in the new mapping
    are added.  Every homework in ``self.homeworks`` is then told about
    the removed and added students, and the course is saved.

    Args:
        student_nicknames: new roster; its keys are passed to ``User``.

    Raises:
        engine.DoesNotExist: if any key does not resolve to a truthy
            ``User``.
    """
    from .homework import Homework

    if any(not User(name) for name in student_nicknames):
        raise engine.DoesNotExist('User not found')

    # Students that disappear from the roster leave the course.
    dropped_names = set(self.student_nicknames) - set(student_nicknames)
    for name in dropped_names:
        self.remove_user(User(name).obj)

    # Students that are new to the roster join the course.
    added_names = set(student_nicknames) - set(self.student_nicknames)
    for name in added_names:
        self.add_user(User(name).obj)

    self.student_nicknames = student_nicknames

    # TODO: use event to update homework data
    dropped_users = [User(name) for name in dropped_names]
    added_users = [User(name) for name in added_names]
    for homework_ref in self.homeworks:
        homework = Homework(homework_ref)
        homework.remove_student(dropped_users)
        homework.add_student(added_users)

    self.save()

55 

def add_user(self, user: User):
    """Add this course to ``user``'s ``courses`` set.

    ``user`` must offer ``update`` and ``reload``; callers in this class
    pass ``User(...).obj``.

    Raises:
        engine.DoesNotExist: if this course wrapper is falsy.
    """
    if self:
        user.update(add_to_set__courses=self.id)
        # Refresh only the field that was just changed.
        user.reload('courses')
        return
    raise engine.DoesNotExist(f'Course [{self.course_name}]')

61 

def remove_user(self, user: User):
    """Pull this course out of ``user``'s ``courses`` and reload that field.

    Unlike ``add_user``, no check is made that the course itself exists.
    """
    course_id = self.id
    user.update(pull__courses=course_id)
    user.reload('courses')

65 

@classmethod
def get_all(cls):
    """Return ``engine.Course.objects``, i.e. every stored course."""
    every_course = engine.Course.objects
    return every_course

69 

@classmethod
def get_user_courses(cls, user):
    """Return the courses visible to ``user``.

    A user whose ``role`` equals 0 gets everything from ``get_all()``
    (presumably role 0 is the admin role; confirm against the role
    definition).  Any other user gets their own ``courses``.
    """
    if user.role == 0:
        return cls.get_all()
    return user.courses

76 

def edit_course(self, user, new_course, teacher):
    """Rename the course and, if needed, hand it to another teacher.

    Args:
        user: the user requesting the edit; checked with ``perm``.
        new_course: the new course name.  Only ASCII letters, digits,
            ``.``, ``_``, ``-`` and spaces are allowed, and it must not
            be empty.
        teacher: identifier passed to ``User`` to resolve the teacher.

    Returns:
        ``True`` on success.

    Raises:
        ValueError: if ``new_course`` contains a disallowed character.
        engine.DoesNotExist: if the course or the teacher is not found.
        PermissionError: if ``perm(self, user)`` is falsy.
        engine.NotUniqueError: if a course named ``new_course`` exists.
    """
    # fullmatch instead of match + '$': '$' also matches right before a
    # trailing newline, which let names such as 'Name\n' slip through.
    if re.fullmatch(r'[a-zA-Z0-9._\- ]+', new_course) is None:
        raise ValueError

    if not self:
        raise engine.DoesNotExist('Course')
    if not perm(self, user):
        raise PermissionError
    te = User(teacher)
    if not te:
        raise engine.DoesNotExist('User')

    # HACK: not sure why the unique index does not work during the tests
    # NOTE(review): Course(new_course) also resolves when new_course is
    # this course's current name, so an edit that only changes the
    # teacher appears to raise NotUniqueError -- confirm this is intended.
    if Course(new_course):
        raise engine.NotUniqueError('Course')

    self.course_name = new_course
    if te.obj != self.teacher:
        # Move course membership from the old teacher to the new one.
        self.remove_user(self.teacher)
        self.add_user(te.obj)
        self.teacher = te.obj
    self.save()
    return True

100 

def delete_course(self, user):
    """Delete this course on behalf of ``user``.

    The course's teacher is detached from the course before the course
    document itself is deleted.

    Returns:
        ``True`` on success.

    Raises:
        engine.DoesNotExist: if the course is not found.
        PermissionError: if ``perm(self, user)`` is falsy.
    """
    course_exists = bool(self)
    if not course_exists:
        # course not found
        raise engine.DoesNotExist('Course')

    allowed = perm(self, user)
    if not allowed:
        # user is not the TA or teacher in course
        raise PermissionError

    current_teacher = self.teacher
    self.remove_user(current_teacher)
    self.delete()
    return True

112 

def get_scoreboard(
    self,
    problem_ids: List[int],
    start: Optional[float] = None,
    end: Optional[float] = None,
) -> List[Dict]:
    """Aggregate per-student submission statistics for some problems.

    Args:
        problem_ids: problems to include in the scoreboard.
        start: optional lower bound (inclusive) on the submission
            timestamp, as a POSIX timestamp.
        end: optional upper bound (inclusive) on the submission
            timestamp, as a POSIX timestamp.

    Returns:
        One dict per student in ``self.student_nicknames``.  A student
        with matching submissions gets ``'user'`` (``User(...).info``),
        ``'sum'`` (sum of the per-problem maximum scores), ``'avg'``
        (``sum`` divided by ``len(problem_ids)``) and one entry per
        attempted problem, keyed by the problem id as a string, holding
        ``pid``/``count``/``max``/``min``/``avg``.  A student without
        matching submissions gets ``'user'`` with ``'sum'`` and ``'avg'``
        both 0.
    """
    user_ids = [User(name).id for name in self.student_nicknames]

    matching = {
        "user": {
            "$in": user_ids
        },
        "problem": {
            "$in": problem_ids
        },
    }
    # Compare against None rather than truthiness so that a bound of
    # 0 / 0.0 (the epoch) is honoured instead of silently ignored.
    time_range = {}
    if start is not None:
        time_range['$gte'] = datetime.fromtimestamp(start)
    if end is not None:
        time_range['$lte'] = datetime.fromtimestamp(end)
    if time_range:
        matching['timestamp'] = time_range

    pipeline = [
        {
            "$match": matching
        },
        # First pass: statistics per (user, problem) pair.
        {
            "$group": {
                "_id": {
                    "user": "$user",
                    "problem": "$problem",
                },
                "count": {
                    "$sum": 1
                },
                "max": {
                    "$max": "$score"
                },
                "min": {
                    "$min": "$score"
                },
                "avg": {
                    "$avg": "$score"
                },
            }
        },
        # Second pass: collect each user's per-problem statistics.
        {
            "$group": {
                "_id": "$_id.user",
                "scores": {
                    "$push": {
                        "pid": "$_id.problem",
                        "count": "$count",
                        "max": "$max",
                        "min": "$min",
                        "avg": "$avg",
                    },
                },
            }
        },
    ]
    cursor = engine.Submission.objects().aggregate(pipeline)

    scoreboard = []
    # Students still in this set after the loop had no matching submission.
    unrecorded_users = set(user_ids)
    for item in cursor:
        sum_of_score = sum(s['max'] for s in item['scores'])
        scoreboard.append({
            'user': User(item['_id']).info,
            'sum': sum_of_score,
            'avg': sum_of_score / len(problem_ids),
            **{f'{score["pid"]}': score
               for score in item['scores']},
        })
        unrecorded_users.remove(item['_id'])

    for user_id in unrecorded_users:
        scoreboard.append({
            'user': User(user_id).info,
            'sum': 0,
            'avg': 0,
        })

    return scoreboard

195 

@classmethod
def add_course(cls, course, teacher):
    """Create a course named ``course`` taught by ``teacher``.

    Args:
        course: the course name.  Only ASCII letters, digits, ``.``,
            ``_``, ``-`` and spaces are allowed, and it must not be
            empty.
        teacher: identifier passed to ``User`` to resolve the teacher.

    Returns:
        ``True`` on success.

    Raises:
        ValueError: if ``course`` contains a disallowed character.
        engine.DoesNotExist: if the teacher is not found.
        PermissionError: if the teacher's ``role`` is 2 or greater.
        engine.NotUniqueError: if a course named ``course`` exists.
    """
    # fullmatch instead of match + '$': '$' also matches right before a
    # trailing newline, which let names such as 'Name\n' slip through.
    if re.fullmatch(r'[a-zA-Z0-9._\- ]+', course) is None:
        raise ValueError
    teacher = User(teacher)
    if not teacher:
        raise engine.DoesNotExist('User')
    if teacher.role >= 2:
        raise PermissionError(
            f'{teacher} is not permitted to create a course')
    # HACK: not sure why the unique index does not work during the tests
    if cls(course):
        raise engine.NotUniqueError('Course')
    co = cls.engine(
        course_name=course,
        teacher=teacher.obj,
    ).save()
    # The teacher also becomes a member of the new course.
    cls(co).add_user(teacher.obj)
    return True

215 

@classmethod
def get_public(cls):
    """Return the course named ``'Public'``, creating it if missing.

    When the course does not exist yet it is created through
    ``add_course`` with ``'first_admin'`` as its teacher.
    """
    public_name = 'Public'
    already_there = bool(cls(public_name))
    if not already_there:
        cls.add_course(public_name, 'first_admin')
    return cls(public_name)

221 

def own_permission(self, user) -> Permission:
    """Return the permission flags ``user`` holds in this course.

    The value returned by ``perm(self.obj, user)`` is used as a key into
    a fixed table covering 0 through 4; any other value raises
    ``KeyError``.  What each number stands for is defined by ``perm``,
    which is not part of this block.
    """
    flags = self.Permission
    full_staff = flags.VIEW | flags.GRADE | flags.MODIFY
    capability_by_role = {
        0: flags(0),
        1: flags.VIEW | flags.SCORE,
        2: flags.VIEW | flags.GRADE,
        3: full_staff,
        4: full_staff,
    }

    return capability_by_role[perm(self.obj, user)]

241 

def permission(self, user, req) -> bool:
    """
    check whether user owns at least one of the permission bits in `req`
    """
    granted = self.own_permission(user)
    overlap = granted & req
    return bool(overlap)