我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 flask.request.user。
def usergetter(self, f):
    """Store ``f`` as the callback that resolves a user from credentials.

    This hook is only needed for the **password credential** grant::

        @oauth.usergetter
        def get_user(username, password, client, request,
                     *args, **kwargs):
            # client: current request client
            if not client.has_password_credential_permission:
                return None
            user = User.get_user_by_username(username)
            if not user.validate_password(password):
                return None

            # parameter `request` is an OAuthlib Request object.
            # maybe you will need it somewhere
            return user

    :param f: the callable to register.
    :return: ``f`` itself, so the decorated name remains usable.
    """
    self._usergetter = f
    return f
def tokengetter(self, f):
    """Store ``f`` as the callback that looks up a bearer token.

    The callback receives either an ``access_token`` or a
    ``refresh_token`` keyword and returns a token object exposing at
    least:

        - access_token: A string token
        - refresh_token: A string token
        - client_id: ID of the client
        - scopes: A list of scopes
        - expires: A `datetime.datetime` object
        - user: The user object

    Example::

        @oauth.tokengetter
        def bearer_token(access_token=None, refresh_token=None):
            if access_token:
                return get_token(access_token=access_token)
            if refresh_token:
                return get_token(refresh_token=refresh_token)
            return None

    :param f: the callable to register.
    :return: ``f`` itself, so the decorated name remains usable.
    """
    self._tokengetter = f
    return f
def tokensetter(self, f):
    """Store ``f`` as the callback that persists a bearer token.

    The callback takes at least a token and a request::

        @oauth.tokensetter
        def set_token(token, request, *args, **kwargs):
            save_token(token, request.client, request.user)

    ``token`` is a dict shaped like::

        {
            u'access_token': u'6JwgO77PApxsFCU8Quz0pnL9s23016',
            u'token_type': u'Bearer',
            u'expires_in': 3600,
            u'scope': u'email address'
        }

    ``request`` is an object carrying a user object and a client object.

    :param f: the callable to register.
    :return: ``f`` itself, so the decorated name remains usable.
    """
    self._tokensetter = f
    return f
def validate_code(self, client_id, code, client, request, *args, **kwargs):
    """Check that an authorization grant code exists and has not expired.

    On success the grant's user and scopes, plus the ``state`` keyword
    argument (if any), are copied onto ``request``.

    :return: ``True`` when the grant is usable, ``False`` otherwise.
    """
    # Fall back to the registered client getter when no client was passed.
    if not client:
        client = self._clientgetter(client_id)
    log.debug(
        'Validate code for client %r and code %r', client.client_id, code
    )

    grant = self._grantgetter(client_id=client.client_id, code=code)
    if not grant:
        log.debug('Grant not found.')
        return False

    # Grants without an ``expires`` attribute are treated as never expiring.
    if hasattr(grant, 'expires'):
        if datetime.datetime.utcnow() > grant.expires:
            log.debug('Grant is expired.')
            return False

    request.state = kwargs.get('state')
    request.user = grant.user
    request.scopes = grant.scopes
    return True
def validate_refresh_token(self, refresh_token, client, request,
                           *args, **kwargs):
    """Check that a refresh token exists and was issued to ``client``.

    Used indirectly by the authorization code grant and the resource
    owner password credentials grant (both issue refresh tokens), and
    directly by the refresh token grant.

    On success ``request.client_id`` and ``request.user`` are populated
    from the stored token.

    :return: ``True`` when the token matches the client, else ``False``.
    """
    stored = self._tokengetter(refresh_token=refresh_token)
    if not stored or stored.client_id != client.client_id:
        return False

    # Make sure the request object contains user and client_id
    request.client_id = stored.client_id
    request.user = stored.user
    return True
def validate_user(self, username, password, client, request,
                  *args, **kwargs):
    """Check a username/password pair through the registered user getter.

    When the getter returns a truthy user it is attached to
    ``request.user`` for later use.

    :return: ``True`` if a user was resolved; ``False`` if the getter
        returned nothing or no user getter is registered.
    """
    log.debug('Validating username %r and its password', username)

    if self._usergetter is None:
        log.debug('Password credential authorization is disabled.')
        return False

    user = self._usergetter(
        username, password, client, request, *args, **kwargs
    )
    if not user:
        return False

    request.user = user
    return True
