我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用itsdangerous.SignatureExpired()。
def verify_auth_token(token): """Validate the token whether is night.""" serializer = Serializer( current_app.config['SECRET_KEY']) try: # serializer object already has tokens in itself and wait for # compare with token from HTTP Request /api/posts Method `POST`. data = serializer.loads(token) except SignatureExpired: return None except BadSignature: return None user = User.query.filter_by(id=data['id']).first() return user
def password_reset(token): try: user_id = validate_password_reset_token(token) except BadTimeSignature: flash('Invalid token', 'danger') return redirect('/login') except SignatureExpired: flash('Expired token', 'danger') return redirect('/login') if request.method == 'POST': password = request.form.get('password', '') confirm = request.form.get('password_confirmation', '') if valid_new_password(password, confirm): user = User(get_or_404(User.get_collection(), _id=user_id)) change_password(user, password) flash('Password was successfully changed.', 'success') return redirect('/login') return render_template('password_reset.html')
def change_email(self, token): """Verify the new email for this user.""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except (BadSignature, SignatureExpired): return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email db.session.add(self) db.session.commit() return True
def verify_token(username, token): """ Verify validity of token """ s = TimedJWSSerializer(app.config['SECRET_KEY']) try: ut.pretty_print("Trying to load the token") data = s.loads(token) except SignatureExpired: ut.pretty_print("ERROR: Expired Token") return False except BadSignature: ut.pretty_print("ERROR: Invalid Token") return False else: ut.pretty_print("Token successfully loaded") stored = db.sessions.find_one(filter={'username': data['username']}, sort=[('_id',-1)]) if not stored: return False result = json_util.loads(json_util.dumps(stored)) return pwd_context.verify(data['password'], result['password_hash']) and data['username'] == username
def is_valid_token(token, token_expiry): """ Validates if the supplied token is valid, and hasn't expired. :param token: Token to check :param token_expiry: When the token expires in seconds :return: True if token is valid, and user_id contained in token """ entropy = current_app.secret_key if current_app.secret_key else 'un1testingmode' serializer = URLSafeTimedSerializer(entropy) try: tokenised_user_id = serializer.loads(token, max_age=token_expiry) except SignatureExpired: current_app.logger.debug('Token has expired') return False, None except BadSignature: current_app.logger.debug('Bad Token Signature') return False, None return True, tokenised_user_id
def try_refresh_token(self, session_id): morsel = context.cookies.get(self.refresh_token_key) if not morsel or morsel.value is None or not morsel.value.strip(): self.bad() return refresh_token_encoded = morsel.value # Decoding the refresh token try: refresh_principal = JwtRefreshToken.load(refresh_token_encoded) self.ok( self.create_principal(member_id=refresh_principal.id, session_id=session_id), setup_header=True ) except itsdangerous.SignatureExpired: self.bad() except itsdangerous.BadData: self.bad() raise HttpBadRequest()
def authenticate_request(self): if self.token_key not in context.environ: self.bad() return encoded_token = context.environ[self.token_key] if encoded_token is None or not encoded_token.strip(): self.bad() return try: self.ok(self.verify_token(encoded_token)) except itsdangerous.SignatureExpired as ex: # The token has expired. So we're trying to restore it using refresh-token. session_id = ex.payload.get('sessionId') if session_id: self.try_refresh_token(session_id) else: self.bad() raise HttpUnauthorized() except itsdangerous.BadData: # The token is Malformed self.bad() raise HttpBadRequest()
def unsign_url_safe(token, secret_key, salt=None, **kw): """ To sign url safe data. If expires_in is provided it will Time the signature :param token: :param secret_key: :param salt: (string) a namespace key :param kw: :return: """ if len(token.split(".")) == 3: s = URLSafeTimedSerializer2(secret_key=secret_key, salt=salt, **kw) value, timestamp = s.loads(token, max_age=None, return_timestamp=True) now = datetime.datetime.utcnow() if timestamp > now: return value else: raise itsdangerous.SignatureExpired( 'Signature age %s < %s ' % (timestamp, now), payload=value, date_signed=timestamp) else: s = itsdangerous.URLSafeSerializer(secret_key=secret_key, salt=salt, **kw) return s.loads(token)
def verify_auth_token(token): s = Serializer(app.config['SECRET_KEY']) try: data = s.loads(b64decode(token)) except SignatureExpired: return None # valid token, but expired except BadSignature: return None # invalid token user = User.query.get(data['id']) return user
def verify_auth_token(cls, token): s = TimedJSONWebSignatureSerializer(current_app.config.get("SECRET_KEY", "No secret key")) try: data = s.loads(token) except SignatureExpired: raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"})) except BadSignature: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) try: user = User.get_object(id=data["user_id"]) except ObjectNotExists: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) return user
def _verify_token(self, token): s = Serializer(current_app.config['SECRET_KEY']) data = None expired, invalid = False, False try: data = s.loads(token) except SignatureExpired: expired = True except Exception: invalid = True return expired, invalid, data
def lab(token): try: lab_id, instance_id, exam_id, response_id = external_serializer.loads(token, max_age=60) except (BadSignature, SignatureExpired): abort(403) url_base = '{0}/labapiConnection/ShowLab?labInstanceGuid={1}&fullScreen=False' okay_states = {'STARTING', 'ACTIVE'} if instance_id: lab_instance_url = '{0}/labapi/v1/instance?id={1}'.format(current_app.config['XTREME_URL'], instance_id) resp = requests.get(lab_instance_url, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'], password=current_app.config['XTREME_SECRET'])) if resp.status_code != 200 or resp.json()['state'] not in okay_states: abort(400) url = resp.json()['connectionUrl'] or url_base.format(current_app.config['XTREME_URL'], instance_id) else: payload = { 'labID': lab_id } lab_url = '{0}/labapi/v1/Create'.format(current_app.config['XTREME_URL']) resp = requests.put(lab_url, json=payload, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'], password=current_app.config['XTREME_SECRET'])) if resp.status_code != 200: abort(400) instance_id = resp.json()['id'] redis_store.setex(response_id, 3600, instance_id) url = url_base.format(current_app.config['XTREME_URL'], instance_id) return render_template('xtreme.html', url=url, response_id=response_id, instance_id=instance_id, exam_id=exam_id)
def check_token(self, token_sign): """ ?? token, ?????? token """ from itsdangerous import TimestampSigner, SignatureExpired, BadTimeSignature s = TimestampSigner(self._sign_key) try: token = s.unsign(token_sign, max_age=60) # 60??? return {'success': token} except SignatureExpired as e: # ?????? return {'error': e.message} except BadTimeSignature as e: # ?????? return {'error': e.message}
def verfy_auth_token(token): serializer=Serializer( current_app.config['SECRET_KEY'] ) try: data=serializer.loads(token) except SignatureExpired: return None except BadSignature: return None user=User.query.filter_by(id=data['id']).first() return user
def _get_auth_status(self, authuser, authpassword): try: val = self.serializer.loads(authpassword, max_age=self.LOGIN_EXPIRATION) except itsdangerous.SignatureExpired: return dict(status="expired") except itsdangerous.BadData: # check if we got user/password direct authentication return self._validate(authuser, authpassword) else: if not isinstance(val, list) or len(val) != 2 or val[0] != authuser: threadlog.debug("mismatch credential for user %r", authuser) return dict(status="nouser") return dict(status="ok", groups=val[1])
def get_by_token(cls, token): s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) # may raise BadSignature or SignatureExpired data = s.loads(token) user = current_app.user_datastore.find_user(id=data['id']) if not user: raise HTTPError(401, 'Unknow user with id {}'.format(data['id'])) if user.token == token: if datetime.datetime.utcnow() < user.token_expires: return user raise SignatureExpired('bad token') raise BadSignature('bad token')
def confirm_account(self, token): """Verify that the provided token is for this user's id.""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except (BadSignature, SignatureExpired): return False if data.get('confirm') != self.id: return False self.confirmed = True db.session.add(self) db.session.commit() return True
def reset_password(self, token, new_password): """Verify the new password for this user.""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except (BadSignature, SignatureExpired): return False if data.get('reset') != self.id: return False self.password = new_password db.session.add(self) db.session.commit() return True
def verify_auth_token(token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except SignatureExpired: return None # valid token, but expired except BadSignature: return None # invalid token user = User.query.get(data['id']) return user
def get_token_status(token, serializer, max_age=None, return_data=False): """Get the status of a token. :param token: The token to check :param serializer: The name of the seriailzer. Can be one of the following: ``confirm``, ``login``, ``reset`` :param max_age: The name of the max age config option. Can be on of the following: ``CONFIRM_EMAIL``, ``LOGIN``, ``RESET_PASSWORD`` """ serializer = getattr(_security, serializer + '_serializer') max_age = get_max_age(max_age) user, data = None, None expired, invalid = False, False try: data = serializer.loads(token, max_age=max_age) except SignatureExpired: d, data = serializer.loads_unsafe(token) expired = True except (BadSignature, TypeError, ValueError): invalid = True if data: user = _datastore.find_user(id=data[0]) expired = expired and (user is not None) if return_data: return expired, invalid, user, data else: return expired, invalid, user
def verify_token(self, encoded_token): principal = super().verify_token(encoded_token) if not self.validate_session(principal.session_id): raise itsdangerous.SignatureExpired('The token has already invalidated', principal.payload) return principal
def verify_token(self, encoded_token): principal = super().verify_token(encoded_token) if token_expired: raise itsdangerous.SignatureExpired('Simulating', payload=principal.payload) return principal
def get_payload_from_token(token): s = Serializer(current_app.config['SECRET_KEY']) cipher = AESCipher(current_app.config['SECRET_KEY'][:16]) try: data = json.loads(cipher.decrypt(s.loads(token))) return data except SignatureExpired, se: time_offset = (datetime.now()- se.date_signed).total_seconds() current_app.logger.error('token expired: %s. signature date %s. offset with current date = %s'%(se.message,str(se.date_signed),str(time_offset))) current_app.logger.error('current date %s, token date %s'%(str(datetime.now()), str(se.date_signed))) if -1<= time_offset < 0:#allow for 1 seconds out of sync machines current_app.logger.info('token time offset within grace period. allowing auth') return json.loads(cipher.decrypt(se.payload)) else: LogApiTokenExpired() # raise SignatureExpired(se) raise TokenExpired() # abort(419, message = 'Authentication expired.') except BadSignature, e: current_app.logger.error('bad signature in token') encoded_payload = e.payload if encoded_payload is not None: try: decoded_payload = s.load_payload(encoded_payload) payload= json.loads(cipher.decrypt(decoded_payload)) LogApiTokenInvalid(payload) except BadData: LogApiTokenInvalid(dict(error='bad data in token', token=token)) abort(401, message = 'bad signature in token')
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None): """Check if the given data is a valid CSRF token. This compares the given signed token to the one stored in the session. :param data: The signed CSRF token to be checked. :param secret_key: Used to securely sign the token. Default is ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``. :param time_limit: Number of seconds that the token is valid. Default is ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes). :param token_key: Key where token is stored in session for comparision. Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``. :raises ValidationError: Contains the reason that validation failed. .. versionchanged:: 0.14 Raises ``ValidationError`` with a specific error message rather than returning ``True`` or ``False``. """ secret_key = _get_config( secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key, message='A secret key is required to use CSRF.' ) field_name = _get_config( token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token', message='A field name is required to use CSRF.' ) time_limit = _get_config( time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False ) if not data: raise ValidationError('The CSRF token is missing.') if field_name not in session: raise ValidationError('The CSRF session token is missing.') s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token') try: token = s.loads(data, max_age=time_limit) except SignatureExpired: raise ValidationError('The CSRF token has expired.') except BadData: raise ValidationError('The CSRF token is invalid.') if not safe_str_cmp(session[field_name], token): raise ValidationError('The CSRF tokens do not match.')
def load_token(token): """ Flask-Login token_loader callback. The token_loader function asks this function to take the token that was stored on the users computer process it to check if its valid and then return a User Object if its valid or None if its not valid. :param token: Token generated by :meth:`app.models.User.get_auth_token` """ # The Token itself was generated by User.get_auth_token. So it is up to # us to known the format of the token data itself. # The Token was encrypted using itsdangerous.URLSafeTimedSerializer which # allows us to have a max_age on the token itself. When the cookie is # stored # on the users computer it also has a exipry date, but could be changed by # the user, so this feature allows us to enforce the exipry date of the # token # server side and not rely on the users cookie to exipre. max_age = current_app.config['REMEMBER_COOKIE_DURATION'].total_seconds() # Decrypt the Security Token, data = [username, hashpass, id] s = URLSafeTimedSerializer( current_app.config['SECRET_KEY'], salt='user-auth', signer_kwargs=dict(key_derivation='hmac', digest_method=hashlib.sha256)) try: data = s.loads(token, max_age=max_age) except (BadTimeSignature, SignatureExpired): return None # Find the User user = User.query.get(data[2]) # 2FA check totp_endpoint = request.endpoint == 'auth.verify_totp' if user and user.otp_enabled and not totp_endpoint and len(data) < 4: return None # Check Password and return user or None if user and data[1] == user._password: return user return None