Python jwt 模块,DecodeError() 实例源码

我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用jwt.DecodeError()

项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def logout(request):
    """ Logout a user
    """
    try:
        token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')

    try:
        data = jwt.decode(token, settings.SECRET_KEY)
        data = json.loads(CRYPTO.decrypt(str(data['data'])))
        user = User.objects.get(id=data['id'])
        user.last_login = datetime.fromtimestamp(0)
        user.save()
        return {'message': 'Logged out'}
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')
项目:os-reststack-manager    作者:gemagomez    | 项目源码 | 文件源码
def authenticate():
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
项目:django-redis-pubsub    作者:andrewyoung1991    | 项目源码 | 文件源码
def authjwt_method(token):
    """ an authentication method using rest_framework_jwt
    """
    import jwt
    from rest_framework_jwt.authentication import (jwt_decode_handler,
                                                    jwt_get_username_from_payload)
    try:
        payload = jwt_decode_handler(token)
    except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError):
        return None

    User = get_user_model()
    username = jwt_get_username_from_payload(payload)
    if not username:  # pragma: no cover
        return None

    try:
        user = User.objects.get_by_natural_key(username)
    except User.DoesNotExist:  # pragma: no cover
        return None

    return user
项目:django-open-volunteering-platform    作者:OpenVolunteeringPlatform    | 项目源码 | 文件源码
def authenticate(self, request):
    """
    Returns a two-tuple of `User` and token if a valid signature has been
    supplied using JWT-based authentication.  Otherwise returns `None`.
    """
    jwt_value = self.get_jwt_value(request)
    if jwt_value is None:
      return None

    try:
      payload = jwt_decode_handler(jwt_value)
    except jwt.ExpiredSignature:
      msg = _('Signature has expired.')
      raise exceptions.AuthenticationFailed(msg)
    except jwt.DecodeError:
      msg = _('Error decoding signature.')
      raise exceptions.AuthenticationFailed(msg)
    except jwt.InvalidTokenError:
      raise exceptions.AuthenticationFailed()

    user = self.authenticate_credentials(payload, request.channel)

    return (user, jwt_value)
项目:nanosphere    作者:walkr    | 项目源码 | 文件源码
def decode_jwt(token, options=None):
    """ Authenticate via JSON Web Token

    Options: `secret`, `algorithms` """

    options = options or {}
    secret = options.get('secret', CONF and CONF.secret)
    algorithms = options.get('algorithms', CONF and CONF.algorithms)

    try:
        data = jwt.decode(token, secret, algorithms)
    except jwt.DecodeError:
        raise Exception('Cannot decode token')
    else:
        logging.debug('* Decoded data = {}'.format(data))
        return data
项目:auction-backend    作者:luissalgadofreire    | 项目源码 | 文件源码
def authenticate(token):
    """
    Tries to authenticate user based on the supplied token. It also checks
    the token structure and validity.

    Based on jwt_auth.JSONWebTokenAuthMixin.authenticate
    """
    try:
        payload = jwt_decode_handler(token)
    except jwt.ExpiredSignature:
        msg = 'Signature has expired.'
        raise exceptions.AuthenticationFailed(msg)
    except jwt.DecodeError:
        msg = 'Error decoding signature.'
        raise exceptions.AuthenticationFailed(msg)

    user = authenticate_credentials(payload)

    return user
项目:pivportal    作者:starboarder2001    | 项目源码 | 文件源码
def token_required(secret_key):
    def token_required_decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            g = f.func_globals

            if not request.headers.get('Authorization'):
                return Response(response="Missing authorization header", status=401)
            try:
                payload = parse_token(request.headers.get('Authorization').split()[1], secret_key)
            except jwt.DecodeError:
                return Response(response="Token is invalid", status=401)
            except jwt.ExpiredSignature:
                return Response(response="Token has expired", status=401)

            # Set username for decorated func
            g["username"] = payload['sub']

            return f(*args, **kwargs)
        return decorated_function
    return token_required_decorator
