我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用jwt.encode()。
def auth_jwt_project(short_name):
    """Issue a JWT for a project, authenticated by the project's secret key.

    The caller must send the project's secret key in the ``Authorization``
    request header.

    :param short_name: short name used to look the project up.
    :returns: the value produced by ``jwt.encode`` (HS256, signed with the
        project's own secret key) carrying ``short_name`` and
        ``project_id``. Aborts with 403 when the header is missing or empty
        and with 404 when the project is unknown or the key does not match.
    """
    supplied_key = request.headers.get('Authorization')
    if not supplied_key:
        return abort(403)

    project = project_repo.get_by_shortname(short_name)
    if not (project and project.secret_key == supplied_key):
        return abort(404)

    claims = {'short_name': short_name, 'project_id': project.id}
    return jwt.encode(claims, project.secret_key, algorithm='HS256')
def get_jwt(self):
    """Build an RS256-signed JWT for the GitHub integration.

    The token is issued "now", expires ten minutes later and names the
    configured integration app id as issuer.

    :returns: the value produced by ``jwt.encode``.
    """
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)

    claims = {
        # issued at time
        'iat': int(time.time()),
        # expiration time as epoch seconds (10 minutes from now)
        'exp': calendar.timegm(expires_at.timetuple()),
        # the integration's GitHub identifier
        'iss': options.get('github.integration-app-id'),
    }
    private_key = options.get('github.integration-private-key')
    return jwt.encode(claims, private_key, algorithm='RS256')
def encode_auth_token(self, user_id):
    """Create a short-lived HS256 auth token for ``user_id``.

    The token expires five seconds after it is issued.

    :param user_id: value stored in the ``sub`` claim.
    :returns: the encoded token, or -- if anything goes wrong -- the caught
        exception object itself (it is returned, not raised).
    """
    try:
        lifetime = datetime.timedelta(days=0, seconds=5)
        claims = {
            'exp': datetime.datetime.utcnow() + lifetime,
            'iat': datetime.datetime.utcnow(),
            'sub': user_id,
        }
        signing_key = app.config.get('SECRET_KEY')
        return jwt.encode(claims, signing_key, algorithm='HS256')
    except Exception as error:
        return error
def make_auth_header(installation_id):
    """Exchange an app-level JWT for a GitHub installation access token.

    A 30-second RS256 JWT is built, posted to the installation's
    ``access_tokens`` endpoint, and the returned token is wrapped in an
    ``Authorization`` header dict.

    :param installation_id: id interpolated into the access-token URL.
    :returns: ``{"Authorization": "token <installation token>"}``.
    :raises requests.HTTPError: when the endpoint answers with an error
        status (the error message is printed first).
    """
    # Back-date "iat" by five seconds -- presumably to tolerate clock skew
    # between this host and the API server.
    issued_at = datetime.utcnow() + timedelta(seconds=-5)
    payload = {
        "iat": issued_at,
        "exp": issued_at + timedelta(seconds=30),
        "iss": 2510,
    }
    encoded = jwt.encode(payload, get_private_pem(), "RS256")
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(encoded, bytes):
        encoded = encoded.decode("utf-8")

    headers = {
        "Authorization": "Bearer " + encoded,
        "Accept": "application/vnd.github.machine-man-preview+json",
    }
    auth_url = "https://api.github.com/installations/{}/access_tokens".format(installation_id)
    r = requests.post(auth_url, headers=headers)
    if not r.ok:
        print(r.json()["message"])
        r.raise_for_status()

    token = r.json()["token"]
    return {"Authorization": "token {}".format(token)}
def post(self):
    """Log the user in and answer with a signed JWT.

    Reads ``username`` and ``password`` from the JSON request body.

    :returns: ``msg(token, 'token')`` on success; a 403 response when the
        user is unknown/disabled or the password does not match.
    """
    username = request.json['username']
    password = request.json['password']

    # ``User.disabled is False`` is a plain Python identity test on the
    # column object and always evaluates to ``False``, so the query could
    # never match a row. ``.is_(False)`` builds the intended SQL predicate.
    us = (
        User.query
        .filter(User.disabled.is_(False))
        .filter(User.sigaa_user_name == username)
        .first()
    )
    abort_if_none(us, 403, 'Username or password incorrect')

    if not check_password_hash(us.password, password):
        return msg('Username or password incorrect'), 403

    token = jwt.encode(
        {'id_user': us.id_user, 'tid': random.random()},
        config.SECRET_KEY,
        algorithm='HS256'
    )
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('utf-8')

    return msg(token, 'token')
def generate_authorization_token(context, private_key):
    """Generate an RS256 authorization token and store it on ``context``.

    The key is read from ``data/<private_key>``; the token carries the
    fixed subject ``testuser`` and expires 90 days after creation.

    :param context: object that receives the result as ``context.token``.
    :param private_key: file name of the private key inside ``data/``.
    """
    expiry = datetime.datetime.utcnow() + datetime.timedelta(days=90)
    userid = "testuser"
    path_to_private_key = 'data/{private_key}'.format(private_key=private_key)

    # initial value: stays None if reading the key or encoding raises
    context.token = None

    with open(path_to_private_key) as fin:
        key_material = fin.read()

    payload = {
        'exp': expiry,
        'iat': datetime.datetime.utcnow(),
        'sub': userid
    }
    token = jwt.encode(payload, key=key_material, algorithm='RS256')
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('utf-8')

    context.token = token
