我们从 Python 开源项目中提取了以下 22 个代码示例，用于说明如何使用 boto.vpc 模块。
def vpc_exists(module, vpc, name, cidr_block, multi):
    """Look up an existing VPC by its ``Name`` tag and CIDR block.

    Args:
        module: Ansible module object; ``fail_json`` is called on errors.
        vpc: Connection object exposing ``get_all_vpcs``.
        name: Value of the ``Name`` tag to match.
        cidr_block: CIDR block to match.
        multi: Value of the ``multi_ok`` option. When true, several
            matching VPCs are tolerated and reported as "no single match".

    Returns:
        The single matching VPC object, or ``None`` when nothing matches
        (or when several VPCs match and ``multi`` is true).
    """
    try:
        matching_vpcs = vpc.get_all_vpcs(
            filters={'tag:Name': name, 'cidr-block': cidr_block})
    except Exception as e:
        e_msg = boto_exception(e)
        module.fail_json(msg=e_msg)

    if len(matching_vpcs) == 1:
        return matching_vpcs[0]

    # The message asks the user to pass multi_ok=True to proceed, so the
    # failure must only fire when multi_ok was NOT requested.
    if len(matching_vpcs) > 1 and not multi:
        module.fail_json(msg='Currently there are %d VPCs that have the same name and '
                             'CIDR block you specified. If you would like to create '
                             'the VPC anyway please pass True to the multi_ok param.' % len(matching_vpcs))

    return None
def update_vpc_tags(vpc, module, vpc_obj, tags, name):
    """Ensure a VPC carries the requested tags plus its ``Name`` tag.

    Args:
        vpc: Connection object exposing ``get_all_tags`` and ``create_tags``.
        module: Ansible module object; ``check_mode`` is honoured and
            ``fail_json`` is called when the API raises.
        vpc_obj: VPC object whose ``id`` identifies the resource to tag.
        tags: Dict of desired tags, or ``None`` for "only the Name tag".
        name: Value to store under the ``Name`` tag.

    Returns:
        ``True`` when the desired tags differ from the tags currently on
        the VPC (they are written unless ``module.check_mode`` is set),
        ``False`` when they already match.
    """
    if tags is None:
        tags = dict()

    # NOTE: a caller-supplied dict is updated in place, as before.
    tags.update({'Name': name})
    try:
        current_tags = dict(
            (t.name, t.value)
            for t in vpc.get_all_tags(filters={'resource-id': vpc_obj.id})
        )
        # cmp() was removed in Python 3; a plain inequality test gives the
        # same truth value on both major versions.
        if tags != current_tags:
            if not module.check_mode:
                vpc.create_tags(vpc_obj.id, tags)
            return True
        return False
    except Exception as e:
        e_msg = boto_exception(e)
        module.fail_json(msg=e_msg)
