Coverage for model/problem.py: 98%

211 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1import json 

2import hashlib 

3import statistics 

4from flask import Blueprint, request, send_file 

5from urllib import parse 

6from zipfile import BadZipFile 

7from mongo import * 

8from mongo import engine 

9from mongo import sandbox 

10from mongo.utils import drop_none 

11from mongo.problem import * 

12from .auth import * 

13from .utils import * 

14 

15__all__ = ['problem_api'] 

16 

17problem_api = Blueprint('problem_api', __name__) 

18 

19 

20def permission_error_response(): 

21 return HTTPError('Not enough permission', 403) 

22 

23 

24def online_error_response(): 

25 return HTTPError('Problem is unavailable', 403) 

26 

27 

28@problem_api.route('/', methods=['GET']) 

29@login_required 

30@Request.args( 

31 'offset', 

32 'count', 

33 'problem_id', 

34 'tags', 

35 'name', 

36 'course', 

37) 

38def view_problem_list( 

39 user, 

40 offset, 

41 count, 

42 tags, 

43 problem_id, 

44 name, 

45 course, 

46): 

47 # casting args 

48 try: 

49 if offset is not None: 

50 offset = int(offset) 

51 if count is not None: 

52 count = int(count) 

53 except (TypeError, ValueError): 

54 return HTTPError( 

55 'offset and count must be integer!', 

56 400, 

57 ) 

58 problem_id, name, tags, course = (parse.unquote(p or '') or None 

59 for p in (problem_id, name, tags, 

60 course)) 

61 try: 

62 ks = { 

63 'user': user, 

64 'offset': offset, 

65 'count': count, 

66 'tags': tags and tags.split(','), 

67 'problem_id': problem_id, 

68 'name': name, 

69 'course': course, 

70 } 

71 ks = {k: v for k, v in ks.items() if v is not None} 

72 data = Problem.get_problem_list(**ks) 

73 except IndexError: 

74 return HTTPError('invalid offset', 400) 

75 data = [{ 

76 'problemId': p.problem_id, 

77 'problemName': p.problem_name, 

78 'status': p.problem_status, 

79 'ACUser': p.ac_user, 

80 'submitter': p.submitter, 

81 'tags': p.tags, 

82 'type': p.problem_type, 

83 'quota': p.quota, 

84 'submitCount': Problem(p.problem_id).submit_count(user) 

85 } for p in data] 

86 return HTTPResponse('Success.', data=data) 

87 

88 

89@problem_api.route('/<int:problem_id>', methods=['GET']) 

90@problem_api.route('/view/<int:problem_id>', methods=['GET']) 

91@login_required 

92@Request.doc('problem_id', 'problem', Problem) 

93def view_problem(user: User, problem: Problem): 

94 if not problem.permission(user=user, req=problem.Permission.VIEW): 

95 return permission_error_response() 

96 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

97 return online_error_response() 

98 # ip validation 

99 if not problem.is_valid_ip(get_ip()): 

100 return HTTPError('Invalid IP address.', 403) 

101 # filter data 

102 data = problem.detailed_info( 

103 'problemName', 

104 'description', 

105 'owner', 

106 'tags', 

107 'allowedLanguage', 

108 'courses', 

109 'quota', 

110 defaultCode='defaultCode', 

111 status='problemStatus', 

112 type='problemType', 

113 testCase='testCase__tasks', 

114 ) 

115 if problem.obj.problem_type == 1: 

116 data.update({'fillInTemplate': problem.obj.test_case.fill_in_template}) 

117 data.update({ 

118 'submitCount': problem.submit_count(user), 

119 'highScore': problem.get_high_score(user=user), 

120 }) 

121 return HTTPResponse('Problem can view.', data=data) 

122 

123 

124@problem_api.route('/manage/<int:problem_id>', methods=['GET']) 

125@Request.doc('problem_id', 'problem', Problem) 

126@identity_verify(0, 1) # admin and teacher only 

127def get_problem_detailed(user, problem: Problem): 

128 ''' 

129 Get problem's detailed information 

130 ''' 

131 if not problem.permission(user, problem.Permission.MANAGE): 

132 return permission_error_response() 

133 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

134 return online_error_response() 

135 info = problem.detailed_info( 

136 'courses', 

137 'problemName', 

138 'description', 

139 'tags', 

140 'testCase', 

141 'ACUser', 

142 'submitter', 

143 'allowedLanguage', 

144 'canViewStdout', 

145 'quota', 

146 status='problemStatus', 

147 type='problemType', 

148 ) 

149 info.update({'submitCount': problem.submit_count(user)}) 

150 return HTTPResponse( 

151 'Success.', 

152 data=info, 

153 ) 

154 

155 

