Coverage for mongo/engine.py: 100%

235 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1from mongoengine import * 

2from mongoengine import signals 

3import mongoengine 

4import os 

5import html 

6from enum import IntEnum 

7from datetime import datetime 

8from zipfile import ZipFile, BadZipFile 

9 

# Star-import surface of this module: a copy of mongoengine's own export list.
__all__ = list(mongoengine.__all__)

# Connection target; a mongomock URI is used when MONGO_HOST is not set.
MONGO_HOST = os.environ.get('MONGO_HOST', 'mongomock://localhost')

# Module-level side effect: the 'normal-oj' connection is registered on import.
connect('normal-oj', host=MONGO_HOST)

14 

15 

def handler(event):
    '''
    Build a decorator that turns a signal callback into a class decorator.

    The decorated callback is returned unchanged, except that it gains an
    ``apply`` attribute. Calling ``callback.apply(cls)`` connects the
    callback to ``event`` with ``cls`` as sender and returns ``cls``, so it
    can be written as ``@callback.apply`` above a class definition.

    reference: http://docs.mongoengine.org/guide/signals.html
    '''

    def register(callback):

        def apply(target):
            # Subscribe only for documents of this particular class.
            event.connect(callback, sender=target)
            return target

        callback.apply = apply
        return callback

    return register

32 

33 

@handler(signals.pre_save)
def escape_markdown(sender, document, **kwargs):
    '''
    pre_save callback that HTML-escapes ``document.markdown`` in place.

    Args:
        sender: the class the signal was sent for (unused).
        document: the document about to be saved; must have ``markdown``.
        **kwargs: any extra keyword arguments forwarded by the signal;
            accepted and ignored so the callback does not raise TypeError.
    '''
    # A missing value is left alone: html.escape(None) would raise
    # AttributeError here instead of letting field validation report it.
    if document.markdown is None:
        return
    document.markdown = html.escape(document.markdown)

37 

38 

class ZipField(FileField):
    '''
    File field that only accepts zip archives, with an optional size cap.

    ``max_size`` is compared against the sum of the uncompressed sizes of
    the archive members; a value <= 0 disables the cap.
    '''

    def __init__(self, max_size=0, **ks):
        super().__init__(**ks)
        self.max_size = max_size

    def validate(self, value):
        '''
        Run the parent validation, then check that ``value`` is a readable
        zip archive whose total uncompressed size is within ``max_size``.
        Falsy values skip the zip checks entirely.
        '''
        super().validate(value)
        if not value:
            # nothing stored, nothing to inspect
            return
        try:
            with ZipFile(value) as archive:
                # accumulate the size of the original (uncompressed) files
                total = 0
                for entry in archive.infolist():
                    total += entry.file_size
        except BadZipFile:
            self.error('Only accept zip file.')
        # the cap applies only when max_size is positive
        if self.max_size > 0 and total > self.max_size:
            self.error(
                f'{total} bytes exceed the max size limit ({self.max_size} bytes)'
            )

63 

64 

class IntEnumField(IntField):
    '''
    Integer field whose value must be a member of the given IntEnum.
    '''

    def __init__(self, enum: IntEnum, **ks):
        super().__init__(**ks)
        self.enum = enum

    def validate(self, value):
        '''
        Report an error unless ``value`` equals one of the enum's members.
        '''
        allowed = tuple(self.enum.__members__.values())
        if value in allowed:
            return
        self.error(f'Value must be one of {allowed}')

75 

76 

class Profile(EmbeddedDocument):
    '''
    Embedded profile data: a display name and a short bio.
    '''
    # stored as 'displayedName'
    displayed_name = StringField(
        max_length=16,
        default='',
        db_field='displayedName',
    )
    bio = StringField(required=True, default='', max_length=64)

88 

89 

class EditorConfig(EmbeddedDocument):
    '''
    Embedded code-editor preferences (font, theme, indentation, language).
    '''
    font_size = IntField(
        default=14,
        min_value=8,
        max_value=72,
        db_field='fontSize',
    )
    theme = StringField(
        choices=[
            "default",
            "base16-dark",
            "base16-light",
            "dracula",
            "eclipse",
            "material",
            "monokai",
        ],
        default='default',
    )
    indent_type = IntField(choices=[0, 1], default=1, db_field='indentType')
    tab_size = IntField(
        default=4,
        min_value=1,
        max_value=8,
        db_field='tabSize',
    )
    language = IntField(choices=[0, 1, 2], default=0)

113 

114 

