Coverage for mongo/submission.py: 74%
465 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
1from __future__ import annotations
2import io
3import os
4import pathlib
5import secrets
6import logging
7from typing import (
8 Any,
9 Dict,
10 Optional,
11 Union,
12 List,
13)
14import enum
15import tempfile
16import requests as rq
17import itertools
18from bson.son import SON
19from flask import current_app
20from tempfile import NamedTemporaryFile
21from datetime import date, datetime
22from zipfile import ZipFile, is_zipfile
23from ulid import ULID
25from . import engine
26from .base import MongoBase
27from .user import User
28from .problem import Problem
29from .homework import Homework
30from .course import Course
31from .utils import RedisCache, MinioClient
33__all__ = [
34 'SubmissionConfig',
35 'Submission',
36 'JudgeQueueFullError',
37 'TestCaseNotFound',
38]
40# TODO: modular token function
def gen_key(_id):
    '''
    build the cache key under which the token of submission `_id` is kept
    '''
    return 'stoekn_{}'.format(_id)
def gen_token():
    '''
    create a fresh random url-safe token
    '''
    token = secrets.token_urlsafe()
    return token
51# Errors
class JudgeQueueFullError(Exception):
    '''
    raised when the sandbox's task queue is full
    '''
class TestCaseNotFound(Exception):
    '''
    raised when a problem's testcase hasn't been uploaded
    '''
    # presumably keeps test runners from collecting this `Test*` class
    __test__ = False

    def __init__(self, problem_id):
        self.problem_id = problem_id

    def __str__(self):
        problem = Problem(self.problem_id)
        return f'{problem}\'s testcase is not found'
class SubmissionCodeNotFound(Exception):
    '''
    raised when the code of a submission cannot be found
    '''
class SubmissionConfig(MongoBase, engine=engine.SubmissionConfig):
    '''
    wrapper of the submission config document
    '''
    # Root directory for files extracted from submissions.  The value of
    # the SUBMISSION_TMP_DIR environment variable is used when it is set;
    # only otherwise a private temporary directory is created.
    # `mkdtemp` replaces `TemporaryDirectory(...).name`: the latter was
    # created even when the variable was set, and its directory is removed
    # again as soon as the unreferenced object is finalized.
    _env_tmp_dir = os.getenv('SUBMISSION_TMP_DIR')
    TMP_DIR = pathlib.Path(
        tempfile.mkdtemp(suffix='noj-submisisons')
        if _env_tmp_dir is None else _env_tmp_dir)
    # do not leave the helper behind as a class attribute
    del _env_tmp_dir

    def __init__(self, name: str):
        self.name = name
88class Submission(MongoBase, engine=engine.Submission):
class Permission(enum.IntFlag):
    '''
    capabilities a user may hold on a submission
    '''
    VIEW = 1 << 0  # view submission info
    UPLOAD = 1 << 1  # student can re-upload
    FEEDBACK = 1 << 2  # student can view homework feedback
    COMMENT = 1 << 3  # teacher or TAs can give comment
    REJUDGE = 1 << 4  # teacher or TAs can rejudge submission
    GRADE = 1 << 5  # teacher or TAs can grade homework
    VIEW_OUTPUT = 1 << 6
    # composite roles
    OTHER = VIEW
    STUDENT = OTHER | UPLOAD | FEEDBACK
    MANAGER = STUDENT | COMMENT | REJUDGE | GRADE | VIEW_OUTPUT
102 _config = None
104 def __init__(self, submission_id):
105 self.submission_id = str(submission_id)
107 def __str__(self):
108 return f'submission [{self.submission_id}]'
@property
def id(self):
    '''
    the id of the underlying document as a string, for serialization
    '''
    object_id = self.obj.id
    return str(object_id)
@property
def problem_id(self) -> int:
    '''
    id of the problem this submission belongs to
    '''
    problem = self.problem
    return problem.problem_id
@property
def username(self) -> str:
    '''
    username of the submission's owner
    '''
    owner = self.user
    return owner.username
@property
def status2code(self):
    '''
    mapping from the status names to their integer codes
    '''
    # the position of a name in this tuple is its code
    names = ('AC', 'WA', 'CE', 'TLE', 'MLE', 'RE', 'JE', 'OLE')
    return {name: code for code, name in enumerate(names)}
