我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用botocore.client()。
def get_s3_client():
    """Build the S3 client used by the storage service.

    Credentials are read from ``config`` and the endpoint from the
    ``S3_ENDPOINT_URL`` environment variable. When a custom endpoint is
    configured, a best-effort attempt is made to create the storage bucket
    and set its ACL to ``public-read``; a failure of that attempt is logged
    and does not prevent the client from being returned.

    :returns: the object returned by ``boto3.client('s3', ...)``
    """
    endpoint_url = os.environ.get("S3_ENDPOINT_URL")

    def connection_kwargs():
        # Single source of truth for the settings shared by the client and
        # the resource; a fresh Config is built on every call.
        return {
            'aws_access_key_id': config['STORAGE_ACCESS_KEY_ID'],
            'config': Config(signature_version='s3v4'),
            'aws_secret_access_key': config['STORAGE_SECRET_ACCESS_KEY'],
            'endpoint_url': endpoint_url,
        }

    s3_client = boto3.client('s3', **connection_kwargs())
    if endpoint_url:
        try:
            s3 = boto3.resource('s3', **connection_kwargs())
            s3.create_bucket(Bucket=config['STORAGE_BUCKET_NAME'])
            bucket = s3.Bucket(config['STORAGE_BUCKET_NAME'])
            bucket.Acl().put(ACL='public-read')
        except Exception:
            # Deliberately best-effort: log and carry on. Catching Exception
            # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
            logging.exception('Failed to create the bucket')
    return s3_client
def fake_kms_client(keysize=32):
    """Return a MagicMock standing in for a KMS client.

    The mock reports ``botocore.client.BaseClient`` as its ``__class__`` and
    has canned return values for ``generate_data_key``, ``encrypt`` and
    ``decrypt`` taken from ``VALUES``.

    :param keysize: key into ``VALUES['data_keys']`` selecting the fixture
    :returns: the configured mock
    """
    fixture = VALUES['data_keys'][keysize]
    key_arn = VALUES['arn']

    client = MagicMock(__class__=botocore.client.BaseClient)
    client.generate_data_key.return_value = {
        'Plaintext': fixture['plaintext'],
        'CiphertextBlob': fixture['encrypted'],
        'KeyId': key_arn,
    }
    client.encrypt.return_value = {
        'CiphertextBlob': fixture['encrypted'],
        'KeyId': key_arn,
    }
    client.decrypt.return_value = {
        'Plaintext': fixture['plaintext'],
        'KeyId': key_arn,
    }
    return client
def _client(self, key_id):
    """Return the regional client registered for the region of ``key_id``.

    The region is the fourth ``:``-separated field of ``key_id``. The first
    region seen this way becomes ``self.default_region`` when none is set.
    When ``key_id`` has fewer than four fields, ``self.default_region`` is
    used instead.

    :param str key_id: KMS CMK ID
    :raises UnknownRegionError: if no region can be read from ``key_id`` and
        no default region is set
    """
    fields = key_id.split(':', 4)
    if len(fields) > 3:
        region_name = fields[3]
        if self.default_region is None:
            self.default_region = region_name
    else:
        if self.default_region is None:
            raise UnknownRegionError(
                'No default region found and no region determinable from key id: {}'.format(key_id)
            )
        region_name = self.default_region

    self.add_regional_client(region_name)
    return self._regional_clients[region_name]
def info(auth_token):
    """Authorize a client for file uploading.

    :param auth_token: authentication token to test
    :returns: a JSON string ``{"prefixes": [...]}`` listing the URL prefixes
        for the user under the storage bucket host (http and https, each
        with and without an explicit port); ``Response(status=401)`` when no
        user id is resolved for the token; ``Response(status=400)`` when any
        exception is raised while handling the request
    """
    try:
        # Verify client, deny access if not verified
        userid = services.get_user_id(auth_token)
        if userid is None:
            return Response(status=401)

        host = config['STORAGE_BUCKET_NAME']
        urls = []
        for scheme, port in [('http', '80'), ('https', '443')]:
            # Plain formatting instead of os.path.join: these are URLs, and
            # os.path.join would drop the whole prefix for a user id that
            # starts with a path separator.
            urls.extend([
                '%s://%s:%s/%s' % (scheme, host, port, userid),
                '%s://%s/%s' % (scheme, host, userid),
            ])

        return json.dumps({'prefixes': urls})
    except Exception:
        logging.exception('Bad request (info)')
        return Response(status=400)
