Python itsdangerous 模块,JSONWebSignatureSerializer() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用itsdangerous.JSONWebSignatureSerializer()

项目:MyCoin_Backend    作者:Four-Undefined    | 项目源码 | 文件源码
def verify_auth_token(token) :
        s = Serializer(current_app.config['SECRET_KEY'])
        try :
            data = s.loads(token)
        except :
            None
        return User.query.get_or_404(data['id'])
项目:MyCoin_Backend    作者:Four-Undefined    | 项目源码 | 文件源码
def generate_auth_token(self) :
        s = Serializer(
            current_app.config['SECRET_KEY']
        )
        return s.dumps({'id' : self.id})
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def generate_jwt(self, data_dict):
        """Generate a JSON web token for this BTS.

        IMPORTANT: This is subject to replay attacks if there's not a nonce in
        the JWT. It's the callers responsibility to include this in the data
        dict if replay attacks are a concern (a random, unique msgid would
        suffice).
        """
        serializer = itsdangerous.JSONWebSignatureSerializer(self.secret)
        return serializer.dumps(data_dict)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def check_signed_params(self, jwt_data):
        """
        Decodes the params, makes sure they pass signature (i.e., are valid),
        and then checks that we haven't seen the msgid before. Raises a
        ValueError if errors, else returns True.

        TODO(matt): this particular method seems to be unused (not so the one
                    in federer_handlers.config.config).
        """
        s = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret'])
        try:
            data = s.loads(jwt_data)
        except itsdangerous.BadSignature:
            logger.error("Bad jwt signature for request, ignoring.")
            raise ValueError("Bad signature")

        # make sure the msg hasn't been seen before, if so, discard it
        if "msgid" in data:
            if self.msgid_db.seen(str(data['msgid'])):
                logger.error("Endaga: Repeat msgid: %s" % (data['msgid'],))
                raise ValueError("Repeat msgid: %s" % (data['msgid'],))
        else:
            logger.error("Endaga: No message ID.")
            raise ValueError("No message ID.")

        return data
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def check_signed_params(self, jwt_data):
        """Checks a JWT signature and message ID.

        Decodes the params, makes sure they pass signature (i.e., are valid),
        and then checks that we haven't seen the msgid before.

        TODO(matt): refactor as this was copied from federer_handlers.common.
                    Inheriting from common as before does not work because CI
                    cannot import ESL, an import that comes from
                    freeswitch_interconnect.

        Raises:
          ValueError if there are errors

        Returns:
          True if everything checks out
        """
        s = itsdangerous.JSONWebSignatureSerializer(self.conf['bts_secret'])
        try:
            data = s.loads(jwt_data)
        except itsdangerous.BadSignature:
            logger.emergency("Bad signature for request, ignoring.")
            raise ValueError("Bad signature")
        # Make sure the msg hasn't been seen before, if so, discard it.
        if "msgid" in data:
            if self.msgid_db.seen(str(data['msgid'])):
                logger.error("Endaga: Repeat msgid: %s" % (data['msgid'],))
                raise ValueError("Repeat msgid: %s" % (data['msgid'],))
        else:
            logger.error("Endaga: No message ID.")
            raise ValueError("No message ID.")
        return data
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setUpClass(cls):
        """Setup the test app."""
        cls.test_app = TestApp(core.federer.app.wsgifunc())
        cls.endpoint = '/config/deactivate_number'
        # Setup a serializer so we can send signed data.  Bootstrap the secret.
        config_db = ConfigDB()
        config_db['bts_secret'] = 'yup'
        cls.serializer = itsdangerous.JSONWebSignatureSerializer(
            config_db['bts_secret'])
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setUpClass(cls):
        """Setup the test app."""
        cls.test_app = TestApp(core.federer.app.wsgifunc())
        cls.endpoint = '/config/deactivate_subscriber'
        # Setup a serializer so we can send signed data.  Bootstrap the secret.
        config_db = ConfigDB()
        config_db['bts_secret'] = 'yup'
        cls.serializer = itsdangerous.JSONWebSignatureSerializer(
            config_db['bts_secret'])
项目:catpel_backend    作者:tangxiangru    | 项目源码 | 文件源码
def generate_confirmation_token(self, expiration=3600):
        s = Serializer(current_app.config['SECRET_KEY'],expiration)
        return s.dumps({'confirm': self.id})
