Python pyramid.httpexceptions 模块,HTTPOk() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用pyramid.httpexceptions.HTTPOk()

项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy."""
    env = request.environ

    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))

    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    parts = urlparse(request.url)
    parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port))
    url = urlunparse(parts)

    # Proxy initial connection headers
    headers = [(header, value) for header, value in request.headers.items() if header.lower() in CAPTURE_CONNECT_HEADERS]

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    ws = ProxyClient(url, headers=headers)
    ws.connect()

    # TODO: Will complain loudly about already send headers - how to abort?
    return httpexceptions.HTTPOk()
项目:pyramid-zappa-api-boilerplate    作者:web-masons    | 项目源码 | 文件源码
def update_user(request):
    # ID
    user_id = request.matchdict.get('user_id')
    current_user_from_db = DBSession().query(User).filter_by(id=user_id).first()
    # First Name
    if request.matchdict.get('first_name') :
        new_first_name = request.matchdict.get('first_name')
    else :
        new_first_name = current_user_from_db.first_name
    # Last Name
    if request.matchdict.get('last_name') :
        new_last_name = request.matchdict.get('last_name')
    else :
        new_last_name = current_user_from_db.last_name

    # Created At
    createdAt = current_user_from_db.created_at

    with transaction.manager:
        DBSession.query(User).filter_by(id=user_id).update({"first_name": new_first_name, "last_name": new_last_name, "created_at": createdAt})
        return HTTPOk()


# DELETE user
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def event_reorder_up(request):
    """Switch this event with the previous event
    and adjust the next event's previous_event_id"""
    ctx = request.context
    instance = ctx._instance
    db = instance.db
    next_event = db.query(TimelineEvent).filter_by(
        previous_event_id=instance.id).first()
    previous_event = instance.previous_event
    pre_previous_id = previous_event.previous_event_id
    # clear first to avoid index uniqueness checks
    previous_event.previous_event_id = None
    instance.previous_event_id = None
    if next_event:
        next_event.previous_event_id = None
    db.flush()
    instance.previous_event_id = pre_previous_id
    previous_event.previous_event_id = instance.id
    if next_event:
        next_event.previous_event_id = previous_event.id
    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def update_sequence(request):
    """runs when adding a new sequence
    """
    logged_in_user = get_logged_in_user(request)

    sequence_id = request.params.get('sequence_id')
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if sequence and code and name and status:
        # get descriptions
        description = request.params.get('description')

        #update the sequence
        sequence.name = name
        sequence.code = code
        sequence.description = description
        sequence.status = status
        sequence.updated_by = logged_in_user
        sequence.date_updated = datetime.datetime.now()

        DBSession.add(sequence)

    else:
        logger.debug('there are missing parameters')
        logger.debug('name      : %s' % name)
        logger.debug('status    : %s' % status)
        HTTPServerError()

    return HTTPOk()
项目:pyramid_sms    作者:websauna    | 项目源码 | 文件源码
def sms_view_test(request):
    """Dummy view to simulate outgoing SMS."""

    send_sms(request, "+15551234", "Test message")

    # Check DummyService recorded this
    service = request.registry.queryAdapter(request, ISMSService)
    assert service.last_outgoing == "Test message"
    return HTTPOk()
项目:pyramid-zappa-api-boilerplate    作者:web-masons    | 项目源码 | 文件源码
def create_user(request):
    newFirstName = request.matchdict.get('first_name')
    newLastName = request.matchdict.get('last_name')
    createdAt = datetime.now()
    user = User(first_name=newFirstName, last_name=newLastName, created_at=createdAt)
    with transaction.manager:
        DBSession.add(user)
        # Flush to get the post.id from the database
        DBSession.flush()
        return HTTPOk()


