# coding=utf-8
"""Admin API views for the exam question bank.

Provides CRUD, soft deletion, export and Excel/Word bulk import of exam
questions, plus a read-only listing of question feedback.
"""
import json
import traceback

from django.db.models import Sum, F
from rest_framework.decorators import action
from rest_framework.views import APIView
from django.db import transaction
from rest_framework.viewsets import ReadOnlyModelViewSet
from django.db.models import Q

from utils.custom_modelviewset import CustomModelViewSet
from utils import response_ok, response_error
from utils.permission import IsAdministrator
from utils.format import ExcelImporter, WordImporter
# NOTE(review): the two wildcard imports below presumably supply ExamQuestion,
# ExamQuestionOption, ExamQuestionFill, ExamQuestionFeedback, the serializer
# and filter classes, and CustomError -- confirm and make them explicit.
from apps.examination.examquestion.serializers import *
from apps.examination.examquestion.filters import *
from apps.system.models import SysLog
from apps.foundation.models import Subject, Chapter
# Imported explicitly (and after the wildcard imports, so this binding wins)
# because ``timezone.now()`` is used below and must not depend on whatever the
# wildcard imports happen to re-export.
from django.utils import timezone


# Number reported for the first data row in "第N行" error messages
# (presumably row 1 of the sheet is the header row -- confirm with the template).
_FIRST_DATA_LINE = 2

# Columns shared by every Excel import template, in the order they are read.
_COMMON_COLUMNS = ('试题内容', '科目', '章节', '难度', '分数', '解析')


def _row_error(line, message):
    """Build a CustomError whose message is prefixed with the row number."""
    return CustomError(u'第%d行:%s' % (line, message))


def _read_columns(row, columns):
    """Return the values of ``columns`` from ``row``, in the given order.

    Raises:
        CustomError: if any column is missing, i.e. the uploaded file does
            not match the expected template.
    """
    try:
        return [row[name] for name in columns]
    except (LookupError, TypeError):
        # Narrow catch: a bare ``except`` would also swallow
        # KeyboardInterrupt / SystemExit.
        raise CustomError(u'模版文件不正确,请检查模版文件或重新下载')


def _require_all(line, checks):
    """Raise for the first falsy value in ``checks``.

    ``checks`` is an ordered iterable of ``(value, message)`` pairs; the order
    decides which error is reported when several values are empty.
    """
    for value, message in checks:
        if not value:
            raise _row_error(line, message)


def _common_required(subject, chapter, difficulty_text, scores):
    """Return the emptiness checks shared by every question row."""
    return (
        (subject, u'科目不能为空'),
        (chapter, u'章节不能为空'),
        (difficulty_text, u'难度不能为空'),
        (scores, u'分数不能为空'),
    )


def _resolve_difficulty(difficulty_text, line):
    """Map the difficulty label from the sheet to an ExamQuestion constant."""
    levels = (
        (u'简单', ExamQuestion.SIMPLE),
        (u'中等', ExamQuestion.MID),
        (u'困难', ExamQuestion.HARD),
    )
    for label, level in levels:
        if difficulty_text == label:
            return level
    raise _row_error(line, u'难度为无效数据')


def _resolve_chapter(subject_name, chapter_name, line):
    """Look up the non-deleted chapter ``chapter_name`` of ``subject_name``.

    Raises:
        CustomError: if the subject, or the chapter within it, is not found.
    """
    subject = Subject.objects.filter(name=subject_name, delete=False).first()
    if not subject:
        raise _row_error(line, u'科目为无效数据')
    chapter = Chapter.objects.filter(
        subject=subject, name=chapter_name, delete=False).first()
    if not chapter:
        raise _row_error(line, u'章节为无效数据')
    return chapter


def _parse_scores(scores, line):
    """Convert the score cell to ``int`` or raise a row-level CustomError."""
    try:
        return int(scores)
    except (TypeError, ValueError, OverflowError):
        raise _row_error(line, u'分数为无效数据')


