我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用oauth2client.file.Storage()。
def test_imap_old(user):
    """Authenticate to Gmail over IMAP with XOAUTH2 and run a sample search.

    Reads stored credentials from ``credentials_file``, builds the XOAUTH2
    string for ``user`` from the stored access token, logs in to
    ``imap.googlemail.com`` and prints the result of an ``X-GM-RAW`` search
    on "[Gmail]/All Mail".

    Args:
        user: Account identifier passed to ``xoauth2_str``.
    """
    storage = Storage('credentials_file')
    credentials = storage.get()
    xoauth = xoauth2_str(user, credentials.access_token)

    conn = imaplib.IMAP4_SSL('imap.googlemail.com')
    conn.debug = 4  # imaplib debug level
    conn.authenticate('XOAUTH2', lambda x: xoauth)

    # The mailbox listing is still requested, but its result was never used.
    conn.list()
    conn.select("[Gmail]/All Mail")

    # Once authenticated everything from the imaplib.IMAP4_SSL class will
    # work as per usual without any modification to your code.
    typ, msgnums = conn.search(None, 'X-GM-RAW', 'vget')
    # Single-argument print calls produce the same output on Python 2 and 3.
    print('typ %s' % (typ,))
    print('num %s' % (msgnums,))
def get_credentials(self):
    """Load stored credentials and, if unusable, prepare an OAuth2 flow.

    Ensures ``~/.credentials`` exists, reads
    ``admin-reports_v1-python-quickstart.json`` from it into
    ``self.credentials`` and, when nothing valid is stored, builds
    ``self.flow`` from ``self.client_file`` and ``self.scope``.  The process
    exits with status 1 if the client-secrets file is missing.

    NOTE(review): the visible code neither runs the flow nor returns the
    credentials; callers presumably read ``self.credentials`` and
    ``self.flow`` afterwards -- confirm against the rest of the class.
    """
    self.home_dir = os.path.expanduser('~')
    self.credential_dir = os.path.join(self.home_dir, '.credentials')
    if not os.path.exists(self.credential_dir):
        logging.debug("Cred directory not found...creating: %s",
                      self.credential_dir)
        os.makedirs(self.credential_dir)

    self.credential_path = os.path.join(
        self.credential_dir, 'admin-reports_v1-python-quickstart.json')
    self.store = Storage(self.credential_path)
    self.credentials = self.store.get()

    if not self.credentials or self.credentials.invalid:
        if not os.path.isfile(self.client_file):
            # Adjacent string literals avoid the indentation whitespace that
            # a backslash continuation inside the literal would embed.
            logging.error("'client_secret.json' file is missing. "
                          "Google OAuth must be configured")
            sys.exit(1)
        self.flow = client.flow_from_clientsecrets(self.client_file,
                                                   self.scope)
        self.flow.user_agent = self.appname
def get_credentials():
    """Return user credentials, running the OAuth2 flow when required.

    Credentials are read from ``CREDS_FILENAME``.  If nothing is stored, or
    the stored credentials are invalid, a flow is built from
    ``CLIENT_SECRET_FILE`` and ``SCOPES`` and run via ``tools.run_flow`` with
    the same store.

    Returns:
        Credentials, the obtained credential.
    """
    store = Storage(CREDS_FILENAME)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
        flow.user_agent = APPLICATION_NAME
        credentials = tools.run_flow(flow, store)
    return credentials
def get_credentials(flags):
    """Fetch user credentials from ``flags.credfile``, authorizing if needed.

    Warnings raised while reading the store are suppressed.  When the store
    yields nothing usable, an ``OAuth2WebServerFlow`` built from
    ``CLIENT_CREDENTIAL`` is run and the location of the credential file is
    printed.

    Returns:
        Credentials, the obtained credential.
    """
    cred_store = Storage(flags.credfile)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        creds = cred_store.get()

    # Fast path: whatever is on disk is still usable.
    if creds and not creds.invalid:
        return creds

    web_flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
    creds = tools.run_flow(web_flow, cred_store, flags)
    print('credential file saved at\n\t' + flags.credfile)
    return creds
