Coverage for model/problem.py: 91%

239 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-14 03:01 +0000

1import json 

2import hashlib 

3import statistics 

4from dataclasses import asdict 

5from flask import Blueprint, request, send_file 

6from urllib import parse 

7from zipfile import BadZipFile 

8from mongo import * 

9from mongo import engine 

10from mongo import sandbox 

11from mongo.utils import drop_none 

12from mongo.problem import * 

13from .auth import * 

14from .utils import * 

15 

16__all__ = ['problem_api'] 

17 

18problem_api = Blueprint('problem_api', __name__) 

19 

20 

21def permission_error_response(): 

22 return HTTPError('Not enough permission', 403) 

23 

24 

25def online_error_response(): 

26 return HTTPError('Problem is unavailable', 403) 

27 

28 

29@problem_api.route('/', methods=['GET']) 

30@login_required 

31@Request.args( 

32 'offset', 

33 'count', 

34 'problem_id', 

35 'tags', 

36 'name', 

37 'course', 

38) 

39def view_problem_list( 

40 user, 

41 offset, 

42 count, 

43 tags, 

44 problem_id, 

45 name, 

46 course, 

47): 

48 # casting args 

49 try: 

50 if offset is not None: 

51 offset = int(offset) 

52 if count is not None: 

53 count = int(count) 

54 except (TypeError, ValueError): 

55 return HTTPError( 

56 'offset and count must be integer!', 

57 400, 

58 ) 

59 problem_id, name, tags, course = (parse.unquote(p or '') or None 

60 for p in (problem_id, name, tags, 

61 course)) 

62 try: 

63 ks = { 

64 'user': user, 

65 'offset': offset, 

66 'count': count, 

67 'tags': tags and tags.split(','), 

68 'problem_id': problem_id, 

69 'name': name, 

70 'course': course, 

71 } 

72 ks = {k: v for k, v in ks.items() if v is not None} 

73 data = Problem.get_problem_list(**ks) 

74 except IndexError: 

75 return HTTPError('invalid offset', 400) 

76 data = [{ 

77 'problemId': p.problem_id, 

78 'problemName': p.problem_name, 

79 'status': p.problem_status, 

80 'ACUser': p.ac_user, 

81 'submitter': p.submitter, 

82 'tags': p.tags, 

83 'type': p.problem_type, 

84 'quota': p.quota, 

85 'submitCount': Problem(p.problem_id).submit_count(user) 

86 } for p in data] 

87 return HTTPResponse('Success.', data=data) 

88 

89 

90@problem_api.route('/<int:problem_id>', methods=['GET']) 

91@problem_api.route('/view/<int:problem_id>', methods=['GET']) 

92@login_required 

93@Request.doc('problem_id', 'problem', Problem) 

94def view_problem(user: User, problem: Problem): 

95 if not problem.permission(user=user, req=problem.Permission.VIEW): 

96 return permission_error_response() 

97 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

98 return online_error_response() 

99 # ip validation 

100 if not problem.is_valid_ip(get_ip()): 

101 return HTTPError('Invalid IP address.', 403) 

102 # filter data 

103 data = problem.detailed_info( 

104 'problemName', 

105 'description', 

106 'owner', 

107 'tags', 

108 'allowedLanguage', 

109 'courses', 

110 'quota', 

111 defaultCode='defaultCode', 

112 status='problemStatus', 

113 type='problemType', 

114 testCase='testCase__tasks', 

115 ) 

116 if problem.obj.problem_type == 1: 

117 data.update({'fillInTemplate': problem.obj.test_case.fill_in_template}) 

118 data.update({ 

119 'submitCount': problem.submit_count(user), 

120 'highScore': problem.get_high_score(user=user), 

121 }) 

122 return HTTPResponse('Problem can view.', data=data) 

123 

124 

125@problem_api.route('/manage/<int:problem_id>', methods=['GET']) 

126@Request.doc('problem_id', 'problem', Problem) 