def authenticated(f):
    """Decorate a view so ``request.user``/``request.auth`` are populated.

    The ``auth`` token is looked up in the JSON body, then in
    ``request.values``, then in the cookies. ``request.user`` and
    ``request.auth`` are always reset to ``None`` first and are only
    filled in when the token maps (via ``get_cache``) to an existing user.
    The wrapped view is called in every case.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        token = (
            (request.json or {}).get('auth')
            or request.values.get('auth')
            or request.cookies.get('auth')
        )
        request.user = None
        request.auth = None

        user_id = get_cache(token) if token is not None else None
        if user_id:
            try:
                user = User.get(pk=user_id)
            except User.DoesNotExist:
                # Stale cache entry: leave the request anonymous.
                pass
            else:
                request.user = user
                request.auth = token
        return f(*args, **kwargs)
    return wrapper
def with_game(f):
    """Decorate a view so it receives a loaded ``Game`` instead of a token.

    The wrapper answers directly (without calling the view) when the game
    has not started yet, when it cannot be found, or when the side that
    loaded the game is bound to a user other than ``request.user``.
    """
    @wraps(f)
    def wrapper(token, *args, **kwargs):
        from game import Game

        try:
            game = Game.load_game(token)
        except errors.GameNotStartedError as exc:
            payload = {
                'type': consts.TYPES[exc.type]['name'],
                'limit': exc.limit,
            }
            if exc.token:
                payload['invite'] = exc.token
            return send_data(payload)
        except errors.GameNotFoundError as exc:
            return send_error(exc.message)

        # Pick the player slot matching the side this token loaded.
        if game._loaded_by == consts.WHITE:
            owner = game.model.player_white
        else:
            owner = game.model.player_black
        if owner is not None and owner != request.user:
            return send_error('wrong user')

        return f(game, *args, **kwargs)
    return wrapper
def recover(token):
    """Handle the password-recovery endpoint for ``token``.

    GET answers with success when the token resolves to a user; POST
    sets the validated new password, saves the user and deletes the
    token from the cache. An unknown token yields an error response.
    """
    @validated(RecoverValidator)
    def _post(user, data):
        user.set_password(data['password'])
        user.save()
        delete_cache(token)
        return send_message('password changed')

    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')

    method = request.method
    if method == 'GET':
        return send_success()
    if method == 'POST':
        return _post(user)
    return send_error('token not found')
def invited(token):
    """Accept a game invitation and start the game as the black player.

    The invite entry is read from the cache under ``invite_<token>`` and
    must unpack into ``(enemy_token, game_type, game_limit)``. The
    inviting user, if one was cached under ``user_<enemy_token>``, becomes
    the white player; ``request.user`` becomes the black player.

    :return: an error response when the invite cannot be read, otherwise
        a data response with the new game token and the black-side info.
    """
    try:
        enemy_token, game_type, game_limit = get_cache(
            'invite_{}'.format(token)
        )
    except Exception:
        # A missing or malformed invite is reported as "not found".
        # ``Exception`` (not a bare ``except``) lets SystemExit and
        # KeyboardInterrupt propagate.
        return send_error('game not found')

    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as
            # white player
            pass

    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))

    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return send_data(result)
def load_game(self, token):
    """Load the game for ``token`` and store it on ``self.game``.

    :return: a dict describing the pending game when it has not started
        yet; otherwise ``None`` after ``self.game`` has been set.
    :raises errors.APIException: when the game is not found, or when the
        loaded side is bound to a user other than ``request.user``.
    """
    try:
        loaded = Game.load_game(token)
    except errors.GameNotStartedError as e:
        pending = {
            'type': consts.TYPES[e.type]['name'],
            'limit': e.limit,
        }
        if e.token:
            pending['invite'] = e.token
        return pending
    except errors.GameNotFoundError as e:
        raise errors.APIException(e.message)

    # Pick the player slot matching the side this token loaded.
    if loaded._loaded_by == consts.WHITE:
        owner = loaded.model.player_white
    else:
        owner = loaded.model.player_black
    if owner is not None and owner != request.user:
        raise errors.APIException('wrong user')

    self.game = loaded
def get(self):
    """List up to ten open game pools, newest first.

    Pools created by ``request.user`` are skipped.

    :return: ``{'games': [...]}`` where each entry carries the pool id,
        creation date (ISO format), creator username (or ``None``), game
        type name and time limit.
    """
    open_pools = (
        GamePool.select()
        .where(
            GamePool.is_started == False,  # noqa: E712 (query expression)
            GamePool.is_lost == False,  # noqa: E712 (query expression)
            # ``GamePool.player1 is not None`` would be evaluated by
            # Python to the constant ``True`` and never reach the query;
            # ``is_null(False)`` builds a real ``IS NOT NULL`` condition.
            GamePool.player1.is_null(False),
        )
        .order_by(GamePool.date_created.desc())
    )

    result = []
    for pool in open_pools:
        if pool.user1 and pool.user1 == request.user:
            continue
        result.append({
            'id': pool.pk,
            'date_created': pool.date_created.isoformat(),
            'user': pool.user1.username if pool.user1 else None,
            'type': consts.TYPES[pool.type_game]['name'],
            'limit': pool.time_limit,
        })
        # Own pools are filtered in Python, so the cap of ten entries
        # is applied here rather than with a query LIMIT.
        if len(result) > 9:
            break
    return {'games': result}
def post(self, game_id):
    """Join the waiting pool ``game_id`` as the second (black) player.

    :return: a dict with the joiner's game token merged with the
        black-side game info.
    :raises errors.APINotFound: when no pool has that id.
    :raises errors.APIException: on any other lookup failure, or when the
        current user created the pool.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        raise errors.APINotFound('game')
    except Exception:
        raise errors.APIException('wrong format')

    if pool.user1 and pool.user1 == request.user:
        raise errors.APIException('you cannot start game with yourself')

    # Mark the pool as taken before the game itself is created.
    pool.player2 = generate_token(True)
    pool.user2 = request.user
    pool.is_started = True
    pool.save()

    game = Game.new_game(
        pool.player1,
        pool.player2,
        pool.type_game,
        pool.time_limit,
        white_user=pool.user1,
        black_user=pool.user2,
    )
    delete_cache('wait_{}'.format(pool.player1))

    response = {'game': pool.player2}
    response.update(game.get_info(consts.BLACK))
    return response