def test_make_provider_token_calls_jwt_encode_with_correct_args(self):
    """
    APNSConnection.make_provider_token() must hand the right arguments to
    jwt.encode().

    The encoded output differs between calls even for identical input, so
    the call arguments are checked instead of the return value.
    """
    issued_at = time.time()
    connection = jwt_apns_client.APNSConnection(
        team_id='TEAMID',
        apns_key_id='KEYID',
        apns_key_path=self.KEY_FILE_PATH,
    )

    with mock.patch('jwt_apns_client.utils.jwt.encode') as mock_encode:
        connection.make_provider_token(issued_at=issued_at)
        expected_claims = {'iss': connection.team_id, 'iat': issued_at}
        mock_encode.assert_called_with(
            expected_claims,
            connection.secret,
            algorithm=connection.algorithm,
            headers=connection.get_token_headers(),
        )
def encode_auth_token(self, user_id):
    """Create an HS256 auth token for ``user_id``.

    The lifetime comes from the ``TOKEN_EXPIRATION_DAYS`` and
    ``TOKEN_EXPIRATION_SECONDS`` application settings.

    :param user_id: value stored in the ``sub`` claim.
    :returns: the encoded token, or the caught exception object when
        anything fails (it is returned, not raised).
    """
    try:
        settings = current_app.config
        lifetime = datetime.timedelta(
            days=settings.get('TOKEN_EXPIRATION_DAYS'),
            seconds=settings.get('TOKEN_EXPIRATION_SECONDS'),
        )
        claims = {
            'exp': datetime.datetime.utcnow() + lifetime,
            'iat': datetime.datetime.utcnow(),
            'sub': user_id,
        }
        return jwt.encode(claims, settings.get('SECRET_KEY'), algorithm='HS256')
    except Exception as error:
        return error
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
    """Build the signed request for an API call.

    For every verb except ``POST`` the url-encoded ``params`` are appended
    to ``endpoint_path``; for ``POST`` they are sent as the JSON body. The
    signature is an HS256 JWT over the path, a nonce and the key id.

    :param endpoint_path: path that gets signed and appended to ``self.uri``.
    :param method_verb: HTTP verb of the request.
    :param kwargs: may carry ``params`` (defaults to an empty dict).
    :returns: ``(url, request_kwargs)`` where ``request_kwargs`` holds the
        headers and, for ``POST``, the ``json`` payload.
    """
    params = kwargs.get('params', {})
    is_post = method_verb == 'POST'

    if not is_post:
        endpoint_path += urllib.parse.urlencode(params)

    claims = {
        'path': endpoint_path,
        'nonce': self.nonce(),
        'token_id': self.key,
    }
    signature = jwt.encode(claims, self.secret, algorithm='HS256')

    request = {
        'headers': {
            'X-Quoine-API-Version': '2',
            'X-Quoine-Auth': signature,
            'Content-Type': 'application/json',
        }
    }
    if is_post:
        request['json'] = params
    return self.uri + endpoint_path, request
def auth():
    """Authenticate a user by email and password and issue a one-day JWT.

    Reads ``email`` and ``password`` from the submitted form, checks that
    the user exists and has a verified email, recomputes the scrypt hash
    with the stored salt and compares it with the stored hash.

    :returns: a JSON string -- ``{"access_token": ...}`` on success,
        otherwise a ``{"success":false, ...}`` document naming the failure.
    """
    email = request.form["email"]
    password = request.form["password"]
    connection = get_db()
    cursor = connection.cursor()

    cursor.execute("SELECT email FROM users WHERE email = %s", (email,))
    est = cursor.fetchone()
    if not est:
        logging.warning("Unregistered user '%s' login attempt unsuccessful", email)
        return '{"success":false, "message":"Invalid email"}'

    cursor.execute(
        "SELECT u.password_hash, u.password_salt, r.name "
        "FROM users u LEFT JOIN roles r ON u.role_id = r.id "
        "WHERE u.email = %s and u.email_verified is True",
        (email,),
    )
    rst = cursor.fetchone()
    if not rst:
        return '{"success":false, "message":"Email is not Verified"}'

    password_hash = rst[0].hex()
    # Round-trip through hex to obtain a plain ``bytes`` salt.
    password_salt = bytes.fromhex(rst[1].hex())
    password_hash_new = scrypt.hash(password, password_salt).hex()
    role = rst[2]

    if password_hash == password_hash_new:
        access_token = jwt.encode(
            {
                'sub': email,
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1),
                'role': role,
            },
            jwt_hs256_secret,
            algorithm='HS256',
        )
        # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
        if isinstance(access_token, bytes):
            access_token = access_token.decode('utf-8')
        logging.warning("User: '%s' logged in successfully", email)
        return '{"access_token": "%s"}\n' % (access_token,)

    logging.warning("User: '%s' login attempt unsuccessful: Incorrect Password", email)
    return '{"success":false, "message":"Incorrect Password"}'
def create_jwt(self, data):
    """
    Create the JWT used for future requests to the API and cache it on
    ``self.token``.

    :param data: dict containing the keys ``username`` and ``secret``;
        ``secret`` is the HS256 signing key.
    :returns: the encoded token as text.
    """
    message = {
        'login_user_name': data['username'],
        'time_stamp': 'unused',
        'app_id': '',  # Do not create new consumer
        'app_name': '',  # Do not create new consumer
        'temenos_id': '',  # Whatever that does
    }
    if settings.GATEWAYLOGIN_HAS_CBS:
        # Not sure if that is the right thing to do
        message['is_first'] = True
    else:
        # Fake when there is no core banking system
        message.update({
            'is_first': False,
            'cbs_token': 'dummy',
        })

    token = jwt.encode(message, data['secret'], 'HS256')
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    self.token = token
    return self.token
def dump(self, secret=None):
    """
    Dump the token into a stringified JWT.

    Sets ``self.issued_at`` to the current UTC time before serializing.

    :param secret: The secret to sign the JWT with. If this is omitted, the
        secret will be sourced from ``current_app.config['SECRET_KEY']``
    :returns: The stringified JWT.
    """
    self.issued_at = datetime.datetime.now(pytz.UTC)
    # NOTE(review): the ``(data, errors)`` unpacking assumes the schema's
    # ``dump`` returns a 2-tuple; ``err`` is not inspected.
    payload, err = self.TokenSchema().dump(self)

    if secret is None:
        secret = current_app.config['SECRET_KEY']

    token = jwt.encode(payload, secret)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('UTF-8')
    return token