def presign(auth_token, url, ownerid=None):
    """Return ``url``, or an S3 presigned version of it when one is needed.

    A HEAD request is sent to ``url``; unless it answers 403, the URL is
    returned unchanged. Otherwise the caller is verified and a presigned
    ``get_object`` URL valid for 24 hours is generated.

    :param auth_token: authentication token from auth
    :param url: url to check for a signed URL
    :param ownerid: ownerid for dataset
    :returns: a JSON string ``{"url": ...}``; ``Response(status=401)`` when
        ``ownerid`` is missing; ``Response(status=403)`` when verification
        fails or the file does not belong to the owner;
        ``Response(status=400)`` when any exception is raised
    """
    s3 = get_s3_client()
    one_day = 3600 * 24
    try:
        probe = requests.head(url)
        if probe.status_code != 403:
            return json.dumps({'url': url})

        # Verify client, deny access if not verified
        if ownerid is None:
            return Response(status=401)
        if not services.verify(auth_token, ownerid):
            return Response(status=403)

        parts = urllib.parse.urlparse(url)
        bucket, key = parts.netloc, parts.path.lstrip('/')
        if bucket.endswith('amazonaws.com'):
            # Path-style S3 URL: the bucket is the first path segment.
            bucket, key = key.split('/', 1)

        # Make sure file belongs to user (only in case of pkgstore)
        if config['STORAGE_BUCKET_NAME'] != bucket:
            if ownerid not in url:
                return Response(status=403)

        signed_url = s3.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': bucket, 'Key': key},
            ExpiresIn=one_day,
        )
        return json.dumps({'url': signed_url})
    except Exception:
        logging.exception('Bad request')
        return Response(status=400)
def get_default_client_config(self):
    """Return the default config used when creating clients.

    :rtype: botocore.client.Config
    :returns: The default client config object stored on this session, or
        ``None`` when no default config object is attached.
    """
    default_config = self._client_config
    return default_config
def set_default_client_config(self, client_config):
    """Store the default config to use when creating clients.

    :type client_config: botocore.client.Config
    :param client_config: The config object to keep as the default for new
        clients. Passing ``None`` leaves the session with no default config
        object attached.
    """
    # Plain attribute store; no validation is performed on the value.
    self._client_config = client_config
def patch():
    """Wrap ``BaseClient._make_api_call`` with ``patched_api_call``, once.

    The ``_datadog_patch`` flag on ``botocore.client`` records that the
    wrapper is installed, so repeated calls are no-ops. A ``Pin`` is attached
    to ``botocore.client.BaseClient`` after wrapping.
    """
    already_patched = getattr(botocore.client, '_datadog_patch', False)
    if already_patched:
        return

    botocore.client._datadog_patch = True
    wrapt.wrap_function_wrapper(
        'botocore.client',
        'BaseClient._make_api_call',
        patched_api_call,
    )
    pin = Pin(service="aws", app="aws", app_type="web")
    pin.onto(botocore.client.BaseClient)
def unpatch():
    """Remove the ``_make_api_call`` wrapper if the patch flag is set.

    Does nothing when ``botocore.client._datadog_patch`` is missing or falsy.
    """
    if not getattr(botocore.client, '_datadog_patch', False):
        return

    botocore.client._datadog_patch = False
    unwrap(botocore.client.BaseClient, '_make_api_call')
def mock_paginator():
    """Return a ``Mock`` whose spec is ``botocore.client.Paginator``."""
    paginator = mock.Mock(spec=botocore.client.Paginator)
    return paginator
def mock_ec2():
    """Return a ``Mock`` whose spec is the ``botocore.client`` module."""
    # NOTE(review): the spec is the module itself, not an EC2 client class;
    # confirm this is what callers expect.
    ec2 = mock.Mock(spec=botocore.client)
    return ec2