156@problem_api.route('/manage', methods=['POST']) 

157@identity_verify(0, 1) 

158@Request.json( 

159 'type', 

160 'courses: list', 

161 'status', 

162 'type', 

163 'description', 

164 'tags', 

165 'problem_name', 

166 'quota', 

167 'test_case_info', 

168 'can_view_stdout', 

169 'allowed_language', 

170 'default_code', 

171) 

172def create_problem(user: User, **ks): 

173 try: 

174 pid = Problem.add(user=user, **ks) 

175 except ValidationError as e: 

176 return HTTPError( 

177 'Invalid or missing arguments.', 

178 400, 

179 data=e.to_dict(), 

180 ) 

181 except DoesNotExist as e: 

182 return HTTPError('Course not found', 404) 

183 except ValueError as e: 

184 return HTTPError(str(e), 400) 

185 return HTTPResponse(data={'problemId': pid}) 

186 

187 

188@problem_api.route('/manage/<int:problem>', methods=['DELETE']) 

189@identity_verify(0, 1) 

190@Request.doc('problem', Problem) 

191def delete_problem(user: User, problem: Problem): 

192 if not problem.permission(user, problem.Permission.MANAGE): 

193 return permission_error_response() 

194 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

195 return online_error_response() 

196 problem.delete() 

197 return HTTPResponse() 

198 

199 

200@problem_api.route('/manage/<int:problem>', methods=['PUT']) 

201@identity_verify(0, 1) 

202@Request.doc('problem', Problem) 

203def manage_problem(user: User, problem: Problem): 

204 

205 @Request.json( 

206 'type', 

207 'courses: list', 

208 'status', 

209 'type', 

210 'description', 

211 'tags', 

212 'problem_name', 

213 'quota', 

214 'test_case_info', 

215 'can_view_stdout', 

216 'allowed_language', 

217 'default_code', 

218 ) 

219 def modify_problem(**p_ks): 

220 Problem.edit_problem( 

221 user=user, 

222 problem_id=problem.id, 

223 **drop_none(p_ks), 

224 ) 

225 return HTTPResponse() 

226 

227 @Request.files('case') 

228 def modify_problem_test_case(case): 

229 try: 

230 problem.update_test_case(case) 

231 except engine.DoesNotExist as e: 

232 return HTTPError(str(e), 404) 

233 except (ValueError, BadZipFile) as e: 

234 return HTTPError(str(e), 400) 

235 except BadTestCase as e: 

236 return HTTPError(str(e), 400) 

237 return HTTPResponse('Success.') 

238 

239 if not problem.permission(user, problem.Permission.MANAGE): 

240 return permission_error_response() 

241 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

242 return online_error_response() 

243 

244 # edit problem 

245 try: 

246 # modify problem meta 

247 if request.content_type.startswith('application/json'): 

248 return modify_problem() 

249 # upload testcase file 

250 elif request.content_type.startswith('multipart/form-data'): 

251 return modify_problem_test_case() 

252 else: 

253 return HTTPError( 

254 'Unknown content type', 

255 400, 

256 data={'contentType': request.content_type}, 

257 ) 

258 except ValidationError as ve: 

259 return HTTPError( 

260 'Invalid or missing arguments.', 

261 400, 

262 data=ve.to_dict(), 

263 ) 

264 except engine.DoesNotExist: 

265 return HTTPError('Course not found.', 404) 

266 

267 

268@problem_api.route('/<int:problem_id>/test-case', methods=['GET']) 

269@problem_api.route('/<int:problem_id>/testcase', methods=['GET']) 

270@login_required 

271@Request.doc('problem_id', 'problem', Problem) 

272def get_test_case(user: User, problem: Problem): 

273 if not problem.permission(user, problem.Permission.MANAGE): 

274 return permission_error_response() 

275 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

276 return online_error_response() 

277 return send_file( 

278 problem.test_case.case_zip, 

279 mimetype='application/zip', 

280 as_attachment=True, 

281 download_name=f'testdata-{problem.id}.zip', 

282 ) 

283 

284 

285# FIXME: Find a better name 

286@problem_api.route('/<int:problem_id>/testdata', methods=['GET']) 

287@Request.args('token: str') 

288@Request.doc('problem_id', 'problem', Problem) 

289def get_testdata(token: str, problem: Problem): 

290 if sandbox.find_by_token(token) is None: 

291 return HTTPError('Invalid sandbox token', 401) 

292 return send_file( 

293 problem.test_case.case_zip, 

294 mimetype='application/zip', 

295 as_attachment=True, 

296 download_name=f'testdata-{problem.id}.zip', 

297 ) 

298 

299 

300@problem_api.route('/<int:problem_id>/checksum', methods=['GET']) 

