我们从 Python 开源项目中提取了以下 44 个代码示例,用于说明如何使用 jwt.DecodeError。
def logout(request):
    """
    Log a user out.

    Reads the API token from ``request.environ['HTTP_X_API_TOKEN']``,
    decodes it, decrypts its ``data`` field, and resets the matching
    user's ``last_login`` to ``datetime.fromtimestamp(0)``.

    Returns ``{'message': 'Logged out'}`` on success.
    Raises ``BadRequest`` when the header is missing or the token is invalid.
    """
    try:
        api_token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')

    try:
        claims = jwt.decode(api_token, settings.SECRET_KEY)
        decrypted = CRYPTO.decrypt(str(claims['data']))
        user_info = json.loads(decrypted)
        account = User.objects.get(id=user_info['id'])
        account.last_login = datetime.fromtimestamp(0)
        account.save()
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')

    return {'message': 'Logged out'}
def authenticate():
    """
    Set ``g.user`` from the ``X-Auth-Token`` header, or abort with 401.

    Requests whose endpoint name contains ``tenant_provisioned`` skip token
    validation and run as the ``phone_home`` user.
    """
    # logger.debug("endpoint request: %s" % request.endpoint)
    endpoint_name = str(request.endpoint)
    if re.search('tenant_provisioned', endpoint_name):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        claims = jwt.decode(
            request.headers['X-Auth-Token'],
            credentials['tenant_secret'],
            algorithms=['HS256'],
        )
        g.user = claims['user']
    except KeyError:
        # Missing header, missing secret, or no 'user' claim in the token.
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
def authjwt_method(token):
    """
    Authenticate *token* using rest_framework_jwt.

    Returns the matching user, or ``None`` when the token cannot be decoded,
    yields no username, or names a user that does not exist.
    """
    import jwt
    from rest_framework_jwt.authentication import (
        jwt_decode_handler,
        jwt_get_username_from_payload,
    )

    try:
        claims = jwt_decode_handler(token)
    except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError):
        return None

    user_model = get_user_model()
    name = jwt_get_username_from_payload(claims)
    if not name:  # pragma: no cover
        return None

    try:
        return user_model.objects.get_by_natural_key(name)
    except user_model.DoesNotExist:  # pragma: no cover
        return None
