我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用googleapiclient.errors.HttpError()。
def title_from_youtube(bot, url): try: youtube_api_key = bot.config.get_by_path(["spotify", "youtube"]) youtube_client = build("youtube", "v3", developerKey=youtube_api_key) except (KeyError, TypeError) as e: logger.error("<b>YouTube API key isn't configured:</b> {}".format(e)) return "" # Regex by mantish from http://stackoverflow.com/a/9102270 to get the # video id from a YouTube URL. match = re.match( r"^.*(youtu.be\/|v\/|u\/\w\/|embed\/|watch\?v=|\&v=)([^#\&\?]*).*", url) if match and len(match.group(2)) == 11: video_id = match.group(2) else: logger.error("Unable to extract video id: {}".format(url)) return "" # YouTube response is JSON. try: response = youtube_client.videos().list( # pylint: disable=no-member part="snippet", id=video_id).execute() items = response.get("items", []) if items: return items[0]["snippet"]["title"] else: logger.error("<b>YouTube response was empty:</b> {}" .format(response)) return "" except YouTubeHTTPError as e: logger.error("Unable to get video entry from {}, {}".format(url, e)) return ""
def get_object(bucket, filename): if test_mode: return 'Dummy Object' try: service = _create_service() # Use get_media instead of get to get the actual contents of the object. # http://g.co/dv/resources/api-libraries/documentation/storage/v1/python/latest/storage_v1.objects.html#get_media req = service.objects().get_media(bucket=bucket, object=filename) out_file = io.BytesIO() downloader = http.MediaIoBaseDownload(out_file, req) done = False while done is False: status, done = downloader.next_chunk() logging.info("Download {}%.".format(int(status.progress() * 100))) return out_file.getvalue() except errors.HttpError as e: if e.resp.status == 404: raise NotFoundError() raise
def execute(self, kintaro, force=False): try: project = kintaro.projects().rpcGetProject(body={ 'project_id': self.project_id, }).execute() except errors.HttpError as e: logging.exception('Error fetching -> {}'.format(self)) return self.modified_by = project['mod_info'].get('updated_by') self.modified = datetime.datetime.fromtimestamp( int(project['mod_info']['updated_on_millis']) / 1000.0) if force or self.last_run is None or self.modified > self.last_run: if self.webhook_url: self.run_webhook(project) else: logging.info('Skipping (no webhook) -> {}'.format(self)) else: logging.info('Skipping (up-to-date) -> {}'.format(self)) self.last_run = datetime.datetime.now() self.put()
def response(self, resp, content): """Convert the response wire format into a Python object. Args: resp: httplib2.Response, the HTTP response headers and status content: string, the body of the HTTP response Returns: The body de-serialized as a Python object. Raises: googleapiclient.errors.HttpError if a non 2xx response is received. """ self._log_response(resp, content) # Error handling is TBD, for example, do we retry # for some operation/error combinations? if resp.status < 300: if resp.status == 204: # A 204: No Content response should be treated differently # to all the other success states return self.no_content_response return self.deserialize(content) else: logging.debug('Content from bad request was: %s' % content) raise HttpError(resp, content)
def response(self, resp, content): """Convert the response wire format into a Python object. Args: resp: httplib2.Response, the HTTP response headers and status content: string, the body of the HTTP response Returns: The body de-serialized as a Python object. Raises: googleapiclient.errors.HttpError if a non 2xx response is received. """ self._log_response(resp, content) # Error handling is TBD, for example, do we retry # for some operation/error combinations? if resp.status < 300: if resp.status == 204: # A 204: No Content response should be treated differently # to all the other success states return self.no_content_response return self.deserialize(content) else: LOGGER.debug('Content from bad request was: %s' % content) raise HttpError(resp, content)
def get_test_resource(cls): """Return the GCE Instance the audit loop uses. Returns: Dictionary representing the GCE instance, None if not found. Raises: errors.HttpError: On a failed API call. """ try: resource = api.CLIENTS.compute.instances().get( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE).execute() except errors.HttpError as e: if e.resp.status == 404: return None else: raise return resource
def delete_test_resource(self): """Call GCE API to delete the test resource. Returns: GCE operation. Raises: errors.HttpError: On a failed API call. """ request = api.CLIENTS.compute.instances().delete( instance=AuditLogLoop.RESOURCE_NAME, project=config.get_project_id(), zone=AuditLogLoop.ZONE) try: operation = request.execute() except errors.HttpError as e: if e.resp.status == 404: logging.warning('test resource does not exist') return None else: raise self.record_call('delete', operation) return operation
def insert_test_resource(self): """Call GCE API to create the test resource. Returns: GCE operation. Raises: errors.HttpError: On a failed API call. """ body = self.create_test_resource_body() try: request = api.CLIENTS.compute.instances().insert( project=config.get_project_id(), zone=AuditLogLoop.ZONE, body=body) operation = request.execute() except errors.HttpError as e: if e.resp.status == 409: logging.warning('test resource already exists') return None else: raise self.record_call('insert', operation) return operation
def validate_region(region, project_id, credentials): cloudcompute = cloud_service(credentials, 'compute') regionrequest = cloudcompute.regions().get( region=region, project=project_id ) try: regionrequest.execute() except HttpError as err: if err.resp.status == 404: click.echo( 'Error: Couldn\'t find region "{}" for project "{}".'.format( region, project_id)) elif err.resp.status == 403: click.echo('Error: Forbidden access to resources.') click.echo('Raw response:\n{}'.format(err.content)) click.echo( 'Make sure to enable "Cloud Compute API", "ML Engine" and ' '"Storage" for project.') else: click.echo('Unknown error: {}'.format(err.resp)) sys.exit(1)
def _create_secgrp_rules_if_needed(self, context, secgrp_ids): try: core_plugin = NeutronManager.get_plugin() except AttributeError: core_plugin = directory.get_plugin() secgrp_rules = [] for secgrp_id in secgrp_ids: secgrp = core_plugin.get_security_group(context._plugin_context, secgrp_id) secgrp_rules.extend(secgrp['security_group_rules']) if secgrp_rules: network_name = self._gce_subnet_network_name(context) compute, project = self.gce_svc, self.gce_project network = gceutils.get_network(compute, project, network_name) network_link = network['selfLink'] for secgrp_rule in secgrp_rules: try: gce_rule_name = self._gce_secgrp_id(secgrp_rule['id']) gceutils.get_firewall_rule(compute, project, gce_rule_name) except gce_errors.HttpError: self._create_secgrp_rule(context, secgrp_rule, network_link)
def test_delete_sg_rule_with_error(self, mock_delete, mock_wait): http_error = gce_errors.HttpError( httplib2.Response({ 'status': 404, 'reason': 'Not Found' }), content='') mock_delete.side_effect = http_error mock_wait.side_effect = gce_mock.wait_for_operation sg_rule = self.get_fake_sg_rule() self.assertIsNone( self._driver._delete_secgrp_rule(self.context, sg_rule['id'])) mock_delete.assert_called_once_with(self._driver.gce_svc, self._driver.gce_project, "secgrp-" + sg_rule['id']) mock_wait.assert_not_called()
def update_project(self, project_id, body): """Return an updated project resource.""" params = { 'projectId': project_id, 'body': body, } projects_update = self.crm.projects().update(**params) try: return projects_update.execute() except errors.HttpError as httperror: error = json.loads(httperror.content)['error'] print '[%s]' % error['message'] return {} # # Admin SDK - Directory #
def exists(self, bucket, object): """ Checks for the existence of a file in Google Cloud Storage. :param bucket: The Google cloud storage bucket where the object is. :type bucket: string :param object: The name of the object to check in the Google cloud storage bucket. :type object: string """ service = self.get_conn() try: service \ .objects() \ .get(bucket=bucket, object=object) \ .execute() return True except errors.HttpError as ex: if ex.resp['status'] == '404': return False raise # pylint:disable=redefined-builtin
def delete(self, bucket, object, generation=None): """ Delete an object if versioning is not enabled for the bucket, or if generation parameter is used. :param bucket: name of the bucket, where the object resides :type bucket: string :param object: name of the object to delete :type object: string :param generation: if present, permanently delete the object of this generation :type generation: string :return: True if succeeded """ service = self.get_conn() try: service \ .objects() \ .delete(bucket=bucket, object=object, generation=generation) \ .execute() return True except errors.HttpError as ex: if ex.resp['status'] == '404': return False raise
def drop_tables(self, table_ids, force=None): """ Delete google fusiontables tables. """ results = [] errors = [] for table_id in table_ids: try: results = {table_id: self.drop_table(table_id)} except GoogleHttpError: msg = "Table '{0}' does not exist.".format( table_id ) if not force: raise TableDoesNotExistException(msg) errors.append(msg) return results, errors
def list_disks(compute, project, zone): """ Lists all persistent disks used by project """ backup_logger.debug("Finding all disks for specified project") all_disks = [] try: result = compute.disks().list(project=project, zone=zone).execute() all_disks.extend(result['items']) while 'nextPageToken' in result: result = compute.disks().list(project=project, zone=zone, \ pageToken=result['nextPageToken']).execute() all_disks.extend(result['items']) except HttpError: backup_logger.error("Error with HTTP request made to list_disks") sys.exit(1) return all_disks
def list_snapshots(compute, project): """ Lists all snapshots created for this project """ backup_logger.debug("Finding all snapshots for specified project") all_snapshots = [] try: result = compute.snapshots().list(project=project).execute() all_snapshots.extend(result['items']) while 'nextPageToken' in result: result = compute.snapshots().list(project=project, \ pageToken=result['nextPageToken']).execute() all_snapshots.extend(result['items']) except HttpError: backup_logger.error("Error with HTTP request made to list_snapshots") sys.exit(1) return all_snapshots
def _gadmin_user_delete(service, message, email): """ ????????????? :param service: Google API ???? :param email: ??????? """ try: # ????????????????? result = service.users().get(userKey=email).execute() if not result['suspended']: botsend(message, '????????????????????\n' '`$gadmin user suspend {}` ????????????????????'.format(email)) else: service.users().delete(userKey=email).execute() botsend(message, '???? `{}` ???????'.format(email)) except HttpError as e: botsend(message, '??????????????\n`{}`'.format(e))
def _gadmin_user_update(service, message, email, suspended): """ ???????????? :param service: Google API ???? :param email: ??????? :param suspended: ????????True or False """ body = { 'suspended': suspended, } try: service.users().update(userKey=email, body=body).execute() if suspended: botsend(message, '???? `{}` ???????'.format(email)) else: botsend(message, '???? `{}` ???????'.format(email)) except HttpError as e: botsend(message, '????????????????\n`{}`'.format(e))
def _gadmin_user_password_reset(service, message, email): """ ????????????????? :param service: Google API ???? :param email: ??????? """ # ?????????? password = _generate_password() body = { 'password': password, } try: service.users().update(userKey=email, body=body).execute() botsend(message, '???? `{}` ???????????????'.format(email)) # password ????????DM???? _send_password_on_dm(message, email, password) except HttpError as e: botsend(message, '?????????????????????\n`{}`'.format(e))
def gadmin_alias_list(message, email): """ ???????????????????? :param email: ??????? """ service = _get_service() email = _get_default_domain_email(email) try: result = service.users().aliases().list(userKey=email).execute() msg = '' aliases = result.get('aliases', []) if aliases: msg = '`{}` ????????\n'.format(email) msg += ', '.join('`{}`'.format(alias['alias']) for alias in aliases) botsend(message, msg) else: msg = '`{}` ????????????'.format(email) botsend(message, msg) except HttpError as e: botsend(message, '??????????????\n`{}`'.format(e))
def _gadmin_group_insert(message, service, group, name): """ ????????????? :param service: Google API ???? :param group: ???????????? :param name: ??????? """ body = { 'name': name, 'email': group, } try: service.groups().insert(body=body).execute() except HttpError as e: botsend(message, '??????????????\n`{}`'.format(e)) return botsend(message, '`{}` ???????????'.format(group)) botsend(message, '`$gadmin member insert {} ???????` ??????????????????'.format(group))
def _gadmin_group_delete(message, service, group): """ ????????????? :param service: Google API ???? :param group: ???????????? """ try: result = service.groups().get(groupKey=group).execute() count = result['directMembersCount'] if count != '0': # ???????????????? botsend(message, ''' `{group}` ????????????????????? `$gadmin member delete {group} ???????` ???????????????????????? `$gadmin member list {group}` ?????????????????? '''.format(group=group)) else: service.groups().delete(groupKey=group).execute() botsend(message, '`{}` ???????????'.format(group)) except HttpError as e: botsend(message, '??????????????\n`{}`'.format(e)) return
def gadmin_member_list(message, group): """ ?????????????? :param key: ????????????(@?????) """ service = _get_service() group = _get_default_domain_email(group) try: members_list = service.members().list(groupKey=group).execute() except HttpError: botsend(message, '`{}` ???????????????'.format(group)) return count = 0 msg = '' for member in members_list.get('members', []): email = member['email'] msg += '- {}\n'.format(email) count += 1 msg = '*{}* ?????????({}????)\n'.format(group, count) + msg botsend(message, msg)
def _gadmin_member_insert(message, service, group, emails): """ ?????????????????????? :param service: Google API ???? :param group: ???????????? :param mail: ??/???????????????? """ for email in emails: body = { 'email': email, } try: service.members().insert(groupKey=group, body=body).execute() botsend(message, '`{}` ????? `{}` ???????'.format(group, email)) except HttpError as e: # TODO: ??????????????????????????? botsend(message, '??????????????\n`{}`'.format(e))
def _call_api(self, method_name, *args, **kwargs): wait_sec = 1 for i in range(7): try: method = getattr(self, '_api_{}'.format(method_name)) return method(*args, **kwargs) except HttpError as error: error = self._create_error(error) if error.code == 403 and error.reason == 'userRateLimitExceeded': time.sleep(wait_sec) wait_sec *= 2 continue error.method = method_name error.method_args = args error.method_kwargs = kwargs error.common_params = self.common_params raise error
def _get_project_networks(self): """Enumerate the current project networks and returns a sorted list. Returns: list: A sorted list of network names. """ networks = set() try: response = self.firewall_api.list_networks( self.project_id, fields='items/selfLink') except errors.HttpError as e: LOGGER.error('Error listing networks for project %s: %s', self.project_id, e) else: for item in response.get('items', []): if 'selfLink' in item: network_name = fe.get_network_name_from_url( item['selfLink']) networks.add(network_name) else: LOGGER.error('Network URL not found in %s for project %s', item, self.project_id) return sorted(networks)
def get_instances(self, project_id): """Gets all CloudSQL instances for a project. Args: project_id (int): The project id for a GCP project. Returns: list: A list of database Instance resource dicts for a project_id. https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances [{"kind": "sql#instance", "name": "sql_instance1", ...} {"kind": "sql#instance", "name": "sql_instance2", ...}, {...}] Raises: ApiExecutionError: ApiExecutionError is raised if the call to the GCP ClodSQL API fails """ try: paged_results = self.repository.instances.list(project_id) return api_helpers.flatten_list_results(paged_results, 'items') except (errors.HttpError, HttpLib2Error) as e: LOGGER.warn(api_errors.ApiExecutionError(project_id, e)) raise api_errors.ApiExecutionError('instances', e)
def get_global_operation(self, project_id, operation_id): """Get the Operations Status Args: project_id (str): The project id. operation_id (str): The operation id. Returns: dict: Global Operation status and info. https://cloud.google.com/compute/docs/reference/latest/globalOperations/get Raises: ApiNotEnabledError: Returns if the api is not enabled. ApiExecutionError: Returns if the api is not executable. """ try: return self.repository.global_operations.get( project_id, operation_id) except (errors.HttpError, HttpLib2Error) as e: api_not_enabled, details = _api_not_enabled(e) if api_not_enabled: raise api_errors.ApiNotEnabledError(details, e) raise api_errors.ApiExecutionError(project_id, e)
def get_project(self, project_id): """Returns the specified Project resource. Args: project_id (str): The project id. Returns: dict: A Compute Project resource dict. https://cloud.google.com/compute/docs/reference/latest/projects/get """ try: return self.repository.projects.get(project_id) except (errors.HttpError, HttpLib2Error) as e: api_not_enabled, details = _api_not_enabled(e) if api_not_enabled: raise api_errors.ApiNotEnabledError(details, e) raise api_errors.ApiExecutionError(project_id, e)
def is_api_enabled(self, project_id): """Checks if the Compute API is enabled for the specified project. Args: project_id (str): The project id. Returns: bool: True if the API is enabled, else False. """ try: result = self.repository.projects.get(project_id, fields='name') return bool('name' in result) # True if name, otherwise False. except (errors.HttpError, HttpLib2Error) as e: api_not_enabled, _ = _api_not_enabled(e) if api_not_enabled: return False raise api_errors.ApiExecutionError(project_id, e)
def get_group_members(self, group_key): """Get all the members for specified groups. Args: group_key (str): The group's unique id assigned by the Admin API. Returns: list: A list of member objects from the API. Raises: api_errors.ApiExecutionError: If group member retrieval fails. """ try: paged_results = self.repository.members.list(group_key) return api_helpers.flatten_list_results(paged_results, 'members') except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(group_key, e)
def get_groups(self, customer_id='my_customer'): """Get all the groups for a given customer_id. A note on customer_id='my_customer'. This is a magic string instead of using the real customer id. See: https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups#get_all_domain_groups Args: customer_id (str): The customer id to scope the request to. Returns: list: A list of group objects returned from the API. Raises: api_errors.ApiExecutionError: If groups retrieval fails. """ try: paged_results = self.repository.groups.list(customer=customer_id) return api_helpers.flatten_list_results(paged_results, 'groups') except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError('groups', e)
def get_datasets_for_projectid(self, project_id): """Return BigQuery datasets stored in the requested project_id. Args: project_id (str): String representing the project id. Returns: list: A list of datasetReference objects for a given project_id. [{'datasetId': 'dataset-id', 'projectId': 'project-id'}, {...}] """ try: results = self.repository.datasets.list( resource=project_id, fields='datasets/datasetReference,nextPageToken', all=True) flattened = api_helpers.flatten_list_results(results, 'datasets') except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(project_id, e) datasets = [result.get('datasetReference') for result in flattened if 'datasetReference' in result] return datasets
def get_app(self, project_id): """Gets information about an application. Args: project_id (str): The id of the project. Returns: dict: The response of retrieving the AppEngine app. """ try: return self.repository.apps.get(project_id) except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: # TODO: handle error more gracefully # application not found return {} raise api_errors.ApiExecutionError(project_id, e)
def get_service(self, project_id, service_id): """Gets information about a specific service. Args: project_id (str): The id of the project. service_id (str): The id of the service to query. Returns: dict: A Service resource dict for a given project_id and service_id. """ try: return self.repository.app_services.get( project_id, target=service_id) except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: return {} raise api_errors.ApiExecutionError(project_id, e)
def list_services(self, project_id): """Lists services of a project. Args: project_id (str): The id of the project. Returns: list: A list of Service resource dicts for a project_id. """ try: paged_results = self.repository.app_services.list(project_id) return api_helpers.flatten_list_results(paged_results, 'services') except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: return [] raise api_errors.ApiExecutionError(project_id, e)
def list_versions(self, project_id, service_id): """Lists versions of a given service. Args: project_id (str): The id of the project. service_id (str): The id of the service to query. Returns: list: A list of Version resource dicts for a given Service. """ try: paged_results = self.repository.service_versions.list( project_id, services_id=service_id) return api_helpers.flatten_list_results(paged_results, 'versions') except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: return [] raise api_errors.ApiExecutionError(project_id, e)
def get_instance(self, project_id, service_id, version_id, instances_id): """Gets information about a specific instance of a service. Args: project_id (str): The id of the project. service_id (str): The id of the service to query. version_id (str): The id of the version to query. instances_id (str): The id of the instance to query. Returns: dict: An Instance resource dict for a given project_id, service_id and version_id. """ try: return self.repository.version_instances.get( project_id, target=instances_id, services_id=service_id, versions_id=version_id) except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: return {} raise api_errors.ApiExecutionError(project_id, e)
def list_instances(self, project_id, service_id, version_id): """Lists instances of a given service and version. Args: project_id (str): The id of the project. service_id (str): The id of the service to query. version_id (str): The id of the version to query. Returns: list: A list of Instance resource dicts for a given Version. """ try: paged_results = self.repository.version_instances.list( project_id, services_id=service_id, versions_id=version_id) return api_helpers.flatten_list_results(paged_results, 'instances') except (errors.HttpError, HttpLib2Error) as e: if isinstance(e, errors.HttpError) and e.resp.status == 404: return [] raise api_errors.ApiExecutionError(project_id, e)
def get_project(self, project_id): """Get all the projects from organization. Args: project_id (str): The project id (not project number). Returns: dict: The project response object. Raises: ApiExecutionError: An error has occurred when executing the API. """ try: return self.repository.projects.get(project_id) except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(project_id, e)
def get_org_iam_policies(self, resource_name, org_id): """Get all the iam policies of an org. Args: resource_name (str): The resource type. org_id (int): The org id number. Returns: dict: Organization IAM policy for given org_id. https://cloud.google.com/resource-manager/reference/rest/Shared.Types/Policy Raises: ApiExecutionError: An error has occurred when executing the API. """ resource_id = 'organizations/%s' % org_id try: iam_policy = ( self.repository.organizations.get_iam_policy(resource_id)) return {'org_id': org_id, 'iam_policy': iam_policy} except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(resource_name, e)
def get_folder(self, folder_name): """Get a folder. Args: folder_name (str): The unique folder name, with the format "folders/{folderId}". Returns: dict: The folder API response. Raises: ApiExecutionError: An error has occurred when executing the API. """ name = self.repository.folders.get_name(folder_name) try: return self.repository.folders.get(name) except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(folder_name, e)
def get_folder_iam_policies(self, resource_name, folder_id): """Get all the iam policies of an folder. Args: resource_name (str): The resource name (type). folder_id (int): The folder id. Returns: dict: Folder IAM policy for given folder_id. Raises: ApiExecutionError: An error has occurred when executing the API. """ resource_id = 'folders/%s' % folder_id try: iam_policy = self.repository.folders.get_iam_policy(resource_id) return {'folder_id': folder_id, 'iam_policy': iam_policy} except (errors.HttpError, HttpLib2Error) as e: raise api_errors.ApiExecutionError(resource_name, e)
def get_text_file(self, full_bucket_path): """Gets a text file object as a string. Args: full_bucket_path (str): The full path of the bucket object. Returns: str: The object's content as a string. Raises: HttpError: HttpError is raised if the call to the GCP storage API fails """ bucket, object_name = get_bucket_and_path_from(full_bucket_path) try: return self.repository.objects.download(bucket, object_name) except errors.HttpError as e: LOGGER.error('Unable to download file: %s', e) raise
def get_buckets(self, project_id): """Gets all GCS buckets for a project. Args: project_id (int): The project id for a GCP project. Returns: list: a list of bucket resource dicts. https://cloud.google.com/storage/docs/json_api/v1/buckets Raises: ApiExecutionError: ApiExecutionError is raised if the call to the GCP ClodSQL API fails """ try: paged_results = self.repository.buckets.list(project_id, projection='full') return api_helpers.flatten_list_results(paged_results, 'items') except (errors.HttpError, HttpLib2Error) as e: LOGGER.warn(api_errors.ApiExecutionError(project_id, e)) raise api_errors.ApiExecutionError('buckets', e)
def get_service_accounts(self, project_id): """Get Service Accounts associated with a project. Args: project_id (str): The project ID to get Service Accounts for. Returns: list: List of service accounts associated with the project. """ name = self.repository.projects_serviceaccounts.get_name(project_id) try: paged_results = self.repository.projects_serviceaccounts.list(name) return api_helpers.flatten_list_results(paged_results, 'accounts') except (errors.HttpError, HttpLib2Error) as e: LOGGER.warn(api_errors.ApiExecutionError(name, e)) raise api_errors.ApiExecutionError('serviceAccounts', e)