我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_dict()。
def _load_credentials(cls, keydata):
    """Build service account credentials from parsed keyfile data.

    Args:
        keydata (dict): The loaded contents of a Google service account
            JSON keyfile.

    Returns:
        The credentials object produced by
        ``ServiceAccountCredentials.from_json_keyfile_dict`` for the scopes
        in ``cls._SCOPES``, or ``False`` when that call raises ``ValueError``
        or ``KeyError``.
    """
    try:
        return ServiceAccountCredentials.from_json_keyfile_dict(
            keydata, scopes=cls._SCOPES)
    except (ValueError, KeyError):
        # Credential construction can fail on bad keyfile data.
        # See: https://tinyurl.com/y8q5e9rm
        LOGGER.exception('Could not generate credentials from keyfile for %s',
                         cls.type())

    # Only reached after the failure above has been logged.
    return False
def get_authorized_http_object(scopes):
    '''Authorize an HTTP object with JWT service account credentials.

    The keyfile dictionary is assembled from every entry in ``settings``
    whose name starts with ``JWT_NAMESPACE``: the prefix is removed and the
    remainder is lower-cased to form the dictionary key.

    :param scopes: the authorization scope to be requested
    :return: the result of ``credentials.authorize(httplib2.Http())``
    '''
    keyfile = {}
    for name in settings:
        if not name.startswith(JWT_NAMESPACE):
            continue
        keyfile[name.replace(JWT_NAMESPACE, '').lower()] = settings[name]

    if 'private_key' in keyfile:
        # Turn each literal backslash-n sequence into a real newline.
        keyfile['private_key'] = keyfile['private_key'].replace('\\n', '\n')

    keyfile['PROJECT_ID'] = settings['GOOGLE_API_PROJECT_ID']

    jwt_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        keyfile, scopes=scopes)
    return jwt_credentials.authorize(httplib2.Http())
def _get_application_default_credential_from_file(filename):
    """Load Application Default Credentials from a JSON credentials file.

    The file's ``type`` field must equal ``AUTHORIZED_USER`` or
    ``SERVICE_ACCOUNT``; otherwise ``ApplicationDefaultCredentialsError`` is
    raised. Missing required fields for the detected type are reported
    through ``_raise_exception_for_missing_fields``.

    Returns a ``GoogleCredentials`` instance for authorized-user files and
    the result of ``ServiceAccountCredentials.from_json_keyfile_dict`` for
    service-account files.
    """
    with open(filename) as file_obj:
        client_credentials = json.load(file_obj)

    kind = client_credentials.get('type')
    if kind not in (AUTHORIZED_USER, SERVICE_ACCOUNT):
        raise ApplicationDefaultCredentialsError(
            "'type' field should be defined (and have one of the '" +
            AUTHORIZED_USER + "' or '" + SERVICE_ACCOUNT + "' values)")

    if kind == AUTHORIZED_USER:
        required_fields = {'client_id', 'client_secret', 'refresh_token'}
    else:
        required_fields = {'client_id', 'client_email', 'private_key_id',
                           'private_key'}

    missing_fields = required_fields.difference(client_credentials.keys())
    if missing_fields:
        _raise_exception_for_missing_fields(missing_fields)

    if kind != AUTHORIZED_USER:
        # Imported locally rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from oauth2client.service_account import ServiceAccountCredentials
        return ServiceAccountCredentials.from_json_keyfile_dict(
            client_credentials)

    return GoogleCredentials(
        access_token=None,
        client_id=client_credentials['client_id'],
        client_secret=client_credentials['client_secret'],
        refresh_token=client_credentials['refresh_token'],
        token_expiry=None,
        token_uri=GOOGLE_TOKEN_URI,
        user_agent='Python client library')
