我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用netaddr.valid_ipv6()。
def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if netaddr.valid_ipv6(listen_info[0]): self.server = eventlet.listen(listen_info, family=socket.AF_INET6) elif os.path.isdir(os.path.dirname(listen_info[0])): # Case for Unix domain socket self.server = eventlet.listen(listen_info[0], family=socket.AF_UNIX) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle
def serialize(self): if netaddr.valid_ipv6(self.ip_addr): # Sets Peer IP Address family bit to IPv6 self.type |= self.IP_ADDR_FAMILY_BIT ip_addr = ip.text_to_bin(self.ip_addr) if self.type & self.AS_NUMBER_SIZE_BIT or self.as_num > 0xffff: # Four octet AS number self.type |= self.AS_NUMBER_SIZE_BIT as_num = struct.pack('!I', self.as_num) else: # Two octet AS number as_num = struct.pack('!H', self.as_num) buf = struct.pack(self._HEADER_FMT, self.type, addrconv.ipv4.text_to_bin(self.bgp_id)) return buf + ip_addr + as_num
def serialize(self): # fixup if (netaddr.valid_ipv4(self.peer_ip) and netaddr.valid_ipv4(self.local_ip)): self.afi = self.AFI_IPv4 elif (netaddr.valid_ipv6(self.peer_ip) and netaddr.valid_ipv6(self.local_ip)): self.afi = self.AFI_IPv6 else: raise ValueError( 'peer_ip and local_ip must be the same address family: ' 'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip)) buf = struct.pack(self._HEADER_FMT, self.peer_as, self.local_as, self.if_index, self.afi) buf += ip.text_to_bin(self.peer_ip) buf += ip.text_to_bin(self.local_ip) buf += struct.pack(self._STATES_FMT, self.old_state, self.new_state) return buf
def serialize(self): # fixup if (netaddr.valid_ipv4(self.peer_ip) and netaddr.valid_ipv4(self.local_ip)): self.afi = self.AFI_IPv4 elif (netaddr.valid_ipv6(self.peer_ip) and netaddr.valid_ipv6(self.local_ip)): self.afi = self.AFI_IPv6 else: raise ValueError( 'peer_ip and local_ip must be the same address family: ' 'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip)) buf = struct.pack(self._HEADER_FMT, self.peer_as, self.local_as, self.if_index, self.afi) buf += ip.text_to_bin(self.peer_ip) buf += ip.text_to_bin(self.local_ip) buf += self.bgp_message.serialize() return buf
def serialize(self): if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV4_BODY_FMT, addrconv.ipv4.text_to_bin(prefix_addr), int(prefix_num)) elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV6_BODY_FMT, addrconv.ipv6.text_to_bin(prefix_addr), int(prefix_num)) else: raise ValueError('Invalid prefix: %s' % self.prefix) return struct.pack(self._FAMILY_FMT, self.family) + body_bin
def serialize(self): # fixup if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 else: raise ValueError('Invalid prefix: %s' % self.prefix) buf = struct.pack(self._FAMILY_FMT, self.family) buf += _serialize_ip_prefix(self.prefix) buf += struct.pack(self._METRIC_FMT, self.metric) return buf + _serialize_nexthops(self.nexthops)
def validate(value): if isinstance(value, six.string_types) and netaddr.valid_ipv6(value): return value raise ValueError(_("Expected IPv6 String, got '%s'") % value)
def is_valid_ipv6(address): """Verify that address represents a valid IPv6 address. :param address: Value to verify :type address: string :returns: bool .. versionadded:: 1.1 """ try: return netaddr.valid_ipv6(address) except netaddr.AddrFormatError: return False
def build_base_url(protocol, host, port): proto_str = '%s://' % protocol host_str = '[%s]' % host if netaddr.valid_ipv6(host) else host port_str = '' if port is None else ':%d' % port return proto_str + host_str + port_str
def netlist(nets): v4nets = [] v6nets = [] for net in nets: ipNetwork = netaddr.IPNetwork(net) parts = str(ipNetwork).split("/") ip = parts[0] mask = parts[1] if netaddr.valid_ipv4(ip) and int(mask) <= 32: v4nets.append(ipNetwork) elif netaddr.valid_ipv6(ip) and int(mask) <= 128: v6nets.append(ipNetwork) return v4nets, v6nets
def ipset(nets): v4nets = netaddr.IPSet() v6nets = netaddr.IPSet() for net in nets: ipNetwork = netaddr.IPNetwork(net) parts = str(ipNetwork).split("/") ip = parts[0] mask = parts[1] if netaddr.valid_ipv4(ip) and int(mask) <= 32: v4nets.add(ipNetwork) elif netaddr.valid_ipv6(ip) and int(mask) <= 128: v6nets.add(ipNetwork) return v4nets, v6nets
def valid_ip_address(addr): if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr): return False return True
def _check_rf_and_normalize(prefix): """ check prefix's route_family and if the address is IPv6 address, return IPv6 route_family and normalized IPv6 address. If the address is IPv4 address, return IPv4 route_family and the prefix itself. """ ip, masklen = prefix.split('/') if netaddr.valid_ipv6(ip): # normalize IPv6 address ipv6_prefix = str(netaddr.IPAddress(ip)) + '/' + masklen return vrfs.VRF_RF_IPV6, ipv6_prefix else: return vrfs.VRF_RF_IPV4, prefix
def validate_ip_addr(ip_addr): if netaddr.valid_ipv4(ip_addr): return lib_consts.IP_VERSION_4 elif netaddr.valid_ipv6(ip_addr): return lib_consts.IP_VERSION_6 else: raise bgp_driver_exc.InvalidParamType(param=ip_addr, param_type='ip-address')
def is_ip(self, ip_addr=None): """ Return true if valid IP address return false if invalid IP address :param ip_addr: optional IP to pass. Takes from root class if not specified >>> from ipinformation import IPInformation >>> print IPInformation(ip_address='8.8.8.8').is_ip() True >>> print IPInformation(ip_address='NotAnIP').is_ip() False """ if not ip_addr: ip_addr = self.ip_address valid = True if netaddr.valid_ipv4( ip_addr ): #IPv4 Address if not re.match( valid_ip_regex, ip_addr ): valid = False elif netaddr.valid_ipv6( ip_addr ): pass else: # print '"%s" is not a valid IP Address.' %ip_addr valid = False return valid
def validate_ip_addr(addr, version=None): """ Validates that an IP address is valid. Returns true if valid, false if not. Version can be "4", "6", None for "IPv4", "IPv6", or "either" respectively. """ if version == 4: return netaddr.valid_ipv4(addr) elif version == 6: return netaddr.valid_ipv6(addr) else: return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
def valid_ipv6(addr, flags=0): """ Wrapper function of "netaddr.valid_ipv6()". The function extends "netaddr.valid_ipv6()" to enable to validate IPv4 network address in "xxxx:xxxx:xxxx::/xx" format. :param addr: IP address to be validated. :param flags: See the "netaddr.valid_ipv6()" docs for details. :return: True is valid. False otherwise. """ return _valid_ip(netaddr.valid_ipv6, 128, addr, flags)
def next_hop(self, addr): if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr): raise ValueError('Invalid address for next_hop: %s' % addr) self._next_hop = addr self.next_hop_list[0] = addr
def next_hop_list(self, addr_list): if not isinstance(addr_list, (list, tuple)): addr_list = [addr_list] for addr in addr_list: if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr): raise ValueError('Invalid address for next_hop: %s' % addr) self._next_hop = addr_list[0] self._next_hop_list = addr_list
def _serialize_ip_prefix(prefix): if ip.valid_ipv4(prefix): prefix_addr, prefix_num = prefix.split('/') return bgp.IPAddrPrefix(int(prefix_num), prefix_addr).serialize() elif ip.valid_ipv6(prefix): prefix_addr, prefix_num = prefix.split('/') return IPv6Prefix(int(prefix_num), prefix_addr).serialize() else: raise ValueError('Invalid prefix: %s' % prefix)
def __init__(self, ifindex, ifc_flags, family, prefix, dest): super(_ZebraInterfaceAddress, self).__init__() self.ifindex = ifindex self.ifc_flags = ifc_flags self.family = family if isinstance(prefix, (IPv4Prefix, IPv6Prefix)): prefix = prefix.prefix self.prefix = prefix assert netaddr.valid_ipv4(dest) or netaddr.valid_ipv6(dest) self.dest = dest
def serialize(self): if ip.valid_ipv4(self.prefix): self.family = socket.AF_INET # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV4_BODY_FMT, addrconv.ipv4.text_to_bin(prefix_addr), int(prefix_num), addrconv.ipv4.text_to_bin(self.dest)) elif ip.valid_ipv6(self.prefix): self.family = socket.AF_INET6 # fixup prefix_addr, prefix_num = self.prefix.split('/') body_bin = struct.pack( self._IPV6_BODY_FMT, addrconv.ipv6.text_to_bin(prefix_addr), int(prefix_num), addrconv.ipv6.text_to_bin(self.dest)) else: raise ValueError( 'Invalid address family for prefix=%s and dest=%s' % (self.prefix, self.dest)) buf = struct.pack(self._HEADER_FMT, self.ifindex, self.ifc_flags, self.family) return buf + body_bin
def __init__(self, addr, metric=None, nexthops=None): super(_ZebraIPNexthopLookup, self).__init__() assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr) self.addr = addr self.metric = metric nexthops = nexthops or [] for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops
def __init__(self, prefix, metric=None, nexthops=None): super(_ZebraIPImportLookup, self).__init__() assert netaddr.valid_ipv4(prefix) or netaddr.valid_ipv6(prefix) self.prefix = prefix self.metric = metric nexthops = nexthops or [] for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops
def __init__(self, addr, distance, metric, nexthops=None): super(_ZebraIPNexthopLookupMRib, self).__init__() assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr) self.addr = addr self.distance = distance self.metric = metric nexthops = nexthops or [] for nexthop in nexthops: assert isinstance(nexthop, _NextHop) self.nexthops = nexthops
def is_valid_ipv6(ipv6): """Returns True if given `ipv6` is a valid IPv6 address """ return netaddr.valid_ipv6(ipv6)
def detect_address_family(host): if netaddr.valid_ipv4(host): return socket.AF_INET elif netaddr.valid_ipv6(host): return socket.AF_INET6 elif os.path.isdir(os.path.dirname(host)): return socket.AF_UNIX else: return None
def create_connection(address): """ Wrapper for socket.create_connection() function. If *address* (a 2-tuple ``(host, port)``) contains a valid IPv4/v6 address, passes *address* to socket.create_connection(). If *host* is valid path to Unix Domain socket, tries to connect to the server listening on the given socket. :param address: IP address or path to Unix Domain socket. :return: Socket instance. """ host, _port = address if (netaddr.valid_ipv4(host) or netaddr.valid_ipv6(host)): return socket.create_connection(address) elif os.path.exists(host): sock = None try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(host) except socket.error as e: if sock is not None: sock.close() raise e return sock else: raise ValueError('Invalid IP address or Unix Socket: %s' % host)
def parse_server_string(server_str): """Parses the given server_string and returns a tuple of host and port. If it's not a combination of host part and port, the port element is an empty string. If the input is invalid expression, return a tuple of two empty strings. """ try: # First of all, exclude pure IPv6 address (w/o port). if netaddr.valid_ipv6(server_str): return (server_str, '') # Next, check if this is IPv6 address with a port number combination. if server_str.find("]:") != -1: (address, port) = server_str.replace('[', '', 1).split(']:') return (address, port) # Third, check if this is a combination of an address and a port if server_str.find(':') == -1: return (server_str, '') # This must be a combination of an address and a port (address, port) = server_str.split(':') return (address, port) except (ValueError, netaddr.AddrFormatError): LOG.error(_LE('Invalid server_string: %s'), server_str) return ('', '')
def general_info(self): """ Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address >>> from ipinformation import IPInformation >>> from pprint import pprint >>> pprint( IPInformation(ip_address='8.8.8.8').general_info() ) {'general': {'bits': '00001000000010000000100000001000', 'type': 'public', 'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512), 'version': '4'}} >>> pprint( IPInformation(ip_address='127.0.0.1').general_info() ) {'general': {'bits': '01111111000000000000000000000001', 'type': 'loopback', 'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149), 'version': '4'}} """ data = { 'general': { 'bits': None, 'type': None, 'updated': None, 'version': None} } if not self.ISIP: # print '"%s" is not a valid IP Address.' %self.ip_address # logging_file.error( '"{0}" is not a valid IP Address.'.format(self.ip_address) ) return data if netaddr.valid_ipv4( self.ip_address ): #IPv4 Address ip_version = '4' data['general'].update({'version':ip_version}) ip_bits = netaddr.IPAddress( self.ip_address ).bits().replace( '.', '' ) #Set the IP bits for searching by subnet data['general'].update({'bits':ip_bits}) ip_addr = netaddr.IPAddress(self.ip_address) if ip_addr.is_private(): ip_type = 'private' elif ip_addr.is_multicast(): ip_type = 'multicast' elif ip_addr.is_loopback(): ip_type = 'loopback' elif ip_addr.is_netmask(): ip_type = 'netmask' elif ip_addr.is_reserved(): ip_type = 'reserved' elif ip_addr.is_link_local(): ip_type = 'link_local' elif ip_addr.is_unicast(): ip_type = 'public' else: #Unknown Type ip_type = 'unknown' logging_file.error( '"{0}" is an unknown IP Address.'.format(self.ip_address) ) elif netaddr.valid_ipv6( self.ip_address ): #IPv6 Address#TODO:Finish IPv6 ip_version = '6' print 'Is IPv6' return False data['general'].update( { 'type': ip_type } ) data['general'].update( { 'updated': datetime.utcnow() } ) return data
def main(): module = AnsibleModule( argument_spec=dict( neighbor=dict(required=False, default=None), direction=dict(required=False, choices=['adv', 'rec']), prefix=dict(required=False, default=None) ), supports_check_mode=False ) m_args = module.params neighbor = m_args['neighbor'] direction = m_args['direction'] prefix = m_args['prefix'] regex_ip = re.compile('[0-9a-fA-F.:]+') regex_iprange = re.compile('[0-9a-fA-F.:]+\/\d+') regex_ipv4 = re.compile('[12][0-9]{0,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/?\d+?') if neighbor == None and direction == None and prefix == None: module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet") return if neighbor and ((not netaddr.valid_ipv4(neighbor)) and (not netaddr.valid_ipv6(neighbor))): err_message = "Invalid neighbor address %s ??" % neighbor module.fail_json(msg=err_message) return if (neighbor and not direction) or (neighbor and 'adv' not in direction.lower()): err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction) module.fail_json(msg=err_message) return try: bgproute = BgpRoutes(neighbor, direction, prefix) if prefix: if regex_ipv4.match(prefix): command = "docker exec -i bgp vtysh -c 'show ip bgp " + str(prefix) + "'" else: command = "docker exec -i bgp vtysh -c 'show ipv6 bgp " + str(prefix) + "'" rc, out, err = module.run_command(command) if rc != 0: err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err) module.fail_json(msg=err_message) return bgproute.parse_bgp_route_prefix(out) elif neighbor: if netaddr.valid_ipv4(neighbor): command = "docker exec -i bgp vtysh -c 'show ip bgp neighbor " + str(neighbor) + " " + str(direction) + "'" else: command = "docker exec -i bgp vtysh -c 'show ipv6 bgp neighbor " + str(neighbor) + " " + str(direction) + "'" rc, out, err = module.run_command(command) if rc != 0: err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err) module.fail_json(msg=err_message) return bgproute.parse_bgp_route_adv(out) results = bgproute.get_facts() module.exit_json(ansible_facts=results) except Exception as e: fail_msg = "cannot correctly parse BGP Routing facts!\n" fail_msg += str(e) module.fail_json(msg=fail_msg) return
def _send_ip_route_impl( self, prefix, nexthops=None, safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL, distance=None, metric=None, mtu=None, tag=None, is_withdraw=False): if ip.valid_ipv4(prefix): if is_withdraw: msg_cls = zebra.ZebraIPv4RouteDelete else: msg_cls = zebra.ZebraIPv4RouteAdd elif ip.valid_ipv6(prefix): if is_withdraw: msg_cls = zebra.ZebraIPv6RouteDelete else: msg_cls = zebra.ZebraIPv6RouteAdd else: raise ValueError('Invalid prefix: %s' % prefix) nexthop_list = [] for nexthop in nexthops: if netaddr.valid_ipv4(nexthop): nexthop_list.append(zebra.NextHopIPv4(addr=nexthop)) elif netaddr.valid_ipv6(nexthop): nexthop_list.append(zebra.NextHopIPv6(addr=nexthop)) else: raise ValueError('Invalid nexthop: %s' % nexthop) msg = zebra.ZebraMessage( version=self.zserv_ver, body=msg_cls( route_type=self.route_type, flags=flags, message=0, safi=safi, prefix=prefix, nexthops=nexthop_list, distance=distance, metric=metric, mtu=mtu, tag=tag)) self.send_msg(msg) return msg