def get_s3_client():
    """Return an S3 client for ``us-east-1`` using path-style addressing."""
    path_style = Config(s3={'addressing_style': 'path'})
    return boto3.client('s3', region_name='us-east-1', config=path_style)
def __init__(self, max_tries=30, interval=5):
    """Create the S3 and Athena clients and store the retry settings.

    :param max_tries: stored as ``self.max_tries``
    :param interval: stored as ``self.interval``
    """
    self.max_tries = max_tries
    self.interval = interval
    self.s3_client = get_s3_client()
    self.athena_client = boto3.client('athena', region_name='us-east-1')
def setUp(self):
    """Patch the botocore and boto3 ``Session`` classes used by the KMS provider.

    After this runs, ``boto3.session.Session(...)`` in the module under test
    yields ``self.mock_boto3_session_instance``, whose ``client(...)`` yields
    ``self.mock_boto3_client_instance`` (a mock reporting ``BaseClient`` as
    its class).
    """
    # Start both patchers, botocore first.
    botocore_patcher = patch(
        'aws_encryption_sdk.key_providers.kms.botocore.session.Session'
    )
    self.mock_botocore_session_patcher = botocore_patcher
    self.mock_botocore_session = botocore_patcher.start()

    boto3_patcher = patch(
        'aws_encryption_sdk.key_providers.kms.boto3.session.Session'
    )
    self.mock_boto3_session_patcher = boto3_patcher
    self.mock_boto3_session = boto3_patcher.start()

    # Build the fake client and the fake session that hands it out.
    fake_client = MagicMock()
    fake_client.__class__ = botocore.client.BaseClient
    fake_session = MagicMock()
    fake_session.client.return_value = fake_client

    self.mock_boto3_session_instance = fake_session
    self.mock_boto3_session.return_value = fake_session
    self.mock_boto3_client_instance = fake_client
def test_add_regional_client_new(self):
    """A region with no cached client gets a new 'kms' client from a new session."""
    provider = KMSMasterKeyProvider()
    provider._regional_clients = {}

    provider.add_regional_client('ex_region_name')

    self.mock_boto3_session.assert_called_once_with(
        region_name='ex_region_name',
        botocore_session=ANY,
    )
    self.mock_boto3_session_instance.client.assert_called_once_with('kms')
    cached = provider._regional_clients['ex_region_name']
    assert cached is self.mock_boto3_client_instance
def test_client_valid_region_name(self, mock_add_client):
    """``_client`` reads the region from the key ARN and returns its client."""
    provider = KMSMasterKeyProvider()
    provider._regional_clients['us-east-1'] = self.mock_boto3_client_instance

    found = provider._client('arn:aws:kms:us-east-1:222222222222:key/aaaaaaaa-1111-2222-3333-bbbbbbbbbbbb')

    mock_add_client.assert_called_once_with('us-east-1')
    assert found is self.mock_boto3_client_instance
def test_client_no_region_name_with_default(self, mock_add_client):
    """With no region in the key id, ``_client`` uses the default region."""
    provider = KMSMasterKeyProvider()
    provider.default_region = sentinel.default_region
    provider._regional_clients[sentinel.default_region] = sentinel.default_client

    found = provider._client('')

    assert found is sentinel.default_client
    mock_add_client.assert_called_once_with(sentinel.default_region)
def add_regional_client(self, region_name):
    """Create and cache a 'kms' client for ``region_name`` if none is cached.

    :param str region_name: AWS Region ID (ex: us-east-1)
    """
    if region_name in self._regional_clients:
        # Already cached; nothing to do.
        return

    session = boto3.session.Session(
        region_name=region_name,
        botocore_session=self.config.botocore_session,
    )
    self._regional_clients[region_name] = session.client('kms')