def get(self, token):
    """Accept a game invitation and start the game as the black player.

    The invite entry is read from the cache under ``invite_<token>`` and
    must unpack into ``(enemy_token, game_type, game_limit)``. The
    inviting user, if one was cached under ``user_<enemy_token>``, becomes
    the white player; ``request.user`` becomes the black player.

    :return: a dict with the new game token merged with the black-side
        game info.
    :raises errors.APINotFound: when the invite cannot be read.
    """
    try:
        enemy_token, game_type, game_limit = get_cache(
            'invite_{}'.format(token)
        )
    except Exception:
        # A missing or malformed invite is reported as "not found".
        # ``Exception`` (not a bare ``except``) lets SystemExit and
        # KeyboardInterrupt propagate.
        raise errors.APINotFound('game')

    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as
            # white player
            pass

    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))

    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return result
def get(self):
    """Return one page of the current user's albums.

    Filtering and ordering are derived from ``request.data`` over the
    ``name`` and ``description`` keys; the page and page size come from
    ``self.page_info``.
    """
    query_dict = request.data
    user = request.user
    page, number = self.page_info

    keys = ['name', 'description']
    order_by = gen_order_by(query_dict, keys)
    filter_dict = gen_filter_dict(query_dict, keys, user=user)

    query = Album.query.filter_by(**filter_dict).order_by(*order_by)
    albums = query.paginate(page, number)

    serializer = AlbumSerializer(albums.items, True)
    pageinfo = PageInfo(albums)
    response = HTTPResponse(
        HTTPResponse.NORMAL_STATUS,
        data=serializer.data,
        pageinfo=pageinfo,
    )
    return response.to_response()
