我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用flask.request.blueprint()。
def navigate(self, text, description=None): """A decorator to add a navigation menu entry for the actual view func. This is a decorator like the "route()" from `flask.Flask` or `flask.Blueprint`. It register a navigation menu entry for the current view function. The text argument is used as menu-text and the description sets an optional anchor-title. :param text: The menu entry text. :param description: a anchor title. """ def decorator(f): element = self._navigation_item( bake_endpoint(self.blueprint, f), text, description) self._elements.append(element) return f return decorator
def register_on(self, app): """This registers the `BlueprintNavigation` for the Flask app. This uses a `list` in the flask config object to register the navigation. The key for the `list` is "blueprint_navigation". The template iterates over this list and renders the navigation elements. The order the elements are registered is the order the elements will be shown. :param app: A `flask.Flask` application instance. """ if self.blueprint.name not in app.blueprints: raise Exception("Blueprint %s is not registred in Flask app" % self.blueprint.name) else: assert app.blueprints[self.blueprint.name] is self.blueprint, \ "Blueprint resistred as %s in Flask app is not the one you " \ "register navigation for!" if "blueprint_navigation" not in app.config: app.config["blueprint_navigation"] = list() app.config["blueprint_navigation"].append(self)
def __init__(self, rate_limiter): super(LogApiCallCount, self).__init__() if self.stats: tags = ['version:'+str(Config.API_VERSION_MINOR), 'application:rest-api', 'api-key:'+rate_limiter._auth_key.id, 'method:'+request.environ['REQUEST_METHOD'], 'endpoint:'+request.url_rule.rule.split('<')[0].split(request.blueprint)[1]] ip = get_remote_addr() self.stats.track(get_remote_addr(), 'api_call_count', properties=dict(tags=tags, ip=ip, ignore_time = True, )) # self.stats.increment('api.call.count', # tags=tags,)
def hashed_url_for_static_file(endpoint, values): if 'static' == endpoint or endpoint.endswith('.static'): filename = values.get('filename') if filename: if '.' in endpoint: # has higher priority blueprint = endpoint.rsplit('.', 1)[0] else: blueprint = request.blueprint # can be None too if blueprint: static_folder = app.blueprints[blueprint].static_folder else: static_folder = app.static_folder param_name = 'h' while param_name in values: param_name = '_' + param_name values[param_name] = static_file_hash(os.path.join(static_folder, filename)) ########################################################################### # Routes ################################################################## ###########################################################################
def exempt(self, view): """Mark a view or blueprint to be excluded from CSRF protection. :: @app.route('/some-view', methods=['POST']) @csrf.exempt def some_view(): ... :: bp = Blueprint(...) csrf.exempt(bp) """ if isinstance(view, Blueprint): self._exempt_blueprints.add(view.name) return view if isinstance(view, string_types): view_location = view else: view_location = '%s.%s' % (view.__module__, view.__name__) self._exempt_views.add(view_location) return view
def init_app(self, app): """ Initializes extension with app :param app: Flask application instance :type app: flask.Flask """ if app.debug: # don't mess with debug return @app.url_defaults def hashed_url_for_static_file(endpoint, values): if 'static' == endpoint or endpoint.endswith('.static'): filename = values.get('filename') if filename: # has higher priority blueprint = endpoint.rsplit('.', 1)[0] \ if '.' in endpoint else request.blueprint # file from blueprint or project? static_folder = app.blueprints[blueprint].static_folder \ if blueprint else app.static_folder # avoids querystring key collision param_name = 'h' while param_name in values: param_name = '_' + param_name filepath = safe_join(static_folder, filename) values[param_name] = self.get_file_hash(filepath)
def before_request(): g.log = dict() g.log['out'] = list() g.log['request'] = log_request(request) g.endpoint = request.endpoint if request.blueprint: g.endpoint = g.endpoint[len(request.blueprint):].lstrip('.') reseller_info = get_reseller_info() g.reseller_name = reseller_info.name g.company_name = 'N/A' if not reseller_info.name: allow_public_endpoints_only() return if not check_oauth_signature(request): abort(401) g.auth = reseller_info.auth g.reseller = Reseller(reseller_info.name, reseller_info.id, None) g.reseller.refresh() if not g.reseller.token and not reseller_info.is_new: abort(403)
def before_request(): if current_user.is_authenticated: current_user.ping() if not current_user.confirmed \ and request.endpoint \ and request.blueprint != 'auth' \ and request.endpoint != 'static': return redirect(url_for('auth.unconfirmed'))
def _is_api(request): ''' Checks if the error comes from the api ''' return request.blueprint == 'api' or 'api' in request.url
def security_processer(): if not request.blueprint == 'security': return if request.endpoint == 'security.login': form = CanellaLoginForm() if form.validate_on_submit(): session['lang'] = form.lang.data elif request.endpoint == 'security.logout' and 'lang' in session: session.pop('lang')
def schema(path=None): """Validate the request payload with a JSONSchema. Decorator func that will be used to specify the path to the schema that the route/endpoint will be validated against. :param path: path to the schema file :type path: string :returns: list of errors if there are any :raises: InvalidAPIRequest if there are any errors ex: @schema('/path/to/schema.json') @app.route('/app-route') def app_route(): ... """ def decorator(func): @wraps(func) def wrapped(*args, **kwargs): _path = path.lstrip('/') schema_path = os.path.join(SCHEMAS_PATH, request.blueprint, _path) payload = get_request_payload(request.method)(request) errors = validate_schema(payload, get_schema(schema_path)) if errors: raise InvalidAPIRequest(message=errors) return func(*args, **kwargs) return wrapped return decorator
def __init__(self, blueprint, text, description=None, blueprint_access=None): """Init the `BlueprintNavigation` instance. :param blueprint: A `flask.Blueprint` instance. :param text: The text for the top bar navigation. :param description: An optional anchor title. """ self.blueprint = blueprint self.text = text self.description = description self._elements = [] if blueprint_access is None: blueprint_access = BlueprintAccess(blueprint) self._access = blueprint_access
def is_allowed(self): """Checks if the user has general access to the blueprint. This uses the `BlueprintAccess.has_general_access` to find out if the user has access to this blueprint (and its navigation). If there is no `BlueprintNavigation` given we assume everything is granted. """ return self._access.has_general_access
def is_active(self): """Tells if the blueprint of this instance is active. Active is a blueprint if a view func of it is shown. :return: True if active, False else. """ return self.blueprint.name == request.blueprint
def _show_toolbar(self): """Return a boolean to indicate if we need to show the toolbar.""" if request.blueprint == 'debugtoolbar': return False hosts = current_app.config['DEBUG_TB_HOSTS'] if hosts and request.remote_addr not in hosts: return False return True
def init_app(self, app): app.extensions['csrf'] = self app.config.setdefault('WTF_CSRF_ENABLED', True) app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True) app.config['WTF_CSRF_METHODS'] = set(app.config.get( 'WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH', 'DELETE'] )) app.config.setdefault('WTF_CSRF_FIELD_NAME', 'csrf_token') app.config.setdefault( 'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token'] ) app.config.setdefault('WTF_CSRF_TIME_LIMIT', 3600) app.config.setdefault('WTF_CSRF_SSL_STRICT', True) app.jinja_env.globals['csrf_token'] = generate_csrf app.context_processor(lambda: {'csrf_token': generate_csrf}) @app.before_request def csrf_protect(): if not app.config['WTF_CSRF_ENABLED']: return if not app.config['WTF_CSRF_CHECK_DEFAULT']: return if request.method not in app.config['WTF_CSRF_METHODS']: return if not request.endpoint: return view = app.view_functions.get(request.endpoint) if not view: return if request.blueprint in self._exempt_blueprints: return dest = '%s.%s' % (view.__module__, view.__name__) if dest in self._exempt_views: return self.protect()
def init_app(self, app): self._app = app app.jinja_env.globals['csrf_token'] = generate_csrf app.config.setdefault( 'WTF_CSRF_HEADERS', ['X-CSRFToken', 'X-CSRF-Token'] ) app.config.setdefault('WTF_CSRF_SSL_STRICT', True) app.config.setdefault('WTF_CSRF_ENABLED', True) app.config.setdefault('WTF_CSRF_CHECK_DEFAULT', True) app.config.setdefault('WTF_CSRF_METHODS', ['POST', 'PUT', 'PATCH']) # expose csrf_token as a helper in all templates @app.context_processor def csrf_token(): return dict(csrf_token=generate_csrf) if not app.config['WTF_CSRF_ENABLED']: return if not app.config['WTF_CSRF_CHECK_DEFAULT']: return @app.before_request def _csrf_protect(): # many things come from django.middleware.csrf if request.method not in app.config['WTF_CSRF_METHODS']: return if self._exempt_views or self._exempt_blueprints: if not request.endpoint: return view = app.view_functions.get(request.endpoint) if not view: return dest = '%s.%s' % (view.__module__, view.__name__) if dest in self._exempt_views: return if request.blueprint in self._exempt_blueprints: return self.protect()
def create_app(configfile=None): # do the imports here to not shadow e.g. "import bayesian.frontend.api_v1" # by Blueprint imported here from .api_v1 import api_v1 from .exceptions import HTTPError from .utils import JSONEncoderWithExtraTypes app = Flask(__name__) AppConfig(app, configfile) cache.init_app(app) # actually init the DB with config values now rdb.init_app(app) app.rdb = rdb # We need JSON encoder that can serialize datetime.datetime app.json_encoder = JSONEncoderWithExtraTypes app.register_blueprint(api_v1) # Redirect to latest API version if /api is accessed app.route('/api')(lambda: redirect(url_for('api_v1.apiendpoints__slashless'))) # Likewise for base URL, and make that accessible by name @app.route('/') def base_url(): return redirect(url_for('api_v1.apiendpoints__slashless')) @app.errorhandler(HTTPError) def handleerrors(e): bp = app.blueprints.get(request.blueprint) # if there's an error pre-request (e.g. during authentication) in non-GET requests, # request.blueprint is not set yet if not bp: # sort by the length of url_prefix, filter out blueprints without prefix bps = reversed(sorted( [(name, b) for name, b in app.blueprints.items() if b.url_prefix is not None], key=lambda tpl: len(tpl[1].url_prefix))) for bp_name, b in bps: if request.environ['PATH_INFO'].startswith(b.url_prefix): bp = b break if bp: handler = getattr(bp, 'coreapi_http_error_handler', None) if handler: return handler(e) return Response(e.error, status=e.status_code) setup_logging(app) @app.before_request def set_current_user(): g.current_user = None return app