class Duration(EmbeddedDocument):
    '''
    Closed time interval ``[start, end]`` that supports the ``in`` operator.
    '''
    # start defaults to the moment the document is created
    start = DateTimeField(default=datetime.now)
    # end defaults to a fixed far-future date
    end = DateTimeField(default=datetime(2111, 10, 10))

    def __contains__(self, other) -> bool:
        '''
        Return True only when ``other`` is a datetime within the interval
        (both ends inclusive); any non-datetime is never contained.
        '''
        return isinstance(other, datetime) and self.start <= other <= self.end

123 

124 

class User(Document):
    '''
    Account document, keyed by ``username``.
    '''

    class Role(IntEnum):
        ADMIN = 0
        TEACHER = 1
        STUDENT = 2

    username = StringField(primary_key=True, required=True, max_length=16)
    user_id = StringField(required=True, max_length=24, db_field='userId')
    user_id2 = StringField(default='', max_length=24, db_field='userId2')
    email = EmailField(unique=True, required=True, max_length=128)
    md5 = StringField(max_length=32, required=True)
    active = BooleanField(default=False)
    role = IntEnumField(enum=Role, default=Role.STUDENT)
    profile = EmbeddedDocumentField(Profile, default=Profile)
    editor_config = EmbeddedDocumentField(
        EditorConfig,
        default=EditorConfig,
        null=True,
        db_field='editorConfig',
    )
    courses = ListField(ReferenceField('Course'))
    submissions = ListField(ReferenceField('Submission'))
    last_submit = DateTimeField(default=datetime.min)
    AC_problem_ids = ListField(IntField(), default=list)
    AC_submission = IntField(default=0)
    submission = IntField(default=0)
    problem_submission = DictField(db_field='problemSubmission')

    @property
    def info(self):
        '''
        Summary dict with the username, displayed name, md5 and role.
        '''
        return dict(
            username=self.username,
            displayedName=self.profile.displayed_name,
            md5=self.md5,
            role=self.role,
        )

162 

163 

@escape_markdown.apply
class Homework(Document):
    '''
    Homework document; its ``markdown`` is escaped by the pre_save callback.
    '''

    # the name must be unique together with course_id
    homework_name = StringField(
        required=True,
        max_length=64,
        unique_with='course_id',
        db_field='homeworkName',
    )
    markdown = StringField(default='', max_length=10000)
    scoreboard_status = IntField(
        choices=[0, 1],
        default=0,
        db_field='scoreboardStatus',
    )
    course_id = StringField(db_field='courseId', required=True)
    duration = EmbeddedDocumentField(Duration, default=Duration)
    problem_ids = ListField(IntField(), db_field='problemIds')
    student_status = DictField(db_field='studentStatus')
    ip_filters = ListField(StringField(max_length=64), default=list)
    penalty = StringField(default='score = 0', max_length=10000)

185 

186 

class Course(Document):
    '''
    Course document with its teacher, TAs and references to related documents.
    '''
    course_name = StringField(
        unique=True,
        required=True,
        max_length=64,
        db_field='courseName',
    )
    student_nicknames = DictField(db_field='studentNicknames')
    course_status = IntField(choices=[0, 1], default=0)
    teacher = ReferenceField('User')
    tas = ListField(ReferenceField('User'))
    # a deleted Homework is pulled out of this list
    homeworks = ListField(
        ReferenceField('Homework', reverse_delete_rule=PULL),
    )
    announcements = ListField(ReferenceField('Announcement'))
    posts = ListField(ReferenceField('Post'), default=list)
    student_scores = DictField(db_field='studentScores')

202 

203 

class Number(Document):
    '''
    Named integer value, keyed by ``name``; ``number`` starts at 1.
    '''
    name = StringField(primary_key=True, max_length=64)
    number = IntField(default=1)

210 

211 

class ProblemCase(EmbeddedDocument):
    '''
    Settings of one task: score, case count, memory limit and time limit.
    All four fields are required.
    '''
    task_score = IntField(db_field='taskScore', required=True)
    case_count = IntField(db_field='caseCount', required=True)
    memory_limit = IntField(db_field='memoryLimit', required=True)
    time_limit = IntField(db_field='timeLimit', required=True)

217 

218 

class ProblemTestCase(EmbeddedDocument):
    '''
    Test-case settings of a problem: language, fill-in template, the list
    of task settings and the zip archive with the case data.
    '''
    language = IntField(choices=[0, 1, 2])
    fill_in_template = StringField(db_field='fillInTemplate', max_length=16000)
    tasks = EmbeddedDocumentListField(
        ProblemCase,
        default=list,
    )
    # zip file contains testcase input/output
    case_zip = ZipField(
        db_field='caseZip',
        # was misspelled `defautl`, which only stored a stray attribute on
        # the field instead of configuring its default
        default=None,
        null=True,
    )

232 

233 