301@Request.args('token: str') 

302def get_checksum(token: str, problem_id: int): 

303 if sandbox.find_by_token(token) is None: 

304 return HTTPError('Invalid sandbox token', 401) 

305 problem = Problem(problem_id) 

306 if not problem: 

307 return HTTPError(f'{problem} not found', 404) 

308 meta = json.dumps({ 

309 'tasks': 

310 [json.loads(task.to_json()) for task in problem.test_case.tasks] 

311 }).encode() 

312 content = problem.test_case.case_zip.read() + meta 

313 digest = hashlib.md5(content).hexdigest() 

314 return HTTPResponse(data=digest) 

315 

316 

317@problem_api.route('/<int:problem_id>/meta', methods=['GET']) 

318@Request.args('token: str') 

319def get_meta(token: str, problem_id: int): 

320 if sandbox.find_by_token(token) is None: 

321 return HTTPError('Invalid sandbox token', 401) 

322 problem = Problem(problem_id) 

323 if not problem: 

324 return HTTPError(f'{problem} not found', 404) 

325 meta = { 

326 'tasks': 

327 [json.loads(task.to_json()) for task in problem.test_case.tasks] 

328 } 

329 return HTTPResponse(data=meta) 

330 

331 

332@problem_api.route('/<int:problem_id>/high-score', methods=['GET']) 

333@login_required 

334@Request.doc('problem_id', 'problem', Problem) 

335def high_score(user: User, problem: Problem): 

336 return HTTPResponse(data={ 

337 'score': problem.get_high_score(user=user), 

338 }) 

339 

340 

341@problem_api.route('/clone', methods=['POST']) 

342@problem_api.route('/copy', methods=['POST']) 

343@identity_verify(0, 1) 

344@Request.json('problem_id: int', 'target', 'status') 

345@Request.doc('problem_id', 'problem', Problem) 

346def clone_problem( 

347 user: User, 

348 problem: Problem, 

349 target, 

350 status, 

351): 

352 if not problem.permission(user, problem.Permission.VIEW): 

353 return HTTPError('Problem can not view.', 403) 

354 override = drop_none({'status': status}) 

355 new_problem_id = problem.copy_to( 

356 user=user, 

357 target=target, 

358 **override, 

359 ) 

360 return HTTPResponse( 

361 'Success.', 

362 data={'problemId': new_problem_id}, 

363 ) 

364 

365 

366@problem_api.route('/publish', methods=['POST']) 

367@identity_verify(0, 1) 

368@Request.json('problem_id') 

369@Request.doc('problem_id', 'problem', Problem) 

370def publish_problem(user, problem: Problem): 

371 if user.role == 1 and problem.owner != user.username: 

372 return HTTPError('Not the owner.', 403) 

373 Problem.release_problem(problem.problem_id) 

374 return HTTPResponse('Success.') 

375 

376 

377@problem_api.route('/<int:problem_id>/stats', methods=['GET']) 

378@login_required 

379@Request.doc('problem_id', 'problem', Problem) 

380def problem_stats(user: User, problem: Problem): 

381 if not problem.permission(user, problem.Permission.VIEW): 

382 return permission_error_response() 

383 if not problem.permission(user=user, req=problem.Permission.ONLINE): 

384 return online_error_response() 

385 ret = {} 

386 students = [] 

387 for course in problem.courses: 

388 students += [User(name) for name in course.student_nicknames.keys()] 

389 students_high_scores = [problem.get_high_score(user=u) for u in students] 

390 # These score statistics are only counting the scores of the students in the course. 

391 ret['acUserRatio'] = [problem.get_ac_user_count(), len(students)] 

392 ret['triedUserCount'] = problem.get_tried_user_count() 

393 ret['average'] = None if len(students) == 0 else statistics.mean( 

394 students_high_scores) 

395 ret['std'] = None if len(students) <= 1 else statistics.pstdev( 

396 students_high_scores) 

397 ret['scoreDistribution'] = students_high_scores 

398 # However, submissions include the submissions of teacher and admin. 

399 ret['statusCount'] = problem.get_submission_status() 

400 params = { 

401 'user': user, 

402 'offset': 0, 

403 'count': 10, 

404 'problem': problem.id, 

405 'status': 0, 

406 } 

407 top_10_runtime_submissions = [ 

408 s.to_dict() for s in Submission.filter(**params, sort_by='runTime') 

409 ] 

410 ret['top10RunTime'] = top_10_runtime_submissions 

411 top_10_memory_submissions = [ 

412 s.to_dict() for s in Submission.filter(**params, sort_by='memoryUsage') 

413 ] 

414 ret['top10MemoryUsage'] = top_10_memory_submissions 

415 return HTTPResponse('Success.', data=ret)