我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.request.authorization()。
def requires_auth(function): """ Bind against LDAP to validate users credentials """ @wraps(function) def decorated(*args, **kwargs): logger = logging.getLogger("ca_server") auth = request.authorization ldap = LdapClient() if not auth or not ldap.check_auth(auth.username, auth.password): output = DataBag() output.set_error("Access denied") if hasattr(auth, "username"): logger.info("requires_auth: access denied {}".format(auth.username)) else: logger.info("requires_auth: access denied") return output.get_json() return function(auth.username) return decorated
def check_auth(): session = None user = None token = request.headers.get('X-Auth-Token') if token: session = Session.query.filter_by(token=token).first() if not session: return make_error_response('Invalid session token', 401) user = session.user else: auth = request.authorization if auth: user = User.find_by_email_or_username(auth.username) if not (user and user.password == auth.password): return make_error_response('Invalid username/password combination', 401) g.current_session = session g.current_user = user
def add_basic_auth(blueprint, username, password, realm='RQ Scheduler Dashboard'): """Add HTTP Basic Auth to a blueprint. Note this is only for casual use! """ @blueprint.before_request def basic_http_auth(*args, **kwargs): auth = request.authorization if ( auth is None or auth.password != password or auth.username != username): return Response( 'Please login', 401, {'WWW-Authenticate': 'Basic realm="{}"'.format(realm)})
def _template_rendering(template): def decorator(fn): @wraps(fn) def inner_fn(*args, **kwargs): data = fn(*args, **kwargs) auth = request.authorization basic_auth = '' if not auth else base64.b64encode(bytes(':'.join([auth.username, auth.password]), 'utf-8')).decode('utf-8') data.update({ 'basic_auth': basic_auth, 'base_url': g.cn.g_('app_config').get('base_url'), 'ecs_clusters': g.cn.f_('aws.get_ecs_clusters', region=g.cn.g_('app_config').get('ecs_region')), 'selected_ecs_cluster': g.cn.g_('session').get('selected_ecs_cluster') }) return render_template(template, **data) return inner_fn return decorator
def login_required(self, f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization # We need to ignore authentication headers for OPTIONS to avoid # unwanted interactions with CORS. # Chrome and Firefox issue a preflight OPTIONS request to check # Access-Control-* headers, and will fail if it returns 401. if request.method != 'OPTIONS': if auth: password = self.get_password_callback(auth.username) else: password = None if not self.authenticate(auth, password): return self.auth_error_callback() return f(*args, **kwargs) return decorated
def require_token(func): """ verifies the uuid/token combo of the given account. account type can be: customer, fox, merchant """ @wraps(func) def decorator(*args, **kwargs): if request.authorization: uuid = request.authorization.username token = request.authorization.password try: manager = SessionManager() valid = manager.verify(uuid, token) if not valid: return UnauthorizedResponseJson().make_response() except Exception as e: traceback.print_exc() return ExceptionResponseJson("unable to validate credentials", e).make_response() else: return UnauthorizedResponseJson().make_response() return func(*args, **kwargs) return decorator
def require_password(func): """ verifies the given username/password combo """ @wraps(func) def decorator(*args, **kwargs): if request.authorization: username = request.authorization.username password = request.authorization.password try: manager = AccountManager() valid = manager.verify_account(username, password) if not valid: return UnauthorizedResponseJson().make_response() except Exception as e: traceback.print_exc() return ExceptionResponseJson("unable to validate credentials", e).make_response() else: return UnauthorizedResponseJson().make_response() return func(*args, **kwargs) return decorator
def requires_admin(f): # pragma: no cover """Decorator to define endpoints that requires Basic Auth. Args: f (func): An route function. Returns: response (object): 401 if unauthorized. f (func): The route to call. """ @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated
def basic_authenticate(f): @wraps(f) def decorated(*args, **kwargs): if not current_app.config.get("USE_AUTH"): return f(*args, **kwargs) if not getattr(f, 'basic_authenticate', True): return f(*args, **kwargs) auth = request.authorization if auth and auth.username and auth.password != '': password = current_app.config.get("{}_PASSWORD".format(auth.username.upper())) if auth.password == password: role = AUTH_ROLES[auth.username] logging.debug("Authenticated '{}' with role '{}' via basic auth".format(auth.username, role)) current_app.auth_role = role return f(*args, **kwargs) return abort(401) return decorated
def put(self): """ Queue the specific pipeline """ data = request.get_json(force=True) config = data.get('config') user = auth_get_username(request.authorization, data.get('user')) errors = None # Pipeline.validate_config(config, user) if not errors: config = Pipeline.load_cfg(config) # Get id from DB db_info = dbmodel.PipelineDb(config['name'], config, Pipeline.ordered_steps(config), user) config['run_id'] = db_info.run_id ut.pretty_print("Submitting pipeline %s (ID %d) for user %s" % (config['label'], config['run_id'], user)) return pm.add_pipeline(config, user) else: return errors, 400
def get_blender_id_oauth_token() -> str: """Returns the Blender ID auth token, or an empty string if there is none.""" from flask import request token = session.get('blender_id_oauth_token') if token: if isinstance(token, (tuple, list)): # In a past version of Pillar we accidentally stored tuples in the session. # Such sessions should be actively fixed. # TODO(anyone, after 2017-12-01): refactor this if-block so that it just converts # the token value to a string and use that instead. token = token[0] session['blender_id_oauth_token'] = token return token if request.authorization and request.authorization.username: return request.authorization.username if current_user.is_authenticated and current_user.id: return current_user.id return ''
def requires_basic_auth(f): """ wrapper function to check authentication credentials are valid """ @wraps(f) def decorated(*args, **kwargs): # check credentials supplied in the http request are valid auth = request.authorization if not auth: return jsonify(message="Missing credentials"), 401 if (auth.username != settings.config.api_user or auth.password != settings.config.api_password): return jsonify(message="username/password mismatch with the " "configuration file"), 401 return f(*args, **kwargs) return decorated
def get(self, id_, type_): """GET object with id = id_ from the database.""" if get_authentication(): if request.authorization is None: return failed_authentication() else: auth = check_authorization(request, get_session()) if auth is False: return failed_authentication() class_type = get_doc().collections[type_]["collection"].class_.title if checkClassOp(class_type, "GET"): try: response = crud.get(id_, class_type, api_name=get_api_name(), session=get_session()) return set_response_headers(jsonify(hydrafy(response))) except Exception as e: status_code, message = e.get_HTTP() return set_response_headers(jsonify(message), status_code=status_code) abort(405)
def delete(self, id_, type_): """Delete object with id=id_ from database.""" if get_authentication(): if request.authorization is None: return failed_authentication() else: auth = check_authorization(request, get_session()) if auth is False: return failed_authentication() class_type = get_doc().collections[type_]["collection"].class_.title if checkClassOp(class_type, "DELETE"): try: crud.delete(id_, class_type, session=get_session()) response = {"message": "Object with ID %s successfully deleted" % (id_)} return set_response_headers(jsonify(response)) except Exception as e: status_code, message = e.get_HTTP() return set_response_headers(jsonify(message), status_code=status_code) abort(405)
def delete(self, type_): """Delete a non Collection class item.""" if get_authentication(): if request.authorization is None: return failed_authentication() else: auth = check_authorization(request, get_session()) if auth is False: return failed_authentication() if checkEndpoint("DELETE", type_): # No Delete Operation for collections if type_ in get_doc().parsed_classes and type_+"Collection" not in get_doc().collections: try: crud.delete_single(type_, session=get_session()) response = {"message": "Object successfully deleted"} return set_response_headers(jsonify(response)) except Exception as e: status_code, message = e.get_HTTP() return set_response_headers(jsonify(message), status_code=status_code) abort(405)
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated
def require(self, users=(), roles=(), test_auth=None, test_method=None): """ Authenticates/authorizes a request based on the content of the request.authorization parameter. users -- users permitted to call this method roles -- roles permitted to call this method test_auth -- credentials only for testing test_method -- method only for testing """ users = self._process_targets(users) roles = self._process_targets(roles) def loaded_decorated(f): @wraps(f) def decorated(*args, **kwargs): auth = test_auth or request.authorization method = test_method.upper() if test_method else request.method authenticated = auth and (auth.username in self.users) and \ self.users[auth.username] == auth.password if not authenticated: return self.no_authentication() allowed_users = users[method] if users else None allowed_roles = roles[method] if roles else None if allowed_users or allowed_roles: auth_as_user = auth.username in allowed_users auth_as_role = allowed_roles & self.roles[auth.username] if not auth_as_user and not auth_as_role: return self.no_authorization() return f(*args, **kwargs) return decorated return loaded_decorated
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): if _globals.get('test'): return f(*args, **kwargs) return authenticate() return f(*args, **kwargs) return decorated
def requires_auth(f): """Wrapper function.""" @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]: """ Given a function returns one that requires basic auth """ @wraps(func) def decorator(): auth = request.authorization if auth and validAuth(auth.username, auth.password): return func() return authFailure() return decorator
def login_required(func): """ desc: ??????? """ @wraps(func) def _decorator_func(*args, **kwargs): if request.authorization is None: content_type = request.headers["Content-Type"] if "application/x-www-form-urlencoded" in content_type: data = request.form.to_dict() elif "application/json" in content_type: data = request.get_json() elif "multipart/form-data" in content_type: data = request.get_json() if data is None: data = request.form.to_dict() else: raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"????"})) if not isinstance(data, dict): raise error_handlers.BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???json????"})) token = data.get("token", None) if token is None: raise error_handlers.MissToken( http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"???????token"}) ) else: token = request.authorization["username"] g.user = User.verify_auth_token(token) # ???? identity = g.cache.get(token) if identity is not None: g.identity = pickle.loads(identity) return func(*args, **kwargs) return _decorator_func
def api_generic(request, request_processor, app, resource): req = {} req['method'] = request.method req['resource'] = resource req['json'] = request.json req['params'] = {} params = {} for k, v in request.args.iteritems(): try: in_json = json.loads(v) if type(in_json) is dict: params[k] = in_json else: params[k] = str(in_json) except: params[k] = v req['params'] = params auth_form = {} if request.authorization: auth_form['user'] = request.authorization.username auth_form['password'] = request.authorization.password else: if 'X-User' in request.headers: auth_form['user'] = request.headers['X-User'] if 'X-Token' in request.headers: auth_form['token'] = request.headers['X-Token'] req['auth_form'] = auth_form response = request_processor(app, req) if 'errors' in response: return jsonify({'errors': response['errors']}), 400 elif request.method == 'GET': return jsonify({resource: response['json']}), response['status'] return jsonify(response['json']), response['status']
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated # import the user created module
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization api_key = request.headers.get("API_KEY") or request.args.get("API_KEY") if (API_KEY is not None) and api_key == API_KEY: return f(*args, **kwargs) if (AUTH_USER is not None) and (not auth or not check_auth( auth.username, auth.password)): return authenticate() return f(*args, **kwargs) return decorated
def username(self): if not request.authorization: return "" return request.authorization.username
def token(): if request.authorization is not None: username = request.authorization.get('username', None) passwd = request.authorization.get('password', None) if username is not None and passwd is not None: user = User.query.filter(User.username_iequal(username)).first() if user is None or user.deleted: pass elif not user.active: raise APIError("User '{0}' is blocked".format(username), 403) elif user.verify_password(passwd): return jsonify({'status': 'OK', 'token': user.get_token()}) raise APIError('Username or password invalid', 401) raise APIError('You are not authorized to access the resource', 401)
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): logger.info("{0} {1} {2} {3}".format(request.remote_addr, request.method, request.url, str(request.args))) logger.debug("data: {0}".format(str(request.form))) auth = request.authorization logger.debug('Check_auth: ' + str(auth)) if not auth or not check_auth(auth.username, auth.password): logger.warning("Unauthorized.") return {'error' : 'Unauthorized.'}, 401 return f(*args, **kwargs) return decorated
def requires_authentication(function): """Creates a decorator that can be applied with @requires_authentication to protect an API endpoint.""" @wraps(function) def decorated(*args, **kwargs): """Checks for authorization headers. Tells the user to authenticate if none are found.""" auth = request.authorization if not auth or not check_authentication(auth.username, auth.password): return authenticate() return function(*args, **kwargs) return decorated
def require_auth_header(func): """ no auth check is performed but the auth headers are still required. auth check is performed by a service other than ourselves """ @wraps(func) def decorator(*args, **kwargs): if not request.authorization: return ErrorResponseJson("auth header required").make_response() return func(*args, **kwargs) return decorator
def after_request(response): """ called after every request """ # log the endpoint hit and any errors delta = int((time.time() - g.start_time) * 1000) start_utc = datetime.datetime.utcfromtimestamp(g.start_time) username = request.authorization.username if request.authorization else None err_msg = response.get_data(as_text=True) if response.status_code // 100 >= 4 else None Logger.endpoint_hit(start_utc, delta, request.base_url, username, request.method, response.status_code, err_msg) return response
def verify_user(self, require_admin=True, require_credentials=True): auth = request.authorization if not auth: return False username = auth.username password = auth.password user = self._mongo.db['users'].find_one({'username': username}) if not user: return False result = False ip = get_ip() if not require_credentials: result = self._verify_user_by_token(user, password, ip) if not result and self._is_blocked_temporarily(username): return False if not result: result = _verify_user_by_credentials(user, password) if not result: self._add_block_entry(username) return False if not require_admin or user['is_admin']: return True return False
def _is_blocked_temporarily(self, username): num_login_attempts = self._config.defaults['authorization']['num_login_attempts'] block_for_seconds = self._config.defaults['authorization']['block_for_seconds'] self._mongo.db['block_entries'].delete_many({'timestamp': {'$lt': time() - block_for_seconds}}) block_entries = list(self._mongo.db['block_entries'].find({'username': username})) if len(block_entries) > num_login_attempts: return True return False
def issue_token(self): salt = urandom(16) kdf = _kdf(salt) token = generate_secret() username = request.authorization.username ip = get_ip() self._mongo.db['tokens'].insert_one({ 'username': username, 'ip': ip, 'salt': salt, 'token': kdf.derive(token.encode('utf-8')), 'timestamp': time() }) return token
def _verify_user_by_token(self, user, token, ip): tokens_valid_for_seconds = self._config.defaults['authorization']['tokens_valid_for_seconds'] self._mongo.db['tokens'].delete_many({'timestamp': {'$lt': time() - tokens_valid_for_seconds}}) cursor = self._mongo.db['tokens'].find( {'username': user['username'], 'ip': ip}, {'token': 1, 'salt': 1} ) for c in cursor: try: kdf = _kdf(c['salt']) kdf.verify(token.encode('utf-8'), c['token']) return True except: pass return False
def getCurrentUser(request): auth = request.authorization if not auth: return None token = auth.username return User.verify_auth_token(token)
def requires_passcode(func): @functools.wraps(func) def wrapper(*args, **kwargs): if config.get('web_passcode') and not session.get('allowed') and ( not request.authorization or request.authorization.username != config['web_passcode']): return redirect(url_for('login')) return func(*args, **kwargs) return wrapper
def ensure_admin_authenticated(): auth = request.authorization if not auth or auth.username != config.admin_username or auth.password != config.admin_password: return Response('401 Unauthorized', 401, { 'WWW-Authenticate': 'Basic realm="Login Required"' })
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): sl = SettingLoader() settings = sl.settings if settings.rest_api.password_protected: auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated
def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated #####################