我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用netaddr.EUI。
def get_ipv6_addr_by_EUI64(cidr, mac):
    """Build an IPv6 address from a CIDR and a MAC using EUI-64.

    :param str cidr: an IPv6 CIDR
    :param str mac: a MAC address
    :return: an IPv6 address
    :rtype: netaddr.IPAddress
    :raises TypeError: if ``cidr`` is an IPv4 address, or if the prefix or
        the MAC cannot be parsed.
    """
    # An IPv4 prefix is rejected up front, outside the conversion handlers.
    if netaddr.valid_ipv4(cidr):
        raise TypeError(
            "Unable to generate IP address by EUI64 for IPv4 prefix")

    try:
        interface_id = int(netaddr.EUI(mac).eui64())
        network = netaddr.IPNetwork(cidr)
        # ``+`` binds tighter than ``^``: the interface id is added to the
        # first address of the network, then bit 57 of the sum is toggled.
        combined = (network.first + interface_id) ^ (1 << 57)
        return netaddr.IPAddress(combined)
    except (ValueError, netaddr.AddrFormatError):
        raise TypeError('Bad prefix or mac format for generating IPv6 '
                        'address by EUI-64: %(prefix)s, %(mac)s:'
                        % {'prefix': cidr, 'mac': mac})
    except TypeError:
        raise TypeError('Bad prefix type for generate IPv6 address by '
                        'EUI-64: %s' % cidr)
def genmac(value, prefix='', length=12):
    """Deterministically generate a "random" MAC with a configurable prefix.

    The result is ``prefix`` followed by the SHA-1 hex digest of ``value``,
    cut to ``length`` hex digits and parsed by ``netaddr.EUI``.

    :param value: seed for the digest; text is UTF-8 encoded first.
    :param prefix: hex digits the MAC starts with; when empty, the default
        "0ac04d" is used.
    :param length: number of hex digits kept from prefix + digest.
    :return: the generated address as a ``netaddr.EUI``.
    """
    # from: http://serverfault.com/questions/40712/what-range-of-mac-addresses-can-i-safely-use-for-my-virtual-machines
    # Previously a non-empty prefix left ``mac_prefix`` unbound (NameError).
    mac_prefix = prefix or "0ac04d"  # random "cord"-esque default

    # hashlib only accepts bytes; encode text so the call also works on
    # Python 3 (byte strings are passed through unchanged).
    if not isinstance(value, bytes):
        value = value.encode('utf-8')

    # deterministically generate a value
    digest = hashlib.sha1(value).hexdigest()

    # build/trim MAC
    mac_string = (mac_prefix + digest)[0:length]
    return netaddr.EUI(mac_string)
def sendWoL(mac, broadcast=findBroadcast()):
    """Given string mac and broadcast: Turn on computer using WoL

    This was taken from
    http://code.activestate.com/recipes/358449-wake-on-lan/. Thanks, Fadly!

    :param mac: MAC address of the machine to wake, in any format
        ``EUI`` accepts.
    :param broadcast: address the packet is sent to (UDP port 7).
        NOTE: the default is computed once, when this function is defined.
    :raises ValueError: if ``mac`` cannot be parsed.
    """
    # Cleans and removes delimiters from MAC
    try:
        mac = format_mac(EUI(mac), mac_bare)
    except AddrFormatError:
        raise ValueError('Incorrect MAC address format')

    # Pad the synchronization stream.
    data = ''.join(['FFFFFFFFFFFF', mac * 20])

    # Split up the hex values and pack them in a single join instead of
    # re-joining the growing buffer on every iteration.
    send_data = b''.join(
        pack('B', int(data[i: i + 2], 16)) for i in range(0, len(data), 2))

    # Broadcast it to the LAN; always release the socket afterwards.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(send_data, (broadcast, 7))
    finally:
        sock.close()
def _to_mac_range(val):
    """Parse a MAC prefix, optionally followed by ``/mask``, into a range.

    ``val`` is a MAC prefix of 6 to 12 hex digits (``:`` and ``-``
    separators are ignored), e.g. ``"AA:BB:CC"`` or ``"AA:BB:CC/24"``.
    Without an explicit mask, the mask is the number of bits supplied in
    the prefix.

    :returns: ``(cidr, first, end)`` where ``cidr`` is the zero-padded
        prefix in colon notation plus ``/mask``, ``first`` is the padded
        prefix as an integer and ``end`` is ``first + 2 ** (48 - mask)``.
    :raises q_exc.InvalidMacAddressRange: if the prefix length, the mask or
        the address itself is invalid.
    """
    cidr_parts = val.split("/")
    # FIXME(anyone): replace is slow, but this doesn't really
    #                get called ever. Fix maybe?
    prefix = cidr_parts[0].replace(':', '').replace('-', '')
    prefix_length = len(prefix)
    if prefix_length < 6 or prefix_length > 12:
        raise q_exc.InvalidMacAddressRange(cidr=val)

    diff = 12 - prefix_length
    if len(cidr_parts) > 1:
        # A non-numeric mask used to escape as a bare ValueError, and a
        # mask above 48 failed on a negative shift; report both as an
        # invalid range, like every other malformed input here.
        try:
            mask = int(cidr_parts[1])
        except ValueError:
            raise q_exc.InvalidMacAddressRange(cidr=val)
        if mask < 0 or mask > 48:
            raise q_exc.InvalidMacAddressRange(cidr=val)
    else:
        mask = 48 - diff * 4

    mask_size = 1 << (48 - mask)
    # Pad the prefix with zeros up to the full 12 hex digits.
    prefix = "%s%s" % (prefix, "0" * diff)
    try:
        cidr = "%s/%s" % (str(netaddr.EUI(prefix)).replace("-", ":"), mask)
    except netaddr.AddrFormatError:
        raise q_exc.InvalidMacAddressRange(cidr=val)

    prefix_int = int(prefix, base=16)
    return cidr, prefix_int, prefix_int + mask_size
