Coverage for mongo/submission.py: 76%

433 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1from __future__ import annotations 

2import os 

3import pathlib 

4import secrets 

5import logging 

6from typing import ( 

7 Any, 

8 Dict, 

9 Optional, 

10 Union, 

11 List, 

12) 

13import enum 

14import tempfile 

15import requests as rq 

16import itertools 

17from bson.son import SON 

18from flask import current_app 

19from tempfile import NamedTemporaryFile 

20from datetime import date, datetime 

21from zipfile import ZipFile, is_zipfile 

22 

23from . import engine 

24from .base import MongoBase 

25from .user import User 

26from .problem import Problem 

27from .homework import Homework 

28from .course import Course 

29from .utils import RedisCache 

30 

# Names exported by a star import: the config/model wrappers plus the two
# errors raised while submitting code for judging.
__all__ = [
    'SubmissionConfig', 'Submission',
    'JudgeQueueFullError', 'TestCaseNotFound',
]

37 

38# TODO: modular token function 

39 

40 

def gen_key(_id):
    '''
    Build the Redis key under which the judge token of submission `_id`
    is stored.
    '''
    # NOTE(review): 'stoekn' looks like a misspelling of 'stoken'; it is kept
    # verbatim since renaming it would presumably orphan tokens already stored.
    prefix = 'stoekn'
    return f'{prefix}_{_id}'

43 

44 

def gen_token():
    '''
    Return a fresh random URL-safe token (default `secrets.token_urlsafe`
    entropy) used to authenticate the sandbox's result callback.
    '''
    token = secrets.token_urlsafe()
    return token

47 

48 

49# Errors 

class JudgeQueueFullError(Exception):
    '''Raised when the sandbox answers HTTP 500, i.e. its judge task queue is full.'''

54 

55 

class TestCaseNotFound(Exception):
    '''
    Raised when a problem's testcase hasn't been uploaded yet.

    Attributes:
        problem_id: id of the problem whose testcase is missing
    '''
    # opt out of pytest collection (the class name starts with "Test")
    __test__ = False

    def __init__(self, problem_id):
        # Forward the id to Exception so `args` is populated. Without this,
        # pickling/copying fails: reconstruction calls `cls(*args)` with an
        # empty `args`, which is missing the required `problem_id`.
        super().__init__(problem_id)
        self.problem_id = problem_id

    def __str__(self):
        return f'{Problem(self.problem_id)}\'s testcase is not found'

67 

68 

class SubmissionConfig(MongoBase, engine=engine.SubmissionConfig):
    '''
    Named configuration document shared by submissions.

    `Submission.config` loads the one named 'submission' and reads its
    `sandbox_instances` to pick a judge sandbox.
    '''
    # Scratch root for extracted submission files. Taken from the
    # SUBMISSION_TMP_DIR environment variable when it is set; otherwise a
    # private directory is created once, at import time.
    # `mkdtemp` is used (not `TemporaryDirectory(...).name`) because an
    # unreferenced TemporaryDirectory is cleaned up as soon as it is garbage
    # collected, which would delete the directory immediately and leave a
    # predictable, already-freed path in the system temp dir.
    TMP_DIR = pathlib.Path(
        os.environ['SUBMISSION_TMP_DIR']
        if 'SUBMISSION_TMP_DIR' in os.environ else
        tempfile.mkdtemp(suffix='noj-submisisons'))

    def __init__(self, name: str):
        self.name = name

78 

79 

