Coverage for mongo/problem/problem.py: 100%

192 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1from .. import engine 

2from ..base import MongoBase 

3from ..course import * 

4from ..utils import (RedisCache, doc_required, drop_none) 

5from ..user import User 

6from .exception import BadTestCase 

7from .test_case import ( 

8 SimpleIO, 

9 ContextIO, 

10 IncludeDirectory, 

11 TestCaseRule, 

12) 

13from datetime import datetime 

14from typing import ( 

15 Any, 

16 BinaryIO, 

17 Dict, 

18 List, 

19 Optional, 

20) 

21import json 

22import enum 

23 

24__all__ = ('Problem', ) 

25 

26 

27class Problem(MongoBase, engine=engine.Problem): 

28 

class Permission(enum.IntFlag):
    '''
    Bit flags describing what a user is allowed to do with a problem.

    The values are spelled out explicitly; they are the same ones
    `enum.auto()` assigns to an `IntFlag` (1, 2, 4).
    '''
    # user view permission
    VIEW = 1 << 0
    # user can view problem or not
    ONLINE = 1 << 1
    # user manage problem permission
    MANAGE = 1 << 2

33 

def detailed_info(self, *ks, **kns) -> Dict[str, Any]:
    '''
    Build a dict holding selected fields of this problem.

    The data comes from `self.to_mongo()`. Before any field is
    picked, the `caseZip` entry of `testCase` is removed and
    `courses` is replaced by the list of course names.

    Args:
        ks (*str): field names to read; each value is stored
            under the very same name
        kns (**[str, str]):
            maps the key to store under (keyword name) to the
            field name to read from (keyword value)

    On both sides `__` separates nesting levels, so a name such
    as `a__b` addresses `['a']['b']`.

    Return:
        a dict contains problem's data, or an empty dict when
        this problem object is falsy
    '''
    if not self:
        return {}
    document = self.to_mongo()
    # case zip can not be serialized, drop it when present
    document['testCase'].pop('caseZip', None)
    # convert course documents to course names
    document['courses'] = [c.course_name for c in self.courses]
    # keyword mappings first, then every positional name mapped to itself
    wanted = dict(kns)
    wanted.update((name, name) for name in ks)
    result = {}
    for target, source in wanted.items():
        # walk down the source path to fetch the value
        first, *rest = source.split('__')
        value = document[first]
        for part in rest:
            value = value[part]
        # walk down the target path, creating dicts on the way
        *parents, leaf = target.split('__')
        slot = result
        for parent in parents:
            slot = slot.setdefault(parent, {})
        slot[leaf] = value
    return result

77 

def allowed(self, language):
    '''
    Tell whether `language` may be used for this problem.

    Problems whose `problem_type` is 2 accept every value.
    Otherwise `language` must be in the range [0, 3) and the bit
    with that index must be set in `self.allowed_language`.
    '''
    if self.problem_type == 2:
        return True
    if language < 0 or language >= 3:
        return False
    # test bit number `language` of the bitmask
    return (self.allowed_language >> language) & 1 == 1

84 

def submit_count(self, user: User) -> int:
    '''
    Calculate how many submissions the user has submitted to this problem.

    The count is read from `user.problem_submission`, keyed by the
    string form of `self.problem_id`. When the date of
    `user.last_submit` is not today, the whole mapping is reset
    through `user.update` and 0 is returned.
    '''
    submitted_today = user.last_submit.date() == datetime.now().date()
    if submitted_today:
        return user.problem_submission.get(str(self.problem_id), 0)
    # reset quota if it's a new day
    user.update(problem_submission={})
    return 0

94 

def running_homeworks(self) -> List:
    '''
    Collect the homeworks of this problem whose duration contains
    the current time, each wrapped in a `Homework` object.
    '''
    # imported here rather than at module level
    from ..homework import Homework
    now = datetime.now()
    running = []
    for hw in self.homeworks:
        if now in hw.duration:
            running.append(Homework(hw.id))
    return running

99 

def is_valid_ip(self, ip: str):
    '''
    Check `ip` against every running homework of this problem.

    Return:
        False as soon as one running homework rejects `ip`,
        True otherwise (also when nothing is running)
    '''
    for homework in self.running_homeworks():
        if not homework.is_valid_ip(ip):
            return False
    return True

102 

