Python pyramid.httpexceptions 模块,HTTPForbidden() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用pyramid.httpexceptions.HTTPForbidden()

项目:webpush-channels    作者:webpush-channels    | 项目源码 | 文件源码
def retrieve_channel_information(request):
    """Return registration statistics for the channel named in the URL.

    The channel id is read from ``request.matchdict['channel_id']`` and all
    registration records stored under ``/channels/<channel_id>`` are fetched.

    :param request: the current request; its ``registry.storage``,
        ``matchdict`` and ``prefixed_userid`` are used.
    :returns: ``{"data": {"registrations": <count>, "push": 0}}``
    :raises HTTPForbidden: when none of the fetched registrations has an
        ``id`` equal to ``request.prefixed_userid``.
    """
    channel_parent = '/channels/{}'.format(request.matchdict['channel_id'])

    records, total = request.registry.storage.get_all(
        collection_id=REGISTRATION_COLLECTION_ID,
        parent_id=channel_parent)

    # Count the caller's own registrations; every record is inspected.
    own_registrations = sum(
        1 for record in records
        if record['id'] == request.prefixed_userid)

    if own_registrations == 0:
        raise httpexceptions.HTTPForbidden()

    stats = {"registrations": total, "push": 0}
    return {"data": stats}


# Channel Registration views
项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def shell2(request):
    """Launch a notebook for the logged-in user with the ``shell2`` context.

    The notebook context gets a greeting, the startup code produced by
    ``startup.make_startup`` for the application's config file, plus the
    module-level ``SCRIPT`` and ``GREETING``.

    :param request: the current request.
    :returns: whatever ``launch_notebook`` returns.
    :raises HTTPForbidden: when no user is authenticated.
    """
    # Resolve the authenticated user through the registered authentication policy.
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    userid = policy.authenticated_userid(request)
    if not userid:
        # Per the original author's note, a basic_challenge handler turns this
        # response into an HTTP Basic Auth dialog.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    context = {"greeting": "**Executing shell2 context**\n"}
    ini_path = request.registry.settings["global_config"]["__file__"]
    startup.make_startup(context, ini_path)
    startup.add_script(context, SCRIPT)
    startup.add_greeting(context, GREETING)

    return launch_notebook(request, userid, notebook_context=context)
项目:TonsleyLEDManager    作者:JonnoFTW    | 项目源码 | 文件源码
def delete_plugin(request):
    """
    Delete a plugin together with the rows that reference it.

    The plugin id comes from ``request.matchdict['plugin_id']``. Schedule
    entries and proposed changes pointing at the plugin are deleted before
    the plugin row itself.

    :returns: an ``HTTPFound`` redirect to ``/plugin``.
    :raises HTTPBadRequest: when no plugin has that id.
    :raises HTTPForbidden: when the current user is neither the plugin's
        user nor an admin.
    """
    plugin_id = request.matchdict['plugin_id']
    current_user = request.user
    session = request.db_session

    plugin_query = session.query(LedPlugin).filter(LedPlugin.id == plugin_id)
    plugin = plugin_query.first()
    if plugin is None:
        raise exc.HTTPBadRequest('No such plugin')

    if current_user != plugin.user:
        if not current_user.admin:
            raise exc.HTTPForbidden("You don't have access to do that")

    # Remove dependent rows first, then the plugin itself.
    for dependent in (LedSchedule, LedPluginProposed):
        session.query(dependent).filter(dependent.led_plugin_id == plugin_id).delete()
    plugin_query.delete()

    log(request, 'Deleted plugin ' + plugin.name)
    return exc.HTTPFound(location='/plugin')
项目:TonsleyLEDManager    作者:JonnoFTW    | 项目源码 | 文件源码
def show_group(request):
    """Collect the data needed to display a single LED group.

    The group id is read from ``request.matchdict['group_id']``.

    :returns: a dict with the group, its membership rows, its schedule
        (ordered by position), the users that are not members, a query over
        all plugins, and whether the current user may modify the group.
    :raises HTTPForbidden: when the current user is neither an admin nor
        listed among the group's membership rows.
    :raises HTTPBadRequest: when the id matches no group (raised only after
        the permission check, so the existing 403 response is unchanged).
    """
    user = request.user
    session = request.db_session
    group = session.query(LedGroup).filter(LedGroup.id == request.matchdict['group_id']).first()

    users = session.query(LedGroupUser).filter(LedGroupUser.led_group == group).all()
    # pluck presumably extracts led_user_id from each membership row -- confirm against its definition.
    if not (user.admin or user.id in pluck(users, 'led_user_id')):
        raise exc.HTTPForbidden("Only site admins or group members can view this")
    if group is None:
        # Without this guard the group.id access below fails with an
        # AttributeError when the id matches no group.
        raise exc.HTTPBadRequest('No such group')

    schedule = session.query(LedSchedule).filter(LedSchedule.led_group == group).order_by(LedSchedule.position.asc()).all()
    member_ids = session.query(LedGroupUser.led_user_id).filter(LedGroupUser.led_group == group)
    other_users = session.query(LedUser).filter(~LedUser.id.in_(member_ids))
    # Every plugin is offered, including ones already scheduled for this group.
    other_plugins = session.query(LedPlugin)
    return {
        'group': group,
        'users': users,
        'schedule': schedule,
        'other_users': other_users,
        'other_plugins': other_plugins,
        'group_admin': can_modify_group(request, group.id, False)
    }