项目:catpel_backend    作者:tangxiangru    | 项目源码 | 文件源码
def generate_auth_token(self):
        s = Serializer(current_app.config['SECRET_KEY'])
        return s.dumps({'id': self.id})
项目:catpel_backend    作者:tangxiangru    | 项目源码 | 文件源码
def verify_auth_token(token):
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except:
            return None
        return User.query.get(data['id'])
项目:oniongate-web    作者:DonnchaC    | 项目源码 | 文件源码
def create_jwt(payload):
    """
    Create a signed JSON Web Token
    """
    serializer = JSONWebSignatureSerializer(current_app.secret_key, algorithm_name='HS256')
    return serializer.dumps(payload)
项目:oniongate-web    作者:DonnchaC    | 项目源码 | 文件源码
def load_jwt(token):
    """
    Verify and load a signed JSON Web Token
    """
    serializer = JSONWebSignatureSerializer(current_app.secret_key, algorithm_name='HS256')
    return serializer.loads(token)
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def get_user(cls, token):
        """Use this function to validate tokens sent by users."""
        # Decode token, try to fetch and verify key, and if so return user

        # DEVELOPMENT ACCESS
        if current_app.config.get("DEBUG") == True:
            if token == current_app.config.get("DEV_CRON_API_KEY"):
                return user_model.User.query.filter_by(
                    email=current_app.config.get("DEV_CRON_EMAIL")).first()

        s = Signer(current_app.config["SECRET_KEY"])
        try:
            data = s.loads(token.strip())  # Remove whitespace
        except:
            return None

        if not data.get("id"):
            return None

        if not data.get("key"):
            return None

        apikey = ApiKey.query.get(data.get("id"))
        if apikey is not None:
            if apikey.verify_key(data.get("key")):
                return user_model.User.query.get(apikey.user_id)
        return None
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def generate_key(user_id, name):
        """Generate a new key and return the plaintext token"""
        apikey = ApiKey()
        apikey.user_id = user_id
        apikey.name = name

        # Generate key
        # Want 40 char string which is urandom 20.
        # urandom is cryptographically secure.
        key = binascii.hexlify(os.urandom(20))

        apikey.key = key
        db.session.add(apikey)
        db.session.commit()

        # Send an email about this action
        u = user_model.User.query.get(user_id)
        u.send_email(
            "[Alert] New API Key Issued",
            render_template(
                "email/new-api-key.html",
                name=name, ),
            force_send=True)

        # Generate token, which include the indexed id and key
        s = Signer(current_app.config["SECRET_KEY"])
        return s.dumps({"id": apikey.id, "key": key})
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def generate_token(tenant: Tenant) -> str:
    s = JSONWebSignatureSerializer(current_app.secret_key, salt='auth')
    payload = {
        'repo_ids': [str(o) for o in tenant.repository_ids],
    }
    if getattr(tenant, 'user_id', None):
        payload['uid'] = str(tenant.user_id)
    return s.dumps(payload)
项目:zeus    作者:getsentry    | 项目源码 | 文件源码
def parse_token(token: str) -> str:
    s = JSONWebSignatureSerializer(current_app.secret_key, salt='auth')
    try:
        return s.loads(token)
    except BadSignature:
        return None
项目:2017-diary-backend    作者:muxi-mini-project    | 项目源码 | 文件源码
def generate_auth_token(self) :
        s = Serializer(current_app.config['SECRET_KEY'])
        return s.dumps({'id':self.id})

    # ????
项目:2017-diary-backend    作者:muxi-mini-project    | 项目源码 | 文件源码
def confirm(self,token) :
        s = Serializer(current_app.config['SECRET_KEY'],expiration)
        try : 
            data = s.loads(token)
        except : 
            return False
        if  data.get('confirm') != self.id :
            return False 
        self.comfirmed = True 
        db.session.add(self)
        db.session.commit()
        return True 

    #?????????
项目:2017-diary-backend    作者:muxi-mini-project    | 项目源码 | 文件源码
def verify_auth_token(token) :
        s = Serializer(current_app.config['SECRET_KEY'])
        try : 
            data = s.loads(token)
            print 'hhahahha'
            print data
        except : 
            return None
        return User.query.get(data['id'])
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def create_serializer(cls, force=False):
        config = cls.get_config()

        if force:
            return JSONWebSignatureSerializer(
                config['secret'],
                algorithm_name=config['algorithm']
            )
        else:
            return TimedJSONWebSignatureSerializer(
                config['secret'],
                expires_in=config['max_age'],
                algorithm_name=config['algorithm']
            )
