我们从 Python 开源项目中提取了以下 6 个代码示例,用于说明如何使用 boto3.exceptions 模块。
def add_tags(self):
    """Tag the application's security group with its group and app name.

    The security group id is resolved with ``get_security_group_id`` from
    this object's ``app_name``, ``env`` and ``region``.  A
    ``botocore.exceptions.ClientError`` raised by the tagging call is logged
    as a warning and is not propagated.

    Returns:
        True: Always, including when the tagging call failed and was logged.
    """
    boto_session = boto3.session.Session(profile_name=self.env, region_name=self.region)
    ec2 = boto_session.resource('ec2')
    security_group = ec2.SecurityGroup(get_security_group_id(self.app_name, self.env, self.region))

    tags = [
        {'Key': 'app_group', 'Value': self.group},
        {'Key': 'app_name', 'Value': self.app_name},
    ]

    try:
        result = security_group.create_tags(DryRun=False, Tags=tags)
    except botocore.exceptions.ClientError as error:
        # Best effort: a failed tagging call must not abort the caller.
        self.log.warning(error)
    else:
        self.log.debug('Security group has been tagged: %s', result)

    return True
def update_dns_zone_record(env, zone_id, **kwargs):
    """Upsert a Route53 DNS record in the _env_ hosted zone.

    The change is only submitted when ``dns_name`` ends with the hosted
    zone's name; otherwise the record is skipped and only a log line is
    written.  A ``botocore.exceptions.ClientError`` raised by the upsert is
    logged and not re-raised.

    Args:
        env (str): Deployment environment, used as the boto3 profile name.
        zone_id (str): Route53 zone id.

    Keyword Args:
        dns_name (str): FQDN of application's dns entry to add/update.
        dns_name_aws (str): FQDN of AWS resource
        dns_ttl (int): DNS time-to-live (ttl)

    All keyword arguments are forwarded to the
    ``infrastructure/dns_upsert.json.j2`` template.
    """
    route53 = boto3.Session(profile_name=env).client('route53')
    response = {}

    zone_info = route53.get_hosted_zone(Id=zone_id)
    zone_name = zone_info['HostedZone']['Name'].rstrip('.')
    dns_name = kwargs.get('dns_name')

    if not (dns_name and dns_name.endswith(zone_name)):
        LOG.info('Skipping creating DNS record %s in non-matching Hosted Zone %s (%s)', dns_name, zone_id, zone_name)
    else:
        dns_name_aws = kwargs.get('dns_name_aws')

        # This is what will be added to DNS
        change_batch_json = get_template(template_file='infrastructure/dns_upsert.json.j2', **kwargs)

        # The same four values are reported by every log line for this record.
        record_details = (dns_name, dns_name_aws, zone_id, zone_name)
        LOG.info('Attempting to create DNS record %s (%s) in Hosted Zone %s (%s)', *record_details)
        try:
            response = route53.change_resource_record_sets(
                HostedZoneId=zone_id,
                ChangeBatch=json.loads(change_batch_json),
            )
        except botocore.exceptions.ClientError as error:
            LOG.info('Error creating DNS record %s (%s) in Hosted Zone %s (%s)', *record_details)
            LOG.debug(error)
        else:
            LOG.info('Upserted DNS record %s (%s) in Hosted Zone %s (%s)', *record_details)

    # ``response`` is still the empty dict when the record was skipped or failed.
    LOG.debug('Route53 JSON Response: \n%s', pformat(response))
def add_cidr_rules(self, rules):
    """Add cidr rules to security group via boto.

    Each rule is submitted with its own ``authorize_security_group_ingress``
    call.  A rule that AWS reports as already present (error code
    ``InvalidPermission.Duplicate``) is treated as success.

    Args:
        rules (list): Allowed Security Group ports and protocols.  Each rule
            is a dict read for the keys ``protocol``, ``start_port``,
            ``end_port`` and ``app`` (used as the CIDR block).

    Returns:
        True: Upon successful completion.

    Raises:
        SpinnakerSecurityGroupError: boto3 call failed to add CIDR block to
            Security Group.
    """
    session = boto3.session.Session(profile_name=self.env, region_name=self.region)
    client = session.client('ec2')

    group_id = get_security_group_id(self.app_name, self.env, self.region)

    for rule in rules:
        data = {
            'DryRun': False,
            'GroupId': group_id,
            'IpPermissions': [{
                'IpProtocol': rule['protocol'],
                'FromPort': rule['start_port'],
                'ToPort': rule['end_port'],
                'IpRanges': [{
                    'CidrIp': rule['app']
                }]
            }]
        }
        self.log.debug('Security Group rule: %s', data)

        try:
            client.authorize_security_group_ingress(**data)
        except botocore.exceptions.ClientError as error:
            # Compare the structured AWS error code instead of searching the
            # rendered exception text, which is meant for humans.
            error_code = error.response.get('Error', {}).get('Code')
            if error_code == 'InvalidPermission.Duplicate':
                self.log.debug('Duplicate rule exist, that is OK.')
            else:
                msg = 'Unable to add cidr rules to {}'.format(rule.get('app'))
                self.log.error(msg)
                # Keep the AWS-side reason in the logs; the raised error only
                # carries the summary message.
                self.log.debug(error)
                raise SpinnakerSecurityGroupError(msg)

    return True
