我们从 Python 开源项目中提取了以下 17 个代码示例，用于说明如何使用 pyramid.httpexceptions.HTTPOk()。
def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy.

    Performs the uWSGI websocket handshake for the incoming request, rewrites
    the request URL to ``ws://localhost:<port>`` and opens a ``ProxyClient``
    connection to it, forwarding only the request headers whose lower-cased
    name is in ``CAPTURE_CONNECT_HEADERS``.

    :param request: incoming request; ``environ``, ``url`` and ``headers``
        are read from it.
    :param port: port of the upstream server on localhost.
    :return: an ``HTTPOk`` response.
    """
    environ = request.environ

    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(
        environ['HTTP_SEC_WEBSOCKET_KEY'],
        environ.get('HTTP_ORIGIN', ''),
    )

    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    upstream = urlparse(request.url)._replace(
        scheme="ws",
        netloc="localhost:{}".format(port),
    )
    url = urlunparse(upstream)

    # Proxy initial connection headers
    headers = []
    for header, value in request.headers.items():
        if header.lower() in CAPTURE_CONNECT_HEADERS:
            headers.append((header, value))

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    client = ProxyClient(url, headers=headers)
    client.connect()

    # TODO: Will complain loudly about already send headers - how to abort?
    return httpexceptions.HTTPOk()
def update_user(request):
    """Update the first and last name of an existing user.

    Reads ``user_id``, ``first_name`` and ``last_name`` from
    ``request.matchdict``. A name that is missing or empty keeps the value
    currently stored for the user.

    :param request: the current request.
    :return: ``HTTPOk`` after the update, or a 404 JSON ``Response`` when no
        user has the given id.
    """
    # ID
    user_id = request.matchdict.get('user_id')
    current_user = DBSession().query(User).filter_by(id=user_id).first()
    if not current_user:
        # .first() returns None for an unknown id; without this guard the
        # attribute reads below raised AttributeError. The 404 payload has
        # the same shape as the one delete_user produces.
        msg = "The User with id '{}' was not found.".format(user_id)
        return Response(status=404, json_body={'error': msg})

    # Fall back to the stored names when the request does not supply them
    new_first_name = request.matchdict.get('first_name') or current_user.first_name
    new_last_name = request.matchdict.get('last_name') or current_user.last_name

    # created_at is written back unchanged, as before
    created_at = current_user.created_at

    with transaction.manager:
        DBSession.query(User).filter_by(id=user_id).update({
            "first_name": new_first_name,
            "last_name": new_last_name,
            "created_at": created_at,
        })
    return HTTPOk()


# DELETE user
def event_reorder_up(request):
    """Swap this event with its previous event in the timeline chain.

    The event takes over its predecessor's ``previous_event_id``, the
    predecessor is re-linked to come after the event, and the event that
    used to follow this one (if any) is re-linked to follow the predecessor.

    :param request: the current request; the event is
        ``request.context._instance``.
    :return: an ``HTTPOk`` response.
    """
    event = request.context._instance
    session = event.db

    follower = session.query(TimelineEvent).filter_by(
        previous_event_id=event.id).first()
    predecessor = event.previous_event
    # NOTE(review): if the event has no previous event this attribute read
    # fails on None -- confirm callers never reorder the first event.
    new_previous_id = predecessor.previous_event_id

    # clear first to avoid index uniqueness checks
    predecessor.previous_event_id = None
    event.previous_event_id = None
    if follower:
        follower.previous_event_id = None
    session.flush()

    # now rebuild the chain in its new order
    event.previous_event_id = new_previous_id
    predecessor.previous_event_id = event.id
    if follower:
        follower.previous_event_id = predecessor.id

    return HTTPOk()
def update_sequence(request):
    """Update an existing sequence from the request parameters.

    Reads ``sequence_id``, ``name``, ``code``, ``status_id`` and
    ``description`` from ``request.params``.

    :param request: the current request.
    :return: ``HTTPOk`` when the sequence was updated; ``HTTPServerError``
        when the sequence or status cannot be found, or ``name`` / ``code``
        is missing.
    """
    logged_in_user = get_logged_in_user(request)

    sequence_id = request.params.get('sequence_id')
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if not (sequence and code and name and status):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('status : %s', status)
        # The error response only takes effect when it is returned; it used
        # to be instantiated and dropped, so the view always answered 200.
        return HTTPServerError()

    # get descriptions
    description = request.params.get('description')

    # update the sequence
    sequence.name = name
    sequence.code = code
    sequence.description = description
    sequence.status = status
    sequence.updated_by = logged_in_user
    sequence.date_updated = datetime.datetime.now()

    DBSession.add(sequence)

    return HTTPOk()
def sms_view_test(request):
    """Dummy view to simulate outgoing SMS.

    Sends a fixed message through ``send_sms`` and asserts that the
    ``ISMSService`` adapter registered for the request recorded it as its
    last outgoing message.

    :param request: the current request.
    :return: an ``HTTPOk`` response.
    """
    send_sms(request, "+15551234", "Test message")

    # Check DummyService recorded this
    sms_service = request.registry.queryAdapter(request, ISMSService)
    recorded = sms_service.last_outgoing
    assert recorded == "Test message"

    return HTTPOk()
def create_user(request):
    """Create a user from the ``first_name`` / ``last_name`` route values.

    The creation timestamp is taken from ``datetime.now()``.

    :param request: the current request; names are read from
        ``request.matchdict``.
    :return: an ``HTTPOk`` response.
    """
    matchdict = request.matchdict
    first_name = matchdict.get('first_name')
    last_name = matchdict.get('last_name')
    created_at = datetime.now()

    new_user = User(
        first_name=first_name,
        last_name=last_name,
        created_at=created_at,
    )

    with transaction.manager:
        DBSession.add(new_user)
        # Flush to get the post.id from the database
        DBSession.flush()

    return HTTPOk()


# Update User
def delete_user(request):
    """Delete the user identified by ``user_id`` in ``request.matchdict``.

    :param request: the current request.
    :return: ``HTTPOk`` after the delete, or a 404 JSON ``Response`` with an
        ``error`` message when no such user exists.
    """
    user_id = request.matchdict.get('user_id')
    target = DBSession.query(User).filter_by(id=user_id).first()

    if target:
        with transaction.manager:
            DBSession.delete(target)
        return HTTPOk()

    # Nothing matched the id: report it to the client
    msg = "The User with id '{}' was not found.".format(user_id)
    return Response(status=404, json_body={'error': msg})
def redis_test(request):
    """Round-trip a bytes value and a non-ASCII string through Redis.

    Writes to the key ``foo`` twice and asserts that each value reads back
    as stored (the string one after UTF-8 decoding).

    :param request: the current request, passed to ``get_redis``.
    :return: an ``HTTPOk`` response.
    """
    client = get_redis(request)

    # Raw bytes come back unchanged
    client.set("foo", b"bar")
    stored = client.get("foo")
    assert stored == b"bar"

    # Text comes back as bytes that decode as UTF-8
    client.set("foo", "ÅÄÖ")
    decoded = client.get("foo").decode("utf-8")
    assert decoded == "ÅÄÖ"

    return HTTPOk()
def throttle_sample(request):
    """Return a plain ``HTTPOk`` response; the request is not inspected."""
    response = HTTPOk()
    return response
def traversing_test(self):
    """Return ``HTTPOk`` whose message names the context id being edited."""
    message = "Editing: {}".format(self.context.id)
    return HTTPOk(message)
def traversing_test(self):
    """Return ``HTTPOk`` whose message names the context id being shown."""
    message = "Showing: {}".format(self.context.id)
    return HTTPOk(message)
def update_asset(request):
    """Update an Asset from the request.

    The asset id comes from ``request.matchdict['id']``; ``name``, ``code``,
    ``description``, ``type_name`` and ``status_id`` come from
    ``request.params``. When the asset, the status or any of name / code /
    type_name is missing, nothing is changed. A ``Type`` named ``type_name``
    for Assets is looked up and created if it does not exist.

    :param request: the current request.
    :return: an ``HTTPOk`` response in every case.
    """
    logger.debug('***update_asset method starts ***')

    logged_in_user = get_logged_in_user(request)

    # get params
    asset_id = request.matchdict.get('id', -1)
    asset = Asset.query.filter_by(id=asset_id).first()

    params = request.params
    name = params.get('name')
    code = params.get('code')
    description = params.get('description')
    type_name = params.get('type_name')

    status_id = params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if not (asset and name and code and type_name and status):
        # Incomplete input: leave the asset untouched
        return HTTPOk()

    # get the type
    asset_types = Type.query.filter_by(target_entity_type='Asset')
    type_ = asset_types.filter_by(name=type_name).first()

    if type_ is None:
        # create a new Type
        type_ = Type(
            name=type_name,
            code=type_name,
            target_entity_type='Asset'
        )

    # update the asset
    logger.debug('code : %s' % code)
    asset.name = name
    asset.code = code
    asset.description = description
    asset.type = type_
    asset.status = status
    asset.updated_by = logged_in_user
    asset.date_updated = datetime.datetime.now()

    DBSession.add(asset)

    return HTTPOk()
def create_shot(request):
    """Create a new shot from the request parameters.

    Reads ``name``, ``code``, ``status_id``, ``project_id``, ``description``
    and ``sequence_id`` from ``request.params``. ``sequence_id`` is read with
    item access, so a request without it raises ``KeyError``.

    :param request: the current request.
    :return: ``HTTPOk`` when the shot was added to the session;
        ``HTTPServerError`` when name / code / status / project is missing
        or no ``StatusList`` for shots exists.
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()
    logger.debug('project_id : %s', project_id)

    if not (name and code and status and project):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('code : %s', code)
        logger.debug('status : %s', status)
        logger.debug('project : %s', project)
        # The error response only takes effect when it is returned; it used
        # to be instantiated and dropped, so the view always answered 200.
        return HTTPServerError()

    # get descriptions
    description = request.params.get('description')

    sequence_id = request.params['sequence_id']
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    # get the status_list
    status_list = StatusList.query.filter_by(
        target_entity_type='Shot'
    ).first()

    # there should be a status_list
    # TODO: you should think about how much possible this is
    if status_list is None:
        return HTTPServerError(detail='No StatusList found')

    new_shot = Shot(
        name=name,
        code=code,
        description=description,
        sequence=sequence,
        status_list=status_list,
        status=status,
        created_by=logged_in_user,
        project=project
    )

    DBSession.add(new_shot)

    return HTTPOk()