项目:baselayer    作者:cesium-ml    | 项目源码 | 文件源码
def authenticate(self, auth_token):
        try:
            token_payload = jwt.decode(auth_token, secret)
            username = token_payload['username']

            self.username = username
            self.authenticated = True
            self.auth_failures = 0
            self.send_json(action='AUTH OK')

            # If we are the first websocket connecting on behalf of
            # a given user, subscribe to the feed for that user
            if len(WebSocket.sockets[username]) == 0:
                WebSocket.subscribe(username)

            WebSocket.sockets[username].add(self)

        except jwt.DecodeError:
            self.send_json(action='AUTH FAILED')
        except jwt.ExpiredSignatureError:
            self.send_json(action='AUTH FAILED')
项目:drf-jwt-devices    作者:ArabellaTech    | 项目源码 | 文件源码
def authenticate(self, request):
        jwt_value = self.get_jwt_value(request)
        if jwt_value is None:
            return None

        try:
            if api_settings.JWT_PERMANENT_TOKEN_AUTH:
                payload = jwt_devices_decode_handler(jwt_value)
            else:
                payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _("Signature has expired.")
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _("Error decoding signature.")
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        user = self.authenticate_credentials(payload)

        return user, jwt_value
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def get_certificate(self, kid):
        # retrieve keys from jwks_url
        resp = self.request(self.jwks_url(), method='GET')
        resp.raise_for_status()

        # find the proper key for the kid
        for key in resp.json()['keys']:
            if key['kid'] == kid:
                x5c = key['x5c'][0]
                break
        else:
            raise DecodeError('Cannot find kid={}'.format(kid))

        certificate = '-----BEGIN CERTIFICATE-----\n' \
                      '{}\n' \
                      '-----END CERTIFICATE-----'.format(x5c)

        return load_pem_x509_certificate(certificate.encode(),
                                         default_backend())
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def user_data(self, access_token, *args, **kwargs):
        response = kwargs.get('response')
        id_token = response.get('id_token')

        # decode the JWT header as JSON dict
        jwt_header = json.loads(
            base64.b64decode(id_token.split('.', 1)[0]).decode()
        )

        # get key id and algorithm
        key_id = jwt_header['kid']
        algorithm = jwt_header['alg']

        try:
            # retrieve certificate for key_id
            certificate = self.get_certificate(key_id)

            return jwt_decode(
                id_token,
                key=certificate.public_key(),
                algorithms=algorithm,
                audience=self.setting('SOCIAL_AUTH_AZUREAD_OAUTH2_KEY')
            )
        except (DecodeError, ExpiredSignature) as error:
            raise AuthTokenError(self, error)
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def decode_token(token, secret='', ttl=DEFAULT_TTL, verify=True):
    try:
        token = jwt.decode(str(token), secret, verify=verify)
    except jwt.DecodeError as e:
        raise TokenInvalid("error decoding JSON Web Token", e)

    if verify:
        issue_time = token.get('issuedAt')
        if issue_time is None:
            raise TokenInvalid("'issuedAt' is missing from token")

        issue_time = isodate.parse_datetime(issue_time)
        expiry_time = issue_time + datetime.timedelta(seconds=ttl)

        if issue_time > _now():
            raise TokenInvalid("token is not yet valid")
        if expiry_time < _now():
            raise TokenInvalid("token has expired")

    return token
项目:IntegraTI-API    作者:discentes-imd    | 项目源码 | 文件源码
def verify_token():
    """
    Verify if the token is valid, not expired and not blacklisted
    """
    if 'Authorization' in request.headers:
        if request.headers['Authorization'] in cache.blacklisted_tokens:
            abort(403, 'Error: invalid token')
        try:
            payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY)
            g.current_user = payload['id_user']
        except jwt.ExpiredSignatureError:
            abort(403, 'Error: token expired')
        except jwt.DecodeError:
            abort(403, 'Error: invalid token')
项目:xiaodi    作者:shenaishiren    | 项目源码 | 文件源码
def verify_token(cls, user, token):
        try:
            data = jwt.decode(token, str(user.id) + SECRET_CODE)
        except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
            return False
        else:
            if data["username"] == user.username:
                return True
            else:
                return False
项目:arc    作者:lap00zza    | 项目源码 | 文件源码
def decode_token(self, token):
        try:
            return jwt.decode(token, key=self.secret, algorithms=self.algorithm)
        except jwt.DecodeError:
            return False