def __init__(self, config):
    """Store the API settings and prepare credentials and the HTTP session.

    :param config: mapping that must contain "apiKey", "authDomain",
        "databaseURL" and "storageBucket". An optional "serviceAccount"
        entry is either a path to a service account JSON keyfile (``str``)
        or the already-parsed keyfile contents (``dict``).
    """
    self.api_key = config["apiKey"]
    self.auth_domain = config["authDomain"]
    self.database_url = config["databaseURL"]
    self.storage_bucket = config["storageBucket"]
    self.credentials = None
    self.requests = requests.Session()

    service_account = config.get("serviceAccount")
    if service_account:
        scopes = [
            'https://www.googleapis.com/auth/firebase.database',
            'https://www.googleapis.com/auth/userinfo.email',
            "https://www.googleapis.com/auth/cloud-platform"
        ]
        # isinstance (not an exact type() match) so that str/dict subclasses,
        # e.g. an OrderedDict produced by a JSON loader, are accepted as well.
        # Any other type leaves self.credentials as None, as before.
        if isinstance(service_account, str):
            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
                service_account, scopes)
        elif isinstance(service_account, dict):
            self.credentials = ServiceAccountCredentials.from_json_keyfile_dict(
                service_account, scopes)

    if is_appengine_sandbox():
        # Fix error in standard GAE environment
        # is related to https://github.com/kennethreitz/requests/issues/3187
        # ProtocolError('Connection aborted.', error(13, 'Permission denied'))
        adapter = appengine.AppEngineAdapter(max_retries=3)
    else:
        adapter = requests.adapters.HTTPAdapter(max_retries=3)

    # Use the same retrying adapter for both plain and TLS connections.
    for scheme in ('http://', 'https://'):
        self.requests.mount(scheme, adapter)
def authorize_service_account(json_credentials, scope, sub=None):
    """
    Create service account credentials for the provided scope.

    Uses ``ServiceAccountCredentials.from_json_keyfile_dict`` when
    oauth2client offers it, and falls back to
    ``SignedJwtAssertionCredentials`` otherwise.

    :param json_credentials: dictionary representing a service account,
        in most cases created from json.load
    :param scope: scope(s) to authorize the application for
    :param sub: User ID to authorize the application for (if needed)
    :return Credentials to be used for http object
    :raises EnvironmentError: when neither credentials class can be imported
    """
    try:
        from oauth2client.service_account import ServiceAccountCredentials
    except ImportError:
        ServiceAccountCredentials = None

    has_keyfile_loader = (
        ServiceAccountCredentials is not None
        and hasattr(ServiceAccountCredentials, "from_json_keyfile_dict"))

    if has_keyfile_loader:
        # running oauth2client version 2.0
        base = ServiceAccountCredentials.from_json_keyfile_dict(
            json_credentials, scope)
        return base if sub is None else base.create_delegated(sub)

    # Legacy path: older oauth2client releases.
    try:
        from oauth2client.client import SignedJwtAssertionCredentials
    except ImportError:
        raise EnvironmentError("Service account can not be used because PyCrypto is not available. Please install PyCrypto.")

    # The legacy class is always handed a list/tuple of scopes.
    scope_list = scope if isinstance(scope, (list, tuple)) else [scope]

    return SignedJwtAssertionCredentials(
        service_account_name=json_credentials['client_email'],
        private_key=json_credentials['private_key'],
        scope=scope_list,
        sub=sub)
def create_credentials():
    """Build delegated Directory and Calendar API clients.

    Reads the service account keyfile data from the ``MR_CLIENT_SECRET_JSON``
    environment variable and the account to impersonate from
    ``MR_DELEGATED_ACCOUNT``.

    Returns a ``(directory, calendar)`` tuple of the two built clients.
    """
    # NOTE(review): despite the variable name, the value is parsed as a
    # Python literal, not with a JSON parser.
    keyfile = ast.literal_eval(os.environ['MR_CLIENT_SECRET_JSON'])

    service_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        keyfile,
        scopes=[
            'https://www.googleapis.com/auth/admin.directory.resource.calendar',
            'https://www.googleapis.com/auth/calendar',
        ])
    delegated = service_credentials.create_delegated(
        os.environ['MR_DELEGATED_ACCOUNT'])

    # Both clients share one authorized HTTP object.
    authorized_http = delegated.authorize(Http())
    return (
        build('admin', 'directory_v1', http=authorized_http),
        build('calendar', 'v3', http=authorized_http),
    )
def __init__(self, credentials_dict):
    """
    Build service account credentials from a keyfile dictionary, then
    build the Compute ('compute', 'v1') and Deployment Manager
    ('deploymentmanager', 'v2') API clients with those credentials.

    :param credentials_dict: parsed service account keyfile contents; it
        must also contain a 'project_id' entry, which is stored on the
        instance as ``project_id``.
    """
    cloud_platform_scope = 'https://www.googleapis.com/auth/cloud-platform'
    account_credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        credentials_dict, scopes=cloud_platform_scope)

    self.compute = discovery.build(
        'compute', 'v1', credentials=account_credentials)
    self.deployment_manager = discovery.build(
        'deploymentmanager', 'v2', credentials=account_credentials)
    self.project_id = credentials_dict['project_id']
