我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用jwt.InvalidTokenError()。
def decode_auth_token(auth_token):
    """
    Decode an auth token and return the subject it was issued for.

    NOTE(review): ``jwt.decode`` is called without an ``algorithms``
    allow-list; confirm the signing algorithm and pin it.

    :param auth_token: encoded JWT to validate
    :return: the ``sub`` claim on success, otherwise a human-readable
        error message (string)
    """
    secret = app.config.get('SECRET_KEY')
    try:
        claims = jwt.decode(auth_token, secret)
        # A token that decodes fine may still have been revoked.
        if BlacklistToken.check_blacklist(auth_token):
            return 'Token blacklisted. Please log in again.'
        return claims['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
def _verify(auth_token, owner, public_key):
    """Verify an auth token against the expected dataset owner.

    :param auth_token: authentication token to verify
    :param owner: dataset owner the token must belong to
    :param public_key: key used to check the token signature
    :return: True when the token decodes and its ``userid`` claim equals
        ``owner``; False otherwise (including empty or non-ASCII tokens)
    """
    if not auth_token or not owner:
        return False
    try:
        # PyJWT's keyword is ``algorithms`` (a list). The previous
        # ``algorithm='RS256'`` is not a decode parameter, so the
        # algorithm allow-list was never actually applied.
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key,
                           algorithms=['RS256'])
    except (jwt.InvalidTokenError, UnicodeEncodeError):
        # A token that cannot be ASCII-encoded cannot be a valid JWT.
        return False
    # TODO: check the token's service / permissions in the future.
    return owner == token.get('userid')
def verify(auth_token, owner):
    """Verify an auth token against the expected dataset owner.

    :param auth_token: authentication token to verify
    :param owner: dataset owner the token must belong to
    :return: True when the token is valid and its ``userid`` claim equals
        ``owner``; False otherwise
    """
    if not auth_token:
        return False
    # NOTE(review): hard-coded bypass, presumably for the test suite --
    # confirm this cannot be reached in production.
    if auth_token == 'testing-token' and owner == '__tests':
        return True
    try:
        # PyJWT's keyword is ``algorithms`` (a list). The previous
        # ``algorithm='RS256'`` is not a decode parameter, so the
        # algorithm allow-list was never actually applied.
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key(),
                           algorithms=['RS256'])
    except (jwt.InvalidTokenError, UnicodeEncodeError):
        # A token that cannot be ASCII-encoded cannot be a valid JWT.
        return False
    # TODO: check the token's service in the future.
    return owner == token.get('userid')
def get_user_id(auth_token):
    """Return the user id carried by an auth token.

    :param auth_token: authentication token to decode
    :returns: the ``userid`` claim, or None when the token is missing,
        invalid, or lacks the claim
    """
    if not auth_token:
        return None
    try:
        # PyJWT's keyword is ``algorithms`` (a list). The previous
        # ``algorithm='RS256'`` is not a decode parameter, so the
        # algorithm allow-list was never actually applied.
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key(),
                           algorithms=['RS256'])
    except (jwt.InvalidTokenError, UnicodeEncodeError):
        # A token that cannot be ASCII-encoded cannot be a valid JWT.
        return None
    # TODO: check the token's service in the future.
    return token.get('userid')
def authjwt_method(token):
    """Authenticate a user from a JWT using rest_framework_jwt helpers.

    :param token: encoded JWT
    :return: the matching user, or None when the token is invalid, carries
        no username, or the user does not exist
    """
    import jwt
    from rest_framework_jwt.authentication import (
        jwt_decode_handler, jwt_get_username_from_payload)

    try:
        payload = jwt_decode_handler(token)
    except jwt.InvalidTokenError:
        # Expired-signature and decode errors are subclasses of
        # InvalidTokenError, so this single clause covers what the old
        # tuple did without touching the deprecated ``ExpiredSignature``
        # alias (absent from newer PyJWT releases).
        return None

    User = get_user_model()
    username = jwt_get_username_from_payload(payload)
    if not username:  # pragma: no cover
        return None

    try:
        user = User.objects.get_by_natural_key(username)
    except User.DoesNotExist:  # pragma: no cover
        return None
    return user
