我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用oauth2client.tools.run_flow()。
def _get_credentials(self, config): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ store = oauth2client.contrib.dictionary_storage.DictionaryStorage(config, 'oauth2') credentials = store.get() if not credentials or credentials.invalid: print("Ask credentials for " + config['user_id']) flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME parser = argparse.ArgumentParser(add_help=False) parser.add_argument('--logging_level', default='ERROR') parser.add_argument('--noauth_local_webserver', action='store_true', default=True, help='Do not run a local web server.') args = parser.parse_args([]) credentials = tools.run_flow(flow, store, args) config.save() return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_path = CREDS_FILENAME store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME credentials = tools.run_flow(flow, store) return credentials
def get_credentials(flags): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ store = Storage(flags.credfile) with warnings.catch_warnings(): warnings.simplefilter("ignore") credentials = store.get() if not credentials or credentials.invalid: flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL) credentials = tools.run_flow(flow, store, flags) print('credential file saved at\n\t' + flags.credfile) return credentials
def test_run_flow_webserver( self, webbrowser_open_mock, server_ctor_mock, logging_mock): server_ctor_mock.return_value = self.server self.server.query_params = {'code': 'auth_code'} # Successful exchange. returned_credentials = tools.run_flow( self.flow, self.storage, flags=self.server_flags) self.assertEqual(self.credentials, returned_credentials) self.assertEqual(self.flow.redirect_uri, 'http://localhost:8080/') self.flow.step2_exchange.assert_called_once_with( 'auth_code', http=None) self.storage.put.assert_called_once_with(self.credentials) self.credentials.set_store.assert_called_once_with(self.storage) self.assertTrue(self.server.handle_request.called) webbrowser_open_mock.assert_called_once_with( 'http://example.com/auth', autoraise=True, new=1)
def test_run_flow_webserver_fallback( self, input_mock, server_ctor_mock, logging_mock): server_ctor_mock.side_effect = socket.error() input_mock.return_value = 'auth_code' # It should catch the socket error and proceed as if # noauth_local_webserver was specified. returned_credentials = tools.run_flow( self.flow, self.storage, flags=self.server_flags) self.assertEqual(self.credentials, returned_credentials) self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN) self.flow.step2_exchange.assert_called_once_with( 'auth_code', http=None) self.assertTrue(server_ctor_mock.called) self.assertFalse(self.server.handle_request.called)
def auth_stored_credentials(self, client_secrets_file, scopes=[]): """Authorize stored credentials.""" try: import argparse parser = argparse.ArgumentParser(parents=[tools.argparser]) parser.add_argument('args', nargs=argparse.REMAINDER) flags = parser.parse_args() flags.noauth_local_webserver = True except ImportError: flags = None store = Storage(self.credentials_file) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets( client_secrets_file, scopes, ) flow.user_agent = self.app_name if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print 'Saved credentials to ' + self.credentials_file return credentials
def authorize_application(client_secret_file, scope, credential_cache_file='credentials_cache.json', flow_params=[]): """ authorize an application to the requested scope by asking the user in a browser. :param client_secret_file: json file containing the client secret for an offline application :param scope: scope(s) to authorize the application for :param credential_cache_file: if provided or not None, the credenials will be cached in a file. The user does not need to be reauthenticated :param flow_params: oauth2 flow parameters :return OAuth2Credentials object """ FLOW = flow_from_clientsecrets(client_secret_file, scope=scope) storage = Storage(credential_cache_file) credentials = storage.get() if credentials is None or credentials.invalid: # Run oauth2 flow with default arguments. level = logging.getLogger().level credentials = tools.run_flow(FLOW, storage, tools.argparser.parse_args(flow_params)) logging.getLogger().setLevel(level) return credentials
def new_credentials(): """Perform OAuth2 flow to obtain the new credentials. Return: Credentials, the obtained credential. """ credential_path = os.path.join(args.creds_dir, 'ct_gdrive_creds.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(args.client_secret, SCOPES) flow.user_agent = APPLICATION_NAME credentials = tools.run_flow(flow, store, args) print('Storing credentials to ' + credential_path) return credentials
def get_drive_service(): ''' Returns an object used to interact with the Google Drive API. ''' flow = client.flow_from_clientsecrets( get_credentials_path('secret.json'), 'https://www.googleapis.com/auth/drive') flow.user_agent = USER_AGENT_NAME store = Storage(get_credentials_path('storage.dat', False)) credentials = store.get() if not credentials or credentials.invalid: flags = tools.argparser.parse_args(args=[]) credentials = tools.run_flow(flow, store, flags) http = credentials.authorize(httplib2.Http()) service = discovery.build('drive', 'v3', http=http) return service
def get_credentials(): """ credentials????????? """ dirname = os.path.dirname(__file__) credential_path = os.path.join(dirname, CREDENTIAL_FILE) client_secret_file = os.path.join(dirname, CLIENT_SECRET_FILE) store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(client_secret_file, SCOPES) flow.user_agent = APPLICATION_NAME credentials = tools.run_flow(flow, store) print('credentials?{}???????'.format(credential_path)) return credentials
def get_user_account_credentials(self): from oauth2client.client import OAuth2WebServerFlow from oauth2client.file import Storage from oauth2client.tools import run_flow, argparser flow = OAuth2WebServerFlow( client_id=('495642085510-k0tmvj2m941jhre2nbqka17vqpjfddtd' '.apps.googleusercontent.com'), client_secret='kOc9wMptUtxkcIFbtZCcrEAc', scope=self.scope, redirect_uri='urn:ietf:wg:oauth:2.0:oob') storage = Storage('bigquery_credentials.dat') credentials = storage.get() if credentials is None or credentials.invalid or self.reauth: credentials = run_flow(flow, storage, argparser.parse_args([])) return credentials
def initYoutube(): flow = flow_from_clientsecrets(constants.YOUTUBE_CLIENT_SECRETS_FILE,\ message=constants.MISSING_CLIENT_SECRETS_MESSAGE,\ scope=constants.YOUTUBE_READ_WRITE_SCOPE, redirect_uri='http://localhost:8080/') storage = Storage("%s-oauth2.json" % sys.argv[0]) credentials = storage.get() if credentials is None or credentials.invalid: flags = argparser.parse_args() credentials = run_flow(flow, storage, flags) else: print("Found valid oauth2 json") youtube = build(constants.YOUTUBE_API_SERVICE_NAME, constants.YOUTUBE_API_VERSION, http=credentials.authorize(httplib2.Http())) print("Done authentication with Youtube") return youtube
def get_credentials(self): conf_dir = self.get_conf_dir() credential_path = os.path.join(conf_dir, self.event + '_credentials.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: log.warn("No current valid Google credentials. Starting authentication flow...") flow = client.flow_from_clientsecrets(os.path.join(conf_dir, 'client_secret.json'), 'https://www.googleapis.com/auth/calendar') flow.user_agent = "HOTBot" if self.flags: credentials = tools.run_flow(flow, store, self.flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) log.info('Storing credentials to ' + credential_path) return credentials
def update_credentials(self): """Gets valid user credentials from Gmail using Oauth 2.0 Returns: Credentials, the obtained credential. """ if not os.path.exists(self._config['credentials_dir']): os.makedirs(self._config['credentials_dir']) store = Storage(self._credentials_file) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(self._client_secret_file, self._config['oauth2_scopes']) flow.user_agent = self._config['application_name'] credentials = tools.run_flow(flow, store, self._oauth2_flags) self._logger.info('Storing credentials to ' + self._credentials_file) else: self._logger.info("Credentials exist.") return credentials
def _get_credentials(self): """Get OAuth credentials :return: OAuth credentials :rtype: :class:`oauth2client.client.Credentials` """ credential_dir = join(self.config['creds_cache_dir'], 'cached_oauth_credentials') if not exists(credential_dir): makedirs(credential_dir) credential_path = join(credential_dir, 'googleapis.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(self.config['creds'], self.config['scope']) flow.user_agent = 'Iris Gmail Integration' credentials = tools.run_flow( flow, store, tools.argparser.parse_args(args=['--noauth_local_webserver'])) logger.info('Storing credentials to %s' % credential_path) else: credentials.refresh(self.http) return credentials
def get_credentials(self, client_secret_file, scopes, credentials_dir, credentials_file, flags=None): # Create credentials folder, if necessary if not os.path.exists(credentials_dir): os.makedirs(credentials_dir) # Store for credentials file credential_path = os.path.join(credentials_dir, credentials_file) store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(client_secret_file, scopes) flow.user_agent = APPLICATION_NAME if self._flags: credentials = tools.run_flow(flow, store, self._flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) self._logger.debug("Storing credentials to '{}'".format(credential_path)) else: self._logger.debug("Got valid credential from '{}'".format(credential_path)) return credentials # Create a Google Calendar API service object
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ credential_path = os.path.join(SAVED_CREDENTIALS) print(credential_path) store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') print("checking for cached credentials") if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir,'mycroft-gmail-skill.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: credentials = tools.run_flow(OAuth2WebServerFlow(client_id=CID,client_secret=CIS,scope=SCOPES,user_agent=APPLICATION_NAME),store) print 'Storing credentials to ' + credential_dir print 'Your GMail Skill is now authenticated ' else: print 'Loaded credentials for Gmail Skill from ' + credential_dir return credentials
def main(): flow = client.flow_from_clientsecrets(CLIENT_SECRETS_PATH, SCOPES) print('Starting credentials flow...') credentials = tools.run_flow(flow, NullStorage()) # Save the credentials in the same format as the Cloud SDK's authorized # user file. data = { 'type': 'authorized_user', 'client_id': flow.client_id, 'client_secret': flow.client_secret, 'refresh_token': credentials.refresh_token } with open(AUTHORIZED_USER_PATH, 'w') as fh: json.dump(data, fh, indent=4) print('Created {}.'.format(AUTHORIZED_USER_PATH))
def _get_credentials(self): """Get OAuth credentials :return: OAuth credentials :rtype: :class:`oauth2client.client.Credentials` """ credential_dir = join(self.var_dir, 'cached_oauth_credentials') if not exists(credential_dir): makedirs(credential_dir) credential_path = join(credential_dir, 'googleapis.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(self.config['creds'], self.config['scope']) flow.user_agent = 'Iris Gmail Integration' credentials = tools.run_flow( flow, store, tools.argparser.parse_args(args=['--noauth_local_webserver'])) logger.info('Storing credentials to %s', credential_path) else: credentials.refresh(self.http) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ import argparse flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args() flags.noauth_local_webserver = True store = Storage(gmail_api.CREDENTIAL_FILE) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(gmail_api.CLIENT_SECRET_FILE, gmail_api.SCOPES) flow.user_agent = gmail_api.APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) print 'Storing credentials to ' + gmail_api.CREDENTIAL_FILE return credentials
def get_credentials(): home_dir = path.expanduser('~') credential_dir = path.join(home_dir, '.credentials') if not path.exists(credential_dir): makedirs(credential_dir) credential_path = path.join(credential_dir, 'sheets.googleapis.com-python-quickstart.json') store = file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(client_secret_path, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(self, cred_file='client_secret.json'): import os from oauth2client.file import Storage from oauth2client import client home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'python-quickstart.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(cred_file, scope=['https://www.googleapis.com/auth/spreadsheets', 'https://www.googleapis.com/auth/drive']) flow.user_agent = self.app_name try: credentials = tools.run_flow(flow, store, self.flags) except: # Needed only for compatibility with Python 2.6 try: credentials = tools.run_flow(flow, store) except: credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') print("checking for cached credentials") if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir,'mycroft-googlecalendar-skill.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: credentials = tools.run_flow(OAuth2WebServerFlow(client_id=CID,client_secret=CIS,scope=SCOPES,user_agent=APPLICATION_NAME),store) print 'Storing credentials to ' + credential_dir print 'Your Google Calendar Skill is now authenticated ' else: print 'Loaded credentials for Google Calendar Skill from ' + credential_dir return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ credential_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir,'drive-to-photos.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print 'Storing credentials to ' + credential_path return credentials
def get_credentials(): # taken from https://developers.google.com/google-apps/calendar/quickstart/python global credentials home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): print('Creating', credential_dir) os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'pi_remind.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to', credential_path) return credentials
def get_authenticated_service(): ''' Authorize the request and store authorization credentials. ''' flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE, scope=YOUTUBE_READ_WRITE_SSL_SCOPE, message=MISSING_CLIENT_SECRETS_MESSAGE) storage = Storage("%s-oauth2.json" % sys.argv[0]) credentials = storage.get() if credentials is None or credentials.invalid: credentials = run_flow(flow, storage) # Trusted testers can download this discovery document from the # developers page and it should be in the same directory with the code. return build(API_SERVICE_NAME, API_VERSION, http=credentials.authorize(httplib2.Http()))
def get_authenticated_service(): '''Authorize the request and store authorization credentials to YouTube''' flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE, scope=YOUTUBE_READ_WRITE_SSL_SCOPE, message=MISSING_CLIENT_SECRETS_MESSAGE) storage = Storage("%s-oauth2.json" % sys.argv[0]) credentials = storage.get() if credentials is None or credentials.invalid: credentials = run_flow(flow, storage) # Trusted testers can download this discovery document from the # developers page and it should be in the same directory with the code. return build(API_SERVICE_NAME, API_VERSION, http=credentials.authorize(httplib2.Http()))
def initClientSecret(self, path='client_secrets.json'): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ if not j.sal.fs.exists(path, followlinks=True): raise j.exceptions.Input(message="Could not find google secrets file in %s, please dwonload" % path, level=1, source="", tags="", msgpub="") store = Storage(self.secretsFilePath) self._credentials = store.get() if not j.sal.fs.exists(self.secretsFilePath) or not self._credentials or self._credentials.invalid: flow = client.flow_from_clientsecrets(path, SCOPES) flow.user_agent = APPLICATION_NAME self._credentials = tools.run_flow(flow, store) # credentials = tools.run(flow, store) self.logger.info('Storing credentials to ' + self.secretsFilePath)
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'sendEmail.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'admin-reports_v1-python-quickstart.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'drive-python-quickstart.json') store = oauth2client.file.Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_authenticated_service(args): flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE, scope=YOUTUBE_READ_WRITE_SSL_SCOPE, message=MISSING_CLIENT_SECRETS_MESSAGE) storage = Storage("youtube-api-snippets-oauth2.json") credentials = storage.get() if credentials is None or credentials.invalid: credentials = run_flow(flow, storage, args) # Trusted testers can download this discovery document from the developers page # and it should be in the same directory with the code. return build(API_SERVICE_NAME, API_VERSION, http=credentials.authorize(httplib2.Http()))
def create_credentials(client_secret_path): """Gets valid user credentials from storage. Args: path (str), path to client_secret.json file """ flow = client.flow_from_clientsecrets(client_secret_path, ' '.join(SCOPES)) flow.user_agent = 'Clouseau' flow.params['access_type'] = 'offline' flow.params['approval_prompt'] = 'force' store = oauth2client.file.Storage(CREDENTIALS_PATH) flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args(['--noauth_local_webserver']) tools.run_flow(flow, store, flags)
def get_credentials(): """ Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'quick-add.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'drive-python-quickstart.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir,'academiadasapostas.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def get_credentials(): """Gets valid user credentials from storage. If nothing has been stored, or if the stored credentials are invalid, the OAuth2 flow is completed to obtain the new credentials. Returns: Credentials, the obtained credential. """ home_dir = os.path.expanduser('~') credential_dir = os.path.join(home_dir, '.credentials') if not os.path.exists(credential_dir): os.makedirs(credential_dir) credential_path = os.path.join(credential_dir, 'sheets.googleapis.com-python-quickstart.json') store = Storage(credential_path) credentials = store.get() if not credentials or credentials.invalid: flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES) flow.user_agent = APPLICATION_NAME if flags: credentials = tools.run_flow(flow, store, flags) else: # Needed only for compatibility with Python 2.6 credentials = tools.run(flow, store) print('Storing credentials to ' + credential_path) return credentials
def test_run_flow_no_webserver(self, input_mock, logging_mock): input_mock.return_value = 'auth_code' # Successful exchange. returned_credentials = tools.run_flow(self.flow, self.storage) self.assertEqual(self.credentials, returned_credentials) self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN) self.flow.step2_exchange.assert_called_once_with( 'auth_code', http=None) self.storage.put.assert_called_once_with(self.credentials) self.credentials.set_store.assert_called_once_with(self.storage)
def test_run_flow_no_webserver_explicit_flags( self, input_mock, logging_mock): input_mock.return_value = 'auth_code' # Successful exchange. returned_credentials = tools.run_flow( self.flow, self.storage, flags=self.flags) self.assertEqual(self.credentials, returned_credentials) self.assertEqual(self.flow.redirect_uri, client.OOB_CALLBACK_URN) self.flow.step2_exchange.assert_called_once_with( 'auth_code', http=None)
def test_run_flow_no_webserver_exchange_error( self, input_mock, logging_mock): input_mock.return_value = 'auth_code' self.flow.step2_exchange.side_effect = client.FlowExchangeError() # Error while exchanging. with self.assertRaises(SystemExit): tools.run_flow(self.flow, self.storage, flags=self.flags) self.flow.step2_exchange.assert_called_once_with( 'auth_code', http=None)
def test_run_flow_webserver_exchange_error( self, webbrowser_open_mock, server_ctor_mock, logging_mock): server_ctor_mock.return_value = self.server self.server.query_params = {'error': 'any error'} # Exchange returned an error code. with self.assertRaises(SystemExit): tools.run_flow(self.flow, self.storage, flags=self.server_flags) self.assertTrue(self.server.handle_request.called)