项目:weasyl    作者:Weasyl    | 项目源码 | 文件源码
def submission_media_(request):
    """Serve a submission's media file through an ``X-Accel-Redirect`` header.

    ``linktype`` and ``submitid`` are taken from ``request.matchdict``; the
    link type ``"submissions"`` is treated as ``"submission"``.

    :returns: a ``Response`` whose headers point at the first media item's
        ``file_url`` and disable caching.
    :raises HTTPForbidden: when the submission does not exist, is hidden,
        or is friends-only.
    :raises HTTPNotFound: when the submission has no media of that type.
    """
    route_args = request.matchdict
    media_kind = route_args['linktype']
    submitid = int(route_args['submitid'])
    if media_kind == "submissions":
        media_kind = "submission"

    record = Submission.query.get(submitid)
    if record is None or record.is_hidden or record.is_friends_only:
        raise httpexceptions.HTTPForbidden()

    available = media.get_submission_media(submitid)
    if not available.get(media_kind):
        raise httpexceptions.HTTPNotFound()

    internal_path = str(available[media_kind][0]['file_url'])
    return Response(headerlist=[
        ('X-Accel-Redirect', internal_path),
        ('Cache-Control', 'max-age=0'),
    ])
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def get_logged_in_user(cls, request):
        """Returns the logged in user as User instance

        The user is looked up by the login returned from
        ``authenticated_userid(request)``, with session autoflush disabled
        for the duration of the query.

        :param request: Request object
        :returns: the matching ``User`` instance
        :raises HTTPForbidden: if no user matches the authenticated login
        """
        from stalker import User
        from stalker.db.session import DBSession
        with DBSession.no_autoflush:
            user = User.query \
                .filter_by(login=authenticated_userid(request)).first()

        if not user:
            # The first positional argument of HTTPForbidden is the ``detail``
            # message, so passing the request object there would put the
            # request's string form into the error response.
            raise HTTPForbidden()

        return user
项目:fantasy-dota-heroes    作者:ThePianoDentist    | 项目源码 | 文件源码
def reset_password_page(request):
    """Validate a password-reset link and return the data for the reset form.

    Reads ``guid`` and ``u`` (the user id) from ``request.params``.

    :returns: ``{"guid": guid, "user_id": user_id}``
    :raises HTTPForbidden: when the user has no pending reset, the guid does
        not validate, or the reset is more than 24 hours old.
    """
    session = DBSession()
    guid = request.params.get('guid')
    user_id = request.params.get('u')
    reset = session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first()
    # first() returns None when there is no pending reset for this user;
    # reject the request instead of failing on a None attribute access.
    if reset is None or not reset.validate_guid(guid):
        raise HTTPForbidden()
    # Link is over 24 hours old
    if reset.time + datetime.timedelta(days=1) < datetime.datetime.now():
        raise HTTPForbidden()
    return {"guid": guid, "user_id": user_id}
