我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用httplib2.Http()。
def net_billings(self, username, now_bytes_total): global monitor_vnodes if not username in self.net_lastbillings.keys(): self.net_lastbillings[username] = 0 elif int(now_bytes_total/self.bytes_per_beans) < self.net_lastbillings[username]: self.net_lastbillings[username] = 0 diff = int(now_bytes_total/self.bytes_per_beans) - self.net_lastbillings[username] if diff > 0: auth_key = env.getenv('AUTH_KEY') data = {"owner_name":username,"billing":diff, "auth_key":auth_key} header = {'Content-Type':'application/x-www-form-urlencoded'} http = Http() [resp,content] = http.request("http://"+self.master_ip+"/billing/beans/","POST",urlencode(data),headers = header) logger.info("response from master:"+content.decode('utf-8')) self.net_lastbillings[username] += diff monitor_vnodes[username]['net_stats']['net_billings'] = self.net_lastbillings[username]
def create_service(self, host): credentials = oauth.get_or_create_credentials( scope=OAUTH_SCOPES, storage_key=STORAGE_KEY) http = httplib2.Http(ca_certs=utils.get_cacerts_path()) http = credentials.authorize(http) # Kintaro's server doesn't seem to be able to refresh expired tokens # properly (responds with a "Stateless token expired" error). So we # manage state ourselves and refresh slightly more often than once # per hour. now = datetime.datetime.now() if self._last_run is None \ or now - self._last_run >= datetime.timedelta(minutes=50): credentials.refresh(http) self._last_run = now url = DISCOVERY_URL.replace('{host}', host) return discovery.build('content', 'v1', http=http, discoveryServiceUrl=url)
def _refresh(self, http_request): """Refreshes the access_token. Since the underlying App Engine app_identity implementation does its own caching we can skip all the storage hoops and just to a refresh using the API. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. Raises: AccessTokenRefreshError: When the refresh fails. """ try: scopes = self.scope.split() (token, _) = app_identity.get_access_token( scopes, service_account_id=self.service_account_id) except app_identity.Error as e: raise AccessTokenRefreshError(str(e)) self.access_token = token
def postWithPayload(loginUrl, payload=None): urlHeadersJson = {'content-type': 'application/json'} try: h = httplib2.Http('.cache', disable_ssl_certificate_validation=True) if payload is None: logger.debug('POST: ' + loginUrl) (response, content) = h.request(loginUrl, 'POST', '', urlHeadersJson) logger.debug(content) else: logger.debug('POST: ' + loginUrl + ' <- Data: ' + str(payload)) (response, content) = h.request(loginUrl, 'POST', body=payload, headers=urlHeadersJson) logger.debug(response) logger.debug(content) except Exception, e: raise Exception('Got an error code: ', e) return content
def postWithPayloadAndHeaders(loginUrl, urlHeadersJson, payload=None): try: h = httplib2.Http('.cache', disable_ssl_certificate_validation=True) if payload is None: logger.debug('POST: ' + loginUrl) (response, content) = h.request(loginUrl, 'POST', '', urlHeadersJson) else: logger.debug('POST: ' + loginUrl + ' <- Data: ' + str(payload)) (response, content) = h.request(loginUrl, 'POST', body=payload, headers=urlHeadersJson) except Exception, e: raise Exception('Got an error code: ', e) return content
def postOperation(url, apiKey, payload=''): urlHeadersJson = {'content-type': 'application/json', 'X-Api-Key': '%s' % str(apiKey)} try: h = httplib2.Http('.cache', disable_ssl_certificate_validation=True) if payload is None: logger.debug('POST: ' + url) (response, content) = h.request(url, 'POST', json.dumps(payload), urlHeadersJson) else: logger.debug('POST: ' + url + ' <- Data: ' + str(payload)) (response, content) = h.request(url, 'POST', json.dumps(payload), headers=urlHeadersJson) except Exception, e: raise Exception('Got an error code: ', e) return content
def patch(url, payload, apiKey): urlHeadersJson = {'content-type': 'application/json', 'X-Api-Key': '%s' % str(apiKey)} try: h = httplib2.Http('.cache', disable_ssl_certificate_validation=True) logger.debug('PATCH: ' + url + ' <-- Attribute: ' + str(payload)) (response, content) = h.request(url, 'PATCH', json.dumps(payload), urlHeadersJson) except Exception, e: # print (response, content) raise Exception('Got an error code: ', e) return content
def test__googleauth(self): """ TODO(supertom): add mocking, make more robust, etc. This test make a lot of assumptions: 1. Running on GCE 3. Doesn't truly verify the Http object is authorized. However, this function is critical for valid GCP operation so it is good to have a sanity check that we have an Http object. """ from httplib2 import Http # default creds http_auth = auth._googleauth() self.assertTrue(isinstance(http_auth, Http)) # service account key test_key_file = self._get_fixture('testkey.json') http_auth = auth._googleauth(key_file=test_key_file) self.assertTrue(isinstance(http_auth, Http))
def _refresh(self, http_request): """Refreshes the access_token. Since the underlying App Engine app_identity implementation does its own caching we can skip all the storage hoops and just to a refresh using the API. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. Raises: AccessTokenRefreshError: When the refresh fails. """ try: scopes = self.scope.split() (token, _) = app_identity.get_access_token(scopes) except app_identity.Error, e: raise AccessTokenRefreshError(str(e)) self.access_token = token
def __init__(self, github_owner_username, github_repo_name, github_oauth_token, rate_limit, http_instance=None): """Initialize the GitHubService. Args: github_owner_username: The username of the owner of the repository. github_repo_name: The GitHub repository name. github_oauth_token: The oauth token to use for the requests. rate_limit: Whether or not to rate limit GitHub API requests. http_instance: The HTTP instance to use, if not set a default will be used. """ self.github_owner_username = github_owner_username self.github_repo_name = github_repo_name self._github_oauth_token = github_oauth_token self._rate_limit = rate_limit self._http = http_instance if http_instance else httplib2.Http()
def main(): """Shows basic usage of the Google Admin SDK Reports API. Creates a Google Admin SDK Reports API service object and outputs a list of last 10 login events. """ credentials = get_credentials() http = credentials.authorize(httplib2.Http()) service = discovery.build('admin', 'reports_v1', http=http) print('Getting the last 10 login events') results = service.activities().list(userKey='all', applicationName='login', maxResults=10).execute() activities = results.get('items', []) if not activities: print('No logins found.') else: print('Logins:') for activity in activities: print('{0}: {1} ({2})'.format(activity['id']['time'], activity['actor']['email'], activity['events'][0]['name']))
def execute(self): """ Returns GSuite events based on given app/activity. Other parameters are optional. """ logging.debug("Authenticating to GSuite") self.get_credentials() self.http = self.credentials.authorize(httplib2.Http()) self.service = discovery.build('admin', 'reports_v1', http=self.http) logging.debug("Retrieving %s events from: %s to %s", self.app, convert_time(self.s_time), convert_time(self.e_time)) self.results = self.service.activities().list(userKey=self.user, applicationName=self.app, startTime=self.s_time, endTime=self.e_time, maxResults=self.max).execute() return self.results.get('items', [])
def getGDataOAuthToken(gdataObj, credentials=None): if not credentials: credentials = getClientCredentials(API.FAM2_SCOPES) try: credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL])) except httplib2.ServerNotFoundError as e: systemErrorExit(NETWORK_ERROR_RC, str(e)) except oauth2client.client.AccessTokenRefreshError as e: return handleOAuthTokenError(str(e), False) gdataObj.additional_headers[u'Authorization'] = u'Bearer {0}'.format(credentials.access_token) if not GC.Values[GC.DOMAIN]: GC.Values[GC.DOMAIN] = credentials.id_token.get(u'hd', u'UNKNOWN').lower() if not GC.Values[GC.CUSTOMER_ID]: GC.Values[GC.CUSTOMER_ID] = GC.MY_CUSTOMER GM.Globals[GM.ADMIN] = credentials.id_token.get(u'email', u'UNKNOWN').lower() GM.Globals[GM.OAUTH2_CLIENT_ID] = credentials.client_id gdataObj.domain = GC.Values[GC.DOMAIN] gdataObj.source = GAM_INFO return True
def getGDataUserCredentials(api, user, i, count): userEmail = convertUIDtoEmailAddress(user) _, _, api_version, cred_family = API.getVersion(api) disc_file, discovery = readDiscoveryFile(api_version) GM.Globals[GM.CURRENT_API_USER] = userEmail credentials = getClientCredentials(cred_family) try: GM.Globals[GM.CURRENT_API_SCOPES] = list(set(list(discovery[u'auth'][u'oauth2'][u'scopes'])).intersection(credentials.scopes)) except KeyError: invalidDiscoveryJsonExit(disc_file) if not GM.Globals[GM.CURRENT_API_SCOPES]: systemErrorExit(NO_SCOPES_FOR_API_RC, Msg.NO_SCOPES_FOR_API.format(discovery.get(u'title', api_version))) credentials = getSvcAcctCredentials(GM.Globals[GM.CURRENT_API_SCOPES], userEmail) try: credentials.refresh(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL])) return (userEmail, credentials) except httplib2.ServerNotFoundError as e: systemErrorExit(NETWORK_ERROR_RC, str(e)) except oauth2client.client.AccessTokenRefreshError as e: handleOAuthTokenError(str(e), True) entityUnknownWarning(Ent.USER, userEmail, i, count) return (userEmail, None)
def getCRMService(login_hint): from oauth2client.contrib.dictionary_storage import DictionaryStorage scope = u'https://www.googleapis.com/auth/cloud-platform' client_id = u'297408095146-fug707qsjv4ikron0hugpevbrjhkmsk7.apps.googleusercontent.com' client_secret = u'qM3dP8f_4qedwzWQE1VR4zzU' flow = oauth2client.client.OAuth2WebServerFlow(client_id=client_id, client_secret=client_secret, scope=scope, redirect_uri=oauth2client.client.OOB_CALLBACK_URN, user_agent=GAM_INFO, access_type=u'online', response_type=u'code', login_hint=login_hint) storage_dict = {} storage = DictionaryStorage(storage_dict, u'credentials') flags = cmd_flags(noLocalWebserver=GC.Values[GC.NO_BROWSER]) httpObj = httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]) try: credentials = oauth2client.tools.run_flow(flow=flow, storage=storage, flags=flags, http=httpObj) except httplib2.CertificateValidationUnsupported: noPythonSSLExit() credentials.user_agent = GAM_INFO httpObj = credentials.authorize(httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL], cache=None)) return (googleapiclient.discovery.build(u'cloudresourcemanager', u'v1', http=httpObj, cache_discovery=False), httpObj)
def doUpdateProject(): login_hint = getEmailAddress(noUid=True, optional=True) checkForExtraneousArguments() login_hint = getValidateLoginHint(login_hint) _, httpObj = getCRMService(login_hint) cs_data = readFile(GC.Values[GC.CLIENT_SECRETS_JSON], mode=u'rb', continueOnError=True, displayError=True, encoding=None) if not cs_data: systemErrorExit(14, u'Your client secrets file:\n\n%s\n\nis missing. Please recreate the file.' % GC.Values[GC.CLIENT_SECRETS_JSON]) try: cs_json = json.loads(cs_data) projectName = 'project:%s' % cs_json[u'installed'][u'project_id'] except (ValueError, IndexError, KeyError): systemErrorExit(3, u'The format of your client secrets file:\n\n%s\n\nis incorrect. Please recreate the file.' % GC.Values[GC.CLIENT_SECRETS_JSON]) simplehttp = httplib2.Http(disable_ssl_certificate_validation=GC.Values[GC.NO_VERIFY_SSL]) enableProjectAPIs(simplehttp, httpObj, projectName, True) # gam whatis <EmailItem> [noinfo]
def build_http(): """Builds httplib2.Http object Returns: A httplib2.Http object, which is used to make http requests, and which has timeout set by default. To override default timeout call socket.setdefaulttimeout(timeout_in_sec) before interacting with this method. """ if socket.getdefaulttimeout() is not None: http_timeout = socket.getdefaulttimeout() else: http_timeout = DEFAULT_HTTP_TIMEOUT_SEC return httplib2.Http(timeout=http_timeout)
def testHttpsContext(self): client = httplib2.Http(ca_certs=self.ca_certs_path) # Establish connection to local server client.request('https://localhost:%d/' % (self.port)) # Verify that connection uses a TLS context with the correct hostname conn = client.connections['https:localhost:%d' % self.port] self.assertIsInstance(conn.sock, ssl.SSLSocket) self.assertTrue(hasattr(conn.sock, 'context')) self.assertIsInstance(conn.sock.context, ssl.SSLContext) self.assertTrue(conn.sock.context.check_hostname) self.assertEqual(conn.sock.server_hostname, 'localhost') self.assertEqual(conn.sock.context.verify_mode, ssl.CERT_REQUIRED) self.assertEqual(conn.sock.context.protocol, ssl.PROTOCOL_SSLv23)
def test_ssl_hostname_mismatch_repeat(self): # https://github.com/httplib2/httplib2/issues/5 # FIXME(temoto): as of 2017-01-05 this is only a reference code, not useful test. # Because it doesn't provoke described error on my machine. # Instead `SSLContext.wrap_socket` raises `ssl.CertificateError` # which was also added to original patch. # url host is intentionally different, we provoke ssl hostname mismatch error url = 'https://127.0.0.1:%d/' % (self.port,) http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None) def once(): try: http.request(url) assert False, 'expected certificate hostname mismatch error' except Exception as e: print('%s errno=%s' % (repr(e), getattr(e, 'errno', None))) once() once()
def testGetViaHttpsKeyCert(self): # At this point I can only test # that the key and cert files are passed in # correctly to httplib. It would be nice to have # a real https endpoint to test against. # bitworking.org presents an certificate for a non-matching host # (*.webfaction.com), so we need to disable cert checking for this test. http = httplib2.Http(timeout=2, disable_ssl_certificate_validation=True) http.add_certificate("akeyfile", "acertfile", "bitworking.org") try: (response, content) = http.request("https://bitworking.org", "GET") except: pass self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile") self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile") try: (response, content) = http.request("https://notthere.bitworking.org", "GET") except: pass self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None) self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
def testGetViaHttpsKeyCert(self): # At this point I can only test # that the key and cert files are passed in # correctly to httplib. It would be nice to have # a real https endpoint to test against. http = httplib2.Http(timeout=2) http.add_certificate("akeyfile", "acertfile", "bitworking.org") try: (response, content) = http.request("https://bitworking.org", "GET") except AttributeError: self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile") self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile") except IOError: # Skip on 3.2 pass try: (response, content) = http.request("https://notthere.bitworking.org", "GET") except httplib2.ServerNotFoundError: self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None) self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None) except IOError: # Skip on 3.2 pass
def testSslCertValidation(self): # Test that we get an ssl.SSLError when specifying a non-existent CA # certs file. http = httplib2.Http(ca_certs='/nosuchfile') self.assertRaises(IOError, http.request, "https://www.google.com/", "GET") # Test that we get a SSLHandshakeError if we try to access # https://www.google.com, using a CA cert file that doesn't contain # the CA Google uses (i.e., simulating a cert that's not signed by a # trusted CA). other_ca_certs = os.path.join( os.path.dirname(os.path.abspath(httplib2.__file__ )), "test", "other_cacerts.txt") http = httplib2.Http(ca_certs=other_ca_certs) self.assertRaises(ssl.SSLError, http.request,"https://www.google.com/", "GET")
def main(): """Shows basic usage of the Google Drive API. Creates a Google Drive API service object and outputs the names and IDs for up to 10 files. """ credentials = get_credentials() http = credentials.authorize(httplib2.Http()) service = discovery.build('drive', 'v3', http=http) results = service.files().list( pageSize=10, fields="nextPageToken, files(id, name)").execute() items = results.get('files', []) if not items: print('No files found.') else: print('Files:') for item in items: print('{0} ({1})'.format(item['name'], item['id']))
def send(To, Subject, Body, Cc=[], Bcc=[], html=False, files=[]): """Send an email """ subtype = 'html' if html else 'plain' message = MIMEMultipart() message['To'] = ', '.join(To) message['Subject'] = Subject message['Cc'] = ', '.join(Cc) message['Bcc'] = ', '.join(Bcc) message.attach(MIMEText(Body, subtype)) for f in files: with open(f, "rb") as In: part = MIMEApplication(In.read(), Name=basename(f)) part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f) message.attach(part) message = {'raw': base64.urlsafe_b64encode(message.as_string())} credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get() Http = credentials.authorize(httplib2.Http()) service = discovery.build('gmail', 'v1', http=Http) message = service.users().messages().send(userId='me', body=message).execute()
def _refresh(self, http_request): """Refreshes the access_token. Since the underlying App Engine app_identity implementation does its own caching we can skip all the storage hoops and just to a refresh using the API. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. Raises: AccessTokenRefreshError: When the refresh fails. """ try: (token, _) = app_identity.get_access_token(self.scope) except app_identity.Error, e: raise AccessTokenRefreshError(str(e)) self.access_token = token
def is_admin(self, credentials): """Check if user has appengine.admin role. Calls iam.projects.testIamPermissions with appengine.applications.update to determine if the current logged in user is an application admin. Args: credentials: the user's access token. Returns: True if user is an admin, False otherwise. """ admin_permission = 'appengine.applications.update' body = {'permissions': admin_permission} http = credentials.authorize(httplib2.Http()) response = api.CLIENTS.iam.projects().testIamPermissions( resource=config.get_project_id(), body=body).execute(http=http) return admin_permission in response.get('permissions', [])
def main(): """ Shows basic usage of the Google Calendar API. Creates a Google Calendar API service object, logs the query from the arguments, and creates an event with Quick Add. """ credentials = get_credentials() http = credentials.authorize(httplib2.Http()) service = discovery.build('calendar', 'v3', http=http) if len(sys.argv) > 1: with open("log.txt", "a") as logfile: logfile.write(flags.query + "\n") created_event = service.events().quickAdd( calendarId='primary', text=flags.query ).execute()
def gdisconnect(): # Only disconnect a connected user. credentials = login_session.get('credentials') if credentials is None: response = make_response( json.dumps('Current user not connected.'), 401) response.headers['Content-Type'] = 'application/json' return response access_token = credentials.access_token url = 'https://accounts.google.com/o/oauth2/revoke?token=%s' % access_token h = httplib2.Http() result = h.request(url, 'GET')[0] if result['status'] != '200': # For whatever reason, the given token was invalid. response = make_response( json.dumps('Failed to revoke token for given user.', 400)) response.headers['Content-Type'] = 'application/json' return response # Disconnect based on provider
def authenticate(): global service global http_auth global calendar try: # read credentials credentials = ServiceAccountCredentials.from_json_keyfile_name(CREDENTIAL_FILE_PATH, scopes) # authorize and get the calendar service http_auth = credentials.authorize(Http()) service = discovery.build('calendar', 'v3', http=http_auth) calendar = service.calendars().get(calendarId=CALENDAR_ID).execute() except: logging.getLogger('BoilerLogger').error('failed to authenticate to google calendar service, will retry...') init() # get calendar events in a window sorted by start time
def delete(config, disk_name=None, disk_zone=None): # TODO: implement # submit a request to the gce api for a new disk with the given parameters # if inputs is not None, run a pipeline job to populate the disk projectId = config.project_id zones = [disk_zone if disk_zone is not None else x for x in config.zones.split(',')] credentials = GoogleCredentials.get_application_default() http = credentials.authorize(httplib2.Http()) if credentials.access_token_expired: credentials.refresh(http) gce = discovery.build('compute', 'v1', http=http) for z in zones: try: resp = gce.disks().delete(project=projectId, zone=z, disk=disk_name).execute() except HttpError as e: raise DataDiskError("Couldn't delete data disk {n}: {reason}".format(n=disk_name, reason=e)) while True: try: result = gce.zoneOperations().get(project=projectId, zone=z, operation=resp['name']).execute() except HttpError: break else: if result['status'] == 'DONE': break
def create(apiName, apiVersion): credentials = GoogleCredentials.get_application_default() http = credentials.authorize(httplib2.Http()) if credentials.access_token_expired: credentials.refresh(http) return discovery.build(apiName, apiVersion, http)
def get_authenticated_service(self): """ Create youtube oauth2 connection """ credentials = AccessTokenCredentials( access_token=self.get_auth_code(), user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36' ) return build( 'youtube', 'v3', http=credentials.authorize(httplib2.Http()) )
def request_master(url,data): global G_masterip header = {'Content-Type':'application/x-www-form-urlencoded'} http = Http() [resp,content] = http.request("http://"+G_masterip+url,"POST",urlencode(data),headers = header) logger.info("response from master:"+content.decode('utf-8')) # The class is to collect data of containers on each worker
def request_master(url,data): global G_masterip #logger.info("master_ip:"+str(G_masterip)) header = {'Content-Type':'application/x-www-form-urlencoded'} http = Http() for masterip in G_masterips: [resp,content] = http.request("http://"+masterip+url,"POST",urlencode(data),headers = header) logger.info("response from master:"+content.decode('utf-8'))
def oauth2callback(): flow = oauth2client.client.flow_from_clientsecrets( 'client_secrets_oauth.json', scope=[ 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/userinfo.profile'], redirect_uri=flask.url_for('oauth2callback', _external=True)) if 'code' not in flask.request.args: auth_uri = flow.step1_get_authorize_url() return flask.redirect(auth_uri) else: auth_code = flask.request.args.get('code') credentials = flow.step2_exchange(auth_code) flask.session['credentials'] = credentials.to_json() # use token to get user profile from google oauth api http_auth = credentials.authorize(httplib2.Http()) userinfo_client = apiclient.discovery.build('oauth2', 'v2', http_auth) user = userinfo_client.userinfo().v2().me().get().execute() # TODO only allow columbia.edu emails # if 'columbia.edu' not in user['email']: # return flask.redirect(flask.url_for('bademail')) um = users_model.Users() flask.session['google_user'] = user flask.session['id'] = um.get_or_create_user(user) # now add is_student and is_teacher to flask.session im = index_model.Index(flask.session['id']) flask.session['is_student'] = True if im.is_student() else False flask.session['is_teacher'] = True if im.is_teacher() else False redirect = flask.session['redirect'] flask.session.pop('redirect', None) return flask.redirect(redirect)
def create_service(): credentials = appengine.AppAssertionCredentials(SCOPE) http = httplib2.Http() http = credentials.authorize(http) credentials.refresh(http) return discovery.build('content', 'v1', http=http, discoveryServiceUrl=DISCOVERY_URL)
def create_pubsub_client(http=None): credentials = oauth2client.GoogleCredentials.get_application_default() if credentials.create_scoped_required(): credentials = credentials.create_scoped(PUBSUB_SCOPES) if not http: http = httplib2.Http() credentials.authorize(http) return discovery.build('pubsub', 'v1', http=http)
def create_datastore_client(http=None): credentials = oauth2client.GoogleCredentials.get_application_default() if not http: http = httplib2.Http() credentials.authorize(http) return datastore.Client(credentials=credentials)
def authorize(self, http): """Take an httplib2.Http instance (or equivalent) and authorizes it. Authorizes it for the set of credentials, usually by replacing http.request() with a method that adds in the appropriate headers and then delegates to the original Http.request() method. Args: http: httplib2.Http, an http object to be used to make the refresh request. """ _abstract()
def refresh(self, http): """Forces a refresh of the access_token. Args: http: httplib2.Http, an http object to be used to make the refresh request. """ _abstract()
def refresh(self, http): """Forces a refresh of the access_token. Args: http: httplib2.Http, an http object to be used to make the refresh request. """ self._refresh(http.request)
def revoke(self, http): """Revokes a refresh_token and makes the credentials void. Args: http: httplib2.Http, an http object to be used to make the revoke request. """ self._revoke(http.request)
def retrieve_scopes(self, http): """Retrieves the canonical list of scopes for this access token. Gets the scopes from the OAuth2 provider. Args: http: httplib2.Http, an http object to be used to make the refresh request. Returns: A set of strings containing the canonical list of scopes. """ self._retrieve_scopes(http.request) return self.scopes
def get_access_token(self, http=None): """Return the access token and its expiration information. If the token does not exist, get one. If the token expired, refresh it. """ if not self.access_token or self.access_token_expired: if not http: http = httplib2.Http() self.refresh(http) return AccessTokenInfo(access_token=self.access_token, expires_in=self._expires_in())
def _refresh(self, http_request): """Refreshes the access_token. This method first checks by reading the Storage object if available. If a refresh is still needed, it holds the Storage lock until the refresh is completed. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. Raises: HttpAccessTokenRefreshError: When the refresh fails. """ if not self.store: self._do_refresh_request(http_request) else: self.store.acquire_lock() try: new_cred = self.store.locked_get() if (new_cred and not new_cred.invalid and new_cred.access_token != self.access_token and not new_cred.access_token_expired): logger.info('Updated access_token read from Storage') self._updateFromCredential(new_cred) else: self._do_refresh_request(http_request) finally: self.store.release_lock()
def _revoke(self, http_request): """Revokes this credential and deletes the stored copy (if it exists). Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the revoke request. """ self._do_revoke(http_request, self.refresh_token or self.access_token)
def _do_revoke(self, http_request, token): """Revokes this credential and deletes the stored copy (if it exists). Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. token: A string used as the token to be revoked. Can be either an access_token or refresh_token. Raises: TokenRevokeError: If the revoke request does not return with a 200 OK. """ logger.info('Revoking token') query_params = {'token': token} token_revoke_uri = _update_query_params(self.revoke_uri, query_params) resp, content = http_request(token_revoke_uri) if resp.status == http_client.OK: self.invalid = True else: error_msg = 'Invalid response %s.' % resp.status try: d = json.loads(_from_bytes(content)) if 'error' in d: error_msg = d['error'] except (TypeError, ValueError): pass raise TokenRevokeError(error_msg) if self.store: self.store.delete()
def _retrieve_scopes(self, http_request): """Retrieves the list of authorized scopes from the OAuth2 provider. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the revoke request. """ self._do_retrieve_scopes(http_request, self.access_token)
def _do_retrieve_scopes(self, http_request, token): """Retrieves the list of authorized scopes from the OAuth2 provider. Args: http_request: callable, a callable that matches the method signature of httplib2.Http.request, used to make the refresh request. token: A string used as the token to identify the credentials to the provider. Raises: Error: When refresh fails, indicating the the access token is invalid. """ logger.info('Refreshing scopes') query_params = {'access_token': token, 'fields': 'scope'} token_info_uri = _update_query_params(self.token_info_uri, query_params) resp, content = http_request(token_info_uri) content = _from_bytes(content) if resp.status == http_client.OK: d = json.loads(content) self.scopes = set(util.string_to_scopes(d.get('scope', ''))) else: error_msg = 'Invalid response %s.' % (resp.status,) try: d = json.loads(content) if 'error_description' in d: error_msg = d['error_description'] except (TypeError, ValueError): pass raise Error(error_msg)