def update_failover_dns_record(env, zone_id, **kwargs):
    """Upsert a Failover Route53 alias record in the _env_ hosted zone.

    For any ``failover_state`` other than ``primary`` (case-insensitive) a
    PRIMARY failover record for ``dns_name`` must already exist.  The change
    is only submitted when ``dns_name`` ends with the hosted zone's name, and
    ``delete_existing_cname`` is called for ``dns_name`` before the upsert.
    A ``botocore.exceptions.ClientError`` raised by either step is logged and
    not re-raised.

    Args:
        env (str): Deployment environment, used as the boto3 profile name.
        zone_id (str): Route53 zone id.

    Keyword Args:
        dns_name (str): FQDN of application's dns entry to add/update.
        dns_ttl (int): DNS time-to-live (ttl)
        elb_aws_dns (str): DNS A Record of ELB from AWS
        elb_dns_zone_id (str): Zone ID of ELB DNS
        failover_state (str): if the record is primary or secondary.
            Required: a missing value raises ``AttributeError``.
        primary_region (str): Primary AWS region for DNS

    All keyword arguments are forwarded to the
    ``infrastructure/dns_failover_upsert.json.j2`` template.

    Raises:
        PrimaryDNSRecordNotFound: A non-primary record was requested but no
            PRIMARY failover record exists for ``dns_name``.
    """
    route53 = boto3.Session(profile_name=env).client('route53')
    response = {}

    zone_info = route53.get_hosted_zone(Id=zone_id)
    zone_name = zone_info['HostedZone']['Name'].rstrip('.')
    dns_name = kwargs.get('dns_name')

    # Check that the primary record exists
    failover_state = kwargs.get('failover_state')
    is_primary = failover_state.lower() == 'primary'
    if not is_primary and not find_existing_record(
            env, zone_id, dns_name, check_key='Failover', check_value='PRIMARY'):
        raise PrimaryDNSRecordNotFound("Primary Failover DNS record not found: {}".format(dns_name))

    if not (dns_name and dns_name.endswith(zone_name)):
        LOG.info('Skipping creating DNS record %s in non-matching Hosted Zone %s (%s)', dns_name, zone_id, zone_name)
    else:
        change_batch_json = get_template(template_file='infrastructure/dns_failover_upsert.json.j2', **kwargs)

        # The same four values are reported by every log line for this record.
        record_details = (dns_name, kwargs['elb_aws_dns'], zone_id, zone_name)
        LOG.info('Attempting to create DNS Failover record %s (%s) in Hosted Zone %s (%s)', *record_details)
        try:
            delete_existing_cname(env, zone_id, dns_name)
            response = route53.change_resource_record_sets(
                HostedZoneId=zone_id,
                ChangeBatch=json.loads(change_batch_json),
            )
        except botocore.exceptions.ClientError as error:
            LOG.info('Error creating DNS Failover record %s (%s) in Hosted Zone %s (%s)', *record_details)
            LOG.debug(error)
        else:
            LOG.info('Upserted DNS Failover record %s (%s) in Hosted Zone %s (%s)', *record_details)

    # ``response`` is still the empty dict when the record was skipped or failed.
    LOG.debug('Route53 JSON Response: \n%s', pformat(response))