def update_shot(request):
    """Update an existing shot from the request parameters.

    Reads ``shot_id``, ``name``, ``code``, ``cut_in``, ``cut_out``,
    ``status_id``, ``description`` and ``sequence_id`` from
    ``request.params``. ``cut_in`` / ``cut_out`` default to 1 and are passed
    through ``int()``, so a non-numeric value raises ``ValueError``;
    ``sequence_id`` is read with item access and raises ``KeyError`` when
    absent.

    :param request: the current request.
    :return: ``HTTPOk`` when the shot was updated; ``HTTPServerError`` when
        the shot or status cannot be found, or ``name`` / ``code`` is
        missing.
    """
    logged_in_user = get_logged_in_user(request)

    shot_id = request.params.get('shot_id')
    shot = Shot.query.filter_by(id=shot_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    cut_in = int(request.params.get('cut_in', 1))
    cut_out = int(request.params.get('cut_out', 1))

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if not (shot and code and name and status):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('status : %s', status)
        # The error response only takes effect when it is returned; it used
        # to be instantiated and dropped, so the view always answered 200.
        return HTTPServerError()

    # get descriptions
    description = request.params.get('description')

    sequence_id = request.params['sequence_id']
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    # update the shot
    shot.name = name
    shot.code = code
    shot.description = description
    shot.sequences = [sequence]
    shot.status = status
    shot.updated_by = logged_in_user
    shot.date_updated = datetime.datetime.now()
    shot.cut_in = cut_in
    shot.cut_out = cut_out

    DBSession.add(shot)

    return HTTPOk()
def append_entities_to_entity(request):
    """Append entities to another entity's matching collection.

    For example appends Projects to ``user.projects``. The target entity id
    comes from ``request.matchdict['id']`` and the ids to append from the
    ``selected_ids`` request values. The collection attribute is the
    lower-cased ``plural_class_name`` of the first entity being appended.

    :param request: the current request.
    :return: ``HTTPOk`` when the request was processed; ``HTTPServerError``
        when the target entity or the selected ids are missing.
    """
    # Imports are kept local to the function, as in the original.
    from pyramid.httpexceptions import HTTPOk, HTTPServerError
    from stalker import Entity
    from stalker_pyramid.views import EntityViewBase

    logger.debug('append_class_to_entity is running')

    entity_id = request.matchdict.get('id', -1)
    entity = Entity.query.filter_by(id=entity_id).first()

    selected_list = EntityViewBase.get_multi_integer(request, 'selected_ids')
    logger.debug('selected_list: %s', selected_list)

    if not (entity and selected_list):
        logger.debug('not all parameters are in request.params')
        # The error response only takes effect when it is returned; it used
        # to be instantiated and dropped, so the view always answered 200.
        return HTTPServerError()

    appended_entities = Entity.query\
        .filter(Entity.id.in_(selected_list)).all()

    if appended_entities:
        attr_name = appended_entities[0].plural_class_name.lower()
        # getattr performs the same attribute lookup the former eval() of a
        # formatted source string did, without executing generated code.
        getattr(entity, attr_name).extend(appended_entities)

        from stalker.db.session import DBSession
        DBSession.add(entity)

        logger.debug('entity is updated successfully')

        request.session.flash(
            'success:User <strong>%s</strong> is updated successfully' %
            entity.name
        )
        logger.debug('***append_entities_to_entity method ends ***')

    return HTTPOk()
def create_studio(request):
    """Create a Studio with its weekly working hours from the request.

    Reads ``name`` and ``dwh`` (used as ``daily_working_hours``) from
    ``request.params`` and, for every weekday, a start and an end time via
    ``get_time(request, '<day>_start')`` / ``get_time(request, '<day>_end')``.
    A day whose start equals its end gets no working hours. When ``name`` or
    ``dwh`` is missing no studio is created.

    :param request: the current request.
    :return: an ``HTTPOk`` response in every case.
    """
    name = request.params.get('name', None)
    dwh = request.params.get('dwh', None)

    # Read every start/end pair up front, in mon..sun order with start before
    # end, so get_time is called exactly as before even when no studio ends
    # up being created.
    day_ranges = [
        (day, get_time(request, day + '_start'), get_time(request, day + '_end'))
        for day in ('mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun')
    ]

    if name and dwh:
        # create new studio
        studio = Studio(
            name=name,
            daily_working_hours=int(dwh)
        )

        wh = WorkingHours()
        for day, start, end in day_ranges:
            if start != end:
                # Floor division keeps the minute values integers on Python 3
                # as well; "/" would turn them into floats there.
                # NOTE(review): assumes get_time returns timedelta-like
                # objects exposing integer ``.seconds`` -- confirm.
                wh[day] = [[start.seconds // 60, end.seconds // 60]]
            else:
                wh[day] = []

        studio.working_hours = wh

        DBSession.add(studio)
        # Commit will be handled by the zope transaction extension

    return HTTPOk()
def create_sequence(request):
    """Create a new sequence from the request parameters.

    Reads ``name``, ``code``, ``status_id``, ``project_id`` and
    ``description`` from ``request.params``.

    :param request: the current request.
    :return: ``HTTPOk`` when the sequence was added to the session;
        ``HTTPServerError`` when name / code / status / project is missing
        or no ``StatusList`` for sequences exists.
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()
    logger.debug('project_id : %s', project_id)

    if not (name and code and status and project):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('code : %s', code)
        logger.debug('status : %s', status)
        logger.debug('project : %s', project)
        # The error response only takes effect when it is returned; it used
        # to be instantiated and dropped, so the view always answered 200.
        return HTTPServerError()

    # get descriptions
    description = request.params.get('description')

    # get the status_list
    status_list = StatusList.query.filter_by(
        target_entity_type='Sequence'
    ).first()

    # there should be a status_list
    # TODO: you should think about how much possible this is
    if status_list is None:
        return HTTPServerError(detail='No StatusList found')

    new_sequence = Sequence(
        name=name,
        code=code,
        description=description,
        status_list=status_list,
        status=status,
        created_by=logged_in_user,
        project=project
    )

    DBSession.add(new_sequence)

    return HTTPOk()