# Update User
项目:pyramid-zappa-api-boilerplate    作者:web-masons    | 项目源码 | 文件源码
def delete_user(request):
    user_id = request.matchdict.get('user_id')
    user = DBSession.query(User).filter_by(id=user_id).first()
    if not user:
        msg = "The User with id '{}' was not found.".format(user_id)
        return Response(status=404, json_body={'error': msg})
    else:
        with transaction.manager:
            DBSession.delete(user)
            return HTTPOk()
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def redis_test(request):
    redis = get_redis(request)
    redis.set("foo", b"bar")
    assert redis.get("foo") == b"bar"

    redis.set("foo", "ÅÄÖ")
    assert redis.get("foo").decode("utf-8") == "ÅÄÖ"

    return HTTPOk()
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def throttle_sample(request):
    return HTTPOk()
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def traversing_test(self):
        return HTTPOk("Editing: {}".format(self.context.id))
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def traversing_test(self):
        return HTTPOk("Showing: {}".format(self.context.id))
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def update_asset(request):
    """updates an Asset
    """
    logger.debug('***update_asset method starts ***')
    logged_in_user = get_logged_in_user(request)

    # get params
    asset_id = request.matchdict.get('id', -1)
    asset = Asset.query.filter_by(id=asset_id).first()

    name = request.params.get('name')
    code = request.params.get('code')
    description = request.params.get('description')
    type_name = request.params.get('type_name')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if asset and name and code and type_name and status:
        # get the type
        type_ = Type.query\
            .filter_by(target_entity_type='Asset')\
            .filter_by(name=type_name)\
            .first()

        if type_ is None:
            # create a new Type
            type_ = Type(
                name=type_name,
                code=type_name,
                target_entity_type='Asset'
            )

        # update the asset
        logger.debug('code      : %s' % code)
        asset.name = name
        asset.code = code
        asset.description = description
        asset.type = type_
        asset.status = status
        asset.updated_by = logged_in_user
        asset.date_updated = datetime.datetime.now()

        DBSession.add(asset)

    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def create_shot(request):
    """runs when adding a new shot
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()
    logger.debug('project_id   : %s' % project_id)

    if name and code and status and project:
        # get descriptions
        description = request.params.get('description')

        sequence_id = request.params['sequence_id']
        sequence = Sequence.query.filter_by(id=sequence_id).first()

        # get the status_list
        status_list = StatusList.query.filter_by(
            target_entity_type='Shot'
        ).first()

        # there should be a status_list
        # TODO: you should think about how much possible this is
        if status_list is None:
            return HTTPServerError(detail='No StatusList found')

        new_shot = Shot(
            name=name,
            code=code,
            description=description,
            sequence=sequence,
            status_list=status_list,
            status=status,
            created_by=logged_in_user,
            project=project
        )

        DBSession.add(new_shot)

    else:
        logger.debug('there are missing parameters')
        logger.debug('name      : %s' % name)
        logger.debug('code      : %s' % code)
        logger.debug('status    : %s' % status)
        logger.debug('project   : %s' % project)
        HTTPServerError()

    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def update_shot(request):
    """runs when adding a new shot
    """
    logged_in_user = get_logged_in_user(request)

    shot_id = request.params.get('shot_id')
    shot = Shot.query.filter_by(id=shot_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    cut_in = int(request.params.get('cut_in', 1))
    cut_out = int(request.params.get('cut_out', 1))

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if shot and code and name and status:
        # get descriptions
        description = request.params.get('description')

        sequence_id = request.params['sequence_id']
        sequence = Sequence.query.filter_by(id=sequence_id).first()

        #update the shot

        shot.name = name
        shot.code = code
        shot.description = description
        shot.sequences = [sequence]
        shot.status = status
        shot.updated_by = logged_in_user
        shot.date_updated = datetime.datetime.now()
        shot.cut_in = cut_in
        shot.cut_out = cut_out



        DBSession.add(shot)

    else:
        logger.debug('there are missing parameters')
        logger.debug('name      : %s' % name)
        logger.debug('status    : %s' % status)
        HTTPServerError()

    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def append_entities_to_entity(request):
    """Appends entities to entity for example appends Projects to user.projects
    etc.
    """
    logger.debug('append_class_to_entity is running')

    entity_id = request.matchdict.get('id', -1)
    from stalker import Entity
    entity = Entity.query.filter_by(id=entity_id).first()

    # selected_list = get_multi_integer(request, 'selected_items[]')
    from stalker_pyramid.views import EntityViewBase
    selected_list = EntityViewBase.get_multi_integer(request, 'selected_ids')
    logger.debug('selected_list: %s' % selected_list)

    if entity and selected_list:

        appended_entities = Entity.query\
            .filter(Entity.id.in_(selected_list)).all()
        if appended_entities:
            attr_name = appended_entities[0].plural_class_name.lower()
            eval(
                'entity.%(attr_name)s.extend(appended_entities)' %
                {'attr_name': attr_name}
            )
            from stalker.db.session import DBSession
            DBSession.add(entity)

            logger.debug('entity is updated successfully')

            request.session.flash(
                'success:User <strong>%s</strong> is updated successfully' %
                entity.name
            )
            logger.debug('***append_entities_to_entity method ends ***')
    else:
        logger.debug('not all parameters are in request.params')
        from pyramid.httpexceptions import HTTPServerError
        HTTPServerError()

    from pyramid.httpexceptions import HTTPOk
    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def create_studio(request):
    """creates the studio
    """
    name = request.params.get('name', None)
    dwh = request.params.get('dwh', None)
    wh_mon_start = get_time(request, 'mon_start')
    wh_mon_end   = get_time(request, 'mon_end')
    wh_tue_start = get_time(request, 'tue_start')
    wh_tue_end   = get_time(request, 'tue_end')
    wh_wed_start = get_time(request, 'wed_start')
    wh_wed_end   = get_time(request, 'wed_end')
    wh_thu_start = get_time(request, 'thu_start')
    wh_thu_end   = get_time(request, 'thu_end')
    wh_fri_start = get_time(request, 'fri_start')
    wh_fri_end   = get_time(request, 'fri_end')
    wh_sat_start = get_time(request, 'sat_start')
    wh_sat_end   = get_time(request, 'sat_end')
    wh_sun_start = get_time(request, 'sun_start')
    wh_sun_end   = get_time(request, 'sun_end')

    if name and dwh:
        # create new studio
        studio = Studio(
            name=name,
            daily_working_hours=int(dwh)
        )
        wh = WorkingHours()

        def set_wh_for_day(day, start, end):
            if start != end:
                wh[day] = [[start.seconds/60, end.seconds/60]]
            else:
                wh[day] = []

        set_wh_for_day('mon', wh_mon_start, wh_mon_end)
        set_wh_for_day('tue', wh_tue_start, wh_tue_end)
        set_wh_for_day('wed', wh_wed_start, wh_wed_end)
        set_wh_for_day('thu', wh_thu_start, wh_thu_end)
        set_wh_for_day('fri', wh_fri_start, wh_fri_end)
        set_wh_for_day('sat', wh_sat_start, wh_sat_end)
        set_wh_for_day('sun', wh_sun_start, wh_sun_end)

        studio.working_hours = wh

        DBSession.add(studio)
        # Commit will be handled by the zope transaction extension

    return HTTPOk()
项目:stalker_pyramid    作者:eoyilmaz    | 项目源码 | 文件源码
def create_sequence(request):
    """runs when adding a new sequence
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()

    logger.debug('project_id   : %s' % project_id)

    if name and code and status and project:
        # get descriptions
        description = request.params.get('description')

        # get the status_list
        status_list = StatusList.query.filter_by(
            target_entity_type='Sequence'
        ).first()

        # there should be a status_list
        # TODO: you should think about how much possible this is
        if status_list is None:
            return HTTPServerError(detail='No StatusList found')

        new_sequence = Sequence(
            name=name,
            code=code,
            description=description,
            status_list=status_list,
            status=status,
            created_by=logged_in_user,
            project=project
        )

        DBSession.add(new_sequence)

    else:
        logger.debug('there are missing parameters')
        logger.debug('name      : %s' % name)
        logger.debug('code      : %s' % code)
        logger.debug('status    : %s' % status)
        logger.debug('project   : %s' % project)
        HTTPServerError()

    return HTTPOk()