我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用rest_framework.status.HTTP_500_INTERNAL_SERVER_ERROR。
def test_500_error_context_logged_out(self): """ Assert context values for 500 error page when logged out """ # case with specific page with patch('ui.templatetags.render_bundle._get_bundle') as get_bundle: response = self.client.get('/500/') assert response.context['authenticated'] is False assert response.context['name'] == "" assert response.context['is_public'] is True assert response.context['has_zendesk_widget'] is True self.assertContains(response, 'Share this page', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) bundles = [bundle[0][1] for bundle in get_bundle.call_args_list] assert set(bundles) == { 'common', 'public', 'sentry_client', 'style', 'style_public', 'zendesk_widget', }
def test_add_channel_failed_create_channel(mock_staff_client, mocker): """If client.channels.create fails an exception should be raised""" response_500 = Response() response_500.status_code = statuses.HTTP_500_INTERNAL_SERVER_ERROR mock_staff_client.channels.create.return_value.raise_for_status.side_effect = HTTPError(response=response_500) with pytest.raises(ChannelCreationException) as ex: api.add_channel( Search.from_dict({}), "title", "name", "public_description", "channel_type", 123, 456, ) assert ex.value.args[0] == "Error creating channel name" mock_staff_client.channels.create.return_value.raise_for_status.assert_called_with() assert mock_staff_client.channels.create.call_count == 1 assert PercolateQuery.objects.count() == 0 assert Channel.objects.count() == 0
def test_generate_mailgun_response_json_with_failed_json_call(self): """ Tests that generate_mailgun_response_json() returns without erroring if Response.json() call fails for non 401 status code """ # Response.json() error response = Mock( spec=Response, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, json=lambda: (_ for _ in []).throw(ValueError), # To get .json() to throw ValueError reason="reason" ) self.assertDictEqual( generate_mailgun_response_json(response), {"message": response.reason} )
def test_view_response_improperly_configured(self): """ Test that the SearchResultMailView will raise ImproperlyConfigured if mailgun returns 401, which results in returning 500 since micromasters.utils.custom_exception_handler catches ImproperlyConfigured """ with patch( 'mail.views.get_all_query_matching_emails', autospec=True, return_value=self.email_results ), patch( 'mail.views.MailgunClient' ) as mock_mailgun_client, patch( 'mail.views.get_mail_vars', autospec=True, return_value=self.email_vars, ) as mock_get_mail_vars: mock_mailgun_client.send_batch.side_effect = ImproperlyConfigured resp = self.client.post(self.search_result_mail_url, data=self.request_data, format='json') assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR mock_get_mail_vars.assert_called_once_with(self.email_results)
def heartbeat(request): # pylint: disable=unused-argument """ View to check if database is reachable and ready to handle requests. """ try: db_status() except DatabaseError: return JsonResponse( {'OK': False}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) return JsonResponse( {'OK': True}, status=status.HTTP_200_OK )
def logs(self, request, **kwargs): app = self.get_object() try: return Response(app.logs(request.query_params.get('log_lines', str(settings.LOG_LINES))), status=status.HTTP_200_OK, content_type='text/plain') except requests.exceptions.RequestException: return Response("Error accessing logs for {}".format(app.id), status=status.HTTP_500_INTERNAL_SERVER_ERROR, content_type='text/plain') except EnvironmentError as e: if e.message == 'Error accessing deis-logger': return Response("Error accessing logs for {}".format(app.id), status=status.HTTP_500_INTERNAL_SERVER_ERROR, content_type='text/plain') else: return Response("No logs for {}".format(app.id), status=status.HTTP_204_NO_CONTENT, content_type='text/plain')
def comment_post(request, pid, co, owner_id): """ https://themoviebook.herokuapp.com/posts/comment/userpid=<>/postid=<>/rating=<>/ GET request: Adds comment """ try: profile = request.user.profile post_owner = UserProfile.objects.get(pk=owner_id) post = post_owner.posts.get(pk=pid) post.comment(profile, co) return JsonResponse({'detail': 'successful'}, safe=False, status=status.HTTP_200_OK) except Exception: return JsonResponse({'detail': 'failed'}, safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # ['GET']
def custom_exception_handler(exc, context): """ Custom django rest framework exception handler that includes 'status_code' field in the responses. """ # Call REST framework's default exception handler first response = exception_handler(exc, context) # Catch unexpected exceptions if response is None: if isinstance(exc, Exception): data = {'detail': 'A server error occurred.'} set_rollback() response = Response(data, status=status.HTTP_500_INTERNAL_SERVER_ERROR) # Now add the HTTP status code to the response. if response is not None: response.data['status_code'] = response.status_code return response
def create(self, validated_data): if self._get_cache() is not None: raise Throttled() callback = validated_data['callback'] signed_data = signing.dumps(dict(callback=callback, user_id=self.user.pk)) callback = add_params(callback, sign=signed_data) email_message = get_password_reset_email(self.user, callback) try: email_message.send() except smtplib.SMTPServerDisconnected as e: raise serializers.ValidationError( 'Mail sending timeout.', code=status.HTTP_500_INTERNAL_SERVER_ERROR) except smtplib.SMTPException as e: raise serializers.ValidationError( 'Unknown SMTP error: %s.' % str(e), code=status.HTTP_500_INTERNAL_SERVER_ERROR) else: self._set_cache() return 'OK'
def handle_exception(self, exc): exception_handler = self.settings.EXCEPTION_HANDLER context = self.get_exception_handler_context() response_data = exception_handler(exc, context) if response_data is None: response_data = { 'detail': 'Internal server error.', 'status': status.HTTP_500_INTERNAL_SERVER_ERROR } else: status_code = getattr(exc, 'status_code', None) if status_code is not None: response_data.update({'status': status_code}) self.send_exception(response_data)
def handle_exception(self, exc): exception_handler = self.settings.EXCEPTION_HANDLER context = self.get_exception_handler_context() response_data = exception_handler(exc, context) if response_data is None: status_code = rest_framework_status.HTTP_500_INTERNAL_SERVER_ERROR logger.error(exc) else: status_code = getattr( exc, 'status_code', rest_framework_status.HTTP_500_INTERNAL_SERVER_ERROR ) self.route_send( self.request.reply_channel, data=response_data if response_data else {'detail': 'Internal server error.'}, status=status_code )
def direct_channel(self, request): """ Gets or creates a direct channel to the user --- request_serializer: DirectChannelSerializer response_serializer: ChannelSerializer """ serializer = self.get_serializer(data=request.data) channel = None if serializer.is_valid(raise_exception=True): user = serializer.validated_data['user'] channel = get_or_create_direct_channel(request.user, user) if not channel: return Response( {'status': "Couldn't get or create a direct channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) response_serializer = ChannelSerializer(channel) return Response(response_serializer.data)
def task_channel(self, request, task_id=None): """ Gets or creates task channel --- response_serializer: ChannelSerializer """ task = get_object_or_404(Task.objects.all(), pk=task_id) channel = None if task: channel = get_or_create_task_channel(request.user, task) if not channel: return Response( {'status': "Couldn't create task channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) response_serializer = ChannelSerializer(channel, context={'request': request}) return Response(response_serializer.data)
def post(self, request, token): serializer = UpdateSerializer(data=request.data) if serializer.is_valid(): serializer.save() try: bot = Bot.objects.get(token=token) bot.handle(Update.de_json(request.data, bot._bot)) except Bot.DoesNotExist: logger.warning("Token %s not associated to a bot" % token) return Response(serializer.errors, status=status.HTTP_404_NOT_FOUND) except: exc_info = sys.exc_info() traceback.print_exception(*exc_info) logger.error("Error processing %s for token %s" % (request.data, token)) return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: return Response(serializer.data, status=status.HTTP_200_OK) logger.error("Validation error: %s from message %s" % (serializer.errors, request.data)) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def __init__(self, request, action): self.Code = status.HTTP_500_INTERNAL_SERVER_ERROR self.Result = {} self.Errors = [] self.request = request self.action = action self.input_data = self.request.data
def process(self): # Must be overwriten self.setCode(status.HTTP_500_INTERNAL_SERVER_ERROR)
def __init__(self, request, endpoint, action): self.Code = status.HTTP_500_INTERNAL_SERVER_ERROR self.Result = {} self.Errors = [] has_callback = request.GET.get('callback', None) if has_callback: self.Callback = has_callback self.setCode(self.process(request, endpoint, action))
def test_500_error_context_logged_in(self): """ Assert context values for 500 error page when logged in """ with mute_signals(post_save): profile = self.create_and_login_user() self.client.force_login(profile.user) with override_settings(EMAIL_SUPPORT='support'), patch( 'ui.templatetags.render_bundle._get_bundle' ) as get_bundle: response = self.client.get('/500/') assert response.context['authenticated'] is True assert response.context['name'] == profile.preferred_name assert response.context['support_email'] == 'support' assert response.context['is_public'] is True assert response.context['has_zendesk_widget'] is True self.assertContains(response, 'Share this page', status_code=status.HTTP_500_INTERNAL_SERVER_ERROR) bundles = [bundle[0][1] for bundle in get_bundle.call_args_list] assert set(bundles) == { 'common', 'public', 'sentry_client', 'style', 'style_public', 'zendesk_widget', }
def test_enrollment_fails(self, mock_refresh, mock_edx_enr): # pylint: disable=unused-argument """ Test error when backend raises an exception """ error = HTTPError() error.response = MagicMock() error.response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR mock_edx_enr.side_effect = error resp = self.client.post(self.url, {'course_id': self.course_id}, format='json') assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR # the response has a structure like {"error": "<message>"} assert isinstance(resp.data, dict) assert 'error' in resp.data assert mock_edx_enr.call_count == 1 # assert just the second argument, since the first is `self` assert mock_edx_enr.call_args[0][1] == self.course_id # if instead edX returns a 400 error, an exception is raised by # the view and the user gets a different error message error.response.status_code = status.HTTP_400_BAD_REQUEST mock_edx_enr.side_effect = error resp = self.client.post(self.url, {'course_id': self.course_id}, format='json') assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert isinstance(resp.data, list) assert len(resp.data) == 1 assert PossiblyImproperlyConfigured.__name__ in resp.data[0] # if the error from the call to edX is is not HTTPError, the user gets a normal json error mock_edx_enr.side_effect = ValueError() resp = self.client.post(self.url, {'course_id': self.course_id}, format='json') assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR # the response has a structure like {"error": "<message>"} assert isinstance(resp.data, dict) assert 'error' in resp.data
def test_improperly_configured(self, exception_to_raise, mock_sentry): """ Test a standard exception not handled by default by the rest framework """ exp = exception_to_raise('improperly configured') resp = custom_exception_handler(exp, self.context) assert resp.status_code == status.HTTP_500_INTERNAL_SERVER_ERROR assert resp.data == ['{0}: improperly configured'.format(exception_to_raise.__name__)] mock_sentry.assert_called_once_with()
def get(self, request): if "number" not in request.GET: return Response("No number specified.", status=status.HTTP_400_BAD_REQUEST) query_num = request.GET["number"] try: n = Number.objects.get(number=query_num) if n.state == 'inuse': if not n.subscriber: # See issue 260. print 'Error: number %s has no subscriber' % n.number return Response( 'Number %s has no associated subscriber.' % n.number, status=status.HTTP_500_INTERNAL_SERVER_ERROR) if not n.subscriber.bts: # See issue 386. print 'Error: subscriber "%s" has no BTS' % n.subscriber return Response( 'Subscriber "%s" has no BTS.' % n.subscriber, status=status.HTTP_404_NOT_FOUND) # Strip the protocol field and just return the rest, # removing any trailing slash. bts_info = urlparse.urlparse(n.subscriber.bts.inbound_url) bts_netloc = bts_info.netloc bts_host = bts_info.hostname result = { 'netloc': bts_netloc, 'number': n.number, 'hostname': bts_host, 'source': n.kind, 'owner': n.network.id } return Response(result, status=status.HTTP_200_OK) else: return Response("No such number", status=status.HTTP_404_NOT_FOUND) except Number.DoesNotExist: return Response("No such number", status=status.HTTP_404_NOT_FOUND)
def get(self, request, *args, **kwargs): health_check = HealthCheck() result = health_check.run() status_code = status.HTTP_500_INTERNAL_SERVER_ERROR if result.is_healthy: status_code = status.HTTP_200_OK return Response(result.data, status=status_code)
def like_post(request, pid, ra, owner_id): """ https://themoviebook.herokuapp.com/posts/like/userpid=<>/postid=<>/rating=<>/ GET request: adds Like """ try: profile = request.user.profile post_owner = UserProfile.objects.get(pk=owner_id) post = post_owner.posts.get(pk=pid) post.like(profile, ra) return JsonResponse({'detail': 'successful'}, safe=False, status=status.HTTP_200_OK) except Exception: return JsonResponse({'detail': 'failed'}, safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def api_500(request): return RenderedResponse({'detail': 'Internal server error', 'request':request, 'status': status.HTTP_500_INTERNAL_SERVER_ERROR})
def deploy_view(request): if request.data['payload']['outcome'] == 'success': build_info_obj = save_build_object(request) clone_or_pull_repo(request) config_dict = open_just_config(request.data['payload']['reponame']) if (config_dict == None): return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR) allowed_branches = config_dict['Deploy'].keys() deploy_build(request, config_dict, allowed_branches) return Response(status=status.HTTP_200_OK)
def custom_exception_handler(exc, context): """ Formats REST exceptions like: { "error": "error_code", "error_description": "description of the error", } :param exc: Exception :return: Response """ # Call REST framework's default exception handler first, # to get the standard error response. response = exception_handler(exc, context) if not response: # Unhandled exceptions (500 internal server errors) response = Response(data={ 'error': 'server_error', 'error_description': unicode(exc), }, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return response if hasattr(exc, 'default_error'): response.data['error'] = exc.default_error else: response.data['error'] = 'api_error' if hasattr(exc, 'default_detail'): response.data['error_description'] = exc.default_detail elif 'detail' in response.data: response.data['error_description'] = response.data['details'] if 'detail' in response.data: del response.data['detail'] return response
def create(self, request, *args, **kwargs): post_pk = kwargs['pk'] like_user = request.user.pk if PostLike.objects.filter(post=post_pk, like_user=like_user): return Response({"detail": "?? ???? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) request.data._mutable = True request.data['like_user'] = request.user.pk request.data['post'] = post_pk return super().create(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs): post_pk = kwargs['pk'] like_user = request.user.pk if PostLike.objects.filter(post=post_pk, like_user=like_user): instance = PostLike.objects.get(post=post_pk, like_user=like_user) self.perform_destroy(instance) return Response(status=status.HTTP_204_NO_CONTENT) else: return Response({"detail": "?? ???? ??? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def create(self, request, *args, **kwargs): post = kwargs['pk'] bookmark_user = request.user.pk if PostBookMark.objects.filter(post=post, bookmark_user=bookmark_user): return Response({"detail": "?? ???? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) request.data._mutable = True request.data['bookmark_user'] = request.user.pk request.data['post'] = kwargs['pk'] return super().create(request, *args, **kwargs)
def destroy(self, request, *args, **kwargs): post = kwargs['pk'] bookmark_user = request.user.pk if PostBookMark.objects.filter(post=post, bookmark_user=bookmark_user): instance = PostBookMark.objects.get(post=post, bookmark_user=bookmark_user) self.perform_destroy(instance) return Response(status=status.HTTP_204_NO_CONTENT) else: return Response({"detail": "?? ????? ?? ????"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request, hook_id): """ Process Messenger webhook. 1. Get an enabled Messenger bot 3. For each message serialize 4. For each message create :class:`MessengerMessage <permabots.models.messenger_api.MessengerMessage>` 5. Delay processing of each message to a task 6. Response provider """ try: bot = caching.get_or_set(MessengerBot, hook_id) except MessengerBot.DoesNotExist: logger.warning("Hook id %s not associated to a bot" % hook_id) return Response(status=status.HTTP_404_NOT_FOUND) logger.debug("Messenger Bot %s attending request %s" % (bot, request.data)) webhook = Webhook.from_json(request.data) for webhook_entry in webhook.entries: for webhook_message in webhook_entry.messaging: try: if webhook_message.is_delivery: raise OnlyTextMessages message = self.create_message(webhook_message, bot) if bot.enabled: logger.debug("Messenger Bot %s attending request %s" % (bot, message)) handle_messenger_message.delay(message.id, bot.id) else: logger.error("Message %s ignored by disabled bot %s" % (message, bot)) except OnlyTextMessages: logger.warning("Not text message %s for bot %s" % (message, hook_id)) except: exc_info = sys.exc_info() traceback.print_exception(*exc_info) logger.error("Error processing %s for bot %s" % (webhook_message, hook_id)) return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response(status=status.HTTP_200_OK)
def post(self, request, hook_id): """ Process Telegram webhook. 1. Serialize Telegram message 2. Get an enabled Telegram bot 3. Create :class:`Update <permabots.models.telegram_api.Update>` 5. Delay processing to a task 6. Response provider """ serializer = UpdateSerializer(data=request.data) if serializer.is_valid(): try: bot = caching.get_or_set(TelegramBot, hook_id) except TelegramBot.DoesNotExist: logger.warning("Hook id %s not associated to an bot" % hook_id) return Response(serializer.errors, status=status.HTTP_404_NOT_FOUND) try: update = self.create_update(serializer, bot) if bot.enabled: logger.debug("Telegram Bot %s attending request %s" % (bot.token, request.data)) handle_update.delay(update.id, bot.id) else: logger.error("Update %s ignored by disabled bot %s" % (update, bot.token)) except OnlyTextMessages: logger.warning("Not text message %s for bot %s" % (request.data, hook_id)) return Response(status=status.HTTP_200_OK) except: exc_info = sys.exc_info() traceback.print_exception(*exc_info) logger.error("Error processing %s for bot %s" % (request.data, hook_id)) return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: return Response(serializer.data, status=status.HTTP_200_OK) logger.error("Validation error: %s from message %s" % (serializer.errors, request.data)) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def post(self, request, key): """ Process notitication hooks: 1. Obtain Hook 2. Check Auth 3. Delay processing to a task 4. Respond requester """ try: hook = Hook.objects.get(key=key, enabled=True) except Hook.DoesNotExist: msg = _("Key %s not associated to an enabled hook or bot") % key logger.warning(msg) return Response(msg, status=status.HTTP_404_NOT_FOUND) if hook.bot.owner != request.user: raise exceptions.AuthenticationFailed() try: parsed_data = request.data logger.debug("Hook %s attending request %s" % (hook, parsed_data)) handle_hook.delay(hook.id, parsed_data) except ParseError as e: return Response(str(e), status=status.HTTP_400_BAD_REQUEST) except: exc_info = sys.exc_info() traceback.print_exception(*exc_info) msg = _("Error processing %s for key %s") % (request.data, key) logger.error(msg) return Response(msg, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: return Response(status=status.HTTP_200_OK)
def test_get_internal_error(self, mock_obj): mock_obj.return_value = ANSWER_INTERNAL_ERROR self.client.force_authenticate(user=self.user) response = self.client.get(self.url) self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
def devino_request(self, serializer=None): if serializer: serializer.is_valid(raise_exception=True) json_data = json.dumps( serializer.validated_data if serializer else None, default=date_handler ) devino_request = models.DevinoRequest.objects.create(api_resource=self.api_resource, data=json_data) try: if serializer: answer = self.api_resource_lib(**serializer.validated_data) else: answer = self.api_resource_lib() models.DevinoAnswer.objects.create( code=answer.code, description=answer.description, result=answer.result, request=devino_request, ) if answer.code == consts.STATUS_BAD_REQUEST: status_response = status.HTTP_400_BAD_REQUEST elif answer.code == consts.STATUS_ERROR_API: status_response = status.HTTP_500_INTERNAL_SERVER_ERROR else: status_response = status.HTTP_200_OK return Response({'code': answer.code, 'description': answer.description, 'result': answer.result}, status=status_response) except DevinoException as ex: error = models.DevinoAnswer.objects.create( code=ex.error.code, description=ex.error.description, request=devino_request, is_fail=True, ) return Response({'code': error.code, 'description': error.description}, status=status.HTTP_400_BAD_REQUEST)
def support_channel(self, request): """ Gets or creates a direct channel for the current user or customer --- request_serializer: SupportChannelSerializer response_serializer: ChannelSerializer """ serializer = self.get_serializer(data=request.data) channel = None if serializer.is_valid(raise_exception=True): if request.user.is_authenticated(): # Create support channel for logged in user channel = get_or_create_support_channel(request.user) else: # Create support channel for anonymous user channel_id = serializer.validated_data.get('id', None) if channel_id: channel = get_object_or_404(self.get_queryset(), pk=channel_id) email = serializer.validated_data.get('email', None) name = serializer.validated_data.get('name', None) if email: customer = Inquirer.objects.create(name=name, email=email) channel.content_object = customer channel.save() else: return Response( {'email': "Couldn't get or create a support channel"}, status=status.HTTP_400_BAD_REQUEST ) else: channel = Channel.objects.create(type=CHANNEL_TYPE_SUPPORT)#, content_object=customer) if not channel: return Response( {'status': "Couldn't get or create a support channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) response_serializer = ChannelSerializer(channel, context={'request': request}) return Response(response_serializer.data)
def developer_channel(self, request): """ Gets or creates a developer channel for the current user --- request_serializer: DeveloperChannelSerializer response_serializer: ChannelSerializer """ serializer = self.get_serializer(data=request.data) channel = None if serializer.is_valid(raise_exception=True): # Create developer channel subject = serializer.validated_data['subject'] message = serializer.validated_data['message'] channel = create_channel( request.user, channel_type=CHANNEL_TYPE_DEVELOPER, subject=subject, messages=[ dict(user=request.user, body=message) ] ) if not channel: return Response( {'status': "Couldn't get or create a support channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) response_serializer = ChannelSerializer(channel, context={'request': request}) return Response(response_serializer.data)
def post(self, request, format=None): try: rfid = Rfid.objects.get(code=request.data.get('rfid')) device = Device.objects.get(identifier=request.data.get('device')) deviceAuth = DeviceAuth.objects.get(device=device.identifier, rfid=rfid.id) except ObjectDoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) except ValidationError as e: # except: # logger.exception("An error occurred") return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR) serializer = AuthSerializer( instance={'name': device.name, 'rfid': rfid.code, 'device': device.identifier}) return Response(serializer.data, status=200)
def post(self, request, hook_id): """ Process Kik webhook: 1. Get an enabled Kik bot 2. Verify Kik signature 3. Serialize each message 4. For each message create :class:`KikMessage <permabots.models.kik_api.KikMessage>` and :class:`KikUser <permabots.models.kik_api.KikUser>` 5. Delay each message processing to a task 6. Response provider """ try: bot = caching.get_or_set(KikBot, hook_id) except KikBot.DoesNotExist: logger.warning("Hook id %s not associated to a bot" % hook_id) return Response(status=status.HTTP_404_NOT_FOUND) signature = request.META.get('HTTP_X_KIK_SIGNATURE') if signature: signature.encode('utf-8') if not bot._bot.verify_signature(signature, request.stream.body): logger.debug("Kik Bot data %s not verified %s" % (request.data, signature)) return Response(status=403) logger.debug("Kik Bot data %s verified" % (request.data)) for kik_message in request.data['messages']: serializer = KikMessageSerializer(data=kik_message) logger.debug("Kik message %s serialized" % (kik_message)) if serializer.is_valid(): try: if not self.accepted_types(serializer): raise OnlyTextMessages message = self.create_message(serializer, bot) if bot.enabled: logger.debug("Kik Bot %s attending request %s" % (bot, kik_message)) handle_message.delay(message.id, bot.id) else: logger.error("Message %s ignored by disabled bot %s" % (message, bot)) except OnlyTextMessages: logger.warning("Not text message %s for bot %s" % (kik_message, hook_id)) return Response(status=status.HTTP_200_OK) except: exc_info = sys.exc_info() traceback.print_exception(*exc_info) logger.error("Error processing %s for bot %s" % (kik_message, hook_id)) return Response(serializer.errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) else: logger.error("Validation error: %s from kik message %s" % (serializer.errors, kik_message)) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) return Response(serializer.data, status=status.HTTP_200_OK)
def send_form(request): """ Receives data from a contact form and creates a GPG-encrypted email including that data. Depending on the data's given confidentiality the email is sent to pre-defined receivers. """ form = ContactForm(request.POST, request.FILES) if not form.is_valid(): return Response({'status': 'error', 'errors': form.errors}, status=status.HTTP_400_BAD_REQUEST) data = form.cleaned_data uploaded_files = request.FILES.getlist('files') if uploaded_files: res = form.files_are_valid(request) if res is not True: return Response({'status': 'error', 'errors': res}, status=status.HTTP_400_BAD_REQUEST) logger.debug("Contact data includes %i uploaded file(s).", len(uploaded_files)) subject = "Neue Nachricht von Streetcoverage" body = EMAIL_TEMPLATE % ( data['name'] or "(kein Name angegeben)", data['email'] or "(keine Emailadresse angegeben)", data['subject'], "ja" if data['journalist'] else "nein", "ja" if data['confidential'] else "nein", data['message'] ) if data['confidential']: receivers = settings.EMAIL_TO_CONTACT_CONFIDENTIAL else: receivers = settings.EMAIL_TO_CONTACT_NON_CONFIDENTIAL email = GPGEmailMessage(subject, body, settings.EMAIL_FROM_CONTACT, receivers) for uploaded_file in uploaded_files: # uploaded_file is an InMemoryUploadedFile created by MemoryFileUploadHandler (see setting # FILE_UPLOAD_HANDLERS). Its mode includes 'b' so its content is a byte string. # Django's attach functionality internally invokes encode() when the file consists of text. # Byte strings however do not have an encode() method, so we decode the byte string to a normal string. # Alternative: instead of attach() we could use attach_file() which has been made aware of types: # https://github.com/django/django/commit/c6da621def9bc04a5feacd5652c7f9d84f48ad2c # This would require writing the uploaded file to disk, e.g. using method(). # Note that TemporaryUploadedFile created by TemporaryFileUploadHandler does write files to disk, but uses # tempfile.NamedTemporaryFile's default mode 'w+b', so we could create a subclass of both classes as # another alternative. if uploaded_file.content_type and uploaded_file.content_type.split('/')[0] == 'text': email.attach(uploaded_file.name, uploaded_file.read().decode('utf-8'),) else: email.attach(uploaded_file.name, uploaded_file.read(),) try: email.send() logger.info("Contact email has been sent.") except GPGException: return Response({'status': 'error', 'errors': 'Email could not be sent.'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({'status': 'success'})