项目:Nurevam    作者:Maverun    | 项目源码 | 文件源码
def confirm_login():
    log.info("Checking login....")
    # Check for state and for 0 errors
    state = session.get('oauth2_state')
    if not state or request.values.get('error'):
        return redirect(url_for('index'))

    # Fetch token
    discord = utils.make_session(state=state)
    discord_token = discord.fetch_token(
        data_info.TOKEN_URL,
        client_secret=data_info.OAUTH2_CLIENT_SECRET,
        authorization_response=request.url)
    if not discord_token:
        log.info("Not clear, returning")
        return redirect(url_for('index'))

    # Fetch the user
    user = utils.get_user(discord_token)
    # Generate api_key from user_id
    serializer = JSONWebSignatureSerializer(app.config['SECRET_KEY'])
    api_key = str(serializer.dumps({'user_id': user['id']}))
    # Store api_key
    db.set('user:{}:api_key'.format(user['id']), api_key)
    # Store token
    db.set('user:{}:discord_token'.format(user['id']), json.dumps(discord_token))
    # Store api_token in client session
    api_token = {
        'api_key': api_key,
        'user_id': user['id']
    }
    session.permanent = True
    session['api_token'] = api_token
    log.info("Clear, redirect...")
    return redirect(url_for('after_login'))
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def test_deactivate_number(self):
        """We can deactivate one of the Subscriber's numbers via POST.

        Pushes number into the 'available' state and disassociates the number
        from the subscriber.  Starts an async post task (which we will mock).
        Should also create a 'deactivate_number' UsageEvent.
        """
        # This subscriber currently has only one number and deleting the last
        # number is verboten, so first we've gotta add another number.
        new_number = models.Number(number='6285574719987', state="inuse",
                                   network=self.user_profile.network,
                                   subscriber=self.sub,
                                   kind="number.nexmo.monthly")
        new_number.save()
        # Deactivate the original number.
        url = '/api/v2/numbers/%s' % self.number.number
        data = {
            'state': 'available'
        }
        header = {
            'HTTP_AUTHORIZATION': 'Token %s' % self.user_profile.network.api_token
        }
        with mock.patch('endagaweb.tasks.async_post.delay') as mocked_task:
            response = self.client.post(url, data=data, **header)
        self.assertEqual(200, response.status_code)
        # Reload the original number from the db and check its state.
        number = models.Number.objects.get(id=self.number.id)
        self.assertEqual(data['state'], number.state)
        # The subscriber should now have one associated number.  Reload the
        # sub from the db to verify.
        subscriber = models.Subscriber.objects.get(id=self.sub.id)
        self.assertEqual(new_number.number, subscriber.numbers())
        # The original number should not be associated with a Subscriber or a
        # Network.
        self.assertEqual(None, number.subscriber)
        self.assertEqual(None, number.network)
        # The mocked task should have been called with specific arguments
        self.assertTrue(mocked_task.called)
        args, _ = mocked_task.call_args
        task_endpoint, task_data = args
        expected_url = '%s/config/deactivate_number' % self.bts.inbound_url
        self.assertEqual(expected_url, task_endpoint)
        # The task_data should be signed with the BTS UUID and should have a
        # jwt key which is a dict with a number key.
        serializer = itsdangerous.JSONWebSignatureSerializer(self.bts.secret)
        task_data = serializer.loads(task_data['jwt'])
        self.assertEqual(number.number, task_data['number'])
        # A 'deactivate_number' UsageEvent should have been created.
        event = models.UsageEvent.objects.get(to_number=number.number,
                                              kind='deactivate_number')
        self.assertEqual('deactivated phone number: %s' % number.number,
                         event.reason)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def test_deactivate_subscriber(self):
        """We can deactivate the Subscriber via DELETE.

        Disassociates the subscriber with its BTS and Network.  Starts an async
        post task (which we will mock) to send this info to the client.
        Deactivates all numbers associated with the subscriber and creates a
        'delete_imsi' UsageEvent.  Also deletes all associated
        PendingCreditUpdates.
        """
        url = '/api/v2/subscribers/%s' % self.sub.imsi
        header = {
            'HTTP_AUTHORIZATION': 'Token %s' % self.user_profile.network.api_token
        }
        with mock.patch('endagaweb.celery.app.send_task') as mocked_task:
            response = self.client.delete(url, **header)
        self.assertEqual(200, response.status_code)
        # The subscriber should no longer be in the DB.
        self.assertEqual(0, models.Subscriber.objects.filter(
            imsi=self.sub.imsi).count())
        # Both of the associated numbers should have been deactivated -- reload
        # them from the DB to check their state.
        number = models.Number.objects.get(id=self.number.id)
        self.assertEqual('available', number.state)
        self.assertEqual(None, number.network)
        self.assertEqual(None, number.subscriber)
        number2 = models.Number.objects.get(id=self.number2.id)
        self.assertEqual('available', number2.state)
        # The associated PendingCreditUpdate should be gone.
        self.assertEqual(0, models.PendingCreditUpdate.objects.filter(
            pk=self.pcu.pk).count())
        # The mocked task should have been called with specific arguments
        self.assertTrue(mocked_task.called)
        args, _ = mocked_task.call_args
        task_name, task_args = args
        task_endpoint, task_data = task_args
        self.assertEqual('endagaweb.tasks.async_post', task_name)
        expected_url = '%s/config/deactivate_subscriber' % self.bts.inbound_url
        self.assertEqual(expected_url, task_endpoint)
        # The task_data should be signed with the BTS UUID and should have a
        # jwt key which is a dict with a imsi key.
        serializer = itsdangerous.JSONWebSignatureSerializer(self.bts.secret)
        task_data = serializer.loads(task_data['jwt'])
        self.assertEqual(self.sub.imsi, task_data['imsi'])
        # A 'delete_imsi' UsageEvent should have been created.
        event_count = models.UsageEvent.objects.filter(
            subscriber_imsi=self.sub.imsi, kind='delete_imsi').count()
        self.assertEqual(1, event_count)
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def deactivate(self):
        """Deactivate a subscriber.

        Send an async post to the BTS to deactivate the subscriber.  Sign the
        request using JWT.  Note that we do not also send deactivate number
        commands -- the BTS will handle that on its own.  If the sub does not
        have an associated BTS, the sub's previous tower may have been deleted.
        We can still delete the sub we just do not have to notify a tower.
        """
        if self.bts:
            url = '%s/config/deactivate_subscriber' % self.bts.inbound_url
            data = {
                'imsi': self.imsi,
                # Add a UUID as a nonce for the message.
                'msgid': str(uuid.uuid4()),
            }
            serializer = itsdangerous.JSONWebSignatureSerializer(
                self.bts.secret)
            signed_data = {
                'jwt': serializer.dumps(data),
            }
            # Retry the async_post for three months until it succeeds.
            retry_delay = 60 * 10
            three_months = 3 * 30 * 24 * 60 * 60.
            max_retries = int(three_months / retry_delay)
            celery_app.send_task(
                'endagaweb.tasks.async_post', (url, signed_data),
                max_retries=max_retries)
        # Deactivate all associated Numbers from this Sub.
        numbers = Number.objects.filter(subscriber=self)
        with transaction.atomic():
            now = django.utils.timezone.now()
            # Create a 'delete_imsi' UsageEvent.
            bts_uuid = None
            if self.bts:
                bts_uuid = self.bts.uuid
            event = UsageEvent.objects.create(
                subscriber=self, date=now, bts=self.bts, kind='delete_imsi',
                subscriber_imsi=self.imsi, bts_uuid=bts_uuid,
                oldamt=self.balance, newamt=self.balance, change=0,
                reason='deactivated subscriber: %s' % self.imsi)
            event.save()
            for number in numbers:
                reason = 'deactivated phone number: %s' % number.number
                event = UsageEvent.objects.create(
                    subscriber=self, date=now, bts=self.bts,
                    kind='deactivate_number', to_number=number.number,
                    reason=reason, oldamt=self.balance, newamt=self.balance,
                    change=0)
                event.save()
                number.network = None
                number.subscriber = None
                number.state = 'available'
                number.save()
            # Actually delete the subscriber.  Note that all associated
            # PendingCreditUpdates will be deleted automatically by the default
            # deletion CASCADE behavior.
            self.delete()