我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itsdangerous.TimedJSONWebSignatureSerializer()。
def verify_access_token(access_token): """ ?? Access_token :param access_token: access_token :return: ??????????,???? False """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(access_token) except: return False if data.get('token_usage') != 'access': return False user = User(user_id=data.get('token_uid')) if not user or not user.user_id or not user.is_active: return False return user
def reset_password(self, token, new_pass): """Reset password. Token is generated by :meth:`~User.generate_reset_token` :param token: :param new_pass: :return: """ s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('user_id') == self.id: self.password = new_pass db.session.add(self) db.session.commit() return True return False
def change_email(self, token): """Change email address using token. """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email db.session.add(self) return True
def verify_auth_token(token): """Validate the token whether is night.""" serializer = Serializer( current_app.config['SECRET_KEY']) try: # serializer object already has tokens in itself and wait for # compare with token from HTTP Request /api/posts Method `POST`. data = serializer.loads(token) except SignatureExpired: return None except BadSignature: return None user = User.query.filter_by(id=data['id']).first() return user
def confirm(self, token): """ ???? :param token: ????? token :return: ???? True,???? False """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('confirm_uid') != self.user_id: return False self.confirmed = 1 mongo.db.users.update_one({ 'user_id': self.user_id }, { '$set': { 'confirmed': 1 } }) return True
def refresh_access_token(refresh_token, expiration=3600): """ ?? Access_token :param refresh_token: refresh_token :param expiration: ? Access_token ???? :return: ???????? Access_token ???,????False """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(refresh_token) except: return False if data.get('token_usage') != 'refresh': return False if not data.get('token_uid'): return False sa = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration) access_token = sa.dumps({'token_uid': data.get('token_uid'), 'token_usage': 'access'}).decode('ascii') return dict(access_token=access_token, refresh_token=refresh_token, expires_in=expiration, expires_at=int(time.time())+expiration, token_type='Bearer') ## # ???? ##
def change_email(self, token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email self.avatar_hash = hashlib.md5( self.email.encode('utf-8')).hexdigest() db.session.add(self) return True
def change_email(self,token): s = Serializer(current_app.config['SECRET_KEY'],expiration) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email self.avatar_hash = hashlib.md5(self.email.encode(utf-8)).hexdigest() db.session.add(self) return True
def confirm_user_account(token): serializer = Serializer(current_app.config['SECRET_KEY']) try: data = serializer.loads(token) except: return False user = user_repository.get_by_id(data.get('confirm')) if user is None: return False user.confirmed = True user_repository.save(user) return True
def confirm(token): s = Serializer(current_app.config["SECRET_KEY"]) try: data = s.loads(token) except: flash("The confirmation link is invalid or has expired.", "danger") return redirect(url_for("auth.unconfirmed")) u = User.query.get(data.get("confirm")) if u is None: flash("The confirmation link is invalid or has expired.", "danger") return redirect(url_for("auth.unconfirmed")) if not u.confirm(token): flash("The confirmation link is invalid or has expired.", "danger") return redirect(url_for("auth.unconfirmed")) # Confirmation complete! # Login: login_user(u) # Tell them they are good: flash("You have confirmed your account!", "success") return redirect(url_for("main.index"))
def activate_account(self, token, name, password, username): s = Serializer(current_app.config["SECRET_KEY"]) try: data = s.loads(token) except: return False if data.get("activation") != self.id: return False self.password = password self.name = name self.username = username self.confirmed = True self.active = True db.session.add(self) current_app.logger.info("User account activated: user id %s (%s)" % (self.id, self.email)) self.track_event("activated_account") return True
def confirm(self, token): s = Serializer(current_app.config["SECRET_KEY"]) try: data = s.loads(token) except: return False if data.get("confirm") != self.id: return False self.confirmed = True self.active = True db.session.add(self) db.session.commit() current_app.logger.info("User account confirmed: user id %s (%s)" % (self.id, self.email)) self.track_event("confirmed_account") if data.get("trial") is True: self.track_event("started_free_trial") return True
def change_email(self, token): s = Serializer(current_app.config["SECRET_KEY"]) try: data = s.loads(token) except: return False if data.get("change_email") != self.id: return False new_email = data.get("new_email") if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email try: db.session.add(self) db.session.commit() except: db.session.rollback() raise Exception("Dirty session") self.track_event("changed_email") return True
def load_session_token(token): """Load cookie session""" s = Serializer(current_app.config["SECRET_KEY"], current_app.config.get("SESSION_EXPIRATION")) try: data = s.loads(token) except: return None if SessionCache.validate_session( data.get("user_id", -1), data.get("session_id", "-1")): user = User.query.get(data["user_id"]) user.set_session_id(data["session_id"]) current_app.logger.debug("Loading user %s from cookie session %s" % (user.id, user.session_id)) return user return None
def get_auth_token(self): """Cookie info. Must be secure.""" s = Serializer(current_app.config["SECRET_KEY"], current_app.config["COOKIE_EXPIRATION"]) current_app.logger.debug("Generating auth token for user %s" % self.id) if not self.is_authenticated: raise Exception("User not authenticated") return s.dumps({ "user_id": self.id, "session_id": SessionCache.create_session( self.id, expiration=current_app.config["COOKIE_EXPIRATION"]) })
def change_email(self, token): """Verify the new email for this user.""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except (BadSignature, SignatureExpired): return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email db.session.add(self) db.session.commit() return True
def change_email(self,token) : s = Serializer(current_app.config['SECRET_KEY']) try : data = s.loads(token) except : return False if data.get('change_email') != self.id : return False new_email = data.get('new_email') if new_email is None : return False if self.query.filter_by(email=new_email).first() is not None : return False self.email = new_email self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest() db.session.add(self) return True
def verify_auth_token(cls, token): """ Ensures that the token received from the client exists and returns the User that the token belongs to. Returns None if token doesn't exist. :param token: str :return: User object or None """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return None user = User.query.get(data['id']) if user and user.session_token == token: return user return None # DB Helpers
def change_email(self, token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email = new_email).first() is not None: return False self.email = new_email self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest() db.session.add(self) return True
def generate_token(username, password, expiration=600): """ Generate an authorized token """ doc = {'username':username, 'password_hash':pwd_context.encrypt(password)} db.sessions.find_one_and_update( {'username': username}, {"$set": doc}, upsert=True ) if (cfg.ACME_PROD or cfg.ACME_DEV) and (username == 'serveruser'): EXPIRES_IN_A_YEAR = 365 * 24 * 60 * 60 print 'token that EXPIRES_IN_A_YEAR' s = TimedJWSSerializer(app.config['SECRET_KEY'], expires_in=EXPIRES_IN_A_YEAR) else: print 'token that expires', cfg.ACME_LCL s = TimedJWSSerializer(app.config['SECRET_KEY'], expires_in=expiration) return s.dumps({'username': username, 'password': password})
def verify_token(username, token): """ Verify validity of token """ s = TimedJWSSerializer(app.config['SECRET_KEY']) try: ut.pretty_print("Trying to load the token") data = s.loads(token) except SignatureExpired: ut.pretty_print("ERROR: Expired Token") return False except BadSignature: ut.pretty_print("ERROR: Invalid Token") return False else: ut.pretty_print("Token successfully loaded") stored = db.sessions.find_one(filter={'username': data['username']}, sort=[('_id',-1)]) if not stored: return False result = json_util.loads(json_util.dumps(stored)) return pwd_context.verify(data['password'], result['password_hash']) and data['username'] == username
def change_email(self, token): """????""" s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('change_email') != self.id: return False new_email = data.get('new_email') if new_email is None: return False if self.query.filter_by(email=new_email).first() is not None: return False self.email = new_email return operate_model.db_add(self)
def generate_auth_token(self, expiration = 3600): s = Serializer(app.config['SECRET_KEY'], expires_in = expiration) str = s.dumps({'id': self.id}) return b64encode(str).decode('utf-8')
def verify_auth_token(token): s = Serializer(app.config['SECRET_KEY']) try: data = s.loads(b64decode(token)) except SignatureExpired: return None # valid token, but expired except BadSignature: return None # invalid token user = User.query.get(data['id']) return user
def generate_confirmation_token(self, expiration=3600): s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'confirm': self.id})
def confirm(self, token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('confirm') != self.id: return False self.confirmed = True db.session.add(self) return True
def generate_reset_token(self, expiration=3600): s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'reset': self.id})
def reset_password(self, token, new_password): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('reset') != self.id: return False self.password = new_password db.session.add(self) return True
def generate_email_change_token(self, new_email, expiration=3600): s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'change_email': self.id, 'new_email': new_email})
def generate_auth_token(self, expiration): s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration) return s.dumps({'id': self.id}).decode('ascii')
def verify_auth_token(token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return None return User.query.get(data['id'])
def generate_auth_token(self): """ desc: ???????token params: user_id ??????? return: token date: 2016-10-28 """ s = TimedJSONWebSignatureSerializer(current_app.config.get("SECRET_KEY", "No secret key"), current_app.config.get("USER_TOKEN_EXPIRATION", 3600)) return s.dumps({"user_id": self.id})
def verify_auth_token(cls, token): s = TimedJSONWebSignatureSerializer(current_app.config.get("SECRET_KEY", "No secret key")) try: data = s.loads(token) except SignatureExpired: raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"})) except BadSignature: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) try: user = User.get_object(id=data["user_id"]) except ObjectNotExists: raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"})) return user
def set_password(token): """Set initial customer password. The template for this route contains bootstrap.css, bootstrap-theme.css and main.css. This is similar to the password reset option with two exceptions: it has a longer expiration time and does not require old password. :param token: Token generated by :meth:`app.models.User.generate_reset_token` :return: """ s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) try: s.loads(token) except BadSignature: flash('Signature expired.') return redirect(url_for('main.index')) form = SetPasswordForm() if form.validate_on_submit(): User.set_password(token, form.data['password']) flash('Your new password has been set.') return redirect(url_for('main.index')) for field, err in form.errors.items(): flash(err[0], 'danger') return render_template('auth/set_password.html', form=form, token=token)
def generate_reset_token(self, expiry=900): """Generate a JSON Web Signature that will be used to reset customer's password. For details see :meth:`itsdangerous.JSONWebSignatureSerializer.dumps` :param expiry: Token expiration time (seconds) :return: """ s = TimedJSONWebSignatureSerializer( current_app.config['SECRET_KEY'], expiry ) return s.dumps({'user_id': self.id})
def set_password(cls, token, passwd): """Set the password for user :param token: :param passwd: :return: """ s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY']) data = s.loads(token) user = cls.get(data.get('user_id')) user.password = passwd db.session.add(user) db.session.commit()
def generate_reset_token(self, expiration=3600): """Generate token for password reset only with email address. """ s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'reset': self.id})
def reset_password(self, token, new_password): """Reset password with tokens. """ s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('reset') != self.id: return False self.password = new_password db.session.add(self) return True
def generate_email_change_token(self, new_email, expiration=3600): """Generate token for changing email address. """ s = Serializer(current_app.config['SECRET_KEY'], expiration) return s.dumps({'change_email': self.id, 'new_email': new_email})
def generate_auth_token(self, expiration): """Generate token for authentication. """ s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration) return s.dumps({'id': self.id}).decode('ascii')
def verify_token(token): from flask import current_app expire_time = current_app.config.get("EXPIRES_TIME") or 3600 token_key = current_app.config["APP_KEY"] s = TimedJSONWebSignatureSerializer(token_key, expires_in=expire_time) try: d = s.loads(token) user = User.query.get(d["uid"]) g.session_id = d["session"] return user except: return None
def generate_token(self, session): from flask import current_app expire_time = current_app.config.get("EXPIRES_TIME") or 3600 token_key = current_app.config["APP_KEY"] s = TimedJSONWebSignatureSerializer(token_key, expires_in=expire_time) d = s.dumps({"username": self.xh, "uid": self.id_, "session": session}) return d.decode("ascii")
def post(self): """Can be execute when receive HTTP Method `POST`.""" args = parsers.user_post_parser.parse_args() user = User.query.filter_by(username=args['username']).first() # Check the args['password'] whether as same as user.password. if user.check_password(args['password']): # serializer object will be saved the token period of time. serializer = Serializer( current_app.config['SECRET_KEY'], expires_in=600) return {'token': serializer.dumps({'id': user.id})} else: abort(401)
def _make_token(self, data, timeout): s = Serializer(current_app.config['SECRET_KEY'], timeout) return s.dumps(data)
def _verify_token(self, token): s = Serializer(current_app.config['SECRET_KEY']) data = None expired, invalid = False, False try: data = s.loads(token) except SignatureExpired: expired = True except Exception: invalid = True return expired, invalid, data
def generate_auth_token(self, expiration): s = Serializer(current_app.config['ECRET_KEY'],expires_in=expiration) return s.dumps({'id': self.id})
def verify_auth_token(token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return None return User.query.get(data['id']) # ??????JSON???????? # ???????????????????????