def generate_signed_token(private_pem, request):
    """Create an RS256-signed token for ``request``.

    The claims hold the request's scope and an expiry ``expires_in``
    seconds from now; ``request.claims`` is merged on top and may override
    either of them.

    :param private_pem: private key used for signing.
    :param request: object exposing ``scope``, ``expires_in`` and ``claims``.
    :returns: the token converted to text via ``to_unicode``.
    """
    import jwt

    issued = datetime.datetime.utcnow()
    claims = dict(
        scope=request.scope,
        exp=issued + datetime.timedelta(seconds=request.expires_in),
    )
    claims.update(request.claims)

    return to_unicode(jwt.encode(claims, private_pem, 'RS256'), "UTF-8")
def generate_token(payload: dict, exp_seconds: int) -> str:
    """
    Generate a JWT with the given payload, signed with HMAC + SHA-256.

    The ``iat``, ``exp`` and ``iss`` claims are written into ``payload``
    itself (the caller's dict is modified). The signing secret and issuer
    come from the ``JWT_SECRET`` and ``JWT_ISS`` environment variables.

    :param payload: claims to embed in the token.
    :param exp_seconds: lifetime of the token in seconds.
    :returns: the encoded token as text.
    """
    jwt_secret = os.getenv("JWT_SECRET")
    jwt_iss = os.getenv("JWT_ISS")

    # Epoch seconds do not depend on the time zone. The previous production
    # branch used ``utcnow().timestamp()``, which interprets the naive UTC
    # value as local time and skews iat/exp by the host's UTC offset. An
    # aware "now" gives the correct epoch in debug and production alike.
    now = dt.datetime.now(dt.timezone.utc)
    payload["iat"] = int(now.timestamp())
    payload["exp"] = int((now + dt.timedelta(seconds=exp_seconds)).timestamp())
    payload["iss"] = jwt_iss

    token = jwt.encode(payload, jwt_secret, algorithm="HS256")
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode("utf-8")
    return token