def post(self):
    """Create an album owned by the current user.

    ``name`` is required; ``description`` is optional. Both are popped
    from ``request.data``. A missing name yields a parameter-error
    response.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    if name is None:
        msg = '????????'
        error = HTTPResponse(HTTPResponse.HTTP_CODE_PARA_ERROR, message=msg)
        return error.to_response()

    album = Album(name=name, user=user)
    if description is not None:
        album.description = description
    album.save()

    serializer = AlbumSerializer(album)
    ok = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
    return ok.to_response()
def put(self, pk):
    """Update the name and/or description of the current user's album.

    ``name`` and ``description`` are popped from ``request.data``; a
    field left out (``None``) is not modified.

    :param pk: id of the album to update.
    :return: a not-exist response when the user has no album with that
        id, otherwise a normal response with the serialized album.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    album = Album.query.filter_by(id=pk, user=user).first()
    if not album:
        msg = '?????'
        return HTTPResponse(
            HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg).to_response()

    if name is not None:
        album.name = name
    if description is not None:
        album.description = description
    album.save()

    # The saved album must stay in the database: an update handler
    # should not call ``album.delete()``.
    serializer = AlbumSerializer(album)
    return HTTPResponse(
        HTTPResponse.NORMAL_STATUS, data=serializer.data).to_response()