def authenticate(self, request):
    """
    Authenticate the request from its JSON Web Token.

    Returns a ``(user, token)`` tuple when a valid token is supplied and
    ``None`` when ``get_jwt_value`` finds no token. Raises
    ``exceptions.AuthenticationFailed`` when the token has expired, cannot
    be decoded, or is otherwise invalid.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None

    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    authenticated_user = self.authenticate_credentials(claims, request.channel)
    return (authenticated_user, token)
def decode_jwt(token, options=None):
    """
    Authenticate via JSON Web Token.

    Options: `secret`, `algorithms`. Each falls back to the matching
    attribute of the module-level ``CONF`` when not supplied.

    Returns the decoded token data.
    Raises ``Exception('Cannot decode token')`` when PyJWT reports a
    ``DecodeError``.
    """
    options = options or {}
    secret = options.get('secret', CONF and CONF.secret)
    algorithms = options.get('algorithms', CONF and CONF.algorithms)

    try:
        # Pass the whitelist by keyword: the third positional parameter of
        # jwt.decode is `verify` in PyJWT 1.x, so a positional value would
        # silently leave the algorithm whitelist unset.
        data = jwt.decode(token, secret, algorithms=algorithms)
    except jwt.DecodeError:
        raise Exception('Cannot decode token')

    logging.debug('* Decoded data = %s', data)
    return data
def authenticate(token):
    """
    Try to authenticate a user from the supplied token.

    The token's structure and validity are checked while decoding.
    Based on jwt_auth.JSONWebTokenAuthMixin.authenticate.

    Returns the user produced by ``authenticate_credentials``.
    Raises ``exceptions.AuthenticationFailed`` for expired or undecodable
    tokens.
    """
    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed('Signature has expired.')
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed('Error decoding signature.')

    return authenticate_credentials(claims)
def token_required(secret_key):
    """
    Build a decorator that requires a valid token in ``Authorization``.

    The wrapped function only runs when the second whitespace-separated part
    of the header parses via ``parse_token(token, secret_key)``; the token's
    ``sub`` claim is then published as the global name ``username`` in the
    wrapped function's module. Otherwise a 401 ``Response`` is returned.
    """
    def token_required_decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # `__globals__` exists on Python 2.6+ and Python 3, whereas
            # `func_globals` is Python 2 only.
            func_globals = f.__globals__

            auth_header = request.headers.get('Authorization')
            if not auth_header:
                return Response(response="Missing authorization header", status=401)

            # A header without a second part ("Bearer" alone) carries no
            # token; answer 401 instead of failing with IndexError.
            parts = auth_header.split()
            if len(parts) < 2:
                return Response(response="Token is invalid", status=401)

            try:
                payload = parse_token(parts[1], secret_key)
            except jwt.DecodeError:
                return Response(response="Token is invalid", status=401)
            except jwt.ExpiredSignature:
                return Response(response="Token has expired", status=401)

            # Set username for decorated func
            func_globals["username"] = payload['sub']
            return f(*args, **kwargs)
        return decorated_function
    return token_required_decorator
def authenticate(self, auth_token):
    """
    Authenticate this socket from a JWT and register it for its user.

    On success the socket is marked authenticated, ``AUTH OK`` is sent, and
    the socket is added to ``WebSocket.sockets[username]``. On a decode or
    expiry error ``AUTH FAILED`` is sent instead.
    """
    try:
        claims = jwt.decode(auth_token, secret)
        user = claims['username']

        self.username = user
        self.authenticated = True
        self.auth_failures = 0
        self.send_json(action='AUTH OK')

        # The first websocket connecting on behalf of a given user
        # subscribes to the feed for that user.
        is_first_socket = len(WebSocket.sockets[user]) == 0
        if is_first_socket:
            WebSocket.subscribe(user)
        WebSocket.sockets[user].add(self)
    except jwt.DecodeError:
        self.send_json(action='AUTH FAILED')
    except jwt.ExpiredSignatureError:
        self.send_json(action='AUTH FAILED')
def authenticate(self, request):
    """
    Authenticate the request from its JWT.

    Uses ``jwt_devices_decode_handler`` when
    ``api_settings.JWT_PERMANENT_TOKEN_AUTH`` is set and
    ``jwt_decode_handler`` otherwise.

    Returns ``(user, token)``, or ``None`` when no token is present.
    Raises ``exceptions.AuthenticationFailed`` for expired, undecodable or
    otherwise invalid tokens.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None

    try:
        decode = (
            jwt_devices_decode_handler
            if api_settings.JWT_PERMANENT_TOKEN_AUTH
            else jwt_decode_handler
        )
        claims = decode(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()

    return self.authenticate_credentials(claims), token
def get_certificate(self, kid):
    """
    Fetch the X.509 certificate for key id *kid* from the JWKS endpoint.

    Raises ``DecodeError`` when no published key has that ``kid``.
    """
    # Retrieve the key set from jwks_url.
    jwks_response = self.request(self.jwks_url(), method='GET')
    jwks_response.raise_for_status()

    # Pick the first key whose id matches.
    matching_key = next(
        (entry for entry in jwks_response.json()['keys'] if entry['kid'] == kid),
        None,
    )
    if matching_key is None:
        raise DecodeError('Cannot find kid={}'.format(kid))

    x5c = matching_key['x5c'][0]
    pem_text = '-----BEGIN CERTIFICATE-----\n{}\n-----END CERTIFICATE-----'.format(x5c)
    return load_pem_x509_certificate(pem_text.encode(), default_backend())
def user_data(self, access_token, *args, **kwargs):
    """
    Decode and verify the ``id_token`` found in ``kwargs['response']``.

    The key id and algorithm are read from the token's own header, the
    matching certificate is fetched with ``get_certificate``, and the token
    is verified against that certificate's public key.

    Returns the decoded claims.
    Raises ``AuthTokenError`` on ``DecodeError`` or ``ExpiredSignature``.
    """
    response = kwargs.get('response')
    id_token = response.get('id_token')

    # JWT segments are base64url-encoded with the trailing padding removed,
    # so restore the padding and use the URL-safe alphabet; plain b64decode
    # rejects unpadded input and mis-decodes '-' / '_'.
    header_segment = id_token.split('.', 1)[0]
    header_segment += '=' * (-len(header_segment) % 4)
    jwt_header = json.loads(
        base64.urlsafe_b64decode(header_segment.encode('ascii')).decode()
    )

    # get key id and algorithm
    key_id = jwt_header['kid']
    # NOTE(review): the algorithm comes from the (not yet verified) token
    # header; consider pinning the expected algorithm instead.
    algorithm = jwt_header['alg']

    try:
        # retrieve certificate for key_id
        certificate = self.get_certificate(key_id)
        return jwt_decode(
            id_token,
            key=certificate.public_key(),
            # PyJWT expects a list of allowed algorithm names.
            algorithms=[algorithm],
            audience=self.setting('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY')
        )
    except (DecodeError, ExpiredSignature) as error:
        raise AuthTokenError(self, error)
def decode_token(token, secret='', ttl=DEFAULT_TTL, verify=True):
    """
    Decode a JSON Web Token and, when *verify* is true, check its age.

    The ``issuedAt`` claim must be present, must not lie in the future, and
    must be no older than *ttl* seconds.

    Returns the decoded claims.
    Raises ``TokenInvalid`` when decoding fails or a check does not pass.
    """
    try:
        claims = jwt.decode(str(token), secret, verify=verify)
    except jwt.DecodeError as e:
        raise TokenInvalid("error decoding JSON Web Token", e)

    if not verify:
        return claims

    issued_raw = claims.get('issuedAt')
    if issued_raw is None:
        raise TokenInvalid("'issuedAt' is missing from token")

    issued_at = isodate.parse_datetime(issued_raw)
    expires_at = issued_at + datetime.timedelta(seconds=ttl)

    if issued_at > _now():
        raise TokenInvalid("token is not yet valid")
    if expires_at < _now():
        raise TokenInvalid("token has expired")

    return claims
def verify_token():
    """
    Verify that the token is valid, not expired and not blacklisted.

    Does nothing when the request has no ``Authorization`` header. On
    success the token's ``id_user`` claim is stored in ``g.current_user``;
    otherwise the request is aborted with 403.
    """
    if 'Authorization' not in request.headers:
        return

    raw_token = request.headers['Authorization']
    if raw_token in cache.blacklisted_tokens:
        abort(403, 'Error: invalid token')

    try:
        claims = jwt.decode(raw_token, config.SECRET_KEY)
        g.current_user = claims['id_user']
    except jwt.ExpiredSignatureError:
        abort(403, 'Error: token expired')
    except jwt.DecodeError:
        abort(403, 'Error: invalid token')
def verify_token(cls, user, token):
    """
    Check that *token* was issued for *user*.

    The token is decoded with a key built from the user's id and
    ``SECRET_CODE``. Returns ``True`` only when decoding succeeds and the
    ``username`` claim equals ``user.username``; expiry, decode and
    attribute errors while decoding yield ``False``.
    """
    try:
        claims = jwt.decode(token, str(user.id) + SECRET_CODE)
    except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
        return False

    return bool(claims["username"] == user.username)
def decode_token(self, token):
    """
    Decode *token* with this object's secret and algorithm.

    Returns the decoded claims, or ``False`` when PyJWT raises
    ``DecodeError``.
    """
    # PyJWT documents `algorithms` as a list of names. A bare string is only
    # matched by substring containment, so wrap a single name in a list.
    algorithms = self.algorithm
    if algorithms is not None and not isinstance(
            algorithms, (list, tuple, set, frozenset)):
        algorithms = [algorithms]

    try:
        return jwt.decode(token, key=self.secret, algorithms=algorithms)
    except jwt.DecodeError:
        return False
def azure_ad_authorized():
    """
    Handle the Azure AD OAuth callback and sign the user in.

    Stores the ``id_token`` in the session, decodes it without signature
    verification, resolves the user with ``retrieve_user_from_azure_ad``,
    and signs that user in. Flashes a message and redirects when the
    response is missing or the token cannot be decoded.
    """
    response = azure_ad.authorized_response()
    # Parenthesised so the debug output is valid on Python 3 as well as
    # Python 2 (the output is the same for a single argument).
    print(response)
    if response is None:
        flask.flash('You denied the request to sign in.')
        # NOTE(review): `util.get_next_url` is passed without being called;
        # confirm whether it is a function that should be invoked here.
        return flask.redirect(util.get_next_url)

    id_token = response['id_token']
    flask.session['oauth_token'] = (id_token, '')

    try:
        decoded_id_token = jwt.decode(id_token, verify=False)
    except (jwt.DecodeError, jwt.ExpiredSignature):
        flask.flash('You denied the request to sign in.')
        # NOTE(review): same uncalled `util.get_next_url` as above.
        return flask.redirect(util.get_next_url)

    user_db = retrieve_user_from_azure_ad(decoded_id_token)
    return auth.signin_user_db(user_db)
def decode_token(token):
    """
    Decode and verify a token, returning its payload.

    The payload's ``scope`` value is wrapped in ``Scope`` before returning.

    :param token: a JSON Web Token
    :returns: dict, the decoded payload
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    # Read the certificate as bytes: the PEM loader requires bytes, and a
    # text-mode read returns str on Python 3.
    with open(cert_file, 'rb') as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
    public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])
    return payload
def test_decode_token_different_public_key():
    """decode_token raises DecodeError when LOCALHOST_CRT is swapped out."""
    signed_token, _expiry = generate_token(CLIENT, SCOPE, 'grant_type')

    with patch.object(_token, 'LOCALHOST_CRT', koi.CLIENT_CRT), \
            pytest.raises(jwt.DecodeError):
        decode_token(signed_token)
def test_invalid_assertion(self):
    """Generating a token from an unparseable assertion raises DecodeError."""
    self.request.body_arguments['assertion'] = ['invalid_token']
    delegate_grant = grants.AuthorizeDelegate(self.request)

    with pytest.raises(jwt.DecodeError):
        yield delegate_grant.generate_token()
def authenticate(self, request):
    """
    Authenticate the request from its ``Authorization`` header.

    The header must be ``<prefix> <token>``, where the prefix matches
    ``api_settings.JWT_AUTH_HEADER_PREFIX`` case-insensitively.

    Returns ``(user, token)``.
    Raises ``exceptions.AuthenticationFailed`` when the header is missing or
    malformed, or the token has expired or cannot be decoded.
    """
    header_parts = get_authorization_header(request).split()
    expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not header_parts or smart_text(header_parts[0].lower()) != expected_prefix:
        raise exceptions.AuthenticationFailed()

    part_count = len(header_parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            _("Invalid Authorization header. No credentials provided."))
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            _("Invalid Authorization header. Credentials string should not contain spaces."))

    token = header_parts[1]
    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_("Error decoding signature."))

    return (self.authenticate_credentials(claims), token)
def authenticate(self, request):
    """
    Authenticate the request from its JWT, honouring blacklist and
    password-change invalidation.

    Returns ``(user, token)``, or ``(None, None)`` when no token is present.
    Raises ``AuthenticationFailed`` for expired, undecodable or otherwise
    invalid tokens.
    """
    token = self.get_jwt_value(request)
    if token is None:
        return None, None

    try:
        claims = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise AuthenticationFailed(_("Signature has expired."))
    except jwt.DecodeError:
        raise AuthenticationFailed(_("Error decoding signature."))
    except jwt.InvalidTokenError:
        raise AuthenticationFailed()

    # Check blacklist
    self.check_blacklist(claims)

    authenticated_user = self.authenticate_credentials(claims)

    # Check whether a password change has invalidated all older tokens
    self.check_changed_password_invalidated_old_token(authenticated_user, claims)

    return authenticated_user, token
def _check_payload(token):
    """
    Decode *token* and return its payload.

    Raises ``forms.ValidationError`` when the signature has expired or the
    token cannot be decoded.
    """
    try:
        return jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise forms.ValidationError(_("Signature has expired."))
    except jwt.DecodeError:
        raise forms.ValidationError(_("Error decoding signature."))
def verify_jwt(auth_header, secret):
    """
    Extract the JWT from the header, verify its signature and expiration
    time, and return the payload.

    Returns a two-item list ``[payload, error]``: ``[payload, '']`` on
    success, or ``[None, message]`` when the header is missing or malformed
    or the token is expired or cannot be decoded.
    """
    if not auth_header or auth_header == 'null':
        logging.warning("No Authorization header")
        return [None, "Unauthorized access: missing authentication"]

    # The header must be exactly "<method> <token>". Anything else used to
    # escape as an unhandled ValueError from tuple unpacking.
    parts = auth_header.split()
    if len(parts) != 2:
        logging.warning("Malformed Authorization header")
        return [None, "Unauthorized access: malformed authentication header"]

    # separate 'JWT' from the jwt itself
    token = bytes(parts[1], 'utf-8')
    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
    except (jwt.ExpiredSignatureError, jwt.DecodeError) as err:
        return [None, str(err)]
    return [payload, '']
def get_jwt_value(self, request):
    """
    Parse the ``Authorization`` header and return the decoded JWT payload.

    Returns ``None`` when the header is absent or does not start with
    ``api_settings.JWT_AUTH_HEADER_PREFIX`` (case-insensitive).
    Raises ``exceptions.AuthenticationFailed`` when the header is malformed
    or the token is expired, undecodable or otherwise invalid.
    """
    header_parts = get_authorization_header(request).split()
    expected_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

    if not header_parts or smart_text(header_parts[0].lower()) != expected_prefix:
        return None

    part_count = len(header_parts)
    if part_count == 1:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. No credentials provided.'))
    if part_count > 2:
        raise exceptions.AuthenticationFailed(
            _('Invalid Authorization header. Credentials string should contain no spaces.'))

    token = header_parts[1]
    try:
        return jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        raise exceptions.AuthenticationFailed(_('Signature has expired.'))
    except jwt.DecodeError:
        raise exceptions.AuthenticationFailed(_('Error decoding signature.'))
    except jwt.InvalidTokenError:
        raise exceptions.AuthenticationFailed()
def test_public_key(self):
    """
    Test deserialization when the public key is supplied explicitly.
    """
    signer = scitokens.SciToken(key=self._private_key)
    encoded = signer.serialize(issuer="local")

    restored = scitokens.SciToken.deserialize(
        encoded, public_key=self._public_pem, insecure=True)
    self.assertIsInstance(restored, scitokens.SciToken)

    # With invalid key
    with self.assertRaises(ValueError):
        scitokens.SciToken.deserialize(
            encoded, insecure=True, public_key="asdf".encode())

    # With a proper key, but not the right one
    other_private_key = generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    other_pem = other_private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )

    with self.assertRaises(DecodeError):
        scitokens.SciToken.deserialize(
            encoded, insecure=True, public_key=other_pem)
def on_join(data):
    """
    Join the socket to one room per printer the user can access and emit
    the marshalled printer list as a ``printers`` event.

    With authentication disabled (``Config.NONE``) a superadmin placeholder
    user is used. Otherwise the ``jwt`` entry of *data* must be a token that
    parses and names an existing user; if not, the socket is disconnected.
    """
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return

        try:
            payload = LoginService.parse_api_token_direct(token)
        except DecodeError:
            disconnect()
            return
        except ExpiredSignature:
            disconnect()
            return

        user = User.query.filter_by(username=payload["username"]).scalar()
        if user is None:
            # The token parsed but names no stored user; without this guard
            # the printer lookup below fails with AttributeError.
            disconnect()
            return

    printers = user.get_accessible_printers()
    for printer in printers:
        join_room(str(printer.id))

    datatype = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(
            fields.Nested({
                'name': fields.String
            })
        )
    }
    emit("printers", marshal(printers, datatype))
def login_required(f):
    """
    Route decorator that checks the Authorization header and token validity
    and stores the user in the flask global ``g``.

    With authentication disabled (``Config.NONE``) a superadmin placeholder
    user is stored instead. Failures return a ``(message, 401)`` pair.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        auth_disabled = current_app.config["AUTH"] == Config.NONE
        if auth_disabled:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            claims = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        matching_users = User.query.filter_by(username=claims['username'])
        g.user = matching_users.first()
        return f(*args, **kwargs)

    return decorated_function
def superadmin_required(f):
    """
    Route decorator that checks the Authorization header, token validity and
    superadmin permission, and stores the user in the flask global ``g``.

    With authentication disabled (``Config.NONE``) a superadmin placeholder
    user is stored instead. Failures return a ``(message, 401)`` pair.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_app.config["AUTH"] == Config.NONE:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            payload = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        g.user = User.query.filter_by(username=payload['username']).first()
        # `.first()` yields None when no row matches; treat that as "not a
        # superadmin" rather than failing on the attribute access.
        if g.user is None or g.user.superadmin is False:
            return 'You are not superadmin', 401
        return f(*args, **kwargs)

    return decorated_function
def dispatch(self, request, **kwargs):
    """
    Decode the bearer token before dispatching the request.

    Delegates to ``handle_no_token`` when the ``Authorization`` header does
    not start with ``Bearer ``, and to ``handle_invalid_token`` when PyJWT
    raises ``DecodeError``. Otherwise the decoded token is stored on
    ``request.jwt``, ``handle_valid_token`` is called, and the parent
    ``dispatch`` result is returned.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return self.handle_no_token(request, **kwargs)

    _scheme, credentials = auth_header.split(' ', 1)
    token = credentials.strip()

    try:
        decode_kwargs = self.get_jwt_decode_kwargs(request, **kwargs)
        request.jwt = jwt.decode(token, **decode_kwargs)
    except jwt.DecodeError as err:
        log.info('Invalid token: %s', err)
        return self.handle_invalid_token(request, err, **kwargs)

    self.handle_valid_token(request, **kwargs)
    return super().dispatch(request, **kwargs)
def check(self, json):
    """
    Check a JWT against the passphrase and expiry.

    Returns ``(pgp_claim, True)`` on success, ``("Invalid Token", False)``
    when decoding fails, and ``("Expired Token", False)`` when the signature
    has expired.
    """
    try:
        claims = jwt.decode(json, self.secret, algorithms=['HS256'])
    except jwt.DecodeError:
        return "Invalid Token", False
    except jwt.ExpiredSignature:
        return "Expired Token", False
    else:
        return claims['pgp'], True
def token_verify(token):
    """
    Resolve the user identified by *token*.

    The token is decoded with the application's ``SECRET_KEY`` using HS256,
    and its ``id`` claim is passed to ``UserService.get_one_or_fail``.

    Returns that lookup's result, or ``False`` when PyJWT raises
    ``DecodeError``.
    """
    secret_key = current_app.config.get('SECRET_KEY')
    try:
        # The keyword is `algorithms` (a list). A singular `algorithm=` is
        # not a decode parameter, so the whitelist was never applied.
        data = jwt.decode(token, secret_key, algorithms=['HS256'])
        return UserService.instance().get_one_or_fail(data['id'])
    except jwt.DecodeError:
        return False
def jwt_verify(view_func):
    """
    View decorator that requires a decodable JWT in ``HTTP_AUTHORIZATION``.

    The token is the second whitespace-separated part of the header. The
    wrapped view runs only when that token decodes with
    ``settings.JWT_SECRET_KEY``; a missing header, a header without a token
    part, or a ``DecodeError`` yields ``HttpResponse(status=401)``.
    """
    def _wrapped_view_func(request, *args, **kwargs):
        auth_header = None
        if hasattr(request, 'META'):
            auth_header = request.META.get('HTTP_AUTHORIZATION')

        # Previously a missing header fell through and returned None, and a
        # one-part header raised IndexError; both are now rejected.
        parts = auth_header.split() if auth_header else []
        if len(parts) < 2:
            return HttpResponse(status=401)

        try:
            # NOTE(review): the positional `True` is `verify` on PyJWT 1.x;
            # newer releases expect an `algorithms` list here. Confirm the
            # target PyJWT version.
            jwt.decode(parts[1], settings.JWT_SECRET_KEY, True)
        except jwt.DecodeError:
            return HttpResponse(status=401)

        # Called outside the try so a DecodeError raised by the view itself
        # is not mistaken for a bad request token.
        return view_func(request, *args, **kwargs)

    return _wrapped_view_func
def __call__(self, environ, start_response):
    """
    Pass the request to the wrapped app only when its event token verifies.

    A 403 response is sent when verification yields no payload or no request
    body, or when PyJWT raises ``DecodeError``.
    """
    try:
        verified = TokenVerifierFilter.decode_and_verify_request(
            environ, self.app_secret, self.appId)
        (token_payload, body) = verified

        if token_payload is not None and body is not None:
            return self.app(environ, start_response)
        send_403_response(start_response)
    except jwt.DecodeError:
        send_403_response(start_response)
def handle(self, environ, start_response):
    """
    Route an incoming event to the handler registered for its name.

    The event comes from ``environ`` when ``event_token_payload`` is already
    there; otherwise the request is decoded and verified first, and a 403 is
    sent (returning ``{}``) on ``DecodeError``.

    Raises ``Exception`` for an event name with no registered handler.
    """
    if 'event_token_payload' in environ:
        _payload, event = environ['event_token_payload'], Event(environ['request_body'])
    else:
        try:
            _payload, event_json = TokenVerifierFilter.decode_and_verify_request(
                environ, self.app_secret, self.app_id)
            event = Event(event_json)
        except jwt.DecodeError:
            send_403_response(start_response)
            return {}

    # Event name -> handler attribute. Attributes are looked up lazily so
    # only the selected handler is ever read from the instance.
    handler_attributes = {
        "app.install": "on_app_install_handler",
        "app.uninstall": "on_app_uninstall_handler",
        "chat.generateUrlPreview": "on_chat_generate_url_preview_handler",
        "chat.receiveMessage": "on_chat_receive_message_handler",
        "client.flockmlAction": "on_client_flockml_action_handler",
        "client.messageAction": "on_client_message_action_handler",
        "client.openAttachmentWidget": "on_client_open_attachment_widget_handler",
        "client.pressButton": "on_client_press_button_handler",
        "client.slashCommand": "on_client_slash_command_handler",
        "client.widgetAction": "on_client_widget_action_handler",
    }

    attribute_name = handler_attributes.get(event.name)
    if attribute_name is None:
        raise Exception("Unknown event encountered" + event.name)

    return EventHandlerClient.send_response(
        getattr(self, attribute_name), event, start_response)
def login_required(f):
    """
    Route decorator that requires a parseable token on the request.

    On success the token's ``sub`` claim is stored in ``g.user_id`` and the
    wrapped function is called. Otherwise a JSON message with status 401 is
    returned.
    """
    def _unauthorized(message):
        # Build the JSON error body and mark it 401.
        error_response = jsonify(message=message)
        error_response.status_code = 401
        return error_response

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.headers.get('Authorization'):
            return _unauthorized('Missing authorization header')

        try:
            claims = parse_token(request)
        except DecodeError:
            return _unauthorized('Token is invalid')
        except ExpiredSignature:
            return _unauthorized('Token has expired')

        g.user_id = claims['sub']
        return f(*args, **kwargs)

    return decorated_function


# Helper functions, get currently logged in user
def decode(self, token):
    """
    Decode *token* with this object's secret, algorithm and issuer.

    Returns the decoded claims.
    Raises ``InvalidUsage`` when the token is expired or cannot be decoded,
    and a generic ``Exception`` for any other failure.
    """
    # jwt.decode takes `algorithms` (a list). A singular `algorithm=` is not
    # a decode parameter, so the configured algorithm was never enforced.
    algorithms = self.algorithm
    if algorithms is not None and not isinstance(
            algorithms, (list, tuple, set, frozenset)):
        algorithms = [algorithms]

    try:
        return jwt.decode(token, self.secret, algorithms=algorithms,
                          issuer=self.issuer)
    except jwt.ExpiredSignature:
        raise InvalidUsage("Token is expired")
    except jwt.DecodeError:
        raise InvalidUsage('Token signature is invalid')
    except Exception:
        raise Exception('Unable to parse authentication token.')
def check_auth(self):
    """
    Validate the request's bearer token and return its payload.

    The ``Authorization`` header must be ``Bearer <token>``; the token is
    decoded with ``Security.get_jwt_skey()`` using HS256. Every failure
    aborts the request with 401 and a descriptive message.
    """
    auth = request.headers.get('Authorization', None)
    if not auth:
        abort(401, message='Authorization header is expected')

    parts = auth.split()
    message = ''
    # `not parts` covers a whitespace-only header, which is truthy but
    # splits to an empty list and used to raise IndexError on parts[0].
    if not parts or parts[0].lower() != 'bearer':
        message = 'Authorization header must start with Bearer'
    elif len(parts) == 1:
        message = 'Token not found'
    elif len(parts) > 2:
        # The backslash is escaped so the literal text stays "\s" without
        # an invalid-escape-sequence warning.
        message = 'Authorization header must be Bearer + \\s + token'
    if message:
        abort(401, message=message)

    token = parts[1]
    try:
        payload = jwt.decode(
            token,
            Security.get_jwt_skey(),
            algorithms=['HS256']
        )
    except jwt.ExpiredSignature:
        message = 'token is expired'
    except jwt.InvalidAudienceError:
        message = 'incorrect audience'
    except jwt.DecodeError:
        message = 'token signature is invalid'
    if message:
        abort(401, message=message)

    self.logger.debug('Access granted for %s!', payload['user']['login'])
    return payload
def decode_token(self, token):
    """
    Decode *token* using the JWT settings in ``self.config``.

    Uses ``JWT_SECRET_KEY``, ``JWT_ALGORITHM`` and the optional
    ``JWT_LEEWAY`` (default 0).

    Returns the decoded claims.
    Raises ``werkzeug.exceptions.Unauthorized`` when the token is expired,
    cannot be decoded, or is otherwise invalid.
    """
    settings_map = self.config
    try:
        return jwt.decode(
            token,
            settings_map['JWT_SECRET_KEY'],
            algorithms=[settings_map['JWT_ALGORITHM']],
            leeway=settings_map.get('JWT_LEEWAY', 0),
        )
    except jwt.ExpiredSignatureError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
def do_auth(self, token, *args, **kwargs):
    """
    Authenticate from a signed token.

    The token is decoded with the secret from ``get_key_and_secret`` using
    HS256, and the claims are passed to ``self.strategy.authenticate`` as
    ``response={'token': claims}`` with this backend.

    Raises ``AuthCanceled`` when decoding fails.
    """
    _key, app_secret = self.get_key_and_secret()

    try:
        # Decode the token, using the Application Signature from settings
        claims = jwt.decode(token, app_secret, algorithms=['HS256'])
    except jwt.DecodeError:
        # Wrong signature, fail authentication
        raise AuthCanceled(self)

    kwargs['response'] = {'token': claims}
    kwargs['backend'] = self
    return self.strategy.authenticate(*args, **kwargs)
def user_data(self, access_token, *args, **kwargs):
    """
    Return user data by querying the Microsoft Graph ``/me`` endpoint.

    Raises ``AuthTokenError`` on ``DecodeError`` or ``ExpiredSignature``.
    """
    request_headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json',
        'Authorization': 'Bearer ' + access_token,
    }
    try:
        return self.get_json(
            'https://graph.microsoft.com/v1.0/me',
            headers=request_headers,
            method='GET',
        )
    except (DecodeError, ExpiredSignature) as error:
        raise AuthTokenError(self, error)
