Coverage for model/problem.py: 91%
239 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-14 03:01 +0000
1import json
2import hashlib
3import statistics
4from dataclasses import asdict
5from flask import Blueprint, request, send_file
6from urllib import parse
7from zipfile import BadZipFile
8from mongo import *
9from mongo import engine
10from mongo import sandbox
11from mongo.utils import drop_none
12from mongo.problem import *
13from .auth import *
14from .utils import *
# Only the blueprint is exported by a star import of this module.
__all__ = ['problem_api']

# Blueprint on which the routes below are registered.
problem_api = Blueprint(name='problem_api', import_name=__name__)
def permission_error_response():
    '''Build the 403 error used when a permission check fails.'''
    message = 'Not enough permission'
    return HTTPError(message, 403)
def online_error_response():
    '''Build the 403 error used when the ONLINE permission check fails.'''
    message = 'Problem is unavailable'
    return HTTPError(message, 403)
@problem_api.route('/', methods=['GET'])
@login_required
@Request.args(
    'offset',
    'count',
    'problem_id',
    'tags',
    'name',
    'course',
)
def view_problem_list(
    user,
    offset,
    count,
    tags,
    problem_id,
    name,
    course,
):
    '''
    List the problems matching the query-string filters.

    `offset` and `count` are cast to int when present; a value that cannot
    be cast yields a 400 error. The remaining filters are URL-unquoted and
    dropped when empty, and `tags` is split on commas. Filters that end up
    as None are not forwarded to `Problem.get_problem_list`. An IndexError
    raised by that call is reported as a 400 'invalid offset' error.

    On success the response data is a list with one dict per problem.
    '''

    def decode(raw):
        # Unquote a filter value; empty or missing values become None.
        text = parse.unquote(raw or '')
        return text if text else None

    # Cast the paging arguments, leaving absent ones untouched.
    try:
        offset = None if offset is None else int(offset)
        count = None if count is None else int(count)
    except (TypeError, ValueError):
        return HTTPError(
            'offset and count must be integer!',
            400,
        )

    problem_id = decode(problem_id)
    name = decode(name)
    tags = decode(tags)
    course = decode(course)

    filters = {
        'user': user,
        'offset': offset,
        'count': count,
        'tags': tags.split(',') if tags else tags,
        'problem_id': problem_id,
        'name': name,
        'course': course,
    }
    # Only forward the filters that were actually supplied.
    query = {key: value for key, value in filters.items() if value is not None}
    try:
        problems = Problem.get_problem_list(**query)
    except IndexError:
        return HTTPError('invalid offset', 400)

    data = []
    for p in problems:
        data.append({
            'problemId': p.problem_id,
            'problemName': p.problem_name,
            'status': p.problem_status,
            'ACUser': p.ac_user,
            'submitter': p.submitter,
            'tags': p.tags,
            'type': p.problem_type,
            'quota': p.quota,
            'submitCount': Problem(p.problem_id).submit_count(user),
        })
    return HTTPResponse('Success.', data=data)
@problem_api.route('/<int:problem_id>', methods=['GET'])
@problem_api.route('/view/<int:problem_id>', methods=['GET'])
@login_required
@Request.doc('problem_id', 'problem', Problem)
def view_problem(user: User, problem: Problem):
    '''
    Return the viewer-facing information of a single problem.

    Responds with 403 when the user lacks the VIEW or ONLINE permission, or
    when the client IP is rejected by `problem.is_valid_ip`. Otherwise the
    response data holds the selected problem fields plus the user's submit
    count and high score.
    '''
    perm = problem.Permission
    if not problem.permission(user=user, req=perm.VIEW):
        return permission_error_response()
    if not problem.permission(user=user, req=perm.ONLINE):
        return online_error_response()
    # ip validation
    client_ip = get_ip()
    if not problem.is_valid_ip(client_ip):
        return HTTPError('Invalid IP address.', 403)

    # Fields copied as-is, followed by fields exposed under another name.
    plain_fields = (
        'problemName',
        'description',
        'owner',
        'tags',
        'allowedLanguage',
        'courses',
        'quota',
    )
    renamed_fields = {
        'defaultCode': 'defaultCode',
        'status': 'problemStatus',
        'type': 'problemType',
        'testCase': 'testCase__tasks',
    }
    data = problem.detailed_info(*plain_fields, **renamed_fields)

    # Problems whose type is 1 additionally expose their fill-in template.
    if problem.obj.problem_type == 1:
        data['fillInTemplate'] = problem.obj.test_case.fill_in_template
    data['submitCount'] = problem.submit_count(user)
    data['highScore'] = problem.get_high_score(user=user)
    return HTTPResponse('Problem can view.', data=data)