def _generate_jwt():
    """Build the signed registry JWT.

    Fills the time-based claims (``iat``, ``nbf``, ``exp``) and a random
    ``jti`` into ``claims``, a dict defined outside this function, then
    signs it with the private key stored next to this module in
    ``PRV_FILE``.

    :returns: the value produced by ``jwt.encode``.
    :raises Exception: on any failure, with the original error's text
        embedded in the message (the error is logged first).
    """
    try:
        now = int(time.time())
        claims["iat"] = now
        claims["nbf"] = now - 1
        claims["exp"] = now + TOKEN_EXPIRE_SECOND
        claims["jti"] = str(random.uniform(0, RANDOM_MAX_VALUE))

        dir_path = os.path.abspath(os.path.dirname(__file__))
        with open(os.path.join(dir_path, PRV_FILE), 'r') as rsa_priv_file:
            key = rsa_priv_file.read()

        encoded = jwt.encode(claims, key, algorithm=HEAD_ALG, headers=header)
        # NOTE(review): this writes the signed token to the debug log.
        logger.debug(encoded)
        return encoded
    # ``except Exception, e`` is a syntax error on Python 3; the ``as``
    # form works on Python 2.6+ and Python 3.
    except Exception as e:
        logger.error("Generate JWT for registry wrong : %s" % str(e))
        raise Exception("Generate JWT for registry wrong : %s" % str(e))
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.

    The launch parameters of the current request are packed into an HS512
    JWT that expires after one minute, and the client is redirected to the
    external ``lti_launch`` page with that token in the query string.

    :returns: the redirect response.
    """
    lti = {
        'params': CanvasLTI.create_from_request(flask.request).launch_params,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
    }
    token = jwt.encode(lti, app.config['LTI_SECRET_KEY'], algorithm='HS512')
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('utf8')

    return flask.redirect(
        '{}/lti_launch/?inLTI=true&jwt={}'.format(
            app.config['EXTERNAL_URL'],
            urllib.parse.quote(token),
        )
    )
def encode_auth_token(self, user_id):
    """
    Encode the auth token for a user.

    The lifetime comes from the ``AUTH_TOKEN_EXPIRY_DAYS`` and
    ``AUTH_TOKEN_EXPIRY_SECONDS`` application settings.

    :param user_id: User's Id, stored in the ``sub`` claim
    :return: the encoded token, or the caught exception object when
        anything fails (it is returned, not raised)
    """
    try:
        lifetime = datetime.timedelta(
            days=app.config.get('AUTH_TOKEN_EXPIRY_DAYS'),
            seconds=app.config.get('AUTH_TOKEN_EXPIRY_SECONDS'),
        )
        claims = {
            'exp': datetime.datetime.utcnow() + lifetime,
            'iat': datetime.datetime.utcnow(),
            'sub': user_id,
        }
        return jwt.encode(claims, app.config['SECRET_KEY'], algorithm='HS256')
    except Exception as error:
        return error
def create_token(request):
    """Verify the submitted credentials and issue a seven-day access token.

    The JSON body must contain ``username`` and ``password``;
    ``auth_approach`` selects the verification method (``password`` or
    ``wxapp``).

    :param request: incoming request exposing ``json`` and ``args``.
    :returns: ``(True, {'access_token': ..., 'account_id': ...})`` on
        success, ``(False, {})`` when verification fails or the approach is
        not recognised.
    """
    # verify basic token
    approach = request.json.get('auth_approach')
    username = request.json['username']
    password = request.json['password']

    # Start from "no account" so an unknown or missing approach is rejected
    # below instead of raising UnboundLocalError.
    account = None
    if approach == 'password':
        account = verify_password(username, password)
    elif approach == 'wxapp':
        account = verify_wxapp(username, password, request.args.get('code'))

    if not account:
        return False, {}

    payload = {
        "iss": Config.ISS,
        "iat": int(time.time()),
        "exp": int(time.time()) + 86400 * 7,
        "aud": Config.AUDIENCE,
        "sub": str(account.id),
        "nickname": account['nickname'],
        "scopes": ['open']
    }
    # NOTE(review): the signing key is the hard-coded literal 'secret';
    # it should come from configuration. Left unchanged here because the
    # verifying side is not visible from this function.
    token = jwt.encode(payload, 'secret', algorithm='HS256')
    return True, {'access_token': token, 'account_id': str(account.id)}
def test_form_valid(self, customer, sync_customer):
    """A valid signed ``transactionDetails`` form syncs the customer and
    redirects to the subscription list.

    ``customer`` and ``sync_customer`` are mock objects passed in as
    arguments.
    """
    customer.retrieve.return_value = {"id": "some_stripe_id"}
    sync_customer.return_value = True

    transaction_details = {"customer_id": "bar"}
    encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(encoded, bytes):
        encoded = encoded.decode("utf-8")
    data = {"transactionDetails": encoded}

    self.login(username=self.user.username, password="password")
    resp = self.client.post(reverse("pinax_stripe_subscription_create"), data)

    customer.retrieve.assert_called_with("bar")
    cust = Customer.objects.get(user=self.user)
    sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
    self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
def test_customer_sync_failed(self, customer, sync_customer):
    """When syncing the customer raises ``StripeError`` the view still
    redirects to the subscription list and shows an error message.

    ``customer`` and ``sync_customer`` are mock objects passed in as
    arguments.
    """
    customer.retrieve.return_value = {"id": "some_stripe_id"}
    sync_customer.side_effect = StripeError()

    transaction_details = {"customer_id": "bar"}
    encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(encoded, bytes):
        encoded = encoded.decode("utf-8")
    data = {"transactionDetails": encoded}

    self.login(username=self.user.username, password="password")
    resp = self.client.post(reverse("pinax_stripe_subscription_create"), data, follow=True)

    customer.retrieve.assert_called_with("bar")
    cust = Customer.objects.get(user=self.user)
    sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
    self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
    self.assertContains(resp, "Unable to communicate with stripe")
def post(self, request):
    """Validate the login payload and answer with a ``JWT``-prefixed token.

    :param request: incoming request; ``request.data`` must satisfy
        ``LoginSerializer`` and contain ``emp_id``.
    :returns: ``Response({'token': 'JWT <token>'})`` on success, otherwise a
        400 response carrying the serializer errors.
    """
    serializer = LoginSerializer(data=request.data)
    if not serializer.is_valid():
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    employee = Employee.objects.get(emp_id=request.data.get('emp_id'))
    encode = jwt.encode(
        {
            'emp_id': employee.emp_id,
            'auth': employee.auth,
            'part_id': employee.part_id,
            'create_time': time(),
            'ip_addr': request.META.get('REMOTE_ADDR')
        },
        settings.SECRET_KEY,
        algorithm='HS256',
    )
    # PyJWT 1.x returns bytes; concatenating that to the text prefix below
    # raises TypeError on Python 3. PyJWT >= 2.0 already returns str.
    if isinstance(encode, bytes):
        encode = encode.decode('utf-8')

    token = dict()
    token['token'] = 'JWT ' + encode
    return Response(token)
def register(ctx, request):
    """Register a new user from the JSON request body.

    The body must provide ``nickname``, ``mail`` and ``password``. The new
    user gets a default catalog named ``notes`` and a bcrypt-hashed
    password.

    :returns: ``jsonify(code=200)`` on success, ``jsonify(code=400, ...)``
        when the nickname or mail is already taken.
    :raises HTTPBadRequest: when a required field is missing or the body
        cannot be read.
    :raises HTTPInternalServerError: when the commit fails (the session is
        rolled back first).
    """
    try:
        body = request.json()
        nickname = body['nickname']
        mail = body['mail']
        password = body['password']
    except KeyError as missing:
        raise HTTPBadRequest('{} is required'.format(missing))
    except Exception as error:
        raise HTTPBadRequest(error)

    duplicate = or_(User.nickname == nickname, User.mail == mail)
    if User.query.filter(duplicate).first() is not None:
        return jsonify(code=400, message='user exist')

    default_catalog = Catalog(name='notes')
    hashed_password = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
    user = User(
        nickname=nickname,
        mail=mail,
        catalogs=[default_catalog],
        password=hashed_password,
    )
    db.session.add(user)

    try:
        db.session.commit()
        return jsonify(code=200)
    except Exception as error:
        logging.error(error)
        db.session.rollback()
        raise HTTPInternalServerError(error)
def encode_refresh_token(identity, secret, algorithm, expires_delta, csrf, identity_claim_key):
    """
    Create a new encoded (utf-8) refresh token.

    :param identity: Some identifier used to identify the owner of this token
    :param secret: Secret key to encode the JWT with
    :param algorithm: Which algorithm to use for the token
    :param expires_delta: How far in the future this token should expire
                          (set to False to disable expiration)
    :type expires_delta: datetime.timedelta or False
    :param csrf: Whether to include a csrf double submit claim in this token
                 (boolean)
    :param identity_claim_key: Which key should be used to store the identity
    :return: Encoded refresh token
    """
    claims = {identity_claim_key: identity}
    claims['type'] = 'refresh'
    if csrf:
        claims['csrf'] = _create_csrf_token()
    return _encode_jwt(claims, expires_delta, secret, algorithm)
def decode_pem_key(key_pem):
    """Convert a plaintext PEM key into the object used for JWT generation.

    Args:
        key_pem (str): key data in PEM format, presented as plain string

    Returns:
        The parsed private key object.

    Raises:
        AssertionError: if the key is not an RSA private key or is shorter
            than 2048 bits.
    """
    private_key = serialization.load_pem_private_key(
        data=key_pem.encode('ascii'),
        password=None,
        backend=default_backend())

    # Explicit raises instead of ``assert`` statements: asserts are removed
    # when Python runs with -O, which would silently skip these key checks.
    # The exception type is kept so existing callers are unaffected.
    if not isinstance(private_key, rsa.RSAPrivateKey):
        raise AssertionError('Unexpected private key type')
    if private_key.key_size < 2048:
        raise AssertionError('RSA key size too small')

    return private_key
def __call__(self, request, *args, **kwargs):
    """Run the wrapped handler with a JWT-cookie-backed session.

    The session is loaded from the cookie named ``self.cookie_name`` when
    present and valid; otherwise an empty session is used. After the
    handler ran, a session that is not ``clean`` is written back to the
    response cookie as a freshly signed JWT.

    :param request: incoming request; receives a ``session`` attribute.
    :returns: the response produced by ``self.func``.
    """
    import jwt

    session = None
    if self.cookie_name in request.cookies:
        try:
            value = request.cookies[self.cookie_name].value
            # Restrict verification to the algorithm used for signing
            # (PyJWT >= 2.0 requires ``algorithms`` to be given).
            data = jwt.decode(value, self.secret, algorithms=[self.algorithm])
            session = Session(data)
        except Exception:
            # Invalid, expired or tampered cookie: fall back to an empty
            # session rather than leaving ``request.session`` unset.
            session = None
    request.session = session if session is not None else Session()

    resp = self.func(request, *args, **kwargs)

    if not request.session.clean:
        data = dict(request.session)
        # The keyword used to be misspelled ``algorith``, which makes
        # ``jwt.encode`` fail with an unexpected-keyword TypeError.
        value = jwt.encode(data, self.secret, algorithm=self.algorithm)
        resp.cookies[self.cookie_name] = value
    return resp
def session(request):
    """Authenticate a user from the JSON request body and issue a JWT.

    The body must contain ``email`` and ``password``; ``renderingId``
    selects the list of allowed users.

    :param request: incoming request with a JSON ``body``.
    :returns: ``JsonResponse({'token': ...})`` with a token valid for 14
        days, or an empty 401 response when the email is unknown or the
        password does not match.
    """
    data = json.loads(request.body)
    allowed_users = get_allowed_users(data.get('renderingId'))

    # ``next(..., None)`` works on Python 2 and 3 and yields None for an
    # unknown email, where the generator's ``.next()`` method is
    # Python-2-only and raised StopIteration instead of answering 401.
    user = next((u for u in allowed_users if u['email'] == data['email']), None)

    if user and passwords_match(data['password'], user['password']):
        payload = {
            'id': user['id'],
            'type': 'users',
            'data': {
                'email': user['email'],
                'first_name': user['first_name'],
                'last_name': user['last_name'],
                'teams': user['teams']
            },
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=14)
        }
        secret_key = settings.JWT_SECRET_KEY
        token = jwt.encode(payload, secret_key)
        # PyJWT 1.x returns bytes, which is not JSON serializable on
        # Python 3; PyJWT >= 2.0 already returns str.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return JsonResponse({'token': token})

    return HttpResponse(status=401)
def refresh_jwt_token(token):
    """Exchange a still-refreshable token for a new one.

    The old token is verified, its user is loaded from the ``identity``
    claim, and a new token carrying the original ``orig_iat`` is issued as
    long as the refresh window has not elapsed.

    :param token: the encoded token to refresh.
    :returns: ``(new_token, user)``.
    :raises ValueError: when the user does not exist or is inactive, when
        ``orig_iat`` is missing, or when the refresh window has expired.
    """
    payload = jwt.decode(
        token,
        key=current_app.config['SECRET_KEY'],
        algorithms=[JWT_ALGORITHM],
    )

    user = User.query.get(payload['identity'])
    # A deleted user previously surfaced as AttributeError on ``None``.
    if user is None:
        raise ValueError("User does not exist")
    if not user.active:
        raise ValueError("User is inactive")

    orig_iat = payload.get('orig_iat')
    if not orig_iat:
        raise ValueError("`orig_iat` field is required")

    refresh_limit = orig_iat + int(JWT_REFRESH_EXPIRATION_DELTA.total_seconds())
    # NOTE(review): ``utcnow().timestamp()`` interprets the naive UTC value
    # as local time, so ``now_ts`` is off by the host's UTC offset unless
    # the host runs in UTC. Left unchanged because it must stay consistent
    # with however ``orig_iat`` is produced -- confirm and fix both together.
    now_ts = datetime.datetime.utcnow().timestamp()
    if now_ts > refresh_limit:
        raise ValueError("Refresh has expired")

    new_payload = jwt_payload(user)
    new_payload["orig_iat"] = orig_iat

    token = jwt.encode(
        new_payload,
        key=current_app.config['SECRET_KEY'],
        algorithm=JWT_ALGORITHM,
    )
    return token, user
def create_token():
    """Issue an RS512 access token for a ``client_credentials`` grant.

    Reads ``grant_type``, ``client_id``, ``client_secret`` and the optional
    ``audience`` from the submitted form.

    :returns: ``{'access_token': ...}`` with a token valid for 24 hours; a
        400 response for a wrong grant type or any unexpected error; aborts
        with 401 when the client credentials are not authorized.
    """
    key = current_app.config['priv_key']
    try:
        data = request.form
        if data.get('grant_type') != 'client_credentials':
            return _400('Wrong grant_type')

        client_id = data.get('client_id')
        client_secret = data.get('client_secret')
        aud = data.get('audience', '')

        if is_authorized_app(client_id, client_secret):
            now = int(time.time())
            token = {'iss': 'https://tokendealer.example.com',
                     'aud': aud,
                     'iat': now,
                     'exp': now + 3600 * 24}
            token = jwt.encode(token, key, algorithm='RS512')
            # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str.
            if isinstance(token, bytes):
                token = token.decode('utf8')
            return {'access_token': token}
    except Exception as e:
        return _400(str(e))

    # The 401 is raised outside the ``try`` on purpose: assuming ``abort``
    # signals via an ``Exception`` subclass (as Flask's does), the handler
    # above would otherwise turn it into a 400 response.
    return abort(401)
def create_jwt(self):
    """
    Create an RS256-signed JWT that is valid for 60 seconds.

    The issuer is ``self.integration_id`` and the signing key is
    ``self.private_key``.

    :return: the value produced by ``jwt.encode``.
    """
    issued_at = int(time.time())
    claims = dict(
        iat=issued_at,
        exp=issued_at + 60,
        iss=self.integration_id,
    )
    return jwt.encode(claims, key=self.private_key, algorithm="RS256")
def get_auth_token(self, user_payload):
    """
    Create a JWT authentication token from ``user_payload``.

    Only the time claims listed in ``self.verify_claims`` are added:
    ``iat`` (now), ``nbf`` (now plus ``self.leeway``) and ``exp`` (now plus
    ``self.expiration_delta``).

    Args:
        user_payload(dict, required): A `dict` containing required
            information to create authentication token

    Returns:
        The encoded token as text.
    """
    now = datetime.utcnow()
    payload = {
        'user': user_payload
    }
    if 'iat' in self.verify_claims:
        payload['iat'] = now

    if 'nbf' in self.verify_claims:
        payload['nbf'] = now + self.leeway

    if 'exp' in self.verify_claims:
        payload['exp'] = now + self.expiration_delta

    token = jwt.encode(payload, self.secret_key,
                       json_encoder=ExtendedJSONEncoder)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode('utf-8')
    return token
def get_auth_token(self, user_payload):
    """
    Build an authorization header value from a username and password.

    The credentials are joined as ``username:password``, base64-encoded and
    prefixed with ``self.auth_header_prefix``.

    Args:
        user_payload(dict, required): must contain non-empty ``username``
            and ``password`` entries.

    Raises:
        ValueError: if either entry is missing or empty.
    """
    username = user_payload.get('username')
    password = user_payload.get('password')
    if not (username and password):
        raise ValueError('`user_payload` must contain both username and password')

    credentials = '{username}:{password}'.format(
        username=username, password=password)
    encoded = base64.b64encode(credentials.encode('utf-8'))

    return '{auth_header_prefix} {token_b64}'.format(
        auth_header_prefix=self.auth_header_prefix,
        token_b64=encoded.decode('utf-8', 'ignore'))
def create_jwt(user, secret=None, algorithm=None, expiration_time=None, extra_data=None):
    """Create a signed JWT for ``user``.

    Every omitted (or falsy) option falls back to the matching project
    setting.

    :param user: user passed on to ``create_jwt_payload``.
    :param secret: signing key; defaults to ``settings.JWT_SECRET_KEY``.
    :param algorithm: signing algorithm; defaults to
        ``settings.JWT_SIGN_ALGORITHM``.
    :param expiration_time: expiration delta; defaults to
        ``settings.JWT_EXPIRATION_TIME_DELTA``.
    :param extra_data: optional dict of extra keyword arguments forwarded
        to ``create_jwt_payload``.
    :returns: the value produced by ``jwt.encode``.
    """
    if not secret:
        secret = settings.JWT_SECRET_KEY
    if not algorithm:
        algorithm = settings.JWT_SIGN_ALGORITHM
    if not expiration_time:
        expiration_time = settings.JWT_EXPIRATION_TIME_DELTA

    # ``None`` replaces the shared mutable ``{}`` default; an explicit
    # ``extra_data=None`` is now accepted as "no extra data" as well.
    payload = create_jwt_payload(
        user=user,
        expiration_delta=expiration_time,
        issuer=settings.JWT_ISSUER,
        **(extra_data or {})
    )
    return jwt.encode(payload, secret, algorithm=algorithm)
def post(self):
    """Check the submitted credentials and answer with a JWT.

    Requires ``username`` and ``password`` request arguments (strictly
    parsed). Aborts with 401 when the user is unknown or the password is
    wrong.

    :returns: ``{'response': {'token': <token>}}``.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('username', type=str, required=True)
    parser.add_argument('password', type=str, required=True)
    reqdata = parser.parse_args(strict=True)

    user = User.query.filter_by(login = reqdata['username']).first()
    if not user:
        abort(401, message = 'Wrong credentials')
    if not Security.check_password(user.password, reqdata['password']):
        abort(401, message = 'Wrong credentials')

    # NOTE(review): every scope in the table is attached to the user here.
    user.scopes = Scope.query.all()

    enc_jwt = jwt.encode({'user' : UserSchema().dump(user).data}, Security.get_jwt_skey(), algorithm='HS256')
    # PyJWT 1.x returns bytes, which cannot be JSON-serialized in the
    # response on Python 3; PyJWT >= 2.0 already returns str.
    if isinstance(enc_jwt, bytes):
        enc_jwt = enc_jwt.decode('utf-8')

    return { 'response' : { 'token' : enc_jwt } }