def user_data(self, access_token, *args, **kwargs):
    """
    Return the claims of the ``id_token`` in ``kwargs['response']``.

    The token is decoded with ``verify=False``.
    Raises ``AuthTokenError`` on ``DecodeError`` or ``ExpiredSignature``.
    """
    auth_response = kwargs.get('response')
    raw_id_token = auth_response.get('id_token')

    try:
        return jwt_decode(raw_id_token, verify=False)
    except (DecodeError, ExpiredSignature) as de:
        raise AuthTokenError(self, de)
def verify_auth_token(token):
    """
    Get the user from a JWT token.

    Returns the result of ``User.query.get`` for the token's ``id`` claim,
    or ``None`` when PyJWT raises ``DecodeError``.
    """
    try:
        claims = jwt.decode(token, secret, algorithms=['HS256'])
    except jwt.DecodeError:
        return None
    else:
        return User.query.get(claims['id'])
def check_token_status(self):
    """
    Simple request to check if session has expired (TBD).

    When no access token is held, it is first read from the platform's
    token file inside the workspace. The token is then decoded against the
    platform public key (RS256, audience ``adapter``) and, when present,
    its ``preferred_username`` claim is stored in ``self.username``.

    :return: Token status. ``True`` when the token decodes, and also when
        the token file cannot be read or no public key is available;
        ``False`` when PyJWT rejects the token.
    """
    if self.access_token is None:
        try:
            token_file = self.platform['credentials']['token_file']
            token_path = os.path.join(
                self.workspace.workspace_root,
                self.workspace.platforms_dir,
                token_file)

            with open(token_path, "r") as _file:
                # Call read(): the bare attribute stored the bound method
                # object instead of the file's contents.
                self.access_token = _file.read()
        except Exception:
            # Best effort, as before: an unreadable token file is reported
            # as True. `Exception` rather than a bare except so that
            # KeyboardInterrupt/SystemExit are not swallowed.
            return True

    print('Public_key=', self.platform_public_key)
    if self.platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead
    # add self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', self.access_token)
        public_key = self.platform_public_key.decode('utf-8')
        print('platform_public_key=', public_key)
        # No positional `True`: that slot is `verify` (default True) on
        # PyJWT 1.x but `algorithms` on 2.x, where it clashes with the
        # keyword. `algorithms` is passed as the documented list.
        decoded = jwt.decode(self.access_token, public_key,
                             algorithms=['RS256'], audience='adapter')
        # options={'verify_aud': False})
        print('contents', decoded)
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False
    except jwt.InvalidAudienceError:
        # An audience is requested above, so a mismatch is a failed check
        # like the others rather than an unhandled exception.
        return False

    # The username claim is optional; the token is valid either way.
    if 'preferred_username' in decoded:
        self.username = decoded['preferred_username']
    return True
def check_token_status():
    """
    Simple request to check if session has expired (TBD).

    Standalone variant that decodes a hard-coded (empty) access token
    against a hard-coded (empty) public key, RS256 and audience ``adapter``.

    :return: Token status. ``True`` when the token decodes or the key is
        ``None``; ``False`` when PyJWT rejects the token.
    """
    import jwt

    access_token = ''
    platform_public_key = b''

    print('Public_key=', platform_public_key)
    if platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead
    # add self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', access_token)
        public_key = platform_public_key.decode('utf-8')
        print('platform_public_key=', public_key)
        # No positional `True`: that slot is `verify` (default True) on
        # PyJWT 1.x but `algorithms` on 2.x, where it clashes with the
        # keyword. `algorithms` is passed as the documented list.
        decoded = jwt.decode(access_token, public_key,
                             algorithms=['RS256'], audience='adapter')
        # options={'verify_aud': False})
        print('contents', decoded)
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False
    except jwt.InvalidAudienceError:
        # An audience is requested above, so a mismatch is a failed check
        # like the others rather than an unhandled exception.
        return False

    # The token decoded; the original returned True whether or not the
    # optional `preferred_username` claim was present.
    return True


# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()
# result = check_token_status()
# print("RESULT=", result)