def post(self):
    """Check whether a token is authorized to access the client.

    Reads the ``token`` body argument, resolves the grant for the request
    and verifies access.  Finishes the request with ``has_access`` set to
    True or False.

    :raises exceptions.HTTPError: 400 when the token is missing or the
        grant lookup reports a bad request
    """
    try:
        token = self.request.body_arguments['token'][0]
    except (KeyError, IndexError):
        raise exceptions.HTTPError(400, 'Token is required')

    try:
        grant = oauth2.get_grant(self.request, token=token)
        yield grant.verify_access(token)
        self.finish({'status': 200, 'has_access': True})
    except oauth2.BadRequest as exc:
        raise exceptions.HTTPError(400, exc.args[0])
    except oauth2.Unauthorized as exc:
        logging.error('Unauthorized: %s', exc.args[0])
        self.finish({'status': 200, 'has_access': False})
    except jwt.InvalidTokenError as exc:
        # Log the exception itself rather than ``exc.args[0]``: a JWT
        # error raised without a message would otherwise turn this
        # handler into an IndexError.
        logging.error('Invalid token: %s', exc)
        self.finish({'status': 200, 'has_access': False})
def post(self):
    """Issue an access token for the grant described by the request.

    Finishes the request with the token, its type and its expiry.

    :raises exceptions.HTTPError: 400 for an invalid grant type, invalid
        scope, JWT error or ValueError; 403 when the grant is unauthorized
    """
    try:
        grant = oauth2.get_grant(self.request)
    except oauth2.InvalidGrantType:
        raise exceptions.HTTPError(400, 'invalid_grant')

    bad_request_errors = (oauth2.InvalidScope, jwt.InvalidTokenError, ValueError)
    try:
        token, expiry = yield grant.generate_token()
    except bad_request_errors as exc:
        raise exceptions.HTTPError(400, exc.args[0])
    except oauth2.Unauthorized as exc:
        raise exceptions.HTTPError(403, exc.args[0])

    response = {
        'status': 200,
        'access_token': token,
        'token_type': 'bearer',
        'expiry': expiry
    }
    self.finish(response)
def authenticate(self, request):
    """
    Authenticate the request using JWT-based authentication.

    Returns a two-tuple of `User` and token if a valid signature has been
    supplied. Returns `None` when the request carries no JWT.

    :raises exceptions.AuthenticationFailed: when the token is expired,
        cannot be decoded, or is otherwise invalid
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None

    try:
        payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignatureError:
        # ``ExpiredSignatureError`` is the canonical name; the old
        # ``ExpiredSignature`` alias is deprecated and missing from newer
        # PyJWT releases.
        msg = _('Signature has expired.')
        raise exceptions.AuthenticationFailed(msg)
    except jwt.DecodeError:
        msg = _('Error decoding signature.')
        raise exceptions.AuthenticationFailed(msg)
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    user = self.authenticate_credentials(payload, request.channel)
    return (user, jwt_value)
def decode_auth_token(token):
    """
    Decode the token and return the user id stored in its ``sub`` claim.

    :param token: Auth Token
    :return: the ``sub`` claim on success, otherwise a human-readable
        error message (string)
    """
    try:
        # ``algorithms`` is an allow-list and must be a list: with a bare
        # string the membership test degrades to a substring match.
        payload = jwt.decode(token,
                             app.config['SECRET_KEY'],
                             algorithms=['HS256'])
        # A token that decodes fine may still have been revoked.
        if BlackListToken.check_blacklist(token):
            return 'Token was Blacklisted, Please login In'
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired, Please sign in again'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please sign in again'
def authenticate(self, request):
    """Authenticate the request from its JWT.

    Uses the device-aware decode handler when
    ``api_settings.JWT_PERMANENT_TOKEN_AUTH`` is set, the regular handler
    otherwise.

    :return: ``(user, jwt_value)``, or None when the request has no JWT
    :raises exceptions.AuthenticationFailed: when the token is expired,
        cannot be decoded, or is otherwise invalid
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None

    try:
        if api_settings.JWT_PERMANENT_TOKEN_AUTH:
            payload = jwt_devices_decode_handler(jwt_value)
        else:
            payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignatureError:
        # ``ExpiredSignatureError`` is the canonical name; the old
        # ``ExpiredSignature`` alias is deprecated and missing from newer
        # PyJWT releases.
        msg = _("Signature has expired.")
        raise exceptions.AuthenticationFailed(msg)
    except jwt.DecodeError:
        msg = _("Error decoding signature.")
        raise exceptions.AuthenticationFailed(msg)
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    user = self.authenticate_credentials(payload)
    return user, jwt_value
