我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 flask.has_request_context()。
def error_mail(subject, data, r, via_web=True):
    """Mail a report describing a remote reply.

    The report body is built from the reply ``r`` (its ``url``,
    ``status_code``, ``content-type`` header and ``text``) and from ``data``.
    When ``via_web`` is true and a Flask request context is active, the
    current request URL and the value of ``get_username()`` are put in front
    of the body.  The result is handed to ``send_mail``.
    """
    # NOTE(review): this literal is reproduced exactly as it appears in the
    # collapsed source; the original line breaks may have been lost -- confirm.
    body = ''' remote URL: {r.url} status code: {r.status_code} request data: {data} status code: {r.status_code} content-type: {r.headers[content-type]} reply: {r.text} '''.format(r=r, data=data)

    # The site/user header is only available while serving a request.
    if has_request_context() and via_web:
        user = get_username()
        header = 'site URL: {}\nuser: {}\n'.format(request.url, user)
        body = header + body

    send_mail(subject, body)
def make_command(command):
    """Wrap ``command`` so that its return value is emitted over socket.io.

    The wrapper calls ``command``, derives the event name by dropping the
    first three characters of the method name, and emits the packed result
    on the signal ``<uuid><SEPARATOR><event>``.
    """
    # pylint: disable=missing-docstring
    @wraps(command)
    def actualcommand(self, *args, **kwds):
        result = command(self, *args, **kwds)
        event_name = command.__name__[3:]
        # pylint: disable=protected-access
        signal = '{uuid}{sep}{event}'.format(
            uuid=self._uuid,
            sep=SEPARATOR,
            event=event_name
        )
        # Inside a request the module-level ``emit`` is used; otherwise fall
        # back to the socketio extension registered on the current app.
        if flask.has_request_context():
            send = emit
        else:
            send = flask.current_app.extensions['socketio'].emit
        send(signal, {'data': pack(result)})
        eventlet.sleep()

    return actualcommand
def make_getter(getter):
    """Build a method that requests a value over socket.io.

    The returned ``get(self, timeout=10)`` emits the signal
    ``<uuid><SEPARATOR><getter name>`` with a callback, waits up to
    ``timeout`` seconds for the unpacked reply, and returns
    ``getter(self, reply)``.  Only the docstring of ``getter`` is copied.
    """
    # pylint: disable=missing-docstring
    def get(self, timeout=10):
        # pylint: disable=protected-access
        signal = '{uuid}{sep}{event}'.format(
            uuid=self._uuid,
            sep=SEPARATOR,
            event=getter.__name__
        )
        # Single-slot queue used to hand the reply from the callback back to
        # this function.
        reply = LightQueue(1)
        if flask.has_request_context():
            send = emit
        else:
            send = flask.current_app.extensions['socketio'].emit
        send(signal, callback=lambda x: reply.put(unpack(x)))
        return getter(self, reply.get(timeout=timeout))

    # don't want to copy the signature in this case
    get.__doc__ = getter.__doc__
    return get
def save(key, value):
    """Send a key/value pair with the ``cache_save`` event.

    Parameters
    ----------
    key : str
        The key identifying the value; it is needed to load the value later.
    value : object
        The value to store in the cache.

    Returns
    -------
    None

    """
    # Use the request-bound ``emit`` when serving a request, otherwise the
    # socketio extension registered on the current app.
    if flask.has_request_context():
        send = emit
    else:
        send = flask.current_app.extensions['socketio'].emit
    send('cache_save', {'key': pack(key), 'data': pack(value)})
    eventlet.sleep()
def load(key):
    """Load the value stored with the key.

    Emits the ``cache_load`` event with the packed key and waits up to ten
    seconds for the callback to deliver the msgpack-encoded reply.

    Parameters
    ----------
    key : str
        The key to lookup the value stored.

    Returns
    -------
    object
        The decoded reply.
        NOTE(review): the previous docstring promised ``None`` for a missing
        key; that depends on what the remote side sends back -- confirm.

    """
    signal = 'cache_load'
    event = LightQueue(1)
    if flask.has_request_context():
        emit(signal, {'data': pack(key)}, callback=event.put)
    else:
        sio = flask.current_app.extensions['socketio']
        sio.emit(signal, {'data': pack(key)}, callback=event.put)
    # ``encoding='utf8'`` was deprecated and then removed in msgpack 1.0;
    # ``raw=False`` is the supported way to decode msgpack strings to ``str``.
    return msgpack.unpackb(bytes(event.get(timeout=10)), raw=False)
def _message(status, content):
    """Emit a ``message.<status>`` event carrying the packed content.

    Parameters
    ----------
    status : str
        The type of message.
    content : str
        The message body; it is packed before being sent.

    """
    event = 'message.{}'.format(status)
    # Use the request-bound ``emit`` when serving a request, otherwise the
    # socketio extension registered on the current app.
    if flask.has_request_context():
        send = emit
    else:
        send = flask.current_app.extensions['socketio'].emit
    send(event, {'data': pack(content)})
    eventlet.sleep()