127@identity_verify(0, 1) # admin and teacher only 

128def get_problem_detailed(user, problem: Problem): 

129 ''' 

130 Get problem's detailed information 

131 ''' 

132 if not problem.permission(user, problem.Permission.MANAGE): 

133 return permission_error_response() 

134 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

135 return online_error_response() 

136 info = problem.detailed_info( 

137 'courses', 

138 'problemName', 

139 'description', 

140 'tags', 

141 'testCase', 

142 'ACUser', 

143 'submitter', 

144 'allowedLanguage', 

145 'canViewStdout', 

146 'quota', 

147 status='problemStatus', 

148 type='problemType', 

149 ) 

150 info.update({'submitCount': problem.submit_count(user)}) 

151 return HTTPResponse( 

152 'Success.', 

153 data=info, 

154 ) 

155 

156 

157@problem_api.route('/manage', methods=['POST']) 

158@identity_verify(0, 1) 

159@Request.json( 

160 'type', 

161 'courses: list', 

162 'status', 

163 'type', 

164 'description', 

165 'tags', 

166 'problem_name', 

167 'quota', 

168 'test_case_info', 

169 'can_view_stdout', 

170 'allowed_language', 

171 'default_code', 

172) 

173def create_problem(user: User, **ks): 

174 try: 

175 pid = Problem.add(user=user, **ks) 

176 except ValidationError as e: 

177 return HTTPError( 

178 'Invalid or missing arguments.', 

179 400, 

180 data=e.to_dict(), 

181 ) 

182 except DoesNotExist as e: 

183 return HTTPError('Course not found', 404) 

184 except ValueError as e: 

185 return HTTPError(str(e), 400) 

186 return HTTPResponse(data={'problemId': pid}) 

187 

188 

189@problem_api.route('/manage/<int:problem>', methods=['DELETE']) 

190@identity_verify(0, 1) 

191@Request.doc('problem', Problem) 

192def delete_problem(user: User, problem: Problem): 

193 if not problem.permission(user, problem.Permission.MANAGE): 

194 return permission_error_response() 

195 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

196 return online_error_response() 

197 problem.delete() 

198 return HTTPResponse() 

199 

200 

201@problem_api.route('/manage/<int:problem>', methods=['PUT']) 

202@identity_verify(0, 1) 

203@Request.doc('problem', Problem) 

204def manage_problem(user: User, problem: Problem): 

205 

206 @Request.json( 

207 'type', 

208 'courses: list', 

209 'status', 

210 'type', 

211 'description', 

212 'tags', 

213 'problem_name', 

214 'quota', 

215 'test_case_info', 

216 'can_view_stdout', 

217 'allowed_language', 

218 'default_code', 

219 ) 

220 def modify_problem(**p_ks): 

221 Problem.edit_problem( 

222 user=user, 

223 problem_id=problem.id, 

224 **drop_none(p_ks), 

225 ) 

226 return HTTPResponse() 

227 

228 @Request.files('case') 

229 def modify_problem_test_case(case): 

230 try: 

231 problem.update_test_case(case) 

232 except engine.DoesNotExist as e: 

233 return HTTPError(str(e), 404) 

234 except (ValueError, BadZipFile) as e: 

235 return HTTPError(str(e), 400) 

236 except BadTestCase as e: 

237 return HTTPError(str(e), 400) 

238 return HTTPResponse('Success.') 

239 

240 if not problem.permission(user, problem.Permission.MANAGE): 

241 return permission_error_response() 

242 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

243 return online_error_response() 

244 

245 # edit problem 

246 try: 

247 # modify problem meta 

248 if request.content_type.startswith('application/json'): 

249 return modify_problem() 

250 # upload testcase file 

251 elif request.content_type.startswith('multipart/form-data'): 

252 return modify_problem_test_case() 

253 else: 

254 return HTTPError( 

255 'Unknown content type', 

256 400, 

257 data={'contentType': request.content_type}, 

258 ) 

