Coverage for mongo/course.py: 100%
126 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1from . import engine
2from .user import *
3from .utils import *
4import re
5import enum
6from typing import Dict, List, Optional
7from .base import MongoBase
8from datetime import datetime
10__all__ = [
11 'Course',
12]
class Course(MongoBase, engine=engine.Course):
    """
    Wrapper around an ``engine.Course`` document.

    Attribute access that is not defined here is presumably forwarded to
    the wrapped document by ``MongoBase`` (e.g. ``course_name``,
    ``teacher``, ``student_nicknames``, ``homeworks``) -- confirm against
    ``MongoBase``.
    """

    class Permission(enum.IntFlag):
        VIEW = enum.auto()  # view course basic info
        SCORE = enum.auto()  # only can view self score
        MODIFY = enum.auto()  # manage course
        GRADE = enum.auto()  # grade students' score

    def __new__(cls, course_name, *args, **kwargs):
        """
        Build a wrapper from a primary key or, failing that, a course name.

        The argument is first handed to ``MongoBase.__new__`` unchanged.
        If that raises ``engine.ValidationError`` the value is treated as
        a course name and looked up by ``course_name``. When no such
        course exists a wrapper around the all-zero 24-character id is
        returned instead of raising.
        """
        try:
            new = super().__new__(cls, course_name)
        except engine.ValidationError:
            try:
                # Not usable as a primary key: fall back to a lookup by name.
                document = Course.engine.objects(
                    course_name=course_name).get()
                new = super().__new__(cls, document)
            except engine.DoesNotExist:
                # Placeholder id so that callers get a wrapper object back.
                new = super().__new__(cls, '0' * 24)
        return new

    @staticmethod
    def _is_valid_name(name: str) -> bool:
        """
        Tell whether ``name`` is an acceptable course name.

        A valid name is non-empty and made only of ASCII letters, digits,
        ``.``, ``_``, ``-`` and spaces. ``re.fullmatch`` is used on purpose:
        ``re.match`` with a trailing ``$`` would also accept a name that
        ends with a newline.
        """
        return re.fullmatch(r'[a-zA-Z0-9._\- ]+', name) is not None

    def update_student_namelist(
        self,
        student_nicknames: Dict[str, str],
    ):
        """
        Replace the course's student list with ``student_nicknames``.

        The keys of ``student_nicknames`` are passed to ``User``. Users no
        longer listed are removed from the course, newly listed users are
        added, every homework of the course is updated accordingly and the
        course is saved.

        Raises:
            engine.DoesNotExist: if any key does not map to a truthy
                ``User``.
        """
        from .homework import Homework
        if not all(User(name) for name in student_nicknames):
            raise engine.DoesNotExist('User not found')
        previous_names = set(self.student_nicknames)
        incoming_names = set(student_nicknames)
        dropped_names = previous_names - incoming_names
        added_names = incoming_names - previous_names
        for name in dropped_names:
            self.remove_user(User(name).obj)
        for name in added_names:
            self.add_user(User(name).obj)
        self.student_nicknames = student_nicknames
        # TODO: use event to update homework data
        dropped_users = [User(name) for name in dropped_names]
        added_users = [User(name) for name in added_names]
        for homework in map(Homework, self.homeworks):
            homework.remove_student(dropped_users)
            homework.add_student(added_users)
        self.save()

    def add_user(self, user: User):
        """
        Add this course to ``user``'s ``courses`` and reload that field.

        NOTE: despite the annotation, every caller in this class passes
        the underlying document (``User(...).obj`` or ``self.teacher``).

        Raises:
            engine.DoesNotExist: if this course wrapper is falsy.
        """
        if not self:
            raise engine.DoesNotExist(f'Course [{self.course_name}]')
        user.update(add_to_set__courses=self.id)
        user.reload('courses')

    def remove_user(self, user: User):
        """
        Pull this course from ``user``'s ``courses`` and reload that field.

        NOTE: despite the annotation, every caller in this class passes
        the underlying document (``User(...).obj`` or ``self.teacher``).
        """
        user.update(pull__courses=self.id)
        user.reload('courses')

    @classmethod
    def get_all(cls):
        """
        Return the ``engine.Course.objects`` query set.
        """
        return engine.Course.objects

    @classmethod
    def get_user_courses(cls, user):
        """
        Return the courses visible to ``user``.

        A user whose ``role`` is ``0`` gets every course; any other user
        gets only ``user.courses``.
        """
        if user.role != 0:
            return user.courses
        return cls.get_all()

    def edit_course(self, user, new_course, teacher):
        """
        Rename the course and/or change its teacher, then save it.

        Args:
            user: the user requesting the change; checked with ``perm``.
            new_course: the new course name.
            teacher: identifier passed to ``User`` to find the new teacher.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: if ``new_course`` is not a valid course name.
            engine.DoesNotExist: if the course or the teacher is missing.
            PermissionError: if ``perm(self, user)`` is falsy.
        """
        if not self._is_valid_name(new_course):
            raise ValueError

        if not self:
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            raise PermissionError
        te = User(teacher)
        if not te:
            raise engine.DoesNotExist('User')

        self.course_name = new_course
        if te.obj != self.teacher:
            # Hand the course over: detach the old teacher, attach the new.
            self.remove_user(self.teacher)
            self.add_user(te.obj)
            self.teacher = te.obj
        self.save()
        return True

    def delete_course(self, user):
        """
        Delete the course on behalf of ``user``.

        Returns:
            ``True`` on success.

        Raises:
            engine.DoesNotExist: if the course is missing.
            PermissionError: if ``perm(self, user)`` is falsy.
        """
        if not self:
            # course not found
            raise engine.DoesNotExist('Course')
        if not perm(self, user):
            # user is not the TA or teacher in course
            raise PermissionError

        self.remove_user(self.teacher)
        self.delete()
        return True

    def get_scoreboard(
        self,
        problem_ids: List[int],
        start: Optional[float] = None,
        end: Optional[float] = None,
    ) -> List[Dict]:
        """
        Build the scoreboard of this course for ``problem_ids``.

        Submissions are aggregated per (user, problem) into count / max /
        min / avg of ``score`` and then grouped per user.

        Args:
            problem_ids: problems to include.
            start: optional lower bound (timestamp, inclusive).
            end: optional upper bound (timestamp, inclusive).
                NOTE: both bounds are tested for truthiness, so ``0`` is
                treated the same as ``None`` (no bound).

        Returns:
            One dict per student in ``student_nicknames`` with ``user``
            (the user's ``info``), ``sum`` (sum of the per-problem max
            scores) and ``avg`` (``sum`` divided by ``len(problem_ids)``).
            Students with matching submissions also get one entry per
            problem, keyed by the stringified problem id. Students
            without any matching submission get ``sum`` and ``avg`` of 0.
        """
        scoreboard = []
        usernames = [User(u).id for u in self.student_nicknames]
        matching = {
            "user": {
                "$in": usernames
            },
            "problem": {
                "$in": problem_ids
            },
        }
        timestamp_range = {}
        if start:
            timestamp_range['$gte'] = datetime.fromtimestamp(start)
        if end:
            timestamp_range['$lte'] = datetime.fromtimestamp(end)
        if timestamp_range:
            # Only constrain the timestamp when a bound was actually given.
            matching["timestamp"] = timestamp_range
        pipeline = [
            {
                "$match": matching
            },
            {
                # Stage 1: statistics per (user, problem) pair.
                "$group": {
                    "_id": {
                        "user": "$user",
                        "problem": "$problem",
                    },
                    "count": {
                        "$sum": 1
                    },
                    "max": {
                        "$max": "$score"
                    },
                    "min": {
                        "$min": "$score"
                    },
                    "avg": {
                        "$avg": "$score"
                    },
                }
            },
            {
                # Stage 2: collect the per-problem statistics of each user.
                "$group": {
                    "_id": "$_id.user",
                    "scores": {
                        "$push": {
                            "pid": "$_id.problem",
                            "count": "$count",
                            "max": "$max",
                            "min": "$min",
                            "avg": "$avg",
                        },
                    },
                }
            },
        ]
        cursor = engine.Submission.objects().aggregate(pipeline)
        unrecorded_users = set(usernames)
        for item in cursor:
            sum_of_score = sum(s['max'] for s in item['scores'])
            scoreboard.append({
                'user': User(item['_id']).info,
                'sum': sum_of_score,
                'avg': sum_of_score / len(problem_ids),
                **{f'{score["pid"]}': score
                   for score in item['scores']},
            })
            unrecorded_users.remove(item['_id'])
        # Students without any matching submission still get a row.
        for u in unrecorded_users:
            scoreboard.append({
                'user': User(u).info,
                'sum': 0,
                'avg': 0,
            })

        return scoreboard

    @classmethod
    def add_course(cls, course, teacher):
        """
        Create a course named ``course`` taught by ``teacher``.

        Args:
            course: name of the new course.
            teacher: identifier passed to ``User`` to find the teacher.

        Returns:
            ``True`` on success.

        Raises:
            ValueError: if ``course`` is not a valid course name.
            engine.DoesNotExist: if the teacher does not exist.
            PermissionError: if the teacher's ``role`` is 2 or greater.
        """
        if not cls._is_valid_name(course):
            raise ValueError
        teacher = User(teacher)
        if not teacher:
            raise engine.DoesNotExist('User')
        if teacher.role >= 2:
            raise PermissionError(
                f'{teacher} is not permitted to create a course')
        co = cls.engine(
            course_name=course,
            teacher=teacher.obj,
        ).save()
        cls(co).add_user(teacher.obj)
        return True

    @classmethod
    def get_public(cls):
        """
        Return the ``'Public'`` course, creating it first if necessary.

        When it has to be created, ``'first_admin'`` is used as teacher.
        """
        public = cls('Public')
        if not public:
            cls.add_course('Public', 'first_admin')
            public = cls('Public')
        return public

    def own_permission(self, user) -> Permission:
        """
        Return the permission flags ``user`` holds in this course.

        The value returned by ``perm(self.obj, user)`` is used as the key
        into the table below; a value outside 0-4 raises ``KeyError``.
        """
        view = self.Permission.VIEW
        grade = self.Permission.GRADE
        modify = self.Permission.MODIFY
        ROLE_CAPABILITY = {
            0: self.Permission(0),
            1: view | self.Permission.SCORE,
            2: view | grade,
            3: view | grade | modify,
            4: view | grade | modify,
        }

        role = perm(self.obj, user)

        return ROLE_CAPABILITY[role]

    def permission(self, user, req) -> bool:
        """
        check whether user own `req` permission

        Returns ``True`` when at least one flag of ``req`` is held.
        """

        return bool(self.own_permission(user) & req)