@property
def handwritten(self):
    '''
    whether this submission uses language 3 (the one mapped to `.pdf`)
    '''
    handwritten_language = 3
    return self.language == handwritten_language
@property
def tmp_dir(self) -> pathlib.Path:
    '''
    directory for this submission's temporary files, laid out as
    `<TMP_DIR>/<username>/<id>` and created on access when missing
    '''
    root = self.config().TMP_DIR
    root.mkdir(exist_ok=True)
    submission_dir = root.joinpath(self.username, self.id)
    submission_dir.mkdir(parents=True, exist_ok=True)
    return submission_dir
@property
def main_code_ext(self):
    '''
    file extension of the main source file for this submission's language
    '''
    # the language number is the index of its extension
    extensions = dict(enumerate(('.c', '.cpp', '.py', '.pdf')))
    return extensions[self.language]
def main_code_path(self) -> Optional[str]:
    '''
    get the absolute path of the main source file, extracting it from
    the code zip into `tmp_dir` the first time it is requested

    Returns:
        the absolute path as a string, or None for a handwritten
        submission

    Raises:
        SubmissionCodeNotFound: no code is stored for this submission
    '''
    # handwritten submission didn't provide this function
    if self.handwritten:
        return None
    # get expected code name & temp path
    filename = f'main{self.main_code_ext}'
    path = self.tmp_dir / filename
    # check whether the code has been generated
    if not path.exists():
        if (z := self._get_code_zip()) is None:
            raise SubmissionCodeNotFound
        with z as zf:
            # the source is decoded as UTF-8, so it is written back as
            # UTF-8 explicitly instead of with the locale's encoding
            path.write_text(
                zf.read(filename).decode('utf-8'),
                encoding='utf-8',
            )
    # return absolute path
    return str(path.absolute())
@classmethod
def config(cls):
    '''
    get the shared submission config, creating and saving it when needed

    Returns:
        whatever `reload()` of the config object returns
    '''
    current = cls._config
    if not current:
        current = cls._config = SubmissionConfig('submission')
    # a config that still evaluates false gets saved first
    if not current:
        current.save()
    return current.reload()
def get_single_output(
    self,
    task_no: int,
    case_no: int,
    text: bool = True,
):
    '''
    read stdout/stderr of one case from its stored output zip

    Args:
        task_no: index of the task
        case_no: index of the case inside that task
        text: decode the contents as UTF-8 when true, keep bytes otherwise

    Returns:
        a dict with the keys 'stdout' and 'stderr'

    Raises:
        FileNotFoundError: the task or case index is out of range
        AttributeError: an attribute lookup failed while reading the output
    '''
    try:
        case = self.tasks[task_no].cases[case_no]
    except IndexError:
        raise FileNotFoundError('task not exist')
    try:
        with ZipFile(case.output) as archive:
            stdout = archive.read('stdout')
            stderr = archive.read('stderr')
    except AttributeError:
        raise AttributeError('The submission is still in pending')
    if text:
        stdout = stdout.decode('utf-8')
        stderr = stderr.decode('utf-8')
    return {'stdout': stdout, 'stderr': stderr}
def delete_output(self, *args):
    '''
    delete the stored output of every case of this submission

    Args:
        args: accepted but never used
    '''
    every_case = itertools.chain.from_iterable(
        task.cases for task in self.tasks)
    for case in every_case:
        case.output.delete()
def delete(self, *keeps):
    '''
    delete this submission together with its related files

    Args:
        keeps:
            names of the fields to keep, out of
            {'comment', 'code', 'output'}; other values are ignored
    '''
    removable = {'comment', 'code', 'output'}
    for field in removable - set(keeps):
        if field == 'output':
            # outputs are stored per case and have their own routine
            self.delete_output(field)
        else:
            self.obj[field].delete()
    self.obj.delete()
def sandbox_resp_handler(self, resp):
    '''
    turn the sandbox's response into a result or an exception

    Args:
        resp: the response object returned by the sandbox request

    Returns:
        True for status 200, False for a status that is not handled

    Raises:
        JudgeQueueFullError: status 500, judge queue is currently full
        ValueError: status 400 (invalid data) or 403 (invalid token)
    '''
    code = resp.status_code
    if code == 200:
        return True
    # judge queue is currently full
    if code == 500:
        raise JudgeQueueFullError
    # backend send some invalid data
    if code == 400:
        raise ValueError(resp.text)
    # send a invalid token
    if code == 403:
        raise ValueError('invalid token')
    # anything else is logged and reported as a failure
    self.logger.error('can not handle response from sandbox')
    self.logger.error(
        f'status code: {resp.status_code}\n'
        f'headers: {resp.headers}\n'
        f'body: {resp.text}', )
    return False