class Submission(MongoBase, engine=engine.Submission):
    '''
    Wrapper around a submission document.

    Handles storing the user's code, dispatching it to a sandbox, saving
    the sandbox's judge results and propagating the score to the user and
    to the homeworks containing the problem.

    Language codes used in this class: 0 = C, 1 = C++, 2 = Python,
    3 = handwritten (a PDF upload which is never sent to a sandbox).
    '''

    class Permission(enum.IntFlag):
        VIEW = enum.auto()  # view submission info
        UPLOAD = enum.auto()  # student can re-upload
        FEEDBACK = enum.auto()  # student can view homework feedback
        COMMENT = enum.auto()  # teacher or TAs can give comment
        REJUDGE = enum.auto()  # teacher or TAs can rejudge submission
        GRADE = enum.auto()  # teacher or TAs can grade homework
        VIEW_OUTPUT = enum.auto()  # presumably gates viewing case stdout/stderr
        OTHER = VIEW
        STUDENT = OTHER | UPLOAD | FEEDBACK
        MANAGER = STUDENT | COMMENT | REJUDGE | GRADE | VIEW_OUTPUT

    # shared SubmissionConfig, initialized lazily by `config`
    _config = None

    def __init__(self, submission_id):
        self.submission_id = str(submission_id)

    def __str__(self):
        return f'submission [{self.submission_id}]'

    @property
    def id(self):
        '''
        convert mongo ObjectId to hex string for serialize
        '''
        return str(self.obj.id)

    @property
    def problem_id(self) -> int:
        '''id of the problem this submission belongs to'''
        return self.problem.problem_id

    @property
    def username(self) -> str:
        '''username of the submitter'''
        return self.user.username

    @property
    def status2code(self) -> Dict[str, int]:
        '''
        Map sandbox status strings to the integer codes stored in the db.
        Larger codes win when case/task results are aggregated (see
        `process_result`).
        '''
        return {
            'AC': 0,
            'WA': 1,
            'CE': 2,
            'TLE': 3,
            'MLE': 4,
            'RE': 5,
            'JE': 6,
            'OLE': 7,
        }

    @property
    def handwritten(self) -> bool:
        '''whether this is a handwritten (PDF) submission'''
        return self.language == 3

    @property
    def tmp_dir(self) -> pathlib.Path:
        '''
        Per-submission scratch directory `TMP_DIR/<username>/<id>`,
        created on demand.
        '''
        tmp_dir = self.config().TMP_DIR / self.username / self.id
        tmp_dir.mkdir(exist_ok=True, parents=True)
        return tmp_dir

    @property
    def main_code_ext(self) -> str:
        '''
        Extension of the main source file for this submission's language.

        Raises:
            KeyError: for a language without a source extension
                (e.g. handwritten)
        '''
        lang2ext = {0: '.c', 1: '.cpp', 2: '.py'}
        return lang2ext[self.language]

    @property
    def main_code_path(self) -> Optional[str]:
        '''
        Absolute path of the extracted main source file, extracting it
        from the stored zip on first use. `None` for handwritten
        submissions, which have no source file.
        '''
        if self.handwritten:
            return None
        # get expected code name & temp path
        ext = self.main_code_ext
        path = self.tmp_dir / f'main{ext}'
        # check whether the code has been generated
        if not path.exists():
            with ZipFile(self.code) as zf:
                # decode to validate UTF-8, then write with an explicit
                # encoding so the platform locale cannot re-encode the source
                path.write_text(
                    zf.read(f'main{ext}').decode('utf-8'),
                    encoding='utf-8',
                )
        # return absolute path
        return str(path.absolute())

    @classmethod
    def config(cls):
        '''
        Return the shared `SubmissionConfig` named 'submission', reloaded
        from the db on every call; it is saved first when it does not exist.
        '''
        # NOTE(review): relies on MongoBase truthiness meaning "exists in db"
        if not cls._config:
            cls._config = SubmissionConfig('submission')
        if not cls._config:
            cls._config.save()
        return cls._config.reload()

    def get_single_output(
        self,
        task_no: int,
        case_no: int,
        text: bool = True,
    ):
        '''
        Read the stdout/stderr of one judged case.

        Args:
            task_no: index of the task
            case_no: index of the case inside that task
            text: decode the outputs as UTF-8 when true, else return bytes

        Returns:
            dict with keys 'stdout' and 'stderr'

        Raises:
            FileNotFoundError: if the task or case index is out of range
            AttributeError: if the case has no output yet
        '''
        try:
            case = self.tasks[task_no].cases[case_no]
        except IndexError as err:
            raise FileNotFoundError('task not exist') from err
        try:
            with ZipFile(case.output) as zf:
                ret = {k: zf.read(k) for k in ('stdout', 'stderr')}
        except AttributeError as err:
            raise AttributeError('The submission is still in pending') from err
        if text:
            ret = {k: v.decode('utf-8') for k, v in ret.items()}
        return ret

    def delete_output(self, *args):
        '''
        delete stdout/stderr of this submission

        Args:
            args: ignored value, don't mind
        '''
        for task in self.tasks:
            for case in task.cases:
                case.output.delete()

    def delete(self, *keeps):
        '''
        delete submission and its related file

        Args:
            keeps:
                the field name you want to keep, accepted
                value is {'comment', 'code', 'output'}
                other value will be ignored
        '''
        drops = {'comment', 'code', 'output'} - {*keeps}
        del_funcs = {
            'output': self.delete_output,
        }

        def default_del_func(d):
            return self.obj[d].delete()

        for d in drops:
            del_funcs.get(d, default_del_func)(d)
        self.obj.delete()

    def sandbox_resp_handler(self, resp):
        '''
        Interpret the sandbox's response to a submit request.

        Returns:
            `True` on HTTP 200; `False` (after logging the response) for
            any status code not handled below.

        Raises:
            JudgeQueueFullError: on HTTP 500
            ValueError: on HTTP 400 (message is the response body) or
                HTTP 403 (invalid token)
        '''

        # judge queue is currently full
        def on_500(resp):
            raise JudgeQueueFullError

        # backend send some invalid data
        def on_400(resp):
            raise ValueError(resp.text)

        # send a invalid token
        def on_403(resp):
            raise ValueError('invalid token')

        h = {
            500: on_500,
            403: on_403,
            400: on_400,
            200: lambda r: True,
        }
        # look the handler up separately so an error raised inside a
        # handler is never mistaken for an unknown status code
        handler = h.get(resp.status_code)
        if handler is None:
            self.logger.error('can not handle response from sandbox')
            self.logger.error(
                f'status code: {resp.status_code}\n'
                f'headers: {resp.headers}\n'
                f'body: {resp.text}', )
            return False
        return handler(resp)

    def target_sandbox(self, timeout: float = 10):
        '''
        Pick the configured sandbox reporting the lowest load.

        Args:
            timeout: seconds to wait for each sandbox's status endpoint

        Returns:
            the chosen sandbox instance, or `None` if no sandbox answered
            with a load below 1000
        '''
        load = 10**3  # current min load; sandboxes at or above it are skipped
        tar = None  # target
        for sb in self.config().sandbox_instances:
            # an unreachable or hung sandbox must not block or abort the
            # selection among the remaining ones
            try:
                resp = rq.get(f'{sb.url}/status', timeout=timeout)
            except rq.RequestException as err:
                self.logger.warning(f'sandbox {sb.name} status exception')
                self.logger.warning(f'error: {err}')
                continue
            if not resp.ok:
                self.logger.warning(f'sandbox {sb.name} status exception')
                self.logger.warning(
                    f'status code: {resp.status_code}\n '
                    f'body: {resp.text}', )
                continue
            resp = resp.json()
            if resp['load'] < load:
                load = resp['load']
                tar = sb
        return tar

    def get_comment(self) -> bytes:
        '''
        Return the content of the comment file.

        Raises:
            FileNotFoundError: if no comment has been uploaded
        '''
        if self.comment.grid_id is None:
            raise FileNotFoundError('it seems that comment haven\'t upload')
        return self.comment.read()

    def check_code(self, file):
        '''
        Validate an uploaded code archive.

        The archive must be a zip holding exactly one file named `main`
        whose extension matches this submission's language; a handwritten
        `main.pdf` must also start with the PDF magic number.

        Returns:
            `True` if the archive is valid (and rewinds `file`), otherwise
            a string describing the problem
        '''
        if not file:
            return 'no file'
        if not is_zipfile(file):
            return 'not a valid zip file'
        # expected extension per language code; unknown codes match nothing
        lang2ext = {0: '.c', 1: '.cpp', 2: '.py', 3: '.pdf'}
        with ZipFile(file) as zf:
            infos = zf.infolist()
            if len(infos) != 1:
                return 'more than one file in zip'
            name, ext = os.path.splitext(infos[0].filename)
            if name != 'main':
                return 'only accept file with name \'main\''
            if ext != lang2ext.get(self.language):
                return f'invalid file extension, got {ext}'
            if ext == '.pdf':
                with zf.open('main.pdf') as pdf:
                    if pdf.read(5) != b'%PDF-':
                        return 'only accept PDF file.'
        # rewind so the caller stores the archive from its beginning
        file.seek(0)
        return True

    def rejudge(self) -> bool:
        '''
        rejudge this submission

        Clears previous outputs and results, marks the submission as
        pending and sends it again (skipped under TESTING).
        '''
        # delete output file
        self.delete_output()
        # turn back to haven't be judged
        self.update(
            status=-1,
            last_send=datetime.now(),
            tasks=[],
        )
        if current_app.config['TESTING']:
            return True
        return self.send()

    def submit(self, code_file) -> bool:
        '''
        prepare data for submit code to sandbox and then send it

        Args:
            code_file: a zip file contains user's code

        Returns:
            `True` under TESTING or for handwritten submissions (which are
            never sent), otherwise the result of `send`

        Raises:
            engine.DoesNotExist: if this submission does not exist
            ValueError: if `check_code` rejects the archive
        '''
        # unexisted id
        if not self:
            raise engine.DoesNotExist(f'{self}')
        # save source
        res = self.check_code(code_file)
        if res is not True:
            raise ValueError(res)
        self.code.put(code_file)
        self.update(status=-1, last_send=datetime.now())
        self.save()
        self.reload()
        self.logger.debug(f'{self} code updated.')
        # delete old handwritten submission
        if self.handwritten:
            self._drop_old_handwritten()
        # we no need to actually send code to sandbox during testing
        if current_app.config['TESTING'] or self.handwritten:
            return True
        return self.send()

    def _drop_old_handwritten(self):
        '''
        Delete this user's other handwritten submissions to this problem
        and reset the user's status for the problem in its homeworks.
        '''
        q = {
            'problem': self.problem,
            'user': self.user,
            'language': 3,
        }
        olds = [s for s in engine.Submission.objects(**q) if s != self.obj]
        if not olds:
            return
        for homework in self.problem.homeworks:
            stat = homework.student_status[self.user.username][str(
                self.problem_id)]
            stat['score'] = 0
            stat['problemStatus'] = -1
            stat['submissionIds'] = []
            homework.save()
        for old in olds:
            # go through Submission.delete so the code/comment/output files
            # stored alongside the document are removed as well
            Submission(old.id).delete()

    def send(self) -> bool:
        '''
        send code to sandbox

        Returns:
            `False` for handwritten submissions or when no sandbox is
            available, otherwise the result of `sandbox_resp_handler`

        Raises:
            TestCaseNotFound: if the problem has no testcase
        '''
        if self.handwritten:
            self.logger.warning(f'try to send a handwritten {self}')
            return False
        # TODO: Ensure problem is ready to submitted
        if self.problem.test_case.case_zip is None:
            raise TestCaseNotFound(self.problem.problem_id)
        # setup post body
        files = {
            'src': self.code,
        }
        # look for the target sandbox
        tar = self.target_sandbox()
        if tar is None:
            self.logger.error(f'can not target a sandbox for {repr(self)}')
            return False
        # save token for validation
        Submission.assign_token(self.id, tar.token)
        post_data = {
            'token': tar.token,
            'checker': 'print("not implement yet. qaq")',
            'problem_id': self.problem_id,
            'language': self.language,
        }
        judge_url = f'{tar.url}/submit/{self.id}'
        # send submission to sandbox for judgement
        self.logger.info(f'send {self} to {tar.name}')
        resp = rq.post(
            judge_url,
            data=post_data,
            files=files,
        )
        self.logger.info(f'recieve {self} resp from sandbox')
        return self.sandbox_resp_handler(resp)

    def process_result(self, tasks: list):
        '''
        process results from sandbox

        Args:
            tasks:
                a 2-dim list (tasks x cases) of the dict with schema
                {
                    'exitCode': int,
                    'status': str,
                    'stdout': str,
                    'stderr': str,
                    'execTime': int,
                    'memoryUsage': int
                }
                The list and its dicts are modified in place.

        Returns:
            `True` once the results are saved and `finish_judging` ran
        '''
        self.logger.info(f'recieve {self} result')
        status2code = self.status2code
        task_settings = self.problem.test_case.tasks
        fds = ['stdout', 'stderr']
        # temp files backing the zipped outputs; removed once results are stored
        tmp_files = []
        try:
            for i, cases in enumerate(tasks):
                for j, case in enumerate(cases):
                    # we don't need exit code
                    case.pop('exitCode', None)
                    # convert status into integer, unknown status -> -3
                    case['status'] = status2code.get(case['status'], -3)
                    # save stdout/stderr into a zip archive
                    tf = NamedTemporaryFile(delete=False)
                    tmp_files.append(tf)
                    with ZipFile(tf, 'w') as zf:
                        for fd in fds:
                            content = case.pop(fd, None)
                            if content is None:
                                self.logger.error(
                                    f'key {fd} not in case result {self} {i:02d}{j:02d}'
                                )
                                # keep both entries so the archive stays
                                # readable by get_single_output
                                content = ''
                            zf.writestr(fd, content)
                    tf.seek(0)
                    case['output'] = tf
                    # convert dict to document
                    cases[j] = engine.CaseResult(
                        status=case['status'],
                        exec_time=case['execTime'],
                        memory_usage=case['memoryUsage'],
                        output=case['output'],
                    )
                # a task takes the worst (largest) status and peak usage
                status = max(c.status for c in cases)
                exec_time = max(c.exec_time for c in cases)
                memory_usage = max(c.memory_usage for c in cases)
                tasks[i] = engine.TaskResult(
                    status=status,
                    exec_time=exec_time,
                    memory_usage=memory_usage,
                    score=task_settings[i].task_score if status == 0 else 0,
                    cases=cases,
                )
            status = max(t.status for t in tasks)
            exec_time = max(t.exec_time for t in tasks)
            memory_usage = max(t.memory_usage for t in tasks)
            self.update(
                score=sum(task.score for task in tasks),
                status=status,
                tasks=tasks,
                exec_time=exec_time,
                memory_usage=memory_usage,
            )
        finally:
            # the outputs have been handed to the documents; drop the local copies
            for tf in tmp_files:
                tf.close()
                try:
                    os.unlink(tf.name)
                except OSError:
                    pass
        self.reload()
        self.finish_judging()
        return True

    def finish_judging(self):
        '''
        Record this judged submission on the user, update the user's
        status for the problem in every homework containing it, and drop
        the cached high score.
        '''
        # update user's submission
        User(self.username).add_submission(self)
        # update homework data
        for homework in self.problem.homeworks:
            stat = homework.student_status[self.username][str(self.problem_id)]
            # NOTE(review): handwritten submissions skip the homework update
            # here, which makes every later `self.handwritten` check in this
            # loop unreachable. Kept as-is; confirm which behavior is intended.
            if self.handwritten:
                continue
            if 'rawScore' not in stat:
                stat['rawScore'] = 0
            stat['submissionIds'].append(self.id)
            # handwritten problem will only keep the last submission
            if self.handwritten:
                stat['submissionIds'] = stat['submissionIds'][-1:]
            # if the homework is overdue, do the penalty
            if (self.timestamp > homework.duration.end
                    and not self.handwritten
                    and homework.penalty is not None):
                self.score, stat['rawScore'] = Homework(homework).do_penalty(
                    self, stat)
            else:
                if self.score > stat['rawScore']:
                    stat['rawScore'] = self.score
            # update high score / handwritten problem is judged by teacher
            if self.score >= stat['score'] or self.handwritten:
                stat['score'] = self.score
                stat['problemStatus'] = self.status

            homework.save()
        key = Problem(self.problem).high_score_key(user=self.user)
        RedisCache().delete(key)

    def add_comment(self, file):
        '''
        comment a submission with PDF

        Args:
            file: a PDF file

        Raises:
            ValueError: if the content does not start with the PDF magic number
        '''
        data = file.read()
        # check magic number
        if data[:5] != b'%PDF-':
            raise ValueError('only accept PDF file.')
        # write to a new file if it did not exist before
        if self.comment.grid_id is None:
            write_func = self.comment.put
        # replace its content otherwise
        else:
            write_func = self.comment.replace
        write_func(data)
        self.logger.debug(f'{self} comment updated.')
        # update submission
        self.save()

    @staticmethod
    def count():
        '''number of submissions in the db (counted server-side)'''
        return engine.Submission.objects.count()

    @classmethod
    def filter(
        cls,
        user,
        offset: int = 0,
        count: int = -1,
        problem: Optional[Union[Problem, int]] = None,
        q_user: Optional[Union[User, str]] = None,
        status: Optional[int] = None,
        language_type: Optional[Union[List[int], int]] = None,
        course: Optional[Union[Course, str]] = None,
        before: Optional[datetime] = None,
        after: Optional[datetime] = None,
        sort_by: Optional[str] = None,
        with_count: bool = False,
        ip_addr: Optional[str] = None,
    ):
        '''
        Query submissions, newest first unless `sort_by` is given.

        Args:
            user: the requesting user, used to list a course's problems
            offset: number of leading results to skip (>= 0)
            count: maximum number of results, -1 for no limit
            problem, q_user, status, language_type, course, before,
            after, ip_addr: optional filters; `None` means "no filter"
            sort_by: 'runTime' or 'memoryUsage'
            with_count: also return the total count before truncation

        Returns:
            list of Submission, or `(list, total)` when `with_count`

        Raises:
            ValueError: on an empty period or invalid offset/count/sort_by
        '''
        if before is not None and after is not None:
            if after > before:
                raise ValueError('the query period is empty')
        if offset < 0:
            raise ValueError('offset must >= 0!')
        if count < -1:
            raise ValueError('count must >=-1!')
        if sort_by is not None and sort_by not in ['runTime', 'memoryUsage']:
            raise ValueError('can only sort by runTime or memoryUsage')
        wont_have_results = False
        if isinstance(problem, int):
            problem = Problem(problem).obj
            if problem is None:
                wont_have_results = True
        if isinstance(q_user, str):
            q_user = User(q_user)
            if not q_user:
                wont_have_results = True
            q_user = q_user.obj
        if isinstance(course, str):
            course = Course(course)
            if not course:
                wont_have_results = True
        # problem's query key
        p_k = 'problem'
        if course:
            problems = Problem.get_problem_list(
                user,
                course=course.course_name,
            )
            # use all problems under this course to filter
            if problem is None:
                p_k = 'problem__in'
                problem = problems
            # if problem not in course
            elif problem not in problems:
                wont_have_results = True
        if wont_have_results:
            return ([], 0) if with_count else []
        if isinstance(language_type, int):
            language_type = [language_type]
        # query args
        q = {
            p_k: problem,
            'status': status,
            'language__in': language_type,
            'user': q_user,
            'ip_addr': ip_addr,
            'timestamp__lte': before,
            'timestamp__gte': after,
        }
        q = {k: v for k, v in q.items() if v is not None}
        # sort by upload time
        submissions = engine.Submission.objects(
            **q).order_by(sort_by if sort_by is not None else '-timestamp')
        submission_count = submissions.count()
        # truncate
        if count == -1:
            submissions = submissions[offset:]
        else:
            submissions = submissions[offset:offset + count]
        submissions = [cls(s) for s in submissions]
        if with_count:
            return submissions, submission_count
        return submissions

    @classmethod
    def add(
        cls,
        problem_id: int,
        username: str,
        lang: int,
        timestamp: Optional[date] = None,
        ip_addr: Optional[str] = None,
    ) -> 'Submission':
        '''
        Insert a new submission into db

        Returns:
            The created submission

        Raises:
            engine.DoesNotExist: if the user or problem does not exist
            TestCaseNotFound: if the problem has no testcase
        '''
        # check existence
        user = User(username)
        if not user:
            raise engine.DoesNotExist(f'{user} does not exist')
        problem = Problem(problem_id)
        if not problem:
            raise engine.DoesNotExist(f'{problem} dose not exist')
        if problem.test_case.case_zip is None:
            raise TestCaseNotFound(problem_id)
        if timestamp is None:
            timestamp = datetime.now()
        # create a new submission
        submission = engine.Submission(problem=problem.obj,
                                       user=user.obj,
                                       language=lang,
                                       timestamp=timestamp,
                                       ip_addr=ip_addr)
        submission.save()
        return cls(submission.id)

    @classmethod
    def assign_token(cls, submission_id, token=None):
        '''
        Store a token for the submission in Redis and return it; a new
        random token is generated when `token` is None.
        '''
        if token is None:
            token = gen_token()
        RedisCache().set(gen_key(submission_id), token)
        return token

    @classmethod
    def verify_token(cls, submission_id, token):
        '''
        Check `token` against the one stored for `submission_id`; a
        matching token is deleted so it can be used only once.

        Returns:
            `True` on a match, otherwise `False` (also when no token is
            stored or `token` is not a string)
        '''
        cache = RedisCache()
        key = gen_key(submission_id)
        s_token = cache.get(key)
        if s_token is None:
            return False
        # `token` presumably comes from an external request; reject
        # non-strings instead of letting compare_digest raise TypeError
        if not isinstance(token, str):
            return False
        s_token = s_token.decode('ascii')
        # compare as bytes: compare_digest raises TypeError on non-ASCII str
        valid = secrets.compare_digest(
            s_token.encode('ascii'),
            token.encode('utf-8'),
        )
        if valid:
            cache.delete(key)
        return valid

    def to_dict(self) -> Dict[str, Any]:
        '''serializable summary of this submission (see `_to_dict`)'''
        ret = self._to_dict()
        # Convert Bson object to python dictionary
        ret = ret.to_dict()
        return ret

    def _to_dict(self) -> SON:
        '''
        The stored document with file/result fields removed and renamed
        keys ('problemId', 'submissionId', 'lastSend', 'ipAddr', ...)
        plus the submitter's user info.
        '''
        ret = self.to_mongo()
        _ret = {
            'problemId': ret['problem'],
            'user': self.user.info,
            'submissionId': str(self.id),
            'timestamp': self.timestamp.timestamp(),
            'lastSend': self.last_send.timestamp(),
            'ipAddr': self.ip_addr,
        }
        old = [
            '_id',
            'problem',
            'code',
            'comment',
            'tasks',
            'ip_addr',
        ]
        # delete old keys; unset optional fields may be absent already
        for o in old:
            ret.pop(o, None)
        # insert new keys
        ret.update(**_ret)
        return ret

    def get_result(self) -> List[Dict[str, Any]]:
        '''
        Get results without output
        '''
        tasks = [task.to_mongo() for task in self.tasks]
        for task in tasks:
            for case in task['cases']:
                del case['output']
        return [task.to_dict() for task in tasks]

    def get_detailed_result(self) -> List[Dict[str, Any]]:
        '''
        Get all results (including stdout/stderr) of this submission
        '''
        tasks = [task.to_mongo() for task in self.tasks]
        for task in tasks:
            # `to_mongo` returns SON (a dict): index it, no attribute access
            for case in task['cases']:
                # extract zip file
                output = case.pop('output', None)
                if output is not None:
                    output = engine.GridFSProxy(output)
                    with ZipFile(output) as zf:
                        case['stdout'] = zf.read('stdout').decode('utf-8')
                        case['stderr'] = zf.read('stderr').decode('utf-8')
        return [task.to_dict() for task in tasks]

    def get_code(self, path: str, binary=False) -> Optional[Union[str, bytes]]:
        '''
        Read `path` from the uploaded code zip.

        Returns:
            bytes when `binary`, otherwise UTF-8 text (or a placeholder
            message if decoding fails); `None` if the file or the code is
            missing
        '''
        # read file
        try:
            with ZipFile(self.code) as zf:
                data = zf.read(path)
        # file not exists in the zip or code haven't been uploaded
        except (KeyError, AttributeError):
            return None
        # decode byte if need
        if not binary:
            try:
                data = data.decode('utf-8')
            except UnicodeDecodeError:
                data = 'Unusual file content, decode fail'
        return data

    def get_main_code(self) -> Optional[str]:
        '''
        Get source code user submitted
        '''
        ext = self.main_code_ext
        return self.get_code(f'main{ext}')

    def own_permission(self, user) -> Permission:
        '''
        Permissions `user` holds on this submission, cached for 60 seconds.

        A grader of any course containing the problem is a manager, the
        submitter is a student, and anyone allowed to view the problem
        gets OTHER. Students also get VIEW_OUTPUT on compile errors.
        '''
        key = f'SUBMISSION_PERMISSION_{self.id}_{user.id}_{self.problem.id}'
        # Check cache
        cache = RedisCache()
        if (v := cache.get(key)) is not None:
            return self.Permission(int(v))

        # Calculate. Test GRADE per course: the max() of combined flag
        # values need not contain that bit, and max() fails when the
        # problem belongs to no course.
        is_grader = any(
            Course(course).own_permission(user) & Course.Permission.GRADE
            for course in self.problem.courses)
        if is_grader:
            cap = self.Permission.MANAGER
        elif user.username == self.user.username:
            cap = self.Permission.STUDENT
        elif Problem(self.problem).permission(
                user=user,
                req=Problem.Permission.VIEW,
        ):
            cap = self.Permission.OTHER
        else:
            cap = self.Permission(0)

        # students can view outputs of their CE submissions
        CE = 2
        if cap & self.Permission.STUDENT and self.status == CE:
            cap |= self.Permission.VIEW_OUTPUT

        cache.set(key, cap.value, 60)
        return cap

    def permission(self, user, req: Permission):
        """
        check whether user own `req` permission
        """

        return bool(self.own_permission(user) & req)