def put(self, pk):
    """Update the name and/or description of the current user's image.

    Renaming also rewrites ``image.url`` to ``image.path`` joined with
    the new name. Fields left out of ``request.data`` are not modified.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    image = Image.query.filter_by(id=pk, user=user).first()
    if not image:
        msg = '?????'
        missing = HTTPResponse(HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg)
        return missing.to_response()

    if name is not None:
        image.name = name
        image.url = os.path.join(image.path, name)
    if description is not None:
        image.description = description
    image.save()

    serializer = ImageSerializer(image)
    ok = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
    return ok.to_response()
def delete(self, pk):
    """Delete the current user's image record and its files on disk.

    Both the image file and its thumbnail (same relative URL with
    ``photo`` replaced by ``thumb``) are removed from the upload root
    when they exist. The response carries the image as serialized
    before deletion.
    """
    user = request.user
    image = Image.query.filter_by(id=pk, user=user).first()
    if not image:
        msg = '?????'
        missing = HTTPResponse(HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg)
        return missing.to_response()

    serializer = ImageSerializer(image)
    upload_root = current_app.config['UPLOAD_FOLDER_ROOT']

    # Remove the original image file.
    img_path = os.path.join(upload_root, image.url)
    if os.path.exists(img_path):
        os.remove(img_path)

    # Remove the matching thumbnail.
    thumb_url = image.url.replace('photo', 'thumb')
    thumb_path = os.path.join(current_app.config['UPLOAD_FOLDER_ROOT'],
                              thumb_url)
    if os.path.exists(thumb_path):
        os.remove(thumb_path)

    image.delete()
    ok = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
    return ok.to_response()
def process_request():
    """Attach ``api_url``, ``user`` and ``realm`` to the current request.

    The API base URL is taken from the ``referer`` header (scheme and
    host only) when present, else from ``APP_URL`` when set, else from
    ``request.base_url``. ``user`` and ``realm`` are read from the Flask
    session, with ``realm`` defaulting to ``'employees'``.
    """
    referrer = request.headers.get('referer')
    if referrer:
        # we use referrer as base url
        parts = urlparse(referrer)
        base_url = urlunparse((parts.scheme, parts.netloc, '', '', '', ''))
    elif APP_URL:
        base_url = APP_URL
    else:
        base_url = request.base_url

    # Used in building full URIs
    request.api_url = urljoin(base_url, API_PREFIX + '/')
    request.user = flask_session.get('user')
    request.realm = flask_session.get('realm', 'employees')
def notifications():
    """Receive registry event notifications and log each event.

    Only the ``NOTIFICATION`` user may call this endpoint (403 otherwise).
    A body without an ``events`` key is acknowledged with 204, as is a
    fully processed body.
    """
    if request.user != 'NOTIFICATION':
        return ('', 403)

    payload = json.loads(request.data)
    if 'events' not in payload:
        return ('', 204)

    for event in payload['events']:
        logging.debug(event)
        # At this time the possible values for action seem to be
        # push, pull, delete, and mount:
        # https://github.com/docker/distribution/blob/master/notifications/event.go#L10
        action = event['action']
        target = event['target']
        repo = target['repository']
        digest = target['digest']
        # tag may not be present
        tag = target.get('tag')
        timestamp = dateutil.parser.parse(event['timestamp'])
        user = event['actor']['name']
        logging.info("action '{}' repo '{}' digest '{}' tag '{}' timestamp '{}' user '{}'".format(action, repo, digest, tag, timestamp, user))
        # Save to database, etc. here

    return ('', 204)
def requires_auth(func):
    """Decorate a view so it only runs for a user found via the session.

    The user name and password hash stored in the Flask session are
    looked up with ``User.get_by``. When no user matches, the client is
    redirected to the ``login`` endpoint; otherwise the user is attached
    to ``request.user`` and the view is called.
    """
    from .models import Session, User

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        name = session.get('user_name')
        passhash = session.get('user_passhash')
        # NOTE(review): the view is invoked while the database session
        # is still open - confirm this matches the intended scoping.
        with Session() as db_session:
            user = User.get_by(db_session, name, passhash)
            if not user:
                return redirect(url_for('login'))
            request.user = user
            return func(*args, **kwargs)
    return wrapper
def after_request(self, f):
    """Append ``f`` to the hooks run after a protected resource is accessed.

    Each hook accepts ``valid`` and ``request`` and should return a
    tuple of them::

        @oauth.after_request
        def valid_after_request(valid, oauth):
            if oauth.user in black_list:
                return False, oauth
            return valid, oauth

    :param f: the callable to register.
    :return: ``f`` itself, so the decorated name remains usable.
    """
    self._after_request_funcs.append(f)
    return f
def grantsetter(self, f):
    """Store ``f`` as the callback that persists a grant code.

    The callback accepts ``client_id``, ``code``, ``request`` and more::

        @oauth.grantsetter
        def set_grant(client_id, code, request, *args, **kwargs):
            save_grant(client_id, code, request.user, request.scopes)

    :param f: the callable to register.
    :return: ``f`` itself, so the decorated name remains usable.
    """
    self._grantsetter = f
    return f
def validate_grant_type(self, client_id, grant_type, client, request,
                        *args, **kwargs):
    """Check that ``client`` may use the requested ``grant_type``.

    A client exposing ``allowed_grant_types`` is limited to that
    collection; any other client may use the four defaults
    (`authorization_code`, `password`, `client_credentials`,
    `refresh_token`). The `password` grant is always refused when no
    user getter is registered.

    For `client_credentials` the client must have a ``user`` attribute,
    which is copied onto ``request.user``.

    It is suggested that ``allowed_grant_types`` should contain at least
    `authorization_code` and `refresh_token`.

    :return: ``True`` when the grant type is permitted, else ``False``.
    """
    if grant_type == 'password' and self._usergetter is None:
        log.debug('Password credential authorization is disabled.')
        return False

    default_grant_types = (
        'authorization_code', 'password',
        'client_credentials', 'refresh_token',
    )

    # The client's own list, when present, replaces the defaults.
    permitted = getattr(client, 'allowed_grant_types', default_grant_types)
    if grant_type not in permitted:
        return False

    if grant_type == 'client_credentials':
        if not hasattr(client, 'user'):
            log.debug('Client should have a user property')
            return False
        request.user = client.user

    return True
def login_required(f):
    """Decorate a view so it only runs when ``request.user`` is truthy.

    Requests without a user get a ``'not authorized'`` error response.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if not getattr(request, 'user', None):
            return send_error('not authorized')
        return f(*args, **kwargs)
    return wrapper
def messages():
    """Serve the chat endpoint: list messages (GET) or post one (POST).

    GET reads ``limit`` and ``offset`` from the query string; negative
    or missing values fall back to ``config.DEFAULT_COUNT_MESSAGES`` and
    ``0``. Only messages whose ``chat`` is null are listed, newest first.
    POST creates a message for ``request.user`` and pushes it over the
    websocket channel.
    """
    @validated(MessageValidator)
    def _post(data):
        message = ChatMessage.create(user=request.user, text=data['text'])
        result = {'message': MessageSerializer(message).calc()}
        send_ws(result, consts.WS_CHAT_MESSAGE)
        return send_data(result)

    if request.method == 'GET':
        try:
            limit = int(request.args.get('limit', -1))
            offset = int(request.args.get('offset', -1))
            if limit < 0:
                limit = config.DEFAULT_COUNT_MESSAGES
            if offset < 0:
                offset = 0
        except Exception as e:
            log.error(e)
            return send_error('wrong arguments')

        query = ChatMessage.select().where(ChatMessage.chat == None)
        query = query.order_by(-ChatMessage.date_created)
        query = query.offset(offset).limit(limit)
        serialized = [MessageSerializer(m).calc() for m in query]
        return send_data({'messages': serialized})
    elif request.method == 'POST':
        return _post()