def main():
    """Module entry point: connect to the VPC API and list route tables.

    Builds the argument spec (EC2 defaults plus an optional ``filters``
    dict), requires boto and a region, opens a ``boto.vpc`` connection and
    hands it to ``list_ec2_vpc_route_tables``.
    """
    spec = ec2_argument_spec()
    spec.update({'filters': {'default': None, 'type': 'dict'}})

    module = AnsibleModule(argument_spec=spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(boto.vpc, region, **connect_kwargs)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    list_ec2_vpc_route_tables(connection, module)
def ensure_igw_absent(vpc_conn, vpc_id, check_mode):
    """Detach and delete every Internet gateway attached to a VPC.

    Args:
        vpc_conn: Connection exposing ``get_all_internet_gateways``,
            ``detach_internet_gateway`` and ``delete_internet_gateway``.
        vpc_id: ID of the VPC whose gateways should be removed.
        check_mode: When true, report what would change without touching
            anything.

    Returns:
        ``{'changed': False}`` when no gateway is attached, otherwise
        ``{'changed': True}``.

    Raises:
        AnsibleIGWException: if detaching or deleting a gateway raises
            ``EC2ResponseError``.
    """
    attached = vpc_conn.get_all_internet_gateways(
        filters={'attachment.vpc-id': vpc_id})

    outcome = {'changed': bool(attached)}
    if check_mode or not attached:
        # Nothing to do, or only a dry run was requested.
        return outcome

    for gateway in attached:
        try:
            vpc_conn.detach_internet_gateway(gateway.id, vpc_id)
            vpc_conn.delete_internet_gateway(gateway.id)
        except EC2ResponseError as e:
            raise AnsibleIGWException(
                'Unable to delete Internet Gateway, error: {0}'.format(e))

    return outcome
def main():
    """Module entry point: connect to the VPC API and list subnets.

    Accepts the EC2 defaults plus an optional ``filters`` dict, requires
    boto and a region, then passes a ``boto.vpc`` connection to
    ``list_ec2_vpc_subnets``.
    """
    filters_option = {'default': None, 'type': 'dict'}
    spec = ec2_argument_spec()
    spec.update({'filters': filters_option})

    module = AnsibleModule(argument_spec=spec)

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(boto.vpc, region, **connect_kwargs)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    list_ec2_vpc_subnets(connection, module)
def find_igw(vpc_conn, vpc_id):
    """Return the ID of the single Internet gateway attached to a VPC.

    Args:
        vpc_conn: Connection exposing ``get_all_internet_gateways``.
        vpc_id: ID of the VPC to inspect.

    Returns:
        The ``id`` of the one gateway attached to ``vpc_id``.

    Raises:
        AnsibleIgwSearchException: when no gateway, or more than one
            gateway, is attached to the VPC.

    Note: this helper is duplicated in other ec2 modules and is a
    candidate for a shared module_utils.
    """
    gateways = vpc_conn.get_all_internet_gateways(
        filters={'attachment.vpc-id': vpc_id})

    if not gateways:
        raise AnsibleIgwSearchException('No IGW found for VPC {0}'.
                                        format(vpc_id))

    if len(gateways) != 1:
        raise AnsibleIgwSearchException('Multiple IGWs found for VPC {0}'.
                                        format(vpc_id))

    only_gateway = gateways[0]
    return only_gateway.id
def _get_aws_connection(self):
    """Open a ``boto.vpc`` connection for ``self.region``.

    Uses ``connect_to_aws`` with ``self.aws_connect_params`` as keyword
    arguments.

    Returns:
        Whatever ``connect_to_aws`` returns for ``boto.vpc``.

    If boto raises ``NoAuthHandlerFound`` the error text is reported
    through ``self.module.fail_json``.
    """
    try:
        return connect_to_aws(boto.vpc, self.region, **self.aws_connect_params)
    # "except X as e" is accepted by Python 2.6+ and Python 3, whereas the
    # old "except X, e" form is a syntax error on Python 3.
    except boto.exception.NoAuthHandlerFound as e:
        self.module.fail_json(msg=str(e))
def __init__(self, module, id=None, type=None, availability_zone=None,
             vpc=None, route_table_ids=None, tags=None, region=None,
             **aws_connect_params):
    """Record the requested VPN gateway settings and look up its state.

    Every argument is stored on the instance under the same name; extra
    keyword arguments are kept as ``aws_connect_params``. The state flags
    start as unchanged / ``'gone'`` / ``'detached'``. Finally a connection
    is opened via ``_get_aws_connection`` and the gateway is fetched via
    ``_get_vpn_gateway``.
    """
    # Ansible plumbing and connection settings.
    self.module = module
    self.region = region
    self.aws_connect_params = aws_connect_params

    # Requested gateway description.
    self.id = id
    self.type = type
    self.availability_zone = availability_zone
    self.vpc = vpc
    self.route_table_ids = route_table_ids
    self.tags = tags

    # State flags before anything has been looked up.
    self.changed = False
    self.status = 'gone'
    self.attach_status = 'detached'

    # These two run last: they are invoked only after every setting above
    # is in place, and the gateway lookup follows the connection.
    self.aws_conn = self._get_aws_connection()
    self.vpn_gw = self._get_vpn_gateway()
def __init__(self, module, id=None, type=None, cgw=None, vpn_gw=None,
             vpc=None, static_routes_only=None, static_routes=None,
             tags=None, region=None, **aws_connect_params):
    """Record the requested VPN connection settings and look up its state.

    Every argument is stored on the instance under the same name; extra
    keyword arguments are kept as ``aws_connect_params``. The state flags
    start as unchanged / ``'gone'``. Finally a connection is opened via
    ``_get_aws_connection`` and the VPN connection is fetched via
    ``_get_vpn_connection``.
    """
    # Ansible plumbing and connection settings.
    self.module = module
    self.region = region
    self.aws_connect_params = aws_connect_params

    # Requested VPN connection description.
    self.id = id
    self.type = type
    self.cgw = cgw
    self.vpn_gw = vpn_gw
    self.vpc = vpc
    self.static_routes_only = static_routes_only
    self.static_routes = static_routes
    self.tags = tags

    # State flags before anything has been looked up.
    self.changed = False
    self.status = 'gone'

    # These two run last: they are invoked only after every setting above
    # is in place, and the lookup follows the connection.
    self.aws_conn = self._get_aws_connection()
    self.vpn_conn = self._get_vpn_connection()
def _get_vpc_connection(module, region, aws_connect_params):
    """Open a ``boto.vpc`` connection for ``region``.

    Args:
        module: Ansible module object used to report failures.
        region: Region passed to ``connect_to_aws``.
        aws_connect_params: Dict expanded into keyword arguments for
            ``connect_to_aws``.

    Returns:
        The connection returned by ``connect_to_aws``. When boto raises
        ``NoAuthHandlerFound`` or ``AnsibleAWSError`` the error text is
        passed to ``module.fail_json`` instead.
    """
    try:
        vpc_connection = connect_to_aws(boto.vpc, region, **aws_connect_params)
    except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
        module.fail_json(msg=str(exc))
    else:
        return vpc_connection
def main():
    """Module entry point: ensure a VPC's Internet gateway state.

    Reads ``vpc_id`` and ``state`` (``present`` by default), opens a
    ``boto.vpc`` connection and delegates to ``ensure_igw_present`` or
    ``ensure_igw_absent``. The resulting dict is passed to ``exit_json``;
    an ``AnsibleIGWException`` is reported through ``fail_json``.
    """
    spec = ec2_argument_spec()
    spec.update({
        'vpc_id': {'required': True},
        'state': {'default': 'present', 'choices': ['present', 'absent']},
    })

    module = AnsibleModule(
        argument_spec=spec,
        supports_check_mode=True,
    )

    if not HAS_BOTO:
        module.fail_json(msg='boto is required for this module')

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(boto.vpc, region, **connect_kwargs)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    vpc_id = module.params.get('vpc_id')
    state = module.params.get('state', 'present')

    # Map each supported state onto the helper that enforces it.
    handlers = {
        'present': ensure_igw_present,
        'absent': ensure_igw_absent,
    }
    handler = handlers.get(state)

    try:
        if handler is not None:
            result = handler(connection, vpc_id, check_mode=module.check_mode)
    except AnsibleIGWException as exc:
        module.fail_json(msg=str(exc))

    module.exit_json(**result)
def ensure_ok(self):
    """Bring the VPN gateway to the 'present' state.

    Creates the gateway when ``self.vpn_gw`` is falsy, always applies the
    tags, and attaches the gateway when both ``self.vpc`` and ``self.id``
    are set.
    """
    gateway_missing = not self.vpn_gw
    if gateway_missing:
        self._create_vpn_gw()

    # Tags are applied on every run.
    # TODO: make this declarative, i.e. remove tags that were not
    # explicitly passed so only the requested ones remain.
    self._set_tags()

    if not (self.vpc and self.id):
        return
    self._attach_vpn_gw()
def ensure_gone(self):
    """Bring the VPN gateway to the 'absent' state.

    Detaches the gateway when both ``self.vpc`` and ``self.id`` are set,
    then deletes it when ``self.vpn_gw`` is truthy.
    """
    should_detach = self.vpc and self.id
    if should_detach:
        self._detach_vpn_gw()

    if not self.vpn_gw:
        # No known gateway, so there is nothing to delete.
        return
    self._delete_vpn_gw()
def _attach_vpn_gw(self):
    """Attach the VPN gateway ``self.id`` to the VPC ``self.vpc``.

    When ``attach_vpn_gateway`` returns a truthy value the instance is
    marked as changed and its attach status becomes ``'attached'``.
    """
    # Per the original note: the call returns True on success and raises
    # otherwise.
    attached = self.aws_conn.attach_vpn_gateway(self.id, self.vpc)
    if not attached:
        return

    self.changed = True
    self.attach_status = 'attached'
def _get_aws_connection(self):
    """Open a ``boto.vpc`` connection for ``self.region``.

    Uses ``connect_to_aws`` with ``self.aws_connect_params`` as keyword
    arguments.

    Returns:
        Whatever ``connect_to_aws`` returns for ``boto.vpc``.

    If boto raises ``NoAuthHandlerFound`` the error text is reported
    through ``self.module.fail_json``.
    """
    try:
        return connect_to_aws(boto.vpc, self.region, **self.aws_connect_params)
    # "except X as e" is accepted by Python 2.6+ and Python 3, whereas the
    # old "except X, e" form is a syntax error on Python 3.
    except boto.exception.NoAuthHandlerFound as e:
        self.module.fail_json(msg=str(e))
def lookup_ig(self, name):
    """Resolve an Internet gateway ``Name`` tag to its gateway ID.

    Args:
        name: Value of the ``Name`` tag to search for, or ``None``.

    Returns:
        ``None`` when ``name`` is ``None``; the UTF-8 encoded ID of the
        first matching gateway when one is found; otherwise ``name``
        unchanged.

    Raises:
        AnsibleError: if connecting to the region fails.
    """
    if name is None:
        return None

    try:
        vpc_conn = boto.vpc.connect_to_region(region_name=self.region,
                                              profile_name=self.profile)
    except Exception as e:
        raise AnsibleError(e)

    matches = vpc_conn.get_all_internet_gateways(filters={'tag:Name': name})
    if not (matches and matches[0]):
        # No gateway carries this name: hand the input back untouched.
        return name
    return matches[0].id.encode('utf-8')
def run(self, terms, variables=None, **kwargs):
    """Look up VPC subnets and return their facts.

    Keyword Args:
        filters: Passed to ``get_all_subnets`` as ``filters``.
        profile: Passed to ``connect_to_region`` as ``profile_name``.
        return: Names of the facts to keep, either as a comma-separated
            string or as an iterable of names. When omitted, the full
            fact dict of every subnet is returned.
        region: Region to connect to; defaults to ``'us-east-1'``.

    Returns:
        A list with one entry per subnet: the full fact dict, or a dict
        restricted to the requested names (missing names map to ``None``).

    Raises:
        AnsibleError: if connecting or listing subnets raises
            ``BotoServerError``, or if no connection could be obtained.
    """
    filters = kwargs.get('filters', None)
    profile = kwargs.get('profile', None)
    return_facts = kwargs.get('return', None)
    region = kwargs.get('region', 'us-east-1')

    # isinstance() also accepts str subclasses; an exact type() comparison
    # would skip them and the string would then be iterated character by
    # character below.
    if isinstance(return_facts, str):
        return_facts = return_facts.split(',')

    try:
        connection = boto.vpc.connect_to_region(region_name=region,
                                                profile_name=profile)
    except BotoServerError as e:
        raise AnsibleError(e)

    # NOTE(review): boto documents connect_to_region as returning None for
    # an unknown region name; fail clearly instead of with AttributeError.
    if connection is None:
        raise AnsibleError('Unable to connect to region %s' % region)

    try:
        all_subnets = connection.get_all_subnets(filters=filters)
    except BotoServerError as e:
        raise AnsibleError(e)

    results = []
    for subnet in all_subnets:
        facts = ec2_vpc_subnet_facts.get_subnet_info(subnet)
        if return_facts is None:
            results.append(facts)
            continue

        # Build a fresh dict per subnet instead of reusing a shared one.
        selected = {}
        for fact_name in return_facts:
            try:
                selected[fact_name] = facts[fact_name]
            except KeyError:
                selected[fact_name] = None
        results.append(selected)

    return results
def main():
    """Module entry point: ensure a VPC subnet is present or absent.

    Reads ``vpc_id``, ``cidr``, ``az``, ``tags`` and ``state``, opens a
    ``boto.vpc`` connection and delegates to ``ensure_subnet_present`` or
    ``ensure_subnet_absent``. The resulting dict is passed to
    ``exit_json``; an ``AnsibleVPCSubnetException`` is reported through
    ``fail_json``.
    """
    spec = ec2_argument_spec()
    spec.update({
        'az': {'default': None, 'required': False},
        'cidr': {'default': None, 'required': True},
        'state': {'default': 'present', 'choices': ['present', 'absent']},
        'tags': {'default': None, 'required': False, 'type': 'dict',
                 'aliases': ['resource_tags']},
        'vpc_id': {'default': None, 'required': True},
    })

    module = AnsibleModule(argument_spec=spec, supports_check_mode=True)

    if not HAS_BOTO:
        module.fail_json(msg='boto is required for this module')

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    if not region:
        module.fail_json(msg="region must be specified")
    else:
        try:
            connection = connect_to_aws(boto.vpc, region, **connect_kwargs)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as exc:
            module.fail_json(msg=str(exc))

    params = module.params
    vpc_id = params.get('vpc_id')
    tags = params.get('tags')
    cidr = params.get('cidr')
    az = params.get('az')
    state = params.get('state')

    try:
        if state == 'present':
            result = ensure_subnet_present(connection, vpc_id, cidr, az, tags,
                                           check_mode=module.check_mode)
        elif state == 'absent':
            result = ensure_subnet_absent(connection, vpc_id, cidr,
                                          check_mode=module.check_mode)
    except AnsibleVPCSubnetException as exc:
        module.fail_json(msg=str(exc))

    module.exit_json(**result)
def main():
    """Module entry point: ensure a VPC route table is present or absent.

    Builds the argument spec, requires boto and a region, and opens a
    ``boto.vpc`` connection. ``route_table_id`` is mandatory when
    ``lookup`` is ``'id'``. The work is delegated to
    ``ensure_route_table_present`` / ``ensure_route_table_absent``; the
    resulting dict is passed to ``exit_json`` and an
    ``AnsibleRouteTableException`` is reported through ``fail_json``.
    """
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            lookup = dict(default='tag', required=False, choices=['tag', 'id']),
            propagating_vgw_ids = dict(default=None, required=False, type='list'),
            route_table_id = dict(default=None, required=False),
            routes = dict(default=[], required=False, type='list'),
            state = dict(default='present', choices=['present', 'absent']),
            subnets = dict(default=None, required=False, type='list'),
            tags = dict(default=None, required=False, type='dict', aliases=['resource_tags']),
            vpc_id = dict(default=None, required=True)
        )
    )

    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)

    if not HAS_BOTO:
        module.fail_json(msg='boto is required for this module')

    region, ec2_url, aws_connect_params = get_aws_connection_info(module)

    if region:
        try:
            connection = connect_to_aws(boto.vpc, region, **aws_connect_params)
        except (boto.exception.NoAuthHandlerFound, AnsibleAWSError) as e:
            module.fail_json(msg=str(e))
    else:
        module.fail_json(msg="region must be specified")

    lookup = module.params.get('lookup')
    route_table_id = module.params.get('route_table_id')
    state = module.params.get('state', 'present')

    if lookup == 'id' and route_table_id is None:
        # Pass the text as the ``msg`` keyword, like every other
        # fail_json call here; a positional message is not accepted by
        # fail_json implementations that take keyword arguments only.
        module.fail_json(msg="You must specify route_table_id if lookup is set to id")

    try:
        if state == 'present':
            result = ensure_route_table_present(connection, module)
        elif state == 'absent':
            result = ensure_route_table_absent(connection, module)
    except AnsibleRouteTableException as e:
        module.fail_json(msg=str(e))

    module.exit_json(**result)