def _create_question(user, question_type, chapter, difficulty, scores,
                     title, analysis, **extra):
    """Create an ExamQuestion owned by ``user``, stamped with the current time.

    ``extra`` carries the type-specific fields (``judgment``,
    ``discuss_answer``).
    """
    return ExamQuestion.objects.create(
        chapter=chapter,
        type=question_type,
        difficulty=difficulty,
        scores=scores,
        title=title,
        analysis=analysis,
        create_user=user,
        create_time=timezone.now(),
        **extra
    )


def _import_choice_rows(data, user, question_type, option_column):
    """Import single/multiple choice questions; return how many were created.

    A row with a title starts a new question and must not carry an option;
    the following title-less rows each add one option to that question.
    """
    columns = _COMMON_COLUMNS + (option_column, '正确答案')
    question_count = 0
    cur_question = None
    for line, row in enumerate(data, _FIRST_DATA_LINE):
        (title, subject, chapter, difficulty_text,
         scores, analysis, content, right) = _read_columns(row, columns)

        if title and content:
            raise _row_error(line, u'数据格式不正确')

        if title:
            _require_all(
                line,
                _common_required(subject, chapter, difficulty_text, scores))
            difficulty = _resolve_difficulty(difficulty_text, line)
            chapter = _resolve_chapter(subject, chapter, line)
            scores = _parse_scores(scores, line)
            cur_question = _create_question(
                user, question_type, chapter, difficulty, scores,
                title, analysis)
            question_count += 1
        elif not cur_question:
            # A title-less row before any question has been started.
            raise _row_error(line, u'试题内容不能为空')
        elif content:
            if right:
                ExamQuestionOption.objects.create(
                    main=cur_question, content=content, right=True)
            else:
                # Leave ``right`` at the model's default.
                ExamQuestionOption.objects.create(
                    main=cur_question, content=content)
    return question_count


def _import_fill_rows(data, user):
    """Import fill-in-the-blank questions; return how many were created.

    A row with a title starts a question and holds its first answer; the
    following title-less rows add further answers with increasing ``order``.
    """
    columns = _COMMON_COLUMNS + ('填空答案',)
    question_count = 0
    cur_question = None
    order = 0
    for line, row in enumerate(data, _FIRST_DATA_LINE):
        (title, subject, chapter, difficulty_text,
         scores, analysis, content) = _read_columns(row, columns)

        if title and not content:
            raise _row_error(line, u'答案不能为空')

        if title:
            _require_all(
                line,
                _common_required(subject, chapter, difficulty_text, scores))
            difficulty = _resolve_difficulty(difficulty_text, line)
            chapter = _resolve_chapter(subject, chapter, line)
            scores = _parse_scores(scores, line)
            cur_question = _create_question(
                user, ExamQuestion.FILL, chapter, difficulty, scores,
                title, analysis)
            # Answer numbering restarts at 1 for every question.
            ExamQuestionFill.objects.create(
                main=cur_question, content=content, order=1)
            order = 2
            question_count += 1
        elif not cur_question:
            raise _row_error(line, u'试题内容不能为空')
        elif content:
            ExamQuestionFill.objects.create(
                main=cur_question, content=content, order=order)
            order += 1
    return question_count


def _import_judgment_rows(data, user):
    """Import true/false questions (one per row); return how many were created."""
    columns = _COMMON_COLUMNS + ('判断答案',)
    question_count = 0
    for line, row in enumerate(data, _FIRST_DATA_LINE):
        (title, subject, chapter, difficulty_text,
         scores, analysis, judgment_text) = _read_columns(row, columns)

        _require_all(
            line,
            ((title, u'试题内容不能为空'),)
            + _common_required(subject, chapter, difficulty_text, scores)
            + ((judgment_text, u'答案不能为空'),))

        difficulty = _resolve_difficulty(difficulty_text, line)
        if judgment_text == u'正确':
            judgment = True
        elif judgment_text == u'错误':
            judgment = False
        else:
            raise _row_error(line, u'答案为无效数据')
        chapter = _resolve_chapter(subject, chapter, line)
        scores = _parse_scores(scores, line)

        _create_question(
            user, ExamQuestion.JUDGMENT, chapter, difficulty, scores,
            title, analysis, judgment=judgment)
        question_count += 1
    return question_count


