我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oauth2client.client.OAuth2WebServerFlow()。
def _make_flow(request, scopes, return_url=None): """Creates a Web Server Flow""" # Generate a CSRF token to prevent malicious requests. csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest() request.session[_CSRF_KEY] = csrf_token state = json.dumps({ 'csrf_token': csrf_token, 'return_url': return_url, }) flow = client.OAuth2WebServerFlow( client_id=django_util.oauth2_settings.client_id, client_secret=django_util.oauth2_settings.client_secret, scope=scopes, state=state, redirect_uri=request.build_absolute_uri( urlresolvers.reverse("google_oauth:callback"))) flow_key = _FLOW_KEY.format(csrf_token) request.session[flow_key] = pickle.dumps(flow) return flow
def _create_flow(self, request_handler): """Create the Flow object. The Flow is calculated lazily since we don't know where this app is running until it receives a request, at which point redirect_uri can be calculated and then the Flow object can be constructed. Args: request_handler: webapp.RequestHandler, the request handler. """ if self.flow is None: redirect_uri = request_handler.request.relative_url( self._callback_path) # Usually /oauth2callback self.flow = OAuth2WebServerFlow( self._client_id, self._client_secret, self._scope, redirect_uri=redirect_uri, user_agent=self._user_agent, auth_uri=self._auth_uri, token_uri=self._token_uri, revoke_uri=self._revoke_uri, **self._kwargs)
def _create_flow(self, request_handler): """Create the Flow object. The Flow is calculated lazily since we don't know where this app is running until it receives a request, at which point redirect_uri can be calculated and then the Flow object can be constructed. Args: request_handler: webapp.RequestHandler, the request handler. """ if self.flow is None: redirect_uri = request_handler.request.relative_url( self._callback_path) # Usually /oauth2callback self.flow = OAuth2WebServerFlow(self._client_id, self._client_secret, self._scope, redirect_uri=redirect_uri, user_agent=self._user_agent, auth_uri=self._auth_uri, token_uri=self._token_uri, revoke_uri=self._revoke_uri, **self._kwargs)
def _create_flow(self, request_handler): """Create the Flow object. The Flow is calculated lazily since we don't know where this app is running until it receives a request, at which point redirect_uri can be calculated and then the Flow object can be constructed. Args: request_handler: webapp.RequestHandler, the request handler. """ if self.flow is None: redirect_uri = request_handler.request.relative_url( self._callback_path) # Usually /oauth2callback self.flow = client.OAuth2WebServerFlow( self._client_id, self._client_secret, self._scope, redirect_uri=redirect_uri, user_agent=self._user_agent, auth_uri=self._auth_uri, token_uri=self._token_uri, revoke_uri=self._revoke_uri, **self._kwargs)
def get_credentials(flags): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ store = Storage(flags.credfile) with warnings.catch_warnings(): warnings.simplefilter("ignore") credentials = store.get() if not credentials or credentials.invalid: flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL) credentials = tools.run_flow(flow, store, flags) print('credential file saved at\n\t' + flags.credfile) return credentials
def _create_flow(self, request_handler): """Create the Flow object. The Flow is calculated lazily since we don't know where this app is running until it receives a request, at which point redirect_uri can be calculated and then the Flow object can be constructed. Args: request_handler: webapp.RequestHandler, the request handler. """ if self.flow is None: redirect_uri = request_handler.request.relative_url( self._callback_path) # Usually /oauth2callback self.flow = OAuth2WebServerFlow(self._client_id, self._client_secret, self._scope, redirect_uri=redirect_uri, user_agent=self._user_agent, auth_uri=self._auth_uri, token_uri=self._token_uri, **self._kwargs)
def main(): # Arguments parsing parser = argparse.ArgumentParser("Client ID and Secret are mandatory arguments") parser.add_argument("-i", "--id", required=True, help="Client id", metavar='<client-id>') parser.add_argument("-s", "--secret", required=True, help="Client secret", metavar='<client-secret>') parser.add_argument("-c", "--console", default=False, help="Authenticate only using console (for headless systems)", action="store_true") args = parser.parse_args() # Scopes of authorization activity = "https://www.googleapis.com/auth/fitness.activity.write" body = "https://www.googleapis.com/auth/fitness.body.write" location = "https://www.googleapis.com/auth/fitness.location.write" scopes = activity + " " + body + " " + location flow = OAuth2WebServerFlow(args.id, args.secret, scopes) storage = Storage('google.json') flags = ['--noauth_local_webserver'] if args.console else [] run_flow(flow, storage, argparser.parse_args(flags))
def test_override_flow_via_kwargs(self): """Passing kwargs to override defaults.""" flow = client.OAuth2WebServerFlow( client_id='client_id+1', client_secret='secret+1', scope='foo', redirect_uri=client.OOB_CALLBACK_URN, user_agent='unittest-sample/1.0', access_type='online', response_type='token' ) authorize_url = flow.step1_get_authorize_url() parsed = urllib.parse.urlparse(authorize_url) q = urllib.parse.parse_qs(parsed[4]) self.assertEqual('client_id+1', q['client_id'][0]) self.assertEqual('token', q['response_type'][0]) self.assertEqual('foo', q['scope'][0]) self.assertEqual(client.OOB_CALLBACK_URN, q['redirect_uri'][0]) self.assertEqual('online', q['access_type'][0])
def test_step1_get_authorize_url_redirect_override(self, logger): flow = client.OAuth2WebServerFlow('client_id+1', scope='foo', redirect_uri=client.OOB_CALLBACK_URN) alt_redirect = 'foo:bar' self.assertEqual(flow.redirect_uri, client.OOB_CALLBACK_URN) result = flow.step1_get_authorize_url(redirect_uri=alt_redirect) # Make sure the redirect value was updated. self.assertEqual(flow.redirect_uri, alt_redirect) query_params = { 'client_id': flow.client_id, 'redirect_uri': alt_redirect, 'scope': flow.scope, 'access_type': 'offline', 'response_type': 'code', } expected = _helpers.update_query_params(flow.auth_uri, query_params) assertUrisEqual(self, expected, result) # Check stubs. self.assertEqual(logger.warning.call_count, 1)
def test_step1_get_authorize_url_pkce_invalid_verifier( self, fake_verifier, fake_challenge): fake_verifier.return_value = self.good_verifier fake_challenge.return_value = self.good_challenger flow = client.OAuth2WebServerFlow( 'client_id+1', scope='foo', redirect_uri='http://example.com', pkce=True, code_verifier=self.bad_verifier) auth_url = urllib.parse.urlparse(flow.step1_get_authorize_url()) self.assertEqual(flow.code_verifier, self.bad_verifier) results = dict(urllib.parse.parse_qsl(auth_url.query)) self.assertEqual( results['code_challenge'], self.good_challenger.decode()) self.assertEqual(results['code_challenge_method'], 'S256') fake_verifier.assert_not_called() fake_challenge.assert_called_with(self.bad_verifier)
def test_step1_get_authorize_url_without_login_hint(self): login_hint = 'There are wascally wabbits nearby' flow = client.OAuth2WebServerFlow('client_id+1', scope='foo', redirect_uri=client.OOB_CALLBACK_URN, login_hint=login_hint) result = flow.step1_get_authorize_url() query_params = { 'client_id': flow.client_id, 'login_hint': login_hint, 'redirect_uri': client.OOB_CALLBACK_URN, 'scope': flow.scope, 'access_type': 'offline', 'response_type': 'code', } expected = _helpers.update_query_params(flow.auth_uri, query_params) assertUrisEqual(self, expected, result)
def test_exchange_with_pkce(self): http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b'access_token=SlAV32hkKG'), ]) flow = client.OAuth2WebServerFlow( 'client_id+1', scope='foo', redirect_uri='http://example.com', pkce=True, code_verifier=self.good_verifier) flow.step2_exchange(code='some random code', http=http) self.assertEqual(len(http.requests), 1) test_request = http.requests[0] self.assertIn( 'code_verifier={0}'.format(self.good_verifier.decode()), test_request['body'])
def test_exchange_using_authorization_header(self): auth_header = 'Basic Y2xpZW50X2lkKzE6c2Vjexc_managerV0KzE=', flow = client.OAuth2WebServerFlow( client_id='client_id+1', authorization_header=auth_header, scope='foo', redirect_uri=client.OOB_CALLBACK_URN, user_agent='unittest-sample/1.0', revoke_uri='dummy_revoke_uri', ) http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b'access_token=SlAV32hkKG'), ]) credentials = flow.step2_exchange(code='some random code', http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) self.assertEqual(len(http.requests), 1) test_request = http.requests[0] # Did we pass the Authorization header? self.assertEqual(test_request['headers']['Authorization'], auth_header) # Did we omit client_secret from POST body? self.assertTrue('client_secret' not in test_request['body'])
def callback(): code = request.args.get('code', None) if code is None: return render_template( 'show_message.html', message='There was an error while trying to authenticate you.' 'Please, try again.'), 503 flow = client.OAuth2WebServerFlow( YOUTUBE_CLIENT_ID, YOUTUBE_CLIENT_SECRET, scope='https://www.googleapis.com/auth/youtube.readonly', redirect_uri=YOUTUBE_REDIRECT_URI ) credentials = flow.step2_exchange(code) session['credentials'] = credentials.to_json() influxdb.count('youtube.logins') return redirect(url_for('youtube.index'))
def get(self, request): flow = OAuth2WebServerFlow( client_id=CLIENT_ID, client_secret=CLIENT_SECRET, redirect_uri=request.build_absolute_uri(reverse('gmail:callback')), scope='https://www.googleapis.com/auth/gmail.send', ) #log.debug(flow.redirect_uri) auth_uri = flow.step1_get_authorize_url() try: GmailAccount.objects.get(user=request.user) #return HttpResponseRedirect(auth_uri) except GmailAccount.DoesNotExist: pass GmailAccount(user=request.user, flow=flow).save() return HttpResponseRedirect(auth_uri)
def _make_flow(self, return_url=None, **kwargs): """Creates a Web Server Flow""" # Generate a CSRF token to prevent malicious requests. csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest() session['google_oauth2_csrf_token'] = csrf_token state = json.dumps({ 'csrf_token': csrf_token, 'return_url': return_url }) kw = self.flow_kwargs.copy() kw.update(kwargs) extra_scopes = util.scopes_to_string(kw.pop('scopes', '')) scopes = ' '.join([util.scopes_to_string(self.scopes), extra_scopes]) return OAuth2WebServerFlow( client_id=self.client_id, client_secret=self.client_secret, scope=scopes, state=state, redirect_uri=url_for('oauth2.callback', _external=True), **kw)
def get_user_account_credentials(self): from oauth2client.client import OAuth2WebServerFlow from oauth2client.file import Storage from oauth2client.tools import run_flow, argparser flow = OAuth2WebServerFlow( client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd' '.apps.googleusercontent.com'), client_secret='kOc9wMptUtxkcIFbtZCcrEAc', scope=self.scope, redirect_uri='urn:ietf:wg:oauth:2.0:oob') storage = Storage('bigquery_credentials.dat') credentials = storage.get() if credentials is None or credentials.invalid or self.reauth: credentials = run_flow(flow, storage, argparser.parse_args([])) return credentials
def get_credentials(): home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') print("checking for cached credentials") if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir,'mycroft-gmail-skill.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: credentials = tools.run_flow(OAuth2WebServerFlow(client_id=CID,client_secret=CIS,scope=SCOPES,user_agent=APPLICATION_NAME),store) print 'Storing credentials to ' + credential_dir print 'Your GMail Skill is now authenticated ' else: print 'Loaded credentials for Gmail Skill from ' + credential_dir return credentials