Coverage for mongo/engine.py: 99%

241 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-14 03:01 +0000

1from mongoengine import * 

2from mongoengine import signals 

3import mongoengine 

4import os 

5import html 

6from enum import IntEnum 

7from datetime import datetime 

8from zipfile import ZipFile, BadZipFile 

9 

10__all__ = [*mongoengine.__all__] 

11 

12MONGO_HOST = os.environ.get('MONGO_HOST', 'mongomock://localhost') 

13 

14# FIXME: we should use config to check whether is in testing 

15if MONGO_HOST.startswith('mongomock'): 

16 import mongomock 

17 MONGO_HOST = MONGO_HOST.replace('mongomock', 'mongodb') 

18 connect( 

19 'normal-oj', 

20 host=MONGO_HOST, 

21 mongo_client_class=mongomock.MongoClient, 

22 ) 

23else: 

24 connect('normal-oj', host=MONGO_HOST) 

25 

26 

27def handler(event): 

28 ''' 

29 Signal decorator to allow use of callback functions as class decorators. 

30 reference: http://docs.mongoengine.org/guide/signals.html 

31 ''' 

32 

33 def decorator(fn): 

34 

35 def apply(cls): 

36 event.connect(fn, sender=cls) 

37 return cls 

38 

39 fn.apply = apply 

40 return fn 

41 

42 return decorator 

43 

44 

45@handler(signals.pre_save) 

46def escape_markdown(sender, document): 

47 document.markdown = html.escape(document.markdown) 

48 

49 

50class ZipField(FileField): 

51 

52 def __init__(self, max_size=0, **ks): 

53 super().__init__(**ks) 

54 self.max_size = max_size 

55 

56 def validate(self, value): 

57 super().validate(value) 

58 # skip check 

59 if not value: 

60 return 

61 try: 

62 with ZipFile(value) as zf: 

63 # the size of original files 

64 size = sum(info.file_size for info in zf.infolist()) 

65 except BadZipFile: 

66 self.error('Only accept zip file.') 

67 # no limit 

68 if self.max_size <= 0: 

69 return 

70 if size > self.max_size: 

71 self.error( 

72 f'{size} bytes exceed the max size limit ({self.max_size} bytes)' 

73 ) 

74 

75 

76class IntEnumField(IntField): 

77 

78 def __init__(self, enum: IntEnum, **ks): 

79 super().__init__(**ks) 

80 self.enum = enum 

81 

82 def validate(self, value): 

83 choices = (*self.enum.__members__.values(), ) 

84 if value not in choices: 

85 self.error(f'Value must be one of {choices}') 

86 

87 

88class Profile(EmbeddedDocument): 

89 displayed_name = StringField( 

90 db_field='displayedName', 

91 default='', 

92 max_length=16, 

93 ) 

94 bio = StringField( 

95 max_length=64, 

96 required=True, 

97 default='', 

98 ) 

99 

100 

101class EditorConfig(EmbeddedDocument): 

102 font_size = IntField(db_field='fontSize', 

103 min_value=8, 

104 max_value=72, 

105 default=14) 

106 theme = StringField( 

107 default='default', 

108 choices=[ 

109 "default", "base16-dark", "base16-light", "dracula", "eclipse", 

110 "material", "monokai" 

111 ], 

112 ) 

113 indent_type = IntField(db_field='indentType', default=1, choices=[0, 1]) 

114 tab_size = IntField( 

115 db_field='tabSize', 

116 default=4, 

117 min_value=1, 

118 max_value=8, 

119 ) 

120 language = IntField( 

121 default=0, 

122 choices=[0, 1, 2], 

123 ) 

124 

125 

126class Duration(EmbeddedDocument): 

127 start = DateTimeField(default=datetime.now) 

128 end = DateTimeField(default=datetime(2111, 10, 10)) 

129 

130 def __contains__(self, other) -> bool: 

131 if not isinstance(other, datetime): 

132 return False 

133 return self.start <= other <= self.end 

134 

135 

136class User(Document): 

137 

138 class Role(IntEnum): 

139 ADMIN = 0 

140 TEACHER = 1 

141 STUDENT = 2 

142 