def _import_discuss_rows(data, user):
    """Import essay questions (one per row); return how many were created."""
    columns = _COMMON_COLUMNS + ('正确答案',)
    question_count = 0
    for line, row in enumerate(data, _FIRST_DATA_LINE):
        (title, subject, chapter, difficulty_text,
         scores, analysis, discuss_text) = _read_columns(row, columns)

        _require_all(
            line,
            ((title, u'试题内容不能为空'),)
            + _common_required(subject, chapter, difficulty_text, scores)
            + ((discuss_text, u'答案不能为空'),))

        difficulty = _resolve_difficulty(difficulty_text, line)
        chapter = _resolve_chapter(subject, chapter, line)
        scores = _parse_scores(scores, line)

        _create_question(
            user, ExamQuestion.DISCUSS, chapter, difficulty, scores,
            title, analysis, discuss_answer=discuss_text)
        question_count += 1
    return question_count


class ExamQuestionViewSet(CustomModelViewSet):
    """Administrator-only CRUD, export and bulk-import API for exam questions."""

    permission_classes = [IsAdministrator, ]
    queryset = ExamQuestion.objects.filter(delete=False)
    serializer_class = ExamQuestionSerializer

    def filter_queryset(self, queryset):
        """Apply ExamQuestionFilter using the request's query parameters."""
        queryset = queryset.filter()
        f = ExamQuestionFilter(self.request.GET, queryset=queryset)
        return f.qs

    def perform_create(self, serializer):
        """Create the question and record the insertion in SysLog."""
        super(ExamQuestionViewSet, self).perform_create(serializer)
        instance = serializer.instance
        validated_data = serializer.validated_data
        SysLog.objects.addnew(
            self.request.user, SysLog.INSERT,
            u'添加试题库试题,id=%d' % (instance.id), validated_data)

    def perform_update(self, serializer):
        """Update the question, log it, and process a linked feedback if given.

        When the request data carries a truthy ``feedback`` value, that
        feedback is loaded by id, processed on behalf of the current user,
        saved, and a second SysLog entry is written.
        """
        super(ExamQuestionViewSet, self).perform_update(serializer)
        instance = serializer.instance
        validated_data = serializer.validated_data
        SysLog.objects.addnew(
            self.request.user, SysLog.UPDATE,
            u'修改试题库试题,id=%d' % (instance.id), validated_data)

        if 'feedback' in self.request.data and self.request.data['feedback']:
            feedback = ExamQuestionFeedback.getById(
                self.request.data['feedback'])
            feedback.process(self.request.user)
            feedback.save()
            SysLog.objects.addnew(
                self.request.user, SysLog.UPDATE,
                u'处理试题错误反馈,id=%d' % (feedback.id))

    def destroy(self, request, *args, **kwargs):
        """Soft-delete the question by setting its ``delete`` flag."""
        with transaction.atomic():
            instance = self.get_object()
            instance.delete = True
            instance.save()
            # NOTE(review): a deletion is logged with SysLog.INSERT; switch to
            # a delete action constant if SysLog defines one.
            SysLog.objects.addnew(
                request.user, SysLog.INSERT,
                u"删除试题库试题,id=%d" % instance.id)
        return response_ok()

    @action(methods=['post'], detail=False)
    def batch_del(self, request):
        """Soft-delete every question whose id is in the request body.

        NOTE(review): the payload is used as-is for ``id__in`` and no SysLog
        entry is written -- confirm both are intended.
        """
        ids = request.data
        try:
            if ids:
                with transaction.atomic():
                    ExamQuestion.objects.filter(id__in=ids).update(delete=True)
        except CustomError as e:
            return response_error(e.get_error_msg())
        except Exception as e:
            traceback.print_exc()
            return response_error(str(e))
        return response_ok()

    @action(methods=['get'], detail=False)
    def export(self, request):
        """Return all questions matching the current filters, serialized."""
        queryset = self.filter_queryset(self.queryset)
        serializer = self.get_serializer(queryset, many=True)
        return response_ok(serializer.data)

    def _import_excel(self, request, log_template, import_rows,
                      *importer_args):
        """Shared driver for the Excel import actions.

        Validates the uploaded ``excel_file``, then runs
        ``import_rows(data, request.user, *importer_args)`` inside one
        transaction and writes a SysLog entry built from ``log_template`` and
        the returned count. Any exception is turned into an error response;
        raising inside the atomic block rolls back rows created so far.
        """
        excel_file = request.FILES.get('excel_file')
        status = ExcelImporter.validity(excel_file)
        if not status['success']:
            return response_error(status['errors'])
        data = status['data']

        try:
            with transaction.atomic():
                question_count = import_rows(
                    data, request.user, *importer_args)
                SysLog.objects.addnew(
                    request.user, SysLog.IMPORT,
                    log_template % question_count)
        except CustomError as e:
            return response_error(e.get_error_msg())
        except Exception as e:
            traceback.print_exc()
            return response_error(str(e))
        return response_ok()

    @action(methods=['post'], detail=False)
    def import_single(self, request):
        """Import single-choice questions from the uploaded Excel file."""
        return self._import_excel(
            request, u"导入[%d]道单选题",
            _import_choice_rows, ExamQuestion.SINGLE, '单选选项')

    @action(methods=['post'], detail=False)
    def import_multiple(self, request):
        """Import multiple-choice questions from the uploaded Excel file."""
        return self._import_excel(
            request, u"导入[%d]道多选题",
            _import_choice_rows, ExamQuestion.MULTIPLE, '多选选项')

    @action(methods=['post'], detail=False)
    def import_fill(self, request):
        """Import fill-in-the-blank questions from the uploaded Excel file."""
        return self._import_excel(
            request, u"导入[%d]道填空题", _import_fill_rows)

    @action(methods=['post'], detail=False)
    def import_judgment(self, request):
        """Import true/false questions from the uploaded Excel file."""
        return self._import_excel(
            request, u"导入[%d]道判断题", _import_judgment_rows)

    @action(methods=['post'], detail=False)
    def import_discuss(self, request):
        """Import essay questions from the uploaded Excel file."""
        return self._import_excel(
            request, u"导入[%d]道论述题", _import_discuss_rows)

    @action(methods=['post'], detail=False)
    @transaction.atomic
    def import_word(self, request):
        """Import questions parsed from the uploaded Word file into a subject.

        The target subject id is read from the JSON-encoded ``main_data``
        form field.

        NOTE(review): a missing or malformed ``main_data`` raises here instead
        of producing an error response, and the subject lookup does not
        filter on ``delete=False`` like the Excel imports do -- confirm.
        """
        word_file = request.FILES.get('word_file')
        data = WordImporter.parse(word_file)
        main_data = request.data.get('main_data')
        main_data = json.loads(main_data)
        subject_id = main_data.get('subject')
        subject = Subject.objects.filter(id=subject_id).first()
        if not subject:
            return response_error('科目不存在')
        ExamQuestion.batch_create(subject, data, request.user)
        return response_ok(data)


class ExamQuestionFeedbackViewSet(ReadOnlyModelViewSet):
    """Administrator-only read-only listing of exam question feedback."""

    permission_classes = [IsAdministrator, ]
    queryset = ExamQuestionFeedback.objects.filter().order_by('status', '-id')
    serializer_class = ExamQuestionFeedbackSerializer

    def filter_queryset(self, queryset):
        """Apply ExamQuestionFeedbackFilter using the request's query parameters."""
        f = ExamQuestionFeedbackFilter(self.request.GET, queryset=queryset)
        return f.qs