259 except ValidationError as ve: 

260 return HTTPError( 

261 'Invalid or missing arguments.', 

262 400, 

263 data=ve.to_dict(), 

264 ) 

265 except engine.DoesNotExist: 

266 return HTTPError('Course not found.', 404) 

267 

268 

269@problem_api.post('/<int:problem>/initiate-test-case-upload') 

270@identity_verify(0, 1) 

271@Request.doc('problem', Problem) 

272@Request.json('length: int', 'part_size: int') 

273def initiate_test_case_upload( 

274 user: User, 

275 problem: Problem, 

276 length: int, 

277 part_size: int, 

278): 

279 if not problem.permission(user, problem.Permission.MANAGE): 

280 return permission_error_response() 

281 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

282 return online_error_response() 

283 upload_info = problem.generate_urls_for_uploading_test_case( 

284 length, part_size) 

285 return HTTPResponse(data=asdict(upload_info)) 

286 

287 

288@problem_api.post('/<int:problem>/complete-test-case-upload') 

289@identity_verify(0, 1) 

290@Request.doc('problem', Problem) 

291@Request.json('upload_id', 'parts: list') 

292def complete_test_case_upload( 

293 user: User, 

294 problem: Problem, 

295 upload_id: str, 

296 parts: list, 

297): 

298 if not problem.permission(user, problem.Permission.MANAGE): 

299 return permission_error_response() 

300 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

301 return online_error_response() 

302 # convert parts to list[Part] 

303 from minio.datatypes import Part 

304 parts = [ 

305 Part(part_number=part['PartNumber'], etag=part['ETag']) 

306 for part in parts 

307 ] 

308 try: 

309 problem.complete_test_case_upload(upload_id, parts) 

310 except BadTestCase as e: 

311 return HTTPError(str(e), 400) 

312 return HTTPResponse(status_code=201) 

313 

314 

315@problem_api.route('/<int:problem_id>/test-case', methods=['GET']) 

316@problem_api.route('/<int:problem_id>/testcase', methods=['GET']) 

317@login_required 

318@Request.doc('problem_id', 'problem', Problem) 

319def get_test_case(user: User, problem: Problem): 

320 if not problem.permission(user, problem.Permission.MANAGE): 

321 return permission_error_response() 

322 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

323 return online_error_response() 

324 return send_file( 

325 problem.get_test_case(), 

326 mimetype='application/zip', 

327 as_attachment=True, 

328 download_name=f'testdata-{problem.id}.zip', 

329 ) 

330 

331 

332# FIXME: Find a better name 

333@problem_api.route('/<int:problem_id>/testdata', methods=['GET']) 

334@Request.args('token: str') 

335@Request.doc('problem_id', 'problem', Problem) 

336def get_testdata(token: str, problem: Problem): 

337 if sandbox.find_by_token(token) is None: 

338 return HTTPError('Invalid sandbox token', 401) 

339 return send_file( 

340 problem.get_test_case(), 

341 mimetype='application/zip', 

342 as_attachment=True, 

343 download_name=f'testdata-{problem.id}.zip', 

344 ) 

345 

346 

347@problem_api.route('/<int:problem_id>/checksum', methods=['GET']) 

348@Request.args('token: str') 

349def get_checksum(token: str, problem_id: int): 

350 if sandbox.find_by_token(token) is None: 

351 return HTTPError('Invalid sandbox token', 401) 

352 problem = Problem(problem_id) 

353 if not problem: 

354 return HTTPError(f'{problem} not found', 404) 

355 meta = json.dumps({ 

356 'tasks': 

357 [json.loads(task.to_json()) for task in problem.test_case.tasks] 

358 }).encode() 

359 # TODO: use etag of bucket object 

360 content = problem.get_test_case().read() + meta 

361 digest = hashlib.md5(content).hexdigest() 

362 return HTTPResponse(data=digest) 

363 

364 

365@problem_api.route('/<int:problem_id>/meta', methods=['GET']) 