def _make_url(self, url: Text, extra: Dict, request: 'Request') -> Text:
    """
    Compute the final URL for a link.

    Without a slug this is just ``url`` patched with the ``extra`` query
    parameters. With a slug, the URL instead points to the
    ``/links/facebook`` redirection endpoint and carries a signed token
    (user, page, real URL and slug) so that the `LinkClick` layer can be
    triggered.
    """
    real_url = patch_qs(url, extra)
    if not self.slug:
        return real_url

    redirect_url = urljoin(request.message.get_url_base(), '/links/facebook')
    claims = {
        'u': request.user.fbid,
        'p': request.user.page_id,
        'h': real_url,
        's': self.slug,
    }
    token = jwt.encode(
        claims,
        settings.WEBVIEW_SECRET_KEY,
        algorithm=settings.WEBVIEW_JWT_ALGORITHM,
    )
    return patch_qs(redirect_url, {'l': token})
def custom_json_encoder(o):
    """
    JSON fallback encoder for types commonly used by Inmanta.

    The checks run in a fixed order: UUIDs and exceptions become their
    string form, datetimes their ISO format, objects exposing ``to_dict``
    the result of that call, enum members their name and ``Unknown``
    values a constant marker string.

    :raises TypeError: when ``o`` matches none of the supported types (the
        failure is logged first).
    """
    if isinstance(o, uuid.UUID):
        encoded = str(o)
    elif isinstance(o, datetime):
        encoded = o.isoformat()
    elif hasattr(o, "to_dict"):
        encoded = o.to_dict()
    elif isinstance(o, enum.Enum):
        encoded = o.name
    elif isinstance(o, Exception):
        # Logs can push exceptions through RPC; send their text.
        encoded = str(o)
    elif isinstance(o, execute.util.Unknown):
        encoded = const.UKNOWN_STRING
    else:
        LOGGER.error("Unable to serialize %s", o)
        raise TypeError(repr(o) + " is not JSON serializable")
    return encoded