class ProblemDescription(EmbeddedDocument):
    '''
    Text parts of a problem statement plus its sample inputs and outputs.
    '''
    description = StringField(max_length=100000)
    input = StringField(max_length=100000)
    output = StringField(max_length=100000)
    hint = StringField(max_length=100000)
    sample_input = ListField(
        StringField(max_length=1024),
        default=list,
        db_field='sampleInput',
    )
    sample_output = ListField(
        StringField(max_length=1024),
        default=list,
        db_field='sampleOutput',
    )

    def escape(self):
        '''
        HTML-escape every text field and every sample entry in place.

        Missing (None) values are replaced by the empty string.
        '''
        self.description = html.escape(self.description or '')
        self.input = html.escape(self.input or '')
        self.output = html.escape(self.output or '')
        self.hint = html.escape(self.hint or '')
        # The previous `ip or html.escape(ip)` left every non-empty sample
        # unescaped and raised on None. Each list is handled on its own so
        # that lists of different length are both fully processed.
        for samples in (self.sample_input, self.sample_output):
            for i, text in enumerate(samples):
                samples[i] = html.escape(text or '')

262 

263 

@handler(signals.pre_save)
def problem_desc_escape(sender, document, **kwargs):
    '''
    pre_save callback that escapes the document's embedded description by
    calling ``document.description.escape()``.

    Extra keyword arguments forwarded by the signal are accepted and
    ignored so the callback does not raise TypeError when they are present.
    '''
    document.description.escape()

267 

268 

@problem_desc_escape.apply
class Problem(Document):
    '''
    Problem document; its description is escaped by the pre_save callback.
    '''

    class Visibility:
        SHOW = 0
        HIDDEN = 1

    problem_id = SequenceField(
        db_field='problemId',
        required=True,
        primary_key=True,
    )
    courses = ListField(ReferenceField('Course'), default=list)
    # defaults to 1, i.e. Visibility.HIDDEN
    problem_status = IntField(
        default=1,
        choices=[Visibility.SHOW, Visibility.HIDDEN],
        db_field='problemStatus',
    )
    problem_type = IntField(
        default=0,
        choices=[0, 1, 2],
        db_field='problemType',
    )
    problem_name = StringField(
        db_field='problemName',
        max_length=64,
        required=True,
    )
    description = EmbeddedDocumentField(
        ProblemDescription,
        default=ProblemDescription,
    )
    owner = StringField(max_length=16, required=True)
    # pdf =
    tags = ListField(StringField(max_length=16))
    test_case = EmbeddedDocumentField(
        ProblemTestCase,
        db_field='testCase',
        default=ProblemTestCase,
    )
    ac_user = IntField(db_field='ACUser', default=0)
    submitter = IntField(default=0)
    homeworks = ListField(ReferenceField('Homework'), default=list)
    # user can view stdout/stderr
    can_view_stdout = BooleanField(db_field='canViewStdout', default=True)
    cpp_report_url = StringField(
        db_field='cppReportUrl',
        default='',
        max_length=128,
    )
    python_report_url = StringField(
        db_field='pythonReportUrl',
        default='',
        max_length=128,
    )
    # moss_status (not started: 0, processing: 1, done: 2)
    moss_status = IntField(
        default=0,
        choices=[0, 1, 2],
        db_field='mossStatus',
    )
    # bitmask of allowed languages (c: 1, cpp: 2, py3: 4)
    allowed_language = IntField(db_field='allowedLanguage', default=7)
    # high score for each student
    # Dict[username, score]
    # A callable default gives every document its own dict; the previous
    # `default={}` literal was one object shared by the field.
    high_scores = DictField(db_field='highScore', default=dict)
    quota = IntField(default=-1)
    default_code = StringField(
        db_field='defaultCode',
        max_length=10**4,
        default='',
    )

341 

342 

class CaseResult(EmbeddedDocument):
    '''
    Result of a single case: status, execution time, memory usage and a
    zip archive with the output.
    '''
    status = IntField(required=True)
    exec_time = IntField(db_field='execTime', required=True)
    memory_usage = IntField(db_field='memoryUsage', required=True)
    # NOTE(review): 11**9 is about 2.36e9 bytes; possibly meant 10**9 - confirm
    output = ZipField(
        max_size=11**9,
        required=True,
        null=True,
    )

352 

353 

class TaskResult(EmbeddedDocument):
    '''
    Result of one task together with the results of its cases.
    Status, time and memory default to -1; score defaults to 0.
    '''
    status = IntField(default=-1)
    exec_time = IntField(db_field='execTime', default=-1)
    memory_usage = IntField(db_field='memoryUsage', default=-1)
    score = IntField(default=0)
    cases = EmbeddedDocumentListField(CaseResult, default=list)

360 

361 

