我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用netaddr.mac_unix_expanded()。
def table_9_test_preparation(self, env, tg_port, sw_port, packet_num, ipgap, offset=0, arp_packet=None): """Prepare ports, packets for table 9 related tests. """ # Configure ARP packet and stream if arp_packet: srcmac = arp_packet[0]['Ether']['src'] arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + offset, dialect=mac_unix_expanded)) srcip = arp_packet[1]['ARP']['psrc'] arp_packet[1]['ARP']['psrc'] = str(IPAddress(srcip) + offset) else: raise UIException("ARP packet not supplied") # set admin status of host and switch to Up, wait till operational status is up sw_port_id = int(sw_port.split()[1]) sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]] sw_instance = [switch for switch in env.switch[1].node.values() if switch.id == sw_port.split()[0]] if not sw_instance: raise UIException("Not found switch id and port pair connection in configuration") env.tg[1].connect_port(tg_port) sw_instance[0].ui.modify_ports(ports=[sw_port_id], adminMode='Up') sw_instance[0].ui.wait_for_port_value_to_change([sw_port_id], 'operationalStatus', "Up") # Prepare and send stream arp_stream = env.tg[1].set_stream(arp_packet, count=packet_num, inter=ipgap, iface=tg_port, arp_sa_increment=(1, packet_num + offset), arp_sip_increment=(1, packet_num + offset)) env.tg[1].start_streams([arp_stream]) time.sleep(ipgap * packet_num) env.tg[1].stop_streams([arp_stream]) return sw_instances
def process_flow_template(self, bridge, flow_template): """Method adds flows into the vswitch based on given flow template and configuration of multistream feature. """ if ('pre_installed_flows' in self._traffic and self._traffic['pre_installed_flows'].lower() == 'yes' and 'multistream' in self._traffic and self._traffic['multistream'] > 0 and 'stream_type' in self._traffic): # multistream feature is enabled and flows should be inserted into OVS # so generate flows based on template and multistream configuration if self._traffic['stream_type'] == 'L2': # iterate through destimation MAC address dst_mac_value = netaddr.EUI(self._traffic['l2']['dstmac']).value for i in range(self._traffic['multistream']): tmp_mac = netaddr.EUI(dst_mac_value + i) tmp_mac.dialect = netaddr.mac_unix_expanded flow_template.update({'dl_dst':tmp_mac}) # optimize flow insertion by usage of cache self._vswitch.add_flow(bridge, flow_template, cache='on') elif self._traffic['stream_type'] == 'L3': # iterate through destimation IP address dst_ip_value = netaddr.IPAddress(self._traffic['l3']['dstip']).value for i in range(self._traffic['multistream']): tmp_ip = netaddr.IPAddress(dst_ip_value + i) flow_template.update({'dl_type':'0x0800', 'nw_dst':tmp_ip}) # optimize flow insertion by usage of cache self._vswitch.add_flow(bridge, flow_template, cache='on') elif self._traffic['stream_type'] == 'L4': # read transport protocol from configuration and iterate through its destination port proto = _PROTO_TCP if self._traffic['l3']['proto'].lower() == 'tcp' else _PROTO_UDP for i in range(self._traffic['multistream']): flow_template.update({'dl_type':'0x0800', 'nw_proto':proto, 'tp_dst':i}) # optimize flow insertion by usage of cache self._vswitch.add_flow(bridge, flow_template, cache='on') else: self._logger.error('Stream type is set to uknown value %s', self._traffic['stream_type']) # insert cached flows into the OVS self._vswitch.add_flow(bridge, [], cache='flush') else: self._vswitch.add_flow(bridge, flow_template)
def __init__(self, api, router, port, mac, networks, peer=None, may_exist=False, **columns): self.mac = str(netaddr.EUI(mac, dialect=netaddr.mac_unix_expanded)) self.networks = [str(netaddr.IPNetwork(net)) for net in networks] self.router = router self.port = port self.peer = peer self.may_exist = may_exist self.columns = columns super(LrpAddCommand, self).__init__(api)
def __init__(self, api, router, nat_type, external_ip, logical_ip, logical_port=None, external_mac=None, may_exist=False): if nat_type not in const.NAT_TYPES: raise TypeError("nat_type not in %s" % str(const.NAT_TYPES)) external_ip = str(netaddr.IPAddress(external_ip)) if nat_type == const.NAT_DNAT: logical_ip = str(netaddr.IPAddress(logical_ip)) else: net = netaddr.IPNetwork(logical_ip) logical_ip = str(net.ip if net.prefixlen == 32 else net) if (logical_port is None) != (external_mac is None): msg = "logical_port and external_mac must be passed together" raise TypeError(msg) if logical_port and nat_type != const.NAT_BOTH: msg = "logical_port/external_mac only valid for %s" % ( const.NAT_BOTH,) raise TypeError(msg) if external_mac: external_mac = str( netaddr.EUI(external_mac, dialect=netaddr.mac_unix_expanded)) super(LrNatAddCommand, self).__init__(api) self.router = router self.nat_type = nat_type self.external_ip = external_ip self.logical_ip = logical_ip self.logical_port = logical_port or [] self.external_mac = external_mac or [] self.may_exist = may_exist
def arp_packets_sending(self, env, tg_port, sw_port, packet_num, ipgap, arp_packet=None, offset=0, retry=False): cmd = "match -f 5555 -p 30001 get_rules table 9" sw_t9 = {} sw_instances = self.table_9_test_preparation(env, tg_port, sw_port, ipgap=ipgap, packet_num=packet_num, arp_packet=arp_packet, offset=offset) # wait for table 9 to be populated time.sleep(5) # check table 9 entries assert sw_instances, "No end point ports found connected to the host" for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)) if not retry: return sw_t9 failed = False for switch, count in sw_t9.items(): if count != packet_num: failed = True break if not failed: return sw_t9 # in case not all entries are learned fast, try add them again arp_packet = deepcopy(arp_packet) srcmac = arp_packet[0]['Ether']['src'] srcip = arp_packet[1]['ARP']['psrc'] count = 2 while failed and count: count -= 1 self.class_logger.info("Adding not learned MAC addresses to the table 9") sw_instances = [switch for switch in env.switch[1].node.values() if switch.id != sw_port.split()[0]] for switch in sw_instances: output = switch.ui.cli_send_command("match -f 5555 -p 30001 get_rules table 9").stdout learned = re.findall(r'ethernet.dst_mac\s=\s((?:[\w]{2}:){5}[\w]{2})', output, re.MULTILINE) to_add = [el for el in [str(EUI(EUI(srcmac).value + ix, dialect=mac_unix_expanded)) for ix in range(packet_num)] if el not in learned] self.class_logger.info("{} entries are missing for switch {}".format(len(to_add), switch.name)) for mac in to_add: diff = EUI(mac).value - EUI(srcmac).value arp_packet[0]['Ether']['src'] = str(EUI(EUI(srcmac).value + diff, dialect=mac_unix_expanded)) arp_packet[1]['ARP']['psrc'] = str(IPAddress(IPAddress(srcip).value + diff)) # Prepare and send stream arp_stream = env.tg[1].set_stream(arp_packet, count=1, iface=tg_port) env.tg[1].send_stream(arp_stream) self.class_logger.info("Verify table#9 entries") for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout if packet_num == len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)): failed = False else: failed = True time.sleep(3) break for switch in sw_instances: output = switch.ui.cli_send_command(cmd).stdout sw_t9[switch] = len(re.findall(r'^table\s+:\s+9', output, re.MULTILINE)) return sw_t9
def _expand_vm_settings(self, key, vm_number): """ Expand VM option with given key for given number of VMs """ tmp_value = self.getValue(key) if isinstance(tmp_value, str): scalar = True master_value = tmp_value tmp_value = [tmp_value] else: scalar = False master_value = tmp_value[0] master_value_str = str(master_value) if master_value_str.find('#') >= 0: self.__dict__[key] = [] for vmindex in range(vm_number): value = master_value_str.replace('#VMINDEX', str(vmindex)) for macro, args, param, _, step in re.findall(_PARSE_PATTERN, value): multi = int(step) if len(step) and int(step) else 1 if macro == '#EVAL': # pylint: disable=eval-used tmp_result = str(eval(param)) elif macro == '#MAC': mac_value = netaddr.EUI(param).value mac = netaddr.EUI(mac_value + vmindex * multi) mac.dialect = netaddr.mac_unix_expanded tmp_result = str(mac) elif macro == '#IP': ip_value = netaddr.IPAddress(param).value tmp_result = str(netaddr.IPAddress(ip_value + vmindex * multi)) else: raise RuntimeError('Unknown configuration macro {} in {}'.format(macro, key)) value = value.replace("{}{}".format(macro, args), tmp_result) # retype value to original type if needed if not isinstance(master_value, str): value = ast.literal_eval(value) self.__dict__[key].append(value) else: for vmindex in range(len(tmp_value), vm_number): self.__dict__[key].append(master_value) if scalar: self.__dict__[key] = self.__dict__[key][0] _LOGGER.debug("Expanding option: %s = %s", key, self.__dict__[key])
def _parse_mac_address_table_output(self, output, ignore_port=None): """MAC Address Table parsing. Parses the output of 'sh mac address-table' command. :param output: the output of the command Mac Address Table ------------------------------------------- Vlan Mac Address Type Ports ---- ----------- -------- ----- All 0100.0ccc.cccc STATIC CPU <snip> All ffff.ffff.ffff STATIC CPU 601 000b.7866.5240 DYNAMIC Gi1/0/48 601 0013.20fe.56b4 DYNAMIC Gi1/0/35 <snip> Total Mac Addresses for this criterion: 59 :param ignore_port: ignore this port :return: list of dicts containing device connection metrics """ lookup = [] lines = [line.strip() for line in output.splitlines()] lines.append('') while len(lines) > 0: line = lines.pop(0) # Table entries will always have a '.' for the MAC address. # If there isn't one it's not a row we care about. if line.find('.') < 0: continue values = [entry for entry in line.split() if entry] # Ignore non-physical ports port_types = self.PORT_NOTATION.values() if all(values[3].lower().find(port.lower()) != 0 for port in port_types): continue # If the ignore_port is specified and is the port in question, # ignore it. ignore_port = self._shorthand_port_notation(ignore_port) if ignore_port and values[3] == ignore_port: continue lookup.append( { 'mac': EUI(values[1], dialect=mac_unix_expanded), 'interface': self._shorthand_port_notation(values[3]) } ) return sorted(lookup, key=lambda k: k['interface'])
def _parse_ipdt_output(self, output): """IPDT output parsing. Parses the output of the `show ip device track` command. :param output: the output of the command IP Device Tracking = Enabled IP Device Tracking Probe Count = 3 IP Device Tracking Probe Interval = 30 IP Device Tracking Probe Delay Interval = 0 ------------------------------------------------------------------- IP Address MAC Address Vlan Interface STATE ------------------------------------------------------------------- 192.168.1.12 6cec.eb68.c86f 601 GigabitEthernet1/0/14 ACTIVE 192.168.1.15 6cec.eb67.836c 601 GigabitEthernet1/0/12 ACTIVE <snip> Total number interfaces enabled: 47 Enabled interfaces: Gi1/0/1, Gi1/0/2, Gi1/0/3, Gi1/0/4, Gi1/0/5, Gi1/0/6, Gi1/0/7, <snip> :return: list of dicts containing device connection metrics """ lookup = [] lines = [line.strip() for line in output.splitlines()] lines.append('') while len(lines) > 0: line = lines.pop(0) # Table entries will always have a '.' for the MAC address. # If there isn't one it's not a row we care about. if line.find('.') < 0: continue values = [entry for entry in line.split() if entry] # Ignore any 'INACTIVE' entries, meaning that there hasn't been # traffic from that MAC Address is has likely been unplugged. if values[4] == 'INACTIVE': continue lookup.append( { 'ip': values[0], 'mac': EUI(values[1], dialect=mac_unix_expanded), 'interface': self._shorthand_port_notation(values[3]) } ) return sorted(lookup, key=lambda k: k['interface'])