def encode_token(client_types, environment=None, idempotent=False, expire=None):
    """Create a signed token for the given client types.

    :param client_types: iterable of client type names, stored
        comma-separated in the ``ct`` claim.
    :param environment: optional environment stored in the ``env`` claim.
    :param idempotent: when true, no time-based claims (``iat``/``exp``) are
        added.
    :param expire: lifetime in seconds; only used when the signing
        configuration does not define a positive ``expire`` itself.
    :returns: the encoded token as text.
    """
    cfg = inmanta_config.AuthJWTConfig.get_sign_config()

    payload = {
        "iss": cfg.issuer,
        "aud": [cfg.audience],
        const.INMANTA_URN + "ct": ",".join(client_types),
    }

    if not idempotent:
        payload["iat"] = int(time.time())

        # The configured lifetime wins over the ``expire`` argument.
        if cfg.expire > 0:
            payload["exp"] = int(time.time() + cfg.expire)
        elif expire is not None:
            payload["exp"] = int(time.time() + expire)

    if environment is not None:
        payload[const.INMANTA_URN + "env"] = environment

    token = jwt.encode(payload, cfg.key, cfg.algo)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(token, bytes):
        token = token.decode()
    return token
def _create_jwt(self, claim: Dict[str, Any], key_pemstr: str) -> str:
    """Sign ``claim`` as an RS256 JWT whose ``kid`` header identifies the key.

    The key id is the base32 text of the first 30 bytes of the SHA-256
    digest of the DER-encoded public key, split into colon-separated groups
    of four characters.

    :param claim: claims to embed in the token.
    :param key_pemstr: RSA private key in PEM format.
    :returns: the encoded token as text.
    """
    _log.debug("Encoding JWT response: %s", claim)

    fp = base64.b32encode(
        SHA256.new(
            data=RSA.importKey(key_pemstr).publickey().exportKey(format="DER")
        ).digest()[0:30]  # shorten to 240 bit presumably so no padding is necessary
    ).decode('utf-8')
    kid = ":".join([fp[i:i + 4] for i in range(0, len(fp), 4)])

    jwtstr = jwt.encode(
        claim,
        headers={
            "typ": "JWT",
            "alg": "RS256",
            "kid": kid,
        },
        key=key_pemstr,
        algorithm="RS256",
    )
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(jwtstr, bytes):
        jwtstr = jwtstr.decode('utf-8')

    _log.debug("JWT response: %s", jwtstr)
    return jwtstr