def get_submission_status(self) -> Dict[str, int]:
    '''
    Count the submissions of this problem per status.

    Return:
        a dict mapping each `status` value found among this
        problem's submissions to the number of submissions
        having it
    '''
    # one group per distinct status, counting its members
    group_by_status = {'$group': {'_id': '$status', 'count': {'$sum': 1}}}
    submissions = engine.Submission.objects(problem=self.id)
    counts = {}
    for item in submissions.aggregate([group_by_status]):
        counts[item['_id']] = item['count']
    return counts

115 

def get_ac_user_count(self) -> int:
    '''
    Count the distinct users having at least one submission with
    `status=0` for this problem (presumably "accepted", going by
    the method name).
    '''
    accepted = engine.Submission.objects(problem=self.id, status=0)
    return len(accepted.distinct('user'))

122 

def get_tried_user_count(self) -> int:
    '''
    Count the distinct users having at least one submission for
    this problem, whatever its status.
    '''
    attempts = engine.Submission.objects(problem=self.id)
    return len(attempts.distinct('user'))

127 

@doc_required('user', User)
def high_score_key(self, user: User) -> str:
    '''
    Build the cache key under which the high score of `user` for
    this problem is stored.
    '''
    return 'PROBLEM_{}_{}_HIGH_SCORE'.format(self.id, user.id)

131 

@doc_required('user', User)
def get_high_score(self, user: User) -> int:
    '''
    Get highest score for user of this problem.

    The value is looked up in `RedisCache` first. On a miss it is
    computed from the user's submissions and stored back with
    `ex=600` (expiry, presumably in seconds).

    Return:
        the highest score among the user's submissions to this
        problem, clamped to be at least 0; 0 when the user has
        no submission
    '''
    cache = RedisCache()
    key = self.high_score_key(user=user)
    if (val := cache.get(key)) is not None:
        return int(val.decode())
    # TODO: avoid calling mongoengine API directly
    # `first()` fetches the best submission (or None) in a single
    # query, instead of a `count()` followed by an index access
    best = engine.Submission.objects(
        user=user.id,
        problem=self.id,
    ).only('score').order_by('-score').first()
    # It might < 0 if there is only incomplete submission
    high_score = 0 if best is None else max(best.score, 0)
    cache.set(key, high_score, ex=600)
    return high_score

153 

@doc_required('user', User)
def own_permission(self, user: User) -> Permission:
    """
    generate user permission capability

    Rules, applied for every course of this problem:
        - VIEW when the user has `Course.Permission.VIEW` on the
          course
        - ONLINE when `problem_status` is 0 and either the problem
          is in none of the course's homeworks, or it is in one
          that has already started

    Finally a user whose `role` is 0 (admin, according to the
    original comment) or who owns the problem gets VIEW, ONLINE
    and MANAGE.
    """
    Perm = self.Permission
    granted = Perm(0)
    for course in (Course(c) for c in self.courses):
        # inherit course permission
        if course.permission(user, Course.Permission.VIEW):
            granted |= Perm.VIEW

        # only online problems can earn ONLINE below
        if self.problem_status != 0:
            continue

        in_homework = False
        for homework in course.homeworks:
            if self.problem_id not in homework.problem_ids:
                continue
            in_homework = True
            # homework already started, the problem is online
            if datetime.now() >= homework.duration.start:
                granted |= Perm.ONLINE

        # problem does not belong to any homework of this course
        if not in_homework:
            granted |= Perm.ONLINE

    # role 0 or owner of the problem: everything is granted
    if user.role == 0 or self.owner == user.username:
        granted |= Perm.VIEW | Perm.ONLINE | Perm.MANAGE

    return granted

187 

def permission(self, user: User, req: Permission) -> bool:
    """
    check whether user own `req` permission

    Every bit set in `req` must also be set in the result of
    `own_permission` for the answer to be True.
    """
    owned = self.own_permission(user=user)
    return (owned & req) == req

194 