def _new_master_key(self, key_id):
    """Build a KMSMasterKey for ``key_id``.

    :param bytes key_id: KMS CMK ID
    :returns: KMS Master Key based on key_id
    :rtype: aws_encryption_sdk.key_providers.kms.KMSMasterKey
    :raises InvalidKeyIdError: if key_id is not a valid KMS CMK ID to which this key provider has access
    """
    # The client lookup needs str, not bytes; the key config receives the
    # key_id exactly as it was passed in.
    regional_client = self._client(to_str(key_id))
    key_config = KMSMasterKeyConfig(key_id=key_id, client=regional_client)
    return KMSMasterKey(config=key_config)
def __init__(self, **kwargs):  # pylint: disable=unused-argument
    """Prepare the str key id and tag the client's user agent.

    ``self._key_id`` is set to the str form of ``self.key_id``, and the
    client's ``user_agent_extra`` is replaced by the result of
    ``extend_user_agent_suffix`` with ``USER_AGENT_SUFFIX``.
    """
    # KMS client requires str, not bytes
    self._key_id = to_str(self.key_id)

    client_config = self.config.client.meta.config
    client_config.user_agent_extra = extend_user_agent_suffix(
        user_agent=client_config.user_agent_extra,
        suffix=USER_AGENT_SUFFIX,
    )
def _generate_data_key(self, algorithm, encryption_context=None):
    """Ask KMS for a new data key and return its plaintext and ciphertext.

    :param algorithm: Algorithm on which to base data key
    :type algorithm: aws_encryption_sdk.identifiers.Algorithm
    :param dict encryption_context: Encryption context to pass to KMS
    :returns: Generated data key
    :rtype: aws_encryption_sdk.structures.DataKey
    :raises GenerateKeyError: if the client call raises ``ClientError`` or
        the response lacks an expected key
    """
    request = {
        'KeyId': self._key_id,
        'NumberOfBytes': algorithm.kdf_input_len,
    }
    if encryption_context is not None:
        request['EncryptionContext'] = encryption_context
    grant_tokens = self.config.grant_tokens
    if grant_tokens:
        request['GrantTokens'] = grant_tokens

    # Any client error or malformed response is normalized to GenerateKeyError.
    try:
        reply = self.config.client.generate_data_key(**request)
        plaintext = reply['Plaintext']
        ciphertext = reply['CiphertextBlob']
        key_id = reply['KeyId']
    except (ClientError, KeyError):
        error_message = 'Master Key {key_id} unable to generate data key'.format(key_id=self._key_id)
        _LOGGER.exception(error_message)
        raise GenerateKeyError(error_message)

    provider_info = MasterKeyInfo(provider_id=self.provider_id, key_info=key_id)
    return DataKey(
        key_provider=provider_info,
        data_key=plaintext,
        encrypted_data_key=ciphertext,
    )
def _decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context=None):
    """Decrypt an encrypted data key through KMS and return the plaintext.

    :param encrypted_data_key: Encrypted data key
    :type encrypted_data_key: aws_encryption_sdk.structures.EncryptedDataKey
    :param algorithm: not used for KMS
    :type algorithm: aws_encryption_sdk.identifiers.Algorithm
    :param dict encryption_context: Encryption context to use in decryption
    :returns: Decrypted data key
    :rtype: aws_encryption_sdk.structures.DataKey
    :raises DecryptKeyError: if Master Key is unable to decrypt data key
    """
    ciphertext = encrypted_data_key.encrypted_data_key
    request = {'CiphertextBlob': ciphertext}
    if encryption_context:
        request['EncryptionContext'] = encryption_context
    grant_tokens = self.config.grant_tokens
    if grant_tokens:
        request['GrantTokens'] = grant_tokens

    # Any client error or malformed response is normalized to DecryptKeyError.
    try:
        reply = self.config.client.decrypt(**request)
        plaintext = reply['Plaintext']
    except (ClientError, KeyError):
        error_message = 'Master Key {key_id} unable to decrypt data key'.format(key_id=self._key_id)
        _LOGGER.exception(error_message)
        raise DecryptKeyError(error_message)

    return DataKey(
        key_provider=self.key_provider,
        data_key=plaintext,
        encrypted_data_key=encrypted_data_key.encrypted_data_key,
    )