def get_gcloud(ctx, version: str = 'v1'):
    """
    Build a Google Compute API client from the context configuration.

    Credentials come from ``gcloud_json_keyfile_name`` and/or
    ``gcloud_json_keyfile_string`` in ``ctx.config`` (the string wins when
    both are set); when neither yields credentials, the application default
    credentials are used.

    Note that the Google API Client is not threadsafe. Cache the instance
    locally if you want to avoid OAuth overhead between calls.

    Parameters
    ----------
    version
        Compute API version
    """
    compute_scope = 'https://www.googleapis.com/auth/compute'
    credentials = None

    keyfile_name = ctx.config.get('gcloud_json_keyfile_name')
    if keyfile_name:
        credentials = ServiceAccountCredentials.from_json_keyfile_name(
            keyfile_name, scopes=compute_scope)

    keyfile_string = ctx.config.get('gcloud_json_keyfile_string')
    if keyfile_string:
        credentials = ServiceAccountCredentials.from_json_keyfile_dict(
            json.loads(keyfile_string), scopes=compute_scope)

    # Fall back to the application default credentials.
    credentials = credentials or GoogleCredentials.get_application_default()
    if not credentials:
        raise RuntimeError("Auth for Google Cloud was not configured")

    return discovery.build(
        'compute',
        version,
        credentials=credentials,
        # https://github.com/google/google-api-python-client/issues/299#issuecomment-268915510
        cache_discovery=False
    )
def get_service_account_credentials(self):
    """Build service account credentials from ``self.private_key``.

    ``self.private_key`` is either the path of a service account JSON
    keyfile or the JSON contents themselves; ``self.scope`` is passed on to
    the credentials class.

    Returns:
        ``SignedJwtAssertionCredentials`` when it can be imported
        (oauth2client < 2.0.0), otherwise the result of
        ``ServiceAccountCredentials.from_json_keyfile_dict``.

    Raises:
        ImportError: neither oauth2client credentials class is importable.
        InvalidPrivateKeyFormat: the key is missing, is not valid JSON or
            lacks the required fields.
    """
    # Bug fix for https://github.com/pydata/pandas/issues/12572
    # We need to know that a supported version of oauth2client is installed
    # Test that either of the following is installed:
    # - SignedJwtAssertionCredentials from oauth2client.client
    # - ServiceAccountCredentials from oauth2client.service_account
    # SignedJwtAssertionCredentials is available in oauthclient < 2.0.0
    # ServiceAccountCredentials is available in oauthclient >= 2.0.0
    oauth2client_v1 = True
    oauth2client_v2 = True

    try:
        from oauth2client.client import SignedJwtAssertionCredentials
    except ImportError:
        oauth2client_v1 = False

    try:
        from oauth2client.service_account import ServiceAccountCredentials
    except ImportError:
        oauth2client_v2 = False

    if not oauth2client_v1 and not oauth2client_v2:
        raise ImportError("Missing oauth2client required for BigQuery "
                          "service account support")

    from os.path import isfile

    try:
        if isfile(self.private_key):
            with open(self.private_key) as f:
                json_key = json.loads(f.read())
        else:
            # The 'private_key' field may contain raw newlines, which a
            # strict JSON parser rejects. strict=False accepts control
            # characters inside strings, so the key is kept verbatim.
            # (Swapping newlines for spaces and back would also turn the
            # spaces in "-----BEGIN PRIVATE KEY-----" into newlines.)
            json_key = json.loads(self.private_key, strict=False)

        if compat.PY3:
            json_key['private_key'] = bytes(
                json_key['private_key'], 'UTF-8')

        if oauth2client_v1:
            return SignedJwtAssertionCredentials(
                json_key['client_email'],
                json_key['private_key'],
                self.scope,
            )
        else:
            return ServiceAccountCredentials.from_json_keyfile_dict(
                json_key,
                self.scope)
    except (KeyError, ValueError, TypeError, AttributeError):
        raise InvalidPrivateKeyFormat(
            "Private key is missing or invalid. It should be service "
            "account private key JSON (file path or string contents) "
            "with at least two keys: 'client_email' and 'private_key'. "
            "Can be obtained from: https://console.developers.google."
            "com/permissions/serviceaccounts")