def target_sandbox(self):
    '''
    pick the sandbox instance that reports the lowest load

    Returns:
        the chosen instance, or None when no instance answered with a
        load below the initial threshold
    '''
    lowest_load = 10**3  # current min load
    chosen = None  # target
    for sandbox in self.config().sandbox_instances:
        status = rq.get(f'{sandbox.url}/status')
        if status.ok:
            report = status.json()
            if report['load'] < lowest_load:
                lowest_load = report['load']
                chosen = sandbox
        else:
            # skip instances whose status endpoint misbehaves
            self.logger.warning(f'sandbox {sandbox.name} status exception')
            self.logger.warning(
                f'status code: {status.status_code}\n '
                f'body: {status.text}', )
    return chosen
def get_comment(self) -> bytes:
    '''
    read the comment file of this submission

    Raises:
        FileNotFoundError: no comment has been stored
    '''
    comment = self.comment
    if comment.grid_id is None:
        raise FileNotFoundError('it seems that comment haven\'t upload')
    return comment.read()
286 def _check_code(self, file):
287 if not file:
288 return 'no file'
289 if not is_zipfile(file):
290 return 'not a valid zip file'
292 # HACK: hard-coded config
293 MAX_SIZE = 10**7
294 with ZipFile(file) as zf:
295 infos = zf.infolist()
297 size = sum(i.file_size for i in infos)
298 if size > MAX_SIZE:
299 return 'code file size too large'
301 if len(infos) != 1:
302 return 'more than one file in zip'
303 name, ext = os.path.splitext(infos[0].filename)
304 if name != 'main':
305 return 'only accept file with name \'main\''
306 if ext != ['.c', '.cpp', '.py', '.pdf'][self.language]:
307 return f'invalid file extension, got {ext}'
308 if ext == '.pdf':
309 with zf.open('main.pdf') as pdf:
310 if pdf.read(5) != b'%PDF-':
311 return 'only accept PDF file.'
312 file.seek(0)
313 return None
def rejudge(self) -> bool:
    '''
    rejudge this submission

    Returns:
        True during testing, otherwise the result of `send`
    '''
    # delete output file
    self.delete_output()
    # turn back to haven't be judged
    self.update(status=-1, last_send=datetime.now(), tasks=[])
    testing = current_app.config['TESTING']
    return True if testing else self.send()
def _generate_obj_path(self):
    '''
    build a new object name of the form `submissions/<ULID>.zip`
    '''
    return 'submissions/{}.zip'.format(ULID())
def _put_code(self, code_file) -> str:
    '''
    put code file to minio, return the object name

    Raises:
        ValueError: `_check_code` rejected the file
    '''
    err = self._check_code(code_file)
    if err is not None:
        raise ValueError(err)

    storage = MinioClient()
    object_name = self._generate_obj_path()
    storage.client.put_object(
        storage.bucket,
        object_name,
        code_file,
        -1,
        part_size=5 * 1024 * 1024,
        content_type='application/zip',
    )
    return object_name
def submit(self, code_file) -> bool:
    '''
    prepare data for submit code to sandbox and then send it

    Args:
        code_file: a zip file contains user's code

    Returns:
        True during testing or for a handwritten submission,
        otherwise the result of `send`

    Raises:
        engine.DoesNotExist: this submission does not exist
        ValueError: the code file was rejected by `_put_code`
    '''
    # unexisted id
    if not self:
        raise engine.DoesNotExist(f'{self}')
    # store the code and mark the submission as not judged yet
    self.update(
        status=-1,
        last_send=datetime.now(),
        code_minio_path=self._put_code(code_file),
    )
    self.reload()
    self.logger.debug(f'{self} code updated.')
    # delete old handwritten submission
    if self.handwritten:
        q = {
            'problem': self.problem,
            'user': self.user,
            'language': 3,
        }
        for submission in engine.Submission.objects(**q):
            # every other handwritten submission of this user is removed
            if submission != self.obj:
                # reset this user's homework record for the problem first
                for homework in self.problem.homeworks:
                    stat = homework.student_status[self.user.username][str(
                        self.problem_id)]
                    stat['score'] = 0
                    stat['problemStatus'] = -1
                    stat['submissionIds'] = []
                    homework.save()
                submission.delete()
    # we no need to actually send code to sandbox during testing
    if current_app.config['TESTING'] or self.handwritten:
        return True
    return self.send()
