Coverage for model/problem.py: 98%
211 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-11-05 04:22 +0000
1import json
2import hashlib
3import statistics
4from flask import Blueprint, request, send_file
5from urllib import parse
6from zipfile import BadZipFile
7from mongo import *
8from mongo import engine
9from mongo import sandbox
10from mongo.utils import drop_none
11from mongo.problem import *
12from .auth import *
13from .utils import *
15__all__ = ['problem_api']
17problem_api = Blueprint('problem_api', __name__)
20def permission_error_response():
21 return HTTPError('Not enough permission', 403)
24def online_error_response():
25 return HTTPError('Problem is unavailable', 403)
28@problem_api.route('/', methods=['GET'])
29@login_required
30@Request.args(
31 'offset',
32 'count',
33 'problem_id',
34 'tags',
35 'name',
36 'course',
37)
38def view_problem_list(
39 user,
40 offset,
41 count,
42 tags,
43 problem_id,
44 name,
45 course,
46):
47 # casting args
48 try:
49 if offset is not None:
50 offset = int(offset)
51 if count is not None:
52 count = int(count)
53 except (TypeError, ValueError):
54 return HTTPError(
55 'offset and count must be integer!',
56 400,
57 )
58 problem_id, name, tags, course = (parse.unquote(p or '') or None
59 for p in (problem_id, name, tags,
60 course))
61 try:
62 ks = {
63 'user': user,
64 'offset': offset,
65 'count': count,
66 'tags': tags and tags.split(','),
67 'problem_id': problem_id,
68 'name': name,
69 'course': course,
70 }
71 ks = {k: v for k, v in ks.items() if v is not None}
72 data = Problem.get_problem_list(**ks)
73 except IndexError:
74 return HTTPError('invalid offset', 400)
75 data = [{
76 'problemId': p.problem_id,
77 'problemName': p.problem_name,
78 'status': p.problem_status,
79 'ACUser': p.ac_user,
80 'submitter': p.submitter,
81 'tags': p.tags,
82 'type': p.problem_type,
83 'quota': p.quota,
84 'submitCount': Problem(p.problem_id).submit_count(user)
85 } for p in data]
86 return HTTPResponse('Success.', data=data)
89@problem_api.route('/<int:problem_id>', methods=['GET'])
90@problem_api.route('/view/<int:problem_id>', methods=['GET'])
91@login_required
92@Request.doc('problem_id', 'problem', Problem)
93def view_problem(user: User, problem: Problem):
94 if not problem.permission(user=user, req=problem.Permission.VIEW):
95 return permission_error_response()
96 if not problem.permission(user=user, req=problem.Permission.ONLINE):
97 return online_error_response()
98 # ip validation
99 if not problem.is_valid_ip(get_ip()):
100 return HTTPError('Invalid IP address.', 403)
101 # filter data
102 data = problem.detailed_info(
103 'problemName',
104 'description',
105 'owner',
106 'tags',
107 'allowedLanguage',
108 'courses',
109 'quota',
110 defaultCode='defaultCode',
111 status='problemStatus',
112 type='problemType',
113 testCase='testCase__tasks',
114 )
115 if problem.obj.problem_type == 1:
116 data.update({'fillInTemplate': problem.obj.test_case.fill_in_template})
117 data.update({
118 'submitCount': problem.submit_count(user),
119 'highScore': problem.get_high_score(user=user),
120 })
121 return HTTPResponse('Problem can view.', data=data)
124@problem_api.route('/manage/<int:problem_id>', methods=['GET'])
125@Request.doc('problem_id', 'problem', Problem)
126@identity_verify(0, 1) # admin and teacher only
127def get_problem_detailed(user, problem: Problem):
128 '''
129 Get problem's detailed information
130 '''
131 if not problem.permission(user, problem.Permission.MANAGE):
132 return permission_error_response()
133 if not problem.permission(user=user, req=problem.Permission.ONLINE):
134 return online_error_response()
135 info = problem.detailed_info(
136 'courses',
137 'problemName',
138 'description',
139 'tags',
140 'testCase',
141 'ACUser',
142 'submitter',
143 'allowedLanguage',
144 'canViewStdout',
145 'quota',
146 status='problemStatus',
147 type='problemType',
148 )
149 info.update({'submitCount': problem.submit_count(user)})
150 return HTTPResponse(
151 'Success.',
152 data=info,
153 )
156@problem_api.route('/manage', methods=['POST'])
157@identity_verify(0, 1)
158@Request.json(
159 'type',
160 'courses: list',
161 'status',
162 'type',
163 'description',
164 'tags',
165 'problem_name',
166 'quota',
167 'test_case_info',
168 'can_view_stdout',
169 'allowed_language',
170 'default_code',
171)
172def create_problem(user: User, **ks):
173 try:
174 pid = Problem.add(user=user, **ks)
175 except ValidationError as e:
176 return HTTPError(
177 'Invalid or missing arguments.',
178 400,
179 data=e.to_dict(),
180 )
181 except DoesNotExist as e:
182 return HTTPError('Course not found', 404)
183 except ValueError as e:
184 return HTTPError(str(e), 400)
185 return HTTPResponse(data={'problemId': pid})
188@problem_api.route('/manage/<int:problem>', methods=['DELETE'])
189@identity_verify(0, 1)
190@Request.doc('problem', Problem)
191def delete_problem(user: User, problem: Problem):
192 if not problem.permission(user, problem.Permission.MANAGE):
193 return permission_error_response()
194 if not problem.permission(user=user, req=problem.Permission.ONLINE):
195 return online_error_response()
196 problem.delete()
197 return HTTPResponse()
200@problem_api.route('/manage/<int:problem>', methods=['PUT'])
201@identity_verify(0, 1)
202@Request.doc('problem', Problem)
203def manage_problem(user: User, problem: Problem):
205 @Request.json(
206 'type',
207 'courses: list',
208 'status',
209 'type',
210 'description',
211 'tags',
212 'problem_name',
213 'quota',
214 'test_case_info',
215 'can_view_stdout',
216 'allowed_language',
217 'default_code',
218 )
219 def modify_problem(**p_ks):
220 Problem.edit_problem(
221 user=user,
222 problem_id=problem.id,
223 **drop_none(p_ks),
224 )
225 return HTTPResponse()
227 @Request.files('case')
228 def modify_problem_test_case(case):
229 try:
230 problem.update_test_case(case)
231 except engine.DoesNotExist as e:
232 return HTTPError(str(e), 404)
233 except (ValueError, BadZipFile) as e:
234 return HTTPError(str(e), 400)
235 except BadTestCase as e:
236 return HTTPError(str(e), 400)
237 return HTTPResponse('Success.')
239 if not problem.permission(user, problem.Permission.MANAGE):
240 return permission_error_response()
241 if not problem.permission(user=user, req=problem.Permission.ONLINE):
242 return online_error_response()
244 # edit problem
245 try:
246 # modify problem meta
247 if request.content_type.startswith('application/json'):
248 return modify_problem()
249 # upload testcase file
250 elif request.content_type.startswith('multipart/form-data'):
251 return modify_problem_test_case()
252 else:
253 return HTTPError(
254 'Unknown content type',
255 400,
256 data={'contentType': request.content_type},
257 )
258 except ValidationError as ve:
259 return HTTPError(
260 'Invalid or missing arguments.',
261 400,
262 data=ve.to_dict(),
263 )
264 except engine.DoesNotExist:
265 return HTTPError('Course not found.', 404)
268@problem_api.route('/<int:problem_id>/test-case', methods=['GET'])
269@problem_api.route('/<int:problem_id>/testcase', methods=['GET'])
270@login_required
271@Request.doc('problem_id', 'problem', Problem)
272def get_test_case(user: User, problem: Problem):
273 if not problem.permission(user, problem.Permission.MANAGE):
274 return permission_error_response()
275 if not problem.permission(user=user, req=problem.Permission.ONLINE):
276 return online_error_response()
277 return send_file(
278 problem.test_case.case_zip,
279 mimetype='application/zip',
280 as_attachment=True,
281 download_name=f'testdata-{problem.id}.zip',
282 )
285# FIXME: Find a better name
286@problem_api.route('/<int:problem_id>/testdata', methods=['GET'])
287@Request.args('token: str')
288@Request.doc('problem_id', 'problem', Problem)
289def get_testdata(token: str, problem: Problem):
290 if sandbox.find_by_token(token) is None:
291 return HTTPError('Invalid sandbox token', 401)
292 return send_file(
293 problem.test_case.case_zip,
294 mimetype='application/zip',
295 as_attachment=True,
296 download_name=f'testdata-{problem.id}.zip',
297 )
300@problem_api.route('/<int:problem_id>/checksum', methods=['GET'])
301@Request.args('token: str')
302def get_checksum(token: str, problem_id: int):
303 if sandbox.find_by_token(token) is None:
304 return HTTPError('Invalid sandbox token', 401)
305 problem = Problem(problem_id)
306 if not problem:
307 return HTTPError(f'{problem} not found', 404)
308 meta = json.dumps({
309 'tasks':
310 [json.loads(task.to_json()) for task in problem.test_case.tasks]
311 }).encode()
312 content = problem.test_case.case_zip.read() + meta
313 digest = hashlib.md5(content).hexdigest()
314 return HTTPResponse(data=digest)
317@problem_api.route('/<int:problem_id>/meta', methods=['GET'])
318@Request.args('token: str')
319def get_meta(token: str, problem_id: int):
320 if sandbox.find_by_token(token) is None:
321 return HTTPError('Invalid sandbox token', 401)
322 problem = Problem(problem_id)
323 if not problem:
324 return HTTPError(f'{problem} not found', 404)
325 meta = {
326 'tasks':
327 [json.loads(task.to_json()) for task in problem.test_case.tasks]
328 }
329 return HTTPResponse(data=meta)
332@problem_api.route('/<int:problem_id>/high-score', methods=['GET'])
333@login_required
334@Request.doc('problem_id', 'problem', Problem)
335def high_score(user: User, problem: Problem):
336 return HTTPResponse(data={
337 'score': problem.get_high_score(user=user),
338 })
341@problem_api.route('/clone', methods=['POST'])
342@problem_api.route('/copy', methods=['POST'])
343@identity_verify(0, 1)
344@Request.json('problem_id: int', 'target', 'status')
345@Request.doc('problem_id', 'problem', Problem)
346def clone_problem(
347 user: User,
348 problem: Problem,
349 target,
350 status,
351):
352 if not problem.permission(user, problem.Permission.VIEW):
353 return HTTPError('Problem can not view.', 403)
354 override = drop_none({'status': status})
355 new_problem_id = problem.copy_to(
356 user=user,
357 target=target,
358 **override,
359 )
360 return HTTPResponse(
361 'Success.',
362 data={'problemId': new_problem_id},
363 )
366@problem_api.route('/publish', methods=['POST'])
367@identity_verify(0, 1)
368@Request.json('problem_id')
369@Request.doc('problem_id', 'problem', Problem)
370def publish_problem(user, problem: Problem):
371 if user.role == 1 and problem.owner != user.username:
372 return HTTPError('Not the owner.', 403)
373 Problem.release_problem(problem.problem_id)
374 return HTTPResponse('Success.')
377@problem_api.route('/<int:problem_id>/stats', methods=['GET'])
378@login_required
379@Request.doc('problem_id', 'problem', Problem)
380def problem_stats(user: User, problem: Problem):
381 if not problem.permission(user, problem.Permission.VIEW):
382 return permission_error_response()
383 if not problem.permission(user=user, req=problem.Permission.ONLINE):
384 return online_error_response()
385 ret = {}
386 students = []
387 for course in problem.courses:
388 students += [User(name) for name in course.student_nicknames.keys()]
389 students_high_scores = [problem.get_high_score(user=u) for u in students]
390 # These score statistics are only counting the scores of the students in the course.
391 ret['acUserRatio'] = [problem.get_ac_user_count(), len(students)]
392 ret['triedUserCount'] = problem.get_tried_user_count()
393 ret['average'] = None if len(students) == 0 else statistics.mean(
394 students_high_scores)
395 ret['std'] = None if len(students) <= 1 else statistics.pstdev(
396 students_high_scores)
397 ret['scoreDistribution'] = students_high_scores
398 # However, submissions include the submissions of teacher and admin.
399 ret['statusCount'] = problem.get_submission_status()
400 params = {
401 'user': user,
402 'offset': 0,
403 'count': 10,
404 'problem': problem.id,
405 'status': 0,
406 }
407 top_10_runtime_submissions = [
408 s.to_dict() for s in Submission.filter(**params, sort_by='runTime')
409 ]
410 ret['top10RunTime'] = top_10_runtime_submissions
411 top_10_memory_submissions = [
412 s.to_dict() for s in Submission.filter(**params, sort_by='memoryUsage')
413 ]
414 ret['top10MemoryUsage'] = top_10_memory_submissions
415 return HTTPResponse('Success.', data=ret)