Coverage for mongo/course.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-11-05 04:22 +0000

1from . import engine 

2from .user import * 

3from .utils import * 

4import re 

5import enum 

6from typing import Dict, List, Optional 

7from .base import MongoBase 

8from datetime import datetime 

9 

# Names exported by `from <this module> import *`.
__all__ = ['Course']

13 

14 

15class Course(MongoBase, engine=engine.Course): 

16 

class Permission(enum.IntFlag):
    """
    Bit flags describing what a user may do inside a course.
    """
    # Values are written out as bit shifts; they are exactly what
    # `enum.auto()` assigns to an IntFlag in this order (1, 2, 4, 8).
    VIEW = 1 << 0  # view course basic info
    SCORE = 1 << 1  # can only view one's own score
    MODIFY = 1 << 2  # manage course
    GRADE = 1 << 3  # grade students' scores

22 

def __new__(cls, course_name, *args, **kwargs):
    """
    Create a wrapper from `course_name`, falling back to a name lookup.

    The argument is first handed to the parent `__new__` unchanged. If
    that raises `engine.ValidationError`, a course document whose
    `course_name` field equals the argument is fetched and wrapped
    instead. If no such document exists, the wrapper is built from the
    all-zero 24-character id string (presumably a placeholder for
    "no such course" -- confirm against `MongoBase`).
    """
    try:
        return super().__new__(cls, course_name)
    except engine.ValidationError:
        # The argument was rejected as-is, so try it as a course name.
        try:
            found = Course.engine.objects(course_name=course_name).get()
            return super().__new__(cls, found)
        except engine.DoesNotExist:
            return super().__new__(cls, '0' * 24)

33 

def update_student_namelist(
    self,
    student_nicknames: Dict[str, str],
):
    """
    Replace the course's student list and sync users and homeworks.

    Args:
        student_nicknames: mapping whose keys are passed to `User(...)`
            to identify each student (values are presumably the
            nicknames -- they are stored untouched).

    Raises:
        engine.DoesNotExist: if any key does not resolve to a truthy
            `User`.
    """
    from .homework import Homework
    if not all(User(name) for name in student_nicknames):
        raise engine.DoesNotExist('User not found')
    current = set(self.student_nicknames)
    incoming = set(student_nicknames)
    # Wrap each affected user once and reuse the wrappers for both the
    # course membership update and the homework update below.
    dropped = [User(name) for name in current - incoming]
    added = [User(name) for name in incoming - current]
    for user in dropped:
        self.remove_user(user.obj)
    for user in added:
        self.add_user(user.obj)
    self.student_nicknames = student_nicknames
    # TODO: use event to update homework data
    for homework in map(Homework, self.homeworks):
        homework.remove_student(dropped)
        homework.add_student(added)
    self.save()

55 

def add_user(self, user: User):
    """
    Add this course to `user`'s `courses` set and refresh that field.

    NOTE(review): callers in this class pass `User(...).obj`, so `user`
    looks like the underlying document rather than the `User` wrapper.

    Raises:
        engine.DoesNotExist: if this course wrapper is falsy.
    """
    if self:
        user.update(add_to_set__courses=self.id)
        user.reload('courses')
        return
    raise engine.DoesNotExist(f'Course [{self.course_name}]')

61 

def remove_user(self, user: User):
    """
    Pull this course out of `user`'s `courses` and refresh that field.

    Unlike `add_user`, no existence check is made on the course itself.
    """
    course_id = self.id
    user.update(pull__courses=course_id)
    user.reload('courses')

65 

@classmethod
def get_all(cls):
    """
    Return the `objects` manager of `engine.Course` (every course).
    """
    course_document = engine.Course
    return course_document.objects

69 

@classmethod
def get_user_courses(cls, user):
    """
    Return the courses visible to `user`.

    A user whose `role` equals 0 gets every course (`cls.get_all()`);
    anyone else gets their own `user.courses`.
    """
    return cls.get_all() if user.role == 0 else user.courses

76 

def edit_course(self, user, new_course, teacher):
    """
    Rename the course and optionally hand it to another teacher.

    Args:
        user: the user requesting the change; checked with `perm`.
        new_course: the new course name. Only ASCII letters, digits,
            `.`, `_`, `-` and spaces are accepted, and it must not be
            empty.
        teacher: identifier passed to `User(...)` for the teacher.

    Returns:
        True on success.

    Raises:
        ValueError: if `new_course` contains a disallowed character.
        engine.DoesNotExist: if the course or the teacher is not found.
        PermissionError: if `perm(self, user)` is falsy.
    """
    # `fullmatch` instead of `match` with a trailing `$`: `$` also
    # matches just before a final newline, which let names ending in a
    # line break slip through validation.
    if re.fullmatch(r'[a-zA-Z0-9._\- ]+', new_course) is None:
        raise ValueError

    if not self:
        raise engine.DoesNotExist('Course')
    if not perm(self, user):
        raise PermissionError
    te = User(teacher)
    if not te:
        raise engine.DoesNotExist('User')

    self.course_name = new_course
    if te.obj != self.teacher:
        # Move course membership from the old teacher to the new one.
        self.remove_user(self.teacher)
        self.add_user(te.obj)
    self.teacher = te.obj
    self.save()
    return True

96 

def delete_course(self, user):
    """
    Delete this course on behalf of `user`.

    The course is removed from its teacher's `courses` before the
    document is deleted.
    NOTE(review): only the teacher is detached here; whether students'
    `courses` are cleaned up elsewhere is not visible from this method.

    Returns:
        True on success.

    Raises:
        engine.DoesNotExist: if the course is not found.
        PermissionError: if `perm(self, user)` is falsy.
    """
    if not self:
        raise engine.DoesNotExist('Course')  # course not found
    allowed = perm(self, user)
    if not allowed:
        # user is not the TA or teacher in course
        raise PermissionError

    owner = self.teacher
    self.remove_user(owner)
    self.delete()
    return True