def send(self) -> bool:
    '''
    send code to sandbox

    Returns:
        True when the sandbox accepted the submission; False for a
        handwritten submission, when no sandbox could be targeted or
        when the sandbox's response could not be handled

    Raises:
        SubmissionCodeNotFound: no code is stored for this submission
        JudgeQueueFullError, ValueError: raised by `sandbox_resp_handler`
    '''
    if self.handwritten:
        logging.warning(f'try to send a handwritten {self}')
        return False
    # TODO: Ensure problem is ready to submitted
    # if not Problem(self.problem).is_test_case_ready():
    #     raise TestCaseNotFound(self.problem.problem_id)
    # setup post body
    if (raw := self._get_code_raw()) is None:
        # report missing code explicitly instead of letting `join`
        # fail with a TypeError on None
        raise SubmissionCodeNotFound
    files = {
        'src': io.BytesIO(b"".join(raw)),
    }
    # look for the target sandbox
    tar = self.target_sandbox()
    if tar is None:
        self.logger.error(f'can not target a sandbox for {repr(self)}')
        return False
    # save token for validation
    Submission.assign_token(self.id, tar.token)
    post_data = {
        'token': tar.token,
        'checker': 'print("not implement yet. qaq")',
        'problem_id': self.problem_id,
        'language': self.language,
    }
    judge_url = f'{tar.url}/submit/{self.id}'
    # send submission to sandbox for judgement
    self.logger.info(f'send {self} to {tar.name}')
    resp = rq.post(
        judge_url,
        data=post_data,
        files=files,
    )
    self.logger.info(f'recieve {self} resp from sandbox')
    return self.sandbox_resp_handler(resp)
def process_result(self, tasks: list):
    '''
    process results from sandbox

    Args:
        tasks:
            a 2-dim list of the dict with schema
            {
                'exitCode': int,
                'status': str,
                'stdout': str,
                'stderr': str,
                'execTime': int,
                'memoryUsage': int
            }
            the list and its dicts are modified in place

    Returns:
        True once the result is stored and `finish_judging` has run
    '''
    self.logger.info(f'recieve {self} result')
    status2code = self.status2code
    for task in tasks:
        for case in task:
            # we don't need exit code
            del case['exitCode']
            # convert status into integer, -3 for an unknown status
            case['status'] = status2code.get(case['status'], -3)
    # process task
    fds = ('stdout', 'stderr')
    for i, cases in enumerate(tasks):
        for j, case in enumerate(cases):
            # save stdout/stderr as a zip
            # NOTE(review): this temporary file is neither closed nor
            # removed in this method - confirm whether the engine takes
            # care of it or whether it should be cleaned up here
            tf = NamedTemporaryFile(delete=False)
            with ZipFile(tf, 'w') as zf:
                for fd in fds:
                    # a missing key is handled like a None value, so the
                    # log below is reached instead of a KeyError
                    content = case.pop(fd, None)
                    if content is None:
                        self.logger.error(
                            f'key {fd} not in case result {self} {i:02d}{j:02d}'
                        )
                        # `writestr` can not take None, store it empty
                        content = ''
                    zf.writestr(fd, content)
            tf.seek(0)
            # convert dict to document
            cases[j] = engine.CaseResult(
                status=case['status'],
                exec_time=case['execTime'],
                memory_usage=case['memoryUsage'],
                output=tf,
            )
        # a task takes the largest value found among its cases
        status = max(c.status for c in cases)
        exec_time = max(c.exec_time for c in cases)
        memory_usage = max(c.memory_usage for c in cases)
        tasks[i] = engine.TaskResult(
            status=status,
            exec_time=exec_time,
            memory_usage=memory_usage,
            # the task only scores when its status is 0
            score=self.problem.test_case.tasks[i].task_score
            if status == 0 else 0,
            cases=cases,
        )
    # the submission takes the largest value found among its tasks
    status = max(t.status for t in tasks)
    exec_time = max(t.exec_time for t in tasks)
    memory_usage = max(t.memory_usage for t in tasks)
    self.update(
        score=sum(task.score for task in tasks),
        status=status,
        tasks=tasks,
        exec_time=exec_time,
        memory_usage=memory_usage,
    )
    self.reload()
    self.finish_judging()
    return True