def register(data):
    """Create a user from validated ``data`` and mail a verification link.

    The mail is only sent when an email address was supplied.
    """
    username = data['username']
    password = data['password']
    email = data['email']

    user = User.add(username, password, email)
    if email:
        mail_context = {
            'username': username,
            'url': urljoin(config.SITE_URL, config.VERIFY_URL),
            'token': user.get_verification(),
        }
        send_mail_template('registration', [email], data=mail_context)
    return send_message('registration successful')
def verify(token):
    """Verify the user owning ``token`` and drop the token from the cache.

    An unknown token yields a ``'token not found'`` error response.
    """
    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')

    user.verify()
    delete_cache(token)
    return send_success()
def reset(data):
    """Mail a password-reset link to the user with the given email.

    An unknown email yields an ``'email not found'`` error response.
    """
    try:
        user = User.get(User.email == data['email'])
    except User.DoesNotExist:
        return send_error('email not found')

    mail_context = {
        'username': user.username,
        'url': urljoin(config.SITE_URL, config.RECOVER_URL),
        'token': user.get_reset(),
    }
    send_mail_template('reset', [user.email], data=mail_context)
    return send_success()
def authorized():
    """Report the username of the current user, or an error if anonymous."""
    if not request.user:
        return send_error('not authorized')
    return send_data({'username': request.user.username})
def new():
    """Serve the game-pool endpoint: list open pools (GET) or open one (POST).

    GET returns up to ten open pools, newest first, skipping pools
    created by ``request.user``. POST creates a pool for the current user
    with the validated type and limit and caches a ``wait_<token>`` entry.
    """
    @validated(GameNewValidator)
    def _post(data):
        game_type = data['type']
        game_limit = data['limit']
        token = generate_token(True)
        GamePool.create(
            player1=token,
            user1=request.user,
            type_game=game_type,
            time_limit=game_limit,
        )
        set_cache('wait_{}'.format(token), (game_type, game_limit))
        return send_data({'game': token})

    if request.method == 'GET':
        open_pools = (
            GamePool.select()
            .where(
                GamePool.is_started == False,  # noqa: E712 (query expr)
                GamePool.is_lost == False,  # noqa: E712 (query expr)
                # ``GamePool.player1 is not None`` would be evaluated by
                # Python to the constant ``True`` and never reach the
                # query; ``is_null(False)`` builds ``IS NOT NULL``.
                GamePool.player1.is_null(False),
            )
            .order_by(GamePool.date_created.desc())
        )

        result = []
        for pool in open_pools:
            if pool.user1 and pool.user1 == request.user:
                continue
            result.append({
                'id': pool.pk,
                'date_created': pool.date_created.isoformat(),
                'user': pool.user1.username if pool.user1 else None,
                'type': consts.TYPES[pool.type_game]['name'],
                'limit': pool.time_limit,
            })
            # Own pools are filtered in Python, so the cap of ten
            # entries is applied here rather than with a query LIMIT.
            if len(result) > 9:
                break
        return send_data({'games': result})
    elif request.method == 'POST':
        return _post()
def invite(data):
    """Create an invitation for a new game and return both tokens.

    Three cache entries are written: the invite itself, the inviting
    user's pk (only when logged in, with a 3600 timeout argument) and a
    ``wait_<game token>`` entry.
    """
    game_type = data['type']
    game_limit = data['limit']
    limit_missing = game_type != consts.TYPE_NOLIMIT and not game_limit
    if limit_missing:
        return send_error('game limit must be set for no limit game')

    token_game = generate_token(True)
    token_invite = generate_token(True)

    invite_key = 'invite_{}'.format(token_invite)
    set_cache(invite_key, (token_game, game_type, game_limit))
    if request.user:
        set_cache('user_{}'.format(token_game), request.user.pk, 3600)
    wait_key = 'wait_{}'.format(token_game)
    set_cache(wait_key, (game_type, game_limit, token_invite))

    return send_data({'game': token_game, 'invite': token_invite})