@problem_api.route('/manage/<int:problem_id>', methods=['GET'])
@Request.doc('problem_id', 'problem', Problem)
@identity_verify(0, 1)  # admin and teacher only
def get_problem_detailed(user, problem: Problem):
    '''
    Get problem's detailed information for management.

    Responds with 403 when the user lacks the MANAGE or ONLINE permission.
    Otherwise the response data holds the selected problem fields plus the
    user's submit count.
    '''
    perm = problem.Permission
    if not problem.permission(user, perm.MANAGE):
        return permission_error_response()
    if not problem.permission(user=user, req=perm.ONLINE):
        return online_error_response()

    # Fields copied as-is, followed by fields exposed under another name.
    plain_fields = (
        'courses',
        'problemName',
        'description',
        'tags',
        'testCase',
        'ACUser',
        'submitter',
        'allowedLanguage',
        'canViewStdout',
        'quota',
    )
    renamed_fields = {
        'status': 'problemStatus',
        'type': 'problemType',
    }
    info = problem.detailed_info(*plain_fields, **renamed_fields)
    info['submitCount'] = problem.submit_count(user)
    return HTTPResponse('Success.', data=info)
# NOTE(review): 'type' appears twice in the key list below; presumably
# harmless, but confirm against Request.json before removing it.
@problem_api.route('/manage', methods=['POST'])
@identity_verify(0, 1)
@Request.json(
    'type',
    'courses: list',
    'status',
    'type',
    'description',
    'tags',
    'problem_name',
    'quota',
    'test_case_info',
    'can_view_stdout',
    'allowed_language',
    'default_code',
)
def create_problem(user: User, **ks):
    '''
    Create a problem from the JSON body and return its id.

    The keyword arguments collected from the request are forwarded to
    `Problem.add`. Errors raised by that call are mapped as follows:
    ValidationError -> 400 with the validation details, DoesNotExist -> 404,
    ValueError -> 400 with the exception text.
    '''
    try:
        new_id = Problem.add(user=user, **ks)
    except ValidationError as err:
        return HTTPError(
            'Invalid or missing arguments.',
            400,
            data=err.to_dict(),
        )
    except DoesNotExist:
        return HTTPError('Course not found', 404)
    except ValueError as err:
        return HTTPError(str(err), 400)
    else:
        return HTTPResponse(data={'problemId': new_id})
@problem_api.route('/manage/<int:problem>', methods=['DELETE'])
@identity_verify(0, 1)
@Request.doc('problem', Problem)
def delete_problem(user: User, problem: Problem):
    '''
    Delete a problem.

    Responds with 403 when the user lacks the MANAGE or ONLINE permission;
    otherwise deletes the problem and returns an empty success response.
    '''
    perm = problem.Permission
    can_manage = problem.permission(user, perm.MANAGE)
    if not can_manage:
        return permission_error_response()
    is_online = problem.permission(user=user, req=perm.ONLINE)
    if not is_online:
        return online_error_response()
    problem.delete()
    return HTTPResponse()