def main():
    """Run the OAuth2 flow for Google Fit write scopes.

    Parses the client id and secret from the command line, requests the
    fitness activity, body and location write scopes, and runs the flow with
    a ``Storage`` backed by ``google.json``.  With ``--console`` the flow is
    run with ``--noauth_local_webserver``.
    """
    # Arguments parsing.  The text is a description; passed positionally it
    # would be taken as ``prog`` and replace the program name in usage output.
    parser = argparse.ArgumentParser(
        description="Client ID and Secret are mandatory arguments")
    parser.add_argument("-i", "--id", required=True, help="Client id",
                        metavar='<client-id>')
    parser.add_argument("-s", "--secret", required=True,
                        help="Client secret", metavar='<client-secret>')
    parser.add_argument("-c", "--console", default=False,
                        help="Authenticate only using console (for headless systems)",
                        action="store_true")
    args = parser.parse_args()

    # Scopes of authorization
    activity = "https://www.googleapis.com/auth/fitness.activity.write"
    body = "https://www.googleapis.com/auth/fitness.body.write"
    location = "https://www.googleapis.com/auth/fitness.location.write"
    scopes = activity + " " + body + " " + location

    flow = OAuth2WebServerFlow(args.id, args.secret, scopes)
    storage = Storage('google.json')
    flags = ['--noauth_local_webserver'] if args.console else []
    run_flow(flow, storage, argparser.parse_args(flags))
def test_pickle_and_json_interop(self):
    """A pickled credentials file reads as None; ``put`` rewrites it as JSON."""
    # Write a file with a pickled OAuth2Credentials.  The context manager
    # closes the handle even if pickling fails.
    credentials = self._create_test_credentials()
    with open(FILENAME, 'wb') as credentials_file:
        pickle.dump(credentials, credentials_file)

    # Storage should not be able to read that object, as the capability
    # to read and write credentials as pickled objects has been removed.
    storage = file_module.Storage(FILENAME)
    read_credentials = storage.get()
    self.assertIsNone(read_credentials)

    # Now write it back out and confirm it has been rewritten as JSON
    storage.put(credentials)
    with open(FILENAME) as credentials_file:
        data = json.load(credentials_file)

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(data['access_token'], 'foo')
    self.assertEqual(data['_class'], 'OAuth2Credentials')
    self.assertEqual(data['_module'], client.OAuth2Credentials.__module__)
def test_access_token_credentials(self):
    """AccessTokenCredentials round-trip through Storage into a 0600 file."""
    access_token = 'foo'
    user_agent = 'refresh_checker/1.0'

    credentials = client.AccessTokenCredentials(access_token, user_agent)
    storage = file_module.Storage(FILENAME)
    # The result of put() was rebound and immediately overwritten; drop it.
    storage.put(credentials)
    credentials = storage.get()

    self.assertIsNotNone(credentials)
    self.assertEqual('foo', credentials.access_token)

    self.assertTrue(os.path.exists(FILENAME))

    if os.name == 'posix':  # pragma: NO COVER
        mode = os.stat(FILENAME).st_mode
        # Compare the permission bits numerically: the text produced by
        # oct() differs between Python 2 ('0600') and Python 3 ('0o600').
        self.assertEqual(0o600, stat.S_IMODE(mode))