项目:fantasy-dota-heroes    作者:ThePianoDentist    | 项目源码 | 文件源码
def reset_password(request):
    """Set a new password for the user identified by a password-reset link.

    Reads ``guid``, ``user_id``, ``new_password`` and ``confirm_new_password``
    from ``request.params``.

    :returns: an ``HTTPFound`` redirect: to ``viewAccount`` when the two
        passwords differ, to ``login`` with the validation message when
        ``check_invalid_password`` reports a problem, and to ``login`` with a
        success message once the password is stored and the reset row deleted.
    :raises HTTPForbidden: when the user has no pending reset or the guid
        does not validate.
    """
    session = DBSession()
    guid = request.params.get('guid')
    user_id = request.params.get('user_id')
    new_password = request.params.get('new_password')
    confirm_new_password = request.params.get('confirm_new_password')

    reset = session.query(PasswordReset).filter(PasswordReset.user_id == user_id).first()
    # first() returns None when there is no pending reset for this user;
    # reject the request instead of failing on a None attribute access.
    if reset is None or not reset.validate_guid(guid):
        raise HTTPForbidden()
    # NOTE(review): unlike reset_password_page, the age of the reset is not
    # checked here -- confirm whether expired links should be rejected too.

    if confirm_new_password != new_password:
        params = {"message": "Passwords did not match",
                  "message_type": "change_password"}
        return HTTPFound(location=request.route_url('viewAccount', _query=params))

    pword_invalid_check = check_invalid_password(new_password, confirm_new_password)
    if pword_invalid_check:
        return HTTPFound(location=request.route_url('login', _query=pword_invalid_check))

    # Store the hashed password, then consume the reset so it cannot be reused.
    session.query(User).filter(User.id == user_id).update({User.password: bcrypt.encrypt(new_password)})
    session.query(PasswordReset).filter(PasswordReset.user_id == user_id).delete()
    params = {"message": "Congratulations! Password successfully changed",
              "message_type": "success"}
    return HTTPFound(location=request.route_url('login', _query=params))
项目:pyramid-restful-framework    作者:danpoland    | 项目源码 | 文件源码
def permission_denied(self, request, message):
        """Abort the request with a 403 whose detail is ``message``.

        :param request: the current request (not used here).
        :param message: text passed to ``HTTPForbidden`` as ``detail``.
        :raises HTTPForbidden: always.
        """
        # TODO: work out how to tell an authorization error from an authentication error.
        forbidden = HTTPForbidden(detail=message)
        raise forbidden
项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def notebook_proxy(request):
    """Proxy the request to the notebook of the logged-in user.

    :param request: the current request.
    :returns: whatever ``_notebook_proxy`` returns for this user.
    :raises HTTPForbidden: when no user is authenticated.
    """
    # Resolve the authenticated user through the registered authentication policy.
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    userid = policy.authenticated_userid(request)

    if userid:
        return _notebook_proxy(request, userid)

    # Per the original author's note, a basic_challenge handler turns this
    # response into an HTTP Basic Auth dialog.
    raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")
项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def shell1(request):
    """Launch a notebook for the logged-in user with the ``shell1`` context.

    The notebook context gets a greeting and the startup code produced by
    ``startup.make_startup`` for the application's config file.

    :param request: the current request.
    :returns: whatever ``launch_notebook`` returns.
    :raises HTTPForbidden: when no user is authenticated.
    """
    # Resolve the authenticated user through the registered authentication policy.
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    userid = policy.authenticated_userid(request)
    if not userid:
        # Per the original author's note, a basic_challenge handler turns this
        # response into an HTTP Basic Auth dialog.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    context = {"greeting": "**Executing shell1 context**\n"}
    ini_path = request.registry.settings["global_config"]["__file__"]
    startup.make_startup(context, ini_path)

    return launch_notebook(request, userid, notebook_context=context)
项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def shutdown_notebook(request):
    """Shut down the logged-in user's notebook and redirect to ``home``.

    :param request: the current request.
    :returns: an ``HTTPFound`` redirect to the ``home`` route.
    :raises HTTPForbidden: when no user is authenticated.
    """
    # Resolve the authenticated user through the registered authentication policy.
    policy = request.registry.queryUtility(IAuthenticationPolicy)
    userid = policy.authenticated_userid(request)
    if not userid:
        # Per the original author's note, a basic_challenge handler turns this
        # response into an HTTP Basic Auth dialog.
        raise httpexceptions.HTTPForbidden("You need to be logged in. Hint: user / password")

    _shutdown_notebook(request, userid)

    home_url = request.route_url("home")
    return HTTPFound(home_url)
项目:antut    作者:lyctc    | 项目源码 | 文件源码
def forbidden_view(self):
        """Answer a forbidden request.

        An authenticated user gets a plain ``HTTPForbidden`` response. Anyone
        else is redirected to the ``login`` route, with the current path
        passed along as the ``next`` query parameter.
        """
        request = self.request
        if not authenticated_userid(request):
            login_url = request.route_url(
                'login', _query=(('next', request.path),))
            return HTTPFound(login_url)
        return HTTPForbidden()
