我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用pyramid.view.view_config()。
def class_view(request): ctx = request.context user_id = authenticated_userid(request) or Everyone check = check_permissions(ctx, user_id, CrudPermissions.READ) view = request.GET.get('view', None) or ctx.get_default_view() or 'id_only' tombstones = asbool(request.GET.get('tombstones', False)) q = ctx.create_query(view == 'id_only', tombstones) if check == IF_OWNED: if user_id == Everyone: raise HTTPUnauthorized() q = ctx.get_target_class().restrict_to_owners(q, user_id) if view == 'id_only': return [ctx._class.uri_generic(x) for (x,) in q.all()] else: permissions = ctx.get_permissions() r = [i.generic_json(view, user_id, permissions) for i in q.all()] return [x for x in r if x is not None] # @view_config(context=InstanceContext, renderer='json', name="jsonld", # request_method='GET', permission=P_READ, # accept="application/ld+json;q=0.9") # @view_config(context=InstanceContext, renderer='json', # request_method='GET', permission=P_READ, # accept="application/ld+json;q=0.9")
def get_user_reviews(request): """RESTful version of getting all reviews of a task """ logger.debug('get_user_reviews is running') reviewer_id = request.matchdict.get('id', -1) # also try to get reviews with specified status review_status = request.params.get('status', None) if review_status: where_conditions = \ """where "Reviews".reviewer_id = %(reviewer_id)s and "Reviews_Statuses".code = '%(status)s' """ % { 'reviewer_id': reviewer_id, 'status': review_status } else: where_conditions = """where "Reviews".reviewer_id = %(reviewer_id)s""" % { 'reviewer_id': reviewer_id } return get_reviews(request, where_conditions) # @view_config( # route_name='get_user_reviews_count', # renderer='json' # )
def update_email_settings(request): session = DBSession() user_id = authenticated_userid(request) if not user_id: return HTTPFound('/login') new_email = request.params.get('email') contactable = True if request.params.get('emailContact') == "on" else False session.query(User).filter(User.id == user_id).\ update({User.contactable: contactable, User.email: new_email}) params = {"message": "Congratulations! Email settings successfully updated", "message_type": "success"} return HTTPFound(location=request.route_url('account_settings', _query=params)) # @view_config(route_name='home', renderer='../templates/login.mako') # def home(request): # try: # user = get_user(request) # headers = remember(request, user.id) # return HTTPFound("/team", headers=headers) # except: # return HTTPFound("/login") # # return common_context( # # request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'], # # load_strategy(request), # # user=get_user(request), # # plus_id=request.registry.settings.get( # # 'SOCIAL_AUTH_GOOGLE_PLUS_KEY' # # ), # # )
def done(request): user = get_user(request) headers = remember(request, user.id) return HTTPFound('/team', headers=headers) # return {"user": get_user(request), # "plus_id": request.registry.settings.get( # 'SOCIAL_AUTH_STEAM_KEY' # ), # } # return common_context( # request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'], # load_strategy(request), # user=get_user(request), # plus_id=request.registry.settings['SOCIAL_AUTH_GOOGLE_PLUS_KEY'], # ) # @view_config(route_name='email_required', renderer='common:templates/home.jinja2') # def email_required(request): # strategy = load_strategy(request) # partial_token = request.GET.get('partial_token') # partial = strategy.partial_load(partial_token) # return common_context( # request.registry.settings['SOCIAL_AUTH_AUTHENTICATION_BACKENDS'], # strategy, # user=get_user(request), # plus_id=request.registry.settings['SOCIAL_AUTH_GOOGLE_PLUS_KEY'], # email_required=True, # partial_backend_name=partial.backend, # partial_token=partial_token # )
def add_localizer(event): request = event.request localizer = get_localizer(request) if '_LOCALE_' in request.GET: language = request.GET['_LOCALE_'] print('lang is ', language) response = request.response response.set_cookie('_LOCALE_', value=language, max_age=31536000) def auto_translate(string, mapping=None, domain=None): return localizer.translate(tsf(string),mapping=mapping, domain=domain) request.localizer = localizer request.translate = auto_translate # @view_config(route_name='locale') # def set_locale_cookie(request): # if 'language' in request.GET: # language = request.GET['language'] # response = Response() # response.set_cookie('_LOCALE_', # value=language, # max_age=31536000) # max_age = year # return HTTPFound(location='/', # headers=response.headers) # class MyRequest(Request): # pass
def view_config(*args, **kwargs): """Override pyramid's view_config to log API requests and responses.""" return pyramid_view_config(*args, decorator=logging_view_decorator, **kwargs)
def get_task_last_reviews(request): """RESTful version of getting all reviews of a task """ logger.debug('get_task_last_reviews is running') task_id = request.matchdict.get('id', -1) task = Task.query.filter(Task.id == task_id).first() if not task: transaction.abort() return Response('There is no task with id: %s' % task_id, 500) where_condition1 = """where "Review_Tasks".id = %(task_id)s""" % { 'task_id': task_id } where_condition2 = '' logger.debug("task.status.code : %s" % task.status.code) if task.status.code == 'PREV': where_condition2 = """ and "Review_Tasks".review_number +1 = "Reviews".review_number""" where_conditions = '%s %s' % (where_condition1, where_condition2) reviews = get_reviews(request, where_conditions) else: # where_condition2 =""" and "Review_Tasks".review_number = "Reviews".review_number""" reviews = [ { 'review_number': task.review_number, 'review_id': 0, 'review_status_code': 'WTNG', 'review_status_name': 'Waiting', 'review_status_color': 'wip', 'task_id': task.id, 'task_review_number': task.review_number, 'reviewer_id': responsible.id, 'reviewer_name': responsible.name, 'reviewer_thumbnail_full_path': responsible.thumbnail.full_path if responsible.thumbnail else None, 'reviewer_department': responsible.departments[0].name } for responsible in task.responsible ] return reviews # @view_config( # route_name='get_user_reviews', # renderer='json' # )
def api_textbook(request): method, params, url_param = get_params(request) if method == 'OPTIONS': return preflight_handler(request) # Get a deck from database if method == "POST": params = json.loads(params) deckid = int(params['deckid']) userid = url_param['userid'] uuid_list = params['uuids'] with transaction.manager: for uuid in uuid_list: importCardsFromCnxDb(uuid, deckid, CNXDB_HOST) target_deck = DBSession.query(Deck).filter( Deck.id == deckid and Deck.user_id == userid).first() deck = card2dict(target_deck.cards) deck['title'] = str(target_deck.title) deck['color'] = str(target_deck.color) deck['id'] = deckid response = update_header(body=json.dumps(deck)) return response # @view_config(route_name='api_textbook', renderer='json') # def api_textbook(request): # method, params, url_param = get_params(request) # if method == 'OPTIONS': # return preflight_handler(request) # # # Get a deck from database # if method == "GET": # params = json.loads(params) # deckid = int(params['deckid']) # uuid_list = params['uuids'] # for uuid in uuid_list: # importCardsFromCnxDb(uuid, deckid, cnxdbHost) # # with transaction.manager: # target_deck = DBSession.query(Deck).filter(Deck.id == deckid).first() # deck = card2dict(target_deck.cards) # deck['title'] = str(target_deck.title) # deck['color'] = str(target_deck.color) # deck['id'] = deck_id # # response = update_header(body=json.dumps(deck)) # return response # @view_config(route_name='add_user', renderer = 'json') # def add_user(request): # req_post = request.POST # user_name = str(req_post['user_name']) # with transaction.manager: # model = User(user_name=user_name) # DBSession.add(model) # return {'add user': 'success'}