def auth_stored_credentials(self, client_secrets_file, scopes=None):
    """Authorize stored credentials.

    Reads credentials from ``self.credentials_file``; when they are missing
    or invalid, runs a flow built from ``client_secrets_file`` and
    ``scopes`` and prints where the credentials were saved.

    Args:
        client_secrets_file: Path to the client-secrets JSON file.
        scopes: Scope(s) to request.  Defaults to an empty list.

    Returns:
        The stored or newly obtained credentials.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if scopes is None:
        scopes = []

    try:
        import argparse
        parser = argparse.ArgumentParser(parents=[tools.argparser])
        parser.add_argument('args', nargs=argparse.REMAINDER)
        flags = parser.parse_args()
        flags.noauth_local_webserver = True
    except ImportError:
        flags = None

    store = Storage(self.credentials_file)
    credentials = store.get()

    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(
            client_secrets_file,
            scopes,
        )
        flow.user_agent = self.app_name

        if flags:
            credentials = tools.run_flow(flow, store, flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)

        # Single-argument print call works on both Python 2 and 3.
        print('Saved credentials to ' + self.credentials_file)

    return credentials
def authorize_application(client_secret_file, scope,
                          credential_cache_file='credentials_cache.json',
                          flow_params=None):
    """
    authorize an application to the requested scope by asking the user in a browser.

    :param client_secret_file: json file containing the client secret for an offline application
    :param scope: scope(s) to authorize the application for
    :param credential_cache_file: file in which the credentials are cached, so
        the user does not need to be reauthenticated
    :param flow_params: oauth2 flow parameters (list of command-line style
        arguments); defaults to no parameters
    :return OAuth2Credentials object
    """
    flow = flow_from_clientsecrets(client_secret_file, scope=scope)
    storage = Storage(credential_cache_file)
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        # None sentinel instead of a shared mutable default.  An explicit
        # empty list is required: parse_args(None) would read sys.argv.
        params = [] if flow_params is None else flow_params
        root_logger = logging.getLogger()
        level = root_logger.level
        try:
            # Run oauth2 flow with default arguments.
            credentials = tools.run_flow(
                flow, storage, tools.argparser.parse_args(params))
        finally:
            # Restore the root logger level even when the flow raises.
            root_logger.setLevel(level)
    return credentials
def get_drive_service():
    '''
    Build and return a Google Drive v3 service object.

    A flow is always prepared from ``secret.json``; it is only run when the
    credentials kept in ``storage.dat`` are missing or invalid.
    '''
    drive_flow = client.flow_from_clientsecrets(
        get_credentials_path('secret.json'),
        'https://www.googleapis.com/auth/drive')
    drive_flow.user_agent = USER_AGENT_NAME

    cred_store = Storage(get_credentials_path('storage.dat', False))
    creds = cred_store.get()
    if not creds or creds.invalid:
        # Empty argument list: do not pick up this process's own argv.
        creds = tools.run_flow(
            drive_flow, cred_store, tools.argparser.parse_args(args=[]))

    authorized_http = creds.authorize(httplib2.Http())
    return discovery.build('drive', 'v3', http=authorized_http)
def storage_delete_object(storage_service, bucket, filename):
    '''
    Delete one object from a Google Cloud Storage bucket.

    Arguments:
    - `storage_service`: service object exposing ``objects().delete(...)``
    - `bucket`: bucket passed through to the delete request
    - `filename`: path whose basename is used as the object name

    Returns the result of executing the delete request.
    '''
    objects = storage_service.objects()
    request = objects.delete(bucket=bucket,
                             object=os.path.basename(filename))
    return request.execute()


# ============================================================
# GOOGLE CLOUD SPEECH API
# ============================================================
def upload_to_cloud(self, next_state):
    '''
    State machine action: upload the job's trimmed WAV file to cloud storage.

    Returns True (after recording ``next_state`` and saving) when the upload
    response reports the same size as the local file.  Otherwise schedules
    another tick in 5 and returns False.
    '''
    logger.info('Uploading to cloud storage %s', str(self))
    wav_path = local_trimmed_wav_path(self.job_name)

    try:
        response = storage_upload_object(self.services['storage'],
                                         BUCKET,
                                         filename=wav_path)
    except socket.error:
        logger.warning('socket.error')
        response = None
        time.sleep(0.5)

    # The size check only runs when a response was actually received.
    if response and os.stat(wav_path).st_size == int(response['size']):
        self.job_record['state'] = next_state
        self.pstorage.save()
        return True

    self.set_next_tick(5)
    return False
def get_credentials():
    """Return user credentials stored next to this module, authorizing if needed.

    The credential and client-secret files (``CREDENTIAL_FILE`` and
    ``CLIENT_SECRET_FILE``) are resolved relative to this file's directory.
    When the stored credentials are missing or invalid, the OAuth2 flow is
    run and the storage path is printed.

    Returns:
        The stored or newly obtained credentials.
    """
    dirname = os.path.dirname(__file__)
    credential_path = os.path.join(dirname, CREDENTIAL_FILE)
    client_secret_file = os.path.join(dirname, CLIENT_SECRET_FILE)

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(client_secret_file, SCOPES)
        flow.user_agent = APPLICATION_NAME
        credentials = tools.run_flow(flow, store)
        # The original message had lost its non-ASCII text to an encoding
        # error (runs of '?'); a readable ASCII message replaces it.
        print('Stored credentials to {}'.format(credential_path))
    return credentials
def get_credentials(handler=None):
    """Return stored credentials, or start a redirect-based authorization.

    Reads ``~/.credentials/google-api-quickstart.json``.  If the stored
    credentials are unusable, the authorization URL is computed; with a
    ``handler`` it is redirected to that URL and the value read from storage
    is returned as-is, without a handler ``None`` is returned.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir,
                                   'google-api-quickstart.json')

    creds = Storage(credential_path).get()
    if creds and not creds.invalid:
        return creds

    flow = client.flow_from_clientsecrets(filename=CLIENT_SECRET_FILE,
                                          scope=SCOPES,
                                          redirect_uri=redirect_url)
    auth_url = flow.step1_get_authorize_url()
    if not handler:
        return None
    handler.redirect(auth_url)
    return creds
def get_user_account_credentials(self):
    """Return user-account credentials, running the OAuth2 flow when needed.

    Credentials are cached in ``bigquery_credentials.dat``.  The flow is run
    when nothing is cached, the cached credentials are invalid, or
    ``self.reauth`` is set.
    """
    from oauth2client.client import OAuth2WebServerFlow
    from oauth2client.file import Storage
    from oauth2client.tools import run_flow, argparser

    web_flow = OAuth2WebServerFlow(
        client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd'
                   '.apps.googleusercontent.com'),
        client_secret='kOc9wMptUtxkcIFbtZCcrEAc',
        scope=self.scope,
        redirect_uri='urn:ietf:wg:oauth:2.0:oob')

    cache = Storage('bigquery_credentials.dat')
    creds = cache.get()

    needs_auth = creds is None or creds.invalid or self.reauth
    if needs_auth:
        creds = run_flow(web_flow, cache, argparser.parse_args([]))

    return creds
