我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pyramid.httpexceptions.HTTPNotFound()。
def view_corpus(request): page = int(request.matchdict['page']) corpus = get_corpus(request) if corpus is None or page > (corpus.n_docs - 1): raise exc.HTTPNotFound() nextp = page + 1 if nextp >= corpus.n_docs: nextp = None prevp = page - 1 if prevp < 0: prevp = None return { 'corpus': corpus, 'doc': corpus[page], 'nextp': nextp, 'prevp': prevp, 'page': page }
def add_language(request): """ This view adds a language :param request: a request object :return: None """ if request.authenticated_userid is None: raise exc.HTTPNotFound() name = request.matchdict["language"] public_group_id = request.matchdict["public_group_id"] language = models.Language.get_by_name(name) if not language: language = models.Language(name=name) request.db.add(language) # We need to flush the db session here so that language.id will be generated. request.db.flush() url = request.route_url('translation_read', public_language_id=language.pubid, public_group_id=public_group_id) return exc.HTTPSeeOther(url)
def add_page(request): """ Add a page to the database :param request: a request object :return: a redirect to the translation_read URL """ if request.authenticated_userid is None: raise exc.HTTPNotFound() name = request.matchdict["language_name"] page_id = urllib.unquote(urllib.unquote(request.matchdict["page_url"])) public_group_id = request.matchdict["public_group_id"] language = annotran.languages.models.Language.get_by_name(name) page = annotran.pages.models.Page.get_by_uri(page_id) if not page: page = annotran.pages.models.Page(uri=page_id) request.db.add(page) request.db.flush() url = request.route_url('translation_read', public_language_id=language.pubid, public_group_id=public_group_id) return exc.HTTPSeeOther(url)
def activate_by_email(self, activation_code: str, location=None) -> Response: """Active a user after user after the activation email. * User clicks link in the activation email * User enters the activation code on the form by hand """ request = self.request settings = request.registry.settings user_registry = get_user_registry(request) after_activate_url = request.route_url(settings.get('websauna.activate_redirect', 'index')) login_after_activation = asbool(settings.get('websauna.login_after_activation', False)) user = user_registry.activate_user_by_email_token(activation_code) if not user: raise HTTPNotFound("Activation code not found") if login_after_activation: login_service = get_login_service(self.request.registry) return login_service.authenticate(self.request, user) else: self.request.registry.notify(RegistrationActivatedEvent(self.request, user, None)) return HTTPFound(location=location or after_activate_url)
def reset_password(self, activation_code: str, password: str, location=None) -> Response: """Perform actual password reset operations. User has following password reset link (GET) or enters the code on a form. """ request = self.request user_registry = get_user_registry(request) user = user_registry.get_user_by_password_reset_token(activation_code) if not user: return HTTPNotFound("Activation code not found") user_registry.reset_password(user, password) messages.add(request, msg="The password reset complete. Please sign in with your new password.", kind='success', msg_id="msg-password-reset-complete") request.registry.notify(PasswordResetEvent(self.request, user, password)) request.registry.notify(UserAuthSensitiveOperation(self.request, user, "password_reset")) location = location or get_config_route(request, 'websauna.reset_password_redirect') return HTTPFound(location=location)
def discussion_attitude(request, for_api=False, api_data=None): """ View configuration for discussion step, where we will ask the user for her attitude towards a statement. :param request: request of the web server :param for_api: Boolean :param api_data: :return: dictionary """ # '/discuss/{slug}/attitude/{statement_id}' logger('Views', 'discussion_attitude', 'request.matchdict: {}'.format(request.matchdict)) logger('Views', 'discussion_attitude', 'request.params: {}'.format(request.params)) prepared_discussion = __call_from_discussion_step(request, discussion.attitude, for_api, api_data) if not prepared_discussion: raise HTTPNotFound() return prepared_discussion # justify page
def discussion_justify(request, for_api=False, api_data=None): """ View configuration for discussion step, where we will ask the user for her a justification of her opinion/interest. :param request: request of the web server :param for_api: Boolean :param api_data: :return: dictionary """ # '/discuss/{slug}/justify/{statement_or_arg_id}/{mode}*relation' logger('views', 'discussion_justify', 'request.matchdict: {}'.format(request.matchdict)) logger('views', 'discussion_justify', 'request.params: {}'.format(request.params)) prepared_discussion = __call_from_discussion_step(request, discussion.justify, for_api, api_data) if not prepared_discussion: raise HTTPNotFound() return prepared_discussion # reaction page
def discussion_reaction(request, for_api=False, api_data=None): """ View configuration for discussion step, where we will ask the user for her reaction (support, undercut, rebut)... :param request: request of the web server :param for_api: Boolean :param api_data: :return: dictionary """ # '/discuss/{slug}/reaction/{arg_id_user}/{mode}*arg_id_sys' logger('views', 'discussion_reaction', 'request.matchdict: {}'.format(request.matchdict)) logger('views', 'discussion_reaction', 'request.params: {}'.format(request.params)) prepared_discussion = __call_from_discussion_step(request, discussion.reaction, for_api, api_data) if not prepared_discussion: raise HTTPNotFound() return prepared_discussion # support page
def discussion_support(request, for_api=False, api_data=None): """ View configuration for discussion step, where we will present another supportive argument. :param request: request of the web server :param for_api: Boolean :param api_data: :return: dictionary """ # '/discuss/{slug}/jump/{arg_id}' logger('views', 'discussion_support', 'request.matchdict: {}'.format(request.matchdict)) logger('views', 'discussion_support', 'request.params: {}'.format(request.params)) prepared_discussion = __call_from_discussion_step(request, discussion.support, for_api, api_data) if not prepared_discussion: raise HTTPNotFound() return prepared_discussion # finish page
def discussion_choose(request, for_api=False, api_data=None): """ View configuration for discussion step, where the user has to choose between given statements. :param request: request of the web server :param for_api: Boolean :param api_data: :return: dictionary """ # '/discuss/{slug}/choose/{is_argument}/{supportive}/{id}*pgroup_ids' match_dict = request.matchdict params = request.params logger('discussion_choose', 'def', 'request.matchdict: {}'.format(match_dict)) logger('discussion_choose', 'def', 'request.params: {}'.format(params)) prepared_discussion = __call_from_discussion_step(request, discussion.choose, for_api, api_data) if not prepared_discussion: raise HTTPNotFound() return prepared_discussion # jump page
def get_nuimo_component_view(request): component_id = request.matchdict['component_id'] mac_address = request.matchdict['mac_address'].replace('-', ':') with open(request.registry.settings['nuimo_app_config_path'], 'r') as f: config = yaml.load(f) try: nuimo = config['nuimos'][mac_address] except (KeyError, TypeError): return HTTPNotFound("No Nuimo with such ID") components = nuimo['components'] try: return next(c for c in components if c['id'] == component_id) except StopIteration: raise HTTPNotFound
def delete_nuimo_component_view(request): component_id = request.matchdict['component_id'] mac_address = request.matchdict['mac_address'].replace('-', ':') with open(request.registry.settings['nuimo_app_config_path'], 'r+') as f: config = yaml.load(f) try: nuimo = config['nuimos'][mac_address] except (KeyError, TypeError): return HTTPNotFound("No Nuimo with such ID") components = nuimo['components'] try: component = next(c for c in components if c['id'] == component_id) except StopIteration: raise HTTPNotFound components.remove(component) f.seek(0) # We want to overwrite the config file with the new configuration f.truncate() yaml.dump(config, f, default_flow_style=False)
def root_factory(request, user_id=None): """The factory function for the root context""" # OK, this is the old code... I need to do better, but fix first. from ..models import Discussion if request.matchdict and 'discussion_id' in request.matchdict: discussion_id = int(request.matchdict['discussion_id']) discussion = Discussion.default_db.query(Discussion).get(discussion_id) if not discussion: raise HTTPNotFound("No discussion ID %d" % (discussion_id,)) return discussion elif request.matchdict and 'discussion_slug' in request.matchdict: discussion_slug = request.matchdict['discussion_slug'] discussion = Discussion.default_db.query(Discussion).filter_by( slug=discussion_slug).first() if not discussion: raise HTTPNotFound("No discussion named %s" % (discussion_slug,)) return discussion return app_root_factory(request, user_id)
def delete_idea(request): idea_id = request.matchdict['id'] idea = Idea.get_instance(idea_id) if not idea: raise HTTPNotFound("Idea with id '%s' not found." % idea_id) if isinstance(idea, RootIdea): raise HTTPBadRequest("Cannot delete root idea.") num_childrens = len(idea.children) if num_childrens > 0: raise HTTPBadRequest("Idea cannot be deleted because it still has %d child ideas." % num_childrens) num_extracts = len(idea.extracts) if num_extracts > 0: raise HTTPBadRequest("Idea cannot be deleted because it still has %d extracts." % num_extracts) for link in idea.source_links: link.is_tombstone = True idea.is_tombstone = True # Maybe return tombstone() ? request.response.status = HTTPNoContent.code return HTTPNoContent()
def get_idea_extracts(request): discussion = request.context idea_id = request.matchdict['id'] idea = Idea.get_instance(idea_id) view_def = request.GET.get('view') or 'default' user_id = authenticated_userid(request) or Everyone permissions = request.permissions if not idea: raise HTTPNotFound("Idea with id '%s' not found." % idea_id) extracts = Extract.default_db.query(Extract).filter( Extract.idea_id == idea.id ).order_by(Extract.order.desc()) return [extract.generic_json(view_def, user_id, permissions) for extract in extracts]
def get_agent(request): view_def = request.GET.get('view') or 'default' agent_id = request.matchdict['id'] agent = AgentProfile.get_instance(agent_id) if not agent: raise HTTPNotFound("Agent with id '%s' not found." % agent_id) discussion = request.context user_id = authenticated_userid(request) or Everyone permissions = request.permissions agent_json = agent.generic_json(view_def, user_id, permissions) if user_id == agent.id: # We probably should add all profile info here. agent_json['preferred_email'] = agent.get_preferred_email() return agent_json
def get_object(request): classname = request.matchdict['cls'] id = request.matchdict['id'] view = request.matchdict['view'] or '/default' view = view[1:] cls = getattr(assembl.models, classname, None) if not cls: raise HTTPNotFound("Class '%s' not found." % classname) obj = cls.get(id) if not obj: raise HTTPNotFound("Id %s of class '%s' not found." % (id, classname)) if not get_view_def(view): raise HTTPNotFound("View '%s' not found." % view) user_id = authenticated_userid(request) or Everyone permissions = request.permissions return obj.generic_json(view, user_id, permissions)
def get_synthesis(request): synthesis_id = request.matchdict['id'] discussion = request.context if synthesis_id == 'next_synthesis': synthesis = discussion.get_next_synthesis() else: synthesis = Synthesis.get_instance(synthesis_id) if not synthesis: raise HTTPNotFound("Synthesis with id '%s' not found." % synthesis_id) view_def = request.GET.get('view') or 'default' user_id = authenticated_userid(request) or Everyone return synthesis.generic_json(view_def, user_id, request.permissions) # Update
def register(request): """Set the login route and view.""" if request.method == "GET": return {} if request.method == "POST": username = request.POST['username'] password = request.POST['password'] password_check = request.POST['password-check'] check_username = request.dbsession.query(User.username).filter(User.username == username).one_or_none() if not username or not password: return {'error': 'Please provide a username and password.'} if check_username is None: if password == password_check: new_user = User( username=username, password=hash_password(password) ) request.dbsession.add(new_user) return HTTPFound( location=request.route_url('login'), detail='Registration successful!' ) else: return HTTPNotFound({'error': 'Passwords do not match.'}) return {'error': 'Username already in use.'}
def submission_media_(request): link_type = request.matchdict['linktype'] submitid = int(request.matchdict['submitid']) if link_type == "submissions": link_type = "submission" submission = Submission.query.get(submitid) if submission is None: raise httpexceptions.HTTPForbidden() elif submission.is_hidden or submission.is_friends_only: raise httpexceptions.HTTPForbidden() media_items = media.get_submission_media(submitid) if not media_items.get(link_type): raise httpexceptions.HTTPNotFound() return Response(headerlist=[ ('X-Accel-Redirect', str(media_items[link_type][0]['file_url']),), ('Cache-Control', 'max-age=0',), ])
def item_view(context, request): frame = request.params.get('frame', 'page') if getattr(request, '__parent__', None) is None: # We need the response headers from non subrequests try: response = render_view_to_response(context, request, name=frame) except PredicateMismatch: # Avoid this view emitting PredicateMismatch exc_class, exc, tb = sys.exc_info() exc.__class__ = HTTPNotFound raise_with_traceback(exc, tb) else: if response is None: raise HTTPNotFound('?frame=' + frame) return response path = request.resource_path(context, '@@' + frame) if request.query_string: path += '?' + request.query_string return request.embed(path, as_user=True)
def corpus(self): """ Return a corpus based on environment. It will try to return it from cache, otherwise load it from disk. If corpus hasn't been extracted from the document, it will redirect to a corpus creation tool. """ corpus = get_corpus(self.request) if corpus is None: raise exc.HTTPNotFound() return corpus
def corpus(self): """ Return a corpus based on environment. It will try to return it from cache, otherwise load it from disk. """ corpus = get_corpus(self.request) if corpus is None: raise exc.HTTPNotFound() return corpus
def notfound(context, request): with JSONAPIResponse(request.response) as resp: _in = u'Failed' code, status = JSONAPIResponse.NOT_FOUND request.response.status_int = code message = 'Resource not found' if isinstance(context, HTTPNotFound): if context.content_type == 'application/json': return context elif context.detail: message = context.detail return resp.to_json( _in, code=code, status=status, message=message)
def test_invalid_page(self): request = testing.DummyRequest() request.params['page'] = 'invalid' request.current_route_url = mock.Mock(side_effect=self.get_current_url) self.assertRaises(HTTPNotFound, self.paginate_queryset, request)
def test_invalid_page(self): request = testing.DummyRequest() request.params['page'] = 'invalid' request.current_route_url = mock.Mock(side_effect=self.get_current_url) with pytest.raises(HTTPNotFound): self.paginate_queryset(request)
def test_get_object_not_found(self): view = UserAPIView() view.request = self.request view.lookup_url_kwargs = {'id': 3} self.assertRaises(HTTPNotFound, view.get_object)
def get_object(self): query = self.filter_query(self.get_query()) # If query joins more than one table and you need to base the lookup on something besides # an id field on the self.model, you can provide an alternative lookup as tuple of the model class # and a string of the column name. if isinstance(self.lookup_field, str): lookup_col = getattr(self.model, self.lookup_field) lookup_val = self.lookup_url_kwargs[self.lookup_field] else: assert isinstance(self.lookup_field, tuple), ( "'{}' `lookup_field` attribute should be a string or a tuple of (<model class>, `column`) " .format(self.__class__.__name__) ) lookup_col = getattr(self.lookup_field[0], self.lookup_field[1]) lookup_val = self.lookup_url_kwargs[self.lookup_field[1]] try: instance = query.filter(lookup_col == lookup_val).one() except NoResultFound: raise HTTPNotFound() # May raise HTTPForbidden self.check_object_permissions(self.request, instance) return instance
def test_add_report_to_db_pg_auth_rep_group_lang_none(): """ This should raise HTTPNotFound as all page, author, reporter, group, and language is None. """ request = _mock_request(authenticated_user=mock.Mock(username="test"), matchdict={'public_language_id': '12345', 'public_group_id': '12345', 'user_id': '12345', 'page_uri': 'http://www.annotran_test.com/'}) with pytest.raises(exc.HTTPNotFound): views.add_report(request)
def test_add_report_to_db_translation_none(): """ This should raise HTTPNotFound as translation is None. """ with mock.patch('annotran.languages.models.Language') as language: language.get_by_public_language_id = MagicMock(return_value=language) with mock.patch('annotran.pages.models.Page') as page: page.get_by_uri = MagicMock(return_value=page) with mock.patch('h.groups.models.Group') as group: group.get_by_pubid = MagicMock(return_value=group) with mock.patch('h.models.User') as user: user.get_by_username = MagicMock(return_value=user) with mock.patch('annotran.translations.models.Translation') as translation: translation.get_translation = MagicMock(return_value=None) request = _mock_request(authenticated_user=mock.Mock(username="test"), matchdict={'public_language_id': '12345', 'public_group_id': '12345', 'user_id': '12345', 'page_uri': 'http://www.annotran_test.com/'}) with pytest.raises(exc.HTTPNotFound): views.add_report(request)
def delete_vote(request): """ Delete a vote from the database :param request: the current request object :return: a redirect to language read """ if request.authenticated_user is None: raise exc.HTTPNotFound() public_language_id = request.matchdict["public_language_id"] public_group_id = request.matchdict['public_group_id'] page_uri = urllib.unquote(urllib.unquote(request.matchdict['page_uri'])) page = annotran.pages.models.Page.get_by_uri(page_uri) language = annotran.languages.models.Language.get_by_public_language_id(public_language_id) # only authenticated used can delete translations and consequently their scores user = h.models.User.get_by_username(request.authenticated_user.username) group = h.groups.models.Group.get_by_pubid(public_group_id) if language is None or page is None \ or group is None or user is None: raise exc.HTTPNotFound() models.Vote.delete_votes(page, language, group, user) request.db.flush() return {}
def test_add_vote_to_db_unauthenticated_user(): """ This should raise HTTPNotFound because of unauthenticated user. """ request = _mock_request(authenticated_user=None) with pytest.raises(httpexceptions.HTTPNotFound): views.add_vote(request)
def test_add_vote_to_db_authenticated_user_objects_none(): """ This should raise HTTPNotFound as all page, author, voter, group, and language are None. """ request = _mock_request(matchdict={'page_uri': 'http://www.annotran_test.com', 'public_group_id': "12345", 'public_language_id': "12345", 'score': 5, 'username':"test_username"}, authenticated_user=mock.Mock(id=2, username="test2", uid="test2")) with pytest.raises(httpexceptions.HTTPNotFound): views.add_vote(request)
def test_delete_vote_unauthenticated_user(): """ This should raise HTTPNotFound because of unauthenticated user. """ request = _mock_request(authenticated_user=None) with pytest.raises(httpexceptions.HTTPNotFound): views.delete_vote(request)
def _wait_for_entry(self, serial): keyfs = self.xom.keyfs next_serial = keyfs.get_next_serial() if serial > next_serial: raise HTTPNotFound("can only wait for next serial") elif serial == next_serial: arrived = keyfs.wait_tx_serial(serial, timeout=self.MAX_REPLICA_BLOCK_TIME) if not arrived: raise HTTPAccepted("no new transaction yet", headers={str("X-DEVPI-SERIAL"): str(keyfs.get_current_serial())}) return keyfs.tx.conn.get_raw_changelog_entry(serial)
def versions(self): versions = self.stage.list_versions(self.project) if not versions: raise HTTPNotFound("The project %s does not exist." % self.project) return get_sorted_versions(versions)
def stable_versions(self): versions = self.stage.list_versions(self.project) if not versions: raise HTTPNotFound("The project %s does not exist." % self.project) versions = get_sorted_versions(versions, stable=True) if not versions: raise HTTPNotFound("The project %s has no stable release." % self.project) return versions
def linkstore(self): try: return self.stage.get_linkstore_perstage(self.project, self.version) except self.stage.MissesRegistration: raise HTTPNotFound( "%s-%s is not registered" % (self.project, self.version))
def view_job(request): job_alias = request.matchdict['job_alias'] job = get_job_by_alias(job_alias) if job is None: raise HTTPNotFound() return { 'job': job }
def _account_key_focus(self, eagerload_web=False): dbLetsEncryptAccountKey = lib_db.get__SslLetsEncryptAccountKey__by_id(self.request.api_context, self.request.matchdict['id'], eagerload_web=eagerload_web, ) if not dbLetsEncryptAccountKey: raise HTTPNotFound('the key was not found') return dbLetsEncryptAccountKey # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def _certificate_focus(self): dbServerCertificate = lib_db.get__SslServerCertificate__by_id(self.request.api_context, self.request.matchdict['id']) if not dbServerCertificate: raise HTTPNotFound('the certificate was not found') return dbServerCertificate
def _queue_renewal_focus(self): item = lib_db.get__SslQueueRenewal__by_id(self.request.api_context, self.request.matchdict['id'], load_events=True) if not item: raise HTTPNotFound('the item was not found') return item
def _queue_domain_focus(self): item = lib_db.get__SslQueueDomain__by_id(self.request.api_context, self.request.matchdict['id'], eagerload_log=True) if not item: raise HTTPNotFound('the item was not found') return item
def _domain_focus(self, eagerload_web=False): domain_identifier = self.request.matchdict['domain_identifier'].strip() if domain_identifier.isdigit(): dbDomain = lib_db.get__SslDomain__by_id(self.request.api_context, domain_identifier, preload=True, eagerload_web=eagerload_web) else: dbDomain = lib_db.get__SslDomain__by_name(self.request.api_context, domain_identifier, preload=True, eagerload_web=eagerload_web) if not dbDomain: raise HTTPNotFound('the domain was not found') return dbDomain
def _unique_fqdn_set_focus(self): dbItem = lib_db.get__SslUniqueFQDNSet__by_id(self.request.api_context, self.request.matchdict['id']) if not dbItem: raise HTTPNotFound('the fqdn set was not found') return dbItem
def _certificate_request_focus(self): dbCertificateRequest = lib_db.get__SslCertificateRequest__by_id(self.request.api_context, self.request.matchdict['id']) if not dbCertificateRequest: raise HTTPNotFound('the certificate was not found') return dbCertificateRequest
def certificate_request_deactivate(self): dbCertificateRequest = self._certificate_request_focus() if not dbCertificateRequest.certificate_request_type_is('acme flow'): raise HTTPNotFound('Only availble for Acme Flow') dbCertificateRequest.is_active = False self.request.api_context.dbSession.flush() return HTTPFound('%s/certificate-request/%s?result=success' % (self.request.registry.settings['admin_prefix'], dbCertificateRequest.id)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def certificate_request_AcmeFlow_manage(self): dbCertificateRequest = self._certificate_request_focus() if not dbCertificateRequest.certificate_request_type_is('acme flow'): raise HTTPNotFound('Only availble for Acme Flow') return {'project': 'peter_sslers', 'SslCertificateRequest': dbCertificateRequest, 'SslCertificateRequest2SslDomain': None, } # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def certificate_request_AcmeFlow_manage_domain(self): dbCertificateRequest = self._certificate_request_focus() if not dbCertificateRequest.certificate_request_type_is('acme flow'): raise HTTPNotFound('Only availble for Acme Flow') dbCertificateRequest2SslDomain = None domain_identifier = self.request.matchdict['domain_identifier'].strip() if domain_identifier.isdigit(): dbDomain = lib_db.get__SslDomain__by_id(self.request.api_context, domain_identifier, preload=False, eagerload_web=False) else: dbDomain = lib_db.get__SslDomain__by_name(self.request.api_context, domain_identifier, preload=False, eagerload_web=False) if not dbDomain: raise HTTPNotFound('invalid domain') for to_domain in dbCertificateRequest.to_domains: if to_domain.ssl_domain_id == dbDomain.id: dbCertificateRequest2SslDomain = to_domain break if dbCertificateRequest2SslDomain is None: raise HTTPNotFound('invalid domain for certificate request') self.db_SslCertificateRequest = dbCertificateRequest self.db_SslCertificateRequest2SslDomain = dbCertificateRequest2SslDomain if self.request.method == 'POST': return self._certificate_request_AcmeFlow_manage_domain__submit() return self._certificate_request_AcmeFlow_manage_domain__print()