def test_allocate_v6_with_mac_fails_policy_raises(self):
    """Allocation raises when the IP policy excludes the whole v6 subnet."""
    cidr = netaddr.IPNetwork("fe80::dead:beef/64")
    allocation_pool = [{"start": cidr[-4], "end": cidr[-2]}]
    subnet = dict(allocation_pools=allocation_pool,
                  cidr="fe80::dead:beef/64", ip_version=6,
                  next_auto_assign_ip=0, tenant_id="fake")
    subnet = {"subnet": subnet}
    network = dict(name="public", tenant_id="fake", network_plugin="BASE")
    network = {"network": network}
    ip_policy = {"exclude": ["fe80::dead:beef/64"]}
    ip_policy = {"ip_policy": ip_policy}
    # NOTE(review): this MAC is built but never passed to
    # allocate_ip_address below; kept as in the original test.
    mac = models.MacAddress()
    mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

    old_override = cfg.CONF.QUARK.v6_allocation_attempts
    cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
    try:
        with self._stubs(network, subnet, ip_policy) as (net, sub, ipp):
            with self.assertRaises(n_exc.IpAddressGenerationFailure):
                self.ipam.allocate_ip_address(self.context, [], net["id"],
                                              0, 0, subnets=[sub["id"]])
    finally:
        # Restore the override even when the test fails, so the changed
        # setting cannot leak into other tests.
        cfg.CONF.set_override('v6_allocation_attempts', old_override,
                              'QUARK')
def test_update_port_with_security_groups(self, redis_cli):
    """update_port serializes the groups and applies the resulting rules."""
    fake_redis = mock.MagicMock()
    redis_cli.return_value = fake_redis

    port_uuid = str(uuid.uuid4())
    device_uuid = str(uuid.uuid4())
    mac_value = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
    group_ids = [str(uuid.uuid4())]

    serialized = {}
    fake_redis.serialize_groups.return_value = serialized

    self.driver.update_port(context=self.context,
                            network_id="public_network",
                            port_id=port_uuid,
                            device_id=device_uuid,
                            mac_address=mac_value,
                            security_groups=group_ids)

    fake_redis.serialize_groups.assert_called_once_with(group_ids)
    fake_redis.apply_rules.assert_called_once_with(
        device_uuid, mac_value, serialized)
def test_delete_port_redis_is_dead(self, sg_cli):
    """delete_port must not propagate an exception raised by delete_vif."""
    device_id = str(uuid.uuid4())
    mac_address = netaddr.EUI("AA:BB:CC:DD:EE:FF").value

    mock_client = mock.MagicMock()
    sg_cli.return_value = mock_client
    mock_client.delete_vif.side_effect = Exception

    # Only the call under test is guarded: a failing mock assertion used
    # to be caught here as well and reported as "shouldn't have raised".
    try:
        self.driver.delete_port(context=self.context, port_id=2,
                                mac_address=mac_address,
                                device_id=device_id)
    except Exception:
        # This test fails without the exception handling in
        # _delete_port_security_groups
        self.fail("This shouldn't have raised")

    mock_client.delete_vif.assert_called_once_with(device_id, mac_address)
def test_apply_rules(self, strict_redis, uuid4):
    """apply_rules stores the rule payload and a False ack flag via hset."""
    sg_cli = sg_client.SecurityGroupsClient()
    device = "device"
    uuid4.return_value = "uuid"
    mac_value = netaddr.EUI("AA:BB:CC:DD:EE:FF").value

    sg_cli.apply_rules(device, mac_value, [])

    hset = sg_cli._client.master.hset
    self.assertTrue(hset.called)

    key = sg_cli.vif_key(device, mac_value)
    expected_payload = json.dumps({"rules": []})
    hset.assert_any_call(
        key, sg_client.SECURITY_GROUP_HASH_ATTR, expected_payload)
    hset.assert_any_call(key, sg_client.SECURITY_GROUP_ACK, False)
def __init__(self, id=0, name=None, mac_address=None, ip_address=None,
             connected_ssid=None, inactivity_time=None, rx_packets=None,
             tx_packets=None, rx_bitrate=None, tx_bitrate=None,
             signal=None):
    """Store the given fields and look up the vendor of ``mac_address``.

    ``vendor`` is the organisation registered for the MAC's OUI
    (Organizational Unique Identifier), or None when the lookup fails
    for any reason.
    """
    self.id = id
    self.name = name
    self.mac_address = mac_address
    self.ip_address = ip_address
    self.connected_ssid = connected_ssid
    self.inactivity_time = inactivity_time
    self.rx_packets = rx_packets
    self.tx_packets = tx_packets
    self.tx_bitrate = tx_bitrate
    self.rx_bitrate = rx_bitrate
    self.signal = signal

    # Best-effort vendor lookup: any failure leaves the vendor unset.
    try:
        vendor = EUI(mac_address).oui.registration().org
    except Exception:
        vendor = None
    self.vendor = vendor
def to_global(prefix, mac, project_id):
    """Derive an address string from a prefix, a MAC and a project id.

    The result combines (via XOR / OR on ``netaddr.IPAddress`` values):
    the first 8 hex digits of the SHA-1 of ``project_id`` shifted left by
    32 bits, the constant ``0xff << 24``, the last three bytes of ``mac``
    and the IP part of ``prefix``.

    :param prefix: network prefix parsed with ``netaddr.IPNetwork``.
    :param mac: MAC address parsed with ``netaddr.EUI``.
    :param project_id: project identifier; text is UTF-8 encoded before
        hashing.
    :returns: the formatted address string.
    :raises TypeError: if the MAC, the prefix or the project id is bad.
    """
    # hashlib only accepts bytes: encode text ids so hashing also works on
    # Python 3. Byte strings (the Python 2 ``str``) are hashed unchanged.
    project_bytes = project_id
    if isinstance(project_bytes, type(u'')):
        project_bytes = project_bytes.encode('utf-8')

    project_hash = netaddr.IPAddress(
        int(hashlib.sha1(project_bytes).hexdigest()[:8], 16) << 32)
    static_num = netaddr.IPAddress(0xff << 24)

    try:
        # Last three bytes of the MAC, folded into one integer.
        mac_suffix = netaddr.EUI(mac).words[3:]
        int_addr = int(''.join(['%02x' % i for i in mac_suffix]), 16)
        mac_addr = netaddr.IPAddress(int_addr)
        maskIP = netaddr.IPNetwork(prefix).ip
        return (project_hash ^ static_num ^ mac_addr | maskIP).format()
    except netaddr.AddrFormatError:
        raise TypeError(_('Bad mac for to_global_ipv6: %s') % mac)
    except TypeError:
        raise TypeError(_('Bad prefix for to_global_ipv6: %s') % prefix)
    except NameError:
        raise TypeError(_('Bad project_id for to_global_ipv6: %s') %
                        project_id)