def games():
    """Return the current user's active games and up to ten ended ones.

    Each entry is the ``white`` or ``black`` attribute of the game,
    depending on which side the user plays. Anonymous requests get two
    empty lists.
    """
    from models import Game

    result = {
        'games': {
            'actives': [],
            'ended': [],
        }
    }
    if not request.user:
        return send_data(result)

    def _side(game):
        # Pick the game attribute for the side played by the current user.
        if game.player_white == request.user:
            return game.white
        return game.black

    running = Game.select().where(
        Game.date_end == None,
        (Game.player_white == request.user) | (Game.player_black == request.user),
    )
    for game in running:
        result['games']['actives'].append(_side(game))

    finished = Game.select().where(
        Game.date_end != None,
        (Game.player_white == request.user) | (Game.player_black == request.user),
    ).limit(10)
    for game in finished:
        result['games']['ended'].append(_side(game))

    return send_data(result)
def post(self):
    """Create a chat message for the current user and broadcast it.

    :return: ``{'message': <serialized message>}``.
    """
    text = self.data['text']
    message = ChatMessage.create(user=request.user, text=text)
    payload = {'message': MessageSerializer(message).calc()}
    send_ws(payload, WS_CHAT_MESSAGE)
    return payload
def post(self):
    """Create a user from ``self.data`` and mail a verification link.

    The mail is only sent when an email address was supplied.
    """
    username = self.data['username']
    password = self.data['password']
    email = self.data['email']

    user = User.add(username, password, email)
    if email:
        mail_context = {
            'username': username,
            'url': urljoin(config.SITE_URL, config.VERIFY_URL),
            'token': user.get_verification(),
        }
        send_mail_template('registration', [email], data=mail_context)
    return 'registration successful'
def get(self, token):
    """Verify the user owning ``token`` and drop the token from the cache.

    :raises APINotFound: when no user matches the token.
    """
    user = User.get_by_token(token)
    if not user:
        raise APINotFound('token')

    user.verify()
    delete_cache(token)
    return 'verification completed'
def get(self):
    """Return the current user's username.

    :raises APIUnauthorized: when the request has no user.
    """
    if not request.user:
        raise APIUnauthorized
    return {'username': request.user.username}
def post(self):
    """Mail a password-reset link to the user with the submitted email.

    :raises APINotFound: when no user has that email.
    """
    try:
        user = User.get(User.email == self.data['email'])
    except User.DoesNotExist:
        raise APINotFound('hey email')

    mail_context = {
        'username': user.username,
        'url': urljoin(config.SITE_URL, config.RECOVER_URL),
        'token': user.get_reset(),
    }
    send_mail_template('reset', [user.email], data=mail_context)
    return 'send recover email'
def get(self, token):
    """Confirm that ``token`` resolves to a user.

    :raises APINotFound: when no user matches the token.
    """
    if not User.get_by_token(token):
        raise APINotFound('token')
    return 'token is found'
def post(self, token):
    """Set a new password for the user owning ``token``.

    The token is removed from the cache after the user is saved.

    :raises APINotFound: when no user matches the token.
    """
    account = User.get_by_token(token)
    if not account:
        raise APINotFound('token')

    new_password = self.data['password']
    account.set_password(new_password)
    account.save()
    delete_cache(token)
    return 'password changed'
def post(self):
    """Open a new game pool for the current user.

    A ``wait_<token>`` cache entry holding the type and limit is written
    alongside the pool row.

    :return: ``{'game': <token of the creating player>}``.
    """
    game_type = self.data['type']
    game_limit = self.data['limit']
    token = generate_token(True)

    pool_fields = {
        'player1': token,
        'user1': request.user,
        'type_game': game_type,
        'time_limit': game_limit,
    }
    GamePool.create(**pool_fields)

    set_cache('wait_{}'.format(token), (game_type, game_limit))
    return {'game': token}