def finish_judging(self):
    '''
    record the judged result: add the submission to its user, update
    the status of every related homework and drop the cached high score
    '''
    # update user's submission
    User(self.username).add_submission(self)
    # update homework data
    for homework in self.problem.homeworks:
        try:
            stat = homework.student_status[self.username][str(
                self.problem_id)]
        except KeyError:
            self.logger.warning(
                f'{self} not in {homework} [user={self.username}, problem={self.problem_id}]'
            )
            continue
        # handwritten submissions leave the homework record untouched
        # here; the handwritten branches that used to follow this check
        # could never run and have been removed
        if self.handwritten:
            continue
        if 'rawScore' not in stat:
            stat['rawScore'] = 0
        stat['submissionIds'].append(self.id)
        # if the homework is overdue, do the penalty
        if self.timestamp > homework.duration.end and homework.penalty is not None:
            self.score, stat['rawScore'] = Homework(homework).do_penalty(
                self, stat)
        elif self.score > stat['rawScore']:
            stat['rawScore'] = self.score
        # update high score
        if self.score >= stat['score']:
            stat['score'] = self.score
            stat['problemStatus'] = self.status

        homework.save()
    # the cached high score of this user may be outdated now
    key = Problem(self.problem).high_score_key(user=self.user)
    RedisCache().delete(key)
def add_comment(self, file):
    '''
    comment a submission with PDF

    Args:
        file: a PDF file

    Raises:
        ValueError: the content does not start with the PDF magic number
    '''
    data = file.read()
    # check magic number
    header = data[:5]
    if header != b'%PDF-':
        raise ValueError('only accept PDF file.')
    # create the file when there is none yet, replace its content otherwise
    if self.comment.grid_id is None:
        store = self.comment.put
    else:
        store = self.comment.replace
    store(data)
    self.logger.debug(f'{self} comment updated.')
    # update submission
    self.save()
@staticmethod
def count():
    '''
    count all submissions

    Returns:
        the number of submission documents
    '''
    # let the database count; `len()` on the queryset would have to
    # fetch every document first
    return engine.Submission.objects.count()
@classmethod
def filter(
    cls,
    user,
    offset: int = 0,
    count: int = -1,
    problem: Optional[Union[Problem, int]] = None,
    q_user: Optional[Union[User, str]] = None,
    status: Optional[int] = None,
    language_type: Optional[Union[List[int], int]] = None,
    course: Optional[Union[Course, str]] = None,
    before: Optional[datetime] = None,
    after: Optional[datetime] = None,
    sort_by: Optional[str] = None,
    with_count: bool = False,
    ip_addr: Optional[str] = None,
):
    '''
    query submissions that match the given conditions

    Args:
        user: passed to `Problem.get_problem_list` when `course` is given
        offset: number of leading results to skip, must be >= 0
        count: maximum number of results, -1 for no limit
        problem: a problem or a problem id to match
        q_user: the submitter (or its username) to match
        status: the status code to match
        language_type: one language or a list of languages to match
        course: a course or a course name; limits the query to the
            problems of that course
        before: only submissions with a timestamp <= this value
        after: only submissions with a timestamp >= this value
        sort_by: 'runTime' or 'memoryUsage'; the results are ordered by
            '-timestamp' when omitted
        with_count: also return the number of matches before truncation
        ip_addr: the ip address to match

    Returns:
        a list of submissions, or a tuple `(submissions, count)`
        when `with_count` is true

    Raises:
        ValueError: an argument is out of its accepted range
    '''
    # validate arguments
    if before is not None and after is not None:
        if after > before:
            raise ValueError('the query period is empty')
    if offset < 0:
        raise ValueError(f'offset must >= 0!')
    if count < -1:
        raise ValueError(f'count must >=-1!')
    if sort_by is not None and sort_by not in ['runTime', 'memoryUsage']:
        raise ValueError(f'can only sort by runTime or memoryUsage')
    # set when a condition refers to something that does not exist
    wont_have_results = False
    if isinstance(problem, int):
        problem = Problem(problem).obj
        if problem is None:
            wont_have_results = True
    if isinstance(q_user, str):
        q_user = User(q_user)
        if not q_user:
            wont_have_results = True
        q_user = q_user.obj
    if isinstance(course, str):
        course = Course(course)
        if not course:
            wont_have_results = True
    # problem's query key
    p_k = 'problem'
    if course:
        problems = Problem.get_problem_list(
            user,
            course=course.course_name,
        )
        # use all problems under this course to filter
        if problem is None:
            p_k = 'problem__in'
            problem = problems
        # if problem not in course
        elif problem not in problems:
            wont_have_results = True
    if wont_have_results:
        return ([], 0) if with_count else []
    if isinstance(language_type, int):
        language_type = [language_type]
    # query args
    q = {
        p_k: problem,
        'status': status,
        'language__in': language_type,
        'user': q_user,
        'ip_addr': ip_addr,
        'timestamp__lte': before,
        'timestamp__gte': after,
    }
    # conditions that were not given are left out of the query
    q = {k: v for k, v in q.items() if v is not None}
    # sort by upload time
    submissions = engine.Submission.objects(
        **q).order_by(sort_by if sort_by is not None else '-timestamp')
    # counted before truncation
    submission_count = submissions.count()
    # truncate
    if count == -1:
        submissions = submissions[offset:]
    else:
        submissions = submissions[offset:offset + count]
    submissions = list(cls(s) for s in submissions)
    if with_count:
        return submissions, submission_count
    return submissions