项目:TonsleyLEDManager    作者:JonnoFTW    | 项目源码 | 文件源码
def admin_only(func):
    """Decorator that only lets site admins call ``func``.

    The request is expected to be the second positional argument of the
    wrapped callable; its ``user`` attribute is inspected.

    :param func: the callable to protect.
    :returns: a wrapper that forwards all positional and keyword arguments
        to ``func`` unchanged and returns its result.
    :raises HTTPForbidden: (from the wrapper) when the request has no user
        or the user is not an admin.
    """
    def _admin_only(*args, **kwargs):
        user = args[1].user
        if user is None or not user.admin:
            # Single-argument form prints the same text on Python 2 and 3.
            print("%s is forbidden" % (user,))
            raise exc.HTTPForbidden('You do not have sufficient permissions to view this page')
        # Unpack the positionals; passing ``args`` itself would hand the
        # wrapped callable one tuple instead of its real arguments.
        return func(*args, **kwargs)
    return _admin_only
项目:TonsleyLEDManager    作者:JonnoFTW    | 项目源码 | 文件源码
def update_plugin(request):
    """Update a plugin (admins) or propose a change to it (plugin owner).

    The plugin id comes from ``request.matchdict['plugin_id']``; the form
    fields come from ``request.POST``, each value passed through ``to_null``.

    * A non-admin owner must send ``code``; it is stored as a proposed
      change (updating an existing proposal for the plugin if there is one).
    * An admin updates ``code`` and/or ``name`` directly; a field that is
      missing or falsy is left untouched.

    :returns: a dict with a ``msg`` describing what happened.
    :raises HTTPBadRequest: when the plugin does not exist, or a non-admin
        sends no ``code``.
    :raises HTTPForbidden: when the user is neither the plugin's user nor
        an admin.
    """
    # make sure the plugin id exists
    plugin_id = request.matchdict['plugin_id']
    plugin = request.db_session.query(LedPlugin).filter(LedPlugin.id == plugin_id).first()
    if not plugin:
        raise exc.HTTPBadRequest("No such plugin")

    # only a site admin or plugin owner can edit
    user = request.user
    if user != plugin.user and not user.admin:
        raise exc.HTTPForbidden("You don't have access to do that")
    form = {k: to_null(v) for k, v in request.POST.items()}

    if not user.admin:
        # a non-admin plugin owner can only suggest changes
        if 'code' not in form:
            raise exc.HTTPBadRequest("Please provide the code")
        plugin_update = request.db_session.query(LedPluginProposed).filter(LedPluginProposed.led_plugin_id == plugin_id).first()
        if plugin_update is not None:
            plugin_update.code = form['code']
        else:
            request.db_session.add(LedPluginProposed(led_plugin_id=plugin_id, code=form['code']))
        log(request, "Proposed changes to plugin <a href='/plugin/{}'>{}</a>".format(plugin.id, plugin.name))
        return {
            'msg': 'Your update is now awaiting approval'
        }

    # Admin path. Use .get() so that a request without a 'code' field (for
    # example a name-only update) does not fail with a KeyError.
    if form.get('code'):
        plugin.code = form['code']
    if form.get('name'):
        plugin.name = form['name']
    log(request, 'Updated <a href="/plugin/{0}">plugin {1}</a>: '.format(plugin_id, plugin.name))
    return {
        'msg': 'Updated code'
    }
项目:TonsleyLEDManager    作者:JonnoFTW    | 项目源码 | 文件源码
def can_modify_group(request, gid, raise_exc=True):
    """Tell whether the current user may modify the group with id ``gid``.

    Admins always may. Any other user needs a ``LedGroupUser`` row for this
    group whose ``access_level`` equals 2.

    :param request: the current request (``user`` and ``db_session`` are used).
    :param gid: id of the group.
    :param raise_exc: when true, a denied check raises instead of returning.
    :returns: ``True`` when allowed; ``False`` when denied and ``raise_exc``
        is false.
    :raises HTTPForbidden: when denied and ``raise_exc`` is true.
    """
    current_user = request.user
    if not current_user.admin:
        membership = request.db_session.query(LedGroupUser).filter(and_(
            LedGroupUser.led_group_id == gid,
            LedGroupUser.led_user == current_user)).first()
        denied = membership is None or membership.access_level != 2
        if denied and raise_exc:
            raise exc.HTTPForbidden("You can't modify this group")
        return not denied
    return True
项目:snovault    作者:ENCODE-DCC    | 项目源码 | 文件源码
def cached_view_embedded(context, request):
    """Return the ``embedded`` part of the cached source document.

    :raises HTTPForbidden: when none of ``request.effective_principals`` is
        among the document's ``principals_allowed['view']``.
    """
    document = context.model.source
    viewers = set(document['principals_allowed']['view'])
    if not viewers.isdisjoint(request.effective_principals):
        return document['embedded']
    raise HTTPForbidden()