def test_serialization_with_eui_in_different_dialect(self, model):
    """An EUI built with the eui48 dialect serializes colon-separated."""
    model.mac = netaddr.EUI('78-F8-82-B2-E5-5A', dialect=netaddr.mac_eui48)
    mac_field = fields.MACAddressField()
    assert mac_field.serialize('mac', model) == '78:f8:82:b2:e5:5a'
def test_deserialization(self):
    """Deserializing a MAC string yields an EUI in mac_unix_expanded."""
    raw = '78:f8:82:b2:e5:5a'
    result = fields.MACAddressField().deserialize(raw)

    assert result == '78:f8:82:b2:e5:5a'
    assert repr(result) == 'EUI(\'78:f8:82:b2:e5:5a\')'
    assert result.dialect == netaddr.mac_unix_expanded
def _to_python(self, value):
    """Convert ``value`` to a ``netaddr.EUI`` using the default dialect."""
    address = netaddr.EUI(value)
    # Force this object's default dialect onto the parsed address.
    address.dialect = self.default_dialect
    return address
def mac(raw):
    """
    Convert a raw string to a standardised MAC Address EUI Format.

    :param raw: the raw string containing the value of the MAC Address
    :return: a string with the MAC Address in EUI format

    Example:

    .. code-block:: python

        >>> mac('0123.4567.89ab')
        u'01:23:45:67:89:AB'

    Some vendors like Cisco return MAC addresses like a9:c5:2e:7b:6: which
    is not entirely valid (with respect to EUI48 or EUI64 standards).
    Therefore the value is padded with trailing zeros.

    Example

        >>> mac('a9:c5:2e:7b:6:')
        u'A9:C5:2E:7B:60:00'

    If Cisco or other obscure vendors use their own standards, this will
    raise an error that can be fixed later; it still works with odd
    formats such as:

        >>> mac('123.4567.89ab')
        u'01:23:45:67:89:AB'
        >>> mac('23.4567.89ab')
        u'00:23:45:67:89:AB'
    """
    if raw.endswith(':'):
        # Drop the separators and right-pad with zeros to 12 hex digits.
        raw = raw.replace(':', '').ljust(12, '0')
    address = EUI(raw, dialect=_MACFormat)
    return py23_compat.text_type(address)
def get_ipv6_addr_by_EUI64(prefix, mac):
    """Calculate IPv6 address using EUI-64 specification.

    The address is computed with the EUI-64 addressing scheme as explained
    in rfc2373.

    :param prefix: IPv6 prefix.
    :param mac: IEEE 802 48-bit MAC address.
    :returns: IPv6 address on success.
    :raises ValueError, TypeError: For any invalid input.

    .. versionadded:: 1.4
    """
    # An IPv4 prefix is rejected before any conversion is attempted.
    if is_valid_ipv4(prefix):
        raise ValueError(
            _("Unable to generate IP address by EUI64 for IPv4 prefix"))

    try:
        interface_id = int(netaddr.EUI(mac).eui64())
        # ``prefix`` is rebound on purpose: the error messages below format
        # whatever it refers to at the time of the failure.
        prefix = netaddr.IPNetwork(prefix)
        # ``+`` binds tighter than ``^``: add first, then toggle bit 57.
        combined = (prefix.first + interface_id) ^ (1 << 57)
        return netaddr.IPAddress(combined)
    except (ValueError, netaddr.AddrFormatError):
        raise ValueError(_('Bad prefix or mac format for generating IPv6 '
                           'address by EUI-64: %(prefix)s, %(mac)s:')
                         % {'prefix': prefix, 'mac': mac})
    except TypeError:
        raise TypeError(_('Bad prefix type for generating IPv6 address by '
                          'EUI-64: %s') % prefix)