def _get_or_create_topic_token(self, topic):
    """Return the cached provider token for ``topic``, renewing it if needed.

    Tokens are cached per topic as ``(issued_at, token)`` pairs. A new
    token is created when none is cached or when ``_is_expired_token``
    reports the cached one as expired.

    :param topic: cache key of the token.
    :returns: the encoded token as text.
    """
    # dict of topic to issue date and JWT token
    token_pair = self.__topicTokens.get(topic)
    if token_pair is not None and not self._is_expired_token(token_pair[0]):
        return token_pair[1]

    # Create a new token
    issued_at = time.time()
    token_dict = {
        'iss': self.__team_id,
        'iat': issued_at,
    }
    headers = {
        'alg': self.__encryption_algorithm,
        'kid': self.__auth_key_id,
    }
    jwt_token = jwt.encode(token_dict, self.__auth_key,
                           algorithm=self.__encryption_algorithm,
                           headers=headers)
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(jwt_token, bytes):
        jwt_token = jwt_token.decode('ascii')

    self.__topicTokens[topic] = (issued_at, jwt_token)
    return jwt_token
def gen_syllabus_post_data(start_year=str(datetime.now().year),semester=Semester.AUTUMN.value):
    """Build the url-encoded POST body for a syllabus search.

    The body is a fixed form payload (including hard-coded ``__VIEWSTATE``
    and ``__EVENTVALIDATION`` values) into which the academic year range
    ``<start_year>-<start_year + 1>`` and the semester are inserted.

    NOTE(review): both defaults are evaluated once, when the function is
    defined, so the default ``start_year`` is the year in which the module
    was imported.

    :param start_year: first year of the academic year; must be convertible
        with ``int()``.
    :param semester: value placed in the ``ucsYS$XQ`` form field.
    :returns: the request body encoded as UTF-8 bytes.
    """
    # The semester is concatenated into the template before ``format`` runs;
    # the two ``{}`` placeholders receive start_year and start_year + 1.
    data =('__EVENTTARGET=&__EVENTARGUMENT=' \
           '&__VIEWSTATE=%2FwEPDwUKLTc4MzA3NjE3Mg9kFgICAQ9kFgYCAQ9kFgRmDxAPFgIeBFRleHQFDzIwMTUtMjAxNuWtpuW5tGQQFQcPMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m5bm0FQc' \
           'PMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m' \
           '5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m' \
           '5bm0FCsDB2dnZ2dnZ2cWAGQCAQ8QZGQWAQICZAIFDxQrAAsP' \
           'FggeCERhdGFLZXlzFgAeC18hSXRlbUNvdW50Zh4JUGFnZUNvdW50Ag' \
           'EeFV8hRGF0YVNvdXJjZUl0ZW1Db3VudGZkZBYEHghDc3NDbGFzcwUMREdQYWdlclN0eWxlHgRfIVN' \
           'CAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkFgJmD2QWAgIBD2QWBAIDDw8WAh8ABQ3lhbEw6Zeo6K' \
           '%2B%2B56iLZGQCBA8PFgIfAAUHMOWtpuWIhmRkAgYPFCsACw8WAh4HVmlzaWJsZWhkZBYEHwUFDERHUGFnZXJTdHlsZR8GAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkZBgBBR5fX0NvbnRyb2xzUmVxdWlyZVBvc3RCYWNrS2V5X18WAgUIdWNzWVMkWE4FCWJ0blNlYXJjaIOA1hvkaeyr6hBNdPilFTCCDv8u' \
           '&__VIEWSTATEGENERATOR=E672B8C6&__EVENTVALIDATION=%2FwEWDgKP7Z%2FyDQKA2K7WDwKl3bLICQKs3faYCwKj3ZrWCALF5PiNDALE5IzLDQLD5MCbDwKv3YqZDQL7tY9IAvi1j0gC' \
           '%2BrWPSAL63YQMAqWf8%2B4KZKbR9XsGkypHcOunFkHTcdqR6to%3D&' \
           'ucsYS%24XN%24Text={}-{}%' \
           'D1%A7%C4%EA&ucsYS%24XQ=' + str(semester) + '&ucsYS%24hfXN=&btnSearch.x=42&btnSearch.y=21').format(str(start_year), str(int(start_year)+1))
    return data.encode('utf-8')