class Submission(Document):
    '''
    A user's submission to a problem, with the submitted code archive and
    the per-task results.
    '''
    # one compound index over the listed fields
    meta = {
        'indexes': [
            (
                'id',
                'user',
                'score',
                'status',
                'problem',
                'language',
                'timestamp',
            ),
        ],
    }
    problem = ReferenceField(Problem, required=True)
    user = ReferenceField(User, required=True)
    language = IntField(
        db_field='languageType',
        required=True,
        min_value=0,
        max_value=3,
    )
    timestamp = DateTimeField(required=True)
    status = IntField(default=-2)
    score = IntField(default=-1)
    tasks = EmbeddedDocumentListField(TaskResult, default=list)
    # note: exec_time is stored under 'runTime'
    exec_time = IntField(db_field='runTime', default=-1)
    memory_usage = IntField(db_field='memoryUsage', default=-1)
    code = ZipField(max_size=10**7, required=True, null=True)
    last_send = DateTimeField(default=datetime.now, db_field='lastSend')
    comment = FileField(null=True, default=None)
    ip_addr = StringField(null=True, default=None)

394 

395 

@escape_markdown.apply
class Message(Document):
    '''
    Message from one sender to a list of receivers; its ``markdown`` is
    escaped by the pre_save callback.
    '''
    timestamp = DateTimeField(default=datetime.now)
    sender = StringField(required=True, max_length=16)
    receivers = ListField(StringField(max_length=16), required=True)
    # not delete / delete
    status = IntField(choices=[0, 1], default=0)
    title = StringField(required=True, max_length=32)
    markdown = StringField(required=True, max_length=100000)

404 

405 

@escape_markdown.apply
class Announcement(Document):
    '''
    Announcement belonging to a course; its ``markdown`` is escaped by the
    pre_save callback.
    '''
    # not delete / delete
    status = IntField(choices=[0, 1], default=0)
    title = StringField(required=True, max_length=64)
    course = ReferenceField('Course', required=True)
    create_time = DateTimeField(default=datetime.now, db_field='createTime')
    update_time = DateTimeField(default=datetime.now, db_field='updateTime')
    creator = ReferenceField('User', required=True)
    updater = ReferenceField('User', required=True)
    markdown = StringField(required=True, max_length=100000)
    pinned = BooleanField(default=False)

417 

418 

@escape_markdown.apply
class PostThread(Document):
    '''
    One node of a discussion thread, holding references to its replies;
    its ``markdown`` is escaped by the pre_save callback.
    '''
    markdown = StringField(default='', required=True, max_length=100000)
    author = ReferenceField('User', db_field='author')
    course_id = ReferenceField('Course', db_field='courseId')
    depth = IntField(default=0)  # 0 is top post, 1 is reply to post
    created = DateTimeField(required=True)
    updated = DateTimeField(required=True)
    status = IntField(default=0, choices=[0, 1])  # not delete / delete
    # was misspelled `dafault`, which only stored a stray attribute on the
    # field instead of configuring its default
    reply = ListField(ReferenceField('PostThread', db_field='postThread'),
                      default=list)

430 

431 

class Post(Document):
    '''
    Named post that references a PostThread.
    '''
    post_name = StringField(required=True, default='', max_length=64)
    thread = ReferenceField('PostThread', db_field='postThread')

435 

436 

class Config(Document):
    '''
    Base document for named configuration entries; inheritance is enabled
    so subclasses can add their own fields.
    '''
    meta = {'allow_inheritance': True}
    name = StringField(primary_key=True, required=True, max_length=64)

442 

443 

class Sandbox(EmbeddedDocument):
    '''
    Connection data of one sandbox instance: name, url and token.
    All three fields are required.
    '''
    name = StringField(required=True)
    url = StringField(required=True)
    token = StringField(required=True)

448 

449 

class SubmissionConfig(Config):
    '''
    Submission-related configuration: a rate limit and the list of sandbox
    instances.
    '''
    rate_limit = IntField(default=0, db_field='rateLimit')
    # The default is a callable so each document gets its own list and its
    # own Sandbox object; a list literal would share one Sandbox instance
    # between every document that falls back to the default.
    # NOTE(review): the token below is a credential-like value committed in
    # source - confirm it is only a placeholder.
    sandbox_instances = EmbeddedDocumentListField(
        Sandbox,
        default=lambda: [
            Sandbox(
                name='Sandbox-0',
                url='http://sandbox:1450',
                token='KoNoSandboxDa',
            ),
        ],
        db_field='sandboxInstances',
    )

463 

464 

class LoginRecords(Document):
    '''
    Record of one login attempt: user id, IP address, outcome and time.
    '''
    user_id = StringField(required=True)
    ip_addr = StringField(required=True)
    success = BooleanField(default=False, required=True)
    timestamp = DateTimeField(default=datetime.now, required=True)