@classmethod
def get_problem_list(
    cls,
    user,
    offset: int = 0,
    count: int = -1,
    problem_id: int = None,
    name: str = None,
    tags: list = None,
    course: str = None,
):
    '''
    get a list of problems

    Only problems for which `user` owns the ONLINE permission are
    kept; they are ordered by `problemId`.

    Args:
        user: the user the permission check is done for
        offset: index of the first problem to return
        count: maximum number of problems to return, a negative
            value means "up to the end"
        problem_id, name, tags, course: optional filters, the
            ones left to None are not applied

    Return:
        a list of problem documents; an empty list when `course`
        is given but does not exist

    Exceptions:
        IndexError: if `offset` is negative, or not smaller than
            the number of matching problems while there is at
            least one
    '''
    if course is not None:
        course = Course(course)
        if not course:
            return []
        course = course.obj
    # query args
    filters = drop_none({
        'problem_id': problem_id,
        'problem_name': name,
        'courses': course,
        'tags__in': tags,
    })
    visible = []
    for p in engine.Problem.objects(**filters).order_by('problemId'):
        if cls(p).permission(user=user, req=cls.Permission.ONLINE):
            visible.append(p)
    # truncate
    total = len(visible)
    if offset < 0 or (total and offset >= total):
        raise IndexError
    end = total if count < 0 else min(total, offset + count)
    return visible[offset:end]

231 

@classmethod
def add(
    cls,
    user: User,
    courses: List[str],
    problem_name: str,
    status: Optional[int] = None,
    description: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    type: Optional[int] = None,
    test_case_info: Optional[Dict[str, Any]] = None,
    can_view_stdout: bool = False,
    allowed_language: Optional[int] = None,
    quota: Optional[int] = None,
    default_code: Optional[str] = None,
):
    '''
    Create a problem owned by `user` and save it.

    Arguments are filtered through `drop_none` before being handed
    to the engine document (presumably so that None values fall
    back to the document's defaults).

    Args:
        user: becomes the owner (its `username` is stored)
        courses: names of the courses the problem belongs to
        type: when it is 2, `test_case_info`, `can_view_stdout`
            and `allowed_language` are not stored

    Return:
        the `problem_id` of the created problem

    Exceptions:
        ValueError: if `courses` is empty
        engine.DoesNotExist: if one of the courses does not exist
    '''
    if not len(courses):
        raise ValueError('No course provided')
    course_ids = []
    for name in courses:
        course = Course(name)
        if not course:
            raise engine.DoesNotExist
        course_ids.append(course.id)
    # fields shared by every problem type
    common_fields = drop_none({
        'courses': course_ids,
        'problem_status': status,
        'problem_type': type,
        'problem_name': problem_name,
        'description': description,
        'owner': user.username,
        'tags': tags,
        'quota': quota,
        'default_code': default_code,
    })
    problem = cls.engine(**common_fields).save()
    # fields applied only when the problem type is not 2
    programming_fields = drop_none({
        'test_case': test_case_info,
        'can_view_stdout': can_view_stdout,
        'allowed_language': allowed_language,
    })
    if programming_fields and type != 2:
        problem.update(**programming_fields)
    return problem.problem_id

278 

@classmethod
def edit_problem(
    cls,
    user: User,
    problem_id: int,
    courses: List[str],
    status: int,
    problem_name: str,
    description: Dict[str, Any],
    tags: List[str],
    type,
    test_case_info: Optional[Dict[str, Any]] = None,
    allowed_language: int = 7,
    can_view_stdout: bool = False,
    quota: int = -1,
    default_code: str = '',
):
    '''
    Overwrite the fields of an existing problem.

    Args:
        user: becomes the owner (its `username` is stored)
        problem_id: id of the problem to edit
        courses: names of the courses the problem belongs to
        type: when it is 2, the test case, `allowed_language` and
            `can_view_stdout` are left untouched
        test_case_info: new test case description; when it is
            None or empty the stored test case is kept. The zip
            file already stored is always carried over.

    Exceptions:
        ValueError: if `test_case_info` is given for a problem
            whose type is not 2 and its task scores do not sum
            to 100
        engine.DoesNotExist: if one of the courses does not exist
    '''
    is_programming = type != 2
    # `test_case_info` is optional: validate it only when it is going
    # to be applied below, instead of subscripting None
    if is_programming and test_case_info:
        score = sum(t['taskScore'] for t in test_case_info['tasks'])
        if score != 100:
            raise ValueError("Cases' scores should be 100 in total")
    problem = Problem(problem_id).obj
    course_objs = []
    for name in courses:
        if not (course := Course(name)):
            raise engine.DoesNotExist
        course_objs.append(course.obj)
    problem.update(
        courses=course_objs,
        problem_status=status,
        problem_type=type,
        problem_name=problem_name,
        description=description,
        owner=user.username,
        tags=tags,
        quota=quota,
        default_code=default_code,
    )
    if is_programming:
        # preprocess test case
        test_case = problem.test_case
        if test_case_info:
            test_case = engine.ProblemTestCase.from_json(
                json.dumps(test_case_info))
            # keep the zip file that is already stored
            test_case.case_zip = problem.test_case.case_zip
        problem.update(
            allowed_language=allowed_language,
            can_view_stdout=can_view_stdout,
            test_case=test_case,
        )