def _integration_authenticated_request(self, method, url):
    """Send a request authenticated as the integration itself.

    A fresh RS256 JWT is created for every call (issued now, valid for
    ``self.duration`` seconds) and sent as a bearer token. The issue time
    is also stored on ``self.since``.

    :param method: HTTP verb of the request.
    :param url: target URL.
    :returns: the response returned by ``requests``.
    """
    self.since = int(datetime.datetime.now().timestamp())
    payload = {
        'iat': self.since,
        'exp': self.since + self.duration,
        'iss': self.integration_id,
    }

    tok = jwt.encode(payload, key=self.rsadata, algorithm='RS256')
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(tok, bytes):
        tok = tok.decode()

    headers = {'Authorization': 'Bearer {}'.format(tok),
               'Accept': ACCEPT_HEADER,
               'Host': 'api.github.com',
               'User-Agent': 'python/requests'}

    req = requests.Request(method, url, headers=headers)
    prepared = req.prepare()
    with requests.Session() as s:
        return s.send(prepared)
def __init__(self, public_key, secret_key, verbose=False):
    """
    Store the API keys and prepare the authorization header.

    You must register in order to get the keys.

    Loads the logging configuration from ``logging.cfg`` and signs the
    public key as the ``jti`` claim of an HS256 JWT, which is kept in
    ``self.headers`` as a bearer token.

    public_key: It can be obtained from
        https://developer.ilovepdf.com/user/projects
        You can see it as "project key" or "jti claim"
    secret_key: It can be obtained from
        https://developer.ilovepdf.com/user/projects
    verbose: when true, an info line is logged before signing.
    """
    logging.config.fileConfig("logging.cfg")
    self.logger = logging.getLogger()
    self.verbose = verbose
    if self.verbose:
        self.logger.info("Generating the encoded signed public key...")

    self.public_key = public_key
    self.secret_key = secret_key

    signed_public_key = jwt.encode(
        {"jti": self.public_key},
        self.secret_key,
        algorithm="HS256"
    )
    # PyJWT 1.x returns bytes, PyJWT >= 2.0 returns str; only decode bytes.
    if isinstance(signed_public_key, bytes):
        signed_public_key = signed_public_key.decode("utf-8")

    self.headers = {"Authorization": "Bearer {}".format(signed_public_key)}
def generate_token(self, user_id):
    """Generate the access token to be used as the Authorization header.

    The token is HS256-signed with the application's ``SECRET`` setting and
    expires five minutes after it is issued.

    :param user_id: value stored in the ``sub`` claim.
    :returns: the encoded token, or -- when anything fails -- the error
        message as a string.
    """
    try:
        # payload with an expiration time
        claims = {
            'exp': datetime.utcnow() + timedelta(minutes=5),
            'iat': datetime.utcnow(),
            'sub': user_id,
        }
        # sign the payload with the SECRET key
        signing_key = current_app.config.get('SECRET')
        return jwt.encode(claims, signing_key, algorithm='HS256')
    except Exception as error:
        # report the failure as text instead of raising
        return str(error)