@problem_api.route('/manage/<int:problem>', methods=['PUT'])
@identity_verify(0, 1)
@Request.doc('problem', Problem)
def manage_problem(user: User, problem: Problem):
    '''
    Edit a problem's metadata or replace its test case.

    The action is selected by the request content type:

    - `application/json`: the JSON fields (with None values dropped) are
      passed to `Problem.edit_problem`.
    - `multipart/form-data`: the uploaded `case` file is passed to
      `problem.update_test_case`.
    - anything else, including a missing Content-Type header: 400.

    Responds with 403 when the user lacks the MANAGE or ONLINE permission.
    A ValidationError raised while editing yields 400 with the validation
    details; an `engine.DoesNotExist` yields 404.
    '''

    # NOTE(review): 'type' appears twice in the key list below; presumably
    # harmless, but confirm against Request.json before removing it.
    @Request.json(
        'type',
        'courses: list',
        'status',
        'type',
        'description',
        'tags',
        'problem_name',
        'quota',
        'test_case_info',
        'can_view_stdout',
        'allowed_language',
        'default_code',
    )
    def modify_problem(**p_ks):
        # Only the fields present in the request body are updated.
        Problem.edit_problem(
            user=user,
            problem_id=problem.id,
            **drop_none(p_ks),
        )
        return HTTPResponse()

    @Request.files('case')
    def modify_problem_test_case(case):
        try:
            problem.update_test_case(case)
        except engine.DoesNotExist as e:
            return HTTPError(str(e), 404)
        except (ValueError, BadZipFile, BadTestCase) as e:
            return HTTPError(str(e), 400)
        return HTTPResponse('Success.')

    if not problem.permission(user, problem.Permission.MANAGE):
        return permission_error_response()
    if not problem.permission(user=user, req=problem.Permission.ONLINE):
        return online_error_response()

    # `request.content_type` is None when the header is absent. Fall back to
    # an empty string so such requests reach the 400 branch below instead of
    # failing with AttributeError on `.startswith`.
    content_type = request.content_type or ''

    # edit problem
    try:
        # modify problem meta
        if content_type.startswith('application/json'):
            return modify_problem()
        # upload testcase file
        if content_type.startswith('multipart/form-data'):
            return modify_problem_test_case()
        return HTTPError(
            'Unknown content type',
            400,
            data={'contentType': request.content_type},
        )
    except ValidationError as ve:
        return HTTPError(
            'Invalid or missing arguments.',
            400,
            data=ve.to_dict(),
        )
    except engine.DoesNotExist:
        return HTTPError('Course not found.', 404)
@problem_api.post('/<int:problem>/initiate-test-case-upload')
@identity_verify(0, 1)
@Request.doc('problem', Problem)
@Request.json('length: int', 'part_size: int')
def initiate_test_case_upload(
    user: User,
    problem: Problem,
    length: int,
    part_size: int,
):
    '''
    Start a test case upload and return the upload information.

    Responds with 403 when the user lacks the MANAGE or ONLINE permission.
    Otherwise `length` and `part_size` are passed to
    `problem.generate_urls_for_uploading_test_case`, and its result is
    returned as a dict via `dataclasses.asdict`.
    '''
    perm = problem.Permission
    if not problem.permission(user, perm.MANAGE):
        return permission_error_response()
    if not problem.permission(user=user, req=perm.ONLINE):
        return online_error_response()
    info = problem.generate_urls_for_uploading_test_case(length, part_size)
    payload = asdict(info)
    return HTTPResponse(data=payload)