143 username = StringField(max_length=16, required=True, primary_key=True) 

144 user_id = StringField(db_field='userId', max_length=24, required=True) 

145 user_id2 = StringField(db_field='userId2', max_length=24, default='') 

146 email = EmailField(required=True, unique=True, max_length=128) 

147 md5 = StringField(required=True, max_length=32) 

148 active = BooleanField(default=False) 

149 role = IntEnumField(default=Role.STUDENT, enum=Role) 

150 profile = EmbeddedDocumentField(Profile, default=Profile) 

151 editor_config = EmbeddedDocumentField( 

152 EditorConfig, 

153 db_field='editorConfig', 

154 default=EditorConfig, 

155 null=True, 

156 ) 

157 courses = ListField(ReferenceField('Course')) 

158 submissions = ListField(ReferenceField('Submission')) 

159 last_submit = DateTimeField(default=datetime.min) 

160 AC_problem_ids = ListField(IntField(), default=list) 

161 AC_submission = IntField(default=0) 

162 submission = IntField(default=0) 

163 problem_submission = DictField(db_field='problemSubmission') 

164 

165 @property 

166 def info(self): 

167 return { 

168 'username': self.username, 

169 'displayedName': self.profile.displayed_name, 

170 'md5': self.md5, 

171 'role': self.role, 

172 } 

173 

174 

175@escape_markdown.apply 

176class Homework(Document): 

177 

178 homework_name = StringField( 

179 max_length=64, 

180 required=True, 

181 db_field='homeworkName', 

182 unique_with='course_id', 

183 ) 

184 markdown = StringField(max_length=10000, default='') 

185 scoreboard_status = IntField( 

186 default=0, 

187 choices=[0, 1], 

188 db_field='scoreboardStatus', 

189 ) 

190 course_id = StringField(required=True, db_field='courseId') 

191 duration = EmbeddedDocumentField(Duration, default=Duration) 

192 problem_ids = ListField(IntField(), db_field='problemIds') 

193 student_status = DictField(db_field='studentStatus') 

194 ip_filters = ListField(StringField(max_length=64), default=list) 

195 penalty = StringField(max_length=10000, default='score = 0') 

196 

197 

198class Course(Document): 

199 course_name = StringField( 

200 max_length=64, 

201 required=True, 

202 unique=True, 

203 db_field='courseName', 

204 ) 

205 student_nicknames = DictField(db_field='studentNicknames') 

206 course_status = IntField(default=0, choices=[0, 1]) 

207 teacher = ReferenceField('User') 

208 tas = ListField(ReferenceField('User')) 

209 homeworks = ListField(ReferenceField('Homework', reverse_delete_rule=PULL)) 

210 announcements = ListField(ReferenceField('Announcement')) 

211 posts = ListField(ReferenceField('Post'), default=list) 

212 student_scores = DictField(db_field='studentScores') 

213 

214 

215class Number(Document): 

216 name = StringField( 

217 max_length=64, 

218 primary_key=True, 

219 ) 

220 number = IntField(default=1) 

221 

222 

223class ProblemCase(EmbeddedDocument): 

224 task_score = IntField(required=True, db_field='taskScore') 

225 case_count = IntField(required=True, db_field='caseCount') 

226 memory_limit = IntField(required=True, db_field='memoryLimit') 

227 time_limit = IntField(required=True, db_field='timeLimit') 

228 

229 

230class ProblemTestCase(EmbeddedDocument): 

231 language = IntField(choices=[0, 1, 2]) 

232 fill_in_template = StringField(db_field='fillInTemplate', max_length=16000) 

233 tasks = EmbeddedDocumentListField( 

234 ProblemCase, 

235 default=list, 

236 ) 

237 # zip file contains testcase input/output 

238 case_zip = ZipField( 

239 db_field='caseZip', 

240 defautl=None, 

241 null=True, 

242 ) 

243 case_zip_minio_path = StringField( 

244 null=True, 

245 max_length=256, 

246 db_field='caseZipMinioPath', 

247 ) 

248 

249 

250class ProblemDescription(EmbeddedDocument): 

251 description = StringField(max_length=100000) 

252 input = StringField(max_length=100000) 