366@Request.args('token: str') 

367def get_meta(token: str, problem_id: int): 

368 if sandbox.find_by_token(token) is None: 

369 return HTTPError('Invalid sandbox token', 401) 

370 problem = Problem(problem_id) 

371 if not problem: 

372 return HTTPError(f'{problem} not found', 404) 

373 meta = { 

374 'tasks': 

375 [json.loads(task.to_json()) for task in problem.test_case.tasks] 

376 } 

377 return HTTPResponse(data=meta) 

378 

379 

380@problem_api.route('/<int:problem_id>/high-score', methods=['GET']) 

381@login_required 

382@Request.doc('problem_id', 'problem', Problem) 

383def high_score(user: User, problem: Problem): 

384 return HTTPResponse(data={ 

385 'score': problem.get_high_score(user=user), 

386 }) 

387 

388 

389@problem_api.route('/clone', methods=['POST']) 

390@problem_api.route('/copy', methods=['POST']) 

391@identity_verify(0, 1) 

392@Request.json('problem_id: int', 'target', 'status') 

393@Request.doc('problem_id', 'problem', Problem) 

394def clone_problem( 

395 user: User, 

396 problem: Problem, 

397 target, 

398 status, 

399): 

400 if not problem.permission(user, problem.Permission.VIEW): 

401 return HTTPError('Problem can not view.', 403) 

402 override = drop_none({'status': status}) 

403 new_problem_id = problem.copy_to( 

404 user=user, 

405 target=target, 

406 **override, 

407 ) 

408 return HTTPResponse( 

409 'Success.', 

410 data={'problemId': new_problem_id}, 

411 ) 

412 

413 

414@problem_api.route('/publish', methods=['POST']) 

415@identity_verify(0, 1) 

416@Request.json('problem_id') 

417@Request.doc('problem_id', 'problem', Problem) 

418def publish_problem(user, problem: Problem): 

419 if user.role == 1 and problem.owner != user.username: 

420 return HTTPError('Not the owner.', 403) 

421 Problem.release_problem(problem.problem_id) 

422 return HTTPResponse('Success.') 

423 

424 

425@problem_api.route('/<int:problem_id>/stats', methods=['GET']) 

426@login_required 

427@Request.doc('problem_id', 'problem', Problem) 

428def problem_stats(user: User, problem: Problem): 

429 if not problem.permission(user, problem.Permission.VIEW): 

430 return permission_error_response() 

431 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

432 return online_error_response() 

433 ret = {} 

434 students = [] 

435 for course in problem.courses: 

436 students += [User(name) for name in course.student_nicknames.keys()] 

437 students_high_scores = [problem.get_high_score(user=u) for u in students] 

438 # These score statistics are only counting the scores of the students in the course. 

439 ret['acUserRatio'] = [problem.get_ac_user_count(), len(students)] 

440 ret['triedUserCount'] = problem.get_tried_user_count() 

441 ret['average'] = None if len(students) == 0 else statistics.mean( 

442 students_high_scores) 

443 ret['std'] = None if len(students) <= 1 else statistics.pstdev( 

444 students_high_scores) 

445 ret['scoreDistribution'] = students_high_scores 

446 # However, submissions include the submissions of teacher and admin. 

447 ret['statusCount'] = problem.get_submission_status() 

448 params = { 

449 'user': user, 

450 'offset': 0, 

451 'count': 10, 

452 'problem': problem.id, 

453 'status': 0, 

454 } 

455 top_10_runtime_submissions = [ 

456 s.to_dict() for s in Submission.filter(**params, sort_by='runTime') 

457 ] 

458 ret['top10RunTime'] = top_10_runtime_submissions 

459 top_10_memory_submissions = [ 

460 s.to_dict() for s in Submission.filter(**params, sort_by='memoryUsage') 

461 ] 

462 ret['top10MemoryUsage'] = top_10_memory_submissions 

463 return HTTPResponse('Success.', data=ret)