def get_user_session():
    """Return the user session for the current request.

    On first use within a request the session is looked up from the header,
    then from the cookie, discarded if its user is not enabled, and stored on
    the request context.  An ``AnonymousSession`` is returned whenever no
    session is available.
    """
    not_resolved_yet = (
        has_request_context()
        and not hasattr(_request_ctx_stack.top, 'user_session')
    )
    if not_resolved_yet:
        # Look in the header, falling back to the cookie
        found = get_user_session_from_header()
        if found is None:
            found = get_user_session_from_cookie()

        # A session belonging to a disabled user is treated as absent.
        if found is not None and not found.user.is_enabled:
            found = None

        # Remember the outcome (even None) for the rest of the request.
        _request_ctx_stack.top.user_session = found

    current = getattr(_request_ctx_stack.top, 'user_session', None)
    return AnonymousSession() if current is None else current
def get_request_information():
    """
    Collect contextual information about the request and user for logging.

    Returns:
        A dictionary. It is empty outside a request context; otherwise it has
        a "request" entry and, when a user is logged in, a "user" entry.
    """

    if not has_request_context():
        return {}

    agent = request.user_agent
    information = {
        "request": {
            "api_endpoint_method": request.method,
            "api_endpoint": request.path,
            "ip": request.remote_addr,
            "platform": agent.platform,
            "browser": agent.browser,
            "browser_version": agent.version,
            "user_agent": agent.string
        }
    }

    # NOTE(review): the login check is taken to be nested inside the
    # request-context branch; the collapsed source does not show indentation.
    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        groups = api.team.get_groups()

        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "groups": [group["name"] for group in groups]
        }

    return information
def test_context_test(self):
    """``has_request_context`` and ``request`` reflect a pushed test context."""
    application = flask.Flask(__name__)

    # Nothing has been pushed yet, so both must be falsy.
    self.assert_false(flask.request)
    self.assert_false(flask.has_request_context())

    request_ctx = application.test_request_context()
    request_ctx.push()
    try:
        # With the test request context active both must be truthy.
        self.assert_true(flask.request)
        self.assert_true(flask.has_request_context())
    finally:
        request_ctx.pop()
def get_request_information():
    """
    Collect contextual information about the request and user for logging.

    Returns:
        A dictionary. It is empty outside a request context; otherwise it has
        a "request" entry and, when a user is logged in, a "user" entry that
        also carries the team's school.
    """

    if not has_request_context():
        return {}

    agent = request.user_agent
    information = {
        "request": {
            "api_endpoint_method": request.method,
            "api_endpoint": request.path,
            "ip": request.remote_addr,
            "platform": agent.platform,
            "browser": agent.browser,
            "browser_version": agent.version,
            "user_agent": agent.string
        }
    }

    # NOTE(review): the login check is taken to be nested inside the
    # request-context branch; the collapsed source does not show indentation.
    if api.auth.is_logged_in():
        user = api.user.get_user()
        team = api.user.get_team()
        groups = api.team.get_groups()

        information["user"] = {
            "username": user["username"],
            "email": user["email"],
            "team_name": team["team_name"],
            "school": team["school"],
            "groups": [group["name"] for group in groups]
        }

    return information
def message(self):
    """Return the error message in the form the caller expects.

    Legacy errors raised while serving API v1 return the raw ``details``
    dict.  Otherwise an explicit ``error_message`` wins, then a message built
    from ``details``, and finally a generic ``'Invalid data'``.
    """
    serving_v1 = (
        self.legacy
        and has_request_context()
        and g.get('api_version')
        and g.api_version == API_VERSIONS.v1
    )
    if serving_v1:
        # legacy for v1: return dict in message
        return self.details

    # raise ValidateError('error message')
    if getattr(self, 'error_message', None) is not None:
        return self.error_message

    # raise ValidateError()
    if not self.details:
        return u'Invalid data'

    # raise ValidateError(details={'smth': 'wrong'})
    encoded = json.dumps(self.details, ensure_ascii=False)
    return u'Invalid data: {0}'.format(encoded)
def execute(self, response):
    """Generates fixture objects from the given response and stores them in
    the application-specific cache.

    A response fixture is always created; a request fixture is added only
    when the current request carries a body.  A ``TypeError`` raised while
    building the fixtures is reported as a warning instead of propagating.

    :param response: the recorded :class:`Response`
    :returns: ``response`` unchanged, or the result of
        ``self._fallback_fixture_names()`` when no request context is active
    """
    # The original tested the function object (``not has_request_context``),
    # which is always truthy, so this guard could never trigger.
    if not has_request_context():
        return self._fallback_fixture_names()

    try:
        app = self.auto_fixture.app

        # Create response fixture
        fixture = Fixture.from_response(response, app, self.response_name)
        self.auto_fixture.add_fixture(fixture)

        # Create request fixture
        if request.data:
            fixture = Fixture.from_request(request, app, self.request_name)
            self.auto_fixture.add_fixture(fixture)
    except TypeError:  # pragma: no cover
        warnings.warn("Could not create fixture for unsupported mime type")

    return response
