我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用boto.connect_iam()。
def test_group_users(self): # A very basic test to create a group, a user, add the user # to the group and then delete everything iam = boto.connect_iam() name = 'boto-test-%d' % time.time() username = 'boto-test-user-%d' % time.time() iam.create_group(name) iam.create_user(username) iam.add_user_to_group(name, username) iam.remove_user_from_group(name, username) iam.delete_user(username) iam.delete_group(name)
def include_rds_clusters_by_region(self, region): if not HAS_BOTO3: self.fail_with_error("Working with RDS clusters requires boto3 - please install boto3 and try again", "getting RDS clusters") client = ec2_utils.boto3_inventory_conn('client', 'rds', region, **self.credentials) marker, clusters = '', [] while marker is not None: resp = client.describe_db_clusters(Marker=marker) clusters.extend(resp["DBClusters"]) marker = resp.get('Marker', None) account_id = boto.connect_iam().get_user().arn.split(':')[4] c_dict = {} for c in clusters: # remove these datetime objects as there is no serialisation to json # currently in place and we don't need the data yet if 'EarliestRestorableTime' in c: del c['EarliestRestorableTime'] if 'LatestRestorableTime' in c: del c['LatestRestorableTime'] if self.ec2_instance_filters == {}: matches_filter = True else: matches_filter = False try: # arn:aws:rds:<region>:<account number>:<resourcetype>:<name> tags = client.list_tags_for_resource( ResourceName='arn:aws:rds:' + region + ':' + account_id + ':cluster:' + c['DBClusterIdentifier']) c['Tags'] = tags['TagList'] if self.ec2_instance_filters: for filter_key, filter_values in self.ec2_instance_filters.items(): # get AWS tag key e.g. tag:env will be 'env' tag_name = filter_key.split(":", 1)[1] # Filter values is a list (if you put multiple values for the same tag name) matches_filter = any(d['Key'] == tag_name and d['Value'] in filter_values for d in c['Tags']) if matches_filter: # it matches a filter, so stop looking for further matches break except Exception as e: if e.message.find('DBInstanceNotFound') >= 0: # AWS RDS bug (2016-01-06) means deletion does not fully complete and leave an 'empty' cluster. # Ignore errors when trying to find tags for these pass # ignore empty clusters caused by AWS bug if len(c['DBClusterMembers']) == 0: continue elif matches_filter: c_dict[c['DBClusterIdentifier']] = c self.inventory['db_clusters'] = c_dict
def test_cloudtrail(self): cloudtrail = boto.connect_cloudtrail() # Don't delete existing customer data! res = cloudtrail.describe_trails() if len(res['trailList']): self.fail('A trail already exists on this account!') # Who am I? iam = boto.connect_iam() response = iam.get_user() account_id = response['get_user_response']['get_user_result'] \ ['user']['user_id'] # Setup a new bucket s3 = boto.connect_s3() bucket_name = 'cloudtrail-integ-{0}'.format(time()) policy = DEFAULT_S3_POLICY.replace('<BucketName>', bucket_name)\ .replace('<CustomerAccountID>', account_id)\ .replace('<Prefix>/', '') b = s3.create_bucket(bucket_name) b.set_policy(policy) # Setup CloudTrail cloudtrail.create_trail(trail={'Name': 'test', 'S3BucketName': bucket_name}) cloudtrail.update_trail(trail={'Name': 'test', 'IncludeGlobalServiceEvents': False}) trails = cloudtrail.describe_trails() self.assertEqual('test', trails['trailList'][0]['Name']) self.assertFalse(trails['trailList'][0]['IncludeGlobalServiceEvents']) cloudtrail.start_logging(name='test') status = cloudtrail.get_trail_status(name='test') self.assertTrue(status['IsLogging']) cloudtrail.stop_logging(name='test') status = cloudtrail.get_trail_status(name='test') self.assertFalse(status['IsLogging']) # Clean up cloudtrail.delete_trail(name='test') for key in b.list(): key.delete() s3.delete_bucket(bucket_name)