我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用rest_framework.status.HTTP_404_NOT_FOUND。
def time_entry_item(request, id): try: item = TimeEntry.objects.get(user=request.user, id=id) except NoCurrentEntry: return Response(status=status.HTTP_404_NOT_FOUND) user = request.user if request.method == 'GET': serializer = TimeEntrySerializer(request.user, item) return Response(serializer.data) elif request.method == 'PUT': serializer = TimeEntrySerializer(request.user, item, data=request.data) if (serializer.is_valid()): serializer.save() return Response(serializer.data) return Response(serializer.errors) elif request.method == 'DELETE': pass
def delete(self, request, id, kind, format=None): """ WARNING: Force delete """ if kind == 'badge': kind = get_object_or_404(Badge, pk=id) elif kind == 'category': kind = get_object_or_404(Category, pk=id) elif kind == 'event': kind = get_object_or_404(Event, pk=id) elif kind == 'keyword': kind = get_object_or_404(Keyword, pk=id) elif kind == 'location': kind = get_object_or_404(Location, pk=id) elif kind == 'position': kind = get_object_or_404(Position, pk=id) elif kind == 'role': kind = get_object_or_404(Role, pk=id) else: return Response(status=status.HTTP_404_NOT_FOUND) kind.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def _test_pagination(self): """Test pagination.""" self.authenticate() publishers_url = reverse('publisherdocument-list', kwargs={}) books_url = reverse('bookdocument-list', kwargs={}) data = {} invalid_page_url = books_url + '?page=3&page_size=30' invalid_response = self.client.get(invalid_page_url, data) self.assertEqual( invalid_response.status_code, status.HTTP_404_NOT_FOUND ) valid_page_url = publishers_url + '?limit=5&offset=8' # Check if response now is valid valid_response = self.client.get(valid_page_url, data) self.assertEqual(valid_response.status_code, status.HTTP_200_OK) # Check totals self.assertEqual(len(valid_response.data['results']), 5)
def process(self): if self.requires_action: if settings.DEBUG: self.setCode(getattr(self, str(self.request.method.lower() + "_" + self.action))()) else: try: self.setCode(getattr(self, str(self.request.method.lower() + "_" + self.action))()) except AttributeError: self.addError("not_found") self.setCode(status.HTTP_404_NOT_FOUND) else: try: self.setCode(getattr(self, str(self.request.method.lower() + "_process"))()) except AttributeError: self.addError("not_found") self.setCode(status.HTTP_404_NOT_FOUND)
def get_process_single(self, id): ##### # Retrieves a single object ##### try: clean_id = int(id) except: return status.HTTP_400_BAD_REQUEST try: object = self.Meta.Model.objects.get(id=clean_id) except ObjectDoesNotExist: return status.HTTP_404_NOT_FOUND self.setResult(object.get_as_dict()) return status.HTTP_200_OK
def put_process_single(self, id): ##### # Updates an Object ##### try: clean_id = int(id) except: return status.HTTP_400_BAD_REQUEST try: object = self.Meta.Model.objects.get(id=clean_id) except: return status.HTTP_404_NOT_FOUND serializer = self.Meta.Serializer(object, data=self.getInput(), partial=True) if serializer.is_valid(): serializer.save() self.setResult(serializer.data) return status.HTTP_202_ACCEPTED
def delete_process_single(self, id): ##### # Deletes a single product ##### try: clean_id = int(id) except: return status.HTTP_400_BAD_REQUEST try: object = self.Meta.Model.objects.get(id=clean_id) except ObjectDoesNotExist: return status.HTTP_404_NOT_FOUND object.delete() return status.HTTP_200_OK
def user_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = User.objects.get(id=id) except User.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = UserSerializer(snippet) return Response(serializer.data) elif request.method == 'PUT': serializer = UserSerializer(snippet, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_user'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def assetsLog_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Log_Assets.objects.get(id=id) except Log_Assets.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = AssetsLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE' and request.user.has_perm('OpsManage.delete_log_assets'): if not request.user.has_perm('OpsManage.delete_log_assets'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def deploy_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Project_Config.objects.get(id=id) except Project_Config.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = ProjectConfigSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_project_config'): return Response(status=status.HTTP_403_FORBIDDEN) recordProject.delay(project_user=str(request.user),project_id=id,project_name=snippet.project_name,project_content="????") snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def deployLogs_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Log_Project_Config.objects.get(id=id) except Log_Project_Config.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = DeployLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_log_project_config'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def modelLogsdetail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Log_Ansible_Model.objects.get(id=id) except Log_Ansible_Model.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = AnsibleModelLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.can_delete_log_ansible_model'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def playbookLogsdetail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Log_Ansible_Playbook.objects.get(id=id) except Log_Ansible_Playbook.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = AnsiblePlaybookLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_log_ansible_playbook'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def cron_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Cron_Config.objects.get(id=id) except Cron_Config.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = CronSerializer(snippet) return Response(serializer.data) elif request.method == 'PUT': serializer = CronSerializer(snippet, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_service_assets'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def cronLogsdetail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = Log_Cron_Config.objects.get(id=id) except Log_Cron_Config.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = CronLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'DELETE': if not request.user.has_perm('OpsManage.delete_log_cron_config'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def results(self, request, pk=None): job = self.get_object() if job.status == AnalysisJob.Status.COMPLETE: results = OrderedDict([ ('census_block_count', job.census_block_count), ('census_blocks_url', job.census_blocks_url), ('connected_census_blocks_url', job.connected_census_blocks_url), ('destinations_urls', job.destinations_urls), ('tile_urls', job.tile_urls), ('overall_scores', job.overall_scores), ('overall_scores_url', job.overall_scores_url), ('score_inputs_url', job.score_inputs_url), ('ways_url', job.ways_url), ]) return Response(results, status=status.HTTP_200_OK) else: return Response(None, status=status.HTTP_404_NOT_FOUND)
def post(self, request, *args, **kwargs): if self.alias_type.upper() not in api_settings.PASSWORDLESS_AUTH_TYPES: # Only allow auth types allowed in settings. return Response(status=status.HTTP_404_NOT_FOUND) serializer = self.serializer_class(data=request.data, context={'request': request}) if serializer.is_valid(raise_exception=True): # Validate - user = serializer.validated_data['user'] # Create and send callback token success = TokenService.send_token(user, self.alias_type, **self.message_payload) # Respond With Success Or Failure of Sent if success: status_code = status.HTTP_200_OK response_detail = self.success_response else: status_code = status.HTTP_400_BAD_REQUEST response_detail = self.failure_response return Response({'detail': response_detail}, status=status_code) else: return Response(serializer.error_messages, status=status.HTTP_400_BAD_REQUEST)
def test_coupon_not_redeemable(self): """ A 404 should be returned if coupon is not redeemable """ with patch( 'ecommerce.views.is_coupon_redeemable', autospec=True ) as _is_redeemable_mock: _is_redeemable_mock.return_value = False resp = self.client.post( reverse('coupon-user-create', kwargs={'code': self.coupon.coupon_code}), data={ "username": get_social_username(self.user) }, format='json', ) assert resp.status_code == status.HTTP_404_NOT_FOUND _is_redeemable_mock.assert_called_with(self.coupon, self.user)
def vmServer_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = VmServer.objects.get(id=id) except VmServer.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = VmServerSerializer(snippet) return Response(serializer.data) elif request.method == 'PUT': serializer = VmServerSerializer(snippet, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) elif request.method == 'DELETE': if not request.user.has_perm('vmanageplatform.delete_vmserver'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def vmlog_detail(request, id,format=None): """ Retrieve, update or delete a server assets instance. """ try: snippet = VmLogs.objects.get(id=id) except VmLogs.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = VmLogsSerializer(snippet) return Response(serializer.data) elif request.method == 'PUT': serializer = VmLogsSerializer(snippet, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) elif request.method == 'DELETE': if not request.user.has_perm('vmanageplatform.delete_vmserver'): return Response(status=status.HTTP_403_FORBIDDEN) snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def test_get_refresth_token_detail(self): self.client.credentials( HTTP_AUTHORIZATION='JWT ' + utils.jwt_encode_handler( utils.jwt_payload_handler(self.user))) response = self.client.get(self.detail_url) self.assertEqual( response.status_code, status.HTTP_200_OK, (response.status_code, response.content) ) response = self.client.get(self.detail_url1) self.assertEqual( response.status_code, status.HTTP_404_NOT_FOUND, (response.status_code, response.content) )
def test_delete_refresth_token(self): self.client.credentials( HTTP_AUTHORIZATION='JWT ' + utils.jwt_encode_handler( utils.jwt_payload_handler(self.user))) response = self.client.delete(self.detail_url) self.assertEqual( response.status_code, status.HTTP_204_NO_CONTENT, (response.status_code, response.content) ) response = self.client.delete(self.detail_url1) self.assertEqual( response.status_code, status.HTTP_404_NOT_FOUND, (response.status_code, response.content) )
def get_corpus_status(request, name=None): if request.method == 'GET': try: corpus = Corpus.objects.get(name=name) except ObjectDoesNotExist: return HttpResponse('Could not find the specified corpus.', status=status.HTTP_404_NOT_FOUND) if corpus.current_task_id is None: return JsonResponse(data={'data': 'ready'}, status=status.HTTP_200_OK) else: res = AsyncResult(corpus.current_task_id) if res.state == cstates.SUCCESS: corpus.current_task_id = None if corpus.status == Corpus.IMPORT_RUNNING: corpus.status = Corpus.IMPORTED corpus.save() return JsonResponse(data={'data': 'ready'}, status=status.HTTP_200_OK) elif res.state == cstates.FAILURE: corpus.current_task_id = None if corpus.status == Corpus.IMPORT_RUNNING: corpus.status = Corpus.NOT_IMPORTED corpus.save() return JsonResponse(data={'data': 'error'}, status=status.HTTP_200_OK) return JsonResponse(data={'data': 'busy'}, status=status.HTTP_200_OK)
def testCanDeleteOwnedDomain(self): httpretty.enable() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') url = reverse('domain-detail', args=(self.ownedDomains[1].pk,)) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) self.assertEqual(httpretty.last_request().method, 'DELETE') self.assertEqual(httpretty.last_request().headers['Host'], 'nsmaster:8081') httpretty.reset() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
def testCanDeleteOwnedDynDomain(self): httpretty.enable() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') url = reverse('domain-detail', args=(self.ownedDomains[1].pk,)) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # FIXME In this testing scenario, the parent domain dedyn.io does not # have the proper NS and DS records set up, so we cannot test their # deletion. httpretty.reset() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
def assertIP(self, ipv4=None, ipv6=None, name=None): old_credentials = self.client._credentials['HTTP_AUTHORIZATION'] self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.password) name = name or self.username def verify_response(type_, ip): url = reverse('rrset', args=(name, '', type_,)) response = self.client.get(url) if ip is not None: self.assertEqual(response.data['records'][0], ip) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['ttl'], 60) else: self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) verify_response('A', ipv4) verify_response('AAAA', ipv6) self.client.credentials(HTTP_AUTHORIZATION=old_credentials)
def test_scenario_put(self): # detail view for PUT (udpate) url = reverse('scenario-detail', args=[self.scenario.pk]) # foo can update self.client.login(username='foo', password='secret') data = { "name": "New Scenario (Updated)", "scene_set": [], } response = self.client.put(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_200_OK) # bar cannot update self.client.login(username='bar', password='secret') data = { "name": "New Scenario (Updated)", "scene_set": [], } response = self.client.put(url, data, format='json') self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_scenario_start(self, mocked_scene_method): # start view url = reverse('scenario-start', args=[self.scenario.pk]) # bar cannot see self.client.login(username='bar', password='secret') response = self.client.put(url, {}, format='json') self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertEqual(mocked_scene_method.call_count, 0) # foo can start self.client.login(username='foo', password='secret') response = self.client.put(url, {}, format='json') self.assertEqual(response.status_code, status.HTTP_200_OK) mocked_scene_method.assert_called_with( self.scenario, self.user_foo)
def post(self, request, target_id): target = get_object_or_404(Feature, pk=target_id) feature_ids = request.data.get('features') or [] features_queryset = Feature.objects.filter(id__in=feature_ids, dataset=target.dataset) # Make sure that we filtered for all feature ids if features_queryset.count() != len(feature_ids) and len(feature_ids) > 0: return Response(status=HTTP_404_NOT_FOUND, data={'detail': 'Not found.'}) result = ResultCalculationMap.objects.filter(target=target).last() slices_queryset = Slice.objects.filter(result_calculation_map=result).annotate( feature_count=Count('features')).filter(feature_count=len(feature_ids)) for feature in features_queryset.all(): slices_queryset = slices_queryset.filter(features=feature) if slices_queryset.count() == 1: output_definition = slices_queryset.last().output_definition else: return Response([]) return Response(output_definition)
def email_share(self, user, email, type, id): # Get our e-mail formatter formatter = self.get_email_formatter(type) instance = None # Attempt to get the object instance we want to share try: instance = apps.get_model(self.AVAILABLE_TYPES[type]).objects.get(id=id) except ObjectDoesNotExist: # If it doesn't exist, respond with a 404 return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND) # Create our e-mail body email_body = { "to": email, "subject": f"[TalentMAP] Shared {type}", "body": formatter(instance) } # TODO: Implement actual e-mail sending here when avaiable e-mail servers are clarified # Return a 202 ACCEPTED with a copy of the email body return Response({"message": f"Position shared externally via email at {email}", "email_body": email_body}, status=status.HTTP_202_ACCEPTED)
def internal_share(self, user, email, type, id): receiving_user = None # Attempt to get the object instance we want to share try: apps.get_model(self.AVAILABLE_TYPES[type]).objects.get(id=id) except ObjectDoesNotExist: # If it doesn't exist, respond with a 404 return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND) # Attempt to get the receiving user by e-mail address try: receiving_user = UserProfile.objects.get(user__email=email) except ObjectDoesNotExist: return Response({"message": f"User with email {email} does not exist"}, status=status.HTTP_404_NOT_FOUND) # Create our sharable object using the source user, receiving user, id, and model # This will auto-populate in the receiving user's received shares on their profile Sharable.objects.create(sharing_user=user.profile, receiving_user=receiving_user, sharable_id=id, sharable_model=self.AVAILABLE_TYPES[type]) return Response({"message": f"Position shared internally to user with email {email}"}, status=status.HTTP_202_ACCEPTED)
def test_favorite_action_endpoints(authorized_client, authorized_user): position = mommy.make_recipe('talentmap_api.position.tests.position') response = authorized_client.get(f'/api/v1/position/{position.id}/favorite/') assert response.status_code == status.HTTP_404_NOT_FOUND response = authorized_client.put(f'/api/v1/position/{position.id}/favorite/') assert response.status_code == status.HTTP_204_NO_CONTENT response = authorized_client.get(f'/api/v1/position/{position.id}/favorite/') assert response.status_code == status.HTTP_204_NO_CONTENT response = authorized_client.delete(f'/api/v1/position/{position.id}/favorite/') assert response.status_code == status.HTTP_204_NO_CONTENT response = authorized_client.get(f'/api/v1/position/{position.id}/favorite/') assert response.status_code == status.HTTP_404_NOT_FOUND
def snippet_detail(request, pk, format=None): """ Retrieve, update or delete a snippet instance. """ print(pk) try: snippet = Snippet.objects.get(pk=pk) except Snippet.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if request.method == 'GET': serializer = SnippetSerializer(snippet) return Response(serializer.data) elif request.method == 'PUT': serializer = SnippetSerializer(snippet, data=request.data) if serializer.is_valid(): serializer.save() return Response(serializer.data) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) elif request.method == 'DELETE': snippet.delete() return Response(status=status.HTTP_204_NO_CONTENT)
def get(self, request): logger.warning("Use of deprecated API call %s" % (request.GET, )) if "uuid" not in request.GET: return Response("No uuid specified.", status=status.HTTP_400_BAD_REQUEST) query_num = request.GET["uuid"] try: network = get_network_from_user(request.user) bts = BTS.objects.get(uuid=query_num) # Strip the protocol field and just return the rest, removing any # trailing slash. bts_info = urlparse.urlparse(bts.inbound_url) result = { 'netloc': bts_info.netloc, 'hostname': bts_info.hostname, 'owner': bts.network.id } return Response(result, status=status.HTTP_200_OK) except Number.DoesNotExist: return Response("No such UUID", status=status.HTTP_404_NOT_FOUND)
def get(self, request): """"Handles GET requests.""" user_profile = models.UserProfile.objects.get(user=request.user) if not user_profile.user.is_staff: return response.Response('', status=status.HTTP_404_NOT_FOUND) numbers = models.Number.objects.all() # Configure the table of numbers. Do not show any pagination controls # if the total number of numbers is small. number_table = django_tables.NumberTable(list(numbers)) numbers_per_page = 20 paginate = False if numbers.count() > numbers_per_page: paginate = {'per_page': numbers_per_page} tables.RequestConfig(request, paginate=paginate).configure( number_table) context = { 'networks': get_objects_for_user(request.user, 'view_network', klass=models.Network), 'user_profile': user_profile, 'number_table': number_table, } # Render template. numbers_template = template.loader.get_template( 'dashboard/staff/numbers.html') html = numbers_template.render(context, request) return http.HttpResponse(html)
def delete(self, request, bts_uuid=None, number=None, format=None): """ Dis-associate a number from a BTS and mark it available. """ if not (number or bts_uuid): return Response("", status=status.HTTP_400_BAD_REQUEST) network = get_network_from_user(request.user) try: bts = models.BTS.objects.get(uuid=bts_uuid, network=network) except models.BTS.DoesNotExist: return Response("User is not associated with that BTS.", status=status.HTTP_403_FORBIDDEN) with transaction.atomic(): q = models.Number.objects.filter(number__exact=number).filter( network=bts.network) for number in q: number.state = "available" number.network = None number.subscriber = None number.save() return Response(None, status=status.HTTP_200_OK) return Response(None, status=status.HTTP_404_NOT_FOUND)
def test_wf_module_detail_get(self): # Also tests [Workflow, Module, WfModule].get workflow_id = Workflow.objects.get(name='Workflow 1').id module_id = Module.objects.get(name='Module 1').id module_version = ModuleVersion.objects.get(module=Module.objects.get(name='Module 1')) pk_wf_module = WfModule.objects.get(workflow_id=workflow_id, module_version=module_version).id notes = WfModule.objects.get(workflow_id=workflow_id, module_version=module_version).notes response = self.client.get('/api/wfmodules/%d/' % pk_wf_module) self.assertIs(response.status_code, status.HTTP_200_OK) self.assertEqual(response.data['id'], pk_wf_module) self.assertEqual(response.data['workflow'], workflow_id) self.assertEqual(response.data['notes'], notes) self.assertEqual(response.data['module_version']['module']['id'], module_id) self.assertEqual(response.data['status'], WfModule.READY) self.assertEqual(response.data['error_msg'], '') response = self.client.get('/api/wfmodules/%d/' % 10000) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) # this constrains the output API format, detects changes that would break client code
def test_workflow_duplicate_view(self): old_ids = [w.id for w in Workflow.objects.all()] # list of all current workflow ids response = self.client.get('/api/workflows/%d/duplicate' % self.workflow1.id) self.assertEqual(response.status_code, status.HTTP_201_CREATED) new_id = response.data['id'] self.assertFalse(new_id in old_ids) # created at entirely new id self.assertEqual(response.data['name'], 'Copy of ' + self.workflow1.name) new_wf = Workflow.objects.get(pk=new_id) # will fail if no Workflow created # Ensure 404 with bad id response = self.client.get('/api/workflows/%d/duplicate' % 999999) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) # Ensure 403 when another user tries to clone private workflow self.assertFalse(self.workflow1.public) self.client.force_login(self.otheruser) response = self.client.get('/api/workflows/%d/duplicate' % self.workflow1.id) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) # But another user can duplicate public workflow self.workflow1.public = True self.workflow1.save() response = self.client.get('/api/workflows/%d/duplicate' % self.workflow1.id) self.assertEqual(response.status_code, status.HTTP_201_CREATED)
def workflow_duplicate(request, pk): try: workflow = Workflow.objects.get(pk=pk) except Workflow.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if not workflow.user_authorized_read(request.user): return HttpResponseForbidden() workflow2 = workflow.duplicate(request.user) serializer = WorkflowSerializerLite(workflow2) return Response(serializer.data, status.HTTP_201_CREATED) # Undo or redo
def parameterval_event(request, pk, format=None): try: param = ParameterVal.objects.get(pk=pk) except ParameterVal.DoesNotExist: return Response(status=status.HTTP_404_NOT_FOUND) if not param.user_authorized_write(request.user): return HttpResponseForbidden() # change parameter value data = request.data dispatch_response = module_dispatch_event(param.wf_module, parameter=param, event=data, request=request) if dispatch_response: return dispatch_response return Response(status=status.HTTP_204_NO_CONTENT) # Return a parameter val that is actually an image
def list(self, request, *args, **kwargs): application = Application.objects.filter(owner=request.user).first() if application: resellers = Reseller.objects.filter(application=application) else: resellers = Reseller.objects.filter(owner=request.user) if not resellers: client_user = ClientUser.objects.filter(owner=request.user).first() if not client_user: return Response("Resellers do not exist for such account", status=HTTP_404_NOT_FOUND) queryset = [client_user.client.reseller, ] serializer = ResellerNameSerializer(queryset, many=True) return Response(serializer.data) queryset = resellers serializer = ResellerSerializer(queryset, many=True) return Response(serializer.data)
def retrieve(self, request, *args, **kwargs): """ Return particular client which owned by particular reseller """ application = Application.objects.filter(owner=request.user).first() if application: reseller = Reseller.objects.filter(name=kwargs['reseller_name'], application=application).first() else: reseller = Reseller.objects.filter(name=kwargs['reseller_name'], owner=request.user).first() if not reseller: admin = ClientUser.objects.filter(owner=request.user, admin=True).first() if not admin: return Response("Client does not exist", status=HTTP_404_NOT_FOUND) if not admin.client.name == kwargs['name']: return Response("Authorization failed", status=status.HTTP_403_FORBIDDEN) reseller = admin.client.reseller client = Client.objects.filter(reseller=reseller, name=kwargs['name']).first() if not client: return Response("Client does not exist", status=status.HTTP_404_NOT_FOUND) queryset = (client, ) serializer = ClientSerializer(queryset, many=True) return Response(serializer.data[0])
def token(self, request, **kwargs): application = Application.objects.filter(owner=request.user).first() if application: reseller = Reseller.objects.filter(name=kwargs['reseller_name'], application=application).first() else: reseller = Reseller.objects.filter(name=kwargs['reseller_name'], owner=request.user).first() if not reseller: admin = ClientUser.objects.filter(owner=request.user, admin=True).first() if not admin: return Response("Client does not exist", status=HTTP_404_NOT_FOUND) reseller = admin.client.reseller client = Client.objects.filter(reseller=reseller, name=kwargs['client_name']) clientuser = ClientUser.objects.filter(client=client, user_id=kwargs['user_id']).first() if not clientuser or not clientuser.owner: return Response("User does not exist", status=status.HTTP_404_NOT_FOUND) user = clientuser.owner token = get_jwt_token(user) return Response(token, status=status.HTTP_200_OK)
def upload(self, request, video_id=None): """ Upload a video file. """ try: video_upload_url = models.VideoUploadUrl.objects.available().get(public_video_id=video_id) except models.VideoUploadUrl.DoesNotExist: return Response(status=rest_status.HTTP_404_NOT_FOUND) # CORS headers cors_headers = {} if video_upload_url.origin: cors_headers["Access-Control-Allow-Origin"] = video_upload_url.origin # OPTIONS call if request.method == 'OPTIONS': return Response({}, headers=cors_headers) # POST call video_file = request.FILES.get('file') if video_file is None or video_file.size == 0: return Response({'file': "Missing argument"}, status=rest_status.HTTP_400_BAD_REQUEST, headers=cors_headers) tasks.upload_video(video_upload_url.public_video_id, video_file) return Response({'id': video_upload_url.public_video_id}, headers=cors_headers)
def get(self, request, commodity_id): result = check_token(request) if result['permission'] is False: return result['response'] if all(auth not in result['auth'] for auth in ['1', '11', ]): alert = {'alert': '??????????????'} return Response(alert, status=status.HTTP_403_FORBIDDEN) commodity = self.get_object(commodity_id) if commodity is None: alert = {'alert': '?????????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) serializer = CommoditySerializer(commodity) return Response(serializer.data)
def get(self, request, ie_id): result = check_token(request) if result['permission'] is False: return result['response'] if all(auth not in result['auth'] for auth in ['1', '11', ]): alert = {'alert': '??????????????'} return Response(alert, status=status.HTTP_403_FORBIDDEN) in_entrepot_detail_list = self.get_object_list(ie_id) if not in_entrepot_detail_list: try: in_entrepot = InEntrepot.objects.get(ie_id=ie_id) except InEntrepot.DoesNotExist: alert = {'alert': '????????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) serializer = InEntrepotSerializer(in_entrepot) return Response(serializer.data) serializer = InEntrepotDetailSerializer(in_entrepot_detail_list, many=True) return Response(serializer.data)
def get(self, request, oe_id): result = check_token(request) if result['permission'] is False: return result['response'] if all(auth not in result['auth'] for auth in ['1', '11', ]): alert = {'alert': '??????????????'} return Response(alert, status=status.HTTP_403_FORBIDDEN) out_entrepot_detail_list = self.get_object_list(oe_id) if not out_entrepot_detail_list: try: out_entrepot = OutEntrepot.objects.get(oe_id=oe_id) except OutEntrepot.DoesNotExist: alert = {'alert': '????????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) serializer = OutEntrepotSerializer(out_entrepot) return Response(serializer.data) serializer = OutEntrepotDetailSerializer(out_entrepot_detail_list, many=True) return Response(serializer.data)
def get(self, request, emp_id): result = check_token(request) if result['permission'] is False: return result['response'] if all(auth not in result['auth'] for auth in ['1', '2', ]) and result['emp_id'] != emp_id: alert = {'alert': '??????????????'} return Response(alert, status=status.HTTP_403_FORBIDDEN) employee = self.get_object(emp_id) if employee is None: alert = {'alert': '?????????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) serializer = EmployeeSerializer(employee) return Response(serializer.data)
def get(self, request, repair_id): result = check_token(request) if not result['permission']: return result['response'] repair = self.get_object(repair_id) if repair is None: alert = {'alert': '????????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) if all(auth not in result['auth'] for auth in ['1', '10']) and repair.part_id != result['part_id']: alert = {'alert': '?????????????'} return Response(alert, status=status.HTTP_403_FORBIDDEN) serializer = RepairSerializer(repair) return Response(serializer.data)
def get(self, request): result = check_token(request) if not result['permission']: return result['response'] if '1' in result['auth']: dailies = Daily.objects.all() else: try: part = Department.objects.get(part_id=result['part_id']) except Department.DoesNotExist: alert = {'alert': '???????'} return Response(alert, status=status.HTTP_404_NOT_FOUND) dailies = Daily.objects.filter(part=part) if not dailies: alert = {'alert': '??????!'} return Response(alert) serializer = DailySerializer(dailies, many=True) return Response(serializer.data)