我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask._request_ctx_stack.top（注意：top 是属性而不是方法，访问时不需要加括号）。
def get_connection(self, bind=None):
    """Return a py2neo.Graph object based on config settings.

    Connections are created lazily through ``self.connect`` and cached in an
    ``ogm_graphs`` dict stored on the object at the top of the context stack,
    so ``self.connect`` is called at most once per bind for that context.

    :param bind: key of the graph to return; ``None`` selects
        ``self.DEFAULT_GRAPH_KEY``.
    :returns: the value ``self.connect(bind=bind)`` produced for this bind.
    :raises OutOfApplicationContextError: if the context stack is empty.
    """
    # assume default if no bind specified
    if bind is None:
        bind = self.DEFAULT_GRAPH_KEY

    # connections are bound to the application context
    ctx = stack.top
    if ctx is None:
        raise OutOfApplicationContextError()

    # The cache lives under one fixed attribute name; the bind is only the
    # key inside the dict, never part of the attribute name.
    if not hasattr(ctx, 'ogm_graphs'):
        ctx.ogm_graphs = {}
    if bind not in ctx.ogm_graphs:
        ctx.ogm_graphs[bind] = self.connect(bind=bind)
    return ctx.ogm_graphs[bind]
def _get_translations():
    """Return the gettext translations for the ``wtforms`` domain.

    Adapted from flask-babel. The loaded translations are cached on the
    request context as ``wtforms_translations``.

    :returns: the translations object, or ``None`` when there is no request
        context or the app has no ``babel`` extension registered.
    """
    request_ctx = _request_ctx_stack.top
    # babel must be registered as an extension for get_locale to work
    if request_ctx is None or 'babel' not in request_ctx.app.extensions:
        return None

    cached = getattr(request_ctx, 'wtforms_translations', None)
    if cached is not None:
        return cached

    loaded = support.Translations.load(
        messages_path(), [get_locale()], domain='wtforms'
    )
    request_ctx.wtforms_translations = loaded
    return loaded
def extract_token(self):
    """Extract a Negotiate token from the current HTTP request, if present.

    The token is passed to ``_gssapi_authenticate``. A returned token is
    stored on the top of the context stack as ``kerberos_token``; a returned
    user is handed to ``self._save_user``. If no user comes back the request
    is aborted with 403. Requests without a ``Negotiate`` authorization
    header are left untouched.
    """
    auth_header = request.headers.get(b'authorization')
    if not auth_header or not auth_header.startswith(b'Negotiate '):
        return

    # Strip the 10-character "Negotiate " prefix to get the raw token
    client_token = auth_header[10:]
    user, token = _gssapi_authenticate(client_token, self._service_name)

    if token is not None:
        stack.top.kerberos_token = token

    if user is None:
        # Invalid Kerberos ticket, we could not complete authentication
        abort(403)
    else:
        self._save_user(user)
def _gssapi_authenticate(token):
    """Run one GSSAPI server step for ``token`` against ``_SERVICE_NAME``.

    On a completed exchange the server response and the user name are stored
    on the top of the context stack as ``kerberos_token`` and
    ``kerberos_user``.

    :param token: client token passed to ``kerberos.authGSSServerStep``.
    :returns: ``kerberos.AUTH_GSS_COMPLETE`` or ``kerberos.AUTH_GSS_CONTINUE``
        according to the step result; ``None`` if initialisation did not
        complete, the step returned anything else, or ``kerberos.GSSError``
        was raised.
    """
    gss_state = None
    ctx = stack.top
    try:
        init_rc, gss_state = kerberos.authGSSServerInit(_SERVICE_NAME)
        if init_rc != kerberos.AUTH_GSS_COMPLETE:
            return None

        step_rc = kerberos.authGSSServerStep(gss_state, token)
        if step_rc == kerberos.AUTH_GSS_COMPLETE:
            ctx.kerberos_token = kerberos.authGSSServerResponse(gss_state)
            ctx.kerberos_user = kerberos.authGSSServerUserName(gss_state)
            return step_rc
        if step_rc == kerberos.AUTH_GSS_CONTINUE:
            return kerberos.AUTH_GSS_CONTINUE
        return None
    except kerberos.GSSError:
        return None
    finally:
        # Release the server state on every exit path once it was created
        if gss_state:
            kerberos.authGSSServerClean(gss_state)