def initYoutube():
    """Return an authorized YouTube API client.

    Credentials are cached in ``<argv[0]>-oauth2.json``; the OAuth2 flow is
    run only when that cache is empty or invalid.
    """
    flow = flow_from_clientsecrets(
        constants.YOUTUBE_CLIENT_SECRETS_FILE,
        message=constants.MISSING_CLIENT_SECRETS_MESSAGE,
        scope=constants.YOUTUBE_READ_WRITE_SCOPE,
        redirect_uri='http://localhost:8080/')

    cache = Storage("%s-oauth2.json" % sys.argv[0])
    creds = cache.get()

    if creds is not None and not creds.invalid:
        print("Found valid oauth2 json")
    else:
        creds = run_flow(flow, cache, argparser.parse_args())

    client_obj = build(constants.YOUTUBE_API_SERVICE_NAME,
                       constants.YOUTUBE_API_VERSION,
                       http=creds.authorize(httplib2.Http()))
    print("Done authentication with Youtube")
    return client_obj
def _get_credentials(self):
    """Return the credentials stored in ``self._credentials_file``.

    No OAuth2 flow is started here; missing or invalid credentials are
    reported as an error.

    Returns:
        Credentials, the stored credential.

    Raises:
        Exception: if nothing is stored or the stored credentials are invalid.
    """
    creds = Storage(self._credentials_file).get()
    if creds and not creds.invalid:
        return creds
    raise Exception(
        "Could not find Google Mail credentials at %s"
        % self._credentials_file)
def update_credentials(self):
    """Return Gmail user credentials, obtaining them through OAuth 2.0 if needed.

    The credentials directory from the configuration is created when absent.
    Stored credentials are reused when valid; otherwise the flow is run with
    ``self._oauth2_flags``.

    Returns:
        Credentials, the obtained credential.
    """
    creds_dir = self._config['credentials_dir']
    if not os.path.exists(creds_dir):
        os.makedirs(self._config['credentials_dir'])

    cache = Storage(self._credentials_file)
    creds = cache.get()

    if creds and not creds.invalid:
        self._logger.info("Credentials exist.")
        return creds

    flow = client.flow_from_clientsecrets(self._client_secret_file,
                                          self._config['oauth2_scopes'])
    flow.user_agent = self._config['application_name']
    creds = tools.run_flow(flow, cache, self._oauth2_flags)
    self._logger.info('Storing credentials to ' + self._credentials_file)
    return creds
def _get_credentials(self):
    """Get OAuth credentials

    Credentials are cached in ``cached_oauth_credentials/googleapis.json``
    under ``self.config['creds_cache_dir']``.  Missing or invalid credentials
    trigger the console-based flow; valid ones are refreshed with
    ``self.http``.

    :return: OAuth credentials
    :rtype: :class:`oauth2client.client.Credentials`
    """
    credential_dir = join(self.config['creds_cache_dir'],
                          'cached_oauth_credentials')
    if not exists(credential_dir):
        makedirs(credential_dir)
    credential_path = join(credential_dir, 'googleapis.json')

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(self.config['creds'],
                                              self.config['scope'])
        flow.user_agent = 'Iris Gmail Integration'
        credentials = tools.run_flow(
            flow,
            store,
            tools.argparser.parse_args(args=['--noauth_local_webserver']))
        # Pass the path as a logging argument so formatting is deferred
        # until the record is actually emitted.
        logger.info('Storing credentials to %s', credential_path)
    else:
        credentials.refresh(self.http)
    return credentials
def get_credentials(self, client_secret_file, scopes, credentials_dir,
                    credentials_file, flags=None):
    """Return stored credentials, running the OAuth2 flow when needed.

    Args:
        client_secret_file: Path to the client-secrets JSON file.
        scopes: Scope(s) to request.
        credentials_dir: Directory holding the credentials file; created
            when absent.
        credentials_file: File name of the credentials store inside
            ``credentials_dir``.
        flags: Flags for ``tools.run_flow``.  When omitted, ``self._flags``
            is used.

    Returns:
        The stored or newly obtained credentials.
    """
    # Create credentials folder, if necessary
    if not os.path.exists(credentials_dir):
        os.makedirs(credentials_dir)

    # Store for credentials file
    credential_path = os.path.join(credentials_dir, credentials_file)
    store = Storage(credential_path)
    credentials = store.get()

    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(client_secret_file, scopes)
        flow.user_agent = APPLICATION_NAME
        # The ``flags`` argument used to be accepted but ignored; honour it
        # and fall back to the instance-level flags when it is not given.
        run_flags = flags if flags is not None else self._flags
        if run_flags:
            credentials = tools.run_flow(flow, store, run_flags)
        else:  # Needed only for compatibility with Python 2.6
            credentials = tools.run(flow, store)
        self._logger.debug(
            "Storing credentials to '{}'".format(credential_path))
    else:
        self._logger.debug(
            "Got valid credential from '{}'".format(credential_path))

    return credentials

