我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用jwt.ExpiredSignatureError()。
def decode_auth_token(auth_token): """ Validates the auth token :param auth_token: :return: integer|string """ try: payload = jwt.decode(auth_token, app.config.get('SECRET_KEY')) is_blacklisted_token = BlacklistToken.check_blacklist(auth_token) if is_blacklisted_token: return 'Token blacklisted. Please log in again.' else: return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired. Please log in again.' except jwt.InvalidTokenError: return 'Invalid token. Please log in again.'
def decode_auth_token(token): """ Decoding the token to get the payload and then return the user Id in 'sub' :param token: Auth Token :return: """ try: payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256') is_token_blacklisted = BlackListToken.check_blacklist(token) if is_token_blacklisted: return 'Token was Blacklisted, Please login In' return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired, Please sign in again' except jwt.InvalidTokenError: return 'Invalid token. Please sign in again'
def authenticate(self, auth_token): try: token_payload = jwt.decode(auth_token, secret) username = token_payload['username'] self.username = username self.authenticated = True self.auth_failures = 0 self.send_json(action='AUTH OK') # If we are the first websocket connecting on behalf of # a given user, subscribe to the feed for that user if len(WebSocket.sockets[username]) == 0: WebSocket.subscribe(username) WebSocket.sockets[username].add(self) except jwt.DecodeError: self.send_json(action='AUTH FAILED') except jwt.ExpiredSignatureError: self.send_json(action='AUTH FAILED')
def is_authenticated(req): auth_token = req.headers.get('Authentication-Token') if auth_token: try: auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"]) user_id = int(auth_token_payload['user_id']) user = User.query.filter_by(id=user_id, is_active=True).first() if user: return user else: return False except jwt.ExpiredSignatureError: return False else: return False
def get_current_user(self): """ Overrides the built-in function to get the user id based on a JWT-token. If there is no token included in the Authorization header None is returned. In case the token is not valid an corresponding exception is raised. :return: The user id as a string or None if the uid couldn't be extracted :raises jwt.InvalidIssuedAtError: Raised if the IAT-claim is less than the last time the user password changed :raises jwt.ExpiredSignatureError: Raised if the EXT-claim is less than the current UNIX time :raises jwt.InvalidTokenError: Raised if the token is invalid for reasons other than the above mentioned """ auth_header = self.request.headers.get('Authorization') if auth_header is not None and auth_header.startswith("Bearer "): encoded_jwt_token = auth_header[7:] payload = TokenGenerator.decode_token(encoded_jwt_token) # TODO Should we check if payload["iat"] < last password change? if False: raise jwt.InvalidIssuedAtError return payload["uid"] return None
def prepare(self): """ Ensures that the caller is a validated user """ super().prepare() try: if self.current_user is None: self.send_error(401) # TODO Add details except jwt.InvalidIssuedAtError: log.error("The IAT-claim of the passed token is less than the last time the user password changed") self.send_error(401, summary="invalidated token") except jwt.ExpiredSignatureError: log.error("The EXT-claim of the passed token is less than the current UNIX time") self.send_error(401, summary="expired token") except jwt.InvalidTokenError: log.error("The passed token could not be validated") self.send_error(401, summary="invalid token")
def _user_from_refresh_token(self, jwtstr: str, key_pemstr: str, expected_issuer: Optional[str]=None, expected_audience: Optional[str]=None) -> Optional[MNUser]: _log.debug("Received refresh token: %s", jwtstr) try: token = jwt.decode(jwtstr, key_pemstr, algorithms=["RS256"], leeway=10, issuer=expected_issuer, audience=expected_audience) except (jwt.ExpiredSignatureError, jwt.InvalidAlgorithmError, jwt.InvalidIssuerError, jwt.InvalidTokenError) as e: _log.warning("Rejected refresh token because of %s", str(e)) return None if "sub" not in token: _log.error("BUG? Valid refresh token without user in subject. %s", jwtstr) return None try: user = MNUser.objects.get(pk=token["sub"]) # type: MNUser except MNUser.DoesNotExist: _log.warning("No such user from valid JWT. %s", jwtstr) return None return user
def verify_token(): """ Verify if the token is valid, not expired and not blacklisted """ if 'Authorization' in request.headers: if request.headers['Authorization'] in cache.blacklisted_tokens: abort(403, 'Error: invalid token') try: payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY) g.current_user = payload['id_user'] except jwt.ExpiredSignatureError: abort(403, 'Error: token expired') except jwt.DecodeError: abort(403, 'Error: invalid token')
def decode_auth_token(auth_token): """Decodes the auth token - :param auth_token: - :return: integer|string""" try: payload = jwt.decode( auth_token, current_app.config.get('SECRET_KEY')) return payload['sub'] except jwt.ExpiredSignatureError: return 'Signature expired. Please log in again.' except jwt.InvalidTokenError: return 'Invalid token. Please log in again.'
def verify_jwt(signed_token): unvalidated_token = jwt.decode(signed_token, verify=False) hipchat_room = HipChatRoom.query.filter(HipChatRoom.hipchat_oauth_id == unvalidated_token['iss']).first_or_404() try: jwt.decode(signed_token, hipchat_room.hipchat_oauth_secret) except jwt.exceptions.DecodeError: flask.flash('Unable to decode the Java Web Token provided', 'error') flask.abort(401) except jwt.ExpiredSignatureError: flask.flash('The provided Java Web Token is expired: try refreshing the page', 'error') flask.abort(401) else: return hipchat_room
def verify_token(cls, user, token): try: data = jwt.decode(token, str(user.id) + SECRET_CODE) except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError): return False else: if data["username"] == user.username: return True else: return False
def decode_token(token): """ Get the organisation ID from a token :param token: a JSON Web Token :returns: str, organisation ID :raises: jwt.DecodeError: Invalid token or not signed with our key jwt.ExpiredSignatureError: Token has expired jwt.InvalidAudienceError: Invalid "aud" claim jwt.InvalidIssuerError: Invalid "iss" claim jwt.MissingRequiredClaimError: Missing a required claim """ cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT with open(cert_file) as f: cert = load_pem_x509_certificate(f.read(), default_backend()) public_key = cert.public_key() payload = jwt.decode(token, public_key, audience=audience(), issuer=issuer(), algorithms=[ALGORITHM], verify=True) if not payload.get('sub'): raise jwt.MissingRequiredClaimError('"sub" claim is required') payload['scope'] = Scope(payload['scope']) return payload
def test_decode_expired_token(): in_the_past = NOW - timedelta(minutes=EXPIRY * 2 + 1) with patch.object(_token.datetime, 'utcnow', return_value=in_the_past): token, expiry = generate_token(CLIENT, SCOPE, 'grant_type') with pytest.raises(jwt.ExpiredSignatureError): decode_token(token)
def verify_jwt(auth_header, secret): """Extract the jwt token from the header, verify its signature, its expiration time, and return the payload.""" if not auth_header or auth_header == 'null': logging.warning("No Authorization header") return [None, "Unauthorized access: missing authentication"] method,token = auth_header.split() # separate 'JWT' from the jwt itself token = bytes(token, 'utf-8') try: payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True) except (jwt.ExpiredSignatureError, jwt.DecodeError) as err: return [None, str(err)] return [payload, '']
def test_expired_token(app): with app.test_request_context(): delta = timedelta(minutes=-5) access_token = create_access_token('username', expires_delta=delta) refresh_token = create_refresh_token('username', expires_delta=delta) with pytest.raises(ExpiredSignatureError): decode_token(access_token) with pytest.raises(ExpiredSignatureError): decode_token(refresh_token)
def verify_token(self, token): try: decode = jwt.decode(token, self.token_config.get('secret'), algorithm=self.token_config.get('algorithm')) except jwt.ExpiredSignatureError: raise AuthException('Time is up', 401, AuthException.TOKEN_TIME_UP) except: raise AuthException('Token is not valid', 401, AuthException.TOKEN_ERROR) return decode
def check_token(key, access_token): import jwt """ Simple request to check if session has expired (TBD) :return: Token status """ try: contents = jwt.decode(access_token, key, True, algorithms='RS256', audience='adapter') # options={'verify_aud': False}) print('contents', contents) return True except jwt.ExpiredSignatureError: print('Signature has expired') return False
def check_token(): data = request.get_json() auth_token = data['token'] try: auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"]) expiration_time = auth_token_payload['exp'] is_valid = datetime.now() + timedelta(minutes=30) <= datetime.fromtimestamp(expiration_time) except jwt.ExpiredSignatureError: is_valid = False return make_response(jsonify({'isValid': is_valid})), 200
def is_token_valid(token): """ Check if token is valid. """ try: jwt.decode(token, 'secret', algorithms=JWTHelper.JWT_ALGORITHM) return True, "Valid" except jwt.ExpiredSignatureError: return False, "Token Expired" except jwt.InvalidTokenError: return False, "Token is Invalid"
def decode_token(self, token): try: decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'], algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0)) except jwt.ExpiredSignatureError: raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.") except jwt.DecodeError: raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.") except jwt.InvalidTokenError: raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.") return decoded
def decode_token(token): """Decode the access token from the Authorization header.""" try: payload = jwt.decode(token, current_app.config.get('SECRET')) return payload['sub'] except jwt.ExpiredSignatureError: return "Expired token. Please log in to get a new token" except jwt.InvalidTokenError: return "Invalid token. Please register or login"
def _set_error_handler_callbacks(self, app): """ Sets the error handler callbacks used by this extension """ @app.errorhandler(NoAuthorizationError) def handle_auth_error(e): return self._unauthorized_callback(str(e)) @app.errorhandler(CSRFError) def handle_auth_error(e): return self._unauthorized_callback(str(e)) @app.errorhandler(ExpiredSignatureError) def handle_expired_error(e): return self._expired_token_callback() @app.errorhandler(InvalidHeaderError) def handle_invalid_header_error(e): return self._invalid_token_callback(str(e)) @app.errorhandler(InvalidTokenError) def handle_invalid_token_error(e): return self._invalid_token_callback(str(e)) @app.errorhandler(JWTDecodeError) def handle_jwt_decode_error(e): return self._invalid_token_callback(str(e)) @app.errorhandler(WrongTokenError) def handle_wrong_token_error(e): return self._invalid_token_callback(str(e)) @app.errorhandler(RevokedTokenError) def handle_revoked_token_error(e): return self._revoked_token_callback() @app.errorhandler(FreshTokenRequired) def handle_fresh_token_required(e): return self._needs_fresh_token_callback() @app.errorhandler(UserLoadError) def handler_user_load_error(e): # The identity is already saved before this exception was raised, # otherwise a different exception would be raised, which is why we # can safely call get_jwt_identity() here identity = get_jwt_identity() return self._user_loader_error_callback(identity) @app.errorhandler(UserClaimsVerificationError) def handle_failed_user_claims_verification(e): return self._claims_verification_failed_callback()
def check_token_status(self): """ Simple request to check if session has expired (TBD) :return: Token status """ if self.access_token is None: try: token_file = self.platform['credentials']['token_file'] token_path = os.path.join( self.workspace.workspace_root, self.workspace.platforms_dir, token_file) # Construct the POST login request with open(token_path, "r") as _file: self.access_token = _file.read except: return True print('Public_key=', self.platform_public_key) if self.platform_public_key is None: return True # Some old PyJWT versions crash with public key binary string, instead add # self.platform_public_key.decode('utf-8') try: print('access_token=', self.access_token) print('platform_public_key=', self.platform_public_key.decode('utf-8')) decoded = jwt.decode(self.access_token, self.platform_public_key.decode('utf-8'), True, algorithms='RS256', audience='adapter') # options={'verify_aud': False}) print('contents', decoded) try: self.username = decoded['preferred_username'] return True except: return True except jwt.DecodeError: print('Token cannot be decoded because it failed validation') return False except jwt.ExpiredSignatureError: print('Signature has expired') return False except jwt.InvalidIssuerError: return False except jwt.InvalidIssuedAtError: return False
def check_token_status(): """ Simple request to check if session has expired (TBD) :return: Token status """ import jwt access_token = '' platform_public_key = b'' print('Public_key=', platform_public_key) if platform_public_key is None: return True # Some old PyJWT versions crash with public key binary string, instead add # self.platform_public_key.decode('utf-8') try: print('access_token=', access_token) print('platform_public_key=', platform_public_key.decode('utf-8')) decoded = jwt.decode(access_token, platform_public_key.decode('utf-8'), True, algorithms='RS256', audience='adapter') # options={'verify_aud': False}) print('contents', decoded) try: username = decoded['preferred_username'] return True except: return True except jwt.DecodeError: print('Token cannot be decoded because it failed validation') return False except jwt.ExpiredSignatureError: print('Signature has expired') return False except jwt.InvalidIssuerError: return False except jwt.InvalidIssuedAtError: return False # generate_keypair() # token = user_login('user04', '1234') # print(token) # key = get_platform_public_key() # check_token(key, token) # sign() # result = check_token_status() # print("RESULT=", result)