@classmethod
def add(
    cls,
    problem_id: int,
    username: str,
    lang: int,
    timestamp: Optional[date] = None,
    ip_addr: Optional[str] = None,
) -> 'Submission':
    '''
    Insert a new submission into db

    Args:
        problem_id: id of the problem submitted to
        username: name of the submitter
        lang: language of the submission
        timestamp: submission time, the current time when omitted
        ip_addr: address the submission came from

    Returns:
        The created submission

    Raises:
        engine.DoesNotExist: the user or the problem does not exist
    '''
    # check existence
    user = User(username)
    if not user:
        raise engine.DoesNotExist(f'{user} does not exist')
    problem = Problem(problem_id)
    if not problem:
        raise engine.DoesNotExist(f'{problem} dose not exist')
    # TODO: Ensure problem is ready to submitted
    # if not problem.is_test_case_ready():
    #     raise TestCaseNotFound(problem_id)
    when = datetime.now() if timestamp is None else timestamp
    # create a new submission
    document = engine.Submission(
        problem=problem.obj,
        user=user.obj,
        language=lang,
        timestamp=when,
        ip_addr=ip_addr,
    )
    document.save()
    return cls(document.id)
@classmethod
def assign_token(cls, submission_id, token=None):
    '''
    store a token for the submission in the cache

    Args:
        submission_id: id of the submission the token belongs to
        token: the token to store, a new one is generated when omitted

    Returns:
        the stored token
    '''
    token = gen_token() if token is None else token
    RedisCache().set(gen_key(submission_id), token)
    return token
@classmethod
def verify_token(cls, submission_id, token):
    '''
    check `token` against the one stored for the submission

    Returns:
        True when the tokens match, in which case the stored token is
        removed; False when they differ or none is stored
    '''
    cache = RedisCache()
    key = gen_key(submission_id)
    stored = cache.get(key)
    if stored is None:
        return False
    if not secrets.compare_digest(stored.decode('ascii'), token):
        return False
    # a token can be used only once
    cache.delete(key)
    return True
def to_dict(self) -> Dict[str, Any]:
    '''
    serialize this submission into a python dictionary
    '''
    # `_to_dict` gives a Bson object, convert it
    return self._to_dict().to_dict()
