Coverage for mongo/problem/problem.py: 100%
192 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1from .. import engine
2from ..base import MongoBase
3from ..course import *
4from ..utils import (RedisCache, doc_required, drop_none)
5from ..user import User
6from .exception import BadTestCase
7from .test_case import (
8 SimpleIO,
9 ContextIO,
10 IncludeDirectory,
11 TestCaseRule,
12)
13from datetime import datetime
14from typing import (
15 Any,
16 BinaryIO,
17 Dict,
18 List,
19 Optional,
20)
21import json
22import enum
24__all__ = ('Problem', )
class Problem(MongoBase, engine=engine.Problem):
    '''
    Wrapper around the `engine.Problem` document that provides
    permission checks, submission statistics and create / edit / copy
    helpers for problems.
    '''

    class Permission(enum.IntFlag):
        VIEW = enum.auto()  # user view permission
        ONLINE = enum.auto()  # user can view problem or not
        MANAGE = enum.auto()  # user manage problem permission

    def detailed_info(self, *ks, **kns) -> Dict[str, Any]:
        '''
        return detailed info about this problem. notice
        that the `input` and `output` of problem test
        case won't be sent to front end, need call other
        route to get this info.

        Both field names and output keys may use `__` to address nested
        values, e.g. `detailed_info(tasks='testCase__tasks')` stores
        `document['testCase']['tasks']` under `result['tasks']`.

        Args:
            ks (*str): the field name you want to get
            kns (**[str, str]):
                specify the dict key you want to store
                the data get by field name
        Return:
            a dict contains problem's data, or an empty dict if this
            problem is falsy (`not self`)
        '''
        if not self:
            return {}
        # problem -> dict
        _ret = self.to_mongo()
        # preprocess fields
        # case zip can not be serialized; also tolerate a dumped document
        # that carries no `testCase` key at all
        _ret.get('testCase', {}).pop('caseZip', None)
        # convert course document to course name
        _ret['courses'] = [course.course_name for course in self.courses]
        ret = {}
        # positional names are stored under their own name
        for k in ks:
            kns[k] = k
        for k, n in kns.items():
            # extract wanted value by walking the `__`-separated source path
            field, *sub_fields = n.split('__')
            v = _ret[field]
            for sub_field in sub_fields:
                v = v[sub_field]
            # build the nested output dicts named by the `__`-separated key
            *parents, leaf = k.split('__')
            e = ret
            for parent in parents:
                e = e.setdefault(parent, {})
            e[leaf] = v
        return ret

    def allowed(self, language):
        '''
        Check whether `language` may be used to submit to this problem.

        Problems whose `problem_type` is 2 accept any language
        (presumably a non-programming problem type; confirm against
        the engine definition). Otherwise `language` must be 0, 1 or 2
        and its bit must be set in the `allowed_language` bitmask.
        '''
        if self.problem_type == 2:
            return True
        if not 0 <= language < 3:
            return False
        return bool((1 << language) & self.allowed_language)

    def submit_count(self, user: User) -> int:
        '''
        Calculate how many submissions the user has submitted to this problem.

        Note that this clears `user.problem_submission` (for every
        problem) when the user's last submission was not made today.
        '''
        # reset quota if it's a new day
        if user.last_submit.date() != datetime.now().date():
            user.update(problem_submission={})
            return 0
        return user.problem_submission.get(str(self.problem_id), 0)

    def running_homeworks(self) -> List:
        '''
        Return the homeworks of this problem whose duration contains
        the current time, wrapped as `Homework` objects.
        '''
        # imported here rather than at module level
        # (presumably to avoid a circular import)
        from ..homework import Homework
        now = datetime.now()
        return [Homework(hw.id) for hw in self.homeworks if now in hw.duration]

    def is_valid_ip(self, ip: str):
        '''
        Return True if every running homework accepts `ip`
        (vacuously True when no homework is running).
        '''
        return all(hw.is_valid_ip(ip) for hw in self.running_homeworks())

    def get_submission_status(self) -> Dict[str, int]:
        '''
        Count this problem's submissions grouped by their `status`.

        Return:
            a dict mapping each status value to its submission count
        '''
        pipeline = {
            "$group": {
                "_id": "$status",
                "count": {
                    "$sum": 1
                },
            }
        }
        cursor = engine.Submission.objects(problem=self.id).aggregate(
            [pipeline], )
        return {item['_id']: item['count'] for item in cursor}

    def get_ac_user_count(self) -> int:
        '''
        Count distinct users having a submission with status 0 on this
        problem (status 0 presumably means accepted).
        '''
        ac_users = engine.Submission.objects(
            problem=self.id,
            status=0,
        ).distinct('user')
        return len(ac_users)

    def get_tried_user_count(self) -> int:
        '''
        Count distinct users who have submitted to this problem.
        '''
        tried_users = engine.Submission.objects(
            problem=self.id, ).distinct('user')
        return len(tried_users)

    @doc_required('user', User)
    def high_score_key(self, user: User) -> str:
        '''
        Build the cache key under which the user's high score of this
        problem is stored.
        '''
        return f'PROBLEM_{self.id}_{user.id}_HIGH_SCORE'

    @doc_required('user', User)
    def get_high_score(self, user: User) -> int:
        '''
        Get highest score for user of this problem.

        The value is read from the cache when present; otherwise it is
        queried from the submissions and cached for 600 seconds.
        '''
        cache = RedisCache()
        key = self.high_score_key(user=user)
        if (val := cache.get(key)) is not None:
            return int(val.decode())
        # TODO: avoid calling mongoengine API directly
        # fetch the best submission with a single query
        best = engine.Submission.objects(
            user=user.id,
            problem=self.id,
        ).only('score').order_by('-score').limit(1).first()
        if best is None:
            high_score = 0
        else:
            # It might < 0 if there is only incomplete submission
            high_score = max(best.score, 0)
        cache.set(key, high_score, ex=600)
        return high_score

    @doc_required('user', User)
    def own_permission(self, user: User) -> Permission:
        '''
        generate user permission capability

        VIEW is inherited from any course of this problem the user can
        view. ONLINE is granted when `problem_status` is 0 and, within
        some course, the problem either belongs to no homework or
        belongs to a homework that has already started. Admins
        (`role == 0`) and the problem owner get every permission.
        '''
        user_cap = self.Permission(0)
        # evaluate all homework start times against one timestamp
        now = datetime.now()
        for course in map(Course, self.courses):
            # inherit course permission
            if course.permission(user, Course.Permission.VIEW):
                user_cap |= self.Permission.VIEW

            # online problem
            if self.problem_status == 0:
                check_public_problem = True
                for homework in course.homeworks:
                    if self.problem_id in homework.problem_ids:
                        check_public_problem = False
                        # current time after homework then online problem
                        if now >= homework.duration.start:
                            user_cap |= self.Permission.ONLINE

                # problem does not belong to any homework
                if check_public_problem:
                    user_cap |= self.Permission.ONLINE

        # Admin, Teacher && is owner
        if user.role == 0 or self.owner == user.username:
            user_cap |= self.Permission.VIEW
            user_cap |= self.Permission.ONLINE
            user_cap |= self.Permission.MANAGE

        return user_cap

    def permission(self, user: User, req: Permission) -> bool:
        '''
        check whether user own `req` permission

        `req` may combine several flags; all of them must be owned.
        '''
        return (self.own_permission(user=user) & req) == req

    @classmethod
    def get_problem_list(
        cls,
        user,
        offset: int = 0,
        count: int = -1,
        problem_id: Optional[int] = None,
        name: Optional[str] = None,
        tags: Optional[list] = None,
        course: Optional[str] = None,
    ):
        '''
        get a list of problems

        Args:
            user: only problems this user has ONLINE permission on
                are returned
            offset: index of the first problem to return
            count: max number of problems to return, negative for all
            problem_id, name, tags, course: optional query filters,
                ignored when None
        Return:
            a list of problem documents ordered by `problemId`; an
            empty list if `course` does not exist
        Exceptions:
            IndexError: if `offset` is negative, or is out of range of
                a non-empty result
        '''
        if course is not None:
            course = Course(course)
            if not course:
                return []
            course = course.obj
        # query args
        ks = drop_none({
            'problem_id': problem_id,
            'problem_name': name,
            'courses': course,
            'tags__in': tags,
        })
        problems = [
            p for p in engine.Problem.objects(**ks).order_by('problemId')
            if cls(p).permission(user=user, req=cls.Permission.ONLINE)
        ]
        # truncate
        if offset < 0 or (offset >= len(problems) and len(problems)):
            raise IndexError
        right = len(problems) if count < 0 else offset + count
        right = min(len(problems), right)
        return problems[offset:right]

    @classmethod
    def add(
        cls,
        user: User,
        courses: List[str],
        problem_name: str,
        status: Optional[int] = None,
        description: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None,
        type: Optional[int] = None,
        test_case_info: Optional[Dict[str, Any]] = None,
        can_view_stdout: bool = False,
        allowed_language: Optional[int] = None,
        quota: Optional[int] = None,
        default_code: Optional[str] = None,
    ):
        '''
        Create a new problem owned by `user`.

        Arguments left as None are not passed to the engine document.
        The test case, `can_view_stdout` and `allowed_language` are
        only stored when `type` is not 2.

        Return:
            the `problem_id` of the created problem
        Exceptions:
            ValueError: if `courses` is empty
            engine.DoesNotExist: if any course does not exist
        '''
        if len(courses) == 0:
            raise ValueError('No course provided')
        course_objs = []
        for course in map(Course, courses):
            if not course:
                raise engine.DoesNotExist
            course_objs.append(course.id)
        problem_args = drop_none({
            'courses': course_objs,
            'problem_status': status,
            'problem_type': type,
            'problem_name': problem_name,
            'description': description,
            'owner': user.username,
            'tags': tags,
            'quota': quota,
            'default_code': default_code,
        })
        problem = cls.engine(**problem_args).save()
        programming_problem_args = drop_none({
            'test_case': test_case_info,
            'can_view_stdout': can_view_stdout,
            'allowed_language': allowed_language,
        })
        if programming_problem_args and type != 2:
            problem.update(**programming_problem_args)
        return problem.problem_id

    @classmethod
    def edit_problem(
        cls,
        user: User,
        problem_id: int,
        courses: List[str],
        status: int,
        problem_name: str,
        description: Dict[str, Any],
        tags: List[str],
        type,
        test_case_info: Optional[Dict[str, Any]] = None,
        allowed_language: int = 7,
        can_view_stdout: bool = False,
        quota: int = -1,
        default_code: str = '',
    ):
        '''
        Overwrite the fields of an existing problem; `user` becomes
        its owner.

        When `type` is not 2, the language / stdout settings are
        updated as well. If `test_case_info` is given it replaces the
        test case description (the uploaded case zip is kept);
        otherwise the stored test case is left as is.

        Exceptions:
            ValueError: if `test_case_info` is given for a problem
                whose type is not 2 and its task scores do not sum
                to 100
            engine.DoesNotExist: if any course does not exist
        '''
        # Only validate scores when new test case info is supplied.
        # `test_case_info` is optional, and a missing value means
        # "keep the current test case" (see below).
        if type != 2 and test_case_info:
            score = sum(t['taskScore'] for t in test_case_info['tasks'])
            if score != 100:
                raise ValueError("Cases' scores should be 100 in total")
        problem = Problem(problem_id).obj
        course_objs = []
        for name in courses:
            if not (course := Course(name)):
                raise engine.DoesNotExist
            course_objs.append(course.obj)
        problem.update(
            courses=course_objs,
            problem_status=status,
            problem_type=type,
            problem_name=problem_name,
            description=description,
            owner=user.username,
            tags=tags,
            quota=quota,
            default_code=default_code,
        )
        if type != 2:
            # preprocess test case
            test_case = problem.test_case
            if test_case_info:
                test_case = engine.ProblemTestCase.from_json(
                    json.dumps(test_case_info))
                # carry the already uploaded zip over to the new test case
                test_case.case_zip = problem.test_case.case_zip
            problem.update(
                allowed_language=allowed_language,
                can_view_stdout=can_view_stdout,
                test_case=test_case,
            )

    def update_test_case(self, test_case: BinaryIO):
        '''
        edit problem's testcase

        The archive is first checked against the optional directory
        rules, then it must match exactly one of the supported
        formats (`SimpleIO` or `ContextIO`) before it is stored.

        Args:
            test_case: testcase zip file
        Exceptions:
            BadTestCase: if the archive matches both formats
                (ambiguous) or neither of them (invalid)
            zipfile.BadZipFile: if `test_case` is not a zip file
            ValueError: if test case is None or problem_id is invalid
            engine.DoesNotExist
        '''
        rules: List[TestCaseRule] = [
            IncludeDirectory(self, 'include'),
            IncludeDirectory(self, 'share'),
            # for backward compatibility
            IncludeDirectory(self, 'chaos'),
        ]
        for rule in rules:
            rule.validate(test_case)

        # Should only match one format
        rules = [
            SimpleIO(self, ['include/', 'share/', 'chaos/']),
            ContextIO(self),
        ]
        excs = []
        for rule in rules:
            try:
                rule.validate(test_case)
            except BadTestCase as e:
                excs.append(e)

        # no failure: both formats matched; two failures: none matched
        if len(excs) == 0:
            raise BadTestCase('ambiguous test case format')
        elif len(excs) == 2:
            raise BadTestCase(
                f'invalid test case format\n\n{excs[0]}\n\n{excs[1]}')

        # save zip file
        test_case.seek(0)
        # check whether the test case exists
        if self.test_case.case_zip.grid_id is None:
            # if no, put data to a new file
            write_func = self.test_case.case_zip.put
        else:
            # else, replace original file with a new one
            write_func = self.test_case.case_zip.replace
        write_func(
            test_case,
            content_type='application/zip',
        )
        # update problem obj
        self.save()

    @classmethod
    def copy_problem(cls, user, problem_id):
        '''
        Save a new problem owned by `user` that reuses the status,
        type, name, description, tags and test case of the problem
        identified by `problem_id`. Nothing is returned.
        '''
        problem = Problem(problem_id).obj
        engine.Problem(
            problem_status=problem.problem_status,
            problem_type=problem.problem_type,
            problem_name=problem.problem_name,
            description=problem.description,
            owner=user.username,
            tags=problem.tags,
            test_case=problem.test_case,
        ).save()

    @doc_required('target', Course, src_none_allowed=True)
    def copy_to(
        self,
        user: User,
        target: Optional[Course] = None,
        **override,
    ) -> int:
        '''
        Copy a problem to target course, hidden by default.

        Args:
            user (User): The user who execute this action and will become
                the owner of copied problem.
            target (Optional[Course] = None): The course this problem will
                be copied to, default to the first of original courses.
            override: Override field values passed to `Problem.add`.
        Return:
            the `problem_id` of the copied problem, as returned by
            `Problem.add`
        '''
        target = self.courses[0] if target is None else target
        # Copied problem is hidden by default
        status = override.pop('status', Problem.engine.Visibility.HIDDEN)
        ks = dict(
            user=user,
            courses=[target.course_name],
            problem_name=self.problem_name,
            status=status,
            description=self.description.to_mongo(),
            tags=self.tags,
            type=self.problem_type,
            test_case_info=self.test_case.to_mongo(),
            can_view_stdout=self.can_view_stdout,
            allowed_language=self.allowed_language,
            quota=self.quota,
            default_code=self.default_code,
        )
        ks.update(override)
        copy = self.add(**ks)
        return copy

    @classmethod
    def release_problem(cls, problem_id):
        '''
        Move the problem into the `Public` course only and hand its
        ownership to `first_admin`.
        '''
        course = Course('Public').obj
        problem = Problem(problem_id).obj
        problem.courses = [course]
        problem.owner = 'first_admin'
        problem.save()