我们从 Python 开源项目中提取了以下 10 个代码示例，用于说明如何使用 oauth2client.client.credentials_from_clientsecrets_and_code()。
def test_exchange_code_and_file_for_token(self):
    """Exchange an auth code using a client-secrets file path.

    The HTTP mock answers with an access token and an ``expires_in`` value;
    the resulting credentials must expose that token, a non-None expiry and
    the scope set ``{'foo'}``.
    """
    token_response = (
        b'{'
        b' "access_token":"asdfghjkl",'
        b' "expires_in":3600'
        b'}'
    )
    fake_http = http_mock.HttpMock(data=token_response)
    secrets_path = datafile('client_secrets.json')

    creds = client.credentials_from_clientsecrets_and_code(
        secrets_path, self.scope, self.code, http=fake_http)

    self.assertEqual(creds.access_token, 'asdfghjkl')
    self.assertNotEqual(None, creds.token_expiry)
    self.assertEqual({'foo'}, creds.scopes)
def test_exchange_code_and_cached_file_for_token(self):
    """Exchange an auth code when the client secrets come from a cache.

    The secrets file is first loaded into a ``CacheMock`` under the key
    ``'some_secrets'``; that key (rather than a file path) is then handed to
    the exchange call together with the cache.
    """
    fake_http = http_mock.HttpMock(data=b'{ "access_token":"asdfghjkl"}')
    secrets_cache = http_mock.CacheMock()
    load_and_cache('client_secrets.json', 'some_secrets', secrets_cache)

    creds = client.credentials_from_clientsecrets_and_code(
        'some_secrets', self.scope, self.code,
        http=fake_http, cache=secrets_cache)

    self.assertEqual(creds.access_token, 'asdfghjkl')
    self.assertEqual({'foo'}, creds.scopes)
def test_exchange_code_and_file_for_token_fail(self):
    """A BAD_REQUEST reply to the code exchange raises FlowExchangeError."""
    response_headers = {'status': http_client.BAD_REQUEST}
    failing_http = http_mock.HttpMock(
        headers=response_headers,
        data=b'{"error":"invalid_request"}',
    )

    with self.assertRaises(client.FlowExchangeError):
        secrets_path = datafile('client_secrets.json')
        client.credentials_from_clientsecrets_and_code(
            secrets_path, self.scope, self.code, http=failing_http)
def google_login(request):
    """Log a user in from the auth code posted by Google's sign-in callback.

    Steps:
      1. Exchange the posted ``authCode`` for OAuth2 credentials.
      2. Look the user up by the e-mail in the ID token, creating one when
         no such user exists.
      3. Store the access/refresh tokens and Google user id on the related
         ``GoogleUser`` row and mark the user as logged in.

    Args:
        request: The incoming request; ``request.POST['authCode']`` must
            hold the authorization code.

    Returns:
        ``JsonResponse({"success": True})`` once the user is logged in.

    Raises:
        KeyError: Propagated from ``request.POST['authCode']`` when the
            code is missing. Errors from the code exchange and from the
            database are not caught here either.
    """
    # Exchange auth code for access token, refresh token, and ID token.
    auth_code = request.POST['authCode']
    scopes = ['https://www.googleapis.com/auth/drive', 'profile', 'email']
    credentials = client.credentials_from_clientsecrets_and_code(
        settings.CLIENT_SECRET_FILE, scopes, auth_code)

    id_token = credentials.id_token
    email = id_token['email']
    guser_id = id_token['sub']

    try:
        user = User.objects.get(email=email)
    except User.DoesNotExist:
        # Only a missing row means "new user". Any other failure (database
        # error, several users sharing the e-mail) must surface instead of
        # silently creating another account.
        # NOTE(review): the username is the e-mail's local part; nothing
        # here guards against two addresses sharing it - confirm upstream.
        user = User(username=email.split('@')[0], email=email)
        user.save()

    googleuser, _ = GoogleUser.objects.get_or_create(user=user)
    googleuser.access_token = credentials.access_token
    googleuser.refresh_token = credentials.refresh_token
    googleuser.guser_id = guser_id
    googleuser.save()

    login(request, user)
    return JsonResponse({"success": True})