def login_user(cls, email, password):
    """Return the user matching ``email`` and ``password``, else ``None``.

    When called while serving a request, the remote address is recorded as
    the user's last login IP and committed.
    """
    try:
        candidate = cls.query.filter_by(email=email).one()
    except exc.InvalidRequestError:
        return None

    # Re-hash the supplied password using the stored hash as the salt source
    # and compare; a mismatch means wrong credentials.
    if pbkdf2.crypt(password, candidate.pwhash) != candidate.pwhash:
        return None

    if flask.has_request_context():
        candidate.last_login_ip = flask.request.remote_addr
        # NOTE(review): the commit is taken to belong to this branch; the
        # collapsed source does not show indentation -- confirm.
        db.session.commit()
    return candidate
def create(cls, email, nick, password, team=None):
    """Create a user, add it to the session and return it.

    The first user ever created is promoted instead of being assigned to
    ``team``.  When called while serving a request, the remote address is
    stored as the creation IP.  The session is not committed here.
    """
    # Must be evaluated before the new user is added to the session.
    is_first_user = not cls.query.count()

    account = cls()
    db.session.add(account)
    account.email = email
    account.nick = nick
    account.set_password(password)

    if is_first_user:
        account.promote()
    else:
        account.team = team

    if flask.has_request_context():
        account.create_ip = flask.request.remote_addr
    return account
def _get_auth():
    """Return the ``api_auth`` value stored on the request context.

    Inside a request without a stored value, ``default_auth_data`` is
    returned; outside a request the lookup falls back to ``None``.
    """
    # Only the fallback differs between the two situations.
    if has_request_context():
        fallback = default_auth_data
    else:
        fallback = None
    return getattr(_request_ctx_stack.top, 'api_auth', fallback)
def _get_params():
    """Return ``api_params`` from the request context, or ``None``.

    ``None`` is returned both outside a request and when the attribute has
    not been set on the current request context.
    """
    if not has_request_context():
        return None
    return getattr(_request_ctx_stack.top, 'api_params', None)
def get_locale():
    """Pick the language for the current user.

    Candidates are tried in this order: the ``?locale=`` query argument, the
    locale reported by ``weblab_user``, the locale previously stored in
    ``weblab_user.data``, the browser's accepted languages, and finally
    ``'en'``.  The decision is stored back when the user is active.
    """
    # 'en' is supported by default
    supported = ['en'] + [
        translation.language for translation in babel.list_translations()
    ]

    chosen = None

    # This is used also from tasks (which are not in a context environment)
    if has_request_context():
        # If user accesses http://localhost:5000/?locale=es force it to
        # Spanish, for example
        requested = request.args.get('locale', None)
        chosen = requested if requested in supported else None

    # If not explicitly stated (?locale=something), use whatever
    # WebLab-Deusto said
    if chosen is None:
        reported = weblab_user.locale or None
        chosen = reported if reported in supported else None

    # Next, reuse a decision stored earlier for this user (not re-validated).
    if chosen is None:
        chosen = weblab_user.data.get('locale')

    # Otherwise, check what the web browser is using (the web browser might
    # state multiple languages)
    if has_request_context() and chosen is None:
        chosen = request.accept_languages.best_match(supported)

    # Otherwise... use the default one (English)
    if chosen is None:
        chosen = 'en'

    # Store the decision so next time we don't need to check everything again
    if weblab_user.active:
        weblab_user.data['locale'] = chosen

    return chosen
def paginate(self, page=None, per_page=None, error_out=True):
    """Return a :class:`Pagination` holding `per_page` items of page `page`.

    Missing `page` / `per_page` values are read from the request query
    string when a request is active, and default to 1 and 20 otherwise (or
    when absent from the query).  With ``error_out`` true the call aborts
    with 404 when a query value is not an int, when `page` is below 1, or
    when a page other than the first has no items.  With ``error_out``
    false those cases fall back to the defaults / an empty page instead.
    """
    def _from_query(name, fallback):
        # Read an int from the query string; a malformed value is either a
        # 404 or silently replaced by the fallback.
        try:
            return int(request.args.get(name, fallback))
        except (TypeError, ValueError):
            if error_out:
                abort(404)
            return fallback

    in_request = has_request_context()
    if page is None:
        page = _from_query('page', 1) if in_request else 1
    if per_page is None:
        per_page = _from_query('per_page', 20) if in_request else 20

    if error_out and page < 1:
        abort(404)

    start = (page - 1) * per_page
    items = self.limit(per_page).offset(start).all()

    if error_out and page != 1 and not items:
        abort(404)

    # No need to count if we're on the first page and there are fewer
    # items than we expected.
    first_page_is_short = page == 1 and len(items) < per_page
    if first_page_is_short:
        total = len(items)
    else:
        total = self.order_by(None).count()

    return Pagination(self, page, per_page, total, items)