def get_current_user(self):
    """
    Overrides the built-in function to get the user id based on a JWT.

    If the Authorization header carries no bearer token, None is returned.
    If the token is not valid, whatever ``TokenGenerator.decode_token``
    raises propagates to the caller.

    :return: The ``uid`` claim of the token, or None if there is no
             bearer token
    :raises jwt.ExpiredSignatureError: presumably raised by the decoder
                                       for expired tokens -- confirm
                                       against TokenGenerator
    :raises jwt.InvalidTokenError: presumably raised by the decoder for
                                   otherwise invalid tokens -- confirm
                                   against TokenGenerator
    """
    bearer_prefix = "Bearer "
    auth_header = self.request.headers.get('Authorization')
    if auth_header is None or not auth_header.startswith(bearer_prefix):
        return None

    encoded_jwt_token = auth_header[len(bearer_prefix):]
    payload = TokenGenerator.decode_token(encoded_jwt_token)
    # TODO Should we check if payload["iat"] < last password change and
    # raise jwt.InvalidIssuedAtError?  (The former ``if False:``
    # placeholder was unreachable and has been removed.)
    return payload["uid"]
def prepare(self):
    """
    Reject the request with 401 unless the caller is a validated user.

    Accessing ``self.current_user`` may raise JWT errors; each is logged
    and answered with a 401 carrying a matching summary.
    """
    super().prepare()
    try:
        user = self.current_user
        if user is None:
            self.send_error(401)  # TODO Add details
    except jwt.InvalidIssuedAtError:
        log.error("The IAT-claim of the passed token is less than the last time the user password changed")
        self.send_error(401, summary="invalidated token")
    except jwt.ExpiredSignatureError:
        log.error("The EXT-claim of the passed token is less than the current UNIX time")
        self.send_error(401, summary="expired token")
    except jwt.InvalidTokenError:
        log.error("The passed token could not be validated")
        self.send_error(401, summary="invalid token")
def test_decode_failure_invalid_token(self):
    """
    Decoding a malformed token must raise InvalidTokenError and log one
    info entry per issuer plus a final error entry.
    """
    with mock.patch('edx_rest_framework_extensions.utils.logger') as patched_log:
        # A malformed token cannot be decoded for any issuer.
        with self.assertRaises(jwt.InvalidTokenError):
            utils.jwt_decode_handler("invalid.token")

        # Each configured issuer gets its own per-issuer failure entry.
        for issuer in ('test-issuer-1', 'test-issuer-2'):
            expected = "Token decode failed for issuer '" + issuer + "'"
            patched_log.info.assert_any_call(expected, exc_info=True)

        expected = "All combinations of JWT issuers and secret keys failed to validate the token."
        patched_log.error.assert_any_call(expected)