def slaac(value, query=''):
    ''' Get the SLAAC address within given network

    :param value: IPv6 address or network, as understood by ``ipaddr``.
    :param query: hardware (MAC) address the SLAAC address is built from.
    :returns: the result of ``netaddr.EUI.ipv6()`` for the network, or
        False when ``value`` is not an IPv6 address/network, ``query`` is
        empty, or ``query`` is not a usable hardware address.
    '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')
        else:
            # Any other type used to fall through with ``v`` unbound and
            # relied on the blanket handler below to turn that into False.
            return False

        if ipaddr(value, 'version') != 6:
            return False

        value = netaddr.IPNetwork(v)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias='slaac')
        eui = netaddr.EUI(mac)
    except Exception:
        return False

    return eui.ipv6(value.network)


# ---- HWaddr / MAC address filters ----
def hwaddr(value, query='', alias='hwaddr'):
    ''' Check if string is a HW/MAC address and filter it

    :param value: candidate hardware address.
    :param query: name of the output filter to apply (a key of
        ``query_func_map`` below); '' selects the default query.
    :param alias: filter name used as a prefix in error messages.
    :returns: whatever the selected query function returns.
    :raises errors.AnsibleFilterError: if ``value`` is not a hardware
        address (for queries other than '' and 'bool') or if ``query`` is
        not a known filter type.
    '''
    # Extra positional arguments (by name) that a query function receives
    # in addition to the parsed address.
    query_func_extra_args = {
        '': ('value',),
    }
    query_func_map = {
        '': _empty_hwaddr_query,
        'bare': _bare_query,
        'bool': _bool_hwaddr_query,
        'int': _int_hwaddr_query,
        'cisco': _cisco_query,
        'eui48': _win_query,
        'linux': _linux_query,
        'pgsql': _postgresql_query,
        'postgresql': _postgresql_query,
        'psql': _postgresql_query,
        'unix': _unix_query,
        'win': _win_query,
    }

    try:
        v = netaddr.EUI(value)
    except Exception:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(
                alias + ': not a hardware address: %s' % value)
        # '' and 'bool' queries do not raise for a non-address. ``v`` used
        # to stay unbound here, so the call below crashed with an
        # UnboundLocalError instead.
        # NOTE(review): assumes those two query functions treat a falsy
        # ``v`` as "not an address" - confirm.
        v = None

    available_args = {'value': value}
    extras = [available_args[arg]
              for arg in query_func_extra_args.get(query, tuple())]

    # Only the lookup is guarded, so a KeyError raised inside a query
    # function is no longer misreported as an unknown filter type.
    try:
        query_func = query_func_map[query]
    except KeyError:
        raise errors.AnsibleFilterError(
            alias + ': unknown filter type: %s' % query)

    return query_func(v, *extras)
def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None):
    """Prepare ports, packets for table 9 related tests.

    Shifts the ARP packet's source MAC and source IP by ``offset``, brings
    the switch port up, sends the ARP stream and returns the switches
    whose id differs from the one named in ``sw_port``.

    :raises UIException: if no ARP packet is supplied or the switch named
        in ``sw_port`` is not found.
    """
    # Configure ARP packet and stream
    if not arp_packet:
        raise UIException("ARP packet not supplied")

    ether_layer = arp_packet[0]['Ether']
    shifted_mac = EUI(EUI(ether_layer['src']).value + offset,
                      dialect=mac_unix_expanded)
    ether_layer['src'] = str(shifted_mac)

    arp_layer = arp_packet[1]['ARP']
    arp_layer['psrc'] = str(IPAddress(arp_layer['psrc']) + offset)

    # set admin status of host and switch to Up, wait till operational
    # status is up. ``sw_port`` is "<switch id> <port id>".
    tokens = sw_port.split()
    sw_port_id = int(tokens[1])
    target_id = tokens[0]

    other_switches = []
    target_switches = []
    for switch in env.switch[1].node.values():
        if switch.id == target_id:
            target_switches.append(switch)
        else:
            other_switches.append(switch)

    if not target_switches:
        raise UIException("Not found switch id and port pair connection in configuration")

    env.tg[1].connect_port(tg_port)
    switch_ui = target_switches[0].ui
    switch_ui.modify_ports(ports=[sw_port_id], adminMode='Up')
    switch_ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up")

    # Prepare and send stream
    increment = (1, packet_num + offset)
    arp_stream = env.tg[1].set_stream(arp_packet, count=packet_num,
                                      inter=ipgap, iface=tg_port,
                                      arp_sa_increment=increment,
                                      arp_sip_increment=increment)
    env.tg[1].start_streams([arp_stream])
    # Give the generator time to emit every packet before stopping.
    time.sleep(ipgap * packet_num)
    env.tg[1].stop_streams([arp_stream])

    return other_switches
def _port_dict(port, fields=None):
    """Build the API dictionary for a port.

    :param port: mapping-like port object (read with ``get`` and ``[]``).
    :param fields: unused by this function.
    :returns: dict with the port attributes, the MAC formatted as a
        colon-separated string, the optional bridge and any tags the tag
        registry returns.
    """
    # A missing (or None) security group list used to raise TypeError when
    # iterated; treat it as an empty list instead.
    security_groups = port.get("security_groups") or []

    res = {"id": port.get("id"),
           "name": port.get("name"),
           "network_id": port["network_id"],
           "tenant_id": port.get("tenant_id"),
           "mac_address": port.get("mac_address"),
           "admin_state_up": port.get("admin_state_up"),
           "status": "ACTIVE",
           "security_groups": [group.get("id", None)
                               for group in security_groups],
           "device_id": port.get("device_id"),
           "device_owner": port.get("device_owner")}

    # The key is always present in ``res``; only its value needs checking.
    if res["mac_address"]:
        mac = str(netaddr.EUI(res["mac_address"])).replace('-', ':')
        res["mac_address"] = mac

    # NOTE(mdietz): more pythonic key in dict check fails here. Leave as get
    if port.get("bridge"):
        res["bridge"] = port["bridge"]

    # NOTE(ClifHouck): This causes another trip to the DB since tags are
    # are not eager loaded. According to mdietz this be a small impact on
    # performance, but if the tag system gets used more on ports, we may
    # want to eager load the tags.
    try:
        t = PORT_TAG_REGISTRY.get_all(port)
        res.update(t)
    except Exception as e:
        # NOTE(morgabra) We really don't want to break port-listing if
        # this goes sideways here, so we pass.
        msg = ("Unknown error loading tags for port %s: %s"
               % (port["id"], e))
        LOG.exception(msg)

    return res
def delete_port(context, id):
    """Delete a port.

    Releases the port's MAC and IP addresses through the IPAM driver,
    deletes the port in the network driver and finally removes the
    database record.

    :param context: neutron api request context
    :param id: UUID representing the port to delete.
    :raises n_exc.PortNotFound: if no port with that id exists.
    """
    LOG.info("delete_port %s for tenant %s" % (id, context.tenant_id))

    port = db_api.port_find(context, id=id, scope=db_api.ONE)
    if not port:
        raise n_exc.PortNotFound(port_id=id)

    if 'device_id' in port:  # false is weird, but ignore that
        LOG.info("delete_port %s for tenant %s has device %s" %
                 (id, context.tenant_id, port['device_id']))

    backend_key = port["backend_key"]
    mac_value = netaddr.EUI(port["mac_address"]).value
    network = port["network"]

    # Release the addresses held by the port.
    ipam = _get_ipam_driver(network, port=port)
    ipam.deallocate_mac_address(context, mac_value)
    ipam.deallocate_ips_by_port(
        context, port, ipam_reuse_after=CONF.QUARK.ipam_reuse_after)

    # Remove the port from the backend.
    port_driver = _get_net_driver(network, port=port)
    fallback_driver = _get_net_driver(network)
    port_driver.delete_port(context, backend_key,
                            device_id=port["device_id"],
                            mac_address=port["mac_address"],
                            base_net_driver=fallback_driver)

    with context.session.begin():
        db_api.port_delete(context, port)
def test_to_mac_range_cidr_format(self):
    """A colon-separated prefix with an explicit /24 mask is expanded."""
    result = mac_address_ranges._to_mac_range("AA:BB:CC/24")
    range_cidr, start, end = result

    start_text = str(netaddr.EUI(start, dialect=netaddr.mac_unix))
    end_text = str(netaddr.EUI(end, dialect=netaddr.mac_unix))

    self.assertEqual(range_cidr, "AA:BB:CC:00:00:00/24")
    self.assertEqual(start_text, "aa:bb:cc:0:0:0")
    self.assertEqual(end_text, "aa:bb:cd:0:0:0")
def test_to_mac_range_just_prefix(self):
    """A bare 3-byte prefix gets an implicit /24 mask."""
    result = mac_address_ranges._to_mac_range("AA:BB:CC")
    range_cidr, start, end = result

    start_text = str(netaddr.EUI(start, dialect=netaddr.mac_unix))
    end_text = str(netaddr.EUI(end, dialect=netaddr.mac_unix))

    self.assertEqual(range_cidr, "AA:BB:CC:00:00:00/24")
    self.assertEqual(start_text, "aa:bb:cc:0:0:0")
    self.assertEqual(end_text, "aa:bb:cd:0:0:0")
def test_to_mac_range_unix_cidr_format(self):
    """A dash-separated prefix with a /24 mask is expanded the same way."""
    result = mac_address_ranges._to_mac_range("AA-BB-CC/24")
    range_cidr, start, end = result

    start_text = str(netaddr.EUI(start, dialect=netaddr.mac_unix))
    end_text = str(netaddr.EUI(end, dialect=netaddr.mac_unix))

    self.assertEqual(range_cidr, "AA:BB:CC:00:00:00/24")
    self.assertEqual(start_text, "aa:bb:cc:0:0:0")
    self.assertEqual(end_text, "aa:bb:cd:0:0:0")
def test_to_mac_range_unix_cidr_format_normal_length(self):
    """A full 12-digit address with a /29 mask spans 2**19 addresses."""
    result = mac_address_ranges._to_mac_range("aabbcc000000/29")
    range_cidr, start, end = result

    start_text = str(netaddr.EUI(start, dialect=netaddr.mac_unix))
    end_text = str(netaddr.EUI(end, dialect=netaddr.mac_unix))

    self.assertEqual(range_cidr, "AA:BB:CC:00:00:00/29")
    self.assertEqual(start_text, "aa:bb:cc:0:0:0")
    self.assertEqual(end_text, "aa:bb:cc:8:0:0")
def _build_expected_unicorn_request_body(self, floating_ip_address, ports,
                                         actual_body=None):
    """Build the request body expected for a floating IP and its ports.

    :param floating_ip_address: value placed under ``public_ip``.
    :param ports: port dicts to turn into endpoints.
    :param actual_body: optional body actually sent; when given, ``ports``
        is re-ordered to follow the endpoint order found in it.
    :returns: ``{'floating_ip': {'public_ip': ..., 'endpoints': [...]}}``
    """
    if actual_body:
        # Since the port order is non-deterministic, we need to ensure
        # that the order is correct
        sent_endpoints = actual_body['floating_ip']['endpoints']
        sent_port_ids = [endpoint['port']['uuid']
                         for endpoint in sent_endpoints]
        ordered = [candidate
                   for sent_id in sent_port_ids
                   for candidate in ports
                   if candidate['id'] == sent_id]
        ports = ordered

    endpoints = []
    for port in ports:
        fixed_ips = [{'ip_address': fixed_ip['ip_address'],
                      'version': self.user_subnet['ip_version'],
                      'subnet_id': self.user_subnet['id'],
                      'cidr': self.user_subnet['cidr'],
                      'address_type': 'fixed'}
                     for fixed_ip in port['fixed_ips']]

        # The MAC is sent as its integer value.
        port_mac = int(netaddr.EUI(port['mac_address'].replace(':', '-')))

        port_body = {'uuid': port['id'],
                     'name': port['name'],
                     'network_uuid': port['network_id'],
                     'mac_address': port_mac,
                     'device_id': port['device_id'],
                     'device_owner': port['device_owner'],
                     'fixed_ip': fixed_ips}
        endpoints.append({'port': port_body,
                          'private_ip': port['fixed_ips'][0]['ip_address']})

    return {'floating_ip': {'public_ip': floating_ip_address,
                            'endpoints': endpoints}}
def test_mac_address_ranges(self):
    """An enabled MAC range is returned by the allocation-count query."""
    mr1_mac = netaddr.EUI("AA:AA:AA:00:00:00")
    mr1 = {"cidr": "AA:AA:AA/24",
           "do_not_use": False,
           "first_address": mr1_mac.value,
           "last_address": netaddr.EUI("AA:AA:AA:FF:FF:FF").value,
           "next_auto_assign_mac": mr1_mac.value}

    with self._fixtures([mr1]):
        with self.context.session.begin():
            ranges = db_api.mac_address_range_find_allocation_counts(
                self.context)
            # This was assertTrue(a, b), which treats ``b`` as the failure
            # message and only checks that ``a`` is truthy; compare the
            # two values instead.
            self.assertEqual(ranges[0]["cidr"], mr1["cidr"])
def test_mac_address_ranges_do_not_use_returns_nothing(self):
    """A range flagged do_not_use yields None from the count query."""
    first_mac = netaddr.EUI("AA:AA:AA:00:00:00")
    last_mac = netaddr.EUI("AA:AA:AA:FF:FF:FF")
    disabled_range = dict(cidr="AA:AA:AA/24",
                          do_not_use=True,
                          first_address=first_mac.value,
                          last_address=last_mac.value,
                          next_auto_assign_mac=first_mac.value)

    with self._fixtures([disabled_range]):
        with self.context.session.begin():
            ranges = db_api.mac_address_range_find_allocation_counts(
                self.context)
            self.assertTrue(ranges is None)
def test_update_port_with_security_groups_removal(self, redis_cli):
    """With no groups, update_port deletes the rules and serializes none."""
    fake_redis = mock.MagicMock()
    redis_cli.return_value = fake_redis

    port_uuid = str(uuid.uuid4())
    device_uuid = str(uuid.uuid4())
    mac_value = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
    no_groups = []

    self.driver.update_port(context=self.context,
                            network_id="public_network",
                            port_id=port_uuid,
                            device_id=device_uuid,
                            mac_address=mac_value,
                            security_groups=no_groups)

    self.assertEqual(fake_redis.serialize_groups.call_count, 0)
    fake_redis.delete_vif_rules.assert_called_once_with(
        device_uuid, mac_value)
def test_delete_port(self, sg_cli):
    """delete_port removes the VIF keyed by device id and MAC."""
    device_uuid = str(uuid.uuid4())
    mac_value = netaddr.EUI("AA:BB:CC:DD:EE:FF").value

    fake_client = mock.MagicMock()
    sg_cli.return_value = fake_client

    self.driver.delete_port(context=self.context,
                            port_id=2,
                            mac_address=mac_value,
                            device_id=device_uuid)

    fake_client.delete_vif.assert_called_once_with(device_uuid, mac_value)
def test_allocate_v6_with_mac_generates_exceeds_limit_raises(self):
    """With zero allowed attempts, v6 allocation raises a failure."""
    subnet6 = dict(cidr="feed::/104", first_ip=self.v6_fip.value, id=1,
                   ip_version=6, ip_policy=None,
                   last_ip=self.v6_lip.value,
                   next_auto_assign_ip=self.v6_fip.value)
    address = models.IPAddress()
    address["address"] = self.v46_val
    address["version"] = 4
    address["subnet"] = models.Subnet(cidr="::ffff:0:0/96")
    address["allocated_at"] = timeutils.utcnow()
    mac = models.MacAddress()
    mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")

    old_override = cfg.CONF.QUARK.v6_allocation_attempts
    cfg.CONF.set_override('v6_allocation_attempts', 0, 'QUARK')
    try:
        with self._stubs(subnets=[[(subnet6, 0)]],
                         addresses=[address, None, None]):
            with self.assertRaises(n_exc.IpAddressGenerationFailure):
                addr = []
                self.ipam.allocate_ip_address(self.context, addr, 0, 0, 0,
                                              mac_address=mac)
    finally:
        # Restore the override even when the test fails, so the changed
        # setting cannot leak into other tests.
        cfg.CONF.set_override('v6_allocation_attempts', old_override,
                              'QUARK')
def test_allocate_v6_with_mac(self):
    """Allocating from a v6 subnet with a MAC creates the IP address."""
    port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
    subnet6 = dict(id=1, first_ip=0, last_ip=0, cidr="feed::/104",
                   ip_version=6, next_auto_assign_ip=0, ip_policy=None)
    subnet6 = models.Subnet(**subnet6)
    mac = models.MacAddress()
    mac["address"] = netaddr.EUI("AA:BB:CC:DD:EE:FF")
    ip_address = netaddr.IPAddress("fe80::")

    old_override = cfg.CONF.QUARK.v6_allocation_attempts
    cfg.CONF.set_override('v6_allocation_attempts', 1, 'QUARK')
    try:
        with self._stubs(policies=[], ip_address=ip_address) as (
                policy_find, ip_find, ip_create, ip_update):
            a = self.ipam._allocate_from_v6_subnet(self.context, 0,
                                                   subnet6, port_id,
                                                   self.reuse_after,
                                                   mac_address=mac)
            self.assertEqual(ip_address.value, a["address"])
            # NCP-1548 - leaving this test to show the change from
            # sometimes creating the IP address to always creating the
            # IP address.
            self.assertEqual(0, ip_update.call_count)
            self.assertEqual(1, ip_create.call_count)
    finally:
        # Restore the override even when an assertion fails, so the
        # changed setting cannot leak into other tests.
        cfg.CONF.set_override('v6_allocation_attempts', old_override,
                              'QUARK')
def test_rfc2462_generates_valid_ip(self):
    """rfc2462_ip returns the expected integer for a known MAC and CIDR."""
    expected = netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value
    generated = quark.ipam.rfc2462_ip(netaddr.EUI("AA:BB:CC:DD:EE:FF"),
                                      "fe80::/120")
    self.assertEqual(generated, expected)
def test_v6_generator(self):
    """generate_v6 yields the expected first two addresses, in order."""
    mac = netaddr.EUI("AA:BB:CC:DD:EE:FF")
    port_id = "945af340-ed34-4fec-8c87-853a2df492b4"
    cidr = "fe80::/120"  # the duplicate assignment was removed
    gen = quark.ipam.generate_v6(mac, port_id, cidr)

    # The builtin next() works on Python 2.6+ and Python 3; the ``.next()``
    # method exists only on Python 2 iterators.
    ip = next(gen)
    self.assertEqual(ip,
                     netaddr.IPAddress('fe80::a8bb:ccff:fedd:eeff').value)

    ip = next(gen)
    self.assertEqual(ip,
                     netaddr.IPAddress('fe80::40c9:a95:d83a:2ffa').value)
def test_apply_rules_set_fails_gracefully(self):
    """A redis ConnectionError on hset becomes RedisConnectionFailure."""
    port_id = 1
    mac_value = netaddr.EUI("AA:BB:CC:DD:EE:FF").value
    patch_target = ("quark.cache.security_groups_client."
                    "redis_base.ClientBase")

    with mock.patch(patch_target) as redis_mock:
        fake_redis = mock.MagicMock()
        redis_mock.return_value = fake_redis
        client = sg_client.SecurityGroupsClient()
        fake_redis.master.hset.side_effect = redis.ConnectionError

        with self.assertRaises(q_exc.RedisConnectionFailure):
            client.apply_rules(port_id, mac_value, [])
def rfc2462_ip(mac, cidr):
    """Return the integer v6 address built from ``cidr`` and ``mac``.

    The EUI-64 value of the MAC is added to the integer value of the
    network and the sum is XOR-ed with ``MAGIC_INT``.
    """
    # NOTE(mdietz): see RFC2462
    network_value = netaddr.IPNetwork(cidr).value
    eui = netaddr.EUI(mac)
    LOG.info("Using RFC2462 method to generate a v6 with MAC %s" % eui)
    return (network_value + eui.eui64().value) ^ MAGIC_INT
def deallocate_mac_address(self, context, address, **kwargs):
    """Release a MAC address record.

    The record is deleted when it has no range or its range is flagged
    ``do_not_use``; otherwise it is marked deallocated with the current
    time.

    :raises q_exc.MacAddressNotFound: if no record matches ``address``.
    """
    elevated = context.elevated()
    record = db_api.mac_address_find(elevated, address=address,
                                     scope=db_api.ONE)
    if not record:
        raise q_exc.MacAddressNotFound(mac_address_id=address,
                                       readable_mac=netaddr.EUI(address))

    mac_range = record["mac_address_range"]
    if mac_range is None or mac_range["do_not_use"]:
        db_api.mac_address_delete(elevated, record)
        return

    db_api.mac_address_update(elevated, record, deallocated=True,
                              deallocated_at=timeutils.utcnow())
def vif_key(self, device_id, mac_address):
    """Return the key ``"<device_id>.<mac>"`` for a device/MAC pair."""
    canonical = str(netaddr.EUI(mac_address))
    # Two-argument translate: map through MAC_TRANS_TABLE and drop ':' and
    # '-'. Per the original comment this lower-cases the MAC and strips
    # its separators (MAC_TRANS_TABLE is defined elsewhere).
    compact = canonical.translate(MAC_TRANS_TABLE, ":-")
    return "{0}.{1}".format(device_id, compact)
def macGrab(self, packet):
    """Defines the OUI for a given MAC

    Prints the organisation registered for the OUI of ``packet.addr2``.

    This function serves as an example, it is not ready for implementation
    """
    try:
        parsed_mac = netaddr.EUI(packet.addr2)
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3.
        print(parsed_mac.oui.registration().org)
    except netaddr.core.NotRegisteredError:
        # The ``except X, e`` form is Python 2-only syntax and ``e`` was
        # never used.
        # NOTE(review): ``fields`` is not defined in this function; it
        # must come from an enclosing scope - confirm before relying on
        # this branch.
        fields.append('UNKNOWN')


### Move to handler.py
def process_flow_template(self, bridge, flow_template):
    """Method adds flows into the vswitch based on given flow template
    and configuration of multistream feature.

    Without pre-installed multistream flows, the template is added as is.
    Otherwise one flow per stream is added through the flow cache, varying
    the destination MAC (L2), destination IP (L3) or destination port
    (L4), and the cache is flushed at the end.
    """
    traffic = self._traffic
    use_multistream = ('pre_installed_flows' in traffic and
                       traffic['pre_installed_flows'].lower() == 'yes' and
                       'multistream' in traffic and
                       traffic['multistream'] > 0 and
                       'stream_type' in traffic)

    if not use_multistream:
        self._vswitch.add_flow(bridge, flow_template)
        return

    # multistream feature is enabled and flows should be inserted into OVS
    # so generate flows based on template and multistream configuration
    stream_type = traffic['stream_type']
    if stream_type == 'L2':
        # iterate through destination MAC address
        first_mac = netaddr.EUI(traffic['l2']['dstmac']).value
        for index in range(traffic['multistream']):
            stream_mac = netaddr.EUI(first_mac + index)
            stream_mac.dialect = netaddr.mac_unix_expanded
            flow_template.update({'dl_dst': stream_mac})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L3':
        # iterate through destination IP address
        first_ip = netaddr.IPAddress(traffic['l3']['dstip']).value
        for index in range(traffic['multistream']):
            stream_ip = netaddr.IPAddress(first_ip + index)
            flow_template.update({'dl_type': '0x0800',
                                  'nw_dst': stream_ip})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    elif stream_type == 'L4':
        # read transport protocol from configuration and iterate through
        # its destination port
        if traffic['l3']['proto'].lower() == 'tcp':
            proto = _PROTO_TCP
        else:
            proto = _PROTO_UDP
        for index in range(traffic['multistream']):
            flow_template.update({'dl_type': '0x0800',
                                  'nw_proto': proto,
                                  'tp_dst': index})
            # optimize flow insertion by usage of cache
            self._vswitch.add_flow(bridge, flow_template, cache='on')
    else:
        self._logger.error('Stream type is set to uknown value %s',
                           stream_type)

    # insert cached flows into the OVS
    self._vswitch.add_flow(bridge, [], cache='flush')
def slaac(value, query=''):
    ''' Get the SLAAC address within given network

    :param value: IPv6 address or network, as understood by ``ipaddr``.
    :param query: hardware (MAC) address the SLAAC address is built from.
    :returns: the result of ``netaddr.EUI.ipv6()`` for the network, or
        False when ``value`` is not an IPv6 address/network, ``query`` is
        empty, or ``query`` is not a usable hardware address.
    '''
    try:
        vtype = ipaddr(value, 'type')
        if vtype == 'address':
            v = ipaddr(value, 'cidr')
        elif vtype == 'network':
            v = ipaddr(value, 'subnet')
        else:
            # Any other type used to fall through with ``v`` unbound and
            # relied on the blanket handler below to turn that into False.
            return False

        # The version used to be read as ``v.version``. ``v`` is the raw
        # result of ipaddr() and is only turned into an IPNetwork on the
        # next line, so ask ipaddr() for the version instead.
        # NOTE(review): assumes ipaddr(value, 'version') returns the IP
        # version number - confirm against the ipaddr filter.
        if ipaddr(value, 'version') != 6:
            return False

        value = netaddr.IPNetwork(v)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit.
        return False

    if not query:
        return False

    try:
        mac = hwaddr(query, alias='slaac')
        eui = netaddr.EUI(mac)
    except Exception:
        return False

    return eui.ipv6(value.network)


# ---- HWaddr / MAC address filters ----
def hwaddr(value, query='', alias='hwaddr'):
    ''' Check if string is a HW/MAC address and filter it

    :param value: candidate hardware address.
    :param query: name of the output filter to apply (a key of
        ``query_func_map`` below); '' selects the default query.
    :param alias: filter name used as a prefix in error messages.
    :returns: whatever the selected query function returns.
    :raises errors.AnsibleFilterError: if ``value`` is not a hardware
        address (for queries other than '' and 'bool') or if ``query`` is
        not a known filter type.
    '''
    # Extra positional arguments (by name) that a query function receives
    # in addition to the parsed address.
    query_func_extra_args = {
        '': ('value',),
    }
    query_func_map = {
        '': _empty_hwaddr_query,
        'bare': _bare_query,
        'bool': _bool_hwaddr_query,
        'cisco': _cisco_query,
        'eui48': _win_query,
        'linux': _linux_query,
        'pgsql': _postgresql_query,
        'postgresql': _postgresql_query,
        'psql': _postgresql_query,
        'unix': _unix_query,
        'win': _win_query,
    }

    try:
        v = netaddr.EUI(value)
    except Exception:
        if query and query != 'bool':
            raise errors.AnsibleFilterError(
                alias + ': not a hardware address: %s' % value)
        # '' and 'bool' queries do not raise for a non-address. ``v`` used
        # to stay unbound here, so the call below crashed with an
        # UnboundLocalError instead.
        # NOTE(review): assumes those two query functions treat a falsy
        # ``v`` as "not an address" - confirm.
        v = None

    available_args = {'value': value}
    extras = [available_args[arg]
              for arg in query_func_extra_args.get(query, tuple())]

    # Only the lookup is guarded, so a KeyError raised inside a query
    # function is no longer misreported as an unknown filter type.
    try:
        query_func = query_func_map[query]
    except KeyError:
        raise errors.AnsibleFilterError(
            alias + ': unknown filter type: %s' % query)

    return query_func(v, *extras)
def __init__(self, id=0, ssid=None, client_mac=None, location=None):
    """Store the given fields and look up the client's vendor.

    ``client_org`` is the organisation registered for the OUI
    (Organizational Unique Identifier) of ``client_mac``, or None when
    the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.client_mac = client_mac
    self.location = location
    self.client_org = None
    try:
        self.client_org = EUI(self.client_mac).oui.registration().org
    except Exception:
        # Best-effort lookup (e.g. OUI not registered). ``Exception``
        # rather than a bare ``except`` so KeyboardInterrupt and
        # SystemExit are not swallowed.
        pass
def __init__(self, id=0, ssid=None, bssid=None, date=None, location=None):
    """Store the given fields and look up the access point's vendor.

    ``ap_org`` is the organisation registered for the OUI (Organizational
    Unique Identifier) of ``bssid``, or None when the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.bssid = bssid
    self.date = date
    self.location = location
    self.ap_org = None
    try:
        self.ap_org = EUI(self.bssid).oui.registration().org
    except Exception:
        # Best-effort lookup. ``Exception`` rather than a bare ``except``
        # so KeyboardInterrupt and SystemExit are not swallowed.
        pass
def __init__(self, id=0, ssid=None, client_mac=None, date=None,
             location=None):
    """Store the given fields and look up the client's vendor.

    ``client_org`` is the organisation registered for the OUI
    (Organizational Unique Identifier) of ``client_mac``, or None when
    the lookup fails.
    """
    self.id = id
    self.ssid = ssid
    self.client_mac = client_mac
    self.date = date
    self.location = location
    self.client_org = None
    try:
        self.client_org = EUI(self.client_mac).oui.registration().org
    except Exception:
        # Best-effort lookup. ``Exception`` rather than a bare ``except``
        # so KeyboardInterrupt and SystemExit are not swallowed.
        pass
def get_vendor(mac):
    """Return the organisation registered for the OUI of ``mac``.

    :param mac: MAC address to look up; an empty string or None yields
        None.
    :returns: the registered organisation, or None when ``mac`` is empty,
        cannot be parsed or has no registered OUI.
    """
    if not mac:
        return None
    try:
        # EUI - Extended Unique Identifier
        # OUI - Organizational Unique Identifier
        return EUI(mac).oui.registration().org
    except Exception:
        # Best-effort lookup: unparsable MAC or OUI not registered.
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not swallowed.
        return None
def source(self):
    """
    Source MAC address
    """
    # The address is stored as an integer; render it as an EUI string.
    address = netaddr.EUI(self._src_mac)
    return str(address)
def source(self, value):
    """Store the source MAC address as its integer value."""
    parsed = netaddr.EUI(value)
    self._src_mac = parsed.value
def destination(self):
    """
    destination MAC address
    """
    # The address is stored as an integer; render it as an EUI string.
    address = netaddr.EUI(self._dst_mac)
    return str(address)
def destination(self, value):
    """Store the destination MAC address as its integer value."""
    parsed = netaddr.EUI(value)
    self._dst_mac = parsed.value
def test_traffic_variable(self):
    """Captured packets have the expected length and MAC progression."""
    tx = self.layer.tx
    rx = self.layer.rx
    tx.streams[2].is_enabled = True
    tx.streams[2].save()
    utils.send_and_receive(tx, rx, duration=1, save_as='capture.pcap')

    # The capture is not inspected when running under PyPy.
    if utils.is_pypy():
        return

    packets = pyshark.FileCapture('capture.pcap')
    for index, packet in enumerate(packets):
        self.assertEqual((index + 1) * 10, int(packet.eth.len))

        source_value = netaddr.EUI(packet.eth.src).value
        self.assertEqual(index * 0x100, source_value)

        destination_value = netaddr.EUI(packet.eth.dst).value
        self.assertEqual(0xffffffffffff - index * 0x1000000,
                         destination_value)