def get(self):
    """Return the current user's active games and up to ten ended ones.

    Each entry is the ``white`` or ``black`` attribute of the game,
    depending on which side the user plays. Anonymous requests get two
    empty lists.
    """
    from models import Game

    result = {
        'games': {
            'actives': [],
            'ended': [],
        }
    }
    if not request.user:
        return result

    def _side(game):
        # Pick the game attribute for the side played by the current user.
        if game.player_white == request.user:
            return game.white
        return game.black

    running = Game.select().where(
        Game.date_end == None,
        (Game.player_white == request.user) | (Game.player_black == request.user),
    )
    for game in running:
        result['games']['actives'].append(_side(game))

    finished = Game.select().where(
        Game.date_end != None,
        (Game.player_white == request.user) | (Game.player_black == request.user),
    ).limit(10)
    for game in finished:
        result['games']['ended'].append(_side(game))

    return result
def get(self, pk):
    """Return the current user's album with id ``pk``.

    A not-exist response is returned when the user has no such album.
    """
    owner = request.user
    album = Album.query.filter_by(id=pk, user=owner).first()
    if album:
        serializer = AlbumSerializer(album)
        found = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
        return found.to_response()

    msg = '?????'
    missing = HTTPResponse(HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg)
    return missing.to_response()
def delete(self, pk):
    """Delete the current user's album with id ``pk``.

    The response carries the album as serialized before deletion; a
    not-exist response is returned when the user has no such album.
    """
    owner = request.user
    album = Album.query.filter_by(id=pk, user=owner).first()
    if not album:
        msg = '?????'
        missing = HTTPResponse(HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg)
        return missing.to_response()

    # Build the serializer first so the response describes the removed row.
    serializer = AlbumSerializer(album)
    album.delete()
    done = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
    return done.to_response()
def get(self, pk):
    """Return the current user's image with id ``pk``.

    A not-exist response is returned when the user has no such image.
    """
    owner = request.user
    image = Image.query.filter_by(id=pk, user=owner).first()
    if image:
        serializer = ImageSerializer(image)
        found = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=serializer.data)
        return found.to_response()

    msg = '?????'
    missing = HTTPResponse(HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg)
    return missing.to_response()
def preprocess_request(self):
    """Expose the current user and a uniform ``request.data`` payload.

    ``g.user`` receives the ``current_user`` proxy and ``request.user``
    the object behind it. ``request.data`` becomes the query arguments
    for GET requests; otherwise the JSON body, falling back to the form
    fields when there is no JSON.
    """
    g.user = current_user
    request.user = current_user._get_current_object()

    if request.method == 'GET':
        request.data = request.args.to_dict()
        return

    request.data = request.json
    if request.data is None:
        request.data = request.form.to_dict()
def session(cls, **kwargs) -> dict:
    """Describe the current session as a dict.

    :return: ``username`` taken from ``request.user`` and ``last_login``
        taken from the Flask session, defaulting to the current time in
        ISO format.
    """
    username = request.user
    fallback = datetime.now().isoformat()
    last_login = flask_session.get('last_login', fallback)
    return {'username': username, 'last_login': last_login}
def get_allowed_actions(user, actions):
    """Return the actions ``user`` may perform out of the requested ones.

    This is a placeholder policy: every requested action is allowed and
    the same ``actions`` object is returned. Both the requested and the
    allowed actions are logged at debug level.
    """
    # determine what actions are allowed here
    logging.debug('Requested actions: {}'.format(actions))

    # The three actions used by the registry are 'push', 'pull', and '*':
    # https://github.com/docker/distribution/blob/master/registry/handlers/app.go#L875
    # A real policy would build a filtered list here, for example by
    # dropping 'pull' for users who may not read the repository.
    allowed_actions = actions

    logging.debug('Allowed actions: {}'.format(allowed_actions))
    return allowed_actions
def getpassword():
    """Issue a registry password token for the current user.

    :return: a JSON response whose ``username`` is the fixed string
        ``PASSTOKEN`` and whose ``password`` is the encoded token.
    """
    subject = request.user
    encoded_token = Token('password', subject=subject).encode_token()
    logging.info('Issued registry password to {}'.format(subject))
    return jsonify(username='PASSTOKEN', password=encoded_token)