def _user_from_refresh_token(self, jwtstr: str, key_pemstr: str, expected_issuer: Optional[str]=None, expected_audience: Optional[str]=None) -> Optional[MNUser]:
    """Resolve the user a refresh token was issued for.

    The token is decoded as RS256 with 10 seconds of leeway and checked
    against the expected issuer and audience.

    NOTE(review): the raw token string is written to the log at several
    levels -- confirm this is acceptable for credentials.

    :return: the user whose primary key equals the ``sub`` claim, or None
        when the token is rejected, has no ``sub``, or no such user exists
    """
    _log.debug("Received refresh token: %s", jwtstr)

    rejected = (jwt.ExpiredSignatureError, jwt.InvalidAlgorithmError,
                jwt.InvalidIssuerError, jwt.InvalidTokenError)
    try:
        claims = jwt.decode(jwtstr, key_pemstr,
                            algorithms=["RS256"],
                            leeway=10,
                            issuer=expected_issuer,
                            audience=expected_audience)
    except rejected as err:
        _log.warning("Rejected refresh token because of %s", str(err))
        return None

    if "sub" not in claims:
        _log.error("BUG? Valid refresh token without user in subject. %s", jwtstr)
        return None

    try:
        return MNUser.objects.get(pk=claims["sub"])
    except MNUser.DoesNotExist:
        _log.warning("No such user from valid JWT. %s", jwtstr)
        return None
def perform_create(self, serializer):
    """Look up or create the user identified by the posted JWT.

    The token is read from the ``JWT`` POST field and decoded as HS256
    with the base64-decoded ``settings.AUTH0_SECRET`` and the
    ``settings.AUTH0_CLIENT_ID`` audience.

    :return: the queryset of existing users with the token's email, or a
        newly saved user with an unusable password
    :raises jwt.InvalidTokenError: when the token cannot be validated
    """
    jwt_string = self.request.POST['JWT']
    try:
        payload = jwt.decode(jwt_string,
                             base64.b64decode(settings.AUTH0_SECRET, '-_'),
                             algorithms=['HS256'],
                             audience=settings.AUTH0_CLIENT_ID)
    except jwt.InvalidTokenError:
        print("No/Bad JWT Token.")
        # Previously execution fell through and crashed on the unbound
        # ``payload`` below; propagate the real error instead.
        raise

    email = payload['email']
    # Build the queryset once instead of filtering twice.
    existing = User.objects.filter(email=email)
    if existing.exists():
        return existing

    user = User(username=email, email=email)
    user.set_unusable_password()
    user.save()
    return user
def get_jwt_user():
    """Return the identity for the current request's JWT, or None.

    Runs the request, decode and identity callbacks of the ``jwt`` object
    in turn; any InvalidTokenError or JWTError yields None.
    """
    try:
        raw_token = jwt.request_callback()
        claims = jwt.jwt_decode_callback(raw_token)
        return jwt.identity_callback(claims)
    except (InvalidTokenError, JWTError):
        return None