@problem_api.post('/<int:problem>/complete-test-case-upload')
@identity_verify(0, 1)
@Request.doc('problem', Problem)
@Request.json('upload_id', 'parts: list')
def complete_test_case_upload(
    user: User,
    problem: Problem,
    upload_id: str,
    parts: list,
):
    '''
    Finish a test case upload from the list of uploaded parts.

    Each entry of `parts` must be an object with the keys 'PartNumber' and
    'ETag'; a malformed entry yields a 400 error instead of an unhandled
    KeyError/TypeError. Responds with 403 when the user lacks the MANAGE or
    ONLINE permission, and with 400 when `problem.complete_test_case_upload`
    raises BadTestCase. On success the response status code is 201.
    '''
    if not problem.permission(user, problem.Permission.MANAGE):
        return permission_error_response()
    if not problem.permission(user=user, req=problem.Permission.ONLINE):
        return online_error_response()
    # The part descriptors come straight from the request body, so check
    # their shape before indexing into them.
    for part in parts:
        if (not isinstance(part, dict) or 'PartNumber' not in part
                or 'ETag' not in part):
            return HTTPError(
                'Each part must contain PartNumber and ETag.',
                400,
            )
    # convert parts to list[Part]
    from minio.datatypes import Part
    parts = [
        Part(part_number=part['PartNumber'], etag=part['ETag'])
        for part in parts
    ]
    try:
        problem.complete_test_case_upload(upload_id, parts)
    except BadTestCase as e:
        return HTTPError(str(e), 400)
    return HTTPResponse(status_code=201)
@problem_api.route('/<int:problem_id>/test-case', methods=['GET'])
@problem_api.route('/<int:problem_id>/testcase', methods=['GET'])
@login_required
@Request.doc('problem_id', 'problem', Problem)
def get_test_case(user: User, problem: Problem):
    '''
    Send the problem's test case as a zip attachment.

    Responds with 403 when the user lacks the MANAGE or ONLINE permission.
    '''
    perm = problem.Permission
    if not problem.permission(user, perm.MANAGE):
        return permission_error_response()
    if not problem.permission(user=user, req=perm.ONLINE):
        return online_error_response()
    archive = problem.get_test_case()
    filename = f'testdata-{problem.id}.zip'
    return send_file(
        archive,
        mimetype='application/zip',
        as_attachment=True,
        download_name=filename,
    )
# FIXME: Find a better name
@problem_api.route('/<int:problem_id>/testdata', methods=['GET'])
@Request.args('token: str')
@Request.doc('problem_id', 'problem', Problem)
def get_testdata(token: str, problem: Problem):
    '''
    Send the problem's test case as a zip attachment to a sandbox.

    The caller is authenticated by `token`: when `sandbox.find_by_token`
    returns None the response is a 401 error.
    '''
    matched = sandbox.find_by_token(token)
    if matched is None:
        return HTTPError('Invalid sandbox token', 401)
    archive = problem.get_test_case()
    filename = f'testdata-{problem.id}.zip'
    return send_file(
        archive,
        mimetype='application/zip',
        as_attachment=True,
        download_name=filename,
    )
@problem_api.route('/<int:problem_id>/checksum', methods=['GET'])
@Request.args('token: str')
def get_checksum(token: str, problem_id: int):
    '''
    Return the MD5 hex digest of a problem's test case and task metadata.

    The digest covers the bytes read from `problem.get_test_case()` followed
    by the JSON-encoded task list. Responds with 401 when the sandbox token
    is unknown and with 404 when the problem is falsy.
    '''
    if sandbox.find_by_token(token) is None:
        return HTTPError('Invalid sandbox token', 401)
    problem = Problem(problem_id)
    if not problem:
        return HTTPError(f'{problem} not found', 404)
    tasks = []
    for task in problem.test_case.tasks:
        tasks.append(json.loads(task.to_json()))
    meta = json.dumps({'tasks': tasks}).encode()
    # TODO: use etag of bucket object
    archive_bytes = problem.get_test_case().read()
    # Hashing the two pieces in sequence equals hashing their concatenation.
    hasher = hashlib.md5()
    hasher.update(archive_bytes)
    hasher.update(meta)
    return HTTPResponse(data=hasher.hexdigest())
@problem_api.route('/<int:problem_id>/meta', methods=['GET'])
@Request.args('token: str')
def get_meta(token: str, problem_id: int):
    '''
    Return the task metadata of a problem's test case.

    Responds with 401 when the sandbox token is unknown and with 404 when
    the problem is falsy. Otherwise the response data is a dict whose
    'tasks' entry lists each task decoded from its JSON form.
    '''
    if sandbox.find_by_token(token) is None:
        return HTTPError('Invalid sandbox token', 401)
    problem = Problem(problem_id)
    if not problem:
        return HTTPError(f'{problem} not found', 404)
    tasks = []
    for task in problem.test_case.tasks:
        tasks.append(json.loads(task.to_json()))
    return HTTPResponse(data={'tasks': tasks})