def oauth2callback(request):
    """Finish the OAuth2 redirect flow and log the Google user in.

    Exchanges the ``code`` query parameter for credentials, finds (or
    creates) the user matching the e-mail in the ID token, stores the tokens
    on the related ``GoogleUser`` row, logs the user in and redirects to
    ``/?<state>``, where ``<state>`` is the optional ``state`` query
    parameter (empty when absent).

    Args:
        request: The incoming request; ``request.GET['code']`` must hold
            the authorization code.

    Returns:
        An ``HttpResponseRedirect`` to ``'/?' + state``.

    Raises:
        KeyError: Propagated from ``request.GET['code']`` when the code is
            missing. Errors from the code exchange and from the database
            are not caught here either.
    """
    # The auth code and ID token are secrets: do not print or log them.
    auth_code = request.GET['code']
    scopes = ['https://www.googleapis.com/auth/drive', 'profile', 'email']
    credentials = client.credentials_from_clientsecrets_and_code(
        settings.CLIENT_SECRET_FILE,
        scopes,
        auth_code,
        redirect_uri=settings.HOST_ADDR + '/oauth2callback')

    id_token = credentials.id_token
    email = id_token['email']
    guser_id = id_token['sub']

    try:
        user = User.objects.get(email=email)
    except User.DoesNotExist:
        # Only a missing row means "new user"; other failures must surface
        # rather than silently creating another account.
        user = User(username=email.split('@')[0], email=email)
        user.save()

    googleuser, _ = GoogleUser.objects.get_or_create(user=user)
    googleuser.access_token = credentials.access_token
    googleuser.refresh_token = credentials.refresh_token
    googleuser.guser_id = guser_id
    googleuser.save()

    login(request, user)

    # A missing or empty ``state`` collapses to an empty query string.
    state = request.GET.get('state') or ''
    return HttpResponseRedirect('/?' + state)
def googlesignin():
    """Sign a user in with a Google authorization code from the request form.

    Reads the code from the ``data`` form field, exchanges it for
    credentials, fetches the Google userinfo document and, when the session
    holds no ``_user`` yet, creates a user from that document and stores it
    in the session.

    Returns:
        An empty string when no auth code was posted; otherwise a JSON
        response of the form ``{"data": {"user": <session user>}}``.
    """
    auth_code = Utils.getParam(request.form, 'data', '')
    if not auth_code:
        return ''

    secrets_path = '/etc/ostrich_conf/google_client_secret.json'
    scopes = ['profile', 'email']
    credentials = client.credentials_from_clientsecrets_and_code(
        secrets_path, scopes, auth_code)

    authorized_http = credentials.authorize(httplib2.Http())
    oauth_service = discovery.build('oauth2', 'v2', http=authorized_http)
    profile = oauth_service.userinfo().get().execute()

    # TODO: check consistency between the session and the user cache
    # (idea: look the user up in the app cache keyed by id_token['sub']).
    current_user = session.get('_user', None)
    if not current_user:
        # NOTE(review): the original formatting was lost; this assumes the
        # create/store/re-read steps all belong to the "no session user"
        # branch - confirm against the upstream file.
        new_user_fields = {
            'username': profile['email'],
            'name': profile['name'],
            'email': profile['email'],
            'google_id': credentials.id_token['sub'],
            'picture': profile['picture'],
            'source': 'web'
        }
        current_user = User.createUser(new_user_fields)
        client_name = Utils.getParam(request.form, 'client', default=None)
        WebUtils.storeUserSession(current_user, client=client_name)
        current_user = session['_user']

    return jsonify(data={'user': current_user})
def action(self, action, d):
    """Handle the ``login`` and ``oauth2callback`` auth actions.

    ``login`` responds with JSON containing the authorization URL for the
    ``"email profile"`` scope. ``oauth2callback`` exchanges the returned
    code for credentials, saves them on the current user (or one looked up
    or created by the ID token's e-mail), stores that user in the session
    and redirects to ``/app/settings``. Any other action does nothing.

    Args:
        action: Name of the auth action to perform.
        d: Not used by this method.
    """
    if tools.on_dev_server():
        base = "http://localhost:8080"
    else:
        base = BASE

    if action == 'login':
        scope = "email profile"
        flow = User.get_auth_flow(scope=scope)
        flow.params['access_type'] = 'offline'
        flow.params['include_granted_scopes'] = 'true'
        # The requested scope string doubles as the ``state`` value.
        auth_uri = flow.step1_get_authorize_url(state=scope)
        self.json_out({'auth_uri': auth_uri}, success=True, debug=True)
        return

    if action != 'oauth2callback':
        return

    error = self.request.get('error')
    code = self.request.get('code')
    scope = self.request.get('scope')
    state_scopes = self.request.get('state')  # read but not used below

    if code:
        secrets_path = os.path.join(
            os.path.dirname(__file__), 'client_secrets.json')
        callback_uri = base + "/api/auth/oauth2callback"
        credentials = client.credentials_from_clientsecrets_and_code(
            secrets_path, scope.split(' '), code,
            redirect_uri=callback_uri)

        user = self.user
        if not user:
            email = credentials.id_token['email']
            # Fall back to creating an account when the e-mail is unknown.
            user = User.GetByEmail(email) or User.Create(email=email)

        if user:
            user.save_credentials(credentials)
            user.put()
            self.session['user'] = user
    elif error:
        logging.error(error)

    self.redirect("/app/settings")