我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用boto3.session()。
def __init__(self, client=None, **kwargs): self.stream_id = str(uuid.uuid4()) self.dimensions = [] self.timers = {} self.dimension_stack = [] self.storage_resolution = 60 self.with_dimension('MetricStreamId', self.stream_id) if client: self.client = client else: profile = kwargs.get('Profile') if profile: session = boto3.session.Session(profile_name=profile) self.client = session.client('cloudwatch') else: self.client = boto3.client('cloudwatch')
def create_role(session, role_name, account_number): client = session.client('iam') res = None try: role = client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(default_trust(str(account_number))) ) client.attach_role_policy( RoleName=role_name, PolicyArn='arn:aws:iam::aws:policy/AdministratorAccess' ) res = role['Role']['Arn'] print "+ Created IAM role: {}".format(res) except ClientError as e: raise e return res
def create_cw_event_trigger(session): client = session.client('dynamodb') rule_arn = [] try: response = client.put_rule( Name='{}'.format(CW_RULE_NAME), ScheduleExpression='cron(0/10 * * * ? *)', Description='schedule ec2 lamabda function to run every 10 minutes.' ) rule_arn = response['RuleArn'] print "+ Created Cloud Watch Rule: ".format(rule_arn) except Exception as e: raise e return rule_arn
def _connect_to_aws_service(self, service_name): """ Connect to the specified AWS service via explicit credentials (shared by the AWS CLI) or an instance role """ service = None region = self.args.aws_region # prefer explicit region vs. CLI config if not region: region = self._get_aws_region_from_config() try: aws = boto3.session.Session(aws_access_key_id=self.aws_credentials['aws_access_key_id'], aws_secret_access_key=self.aws_credentials['aws_secret_access_key'], region_name=region) service = aws.client(service_name) self._log("Connected to AWS {} in region {}".format(service_name.capitalize(), region)) except Exception, err: self._log("Could not connect to AWS {} in region {} using local CLI credentials".format(service_name.capitalize(), region), err=err) try: service = boto3.client(service_name) self._log("Connected to AWS {} in region {}".format(service_name)) except Exception, err: self._log("Could not connect to AWS {} in region {} using an instance role".format(service_name.capitalize(), region), err=err) return service
def populate_ecs_service_params(self, session, cf_params, cluster, elb_name, env, region, listener_port): elb_client = session.client('elbv2', region_name=region) balancer_arn, vpc_id = ApplyECS.get_load_balancer(elb_client, elb_name, cluster, env) listener_arn = ApplyECS.get_elb_listener(elb_client, balancer_arn, port=listener_port) cf_params['vpcid'] = vpc_id cf_params['listenerarn'] = listener_arn response = elb_client.describe_rules(ListenerArn=listener_arn) rules = response['Rules'] existing_priorities = set([rule['Priority'] for rule in rules]) if len(existing_priorities) >= 75: logging.error("Listener %s already has %d rules, cannot add more services" % (listener_arn, len(existing_priorities))) raise Exception("Listener %s already has %d rules, cannot add more services" % (listener_arn, len(existing_priorities))) for i in range(10, 21): if str(i) not in existing_priorities: cf_params['priority'] = str(i) break
def setup_s3_client(job_data): """Creates an S3 client Uses the credentials passed in the event by CodePipeline. These credentials can be used to access the artifact bucket. Args: job_data: The job data structure Returns: An S3 client with the appropriate credentials """ key_id = job_data['artifactCredentials']['accessKeyId'] key_secret = job_data['artifactCredentials']['secretAccessKey'] session_token = job_data['artifactCredentials']['sessionToken'] session = Session( aws_access_key_id=key_id, aws_secret_access_key=key_secret, aws_session_token=session_token) return session.client('s3', config=botocore.client.Config(signature_version='s3v4'))
def create_client( self, access_key=AWSDefaults.CREDS['access_key'], secret_key=AWSDefaults.CREDS['secret_key'], region=AWSDefaults.CREDS['region'] ): if access_key and secret_key and region: self.session = self.create_session( access_key=access_key, secret_key=secret_key, region=region ) return self.session.client(self.aws_service) else: return boto3.client( self.aws_service, region_name=self.metadata['region'] )
def setup_s3_client(job_data): key_id = job_data['artifactCredentials']['accessKeyId'] key_secret = job_data['artifactCredentials']['secretAccessKey'] session_token = job_data['artifactCredentials']['sessionToken'] session = Session(aws_access_key_id=key_id, aws_secret_access_key=key_secret, aws_session_token=session_token) return session.client('s3', config=botocore.client.Config(signature_version='s3v4'))
def __init__(self, s3_staging_dir=None, region_name=None, schema_name='default', poll_interval=1, encryption_option=None, kms_key=None, profile_name=None, converter=None, formatter=None, retry_exceptions=('ThrottlingException', 'TooManyRequestsException'), retry_attempt=5, retry_multiplier=1, retry_max_delay=1800, retry_exponential_base=2, cursor_class=Cursor, **kwargs): if s3_staging_dir: self.s3_staging_dir = s3_staging_dir else: self.s3_staging_dir = os.getenv(self._ENV_S3_STAGING_DIR, None) assert self.s3_staging_dir, 'Required argument `s3_staging_dir` not found.' assert schema_name, 'Required argument `schema_name` not found.' self.region_name = region_name self.schema_name = schema_name self.poll_interval = poll_interval self.encryption_option = encryption_option self.kms_key = kms_key if profile_name: session = Session(profile_name=profile_name, **kwargs) self._client = session.client('athena', region_name=region_name, **kwargs) else: self._client = boto3.client('athena', region_name=region_name, **kwargs) self._converter = converter if converter else TypeConverter() self._formatter = formatter if formatter else ParameterFormatter() self.retry_exceptions = retry_exceptions self.retry_attempt = retry_attempt self.retry_multiplier = retry_multiplier self.retry_max_delay = retry_max_delay self.retry_exponential_base = retry_exponential_base self.cursor_class = cursor_class
def build_table(session, table_name, account_data): client = session.client('dynamodb') try: t = client.create_table(TableName=table_name, KeySchema=[ { 'AttributeName': 'name', 'KeyType': 'HASH' } ], AttributeDefinitions=[ { 'AttributeName': 'name', 'AttributeType': 'S' } ], ProvisionedThroughput={ 'ReadCapacityUnits': 10, 'WriteCapacityUnits': 1 }, ) resource = session.resource('dynamodb') print "+ Created Dynamodb Table: {}... Waiting for table creation to propagate before inserting items".format(table_name) sleep(15) table = resource.Table(table_name) for i in account_data: table.put_item(Item=i) except ClientError as e: raise e return t['TableDescription']['TableName']
def cfn_client(self): if self.__cfn_client is None: session = Session( profile_name = env.get('Profile'), region_name = env.get('Region'), aws_access_key_id = env.get('AccessKeyId'), aws_secret_access_key = env.get('SecretAccessKey') ) self.__cfn_client = session.client('cloudformation') return self.__cfn_client
def cfn_resource(self): if self.__cfn_resource is None: session = Session( profile_name = env.get('Profile'), region_name = env.get('Region'), aws_access_key_id = env.get('AccessKeyId'), aws_secret_access_key = env.get('SecretAccessKey') ) self.__cfn_resource = session.resource('cloudformation') return self.__cfn_resource
def console(self): """ Open AWS Console on your default Web browser. """ import webbrowser session = boto3.session.Session() webbrowser.open('https://%(region)s.console.aws.amazon.com/cloudformation/home?region=%(region)s#/stacks?filter=active' % dict( region = session.region_name ))
def setup_s3_client(): """ :return: Boto3 S3 session. Uses IAM credentials """ session = Session() return session.client('s3', config=botocore.client.Config(signature_version='s3v4'))
def setup(event): # Extract attributes passed in by CodePipeline job_id = event['CodePipeline.job']['id'] job_data = event['CodePipeline.job']['data'] artifact = job_data['inputArtifacts'][0] config = job_data['actionConfiguration']['configuration'] credentials = job_data['artifactCredentials'] from_bucket = artifact['location']['s3Location']['bucketName'] from_key = artifact['location']['s3Location']['objectKey'] from_revision = artifact['revision'] #output_artifact = job_data['outputArtifacts'][0] #to_bucket = output_artifact['location']['s3Location']['bucketName'] #to_key = output_artifact['location']['s3Location']['objectKey'] user_parameters = config['UserParameters'] # Temporary credentials to access CodePipeline artifact in S3 key_id = credentials['accessKeyId'] key_secret = credentials['secretAccessKey'] session_token = credentials['sessionToken'] session = Session(aws_access_key_id=key_id, aws_secret_access_key=key_secret, aws_session_token=session_token) s3 = session.client('s3', config=botocore.client.Config(signature_version='s3v4')) return (job_id, s3, from_bucket, from_key, from_revision, user_parameters)
def get_chartwerk_bucket(): session = Session( region_name=app_settings.AWS_REGION, aws_access_key_id=app_settings.AWS_ACCESS_KEY_ID, aws_secret_access_key=app_settings.AWS_SECRET_ACCESS_KEY ) s3 = session.resource('s3') return s3.Bucket(app_settings.AWS_BUCKET)
def create_client_with_profile(profile_name, region, resource_name='ec2'): """ Create a new boto3 client with a boto3 profile in ~/.aws/credentials Args: profile_name (str): The name of the profile that you have set in your ~/.aws/credentials profile. region (str): The aws region you want to connect to. resource_name (str): Valid aws resource. default=ec2 Basic Usage: >>> client, err_msg = create_client_with_profile('lab01', 'us-west-2') Returns: Tuple (botocore.client.EC2, str) """ client = None err_msg = '' try: session = ( boto3.session.Session( profile_name=profile_name, region_name=region ) ) client = session.client(resource_name) except Exception as e: err_msg = str(e) return client, err_msg
def process_cf_file(self, args): try: cf_params_local = copy.deepcopy(args[0]) cluster = args[1] elb_name_suffix = args[2] env = args[3] filename = args[4] has_ecs_service = args[5] listener_port = args[6] region = args[7] session = boto3.session.Session() if has_ecs_service: elb_name = 'ecs-elb-' + cluster if elb_name_suffix is not None: elb_name = "-".join([elb_name, elb_name_suffix]) self.populate_ecs_service_params(session, cf_params_local, cluster, elb_name, env, region, listener_port) # Skip non-cf files ext = filename.split('.')[-1] if ext != 'template' and ext != 'yml': return cf_client = session.client('cloudformation', region_name=region) name = filename.split('/')[-1].split('.')[0] logging.info("%s: Processing CloudFormation Template" % filename) cf_params_local['name'] = name parameters = [{'ParameterKey': 'name', 'ParameterValue': name}] if name is None or name in filename: with open(filename, 'r') as f_h: try: cf_template = f_h.read() except: logging.exception("%s: Error reading file." % (filename)) self.catfile(filename) raise validate_response = self.validate_template(cf_client, cf_template, filename) service_name = "%s-%s-%s" % (env, name, cluster) if elb_name_suffix is not None: service_name = "-".join([service_name, elb_name_suffix]) cf_command = cf_client.create_stack existing_stack_id = self.find_existing_stack(cf_client, cf_params_local, service_name) if existing_stack_id is not None: cf_command = cf_client.update_stack self.populate_cf_params(cf_params_local, existing_stack_id, filename, parameters, validate_response) logging.info("%s: Updating CloudFormation Stack" % (service_name)) try: cf_response = cf_command(StackName=service_name, TemplateBody=cf_template, Parameters=parameters, Capabilities=["CAPABILITY_IAM"]) creating_stack_id = cf_response['StackId'] stack_status = self.wait_for_stack_creation(cf_client, creating_stack_id, service_name) except botocore.exceptions.ClientError as e: if e.response["Error"]["Message"] == 'No updates are to be performed.': logging.info("%s: No updates to be performed, CF update succeeded." % service_name) else: raise self.q.put("%s Succeeded" % filename) logging.info("%s Succeeded" % filename) except Exception as e: logging.error("%s: Error executing CloudFormation Stack" % filename) logging.exception(e) self.q.put("%s Failed" % filename)