# Create a Google Calendar API service object
def _get_credentials(self):
    """Get OAuth credentials

    The cache lives in ``cached_oauth_credentials/googleapis.json`` below
    ``self.var_dir``.  Usable cached credentials are refreshed with
    ``self.http``; otherwise the console-based flow is run.

    :return: OAuth credentials
    :rtype: :class:`oauth2client.client.Credentials`
    """
    cache_dir = join(self.var_dir, 'cached_oauth_credentials')
    if not exists(cache_dir):
        makedirs(cache_dir)
    cache_path = join(cache_dir, 'googleapis.json')

    cache = Storage(cache_path)
    creds = cache.get()

    if creds and not creds.invalid:
        creds.refresh(self.http)
        return creds

    flow = client.flow_from_clientsecrets(self.config['creds'],
                                          self.config['scope'])
    flow.user_agent = 'Iris Gmail Integration'
    console_flags = tools.argparser.parse_args(
        args=['--noauth_local_webserver'])
    creds = tools.run_flow(flow, cache, console_flags)
    logger.info('Storing credentials to %s', cache_path)
    return creds
def get_credentials():
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed (without a local web server) to obtain the
    new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    import argparse
    flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
    flags.noauth_local_webserver = True

    store = Storage(gmail_api.CREDENTIAL_FILE)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(gmail_api.CLIENT_SECRET_FILE,
                                              gmail_api.SCOPES)
        flow.user_agent = gmail_api.APPLICATION_NAME
        if flags:
            credentials = tools.run_flow(flow, store, flags)
        # Single-argument print call works on both Python 2 and 3.
        print('Storing credentials to ' + gmail_api.CREDENTIAL_FILE)
    return credentials