712 def _to_dict(self) -> SON:
713 ret = self.to_mongo()
714 _ret = {
715 'problemId': ret['problem'],
716 'user': self.user.info,
717 'submissionId': str(self.id),
718 'timestamp': self.timestamp.timestamp(),
719 'lastSend': self.last_send.timestamp(),
720 'ipAddr': self.ip_addr,
721 }
722 old = [
723 '_id',
724 'problem',
725 'code',
726 'comment',
727 'tasks',
728 'ip_addr',
729 ]
730 # delete old keys
731 for o in old:
732 del ret[o]
733 # insert new keys
734 ret.update(**_ret)
735 return ret
def get_result(self) -> List[Dict[str, Any]]:
    '''
    Get results without output

    Returns:
        one dict per task, with the `output` entry removed from every case
    '''
    documents = [task.to_mongo() for task in self.tasks]
    for document in documents:
        for case in document['cases']:
            del case['output']
    return [document.to_dict() for document in documents]
def get_detailed_result(self) -> List[Dict[str, Any]]:
    '''
    Get all results (including stdout/stderr) of this submission

    Returns:
        one dict per task; in every case the `output` entry is replaced
        by the decoded `stdout` and `stderr` read from the stored zip
    '''
    tasks = [task.to_mongo() for task in self.tasks]
    for task in tasks:
        # the cases are looked up by key, the same way `get_result`
        # does it; attribute access fails on these objects
        for case in task['cases']:
            # extract zip file
            output = case.pop('output', None)
            if output is not None:
                output = engine.GridFSProxy(output)
                with ZipFile(output) as zf:
                    case['stdout'] = zf.read('stdout').decode('utf-8')
                    case['stderr'] = zf.read('stderr').decode('utf-8')
    return [task.to_dict() for task in tasks]
763 def _get_code_raw(self):
764 if self.code.grid_id is None and self.code_minio_path is None:
765 return None
767 if self.code_minio_path is not None:
768 minio_client = MinioClient()
769 try:
770 resp = minio_client.client.get_object(
771 minio_client.bucket,
772 self.code_minio_path,
773 )
774 return [resp.read()]
775 finally:
776 if 'resp' in locals():
777 resp.close()
778 resp.release_conn()
780 # fallback to read from gridfs
781 return [self.code.read()]
783 def _get_code_zip(self):
784 if (raw := self._get_code_raw()) is None:
785 return None
786 return ZipFile(io.BytesIO(b"".join(raw)))
def get_code(self, path: str, binary=False) -> Union[str, bytes]:
    '''
    read one file from the stored code archive

    Args:
        path: name of the file inside the archive
        binary: return the raw bytes instead of decoded text

    Returns:
        the file's content; None when the archive has no such file; a
        fixed notice when the content is not valid UTF-8 text

    Raises:
        SubmissionCodeNotFound: no code is stored for this submission
    '''
    # read file
    try:
        archive = self._get_code_zip()
        if archive is None:
            raise SubmissionCodeNotFound
        with archive as zf:
            data = zf.read(path)
    # file not exists in the zip
    except KeyError:
        return None
    if binary:
        return data
    # decode byte if need
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return 'Unusual file content, decode fail'
def get_main_code(self) -> str:
    '''
    Get source code user submitted
    '''
    filename = 'main{}'.format(self.main_code_ext)
    return self.get_code(filename)
def has_code(self) -> bool:
    '''
    whether a code archive is stored for this submission
    '''
    archive = self._get_code_zip()
    return archive is not None
def own_permission(self, user) -> Permission:
    '''
    get the permissions `user` holds on this submission

    the result is kept in the cache (passing 60 as the third argument
    of `cache.set`) and taken from there when present
    '''
    key = f'SUBMISSION_PERMISSION_{self.id}_{user.id}_{self.problem.id}'
    # Check cache
    cache = RedisCache()
    cached = cache.get(key)
    if cached is not None:
        return self.Permission(int(cached))

    # Calculate
    highest = max(
        Course(course).own_permission(user)
        for course in self.problem.courses)
    if highest & Course.Permission.GRADE:
        cap = self.Permission.MANAGER
    elif user.username == self.user.username:
        cap = self.Permission.STUDENT
    elif Problem(self.problem).permission(
            user=user,
            req=Problem.Permission.VIEW,
    ):
        cap = self.Permission.OTHER
    else:
        cap = self.Permission(0)

    # students can view outputs of their CE submissions
    CE = 2
    if self.Permission.STUDENT & cap and self.status == CE:
        cap |= self.Permission.VIEW_OUTPUT

    cache.set(key, cap.value, 60)
    return cap
def permission(self, user, req: Permission):
    '''
    check whether user own `req` permission
    '''
    owned = self.own_permission(user)
    return bool(owned & req)