def requires_authentication(function):
    """Decorate a view so it only runs after ``_gssapi_authenticate`` completes.

    The wrapped view is called only when authentication returns
    ``kerberos.AUTH_GSS_COMPLETE``; ``g.user`` is set beforehand and, if a
    ``kerberos_token`` is present on the context, it is sent back in a
    ``WWW-Authenticate`` header. Any result other than complete or continue
    yields ``_forbidden()``; a missing header or a continue result yields
    ``_unauthorized()``.

    :param function: the view callable to protect.
    :returns: the wrapping callable.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization")
        if not auth_header:
            return _unauthorized()

        ctx = stack.top
        # Drop the leading scheme word and glue the remaining parts together
        client_token = ''.join(auth_header.split()[1:])
        status = _gssapi_authenticate(client_token)

        if status == kerberos.AUTH_GSS_COMPLETE:
            g.user = ctx.kerberos_user
            response = make_response(function(*args, **kwargs))
            server_token = ctx.kerberos_token
            if server_token is not None:
                response.headers['WWW-Authenticate'] = ' '.join(
                    ['negotiate', server_token]
                )
            return response

        if status != kerberos.AUTH_GSS_CONTINUE:
            return _forbidden()
        return _unauthorized()

    return decorated
def get_user_session():
    """Return the user session for the current request.

    On first use inside a request context the session is resolved from the
    header, then from the cookie, and stored on the request context as
    ``user_session``. A session whose user is not enabled is discarded.

    :returns: the stored session, or a new ``AnonymousSession()`` when none
        is available.
    """
    if has_request_context() and not hasattr(_request_ctx_stack.top, 'user_session'):
        # Look in the header first, then fall back to the cookie
        resolved = get_user_session_from_header()
        if resolved is None:
            resolved = get_user_session_from_cookie()

        # Sessions belonging to a disabled user are dropped
        if resolved is not None and not resolved.user.is_enabled:
            resolved = None

        # Remember the outcome (possibly None) for the rest of the request
        _request_ctx_stack.top.user_session = resolved

    session = getattr(_request_ctx_stack.top, 'user_session', None)
    return AnonymousSession() if session is None else session
def __exit__(self, exc_type, exc_value, tb):
    """Turn off context preservation and pop a preserved request context.

    The exception arguments are accepted for the context-manager protocol
    and are not used.
    """
    self.preserve_context = False

    # On exit we want to clean up earlier. Normally the request context
    # stays preserved until the next request in the same thread comes
    # in. See RequestGlobals.push() for the general behavior.
    current = _request_ctx_stack.top
    if current is None:
        return
    if current.preserved:
        current.pop()
def redis_connection(self):
    """Return the connection cached on the current context as ``tus_redis``.

    The connection is created with ``self.redis_connect()`` the first time
    it is requested for a given context.

    :returns: the cached connection, or ``None`` when the context stack is
        empty.
    """
    active_ctx = stack.top
    if active_ctx is None:
        return None

    if not hasattr(active_ctx, 'tus_redis'):
        active_ctx.tus_redis = self.redis_connect()
    return active_ctx.tus_redis
def query_current_user():
    """Return ``current_identity`` from the top of the request context stack.

    :returns: the stored identity, or ``None`` when the attribute is missing
        (including when the stack is empty).
    """
    request_ctx = _request_ctx_stack.top
    return getattr(request_ctx, 'current_identity', None)
def check_jwt_authorization():
    """Resolve and cache the verified identity for the current request.

    A truthy ``current_identity`` already stored on the top of the request
    context stack is returned as is. Otherwise the function decides whether
    the endpoint may be served without authorization; if it may and the
    request carries no ``Authorization`` header, it returns ``None`` without
    verifying anything. In every other case the token is fetched with
    ``get_auth_token_and_type()``, checked with ``verify_token()``, stored on
    the request context as ``current_identity`` and returned.
    """
    # Reuse the identity verified earlier in this request, if any
    current_identity = getattr(_request_ctx_stack.top, 'current_identity', None)
    if current_identity:
        return current_identity

    skip_check = False

    # Application-wide switch
    if current_app.config.get("disable_jwt", False):
        skip_check = True

    if request.endpoint in current_app.view_functions:
        fn = current_app.view_functions[request.endpoint]

        # Check Flask-RESTful endpoints for openness: the view class may list
        # exempt HTTP methods in `no_jwt_check`
        if hasattr(fn, "view_class"):
            exempt = getattr(fn.view_class, "no_jwt_check", [])
            if request.method in exempt:
                skip_check = True
        elif fn in _open_endpoints:
            skip_check = True

    # the static folder is open to all without authentication
    if request.endpoint == "static" or request.url.endswith("favicon.ico"):
        skip_check = True

    # In case the endpoint requires no authorization, and the request does not
    # carry any authorization info as well, we will not try to verify any JWT's
    if skip_check and 'Authorization' not in request.headers:
        return

    token, auth_type = get_auth_token_and_type()
    current_identity = verify_token(token, auth_type)
    if auth_type == "JWT":
        # Cache this token
        cache_token(current_identity)

    # Authorization token has now been converted to a verified payload
    _request_ctx_stack.top.current_identity = current_identity
    return current_identity
def use_forwarded_port(graph):
    """
    Inject the `X-Forwarded-Port` (if any) into the current URL adapter.

    The URL adapter is used by `url_for` to build URLs.

    :param graph: object whose `config.port_forwarding` mapping may contain
        a `"host"` override.
    :returns: the server name written to the URL adapter, or `None` when
        there is no request context or neither a forwarded host nor a
        forwarded port is available.
    """
    # There must be a better way!
    context = _request_ctx_stack.top
    # The stack object itself always exists; outside a request it is its
    # top that is None, so that is what has to be checked.
    if context is None:
        return None

    # determine the configured overrides
    forwarded_host = graph.config.port_forwarding.get("host")
    forwarded_port = request.headers.get("X-Forwarded-Port")

    if not forwarded_port and not forwarded_host:
        return None

    # determine the current server name
    if ":" in context.url_adapter.server_name:
        server_host, server_port = context.url_adapter.server_name.split(":", 1)
    else:
        server_host = context.url_adapter.server_name
        server_port = 443 if context.url_adapter.url_scheme == "https" else 80

    # choose a new server name
    if forwarded_host:
        server_name = forwarded_host
    elif forwarded_port:
        # Guard on the value actually being formatted into the name
        server_name = "{}:{}".format(server_host, forwarded_port)
    else:
        # Defensive fallback: keep the adapter's own port
        server_name = "{}:{}".format(server_host, server_port)

    context.url_adapter.server_name = server_name
    return server_name
def teardown_request(self, exception):
    """Close the ``mysql_db`` connection stored on the current context.

    Does nothing when the top of ``_ctx_stack`` has no ``mysql_db``
    attribute. ``exception`` is accepted but not used.
    """
    active_ctx = _ctx_stack.top
    if not hasattr(active_ctx, "mysql_db"):
        return
    active_ctx.mysql_db.close()
def get_db(self):
    """Return the connection cached on the current context as ``mysql_db``.

    The connection is created with ``self.connect()`` on first access for a
    given context.

    :returns: the cached connection, or ``None`` when ``_ctx_stack`` is
        empty.
    """
    active_ctx = _ctx_stack.top
    if active_ctx is None:
        return None

    if not hasattr(active_ctx, "mysql_db"):
        active_ctx.mysql_db = self.connect()
    return active_ctx.mysql_db
def _generate_request_arguments(url, spec, endpoint, headers, args, kwargs):
    """Prepare the (g)requests arguments for calling ``endpoint``.

    ``headers`` and ``kwargs`` are modified in place: call-tracking headers
    are added when the current context carries ``call_id`` / ``call_path``,
    and query parameters whose value is ``None`` are deleted.

    :param url: URL of the endpoint, possibly containing path placeholders.
    :param spec: object providing ``model_to_json`` for body parameters.
    :param endpoint: object exposing ``param_in_path``, ``param_in_query``,
        ``param_in_body`` and ``handler_client``.
    :param headers: mutable mapping of request headers.
    :param args: positional call arguments (the body model, if any).
    :param kwargs: keyword call arguments (path and query parameters).
    :returns: tuple ``(custom_url, params, data, headers)``.
    :raises ValidationError: if a body endpoint does not receive exactly one
        positional argument.
    """
    # Propagate call-tracking values from the current context, if present
    top_ctx = stack.top
    if hasattr(top_ctx, 'call_id'):
        headers['KlueCallID'] = top_ctx.call_id
    if hasattr(top_ctx, 'call_path'):
        headers['KlueCallPath'] = top_ctx.call_path

    # Fill url with values from kwargs, and remove those params from kwargs
    custom_url = _format_flask_url(url, kwargs) if endpoint.param_in_path else url

    params = None
    data = None
    if endpoint.param_in_query:
        # The query parameters are contained in **kwargs
        # TODO: validate params? or let the server do that...
        params = kwargs
    elif endpoint.param_in_body:
        # The body parameter is the first elem in *args
        if len(args) != 1:
            raise ValidationError("%s expects exactly 1 parameter" % endpoint.handler_client)
        data = json.dumps(spec.model_to_json(args[0]))

    # Prune undefined parameters that would otherwise be turned into '=None'
    # query params
    if params:
        undefined = [name for name, value in params.items() if value is None]
        for name in undefined:
            del params[name]

    return custom_url, params, data, headers
def append_header(self, response):
    """Add a ``WWW-Authenticate`` header with a SPNEGO challenge or token.

    A 401 response gets an additional ``Negotiate`` challenge. Any other
    response gets ``Negotiate <token>`` when the current context holds a
    truthy ``kerberos_token``.

    :param response: the outgoing response object.
    :returns: the same ``response`` object.
    """
    kerberos_token = getattr(stack.top, 'kerberos_token', None)

    if response.status_code == 401:
        # Negotiate is an additional authenticate method.
        response.headers.add('WWW-Authenticate', 'Negotiate')
        return response

    if kerberos_token:
        response.headers['WWW-Authenticate'] = 'Negotiate {}'.format(kerberos_token)
    return response
def before_request(self):
    """Open a connection and store it on the request context as ``mysql_db``."""
    request_ctx = _request_ctx_stack.top
    connection = self.connect()
    request_ctx.mysql_db = connection
def teardown_request(self, exception):
    """Close the ``mysql_db`` connection stored on the request context.

    Does nothing when the top of ``_request_ctx_stack`` has no ``mysql_db``
    attribute. ``exception`` is accepted but not used.
    """
    request_ctx = _request_ctx_stack.top
    if not hasattr(request_ctx, "mysql_db"):
        return
    request_ctx.mysql_db.close()
def get_db(self):
    """Return the ``mysql_db`` connection stored on the request context.

    :returns: the stored connection, or ``None`` when ``_request_ctx_stack``
        is empty.
    """
    request_ctx = _request_ctx_stack.top
    if request_ctx is None:
        return None
    return request_ctx.mysql_db