我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用pyramid.httpexceptions.HTTPForbidden()。
def retrieve_channel_information(request): channel_id = request.matchdict['channel_id'] parent_id = '/channels/{}'.format(channel_id) registrations, count = request.registry.storage.get_all( collection_id=REGISTRATION_COLLECTION_ID, parent_id=parent_id) user_registered = [r for r in registrations if r['id'] == request.prefixed_userid] if not user_registered: raise httpexceptions.HTTPForbidden() return {"data": { "registrations": count, "push": 0 }} # Channel Registration views
def shell2(request): # Make sure we have a logged in user auth = request.registry.queryUtility(IAuthenticationPolicy) username = auth.authenticated_userid(request) if not username: # This will trigger HTTP Basic Auth dialog, as per basic_challenge handler below raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password") notebook_context = {"greeting": "**Executing shell2 context**\n"} config_file = request.registry.settings["global_config"]["__file__"] startup.make_startup(notebook_context, config_file) startup.add_script(notebook_context, SCRIPT) startup.add_greeting(notebook_context, GREETING) return launch_notebook(request, username, notebook_context=notebook_context)
def delete_plugin(request): """ Delete A plugin """ plugin_id = request.matchdict['plugin_id'] user = request.user query = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id) plugin = query.first() if plugin is None: raise exc.HTTPBadRequest('No such plugin') if user != plugin.user and not user.admin: raise exc.HTTPForbidden("You don't have access to do that") request.db_session.query(LedSchedule).filter(LedSchedule.led_plugin_id == plugin_id).delete() request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).delete() query.delete() log(request, 'Deleted plugin ' + plugin.name) return exc.HTTPFound(location='/plugin')
def show_group(request): # a regular user will have their groups listed # admin will see all groups user = request.user group = request.db_session.query(LedGroup).filter(LedGroup.id == request.matchdict['group_id']).first() users = request.db_session.query(LedGroupUser).filter(LedGroupUser.led_group == group).all() if not (user.admin or user.id in pluck(users, 'led_user_id')): raise exc.HTTPForbidden("Only site admins or group members can view this") schedule = request.db_session.query(LedSchedule).filter(LedSchedule.led_group == group).order_by(LedSchedule.position.asc()).all() subquery = request.db_session.query(LedGroupUser.led_user_id).filter(LedGroupUser.led_group == group) other_users = request.db_session.query(LedUser).filter(~LedUser.id.in_(subquery)) subquery = request.db_session.query(LedSchedule.led_plugin_id).filter(LedSchedule.led_group == group) # other_plugins = request.db_session.query(LedPlugin).filter(~LedPlugin.id.in_(subquery)) other_plugins = request.db_session.query(LedPlugin) return { 'group': group, 'users': users, 'schedule': schedule, 'other_users': other_users, 'other_plugins': other_plugins, 'group_admin': can_modify_group(request, group.id, False) }
def submission_media_(request): link_type = request.matchdict['linktype'] submitid = int(request.matchdict['submitid']) if link_type == "submissions": link_type = "submission" submission = Submission.query.get(submitid) if submission is None: raise httpexceptions.HTTPForbidden() elif submission.is_hidden or submission.is_friends_only: raise httpexceptions.HTTPForbidden() media_items = media.get_submission_media(submitid) if not media_items.get(link_type): raise httpexceptions.HTTPNotFound() return Response(headerlist=[ ('X-Accel-Redirect', str(media_items[link_type][0]['file_url']),), ('Cache-Control', 'max-age=0',), ])
def get_logged_in_user(cls, request): """Returns the logged in user as User instance :param request: Request object """ from stalker import User from stalker.db.session import DBSession with DBSession.no_autoflush: user = User.query \ .filter_by(login=authenticated_userid(request)).first() if not user: raise HTTPForbidden(request) return user
def reset_password_page(request): session = DBSession() guid = request.params.get('guid') user_id = request.params.get('u') reset = session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first() if not reset.validate_guid(guid): raise HTTPForbidden() else: # Link is over 24 hours old if reset.time + datetime.timedelta(days=1) < datetime.datetime.now(): raise HTTPForbidden() return {"guid": guid, "user_id": user_id}
def reset_password(request): session = DBSession() guid = request.params.get('guid') user_id = request.params.get('user_id') new_password = request.params.get('new_password') confirm_new_password = request.params.get('confirm_new_password') if not session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first().validate_guid(guid): raise HTTPForbidden() if confirm_new_password != new_password: params = {"message": "Passwords did not match", "message_type": "change_password"} return HTTPFound(location=request.route_url('viewAccount', _query=params)) pword_invalid_check = check_invalid_password(new_password, confirm_new_password) if pword_invalid_check: return HTTPFound(location=request.route_url('login', _query=pword_invalid_check)) session.query(User).filter(User.id == user_id).update({User.password: bcrypt.encrypt(new_password)}) session.query(PasswordReset).filter(PasswordReset.user_id == user_id).delete() params = {"message": "Congratulations! Password successfully changed", "message_type": "success"} return HTTPFound(location=request.route_url('login', _query=params))
def permission_denied(self, request, message): # Todo figure out how to determine if this is a authorization vs authentication error. raise HTTPForbidden(detail=message)
def notebook_proxy(request): # Make sure we have a logged in user auth = request.registry.queryUtility(IAuthenticationPolicy) username = auth.authenticated_userid(request) if not username: # This will trigger HTTP Basic Auth dialog, as per basic_challenge handler below raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password") return _notebook_proxy(request, username)
def shell1(request): # Make sure we have a logged in user auth = request.registry.queryUtility(IAuthenticationPolicy) username = auth.authenticated_userid(request) if not username: # This will trigger HTTP Basic Auth dialog, as per basic_challenge handler below raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password") notebook_context = {"greeting": "**Executing shell1 context**\n"} config_file = request.registry.settings["global_config"]["__file__"] startup.make_startup(notebook_context, config_file) return launch_notebook(request, username, notebook_context=notebook_context)
def shutdown_notebook(request): # Make sure we have a logged in user auth = request.registry.queryUtility(IAuthenticationPolicy) username = auth.authenticated_userid(request) if not username: # This will trigger HTTP Basic Auth dialog, as per basic_challenge handler below raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password") _shutdown_notebook(request, username) return HTTPFound(request.route_url("home"))
def forbidden_view(self): if authenticated_userid(self.request): return HTTPForbidden() url = self.request.route_url('login', _query=( ('next', self.request.path),) ) return HTTPFound(url)
def admin_only(func): def _admin_only(*args, **kwargs): user = args[1].user if user is None or not user.admin: print user, "is forbidden" raise exc.HTTPForbidden('You do not have sufficient permissions to view this page') return func(args, **kwargs) return _admin_only
def update_plugin(request): # make sure the plugin id exists plugin_id = request.matchdict['plugin_id'] plugin = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first() if not plugin: raise exc.HTTPBadRequest("No such plugin") else: # only a site admin or plugin owner can edit # a non-admin plugin owner can suggest changes user = request.user if user != plugin.user and not user.admin: raise exc.HTTPForbidden("You don't have access to do that") POST = {k: to_null(v) for k, v in request.POST.items()} if not user.admin: if 'code' not in POST: raise exc.HTTPBadRequest("Please provide the code") plugin_update = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).first() if plugin_update is not None: plugin_update.code = POST['code'] else: request.db_session.add(LedPluginProposed(led_plugin_id=plugin_id, code=POST['code'])) log(request, "Proposed changes to plugin <a href='/plugin/{}'>{}</a>".format(plugin.id, plugin.name)) return { 'msg': 'Your update is now awaiting approval' } else: if POST['code']: plugin.code = POST['code'] if 'name' in POST and POST['name']: plugin.name = POST['name'] log(request, 'Updated <a href="/plugin/{0}">plugin {1}</a>: '.format(plugin_id, plugin.name, )) return { 'msg': 'Updated code' }
def can_modify_group(request, gid, raise_exc=True): user = request.user if user.admin: return True group_user = request.db_session.query(LedGroupUser).filter(and_( LedGroupUser.led_group_id == gid, LedGroupUser.led_user == user)).first() if group_user is None or group_user.access_level != 2: if raise_exc: raise exc.HTTPForbidden("You can't modify this group") return False return True
def cached_view_embedded(context, request): source = context.model.source allowed = set(source['principals_allowed']['view']) if allowed.isdisjoint(request.effective_principals): raise HTTPForbidden() return source['embedded']
def cached_view_object(context, request): source = context.model.source allowed = set(source['principals_allowed']['view']) if allowed.isdisjoint(request.effective_principals): raise HTTPForbidden() return source['object']
def cached_view_audit(context, request): source = context.model.source allowed = set(source['principals_allowed']['audit']) if allowed.isdisjoint(request.effective_principals): raise HTTPForbidden() return { '@id': source['object']['@id'], 'audit': source['audit'], }
def cached_view_audit_self(context, request): source = context.model.source allowed = set(source['principals_allowed']['audit']) if allowed.isdisjoint(request.effective_principals): raise HTTPForbidden() path = source['object']['@id'] return { '@id': path, 'audit': [a for a in chain(*source['audit'].values()) if a['path'] == path], }
def get_changes(self): # this method is called from all replica servers # and either returns changelog entry content for {serial} or, # if it points to the "next" serial, will block and wait # until that serial is committed. However, after # MAX_REPLICA_BLOCK_TIME, we return 202 Accepted to indicate # the replica should try again. The latter has two benefits: # - nginx' timeout would otherwise return 504 (Gateway Timeout) # - if the replica is not waiting anymore we would otherwise # never time out here, leading to more and more threads # if no commits happen. if not self.xom.is_master(): raise HTTPForbidden("Replication protocol disabled") expected_uuid = self.request.headers.get(H_EXPECTED_MASTER_ID, None) master_uuid = self.xom.config.get_master_uuid() # we require the header but it is allowed to be empty # (during initialization) if expected_uuid is None: msg = "replica sent no %s header" % H_EXPECTED_MASTER_ID threadlog.error(msg) raise HTTPBadRequest(msg) if expected_uuid and expected_uuid != master_uuid: threadlog.error("expected %r as master_uuid, replica sent %r", master_uuid, expected_uuid) raise HTTPBadRequest("expected %s as master_uuid, replica sent %s" % (master_uuid, expected_uuid)) serial = self.request.matchdict["serial"] with self.update_replica_status(serial): keyfs = self.xom.keyfs if serial.lower() == "nop": raw_entry = b"" else: try: serial = int(serial) except ValueError: raise HTTPNotFound("serial needs to be int") raw_entry = self._wait_for_entry(serial) devpi_serial = keyfs.get_current_serial() r = Response(body=raw_entry, status=200, headers={ str("Content-Type"): str("application/octet-stream"), str("X-DEVPI-SERIAL"): str(devpi_serial), }) return r