def get_credentials():
    """Return Sheets API user credentials, authorizing when needed.

    Credentials are kept in
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json``; the
    directory is created when absent.
    """
    credential_dir = path.join(path.expanduser('~'), '.credentials')
    if not path.exists(credential_dir):
        makedirs(credential_dir)
    credential_path = path.join(
        credential_dir, 'sheets.googleapis.com-python-quickstart.json')

    store = file.Storage(credential_path)
    creds = store.get()
    if creds and not creds.invalid:
        return creds

    flow = client.flow_from_clientsecrets(client_secret_path, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        creds = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return creds
def get_credentials(self, cred_file='client_secret.json'):
    """Return Sheets/Drive user credentials, authorizing when needed.

    Credentials are kept in ``~/.credentials/python-quickstart.json``.  When
    they are missing or invalid, the flow is attempted with ``self.flags``,
    then without flags, then with the legacy ``tools.run``.

    Args:
        cred_file: Path to the client-secrets JSON file.

    Returns:
        The stored or newly obtained credentials.
    """
    import os
    from oauth2client.file import Storage
    from oauth2client import client

    home_dir = os.path.expanduser('~')
    credential_dir = os.path.join(home_dir, '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(credential_dir, 'python-quickstart.json')

    store = Storage(credential_path)
    credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.flow_from_clientsecrets(
            cred_file,
            scope=['https://www.googleapis.com/auth/spreadsheets',
                   'https://www.googleapis.com/auth/drive'])
        flow.user_agent = self.app_name
        # ``except Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not swallowed and retried.
        try:
            credentials = tools.run_flow(flow, store, self.flags)
        except Exception:  # Needed only for compatibility with Python 2.6
            try:
                credentials = tools.run_flow(flow, store)
            except Exception:
                credentials = tools.run(flow, store)
        print('Storing credentials to ' + credential_path)
    return credentials
def get_authenticated_service():
    '''
    Authorize the request, caching the authorization credentials, and
    return the built API service.
    '''
    flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    cache = Storage("%s-oauth2.json" % sys.argv[0])
    creds = cache.get()
    cache_usable = creds is not None and not creds.invalid
    if not cache_usable:
        creds = run_flow(flow, cache)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    return build(API_SERVICE_NAME, API_VERSION,
                 http=creds.authorize(httplib2.Http()))
def get_authenticated_service():
    '''Authorize against YouTube, cache the credentials, return the service.'''
    oauth_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    token_store = Storage("%s-oauth2.json" % sys.argv[0])
    token = token_store.get()
    if token is None or token.invalid:
        token = run_flow(oauth_flow, token_store)

    # Trusted testers can download this discovery document from the
    # developers page and it should be in the same directory with the code.
    return build(API_SERVICE_NAME, API_VERSION,
                 http=token.authorize(httplib2.Http()))
def initClientSecret(self, path='client_secrets.json'):
    """Load user credentials into ``self._credentials``, authorizing if needed.

    Credentials are read from ``self.secretsFilePath``.  If that file does
    not exist, or the stored credentials are missing or invalid, the OAuth2
    flow is run from the client-secrets file at ``path``.  Nothing is
    returned; the result is kept on the instance.

    Args:
        path: Location of the Google client-secrets JSON file.

    Raises:
        j.exceptions.Input: if ``path`` does not exist.
    """
    if not j.sal.fs.exists(path, followlinks=True):
        # Message typo fixed ("dwonload" -> "download").
        raise j.exceptions.Input(
            message="Could not find google secrets file in %s, please download" % path,
            level=1, source="", tags="", msgpub="")

    store = Storage(self.secretsFilePath)
    self._credentials = store.get()
    if (not j.sal.fs.exists(self.secretsFilePath)
            or not self._credentials
            or self._credentials.invalid):
        flow = client.flow_from_clientsecrets(path, SCOPES)
        flow.user_agent = APPLICATION_NAME
        self._credentials = tools.run_flow(flow, store)
        self.logger.info('Storing credentials to ' + self.secretsFilePath)
def code_auth(code):
    """Exchange an authorization code for credentials, store and print them.

    Args:
        code: Authorization code to exchange using ``client_secrets.json``
            and ``SCOPES``.
    """
    credentials = credentials_from_clientsecrets_and_code(
        'client_secrets.json', SCOPES, code)
    storage = Storage('credentials_file')
    storage.put(credentials)
    # Single-argument print call works on both Python 2 and 3.
    print(credentials.to_json())
def auth2(code):
    """Complete step 2 of the flow: exchange ``code``, print and store the result.

    Args:
        code: Authorization code passed to ``flow.step2_exchange``.
    """
    flow = get_flow()
    credentials = flow.step2_exchange(code)
    # Single-argument print call works on both Python 2 and 3.
    print(credentials.to_json())
    storage = Storage('credentials_file')
    storage.put(credentials)
def get_credentials():
    """Return the stored credentials, refreshing an expired access token.

    Returns:
        The credentials read from ``credentials_file``, or ``None`` when the
        store yields nothing.
    """
    storage = Storage('credentials_file')
    credentials = storage.get()
    if credentials is None:
        # Nothing stored: previously this fell through to an AttributeError
        # on ``None.access_token_expired``.
        return None
    if credentials.access_token_expired:
        # Single-argument print call works on both Python 2 and 3.
        print('Refreshing...')
        credentials.refresh(httplib2.Http())
    return credentials
def save(self, cred_file):
    """Write the current credentials to ``cred_file``.

    ``self.check_credentials()`` is invoked first; the save is logged
    afterwards.
    """
    self.check_credentials()
    Storage(cred_file).put(self.credentials)
    logger.info('Saved credentials to %s', cred_file)
def load(self, cred_file):
    """Read previously saved credentials from ``cred_file`` into ``self.credentials``."""
    self.credentials = Storage(cred_file).get()
def get_credentials():
    """Return user credentials for the Admin Reports quickstart.

    Stored credentials in
    ``~/.credentials/admin-reports_v1-python-quickstart.json`` are reused
    when valid; otherwise the OAuth2 flow is completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)
    credential_path = os.path.join(
        credential_dir, 'admin-reports_v1-python-quickstart.json')

    store = Storage(credential_path)
    creds = store.get()
    if creds and not creds.invalid:
        return creds

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(flow, store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        creds = tools.run(flow, store)
    print('Storing credentials to ' + credential_path)
    return creds
def get_authenticated_service(args):
    """Return an authorized API service, running the OAuth2 flow if needed.

    Credentials are cached in ``youtube-api-snippets-oauth2.json``; ``args``
    is forwarded to ``run_flow`` as its flags.
    """
    oauth_flow = flow_from_clientsecrets(
        CLIENT_SECRETS_FILE,
        scope=YOUTUBE_READ_WRITE_SSL_SCOPE,
        message=MISSING_CLIENT_SECRETS_MESSAGE)

    cache = Storage("youtube-api-snippets-oauth2.json")
    creds = cache.get()
    must_authorize = creds is None or creds.invalid
    if must_authorize:
        creds = run_flow(oauth_flow, cache, args)

    # Trusted testers can download this discovery document from the developers page
    # and it should be in the same directory with the code.
    return build(API_SERVICE_NAME, API_VERSION,
                 http=creds.authorize(httplib2.Http()))
def GetGoogleClient(self):
    """Build a Google Fit (``fitness`` v1) client from the stored credentials."""
    logging.debug("Creating Google client")
    stored_creds = Storage(self.googleCredsFile).get()
    authorized_http = stored_creds.authorize(httplib2.Http())
    fit_client = build('fitness', 'v1', http=authorized_http)
    logging.debug("Google client created")
    return fit_client
def GetDataSourceId(self, dataType):
    """Compose the colon-separated Google Fit data source id.

    dataType -- type of data. Possible options: steps, weight, heart_rate

    The id joins the data source type, data type name, the part of the
    stored ``client_id`` before its first '-', and the device manufacturer,
    model and uid.
    """
    source = self.GetDataSource(dataType)
    stored_creds = Storage(self.googleCredsFile).get()
    project_number = stored_creds.client_id.split('-')[0]
    device = source['device']
    parts = (
        source['type'],
        source['dataType']['name'],
        project_number,
        device['manufacturer'],
        device['model'],
        device['uid'],
    )
    return ':'.join(parts)
def get_credentials():
    """
    Return user credentials for the quick-add tool.

    Valid credentials in ``~/.credentials/quick-add.json`` are reused;
    otherwise the OAuth2 flow is completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    user_home = os.path.expanduser('~')
    cred_dir = os.path.join(user_home, '.credentials')
    if not os.path.exists(cred_dir):
        os.makedirs(cred_dir)
    cred_path = os.path.join(cred_dir, 'quick-add.json')

    cred_store = Storage(cred_path)
    creds = cred_store.get()
    if creds and not creds.invalid:
        return creds

    oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    oauth_flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(oauth_flow, cred_store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        creds = tools.run(oauth_flow, cred_store)
    print('Storing credentials to ' + cred_path)
    return creds
def get_credentials():
    """Return user credentials for the Drive quickstart.

    Valid credentials in ``~/.credentials/drive-python-quickstart.json`` are
    reused; otherwise the OAuth2 flow is completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    cred_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(cred_dir):
        os.makedirs(cred_dir)
    cred_path = os.path.join(cred_dir, 'drive-python-quickstart.json')

    cred_store = Storage(cred_path)
    creds = cred_store.get()
    needs_flow = not creds or creds.invalid
    if needs_flow:
        oauth_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE,
                                                    SCOPES)
        oauth_flow.user_agent = APPLICATION_NAME
        if flags:
            creds = tools.run_flow(oauth_flow, cred_store, flags)
        else:
            # Needed only for compatibility with Python 2.6
            creds = tools.run(oauth_flow, cred_store)
        print('Storing credentials to ' + cred_path)
    return creds
def get_credentials():
    """Return user credentials cached in ``~/.credentials/academiadasapostas.json``.

    When nothing valid is cached, the OAuth2 flow is completed to obtain new
    credentials.

    Returns:
        Credentials, the obtained credential.
    """
    base_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(base_dir):
        os.makedirs(base_dir)
    cache_path = os.path.join(base_dir, 'academiadasapostas.json')

    cache = Storage(cache_path)
    creds = cache.get()
    if creds and not creds.invalid:
        return creds

    flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(flow, cache, flags)
    else:
        # Needed only for compatibility with Python 2.6
        creds = tools.run(flow, cache)
    print('Storing credentials to ' + cache_path)
    return creds
def get_credentials():
    """Return user credentials for the Sheets quickstart.

    Valid credentials in
    ``~/.credentials/sheets.googleapis.com-python-quickstart.json`` are
    reused; otherwise the OAuth2 flow is completed to obtain new ones.

    Returns:
        Credentials, the obtained credential.
    """
    cred_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(cred_dir):
        os.makedirs(cred_dir)
    cred_path = os.path.join(
        cred_dir, 'sheets.googleapis.com-python-quickstart.json')

    cred_store = Storage(cred_path)
    creds = cred_store.get()
    if creds and not creds.invalid:
        return creds

    sheets_flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
    sheets_flow.user_agent = APPLICATION_NAME
    if flags:
        creds = tools.run_flow(sheets_flow, cred_store, flags)
    else:
        # Needed only for compatibility with Python 2.6
        creds = tools.run(sheets_flow, cred_store)
    print('Storing credentials to ' + cred_path)
    return creds
def test_non_existent_file_storage(self, warn_mock):
    """Reading from a missing file warns and yields no credentials."""
    result = file_module.Storage(FILENAME).get()
    expected_message = _helpers._MISSING_FILE_MESSAGE.format(FILENAME)
    warn_mock.assert_called_with(expected_message)
    self.assertIsNone(result)
def test_directory_file_storage(self):
    """``get`` raises IOError when the storage path is a directory."""
    dir_backed = file_module.Storage(FILENAME)
    os.mkdir(FILENAME)
    try:
        self.assertRaises(IOError, dir_backed.get)
    finally:
        # Always remove the directory so later tests can reuse FILENAME.
        os.rmdir(FILENAME)
def test_no_sym_link_credentials(self):
    """``get`` raises IOError when the storage path is a symlink."""
    link_path = FILENAME + '.sym'
    os.symlink(FILENAME, link_path)
    link_backed = file_module.Storage(link_path)
    try:
        self.assertRaises(IOError, link_backed.get)
    finally:
        os.unlink(link_path)
def test_token_refresh_store_expired(self):
    """An expired stored token is refreshed over HTTP with the expected request."""
    expiration = (datetime.datetime.utcnow() -
                  datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(expiration=expiration)

    storage = file_module.Storage(FILENAME)
    storage.put(credentials)
    credentials = storage.get()
    # Overwrite the store with a copy carrying a different access token.
    new_cred = copy.copy(credentials)
    new_cred.access_token = 'bar'
    storage.put(new_cred)

    access_token = '1/3w'
    token_response = {'access_token': access_token, 'expires_in': 3600}
    response_content = json.dumps(token_response).encode('utf-8')
    http = http_mock.HttpMock(data=response_content)

    credentials._refresh(http)
    # assertEqual replaces the deprecated assertEquals alias, matching the
    # other assertions in this test.
    self.assertEqual(credentials.access_token, access_token)

    # Verify mocks.
    self.assertEqual(http.requests, 1)
    self.assertEqual(http.uri, credentials.token_uri)
    self.assertEqual(http.method, 'POST')
    expected_body = {
        'grant_type': ['refresh_token'],
        'client_id': [credentials.client_id],
        'client_secret': [credentials.client_secret],
        'refresh_token': [credentials.refresh_token],
    }
    self.assertEqual(urllib_parse.parse_qs(http.body), expected_body)
    expected_headers = {
        'content-type': 'application/x-www-form-urlencoded',
        'user-agent': credentials.user_agent,
    }
    self.assertEqual(http.headers, expected_headers)
def test_token_refresh_good_store(self):
    """With an unexpired token in the store, ``_refresh`` picks up the stored token."""
    expiration = (datetime.datetime.utcnow() +
                  datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(expiration=expiration)

    storage = file_module.Storage(FILENAME)
    storage.put(credentials)
    credentials = storage.get()
    # Overwrite the store with a copy carrying a different access token.
    new_cred = copy.copy(credentials)
    new_cred.access_token = 'bar'
    storage.put(new_cred)

    # No HTTP object is supplied for this refresh.
    credentials._refresh(None)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(credentials.access_token, 'bar')
def test_token_refresh_stream_body(self):
    """A stream body is echoed back intact after the token has been refreshed."""
    in_fifteen_minutes = (datetime.datetime.utcnow() +
                          datetime.timedelta(minutes=15))
    credentials = self._create_test_credentials(
        expiration=in_fifteen_minutes)

    storage = file_module.Storage(FILENAME)
    storage.put(credentials)
    credentials = storage.get()
    stored_copy = copy.copy(credentials)
    stored_copy.access_token = 'bar'
    storage.put(stored_copy)

    valid_access_token = '1/3w'
    refresh_payload = json.dumps(
        {'access_token': valid_access_token, 'expires_in': 3600}
    ).encode('utf-8')
    unauthorized = {'status': http_client.UNAUTHORIZED}
    ok = {'status': http_client.OK}
    # Two 401 responses, then the token response, then an echo of the body.
    http = http_mock.HttpMockSequence([
        (dict(unauthorized), b'Initial token expired'),
        (dict(unauthorized), b'Store token expired'),
        (dict(ok), refresh_payload),
        (dict(ok), 'echo_request_body'),
    ])

    body = six.StringIO('streaming body')
    credentials.authorize(http)
    _, content = transport.request(http, 'https://example.com', body=body)

    self.assertEqual(content, 'streaming body')
    self.assertEqual(credentials.access_token, valid_access_token)
def test_credentials_delete(self):
    """After ``delete``, the store no longer yields credentials."""
    storage = file_module.Storage(FILENAME)
    storage.put(self._create_test_credentials())

    before_delete = storage.get()
    self.assertIsNotNone(before_delete)

    storage.delete()

    after_delete = storage.get()
    self.assertIsNone(after_delete)