253 output = StringField(max_length=100000) 

254 hint = StringField(max_length=100000) 

255 sample_input = ListField( 

256 StringField(max_length=1024), 

257 default=list, 

258 db_field='sampleInput', 

259 ) 

260 sample_output = ListField( 

261 StringField(max_length=1024), 

262 default=list, 

263 db_field='sampleOutput', 

264 ) 

265 

266 def escape(self): 

267 self.description, self.input, self.output, self.hint = (html.escape( 

268 v or '') for v in ( 

269 self.description, 

270 self.input, 

271 self.output, 

272 self.hint, 

273 )) 

274 _io = zip(self.sample_input, self.sample_output) 

275 for i, (ip, op) in enumerate(_io): 

276 self.sample_input[i] = ip or html.escape(ip) 

277 self.sample_output[i] = op or html.escape(op) 

278 

279 

280@handler(signals.pre_save) 

281def problem_desc_escape(sender, document): 

282 document.description.escape() 

283 

284 

285@problem_desc_escape.apply 

286class Problem(Document): 

287 

288 class Visibility: 

289 SHOW = 0 

290 HIDDEN = 1 

291 

292 problem_id = SequenceField( 

293 db_field='problemId', 

294 required=True, 

295 primary_key=True, 

296 ) 

297 courses = ListField(ReferenceField('Course'), default=list) 

298 problem_status = IntField( 

299 default=1, 

300 choices=[Visibility.SHOW, Visibility.HIDDEN], 

301 db_field='problemStatus', 

302 ) 

303 problem_type = IntField( 

304 default=0, 

305 choices=[0, 1, 2], 

306 db_field='problemType', 

307 ) 

308 problem_name = StringField( 

309 db_field='problemName', 

310 max_length=64, 

311 required=True, 

312 ) 

313 description = EmbeddedDocumentField( 

314 ProblemDescription, 

315 default=ProblemDescription, 

316 ) 

317 owner = StringField(max_length=16, required=True) 

318 # pdf = 

319 tags = ListField(StringField(max_length=16)) 

320 test_case = EmbeddedDocumentField( 

321 ProblemTestCase, 

322 db_field='testCase', 

323 default=ProblemTestCase, 

324 ) 

325 ac_user = IntField(db_field='ACUser', default=0) 

326 submitter = IntField(default=0) 

327 homeworks = ListField(ReferenceField('Homework'), default=list) 

328 # user can view stdout/stderr 

329 can_view_stdout = BooleanField(db_field='canViewStdout', default=True) 

330 cpp_report_url = StringField( 

331 db_field='cppReportUrl', 

332 default='', 

333 max_length=128, 

334 ) 

335 python_report_url = StringField( 

336 db_field='pythonReportUrl', 

337 default='', 

338 max_length=128, 

339 ) 

340 # moss_status (not started: 0, processing: 1, done: 2) 

341 moss_status = IntField( 

342 default=0, 

343 choices=[0, 1, 2], 

344 db_field='mossStatus', 

345 ) 

346 # bitmask of allowed languages (c: 1, cpp: 2, py3: 4) 

347 allowed_language = IntField(db_field='allowedLanguage', default=7) 

348 # high score for each student 

349 # Dict[username, score] 

350 high_scores = DictField(db_field='highScore', default={}) 

351 quota = IntField(default=-1) 

352 default_code = StringField( 

353 db_field='defaultCode', 

354 max_length=10**4, 

355 default='', 

356 ) 

357 

358 

359class CaseResult(EmbeddedDocument): 

360 status = IntField(required=True) 

361 exec_time = IntField(required=True, db_field='execTime') 

362 memory_usage = IntField(required=True, db_field='memoryUsage') 

363 output = ZipField( 

364 required=True, 

365 null=True, 

366 max_size=11**9, 

367 ) 

368 

369 

370class TaskResult(EmbeddedDocument): 

371 status = IntField(default=-1) 

372 exec_time = IntField(default=-1, db_field='execTime') 

373 memory_usage = IntField(default=-1, db_field='memoryUsage') 

374 score = IntField(default=0) 

375 cases = EmbeddedDocumentListField(CaseResult, default=list) 

376 

377 

