我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用netaddr.IPAddress()。
def is_address_in_network(network, address): """ Determine whether the provided address is within a network range. :param network (str): CIDR presentation format. For example, '192.168.1.0/24'. :param address: An individual IPv4 or IPv6 address without a net mask or subnet prefix. For example, '192.168.1.1'. :returns boolean: Flag indicating whether address is in network. """ try: network = netaddr.IPNetwork(network) except (netaddr.core.AddrFormatError, ValueError): raise ValueError("Network (%s) is not in CIDR presentation format" % network) try: address = netaddr.IPAddress(address) except (netaddr.core.AddrFormatError, ValueError): raise ValueError("Address (%s) is not in correct presentation format" % address) if address in network: return True else: return False
def make_very_small_cloud(self, with_cell=False): project_id = self.make_project('project_1', foo='P1', zoo='P2', boo='P3') cloud_id = self.make_cloud(project_id, 'cloud_1', zoo='CL1') region_id = self.make_region( project_id, cloud_id, 'region_1', foo='R1', bar='R2', bax='R3') if with_cell: cell_id = self.make_cell(project_id, cloud_id, region_id, 'cell_1', bar='C2') else: cell_id = None host_id = self.make_host(project_id, cloud_id, region_id, 'www1.example.com', IPAddress(u'10.1.2.101'), 'server', cell_id=cell_id, foo='H1', baz='H3') return project_id, cloud_id, region_id, cell_id, host_id
def test_host_labels_delete(self): cloud_id = self.make_cloud(self.mock_project_id, 'cloud_1') region_id = self.make_region(self.mock_project_id, cloud_id, 'region_1', foo='R1') host_id = self.make_host(self.mock_project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server', bar='bar2') _labels = {"labels": ["tom", "jerry", "jones"]} dbapi.hosts_labels_update(self.context, host_id, _labels) host = dbapi.hosts_get_by_id(self.context, host_id) self.assertEqual(sorted(host.labels), sorted(_labels["labels"])) _dlabels = {"labels": ["tom"]} dbapi.hosts_labels_delete(self.context, host_id, _dlabels) host = dbapi.hosts_get_by_id(self.context, host_id) self.assertEqual(host.labels, {"jerry", "jones"})
def test_hosts_get_all_with_filters_noexist(self): project_id = self.make_project('project_1', foo='P1', zoo='P2') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1', foo='R1') host_id = self.make_host(project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server') variables = {"key1": "value1", "key2": "value2"} dbapi.variables_update_by_resource_id( self.context, "hosts", host_id, variables ) filters = { "region_id": 1, "vars": "key1:value5", } res, _ = dbapi.hosts_get_all(self.context, filters, default_pagination) self.assertEqual(len(res), 0)
def test_hosts_create_sets_parent_id(self): project_id = self.make_project('project_1') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1') parent_id = self.make_host( project_id, cloud_id, region_id, '1.www.example.com', IPAddress(u'10.1.2.101'), 'server' ) child = dbapi.hosts_create( self.context, { 'project_id': project_id, 'cloud_id': cloud_id, 'region_id': region_id, 'hostname': '2.www.example.com', 'ip_address': IPAddress(u'10.1.2.102'), 'device_type': 'server', 'parent_id': parent_id, } ) self.assertEqual(parent_id, child.parent_id)
def test_hosts_update_fails_when_parent_id_set_to_own_id(self): project_id = self.make_project('project_1') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1') host1 = dbapi.hosts_create( self.context, { 'project_id': project_id, 'cloud_id': cloud_id, 'region_id': region_id, 'hostname': '1.www.example.com', 'ip_address': IPAddress(u'10.1.2.101'), 'device_type': 'server', 'parent_id': None, } ) self.assertRaises( exceptions.BadRequest, dbapi.hosts_update, self.context, host1.id, { 'parent_id': host1.id, } )
def validate(self, value): super(IPField, self).validate(value) if not value and not self.required: return try: if self.mask: self.ip = netaddr.IPNetwork(value) else: self.ip = netaddr.IPAddress(value) except Exception: raise ValidationError(self.invalid_format_message) if not any([self.version & IPv4 > 0 and self.ip.version == 4, self.version & IPv6 > 0 and self.ip.version == 6]): raise ValidationError(self.invalid_version_message) if self.mask: if self.ip.version == 4 and \ not self.min_mask <= self.ip.prefixlen <= self.max_v4_mask: raise ValidationError(self.invalid_mask_message) if self.ip.version == 6 and \ not self.min_mask <= self.ip.prefixlen <= self.max_v6_mask: raise ValidationError(self.invalid_mask_message)
def _create_subnet_with_last_subnet_block(cls, network, ip_version=4): """Derive last subnet CIDR block from tenant CIDR and create the subnet with that derived CIDR """ if ip_version == 4: cidr = netaddr.IPNetwork(CONF.network.project_network_cidr) mask_bits = CONF.network.project_network_mask_bits elif ip_version == 6: cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr) mask_bits = CONF.network.project_network_v6_mask_bits subnet_cidr = list(cidr.subnet(mask_bits))[-1] gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1) body = cls.create_subnet(network, gateway=gateway_ip, cidr=subnet_cidr, mask_bits=mask_bits) return body['subnet']
def test_create_load_balancer_missing_vip_address(self): """Test create load balancer with a missing vip_address field,checks for ipversion and actual ip address """ load_balancer = self._create_active_load_balancer( vip_subnet_id=self.subnet['id']) self.addCleanup(self._delete_load_balancer, load_balancer['id']) load_balancer_ip_initial = load_balancer['vip_address'] ip = netaddr.IPAddress(load_balancer_ip_initial) self.assertEqual(ip.version, 4) load_balancer = self._show_load_balancer( load_balancer['id']) load_balancer_final = load_balancer['vip_address'] self.assertEqual(load_balancer_ip_initial, load_balancer_final)
def isIpRangeInUse(api_client, publicIpRange): ''' Check that if any Ip in the IP Range is in use currently ''' vmList = VirtualMachine.list(api_client, zoneid=publicIpRange.zoneid, listall=True) if not vmList: return False for vm in vmList: for nic in vm.nic: publicIpAddresses = PublicIPAddress.list(api_client, associatednetworkid=nic.networkid, listall=True) if validateList(publicIpAddresses)[0] == PASS: for ipaddress in publicIpAddresses: if IPAddress(publicIpRange.startip) <=\ IPAddress(ipaddress.ipaddress) <=\ IPAddress(publicIpRange.endip): return True return False
def obj_to_primitive(obj): """Recursively turn an object into a python primitive. A ZunObject becomes a dict, and anything that implements ObjectListBase becomes a list. """ if isinstance(obj, ObjectListBase): return [obj_to_primitive(x) for x in obj] elif isinstance(obj, ZunObject): result = {} for key in obj.obj_fields: if obj.obj_attr_is_set(key) or key in obj.obj_extra_fields: result[key] = obj_to_primitive(getattr(obj, key)) return result elif isinstance(obj, netaddr.IPAddress): return str(obj) elif isinstance(obj, netaddr.IPNetwork): return str(obj) else: return obj
def get_ip_network(ip_str): """ Try to return the owner of the IP address (based on ips.py) :param str ip_str: The IP address :rtype: str :return: The owner if find else None """ try: ip_addr = netaddr.IPAddress(ip_str) except (netaddr.AddrConversionError, netaddr.AddrFormatError): return None for brand, networks in IPS_NETWORKS.iteritems(): for net in networks: if net.netmask.value & ip_addr.value == net.value: return brand return None
def test_ipnetwork_list_operations_v4(): ip = IPNetwork('192.0.2.16/29') assert len(ip) == 8 ip_list = list(ip) assert len(ip_list) == 8 assert ip_list == [ IPAddress('192.0.2.16'), IPAddress('192.0.2.17'), IPAddress('192.0.2.18'), IPAddress('192.0.2.19'), IPAddress('192.0.2.20'), IPAddress('192.0.2.21'), IPAddress('192.0.2.22'), IPAddress('192.0.2.23'), ]
def test_ipaddress_and_ipnetwork_canonical_sort_order_by_version(): ip_list = [ IPAddress('192.0.2.130'), IPNetwork('192.0.2.128/28'), IPAddress('::'), IPNetwork('192.0.3.0/24'), IPNetwork('192.0.2.0/24'), IPNetwork('fe80::/64'), IPNetwork('172.24/12'), IPAddress('10.0.0.1'), ] random.shuffle(ip_list) ip_list.sort() assert ip_list == [ IPAddress('10.0.0.1'), IPNetwork('172.24.0.0/12'), IPNetwork('192.0.2.0/24'), IPNetwork('192.0.2.128/28'), IPAddress('192.0.2.130'), IPNetwork('192.0.3.0/24'), IPAddress('::'), IPNetwork('fe80::/64'), ]
def test_ipnetwork_slices_v4(): assert list(IPNetwork('192.0.2.0/29')[0:-1]) == [ IPAddress('192.0.2.0'), IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), IPAddress('192.0.2.4'), IPAddress('192.0.2.5'), IPAddress('192.0.2.6'), ] assert list(IPNetwork('192.0.2.0/29')[::-1]) == [ IPAddress('192.0.2.7'), IPAddress('192.0.2.6'), IPAddress('192.0.2.5'), IPAddress('192.0.2.4'), IPAddress('192.0.2.3'), IPAddress('192.0.2.2'), IPAddress('192.0.2.1'), IPAddress('192.0.2.0'), ]
def test_iterhosts_v4(): assert list(IPNetwork('192.0.2.0/29').iter_hosts()) == [ IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), IPAddress('192.0.2.4'), IPAddress('192.0.2.5'), IPAddress('192.0.2.6'), ] assert list(IPNetwork("192.168.0.0/31")) == [ IPAddress('192.168.0.0'), IPAddress('192.168.0.1'), ] assert list(IPNetwork("1234::/128")) == [IPAddress('1234::')] assert list(IPNetwork("1234::/128").iter_hosts()) == [] assert list(IPNetwork("192.168.0.0/31").iter_hosts()) == [IPAddress('192.168.0.0'),IPAddress('192.168.0.1')] assert list(IPNetwork("192.168.0.0/32").iter_hosts()) == [IPAddress('192.168.0.0')]
def test_advanced_comparisons(): assert IPNetwork('192.0.2.0/24') == IPNetwork('192.0.2.112/24') assert IPNetwork('192.0.2.0/24').ip != IPNetwork('192.0.2.112/24').ip assert IPNetwork('192.0.2.0/24').ip < IPNetwork('192.0.2.112/24').ip assert IPNetwork('192.0.2.0/24').cidr == IPNetwork('192.0.2.112/24').cidr assert IPNetwork('192.0.2.0/24') != IPNetwork('192.0.3.0/24') assert IPNetwork('192.0.2.0/24') < IPNetwork('192.0.3.0/24') assert IPAddress('192.0.2.0') != IPNetwork('192.0.2.0/32') assert IPAddress('192.0.2.0') == IPNetwork('192.0.2.0/32')[0] assert IPAddress('192.0.2.0') == IPNetwork('192.0.2.0/32')[-1] assert IPAddress('192.0.2.0') == IPNetwork('192.0.2.0/32')[0] assert IPAddress('192.0.2.0') == IPNetwork('192.0.2.0/32').ip assert IPAddress('192.0.2.0') == IPNetwork('192.0.2.0/32').ip
def test_ipset_adding_and_removing_members_ip_addresses_as_ints(): s1 = IPSet(['10.0.0.0/25']) s1.add('10.0.0.0/24') assert s1 == IPSet(['10.0.0.0/24']) integer1 = int(IPAddress('10.0.0.1')) integer2 = int(IPAddress('fe80::')) integer3 = int(IPAddress('10.0.0.2')) s2 = IPSet([integer1, integer2]) assert s2 == IPSet(['10.0.0.1/32', 'fe80::/128']) s2.add(integer3) assert s2 == IPSet(['10.0.0.1/32', '10.0.0.2/32', 'fe80::/128']) s2.remove(integer2) assert s2 == IPSet(['10.0.0.1/32', '10.0.0.2/32']) s2.update([integer2]) assert s2 == IPSet(['10.0.0.1/32', '10.0.0.2/32', 'fe80::/128'])
def test_iprange_boundaries(): assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == [ IPAddress('192.0.2.0'), IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), IPAddress('192.0.2.4'), IPAddress('192.0.2.5'), IPAddress('192.0.2.6'), IPAddress('192.0.2.7'), ] assert list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7')) == [ IPAddress('::ffff:192.0.2.0'), IPAddress('::ffff:192.0.2.1'), IPAddress('::ffff:192.0.2.2'), IPAddress('::ffff:192.0.2.3'), IPAddress('::ffff:192.0.2.4'), IPAddress('::ffff:192.0.2.5'), IPAddress('::ffff:192.0.2.6'), IPAddress('::ffff:192.0.2.7'), ]
def test_iprange_slicing(): iprange = IPRange('192.0.2.1', '192.0.2.254') assert list(iprange[0:3]) == [ IPAddress('192.0.2.1'), IPAddress('192.0.2.2'), IPAddress('192.0.2.3'), ] assert list(iprange[0:10:2]) == [ IPAddress('192.0.2.1'), IPAddress('192.0.2.3'), IPAddress('192.0.2.5'), IPAddress('192.0.2.7'), IPAddress('192.0.2.9'), ] assert list(iprange[0:1024:512]) == [IPAddress('192.0.2.1')]
def test_iter_nmap_range_with_multiple_targets_including_cidr(): assert list(iter_nmap_range('192.168.0.0/29', '192.168.3-5,7.1', 'fe80::1')) == [ IPAddress('192.168.0.0'), IPAddress('192.168.0.1'), IPAddress('192.168.0.2'), IPAddress('192.168.0.3'), IPAddress('192.168.0.4'), IPAddress('192.168.0.5'), IPAddress('192.168.0.6'), IPAddress('192.168.0.7'), IPAddress('192.168.3.1'), IPAddress('192.168.4.1'), IPAddress('192.168.5.1'), IPAddress('192.168.7.1'), IPAddress('fe80::1'), ]
def _get_subnet_dhcp_options_for_port(self, port, ip_version): """Returns the subnet dhcp options for the port. Return the first found DHCP options belong for the port. """ subnets = [ fixed_ip['subnet_id'] for fixed_ip in port['fixed_ips'] if netaddr.IPAddress(fixed_ip['ip_address']).version == ip_version] get_opts = self._nb_ovn.get_subnets_dhcp_options(subnets) if get_opts: if ip_version == const.IP_VERSION_6: # Always try to find a dhcpv6 stateful v6 subnet to return. # This ensures port can get one stateful v6 address when port # has multiple dhcpv6 stateful and stateless subnets. for opts in get_opts: # We are setting ovn_const.DHCPV6_STATELESS_OPT to "true" # in _get_ovn_dhcpv6_opts, so entries in DHCP_Options table # should have unicode type 'true' if they were defined as # dhcpv6 stateless. if opts['options'].get( ovn_const.DHCPV6_STATELESS_OPT) != 'true': return opts return get_opts[0]
def _get_ntp_entity(self, peer_type): ntp_entities = {} command = 'show ntp peers' output = self.device.send_command(command) for line in output.splitlines(): # Skip first two lines and last line of command output if line == "" or '-----' in line or 'Peer IP Address' in line: continue elif IPAddress(len(line.split()[0])).is_unicast: peer_addr = line.split()[0] ntp_entities[peer_addr] = {} else: raise ValueError("Did not correctly find a Peer IP Address") return ntp_entities
def is_ipv6(address): """Determine whether provided address is IPv6 or not.""" try: address = netaddr.IPAddress(address) except netaddr.AddrFormatError: # probably a hostname - so not an address at all! return False return address.version == 6
def _get_for_address(address, key): """Retrieve an attribute of or the physical interface that the IP address provided could be bound to. :param address (str): An individual IPv4 or IPv6 address without a net mask or subnet prefix. For example, '192.168.1.1'. :param key: 'iface' for the physical interface name or an attribute of the configured interface, for example 'netmask'. :returns str: Requested attribute or None if address is not bindable. """ address = netaddr.IPAddress(address) for iface in netifaces.interfaces(): addresses = netifaces.ifaddresses(iface) if address.version == 4 and netifaces.AF_INET in addresses: addr = addresses[netifaces.AF_INET][0]['addr'] netmask = addresses[netifaces.AF_INET][0]['netmask'] network = netaddr.IPNetwork("%s/%s" % (addr, netmask)) cidr = network.cidr if address in cidr: if key == 'iface': return iface else: return addresses[netifaces.AF_INET][0][key] if address.version == 6 and netifaces.AF_INET6 in addresses: for addr in addresses[netifaces.AF_INET6]: if not addr['addr'].startswith('fe80'): network = netaddr.IPNetwork("%s/%s" % (addr['addr'], addr['netmask'])) cidr = network.cidr if address in cidr: if key == 'iface': return iface elif key == 'netmask' and cidr: return str(cidr).split('/')[1] else: return addr[key] return None
def run(self): Analyzer.run(self) data = self.getParam('data', None, 'Data is missing') if self.data_type != 'fqdn' and self.data_type != 'ip': self.error('Invalid data type') if self.allowed_networks is not None: if self.data_type == 'fqdn': address = IPAddress(socket.gethostbyname(data)) else: try: address = IPAddress(data) except Exception as e: self.error("{}".format(e)) if not any(address in IPNetwork(network) for network in self.allowed_networks): self.error('Invalid target: not in any allowed network') scanner_args = { 'url': self.url, 'login': self.login, 'password': self.password } if self.ca_bundle is not None: scanner_args.update({'ca_bundle': self.ca_bundle}) else: scanner_args.update({'insecure': True}) try: scanner = ness6rest.Scanner(**scanner_args) scanner.policy_set(name=self.policy) scanner.scan_add(targets=data, name="cortex scan for " + data) self._run_scan(scanner) results = self._get_scan_results(scanner) self._delete_scan(scanner) except Exception as ex: self.error('Scanner error: %s' % ex) self.report(results)
def _get_for_address(address, key): """Retrieve an attribute of or the physical interface that the IP address provided could be bound to. :param address (str): An individual IPv4 or IPv6 address without a net mask or subnet prefix. For example, '192.168.1.1'. :param key: 'iface' for the physical interface name or an attribute of the configured interface, for example 'netmask'. :returns str: Requested attribute or None if address is not bindable. """ address = netaddr.IPAddress(address) for iface in netifaces.interfaces(): addresses = netifaces.ifaddresses(iface) if address.version == 4 and netifaces.AF_INET in addresses: addr = addresses[netifaces.AF_INET][0]['addr'] netmask = addresses[netifaces.AF_INET][0]['netmask'] network = netaddr.IPNetwork("%s/%s" % (addr, netmask)) cidr = network.cidr if address in cidr: if key == 'iface': return iface else: return addresses[netifaces.AF_INET][0][key] if address.version == 6 and netifaces.AF_INET6 in addresses: for addr in addresses[netifaces.AF_INET6]: network = _get_ipv6_network_from_address(addr) if not network: continue cidr = network.cidr if address in cidr: if key == 'iface': return iface elif key == 'netmask' and cidr: return str(cidr).split('/')[1] else: return addr[key] return None
def is_ip(address): """ Returns True if address is a valid IP address. """ try: # Test to see if already an IPv4/IPv6 address address = netaddr.IPAddress(address) return True except (netaddr.AddrFormatError, ValueError): return False
def ip_addr_version(addr, resolve=True, timeout=dns_default_timeout()): """Determine what IP version an address, CIDR block or hostname represents. When resolving hostnames to an IP, the search order will be A followed by AAAA. The returned tuple is (version, ip), where version is 4, 6 or None of nothing can be determined and ip is the ip address supplied or resolved. """ # Chop out any CIDR suffix. slash_index = addr.rfind('/') if slash_index > 0: try: int(addr[slash_index + 1:]) addr = addr[:slash_index] except ValueError: # Do nothing; will try to resolve if doing that. pass try: return (netaddr.IPAddress(addr).version, addr) except (netaddr.core.AddrFormatError, ValueError): # Don't care, will resolve. pass if not resolve: return (None, None) for ip_version in [4, 6]: resolved = dns_resolve(addr, ip_version=ip_version, timeout=timeout) if resolved is not None: return (ip_addr_version(resolved, resolve=False)[0], resolved) return (None, None) # # Find common IP version between two addresses
def __refresh(self): """ Update the list of local interfaces if needed """ if self.addresses is None \ or datetime.datetime.now() > self.expires: self.addresses = {} if_regex = r'%.*$' # Netifaces returns a very deep structure. for ifhash in [netifaces.ifaddresses(iface) for iface in netifaces.interfaces()]: for afamily in ifhash: for iface in ifhash[afamily]: address = re.sub(if_regex, '', iface["addr"]) try: addr_object = netaddr.IPAddress(address) self.addresses[addr_object] = 1 except netaddr.core.AddrFormatError: # Don't care about things that don't look like IPS. pass self.expires = datetime.datetime.now() \ + datetime.timedelta(seconds=self.refresh)
def __contains__(self, item): """ Determine if item is in the address list """ self.__refresh() item_ip = netaddr.IPAddress(item) return item_ip in self.addresses
def _address(addresses, network): """ Return all addresses in the given network Note: list comprehension vs. netaddr vs. simple """ matched = [] for address in addresses: log.debug("_address: ip {} in network {} ".format(address, network)) if IPAddress(address) in IPNetwork(network): matched.append(address) return matched
def setUp(self): super().setUp() project_id = self.make_project('project_1') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1') net_device1_id = self.make_network_device( project_id, cloud_id, region_id, 'switch1.example.com', IPAddress('10.1.2.101'), 'switch' ) net_device2_id = self.make_network_device( project_id, cloud_id, region_id, 'switch2.example.com', IPAddress('10.1.2.102'), 'switch', parent_id=net_device1_id ) host1_id = self.make_host( project_id, cloud_id, region_id, 'www1.example.com', IPAddress(u'10.1.2.103'), 'server', parent_id=net_device2_id ) host2_id = self.make_host( project_id, cloud_id, region_id, 'www2.example.com', IPAddress(u'10.1.2.104'), 'container', parent_id=host1_id ) host3_id = self.make_host( project_id, cloud_id, region_id, 'www3.example.com', IPAddress(u'10.1.2.105'), 'server' ) self.parent = net_device1_id self.children = [net_device2_id] self.descendants = [net_device2_id, host1_id, host2_id] self.all = [ net_device1_id, net_device2_id, host1_id, host2_id, host3_id ]
def test_hosts_create_duplicate_raises(self): cloud_id = self.make_cloud(self.mock_project_id, 'cloud_1') region_id = self.make_region(self.mock_project_id, cloud_id, 'region_1') self.make_host(self.mock_project_id, cloud_id, region_id, 'www1.example.com', IPAddress(u'10.1.2.101'), 'server') new_host = {'name': 'www1.example.com', 'region_id': region_id, 'ip_address': IPAddress(u'10.1.2.101'), 'device_type': 'server', 'cloud_id': cloud_id, 'project_id': self.mock_project_id} self.assertRaises(exceptions.DuplicateDevice, dbapi.hosts_create, self.context, new_host)
def test_hosts_update(self): cloud_id = self.make_cloud(self.mock_project_id, 'cloud_1') region_id = self.make_region(self.mock_project_id, cloud_id, 'region_1') host_id = self.make_host(self.mock_project_id, cloud_id, region_id, 'example', IPAddress(u'10.1.2.101'), 'server', bar='bar2') name = "Host_New" res = dbapi.hosts_update(self.context, host_id, {'name': 'Host_New'}) self.assertEqual(res.name, name)
def test_hosts_resolved_vars_no_cells(self): project_id = self.make_project('project_1') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1', foo='R1') host_id = self.make_host(project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server', bar='bar2') host = dbapi.hosts_get_by_id(self.context, host_id) self.assertEqual(host.name, 'www.example.xyz') self.assertEqual(host.resolved, {'bar': 'bar2', 'foo': 'R1'})
def test_host_labels_create(self): cloud_id = self.make_cloud(self.mock_project_id, 'cloud_1') region_id = self.make_region(self.mock_project_id, cloud_id, 'region_1', foo='R1') host_id = self.make_host(self.mock_project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server', bar='bar2') labels = {"labels": ["tom", "jerry"]} dbapi.hosts_labels_update(self.context, host_id, labels)
def test_hosts_get_all_with_label_filters(self): cloud_id = self.make_cloud(self.mock_project_id, 'cloud_1') region_id = self.make_region(self.mock_project_id, cloud_id, 'region_1') labels = {"labels": ["compute"]} host1 = self.make_host( self.mock_project_id, cloud_id, region_id, 'www1.example.com', IPAddress(u'10.1.2.101'), 'server', ) dbapi.hosts_labels_update(self.context, host1, labels) self.make_host( self.mock_project_id, cloud_id, region_id, 'www1.example2.com', IPAddress(u'10.1.2.102'), 'server', ) res, _ = dbapi.hosts_get_all(self.context, {"label": "compute"}, default_pagination) self.assertEqual(len(res), 1) self.assertEqual(res[0].name, 'www1.example.com')
def test_hosts_get_with_key_value_filters(self): project_id = self.make_project('project_1', foo='P1', zoo='P2') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1', foo='R1') host1 = self.make_host(project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server') variables = {"key1": "example1", "key2": "Tom"} dbapi.variables_update_by_resource_id( self.context, "hosts", host1, variables ) # Second host with own variables host2 = self.make_host(project_id, cloud_id, region_id, 'www.example2.xyz', IPAddress(u'10.1.2.102'), 'server') variables = {"key1": "example2", "key2": "Tom"} dbapi.variables_update_by_resource_id( self.context, "hosts", host2, variables ) filters = {"vars": "key1:example2"} res, _ = dbapi.hosts_get_all(self.context, filters, default_pagination) self.assertEqual(len(res), 1) self.assertEqual('www.example2.xyz', res[0].name) filters = {"vars": "key2:Tom"} res, _ = dbapi.hosts_get_all(self.context, filters, default_pagination) self.assertEqual(len(res), 2)
def test_hosts_update_sets_parent_id(self): project_id = self.make_project('project_1') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region(project_id, cloud_id, 'region_1') parent_id = self.make_host( project_id, cloud_id, region_id, '1.www.example.com', IPAddress(u'10.1.2.101'), 'server' ) child = dbapi.hosts_create( self.context, { 'project_id': project_id, 'cloud_id': cloud_id, 'region_id': region_id, 'hostname': '2.www.example.com', 'ip_address': IPAddress(u'10.1.2.102'), 'device_type': 'server', 'parent_id': None, } ) self.assertIsNone(child.parent_id) child_update = dbapi.hosts_update( self.context, child.id, { 'parent_id': parent_id, } ) self.assertEqual(parent_id, child_update.parent_id)
def test_hosts_get_all_with_resolved_var_filters(self): project_id = self.make_project('project_1', foo='P1', zoo='P2') cloud_id = self.make_cloud(project_id, 'cloud_1') region_id = self.make_region( project_id, cloud_id, 'region_1', foo='R1') switch_id = self.make_network_device( project_id, cloud_id, region_id, 'switch1.example.com', IPAddress('10.1.2.101'), 'switch', zoo='S1', bar='S2') self.make_host( project_id, cloud_id, region_id, 'www.example.xyz', IPAddress(u'10.1.2.101'), 'server', parent_id=switch_id, key1="value1", key2="value2") self.make_host( project_id, cloud_id, region_id, 'www2.example.xyz', IPAddress(u'10.1.2.102'), 'server', parent_id=switch_id, key1="value-will-not-match", key2="value2") filters = { "region_id": 1, "vars": "key1:value1,zoo:S1,foo:R1", "resolved-values": True, } res, _ = dbapi.hosts_get_all( self.context, filters, default_pagination) self.assertEqual(len(res), 1) self.assertEqual(res[0].name, 'www.example.xyz')