def decode_auth_token(auth_token):
    """Decode the auth token and return its subject.

    NOTE(review): ``jwt.decode`` is called without an ``algorithms``
    allow-list; confirm the signing algorithm and pin it.

    :param auth_token: encoded JWT
    :return: the ``sub`` claim on success, otherwise a human-readable
        error message (string)
    """
    secret = current_app.config.get('SECRET_KEY')
    try:
        claims = jwt.decode(auth_token, secret)
        return claims['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'
def verify(user_token):
    """Decode ``user_token`` with the module-level ``token`` key.

    :param user_token: encoded JWT supplied by the user
    :return: the decoded claims, or False when the token is invalid
    """
    try:
        # PyJWT's keyword is ``algorithms`` (a list). The previous
        # ``algorithm="HS256"`` is not a decode parameter, so the
        # algorithm allow-list was never actually applied.
        return jwt.decode(user_token, token, algorithms=["HS256"])
    except (KeyError, jwt.InvalidTokenError):
        return False
def create():
    """Build and return the ``search`` blueprint.

    Registers a GET route for ``search`` and one for ``search/<kind>``,
    both served by the same view.
    """
    blueprint = Blueprint('search', 'search')

    # Controller proxy
    search_controller = controllers.search

    def search(kind='dataset'):
        # The token may arrive as a header or as a request value.
        # NOTE(review): decoded with PRIVATE_KEY and no ``algorithms``
        # allow-list -- confirm this is intended.
        raw_token = request.headers.get('auth-token') or request.values.get('jwt')
        userid = None
        if raw_token is not None:
            try:
                userid = jwt.decode(raw_token, PRIVATE_KEY).get('userid')
            except jwt.InvalidTokenError:
                # An invalid token simply means an anonymous search.
                pass

        result = search_controller(kind, userid, request.args)
        if result is None:
            abort(400)
        return jsonpify(result)

    # Register routes
    blueprint.add_url_rule('search', 'search', search, methods=['GET'])
    blueprint.add_url_rule('search/<kind>', 'events', search, methods=['GET'])

    return blueprint
def authenticate(self, request):
    """Authenticate the request from its JWT.

    After decoding, the payload is checked against the blacklist, the
    user is resolved, and tokens issued before a password change are
    rejected via ``check_changed_password_invalidated_old_token``.

    NOTE(review): a request without a JWT yields ``(None, None)`` rather
    than ``None`` -- confirm callers expect the tuple.

    :return: ``(user, jwt_value)``, or ``(None, None)`` without a JWT
    :raises AuthenticationFailed: when the token is expired, cannot be
        decoded, or is otherwise invalid
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
        return None, None

    try:
        payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignatureError:
        # ``ExpiredSignatureError`` is the canonical name; the old
        # ``ExpiredSignature`` alias is deprecated and missing from newer
        # PyJWT releases.
        msg = _("Signature has expired.")
        raise AuthenticationFailed(msg)
    except jwt.DecodeError:
        msg = _("Error decoding signature.")
        raise AuthenticationFailed(msg)
    except jwt.InvalidTokenError:
        raise AuthenticationFailed()

    # Check blacklist
    self.check_blacklist(payload)

    user = self.authenticate_credentials(payload)

    # Check whether a password change has invalidated this (older) token
    self.check_changed_password_invalidated_old_token(user, payload)

    return user, jwt_value
def get_jwt_value(self, request):
    """Extract and decode the JWT from the Authorization header.

    :return: the decoded payload, or None when the header is absent or
        does not start with the configured prefix
    :raises exceptions.AuthenticationFailed: when the header is malformed
        or the token is expired, undecodable, or otherwise invalid
    """
    auth = get_authorization_header(request).split()
    auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
        return None

    if len(auth) == 1:
        msg = _('Invalid Authorization header. No credentials provided.')
        raise exceptions.AuthenticationFailed(msg)
    elif len(auth) > 2:
        msg = _('Invalid Authorization header. Credentials string '
                'should contain no spaces.')
        raise exceptions.AuthenticationFailed(msg)

    jwt_value = auth[1]
    try:
        payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignatureError:
        # ``ExpiredSignatureError`` is the canonical name; the old
        # ``ExpiredSignature`` alias is deprecated and missing from newer
        # PyJWT releases.
        msg = _('Signature has expired.')
        raise exceptions.AuthenticationFailed(msg)
    except jwt.DecodeError:
        msg = _('Error decoding signature.')
        raise exceptions.AuthenticationFailed(msg)
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return payload
def _jwt_required(realm):
    """Verify the JWT of the current request and store its identity.

    `jwt_required()` does this automatically, but calling it manually is
    useful for APIs where JWT access is optional.

    :param realm: an optional realm
    :raises JWTError: when no token is present, the token is invalid, or
        the identity callback finds no user
    """
    token = _jwt.request_callback()
    if token is None:
        challenge = {'WWW-Authenticate': 'JWT realm="%s"' % realm}
        raise JWTError('Authorization Required',
                       'Request does not contain an access token',
                       headers=challenge)

    try:
        payload = _jwt.jwt_decode_callback(token)
    except jwt.InvalidTokenError as e:
        raise JWTError('Invalid token', str(e))

    identity = _jwt.identity_callback(payload)
    if identity is None:
        raise JWTError('Invalid JWT', 'User does not exist')

    # Publish the identity on the current application context.
    _app_ctx_stack.top.current_identity = identity
def verify_token(token, secret, validate_expiration=True):
    """Decode ``token`` with ``secret`` and return its claims.

    NOTE(review): ``jwt.decode`` is called without an ``algorithms``
    allow-list; confirm the signing algorithm and pin it.

    :param token: encoded JWT as text
    :param secret: key used to verify the signature
    :param validate_expiration: forwarded as the ``verify_exp`` option
    :return: the decoded claims, or None when the token is not ASCII or
        fails validation (both cases are logged with traceback)
    """
    try:
        ascii_token = token.encode('ascii')
    except UnicodeEncodeError:
        logger.exception('Invalid token - char encoding')
        return None

    decode_options = {'verify_exp': validate_expiration}
    try:
        return jwt.decode(ascii_token, secret, options=decode_options)
    except jwt.InvalidTokenError:
        logger.exception('Invalid token')
        return None
def extract_token(token):
    """Decode ``token`` with ``verify=False`` and return its claims.

    :raises ValueError: when the token cannot be decoded
    """
    try:
        claims = jwt.decode(token, verify=False)
    except jwt.InvalidTokenError as e:
        raise ValueError('Invalid token %s' % e)
    return claims
def get_current_user(self):
    """Build the current ``User`` from the request token with ``verify=False``.

    The token comes from the Authorization header, falling back to the
    ``token`` query argument.  An invalid token yields ``User(None)``.
    """
    try:
        raw_token = self.request.headers.get('Authorization')
        if not raw_token:
            raw_token = self.get_query_argument('token', default=None)
        return User(raw_token, verify=False)
    except jwt.InvalidTokenError:
        return User(None)
def get_current_user(self):
    """Build the current ``User`` from the request token.

    The token comes from the Authorization header, falling back to the
    ``token`` query argument.  An invalid token yields ``User(None)``.
    """
    try:
        raw_token = self.request.headers.get('Authorization')
        if not raw_token:
            raw_token = self.get_query_argument('token', default=None)
        return User(raw_token)
    except jwt.InvalidTokenError:
        return User(None)
def decode_token():
    """Decode the authorization token read from the request header.

    The token is tried against each comma-separated audience in the
    ``BAYESIAN_JWT_AUDIENCE`` setting until one validates.

    :return: the decoded token, or ``{}`` when no Authorization header
        is present
    :raises jwt.InvalidTokenError: when no audience validates the token
    """
    header_value = request.headers.get('Authorization')
    if header_value is None:
        return {}

    token = header_value
    if token.startswith('Bearer '):
        token = token.split(' ', 1)[1]

    pub_key = fetch_public_key(current_app)
    audiences = current_app.config.get('BAYESIAN_JWT_AUDIENCE').split(',')

    decoded_token = None
    for aud in audiences:
        try:
            decoded_token = jwt.decode(token, pub_key, audience=aud)
        except jwt.InvalidTokenError:
            current_app.logger.error('Auth Token could not be decoded for audience {}'.format(aud))
            decoded_token = None
        else:
            # First audience that validates wins.
            if decoded_token is not None:
                break

    if decoded_token is None:
        raise jwt.InvalidTokenError('Auth token audience cannot be verified.')
    return decoded_token
def is_token_valid(token):
    """
    Check if token is valid.

    NOTE(review): the signing key is the hard-coded literal 'secret' --
    confirm this is not used outside of demos/tests.

    :return: ``(True, "Valid")`` on success, otherwise ``(False, reason)``
    """
    try:
        jwt.decode(token, 'secret', algorithms=JWTHelper.JWT_ALGORITHM)
    except jwt.ExpiredSignatureError:
        return False, "Token Expired"
    except jwt.InvalidTokenError:
        return False, "Token is Invalid"
    return True, "Valid"
def _decode_jwt_token(self, req):
    """Decode the JWT of ``req`` into its payload.

    Builds ``verify_<claim>`` / ``require_<claim>`` options from
    ``self.verify_claims`` and ``self.required_claims`` and decodes with
    the configured key, algorithm, issuer, audience and leeway.

    :raises falcon.HTTPUnauthorized: when the token is invalid
    """
    auth_header = req.get_header('Authorization')
    token = self.parse_auth_token_from_request(auth_header=auth_header)

    options = {'verify_' + claim: True for claim in self.verify_claims}
    options.update(
        {'require_' + claim: True for claim in self.required_claims}
    )

    try:
        return jwt.decode(jwt=token,
                          key=self.secret_key,
                          options=options,
                          algorithms=[self.algorithm],
                          issuer=self.issuer,
                          audience=self.audience,
                          leeway=self.leeway)
    except InvalidTokenError as ex:
        raise falcon.HTTPUnauthorized(
            title='401 Unauthorized',
            description=str(ex),
            challenges=None)
def valid_jwt(encoded_jwt, secret, algorithms=None):
    """Report whether ``encoded_jwt`` validates against ``secret``.

    :param encoded_jwt: encoded JWT to check
    :param secret: key used to verify the signature
    :param algorithms: allow-list of signing algorithms; a missing or
        empty value is passed on as an empty list, exactly as before
    :return: True when the token decodes, False otherwise (a warning is
        logged with the exception text and the token)
    """
    # ``None`` replaces the former mutable ``[]`` default.  Falsy values
    # are still normalised to an empty list so that "no algorithms given"
    # never reaches ``jwt.decode`` as ``None``.
    algorithms = algorithms or []
    try:
        jwt.decode(encoded_jwt, secret, algorithms=algorithms)
        return True
    except (jwt.InvalidTokenError,
            jwt.exceptions.InvalidKeyError,
            TypeError,
            ValueError,
            AttributeError) as e:
        logger.warning("Invalid JWT token", extra={
            'exception': str(e),
            'jwt_token': encoded_jwt})
        return False
def test_decode_raises_for_invalid_token_error(self):
    """``decode_token`` must answer an InvalidTokenError with Unauthorized."""
    failing_decode = mock.patch('jwt.decode', side_effect=jwt.InvalidTokenError)
    with pytest.raises(werkzeug.exceptions.Unauthorized):
        with failing_decode:
            self.auth.decode_token(None)
def decode_token(self, token):
    """Decode ``token`` using the JWT settings in ``self.config``.

    Reads ``JWT_SECRET_KEY``, ``JWT_ALGORITHM`` and the optional
    ``JWT_LEEWAY`` (default 0).

    :return: the decoded claims
    :raises werkzeug.exceptions.Unauthorized: when the token is expired,
        cannot be decoded, or is otherwise invalid
    """
    try:
        return jwt.decode(token,
                          self.config['JWT_SECRET_KEY'],
                          algorithms=[self.config['JWT_ALGORITHM']],
                          leeway=self.config.get('JWT_LEEWAY', 0))
    except jwt.ExpiredSignatureError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
def test_decode_failure(self):
    """
    A token signed with an invalid issuer and key must raise
    InvalidTokenError and log one info entry per issuer plus a final
    error entry.
    """
    with mock.patch('edx_rest_framework_extensions.utils.logger') as patched_log:
        with self.assertRaises(jwt.InvalidTokenError):
            # Generate a token from invalid issuer data ...
            self.payload['iss'] = 'invalid-issuer'
            token = generate_jwt_token(self.payload, 'invalid-secret-key')
            # ... which cannot validate against any configured issuer.
            utils.jwt_decode_handler(token)

        # Each configured issuer gets its own per-issuer failure entry.
        for issuer in ('test-issuer-1', 'test-issuer-2'):
            expected = "Token decode failed for issuer '" + issuer + "'"
            patched_log.info.assert_any_call(expected, exc_info=True)

        expected = "All combinations of JWT issuers and secret keys failed to validate the token."
        patched_log.error.assert_any_call(expected)
def decode_token(token):
    """Decode the access token and return its subject.

    NOTE(review): ``jwt.decode`` is called without an ``algorithms``
    allow-list; confirm the signing algorithm and pin it.

    :return: the ``sub`` claim on success, otherwise a human-readable
        error message (string)
    """
    secret = current_app.config.get('SECRET')
    try:
        claims = jwt.decode(token, secret)
        return claims['sub']
    except jwt.ExpiredSignatureError:
        return "Expired token. Please log in to get a new token"
    except jwt.InvalidTokenError:
        return "Invalid token. Please register or login"
def _set_error_handler_callbacks(self, app):
    """
    Register this extension's error handlers on ``app``.

    Each exception type is mapped to the matching callback stored on the
    extension instance.
    """
    def handle_auth_error(e):
        return self._unauthorized_callback(str(e))

    for unauthorized_error in (NoAuthorizationError, CSRFError):
        app.errorhandler(unauthorized_error)(handle_auth_error)

    @app.errorhandler(ExpiredSignatureError)
    def handle_expired_error(e):
        return self._expired_token_callback()

    def handle_invalid_token_error(e):
        return self._invalid_token_callback(str(e))

    # These four all report through the invalid-token callback.
    for invalid_error in (InvalidHeaderError, InvalidTokenError,
                          JWTDecodeError, WrongTokenError):
        app.errorhandler(invalid_error)(handle_invalid_token_error)

    @app.errorhandler(RevokedTokenError)
    def handle_revoked_token_error(e):
        return self._revoked_token_callback()

    @app.errorhandler(FreshTokenRequired)
    def handle_fresh_token_required(e):
        return self._needs_fresh_token_callback()

    @app.errorhandler(UserLoadError)
    def handler_user_load_error(e):
        # The identity is already saved before this exception was raised,
        # otherwise a different exception would be raised, which is why we
        # can safely call get_jwt_identity() here
        identity = get_jwt_identity()
        return self._user_loader_error_callback(identity)

    @app.errorhandler(UserClaimsVerificationError)
    def handle_failed_user_claims_verification(e):
        return self._claims_verification_failed_callback()
def verify(self, signer_requirements, path, sgn_value):
    """Verify the authenticity of the incoming data

    :param signer_requirements: what we expect of the key; right now,
                                the CN of the signer
    :param path: path where the JSON object was found
    :param sgn_value: The datastructure found at that path; its
                      ``certificate`` and ``jwt`` entries are read
    :return: confirmed original value from JWT token.
    :raise JWTSigningFailed: if the value is empty or the JWT fails
                             verification.
    """
    if sgn_value is None:
        raise JWTSigningFailed(
            _("Invalid empty value at path %s") % (path))

    try:
        # Load the certificate and verify that it is both a suitable
        # certificate for this key and one we trust the origin of
        vcert_str = sgn_value.get("certificate", "")
        # ("" is an invalid key)

        # TODO(ijw): why?
        vcert_str = vcert_str.encode('ascii', 'ignore')
        # TODO(ijw): how does this fail?
        vcert_obj = load_pem_x509_certificate(
            vcert_str,
            default_backend())
        vpublic_key = vcert_obj.public_key()

        self._check_node_name(signer_requirements, vcert_obj)

        # TODO(ijw): what checks the cert is signed with the CA?
        self._verify_certificate(vcert_str)

        # Unpack the JWT to its raw data
        jwtok = sgn_value.get("jwt", "")
        # ("" is an invalid token)
        # PyJWT's keyword is ``algorithms`` (a list). The previous
        # ``algorithm='RS256'`` is not a decode parameter, so the
        # algorithm allow-list was never actually applied.
        dval = jwt.decode(jwtok, vpublic_key, algorithms=['RS256'])

        # Check the ancillary tags of the raw data
        self._check_path(dval, path)
        # TODO(ijw): check delta

        # Get and return the originally provided value
        return dval["value"]
    except jwt.InvalidTokenError:
        raise JWTSigningFailed(_("InvalidTokenError: path :%s") % path)