329 

def update_test_case(self, test_case: BinaryIO):
    '''
    edit problem's testcase

    The file is first checked by the `IncludeDirectory` rules for
    `include`, `share` and `chaos`, then it must be accepted by
    exactly one of the `SimpleIO` and `ContextIO` rules. Once
    validated it is written to `self.test_case.case_zip` and the
    problem is saved.

    Args:
        test_case: testcase zip file
    Exceptions:
        BadTestCase: if the file is accepted by both formats or
            by none of them. Anything raised by the directory
            rules is not caught here.
    '''
    directory_rules = [
        IncludeDirectory(self, 'include'),
        IncludeDirectory(self, 'share'),
        # for backward compatibility
        IncludeDirectory(self, 'chaos'),
    ]
    for directory_rule in directory_rules:
        directory_rule.validate(test_case)

    # Should only match one format
    format_rules = [
        SimpleIO(self, ['include/', 'share/', 'chaos/']),
        ContextIO(self),
    ]
    failures = []
    for format_rule in format_rules:
        try:
            format_rule.validate(test_case)
        except BadTestCase as e:
            failures.append(e)

    # no failure at all means both formats matched
    if not failures:
        raise BadTestCase('ambiguous test case format')
    if len(failures) == 2:
        first, second = failures
        raise BadTestCase(
            f'invalid test case format\n\n{first}\n\n{second}')

    # save zip file
    test_case.seek(0)
    zip_field = self.test_case.case_zip
    # put data to a new file when none is stored yet, else replace
    # the original file with the new one
    if zip_field.grid_id is None:
        write_func = zip_field.put
    else:
        write_func = zip_field.replace
    write_func(test_case, content_type='application/zip')
    # update problem obj
    self.save()

383 

@classmethod
def copy_problem(cls, user, problem_id):
    '''
    Save a new problem document copied from problem `problem_id`.

    Status, type, name, description, tags and test case are taken
    from the source problem; `user` becomes the owner. No other
    field (courses included) is passed on to the new document.
    '''
    source = Problem(problem_id).obj
    copied_fields = dict(
        problem_status=source.problem_status,
        problem_type=source.problem_type,
        problem_name=source.problem_name,
        description=source.description,
        owner=user.username,
        tags=source.tags,
        test_case=source.test_case,
    )
    engine.Problem(**copied_fields).save()

396 

@doc_required('target', Course, src_none_allowed=True)
def copy_to(
    self,
    user: User,
    target: Optional[Course] = None,
    **override,
) -> int:
    '''
    Copy a problem to target course, hidden by default.

    Args:
        user (User): The user who execute this action and will become
            the owner of copied problem.
        target (Optional[Course] = None): The course this problem will
            be copied to, default to the first of original courses.
        override: Override field values passed to `Problem.add`.

    Return:
        whatever `Problem.add` returns, i.e. the `problem_id` of
        the copy (not a `Problem` object)

    Exceptions:
        IndexError: if `target` is None and this problem has no
            course to fall back on
    '''
    target = self.courses[0] if target is None else target
    # Copied problem is hidden by default
    status = override.pop('status', Problem.engine.Visibility.HIDDEN)
    ks = dict(
        user=user,
        courses=[target.course_name],
        problem_name=self.problem_name,
        status=status,
        description=self.description.to_mongo(),
        tags=self.tags,
        type=self.problem_type,
        test_case_info=self.test_case.to_mongo(),
        can_view_stdout=self.can_view_stdout,
        allowed_language=self.allowed_language,
        quota=self.quota,
        default_code=self.default_code,
    )
    # caller supplied values win over the copied ones
    ks.update(override)
    copy = self.add(**ks)
    return copy

434 

@classmethod
def release_problem(cls, problem_id):
    '''
    Move problem `problem_id` to the course named 'Public'.

    The problem's courses are replaced by that single course and
    its owner is set to 'first_admin', then it is saved.
    '''
    public_course = Course('Public').obj
    released = Problem(problem_id).obj
    released.courses = [public_course]
    released.owner = 'first_admin'
    released.save()