@problem_api.route('/<int:problem_id>/high-score', methods=['GET'])
@login_required
@Request.doc('problem_id', 'problem', Problem)
def high_score(user: User, problem: Problem):
    '''Return the requesting user's high score on the problem.'''
    score = problem.get_high_score(user=user)
    return HTTPResponse(data={'score': score})
@problem_api.route('/clone', methods=['POST'])
@problem_api.route('/copy', methods=['POST'])
@identity_verify(0, 1)
@Request.json('problem_id: int', 'target', 'status')
@Request.doc('problem_id', 'problem', Problem)
def clone_problem(
    user: User,
    problem: Problem,
    target,
    status,
):
    '''
    Copy a problem to `target` and return the id of the copy.

    Responds with 403 when the user lacks the VIEW permission. `status` is
    forwarded to `problem.copy_to` as an override unless `drop_none`
    removes it.
    '''
    can_view = problem.permission(user, problem.Permission.VIEW)
    if not can_view:
        return HTTPError('Problem can not view.', 403)
    overrides = drop_none({'status': status})
    copied_id = problem.copy_to(user=user, target=target, **overrides)
    return HTTPResponse('Success.', data={'problemId': copied_id})
@problem_api.route('/publish', methods=['POST'])
@identity_verify(0, 1)
@Request.json('problem_id')
@Request.doc('problem_id', 'problem', Problem)
def publish_problem(user, problem: Problem):
    '''
    Release a problem via `Problem.release_problem`.

    A user whose role is 1 may only publish problems they own; otherwise
    the response is a 403 error.
    '''
    if user.role == 1:
        if problem.owner != user.username:
            return HTTPError('Not the owner.', 403)
    Problem.release_problem(problem.problem_id)
    return HTTPResponse('Success.')
@problem_api.route('/<int:problem_id>/stats', methods=['GET'])
@login_required
@Request.doc('problem_id', 'problem', Problem)
def problem_stats(user: User, problem: Problem):
    '''
    Return aggregate statistics for a problem.

    Responds with 403 when the user lacks the VIEW or ONLINE permission.
    The response data contains the AC-user ratio, the tried-user count, the
    mean and population standard deviation of the students' high scores
    (None when there are no students, respectively fewer than two), the raw
    score list, the submission status counts, and the first ten submissions
    sorted by run time and by memory usage.
    '''
    perm = problem.Permission
    if not problem.permission(user, perm.VIEW):
        return permission_error_response()
    if not problem.permission(user=user, req=perm.ONLINE):
        return online_error_response()

    students = [
        User(name) for course in problem.courses
        for name in course.student_nicknames.keys()
    ]
    scores = [problem.get_high_score(user=u) for u in students]

    def top_10(sort_key):
        # NOTE(review): status 0 is presumably "accepted" — confirm.
        found = Submission.filter(
            user=user,
            offset=0,
            count=10,
            problem=problem.id,
            status=0,
            sort_by=sort_key,
        )
        return [s.to_dict() for s in found]

    ret = {
        # These score statistics are only counting the scores of the
        # students in the course.
        'acUserRatio': [problem.get_ac_user_count(),
                        len(students)],
        'triedUserCount': problem.get_tried_user_count(),
        'average': statistics.mean(scores) if students else None,
        'std': statistics.pstdev(scores) if len(students) > 1 else None,
        'scoreDistribution': scores,
        # However, submissions include the submissions of teacher and admin.
        'statusCount': problem.get_submission_status(),
        'top10RunTime': top_10('runTime'),
        'top10MemoryUsage': top_10('memoryUsage'),
    }
    return HTTPResponse('Success.', data=ret)