项目:vote4code    作者:welovecoding    | 项目源码 | 文件源码
def azure_ad_authorized():
  response = azure_ad.authorized_response()
  print response
  if response is None:
    flask.flash('You denied the request to sign in.')
    return flask.redirect(util.get_next_url)
  id_token = response['id_token']
  flask.session['oauth_token'] = (id_token, '')
  try:
    decoded_id_token = jwt.decode(id_token, verify=False)
  except (jwt.DecodeError, jwt.ExpiredSignature):
    flask.flash('You denied the request to sign in.')
    return flask.redirect(util.get_next_url)
  user_db = retrieve_user_from_azure_ad(decoded_id_token)
  return auth.signin_user_db(user_db)
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def decode_token(token):
    """
    Get the organisation ID from a token

    :param token: a JSON Web Token
    :returns: str, organisation ID
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    with open(cert_file) as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
        public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])

    return payload
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def test_decode_token_different_public_key():
    token, expiry = generate_token(CLIENT, SCOPE, 'grant_type')

    with patch.object(_token, 'LOCALHOST_CRT', koi.CLIENT_CRT):
        with pytest.raises(jwt.DecodeError):
            decode_token(token)
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def test_invalid_assertion(self):
        self.request.body_arguments['assertion'] = ['invalid_token']
        grant = grants.AuthorizeDelegate(self.request)

        with pytest.raises(jwt.DecodeError):
            yield grant.generate_token()
项目:graphene-jwt-auth    作者:darwin4031    | 项目源码 | 文件源码
def authenticate(self, request):
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
            raise exceptions.AuthenticationFailed()

        if len(auth) == 1:
            msg = _("Invalid Authorization header. No credentials provided.")
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _("Invalid Authorization header. Credentials string should not contain spaces.")
            raise exceptions.AuthenticationFailed(msg)

        try:
            payload = jwt_decode_handler(auth[1])
        except jwt.ExpiredSignature:
            msg = _("Signature has expired.")
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _("Error decoding signature.")
            raise exceptions.AuthenticationFailed(msg)

        user = self.authenticate_credentials(payload)

        return (user, auth[1])
项目:graphene-jwt-auth    作者:darwin4031    | 项目源码 | 文件源码
def authenticate(self, request):
        jwt_value = self.get_jwt_value(request)

        if jwt_value is None:
            return None, None

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _("Signature has expired.")
            raise AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _("Error decoding signature.")
            raise AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise AuthenticationFailed()

        # Check blacklist
        self.check_blacklist(payload)

        user = self.authenticate_credentials(payload)

        # Check if password already change invalidated all old token
        self.check_changed_password_invalidated_old_token(user, payload)

        return user, jwt_value
项目:graphene-jwt-auth    作者:darwin4031    | 项目源码 | 文件源码
def _check_payload(token):
        # Check payload valid
        try:
            payload = jwt_decode_handler(token)
        except jwt.ExpiredSignature:
            msg = _("Signature has expired.")
            raise forms.ValidationError(msg)
        except jwt.DecodeError:
            msg = _("Error decoding signature.")
            raise forms.ValidationError(msg)

        return payload
项目:varapp-backend-py    作者:varapp    | 项目源码 | 文件源码
def verify_jwt(auth_header, secret):
    """Extract the jwt token from the header, verify its signature,
       its expiration time, and return the payload."""
    if not auth_header or auth_header == 'null':
        logging.warning("No Authorization header")
        return [None, "Unauthorized access: missing authentication"]
    method,token = auth_header.split()   # separate 'JWT' from the jwt itself
    token = bytes(token, 'utf-8')
    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
    except (jwt.ExpiredSignatureError, jwt.DecodeError) as err:
        return [None, str(err)]
    return [payload, '']
项目:drf-jwt-knox    作者:ssaavedra    | 项目源码 | 文件源码
def get_jwt_value(self, request):
        auth = get_authorization_header(request).split()
        auth_header_prefix = api_settings.JWT_AUTH_HEADER_PREFIX.lower()

        if not auth or smart_text(auth[0].lower()) != auth_header_prefix:
            return None

        if len(auth) == 1:
            msg = _('Invalid Authorization header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid Authorization header. Credentials string '
                    'should contain no spaces.')
            raise exceptions.AuthenticationFailed(msg)

        jwt_value = auth[1]

        try:
            payload = jwt_decode_handler(jwt_value)
        except jwt.ExpiredSignature:
            msg = _('Signature has expired.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.DecodeError:
            msg = _('Error decoding signature.')
            raise exceptions.AuthenticationFailed(msg)
        except jwt.InvalidTokenError:
            raise exceptions.AuthenticationFailed()

        return payload
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def test_public_key(self):
        """
        Test when the public key is provided to deserialize
        """

        token = scitokens.SciToken(key = self._private_key)
        serialized_token = token.serialize(issuer = "local")

        new_token = scitokens.SciToken.deserialize(serialized_token, public_key = self._public_pem, insecure = True)
        self.assertIsInstance(new_token, scitokens.SciToken)

        # With invalid key
        with self.assertRaises(ValueError):
            scitokens.SciToken.deserialize(serialized_token, insecure=True, public_key = "asdf".encode())

        # With a proper key, but not the right one
        private_key = generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        public_key = private_key.public_key()
        pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        with self.assertRaises(DecodeError):
            scitokens.SciToken.deserialize(serialized_token, insecure=True, public_key = pem)
项目:OctoPrint-Dashboard    作者:meadowfrey    | 项目源码 | 文件源码
def on_join(data):
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return

        try:
            payload = LoginService.parse_api_token_direct(token)
        except DecodeError:
            disconnect()
            return
        except ExpiredSignature:
            disconnect()
            return
        user = User.query.filter_by(username=payload["username"]).scalar()
    printers = user.get_accessible_printers()

    for printer in printers:
        join_room(str(printer.id))

    datatype = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(
            fields.Nested({
                'name': fields.String
            })
        )
    }
    emit("printers", marshal(printers, datatype))
项目:OctoPrint-Dashboard    作者:meadowfrey    | 项目源码 | 文件源码
def login_required(f):
    """
    Decorator function for routes
    Checks Authorization header, token validity and injects user into flask global variable g
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_app.config["AUTH"] == Config.NONE:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            payload = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401
        g.user = User.query.filter_by(username=payload['username']).first()
        return f(*args, **kwargs)

    return decorated_function