def main():
    """Module entry point: manage a VPN gateway.

    Builds the argument spec, requires a region, and drives a
    ``VPNGatewayManager``. When route tables are given and the gateway
    already exists, route propagation is enabled (``present``) or disabled
    (``absent``); otherwise the gateway itself is ensured present or gone.
    The outcome is reported through ``exit_json``.
    """
    spec = ec2_argument_spec()
    spec.update(dict(
        state=dict(required=True, choices=['present', 'absent']),
        id=dict(default=None, required=False, type='str'),
        type=dict(default='ipsec.1', required=False, type='str'),
        availability_zone=dict(default=None, required=False, type='str'),
        vpc=dict(default=None, required=False, type='str'),
        route_table_ids=dict(default=None, required=False, type='list'),
        tags=dict(required=True, type='dict'),
    ))

    module = AnsibleModule(
        argument_spec=spec,
    )

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    # Local names avoid shadowing the ``id`` and ``type`` builtins.
    state = module.params['state']
    gateway_id = module.params['id']
    gateway_type = module.params['type']
    availability_zone = module.params['availability_zone']
    vpc = module.params['vpc']
    route_table_ids = module.params['route_table_ids']
    tags = module.params['tags']

    if not region:
        module.fail_json(msg="Region must be specified as a parameter, in EC2_REGION or AWS_REGION environment variables or in boto configuration file")

    vpn_gw_man = VPNGatewayManager(
        module,
        gateway_id,
        gateway_type,
        availability_zone,
        vpc,
        route_table_ids,
        tags,
        region,
        **connect_kwargs
    )

    if state == 'present':
        if route_table_ids and vpn_gw_man.vpn_gw:
            vpn_gw_man._enable_route_propagation()
        else:
            vpn_gw_man.ensure_ok()
    elif state == 'absent':
        if route_table_ids and vpn_gw_man.vpn_gw:
            vpn_gw_man._disable_route_propagation()
        else:
            vpn_gw_man.ensure_gone()

    module.exit_json(
        changed=vpn_gw_man.changed,
        vpn_gw=vpn_gw_man.get_info(),
        ansible_facts={'ec2_vpn_gateway': 'info'},
    )