项目:snovault    作者:ENCODE-DCC    | 项目源码 | 文件源码
def cached_view_object(context, request):
    """Return the ``object`` part of the cached source document.

    :raises HTTPForbidden: when none of ``request.effective_principals`` is
        among the document's ``principals_allowed['view']``.
    """
    document = context.model.source
    viewers = set(document['principals_allowed']['view'])
    if not viewers.isdisjoint(request.effective_principals):
        return document['object']
    raise HTTPForbidden()
项目:snovault    作者:ENCODE-DCC    | 项目源码 | 文件源码
def cached_view_audit(context, request):
    """Return the cached audit data together with the object's ``@id``.

    :returns: ``{'@id': <object @id>, 'audit': <source audit>}``
    :raises HTTPForbidden: when none of ``request.effective_principals`` is
        among the document's ``principals_allowed['audit']``.
    """
    document = context.model.source
    auditors = set(document['principals_allowed']['audit'])
    if auditors.isdisjoint(request.effective_principals):
        raise HTTPForbidden()
    result = {'@id': document['object']['@id']}
    result['audit'] = document['audit']
    return result
项目:snovault    作者:ENCODE-DCC    | 项目源码 | 文件源码
def cached_view_audit_self(context, request):
    """Return only the cached audits whose ``path`` is the object itself.

    The audit lists of all categories in ``source['audit']`` are flattened
    and filtered down to entries whose ``path`` equals the object's ``@id``.

    :returns: ``{'@id': <object @id>, 'audit': [<matching audits>]}``
    :raises HTTPForbidden: when none of ``request.effective_principals`` is
        among the document's ``principals_allowed['audit']``.
    """
    document = context.model.source
    auditors = set(document['principals_allowed']['audit'])
    if auditors.isdisjoint(request.effective_principals):
        raise HTTPForbidden()

    item_path = document['object']['@id']
    own_audits = []
    for audit in chain.from_iterable(document['audit'].values()):
        if audit['path'] == item_path:
            own_audits.append(audit)
    return {
        '@id': item_path,
        'audit': own_audits,
    }
项目:devpi    作者:devpi    | 项目源码 | 文件源码
def get_changes(self):
        """Serve one changelog entry to a replica.

        Returns a 200 ``Response`` whose body is the raw entry for the serial
        in the URL (empty for the serial ``"nop"``) and whose
        ``X-DEVPI-SERIAL`` header carries the current keyfs serial.

        Raises ``HTTPForbidden`` when this server is not a master,
        ``HTTPBadRequest`` when the expected-master-id header is missing or
        names a different master, and ``HTTPNotFound`` when the serial is
        neither ``"nop"`` nor an integer.
        """
        # this method is called from all replica servers
        # and either returns changelog entry content for {serial} or,
        # if it points to the "next" serial, will block and wait
        # until that serial is committed.  However, after
        # MAX_REPLICA_BLOCK_TIME, we return 202 Accepted to indicate
        # the replica should try again.  The latter has two benefits:
        # - nginx' timeout would otherwise return 504 (Gateway Timeout)
        # - if the replica is not waiting anymore we would otherwise
        #   never time out here, leading to more and more threads
        # if no commits happen.
        xom = self.xom
        if not xom.is_master():
            raise HTTPForbidden("Replication protocol disabled")

        replica_claim = self.request.headers.get(H_EXPECTED_MASTER_ID, None)
        own_uuid = xom.config.get_master_uuid()

        # we require the header but it is allowed to be empty
        # (during initialization)
        if replica_claim is None:
            missing_header = "replica sent no %s header" % H_EXPECTED_MASTER_ID
            threadlog.error(missing_header)
            raise HTTPBadRequest(missing_header)

        if replica_claim:
            if replica_claim != own_uuid:
                threadlog.error("expected %r as master_uuid, replica sent %r", own_uuid,
                                replica_claim)
                raise HTTPBadRequest("expected %s as master_uuid, replica sent %s" %
                                     (own_uuid, replica_claim))

        serial = self.request.matchdict["serial"]

        # The status tracker receives the serial exactly as it appears in the URL.
        with self.update_replica_status(serial):
            keyfs = xom.keyfs
            if serial.lower() != "nop":
                try:
                    serial_number = int(serial)
                except ValueError:
                    raise HTTPNotFound("serial needs to be int")
                payload = self._wait_for_entry(serial_number)
            else:
                payload = b""

            # Read the current serial only after the entry has been obtained.
            current_serial = keyfs.get_current_serial()
            return Response(body=payload, status=200, headers={
                str("Content-Type"): str("application/octet-stream"),
                str("X-DEVPI-SERIAL"): str(current_serial),
            })