我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itsdangerous.BadSignature()。
def verify_auth_token(token): """Validate the token whether is night.""" serializer = Serializer( current_app.config['SECRET_KEY']) try: # serializer object already has tokens in itself and wait for # compare with token from HTTP Request /api/posts Method `POST`. data = serializer.loads(token) except SignatureExpired: return None except BadSignature: return None user = User.query.filter_by(id=data['id']).first() return user
def activate_user(payload): s = get_serializer() try: user_id = s.loads(payload) except BadSignature: abort(400) user = User.query.filter(User.id == user_id).first() if user is not None: user.activate() else: abort(400) return "user activated" # from . import users, listings, books
def get(self, request, *args, **kwargs): if request.GET.get('key'): serializer = URLSafeTimedSerializer(settings.SECRET_KEY) try: user_id = serializer.loads( request.GET.get('key'), max_age=60 * 2, # Signature expires after 2 minutes ) user = get_object_or_404(User, id=user_id) user.backend = 'django.contrib.auth.backends.ModelBackend' login(request, user) return redirect('home') except (BadSignature, BadTimeSignature): return redirect('login') return super().get(request, *args, **kwargs)
def change_email(self, token): """Verify the new email for this user.""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except (BadSignature, SignatureExpired): return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email db.session.add(self) db.session.commit() return True
def ensure_user(view_func): """Decorator that errors if the user is not logged in. This is analagous to frontend.ensure_user """ @wraps(view_func) def inner(*args, **kwargs): header_name = 'X-Session-Key' err_msg = 'A valid {0} header is required.'.format(header_name) key = request.headers.get(header_name, '') try: signer = get_signer() token = signer.unsign(key).decode('utf-8') except (BadSignature, ValueError): abort(403, err_msg) user = core.user_for_token(token) if user is None: abort(403, err_msg) return view_func(user, *args, **kwargs) return inner
def open_session(self, app, request): sid = request.cookies.get(app.session_cookie_name) if not sid: sid = self._generate_sid() return self.session_class(sid=sid, permanent=self.permanent) if self.use_signer: signer = self._get_signer(app) if signer is None: return None try: sid_as_bytes = signer.unsign(sid) sid = sid_as_bytes.decode() except BadSignature: sid = self._generate_sid() return self.session_class(sid=sid, permanent=self.permanent) data = self.cache.get(self.key_prefix + sid) if data is not None: return self.session_class(data, sid=sid) return self.session_class(sid=sid, permanent=self.permanent)
def verify_token(username, token): """ Verify validity of token """ s = TimedJWSSerializer(app.config['SECRET_KEY']) try: ut.pretty_print("Trying to load the token") data = s.loads(token) except SignatureExpired: ut.pretty_print("ERROR: Expired Token") return False except BadSignature: ut.pretty_print("ERROR: Invalid Token") return False else: ut.pretty_print("Token successfully loaded") stored = db.sessions.find_one(filter={'username': data['username']}, sort=[('_id',-1)]) if not stored: return False result = json_util.loads(json_util.dumps(stored)) return pwd_context.verify(data['password'], result['password_hash']) and data['username'] == username
def is_valid_token(token, token_expiry): """ Validates if the supplied token is valid, and hasn't expired. :param token: Token to check :param token_expiry: When the token expires in seconds :return: True if token is valid, and user_id contained in token """ entropy = current_app.secret_key if current_app.secret_key else 'un1testingmode' serializer = URLSafeTimedSerializer(entropy) try: tokenised_user_id = serializer.loads(token, max_age=token_expiry) except SignatureExpired: current_app.logger.debug('Token has expired') return False, None except BadSignature: current_app.logger.debug('Bad Token Signature') return False, None return True, tokenised_user_id
def token_find(self, token: str) -> int: """Return a user ID from a token. Parses the token to get the user ID and then unsigns it using the user's hashed password as a secret key """ userid_encoded = token.split('.')[0] try: userid = int(base64.urlsafe_b64decode(userid_encoded)) except (binascii.Error, ValueError): return None raw_user = self.get_raw_user(userid) if raw_user is None: return s = TimestampSigner(raw_user['password']['hash']) try: s.unsign(token) except itsdangerous.BadSignature: return return userid
def verify_auth_token(token): s = Serializer(app.config['SECRET_KEY']) try: data = s.loads(b64decode(token)) except SignatureExpired: return None # valid token, but expired except BadSignature: return None # invalid token user = User.query.get(data['id']) return user
def open_session(self, app, request): s = self.get_signing_serializer(app) if s is None: return None val = request.cookies.get(app.session_cookie_name) if not val: return self.session_class() max_age = total_seconds(app.permanent_session_lifetime) try: data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class()
def verify_auth_token(cls, token): s = TimedJSONWebSignatureSerializer(current_app.config.get("SECRET_KEY", "No secret key")) try: data = s.loads(token) except SignatureExpired: raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"})) except BadSignature: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) try: user = User.get_object(id=data["user_id"]) except ObjectNotExists: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) return user
def set_password(token): """Set initial customer password. The template for this route contains bootstrap.css, bootstrap-theme.css and main.css. This is similar to the password reset option with two exceptions: it has a longer expiration time and does not require old password. :param token: Token generated by :meth:`app.models.User.generate_reset_token` :return: """ s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: s.loads(token) except BadSignature: flash('Signature expired.') return redirect(url_for('main.index')) form = SetPasswordForm() if form.validate_on_submit(): User.set_password(token, form.data['password']) flash('Your new password has been set.') return redirect(url_for('main.index')) for field, err in form.errors.items(): flash(err[0], 'danger') return render_template('auth/set_password.html', form=form, token=token)
def open_session(self, app, request): s = self.get_serializer(app) if s is None: return None val = request.cookies.get(app.session_cookie_name) if not val: return self.session_class() max_age = app.permanent_session_lifetime.total_seconds() try: data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class()
def check_signed_params(self, jwt_data): """ Decodes the params, makes sure they pass signature (i.e., are valid), and then checks that we haven't seen the msgid before. Raises a ValueError if errors, else returns True. TODO(matt): this particular method seems to be unused (not so the one in federer_handlers.config.config). """ s = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret']) try: data = s.loads(jwt_data) except itsdangerous.BadSignature: logger.error("Bad jwt signature for request, ignoring.") raise ValueError("Bad signature") # make sure the msg hasn't been seen before, if so, discard it if "msgid" in data: if self.msgid_db.seen(str(data['msgid'])): logger.error("Endaga: Repeat msgid: %s" % (data['msgid'],)) raise ValueError("Repeat msgid: %s" % (data['msgid'],)) else: logger.error("Endaga: No message ID.") raise ValueError("No message ID.") return data
def check_signed_params(self, jwt_data): """Checks a JWT signature and message ID. Decodes the params, makes sure they pass signature (i.e., are valid), and then checks that we haven't seen the msgid before. TODO(matt): refactor as this was copied from federer_handlers.common. Inheriting from common as before does not work because CI cannot import ESL, an import that comes from freeswitch_interconnect. Raises: ValueError if there are errors Returns: True if everything checks out """ s = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret']) try: data = s.loads(jwt_data) except itsdangerous.BadSignature: logger.emergency("Bad signature for request, ignoring.") raise ValueError("Bad signature") # Make sure the msg hasn't been seen before, if so, discard it. if "msgid" in data: if self.msgid_db.seen(str(data['msgid'])): logger.error("Endaga: Repeat msgid: %s" % (data['msgid'],)) raise ValueError("Repeat msgid: %s" % (data['msgid'],)) else: logger.error("Endaga: No message ID.") raise ValueError("No message ID.") return data
def reset_password(self, token: str, new_password: str) -> None: """Reset a users password by using a token. .. note:: Don't forget to commit the database. :param token: A token as generated by :py:meth:`User.get_reset_token`. :param new_password: The new password to set. :returns: Nothing. :raises psef.auth.PermissionException: If something was wrong with the given token. """ ts = URLSafeTimedSerializer(psef.app.config['SECRET_KEY']) try: username = ts.loads( token, max_age=psef.app.config['RESET_TOKEN_TIME'], salt=self.reset_token ) except BadSignature: import traceback traceback.print_exc() raise psef.auth.PermissionException( 'The given token is not valid', f'The given token {token} is not valid.', psef.errors.APICodes.INVALID_CREDENTIALS, 403 ) # This should never happen but better safe than sorry. if (username != self.username or self.reset_token is None): # pragma: no cover raise psef.auth.PermissionException( 'The given token is not valid for this user', f'The given token {token} is not valid for user "{self.id}".', psef.errors.APICodes.INVALID_CREDENTIALS, 403 ) self.password = new_password self.reset_token = None
def lab(token): try: lab_id, instance_id, exam_id, response_id = external_serializer.loads(token, max_age=60) except (BadSignature, SignatureExpired): abort(403) url_base = '{0}/labapiConnection/ShowLab?labInstanceGuid={1}&fullScreen=False' okay_states = {'STARTING', 'ACTIVE'} if instance_id: lab_instance_url = '{0}/labapi/v1/instance?id={1}'.format(current_app.config['XTREME_URL'], instance_id) resp = requests.get(lab_instance_url, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'], password=current_app.config['XTREME_SECRET'])) if resp.status_code != 200 or resp.json()['state'] not in okay_states: abort(400) url = resp.json()['connectionUrl'] or url_base.format(current_app.config['XTREME_URL'], instance_id) else: payload = { 'labID': lab_id } lab_url = '{0}/labapi/v1/Create'.format(current_app.config['XTREME_URL']) resp = requests.put(lab_url, json=payload, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'], password=current_app.config['XTREME_SECRET'])) if resp.status_code != 200: abort(400) instance_id = resp.json()['id'] redis_store.setex(response_id, 3600, instance_id) url = url_base.format(current_app.config['XTREME_URL'], instance_id) return render_template('xtreme.html', url=url, response_id=response_id, instance_id=instance_id, exam_id=exam_id)
def jwt_domain(): """ Require a valid JWT token to be present in the request """ token = read_auth_header() if token is None: abort(401, message='Authorization Required: Request does not contain ' 'an access token') try: payload = load_jwt(token) except BadSignature: abort(401, message='Invalid authorization token') return payload['domain']