Coverage for mongo/course.py: 99%
130 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
1from . import engine
2from .user import *
3from .utils import *
4import re
5import enum
6from typing import Dict, List, Optional
7from .base import MongoBase
8from datetime import datetime
# Public API of this module: only the Course wrapper is exported.
__all__ = ['Course']
class Course(MongoBase, engine=engine.Course):
    """Wrapper around ``engine.Course`` documents with course-level helpers.

    Instances are built from either a primary key or a course name (see
    ``__new__``). Truthiness of an instance is used throughout this class
    as the "course exists" check; that behaviour presumably comes from
    ``MongoBase`` -- confirm there.
    """

    class Permission(enum.IntFlag):
        VIEW = enum.auto()  # view course basic info
        SCORE = enum.auto()  # can only view own score
        MODIFY = enum.auto()  # manage course
        GRADE = enum.auto()  # grade students' score

    # Course names may contain only ASCII letters, digits, '.', '_', '-'
    # and spaces, and must not be empty.
    _COURSE_NAME_PATTERN = re.compile(r'^[a-zA-Z0-9._\- ]+$')

    def __new__(cls, course_name, *args, **kwargs):
        """Create a wrapper from a primary key or, failing that, a name.

        ``course_name`` is first handed to the base class as-is. If that
        raises ``engine.ValidationError`` it is treated as a course name
        and looked up; when no course has that name, the wrapper is built
        with the all-zero 24-character id ``'0' * 24``.

        Extra positional and keyword arguments are accepted but not
        forwarded to the base class.
        """
        try:
            new = super().__new__(cls, course_name)
        except engine.ValidationError:
            try:
                pk = Course.engine.objects(course_name=course_name).get()
                new = super().__new__(cls, pk)
            except engine.DoesNotExist:
                # Placeholder id; presumably yields a falsy (non-existent)
                # course wrapper -- confirm against MongoBase.
                new = super().__new__(cls, '0' * 24)
        return new

    def update_student_namelist(
        self,
        student_nicknames: Dict[str, str],
    ):
        """Replace the course's student list and sync users and homeworks.

        Args:
            student_nicknames: Mapping whose keys identify users (each key
                is passed to ``User(...)``); the mapping is stored as the
                new ``student_nicknames`` of the course.

        Raises:
            engine.DoesNotExist: If any key does not resolve to a truthy
                ``User``.
        """
        # Imported here rather than at module level, presumably to avoid
        # a circular import with the homework module.
        from .homework import Homework
        if not all(User(name) for name in student_nicknames):
            raise engine.DoesNotExist('User not found')
        # Users present before but missing from the new list leave the course.
        drop_user = set(self.student_nicknames) - set(student_nicknames)
        for name in drop_user:
            self.remove_user(User(name).obj)
        # Users that only appear in the new list join the course.
        new_user = set(student_nicknames) - set(self.student_nicknames)
        for name in new_user:
            self.add_user(User(name).obj)
        self.student_nicknames = student_nicknames
        # TODO: use event to update homework data
        dropped = [User(name) for name in drop_user]
        added = [User(name) for name in new_user]
        for homework in map(Homework, self.homeworks):
            homework.remove_student(dropped)
            homework.add_student(added)
        self.save()

    def add_user(self, user: User):
        """Add this course to ``user``'s ``courses`` and reload that field.

        Callers in this class pass ``User(...).obj`` (the underlying
        document) rather than a ``User`` wrapper.

        Raises:
            engine.DoesNotExist: If this course wrapper is falsy.
        """
        if not self:
            raise engine.DoesNotExist(f'Course [{self.course_name}]')
        user.update(add_to_set__courses=self.id)
        user.reload('courses')

    def remove_user(self, user: User):
        """Pull this course from ``user``'s ``courses`` and reload that field.

        Unlike ``add_user``, no existence check is made on the course.
        """
        user.update(pull__courses=self.id)
        user.reload('courses')

    @classmethod
    def get_all(cls):
        """Return the queryset of every ``engine.Course`` document."""
        return engine.Course.objects

    @classmethod
    def get_user_courses(cls, user):
        """Return the courses visible to ``user``.

        A user whose ``role`` is 0 gets every course; any other user gets
        only ``user.courses``. (Role 0 is presumably the admin role --
        confirm against the user model.)
        """
        if user.role != 0:
            return user.courses
        return cls.get_all()

    def edit_course(self, user, new_course, teacher):
        """Rename the course and/or change its teacher.

        Args:
            user: The user requesting the change; checked with ``perm``.
            new_course: The new course name. It may equal the current
                name, in which case only the teacher is updated.
            teacher: Identifier passed to ``User(...)`` for the new teacher.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: If ``new_course`` contains disallowed characters
                or is empty.
            engine.DoesNotExist: If the course or the teacher is missing.
            PermissionError: If ``perm(self, user)`` is falsy.
            engine.NotUniqueError: If another course already uses
                ``new_course``.
        """
        if self._COURSE_NAME_PATTERN.match(new_course) is None:
            raise ValueError

        if not self:
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            raise PermissionError
        te = User(teacher)
        if not te:
            raise engine.DoesNotExist('User')

        # HACK: not sure why the unique index is not work during the test
        # Keeping the current name is not a conflict: without this guard
        # the lookup finds this very course and the teacher could never
        # be changed without also renaming the course.
        if new_course != self.course_name and Course(new_course):
            raise engine.NotUniqueError('Course')

        self.course_name = new_course
        if te.obj != self.teacher:
            # Move course membership from the old teacher to the new one.
            self.remove_user(self.teacher)
            self.add_user(te.obj)
            self.teacher = te.obj
        self.save()
        return True

    def delete_course(self, user):
        """Delete the course after detaching it from its teacher.

        Returns:
            ``True`` on success.

        Raises:
            engine.DoesNotExist: If the course is not found.
            PermissionError: If ``perm(self, user)`` is falsy (the user is
                not the TA or teacher in the course).
        """
        if not self:
            # course not found
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            # user is not the TA or teacher in course
            raise PermissionError

        self.remove_user(self.teacher)
        self.delete()
        return True

    def get_scoreboard(
        self,
        problem_ids: List[int],
        start: Optional[float] = None,
        end: Optional[float] = None,
    ) -> List[Dict]:
        """Build a per-student scoreboard for the given problems.

        Submissions of the course's students to ``problem_ids`` are
        aggregated per (user, problem) into count / max / min / avg of
        ``score``, then collected per user.

        Args:
            problem_ids: Problem ids to include.
            start: Optional lower bound (POSIX timestamp, inclusive) on
                the submission ``timestamp``. ``None`` means unbounded;
                ``0`` is honoured as a real bound.
            end: Optional upper bound (POSIX timestamp, inclusive).
                ``None`` means unbounded; ``0`` is honoured as a real bound.

        Returns:
            One dict per student with keys ``'user'``, ``'sum'`` (sum of
            the per-problem max scores) and ``'avg'`` (``sum`` divided by
            ``len(problem_ids)``), plus one entry per attempted problem
            keyed by the stringified problem id. Students with no matching
            submission get ``sum`` and ``avg`` of 0 and no problem entries.
        """
        scoreboard = []
        usernames = [User(name).id for name in self.student_nicknames]
        matching = {
            "user": {
                "$in": usernames
            },
            "problem": {
                "$in": problem_ids
            },
        }
        # Compare against None so that a timestamp of 0 is not mistaken
        # for "no bound".
        time_range = {}
        if start is not None:
            time_range['$gte'] = datetime.fromtimestamp(start)
        if end is not None:
            time_range['$lte'] = datetime.fromtimestamp(end)
        if time_range:
            matching["timestamp"] = time_range
        pipeline = [
            {
                "$match": matching
            },
            # First pass: statistics per (user, problem) pair.
            {
                "$group": {
                    "_id": {
                        "user": "$user",
                        "problem": "$problem",
                    },
                    "count": {
                        "$sum": 1
                    },
                    "max": {
                        "$max": "$score"
                    },
                    "min": {
                        "$min": "$score"
                    },
                    "avg": {
                        "$avg": "$score"
                    },
                }
            },
            # Second pass: gather each user's per-problem statistics.
            {
                "$group": {
                    "_id": "$_id.user",
                    "scores": {
                        "$push": {
                            "pid": "$_id.problem",
                            "count": "$count",
                            "max": "$max",
                            "min": "$min",
                            "avg": "$avg",
                        },
                    },
                }
            },
        ]
        cursor = engine.Submission.objects().aggregate(pipeline)
        unrecorded_users = set(usernames)
        for item in cursor:
            scores = item['scores']
            sum_of_score = sum(s['max'] for s in scores)
            scoreboard.append({
                'user': User(item['_id']).info,
                'sum': sum_of_score,
                # A row only exists when some problem matched, so
                # problem_ids is non-empty here.
                'avg': sum_of_score / len(problem_ids),
                **{f'{score["pid"]}': score
                   for score in scores},
            })
            unrecorded_users.remove(item['_id'])
        # Students without any matching submission still get a row.
        for u in unrecorded_users:
            scoreboard.append({
                'user': User(u).info,
                'sum': 0,
                'avg': 0,
            })

        return scoreboard

    @classmethod
    def add_course(cls, course, teacher):
        """Create a course named ``course`` taught by ``teacher``.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: If ``course`` contains disallowed characters or
                is empty.
            engine.DoesNotExist: If ``teacher`` does not resolve to a user.
            PermissionError: If the teacher's ``role`` is 2 or greater.
            engine.NotUniqueError: If a course with that name exists.
        """
        if cls._COURSE_NAME_PATTERN.match(course) is None:
            raise ValueError
        teacher = User(teacher)
        if not teacher:
            raise engine.DoesNotExist('User')
        if teacher.role >= 2:
            raise PermissionError(
                f'{teacher} is not permitted to create a course')
        # HACK: not sure why the unique index is not work during the test
        if cls(course):
            raise engine.NotUniqueError('Course')
        co = cls.engine(
            course_name=course,
            teacher=teacher.obj,
        ).save()
        cls(co).add_user(teacher.obj)
        return True

    @classmethod
    def get_public(cls):
        """Return the ``'Public'`` course, creating it if it is missing.

        A newly created public course is assigned to the user
        ``'first_admin'``.
        """
        if not cls('Public'):
            cls.add_course('Public', 'first_admin')
        return cls('Public')

    def own_permission(self, user) -> Permission:
        """Return the ``Permission`` flags ``user`` holds on this course.

        The value of ``perm(self.obj, user)`` (expected to be 0-4) is
        mapped to a flag set; any other value raises ``KeyError``. What
        each number stands for is defined by ``perm``, which is not
        visible here.
        """
        Permission = self.Permission
        manage = Permission.VIEW | Permission.GRADE | Permission.MODIFY
        ROLE_CAPABILITY = {
            0: Permission(0),
            1: Permission.VIEW | Permission.SCORE,
            2: Permission.VIEW | Permission.GRADE,
            3: manage,
            4: manage,
        }

        role = perm(self.obj, user)

        return ROLE_CAPABILITY[role]

    def permission(self, user, req) -> bool:
        """
        check whether user own `req` permission

        When `req` combines several flags, holding any one of them is
        enough for this to return True.
        """

        return bool(self.own_permission(user) & req)