def get_file_header(meta, filename):
    """
    Gets the header information of an S3 file with an HTTP HEAD request.

    The outcome is always recorded in meta[u'result_status'].

    :param meta: file meta object; its u'result_status' entry is set here
    :param filename: passed together with meta to
        SnowflakeS3Util._get_s3_object to obtain the S3 object
    :return: FileHeader built from the object's metadata if no error;
        a FileHeader whose fields are all None if the object is not found
        (404); otherwise None. Check meta[u'result_status'] for status.
    """
    logger = getLogger(__name__)
    s3_object = SnowflakeS3Util._get_s3_object(meta, filename)

    try:
        # HTTP HEAD request
        s3_object.load()
    except botocore.exceptions.ClientError as e:
        error_code = e.response[u'Error'][u'Code']

        if error_code == EXPIRED_TOKEN:
            logger.debug(u"AWS Token expired. Renew and retry")
            meta[u'result_status'] = ResultStatus.RENEW_TOKEN
            return None

        if error_code == u'404':
            logger.debug(u'not found. bucket: %s, path: %s',
                         s3_object.bucket_name, s3_object.key)
            meta[u'result_status'] = ResultStatus.NOT_FOUND_FILE
            return FileHeader(
                digest=None,
                content_length=None,
                encryption_metadata=None,
            )

        if error_code == u'400':
            logger.debug(u'Bad request, token needs to be renewed: %s. '
                         u'bucket: %s, path: %s',
                         e.response[u'Error'][u'Message'],
                         s3_object.bucket_name, s3_object.key)
            meta[u'result_status'] = ResultStatus.RENEW_TOKEN
            return None

        # Any other client error is reported as a plain failure.
        logger.debug(
            u"Failed to get metadata for %s, %s: %s",
            s3_object.bucket_name, s3_object.key, e)
        meta[u'result_status'] = ResultStatus.ERROR
        return None

    meta[u'result_status'] = ResultStatus.UPLOADED

    object_metadata = s3_object.metadata
    if object_metadata.get(AMZ_KEY):
        encryption_metadata = EncryptionMetadata(
            key=object_metadata.get(AMZ_KEY),
            iv=object_metadata.get(AMZ_IV),
            matdesc=object_metadata.get(AMZ_MATDESC),
        )
    else:
        # No encryption key in the object's metadata.
        encryption_metadata = None

    return FileHeader(
        digest=object_metadata.get(SFC_DIGEST),
        content_length=s3_object.content_length,
        encryption_metadata=encryption_metadata
    )
def _native_download_file(meta, full_dst_file_name, max_concurrency):
    """
    Downloads the S3 object named by meta[u'src_file_name'] to a local file.

    The outcome is recorded in meta[u'result_status']:
    DOWNLOADED on success, RENEW_TOKEN when S3 reports an expired token,
    NEED_RETRY or NEED_RETRY_WITH_LOWER_CONCURRENCY for retryable failures.
    For retryable failures the exception is stored in meta[u'last_error'].

    :param meta: file meta object; read for u'src_file_name',
        u'src_file_size', u'get_callback' and
        u'get_callback_output_stream'
    :param full_dst_file_name: destination path passed to download_file
    :param max_concurrency: max_concurrency for the TransferConfig
    :raises botocore.exceptions.ClientError: for any client error other
        than an expired token
    :raises OpenSSL.SSL.SysCallError: when its first argument is not one of
        the error numbers handled below
    """
    logger = getLogger(__name__)
    try:
        akey = SnowflakeS3Util._get_s3_object(meta, meta[u'src_file_name'])
        akey.download_file(
            full_dst_file_name,
            Callback=meta[u'get_callback'](
                meta[u'src_file_name'],
                meta[u'src_file_size'],
                output_stream=meta[u'get_callback_output_stream'])
            if meta[u'get_callback'] else None,
            Config=TransferConfig(
                multipart_threshold=SnowflakeS3Util.DATA_SIZE_THRESHOLD,
                max_concurrency=max_concurrency,
                num_download_attempts=10,
            )
        )
        meta[u'result_status'] = ResultStatus.DOWNLOADED
    except botocore.exceptions.ClientError as err:
        if err.response[u'Error'][u'Code'] == EXPIRED_TOKEN:
            meta[u'result_status'] = ResultStatus.RENEW_TOKEN
        else:
            logger.debug(
                u"Failed to download a file: %s, err: %s",
                full_dst_file_name, err, exc_info=True)
            # Bare raise re-raises the active exception with its original
            # traceback intact.
            raise
    except RetriesExceededError as err:
        meta[u'result_status'] = ResultStatus.NEED_RETRY
        meta[u'last_error'] = err
    except OpenSSL.SSL.SysCallError as err:
        # NOTE(review): -1 is presumably pyOpenSSL's "Unexpected EOF"
        # code - confirm.
        if err.args[0] not in (
                ERRORNO_WSAECONNABORTED,
                errno.ECONNRESET,
                errno.ETIMEDOUT,
                errno.EPIPE,
                -1):
            raise
        meta[u'last_error'] = err
        if err.args[0] == ERRORNO_WSAECONNABORTED:
            # connection was disconnected by S3
            # because of too many connections. retry with
            # less concurrency to mitigate it
            meta[
                u'result_status'] = ResultStatus.NEED_RETRY_WITH_LOWER_CONCURRENCY
        else:
            meta[u'result_status'] = ResultStatus.NEED_RETRY