108 

def get_scoreboard(
    self,
    problem_ids: List[int],
    start: Optional[float] = None,
    end: Optional[float] = None,
) -> List[Dict]:
    """
    Build per-student score statistics for the given problems.

    Args:
        problem_ids: ids of the problems to include.
        start: optional inclusive lower bound on the submission
            timestamp, as a POSIX timestamp.
        end: optional inclusive upper bound on the submission
            timestamp, as a POSIX timestamp.

    Returns:
        One dict per student listed in `self.student_nicknames`.
        A student with matching submissions gets `user`, `sum` (the sum
        of the best score of each attempted problem), `avg` (`sum`
        divided by `len(problem_ids)`), plus one entry per attempted
        problem keyed by the problem id as a string and holding `pid`,
        `count`, `max`, `min` and `avg`. A student with no matching
        submission gets only `user`, `sum` = 0 and `avg` = 0.
    """
    user_ids = [User(name).id for name in self.student_nicknames]

    # Compare against None so a bound of 0 (the epoch) is still applied
    # instead of being silently dropped as falsy.
    # NOTE(review): `fromtimestamp` without a tz yields naive local
    # time -- confirm this matches how submission timestamps are stored.
    time_range = {}
    if start is not None:
        time_range['$gte'] = datetime.fromtimestamp(start)
    if end is not None:
        time_range['$lte'] = datetime.fromtimestamp(end)

    matching = {
        "user": {
            "$in": user_ids
        },
        "problem": {
            "$in": problem_ids
        },
    }
    if time_range:
        matching["timestamp"] = time_range

    pipeline = [
        {
            "$match": matching
        },
        # First pass: statistics per (user, problem) pair.
        {
            "$group": {
                "_id": {
                    "user": "$user",
                    "problem": "$problem",
                },
                "count": {
                    "$sum": 1
                },
                "max": {
                    "$max": "$score"
                },
                "min": {
                    "$min": "$score"
                },
                "avg": {
                    "$avg": "$score"
                },
            }
        },
        # Second pass: collect each user's per-problem statistics.
        {
            "$group": {
                "_id": "$_id.user",
                "scores": {
                    "$push": {
                        "pid": "$_id.problem",
                        "count": "$count",
                        "max": "$max",
                        "min": "$min",
                        "avg": "$avg",
                    },
                },
            }
        },
    ]
    cursor = engine.Submission.objects().aggregate(pipeline)

    scoreboard = []
    unrecorded_users = set(user_ids)
    for item in cursor:
        sum_of_score = sum(s['max'] for s in item['scores'])
        scoreboard.append({
            'user': User(item['_id']).info,
            'sum': sum_of_score,
            'avg': sum_of_score / len(problem_ids),
            **{f'{score["pid"]}': score
               for score in item['scores']},
        })
        unrecorded_users.remove(item['_id'])
    # Students without any matching submission still get a row.
    for user_id in unrecorded_users:
        scoreboard.append({
            'user': User(user_id).info,
            'sum': 0,
            'avg': 0,
        })

    return scoreboard

191 

@classmethod
def add_course(cls, course, teacher):
    """
    Create a course named `course` owned by `teacher`.

    Args:
        course: the course name. Only ASCII letters, digits, `.`, `_`,
            `-` and spaces are accepted, and it must not be empty.
        teacher: identifier passed to `User(...)` for the teacher.

    Returns:
        True on success.

    Raises:
        ValueError: if `course` contains a disallowed character.
        engine.DoesNotExist: if the teacher is not found.
        PermissionError: if the teacher's `role` is 2 or greater.
    """
    # `fullmatch` instead of `match` with a trailing `$`: `$` also
    # matches just before a final newline, which let names ending in a
    # line break slip through validation.
    if re.fullmatch(r'[a-zA-Z0-9._\- ]+', course) is None:
        raise ValueError
    teacher = User(teacher)
    if not teacher:
        raise engine.DoesNotExist('User')
    if teacher.role >= 2:
        raise PermissionError(
            f'{teacher} is not permitted to create a course')
    co = cls.engine(
        course_name=course,
        teacher=teacher.obj,
    ).save()
    # The teacher also gets the new course in their own `courses`.
    cls(co).add_user(teacher.obj)
    return True

208 

@classmethod
def get_public(cls):
    """
    Return the course named 'Public', creating it first if missing.

    A missing course is created through `add_course` with
    'first_admin' as its teacher.
    """
    public_name = 'Public'
    if not cls(public_name):
        cls.add_course(public_name, 'first_admin')
    return cls(public_name)

214 

def own_permission(self, user) -> Permission:
    """
    Return the permission flags `user` holds in this course.

    The role code comes from `perm(self.obj, user)` and is mapped to
    flags: 0 grants nothing, 1 grants VIEW and SCORE, 2 grants VIEW and
    GRADE, 3 and 4 grant VIEW, GRADE and MODIFY. Any other code raises
    KeyError from the lookup.
    """
    flags = self.Permission
    full_control = flags.VIEW | flags.GRADE | flags.MODIFY
    capability_by_role = {
        0: flags(0),
        1: flags.VIEW | flags.SCORE,
        2: flags.VIEW | flags.GRADE,
        3: full_control,
        4: full_control,
    }
    return capability_by_role[perm(self.obj, user)]

234 

def permission(self, user, req) -> bool:
    """
    Tell whether `user` holds at least one of the flags in `req`.

    The flags from `own_permission(user)` are AND-ed with `req`; any
    overlap counts as success.
    """
    granted = self.own_permission(user)
    return True if granted & req else False