378class Submission(Document): 

379 meta = { 

380 'indexes': [ 

381 ( 

382 'id', 

383 'user', 

384 'score', 

385 'status', 

386 'problem', 

387 'language', 

388 'timestamp', 

389 ), 

390 ] 

391 } 

392 problem = ReferenceField(Problem, required=True) 

393 user = ReferenceField(User, required=True) 

394 language = IntField( 

395 required=True, 

396 min_value=0, 

397 max_value=3, 

398 db_field='languageType', 

399 ) 

400 timestamp = DateTimeField(required=True) 

401 status = IntField(default=-2) 

402 score = IntField(default=-1) 

403 tasks = EmbeddedDocumentListField(TaskResult, default=list) 

404 exec_time = IntField(default=-1, db_field='runTime') 

405 memory_usage = IntField(default=-1, db_field='memoryUsage') 

406 code = ZipField(required=True, null=True, max_size=10**7) 

407 code_minio_path = StringField( 

408 null=True, 

409 max_length=256, 

410 db_field='codeMinioPath', 

411 ) 

412 last_send = DateTimeField(db_field='lastSend', default=datetime.now) 

413 comment = FileField(default=None, null=True) 

414 ip_addr = StringField(default=None, null=True) 

415 

416 

417@escape_markdown.apply 

418class Message(Document): 

419 timestamp = DateTimeField(default=datetime.now) 

420 sender = StringField(max_length=16, required=True) 

421 receivers = ListField(StringField(max_length=16), required=True) 

422 status = IntField(default=0, choices=[0, 1]) # not delete / delete 

423 title = StringField(max_length=32, required=True) 

424 markdown = StringField(max_length=100000, required=True) 

425 

426 

427@escape_markdown.apply 

428class Announcement(Document): 

429 status = IntField(default=0, choices=[0, 1]) # not delete / delete 

430 title = StringField(max_length=64, required=True) 

431 course = ReferenceField('Course', required=True) 

432 create_time = DateTimeField(db_field='createTime', default=datetime.now) 

433 update_time = DateTimeField(db_field='updateTime', default=datetime.now) 

434 creator = ReferenceField('User', required=True) 

435 updater = ReferenceField('User', required=True) 

436 markdown = StringField(max_length=100000, required=True) 

437 pinned = BooleanField(default=False) 

438 

439 

440@escape_markdown.apply 

441class PostThread(Document): 

442 markdown = StringField(default='', required=True, max_length=100000) 

443 author = ReferenceField('User', db_field='author') 

444 course_id = ReferenceField('Course', db_field='courseId') 

445 depth = IntField(default=0) # 0 is top post, 1 is reply to post 

446 created = DateTimeField(required=True) 

447 updated = DateTimeField(required=True) 

448 status = IntField(default=0, choices=[0, 1]) # not delete / delete 

449 reply = ListField(ReferenceField('PostThread', db_field='postThread'), 

450 dafault=list) 

451 

452 

453class Post(Document): 

454 post_name = StringField(default='', required=True, max_length=64) 

455 thread = ReferenceField('PostThread', db_field='postThread') 

456 

457 

458class Config(Document): 

459 meta = { 

460 'allow_inheritance': True, 

461 } 

462 name = StringField(required=True, max_length=64, primary_key=True) 

463 

464 

465class Sandbox(EmbeddedDocument): 

466 name = StringField(required=True) 

467 url = StringField(required=True) 

468 token = StringField(required=True) 

469 

470 

471class SubmissionConfig(Config): 

472 rate_limit = IntField(default=0, db_field='rateLimit') 

473 sandbox_instances = EmbeddedDocumentListField( 

474 Sandbox, 

475 default=[ 

476 Sandbox( 

477 name='Sandbox-0', 

478 url='http://sandbox:1450', 

479 token='KoNoSandboxDa', 

480 ), 

481 ], 

482 db_field='sandboxInstances', 

483 ) 

484 

485 

486class LoginRecords(Document): 

487 user_id = StringField(required=True) 

488 ip_addr = StringField(required=True) 

489 success = BooleanField(required=True, default=False) 

490 timestamp = DateTimeField(required=True, default=datetime.now)