Coverage for mongo/course.py: 99%
132 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-11 18:37 +0000
1from . import engine
2from .user import *
3from .utils import *
4import re
5import enum
6from typing import Dict, List, Optional
7from .base import MongoBase
8from datetime import datetime
# Public API of this module: only the Course wrapper is exported.
__all__ = ['Course']
class Course(MongoBase, engine=engine.Course):
    """Wrapper around the ``engine.Course`` document.

    Groups the course-level operations used by the rest of the package:
    membership management, creation / editing / deletion, scoreboard
    aggregation and permission checks.

    NOTE(review): ``User`` and ``perm`` are not defined in this module;
    presumably they come from the star imports of ``.user`` / ``.utils``.
    """

    class Permission(enum.IntFlag):
        VIEW = enum.auto()  # view course basic info
        SCORE = enum.auto()  # only can view self score
        MODIFY = enum.auto()  # manage course
        GRADE = enum.auto()  # grade students' score

    def __new__(cls, course_name, *args, **kwargs):
        """Build a wrapper from whatever the base class accepts, or a name.

        ``course_name`` is first passed to the base constructor unchanged.
        If that raises ``engine.ValidationError`` the value is treated as a
        course name and looked up. When no course has that name, the wrapper
        is built around the all-zero id ``'0' * 24`` instead of raising.
        """
        try:
            return super().__new__(cls, course_name)
        except engine.ValidationError:
            try:
                document = Course.engine.objects(
                    course_name=course_name).get()
                return super().__new__(cls, document)
            except engine.DoesNotExist:
                # Unknown name: fall back to a placeholder id.
                return super().__new__(cls, '0' * 24)

    def update_student_namelist(
        self,
        student_nicknames: Dict[str, str],
    ):
        """Replace the course's student list with ``student_nicknames``.

        Users present only in the old mapping are removed from the course,
        users present only in the new mapping are added, and every homework
        in ``self.homeworks`` is told about both groups before saving.

        Args:
            student_nicknames: mapping whose keys identify users (each key
                is passed to ``User(...)``); stored as-is on the course.

        Raises:
            engine.DoesNotExist: if any key does not resolve to a truthy
                ``User``.
        """
        # Imported here rather than at module level, presumably to avoid a
        # circular import with the homework module -- TODO confirm.
        from .homework import Homework
        if not all(User(name) for name in student_nicknames):
            raise engine.DoesNotExist('User not found')
        drop_user = set(self.student_nicknames) - set(student_nicknames)
        for user in drop_user:
            self.remove_user(User(user).obj)
        new_user = set(student_nicknames) - set(self.student_nicknames)
        for user in new_user:
            self.add_user(User(user).obj)
        self.student_nicknames = student_nicknames
        # TODO: use event to update homework data
        dropped_wrappers = [User(name) for name in drop_user]
        added_wrappers = [User(name) for name in new_user]
        for homework in map(Homework, self.homeworks):
            homework.remove_student(dropped_wrappers)
            homework.add_student(added_wrappers)
        self.save()

    def add_user(self, user: User):
        """Add this course to ``user``'s ``courses`` and reload that field.

        NOTE(review): callers in this class pass ``User(...).obj`` or
        ``self.teacher``, not a ``User`` wrapper -- confirm the annotation.

        Raises:
            engine.DoesNotExist: if this course wrapper is falsy.
        """
        if not self:
            raise engine.DoesNotExist(f'Course [{self.course_name}]')
        user.update(add_to_set__courses=self.id)
        user.reload('courses')

    def remove_user(self, user: User):
        """Pull this course from ``user``'s ``courses`` and reload that field."""
        user.update(pull__courses=self.id)
        user.reload('courses')

    @classmethod
    def get_all(cls):
        """Return the ``engine.Course.objects`` queryset (all courses)."""
        return engine.Course.objects

    @classmethod
    def get_user_courses(cls, user):
        """Return the courses visible to ``user``.

        A user whose ``role`` is ``0`` gets every course; any other role
        gets only ``user.courses``.
        """
        if user.role == 0:
            return cls.get_all()
        return user.courses

    def get_course_summary(self, problems: list):
        """Return name, user count, homework count and submission count.

        Args:
            problems: values matched against ``Submission.problem`` when
                counting submissions.
        """
        user_count = engine.User.objects(courses=self.id).count()
        homework_count = engine.Homework.objects(
            course_id=str(self.id)).count()
        submission_count = engine.Submission.objects(
            problem__in=problems).count()
        return {
            "course": self.course_name,
            "userCount": user_count,
            "homeworkCount": homework_count,
            "submissionCount": submission_count,
        }

    def edit_course(self, user, new_course, teacher):
        """Rename the course and/or change its teacher.

        Args:
            user: the user requesting the change; checked with ``perm``.
            new_course: new course name; must match
                ``^[a-zA-Z0-9._\\- ]+$``.
            teacher: identifier passed to ``User(...)`` for the new teacher.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: if ``new_course`` contains disallowed characters.
            engine.DoesNotExist: if the course or the teacher is missing.
            PermissionError: if ``perm(self, user)`` is falsy.
            engine.NotUniqueError: if another course already uses
                ``new_course``.
        """
        if re.match(r'^[a-zA-Z0-9._\- ]+$', new_course) is None:
            raise ValueError

        if not self:
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            raise PermissionError
        te = User(teacher)
        if not te:
            raise engine.DoesNotExist('User')

        # HACK: not sure why the unique index is not work during the test
        # Only check uniqueness when the name actually changes; otherwise
        # the lookup finds this very course and a teacher-only edit would
        # always be rejected.
        if new_course != self.course_name and Course(new_course):
            raise engine.NotUniqueError('Course')

        self.course_name = new_course
        if te.obj != self.teacher:
            self.remove_user(self.teacher)
            self.add_user(te.obj)
            self.teacher = te.obj
        self.save()
        return True

    def delete_course(self, user):
        """Delete the course after detaching its teacher.

        Returns:
            ``True`` on success.

        Raises:
            engine.DoesNotExist: if the course is missing.
            PermissionError: if ``perm(self, user)`` is falsy.
        """
        if not self:
            # course not found
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            # user is not the TA or teacher in course
            raise PermissionError

        self.remove_user(self.teacher)
        self.delete()
        return True

    def get_scoreboard(
        self,
        problem_ids: List[int],
        start: Optional[float] = None,
        end: Optional[float] = None,
    ) -> List[Dict]:
        """Aggregate per-student submission statistics for some problems.

        Args:
            problem_ids: problems to include.
            start: optional lower bound (inclusive) as a timestamp accepted
                by ``datetime.fromtimestamp``.
            end: optional upper bound (inclusive), same format as ``start``.

        Returns:
            One dict per student in ``self.student_nicknames``. Each has
            ``'user'``, ``'sum'`` (sum of per-problem max scores) and
            ``'avg'`` (``sum`` divided by ``len(problem_ids)``). Students
            with matching submissions also get one entry per problem, keyed
            by the problem id as a string, holding ``pid``, ``count``,
            ``max``, ``min`` and ``avg``. Students without matching
            submissions get ``sum`` and ``avg`` of ``0``.
        """
        usernames = [User(name).id for name in self.student_nicknames]
        matching = {
            "user": {
                "$in": usernames
            },
            "problem": {
                "$in": problem_ids
            },
        }
        # Compare against None so that a bound of 0 is not silently dropped.
        time_range = {}
        if start is not None:
            time_range['$gte'] = datetime.fromtimestamp(start)
        if end is not None:
            time_range['$lte'] = datetime.fromtimestamp(end)
        if time_range:
            matching['timestamp'] = time_range

        # Stage 1: stats per (user, problem); stage 2: collect per user.
        per_user_problem = {
            "$group": {
                "_id": {
                    "user": "$user",
                    "problem": "$problem",
                },
                "count": {
                    "$sum": 1
                },
                "max": {
                    "$max": "$score"
                },
                "min": {
                    "$min": "$score"
                },
                "avg": {
                    "$avg": "$score"
                },
            }
        }
        per_user = {
            "$group": {
                "_id": "$_id.user",
                "scores": {
                    "$push": {
                        "pid": "$_id.problem",
                        "count": "$count",
                        "max": "$max",
                        "min": "$min",
                        "avg": "$avg",
                    },
                },
            }
        }
        pipeline = [{"$match": matching}, per_user_problem, per_user]
        cursor = engine.Submission.objects().aggregate(pipeline)

        scoreboard = []
        unrecorded_users = set(usernames)
        for item in cursor:
            sum_of_score = sum(s['max'] for s in item['scores'])
            row = {
                'user': User(item['_id']).info,
                'sum': sum_of_score,
                'avg': sum_of_score / len(problem_ids),
            }
            for score in item['scores']:
                row[f'{score["pid"]}'] = score
            scoreboard.append(row)
            unrecorded_users.remove(item['_id'])
        # Students with no matching submission still appear, with zeros.
        for u in unrecorded_users:
            scoreboard.append({
                'user': User(u).info,
                'sum': 0,
                'avg': 0,
            })

        return scoreboard

    @classmethod
    def add_course(cls, course, teacher):
        """Create a course named ``course`` taught by ``teacher``.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: if ``course`` contains disallowed characters.
            engine.DoesNotExist: if ``teacher`` does not resolve to a user.
            PermissionError: if the teacher's ``role`` is ``>= 2``.
            engine.NotUniqueError: if a course with that name exists.
        """
        if re.match(r'^[a-zA-Z0-9._\- ]+$', course) is None:
            raise ValueError
        teacher = User(teacher)
        if not teacher:
            raise engine.DoesNotExist('User')
        if teacher.role >= 2:
            raise PermissionError(
                f'{teacher} is not permitted to create a course')
        # HACK: not sure why the unique index is not work during the test
        if cls(course):
            raise engine.NotUniqueError('Course')
        co = cls.engine(
            course_name=course,
            teacher=teacher.obj,
        ).save()
        cls(co).add_user(teacher.obj)
        return True

    @classmethod
    def get_public(cls):
        """Return the ``'Public'`` course, creating it if it is missing.

        When created here, the course is assigned to the user
        ``'first_admin'``.
        """
        if not cls('Public'):
            cls.add_course('Public', 'first_admin')
        return cls('Public')

    def own_permission(self, user) -> Permission:
        """Return the ``Permission`` flags ``user`` holds on this course.

        The value returned by ``perm(self.obj, user)`` is used as a key
        into the table below (keys ``0`` to ``4``).
        NOTE(review): the meaning of each key is defined by ``perm``, which
        is not visible here; a value outside the table raises ``KeyError``.
        """
        flags = self.Permission
        manage = flags.VIEW | flags.GRADE | flags.MODIFY
        role_capability = {
            0: flags(0),
            1: flags.VIEW | flags.SCORE,
            2: flags.VIEW | flags.GRADE,
            3: manage,
            4: manage,
        }

        role = perm(self.obj, user)

        return role_capability[role]

    def permission(self, user, req) -> bool:
        """
        check whether user own `req` permission
        """

        return bool(self.own_permission(user) & req)