# import module snippets
def main():
    """Module entry point: manage a VPN connection.

    Builds the argument spec, requires a region, and drives a
    ``VPNConnectionManager``: ``ensure_ok`` for ``present`` and
    ``ensure_gone`` for ``absent``. The outcome is reported through
    ``exit_json``.
    """
    spec = ec2_argument_spec()
    spec.update(dict(
        state=dict(required=True, choices=['present', 'absent']),
        id=dict(default=None, required=False, type='str'),
        type=dict(default='ipsec.1', required=False, type='str'),
        cgw=dict(default=None, required=False, type='str'),
        vpn_gw=dict(default=None, required=False, type='str'),
        vpc=dict(default=None, required=False, type='str'),
        static_routes_only=dict(default=False, required=False, type='bool'),
        static_routes=dict(default=None, required=False, type='list'),
        tags=dict(required=True, type='dict'),
    ))

    module = AnsibleModule(
        argument_spec=spec,
    )

    region, _ec2_url, connect_kwargs = get_aws_connection_info(module)

    # Local names avoid shadowing the ``id`` and ``type`` builtins.
    state = module.params['state']
    connection_id = module.params['id']
    connection_type = module.params['type']
    cgw = module.params['cgw']
    vpn_gw = module.params['vpn_gw']
    vpc = module.params['vpc']
    static_routes_only = module.params['static_routes_only']
    static_routes = module.params['static_routes']
    tags = module.params['tags']

    if not region:
        module.fail_json(msg="Region must be specified as a parameter, in EC2_REGION or AWS_REGION environment variables or in boto configuration file")

    vpn_conn_man = VPNConnectionManager(
        module,
        connection_id,
        connection_type,
        cgw,
        vpn_gw,
        vpc,
        static_routes_only,
        static_routes,
        tags,
        region,
        **connect_kwargs
    )

    if state == 'present':
        vpn_conn_man.ensure_ok()
    elif state == 'absent':
        vpn_conn_man.ensure_gone()

    module.exit_json(
        changed=vpn_conn_man.changed,
        vpn_conn=vpn_conn_man.get_info(),
        ansible_facts={'ec2_vpn_connection': 'info'},
    )

# import module snippets