项目:OctoPrint-Dashboard    作者:meadowfrey    | 项目源码 | 文件源码
def superadmin_required(f):
    """
    Decorator function for routes
    Checks Authorization header, token validity, superadmin permission and injects user into flask global variable g
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_app.config["AUTH"] == Config.NONE:
            g.user = User("Gandalf", superadmin=True)
            return f(*args, **kwargs)

        if not request.headers.get('Authorization'):
            return "Missing authorization header", 401

        try:
            payload = LoginService.parse_api_token(request)
        except DecodeError:
            return 'Token is invalid', 401
        except ExpiredSignature:
            return 'Token has expired', 401

        g.user = User.query.filter_by(username=payload['username']).first()
        if g.user.superadmin is False:
            return 'You are not superadmin', 401

        return f(*args, **kwargs)

    return decorated_function
项目:paws    作者:funkybob    | 项目源码 | 文件源码
def dispatch(self, request, **kwargs):
        hdr = request.headers.get('Authorization', '')
        if not hdr.startswith('Bearer '):
            return self.handle_no_token(request, **kwargs)

        bearer = hdr.split(' ', 1)[1].strip()
        try:
            request.jwt = jwt.decode(bearer, **self.get_jwt_decode_kwargs(request, **kwargs))
        except jwt.DecodeError as err:
            log.info('Invalid token: %s', err)
            return self.handle_invalid_token(request, err, **kwargs)

        self.handle_valid_token(request, **kwargs)

        return super().dispatch(request, **kwargs)
项目:congredi    作者:toxik-io    | 项目源码 | 文件源码
def check(self, json):
        """Checking a JWT against passphrase and expiry"""
        try:
            payload = jwt.decode(json, self.secret, algorithms=['HS256'])
            return payload['pgp'], True
        # something has gone wrong
        except jwt.DecodeError:  # test
            return "Invalid Token", False
        except jwt.ExpiredSignature:  # test
            return "Expired Token", False
项目:hug-n-rest    作者:khanhicetea    | 项目源码 | 文件源码
def token_verify(token):
    secret_key = current_app.config.get('SECRET_KEY')
    try:
        data = jwt.decode(token, secret_key, algorithm='HS256')
        return UserService.instance().get_one_or_fail(data['id'])
    except jwt.DecodeError:
        return False
项目:forest-django    作者:ForestAdmin    | 项目源码 | 文件源码
def jwt_verify(view_func):
    def _wrapped_view_func(request, *args, **kwargs):
        if hasattr(request, 'META') and request.META.get('HTTP_AUTHORIZATION'):
            splitted_token = request.META['HTTP_AUTHORIZATION'].split()
            if len(splitted_token):
                token = splitted_token[1]
        try:
            payload = jwt.decode(token, settings.JWT_SECRET_KEY, True)
            return view_func(request, *args, **kwargs)
        except jwt.DecodeError as err:
            return HttpResponse(status=401)

    return _wrapped_view_func
项目:pyflock    作者:flockchat    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        try:
            (event_token_payload, request_body) = TokenVerifierFilter.decode_and_verify_request(environ,
                                                                                                self.app_secret,
                                                                                                self.appId)
            if event_token_payload is None or request_body is None:
                send_403_response(start_response)
            else:
                return self.app(environ, start_response)
        except jwt.DecodeError:
            send_403_response(start_response)
项目:pyflock    作者:flockchat    | 项目源码 | 文件源码
def handle(self, environ, start_response):
        if 'event_token_payload' not in environ:
            try:
                payload, event_json = TokenVerifierFilter.decode_and_verify_request(environ,
                                                                                    self.app_secret,
                                                                                    self.app_id)
                event = Event(event_json)
            except jwt.DecodeError:
                send_403_response(start_response)
                return {}
        else:
            payload, event = environ['event_token_payload'], Event(environ['request_body'])

        if event.name == "app.install":
            return EventHandlerClient.send_response(self.on_app_install_handler, event, start_response)
        elif event.name == "app.uninstall":
            return EventHandlerClient.send_response(self.on_app_uninstall_handler, event, start_response)
        elif event.name == "chat.generateUrlPreview":
            return EventHandlerClient.send_response(self.on_chat_generate_url_preview_handler, event, start_response)
        elif event.name == "chat.receiveMessage":
            return EventHandlerClient.send_response(self.on_chat_receive_message_handler, event, start_response)
        elif event.name == "client.flockmlAction":
            return EventHandlerClient.send_response(self.on_client_flockml_action_handler, event, start_response)
        elif event.name == "client.messageAction":
            return EventHandlerClient.send_response(self.on_client_message_action_handler, event, start_response)
        elif event.name == "client.openAttachmentWidget":
            return EventHandlerClient.send_response(self.on_client_open_attachment_widget_handler, event,
                                                    start_response)
        elif event.name == "client.pressButton":
            return EventHandlerClient.send_response(self.on_client_press_button_handler, event, start_response)
        elif event.name == "client.slashCommand":
            return EventHandlerClient.send_response(self.on_client_slash_command_handler, event, start_response)
        elif event.name == "client.widgetAction":
            return EventHandlerClient.send_response(self.on_client_widget_action_handler, event, start_response)
        else:
            raise Exception("Unknown event encountered" + event.name)
项目:the-catalog    作者:thurstonemerson    | 项目源码 | 文件源码
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.headers.get('Authorization'):
            response = jsonify(message='Missing authorization header')
            response.status_code = 401
            return response

        try:
            payload = parse_token(request)
        except DecodeError:
            response = jsonify(message='Token is invalid')
            response.status_code = 401
            return response
        except ExpiredSignature:
            response = jsonify(message='Token has expired')
            response.status_code = 401
            return response

        g.user_id = payload['sub']

        return f(*args, **kwargs)

    return decorated_function

# Helper functions, get currently logged in user
项目:dpr-api    作者:oki-archive    | 项目源码 | 文件源码
def decode(self, token):
        try:
            return jwt.decode(token,
                              self.secret,
                              algorithm=self.algorithm,
                              issuer=self.issuer)
        except jwt.ExpiredSignature:
            raise InvalidUsage("Token is expired")
        except jwt.DecodeError:
            raise InvalidUsage('Token signature is invalid')
        except Exception:
            raise Exception('Unable to parse authentication token.')
项目:jenova    作者:inova-tecnologias    | 项目源码 | 文件源码
def check_auth(self):
    auth = request.headers.get('Authorization', None)
    message = ''
    if not auth:
      abort(401, message = 'Authorization header is expected')

    parts = auth.split()

    if parts[0].lower() != 'bearer':
      message = 'Authorization header must start with Bearer'
    elif len(parts) == 1:
      message = 'Token not found'
    elif len(parts) > 2:
      message = 'Authorization header must be Bearer + \s + token'

    if message:
      abort(401, message = message)

    token = parts[1]
    try:
      payload = jwt.decode(
        token, 
        Security.get_jwt_skey(), 
        algorithms = ['HS256']
      )
    except jwt.ExpiredSignature:
      message = 'token is expired'
    except jwt.InvalidAudienceError:
      message = 'incorrect audience'
    except jwt.DecodeError:
      message = 'token signature is invalid'

    if message:
      abort(401, message = message)

    self.logger.debug('Access granted for %s!' % payload['user']['login'])

    return payload
项目:stethoscope    作者:Netflix    | 项目源码 | 文件源码
def decode_token(self, token):
    try:
      decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'],
          algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0))
    except jwt.ExpiredSignatureError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
    return decoded
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def do_auth(self, token, *args, **kwargs):
        dummy, secret = self.get_key_and_secret()
        try:  # Decode the token, using the Application Signature from settings
            decoded = jwt.decode(token, secret, algorithms=['HS256'])
        except jwt.DecodeError:  # Wrong signature, fail authentication
            raise AuthCanceled(self)
        kwargs.update({'response': {'token': decoded}, 'backend': self})
        return self.strategy.authenticate(*args, **kwargs)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def user_data(self, access_token, *args, **kwargs):
        """Return user data by querying Microsoft service"""
        try:
            return self.get_json(
                'https://graph.microsoft.com/v1.0/me',
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Accept': 'application/json',
                    'Authorization': 'Bearer ' + access_token
                },
                method='GET'
            )
        except (DecodeError, ExpiredSignature) as error:
            raise AuthTokenError(self, error)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def user_data(self, access_token, *args, **kwargs):
        response = kwargs.get('response')
        id_token = response.get('id_token')
        try:
            decoded_id_token = jwt_decode(id_token, verify=False)
        except (DecodeError, ExpiredSignature) as de:
            raise AuthTokenError(self, de)
        return decoded_id_token
项目:2017-rapdev-backend    作者:rit-sse    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Get the user from a JWT token."""
        try:
            decoded = jwt.decode(token, secret, algorithms=['HS256'])
        except jwt.DecodeError:
            return None
        user = User.query.get(decoded['id'])
        return user
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def check_token_status(self):
        """
        Simple request to check if session has expired (TBD)
        :return: Token status
        """

        if self.access_token is None:
            try:
                token_file = self.platform['credentials']['token_file']
                token_path = os.path.join(
                    self.workspace.workspace_root,
                    self.workspace.platforms_dir,
                    token_file)

                # Construct the POST login request
                with open(token_path, "r") as _file:
                    self.access_token = _file.read
            except:
                return True

        print('Public_key=', self.platform_public_key)

        if self.platform_public_key is None:
            return True

        # Some old PyJWT versions crash with public key binary string, instead add
        # self.platform_public_key.decode('utf-8')
        try:
            print('access_token=', self.access_token)
            print('platform_public_key=', self.platform_public_key.decode('utf-8'))
            decoded = jwt.decode(self.access_token, self.platform_public_key.decode('utf-8'),
                                 True, algorithms='RS256', audience='adapter')
            # options={'verify_aud': False})
            print('contents', decoded)
            try:
                self.username = decoded['preferred_username']
                return True
            except:
                return True
        except jwt.DecodeError:
            print('Token cannot be decoded because it failed validation')
            return False
        except jwt.ExpiredSignatureError:
            print('Signature has expired')
            return False
        except jwt.InvalidIssuerError:
            return False
        except jwt.InvalidIssuedAtError:
            return False
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def check_token_status():
    """
    Simple request to check if session has expired (TBD)
    :return: Token status
    """
    import jwt

    access_token = ''
    platform_public_key = b''
    print('Public_key=', platform_public_key)

    if platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead add
    # self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', access_token)
        print('platform_public_key=', platform_public_key.decode('utf-8'))
        decoded = jwt.decode(access_token, platform_public_key.decode('utf-8'),
                             True, algorithms='RS256', audience='adapter')
        # options={'verify_aud': False})
        print('contents', decoded)
        try:
            username = decoded['preferred_username']
            return True
        except:
            return True
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False


# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()

# result = check_token_status()
# print("RESULT=", result)