Python socket 模块,inet_pton() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.inet_pton()

项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
        """Pack a textual IP address into its binary form.

        IPv4 text is handled directly by ``socket.inet_aton``. Any other
        family is handed to ``WSAStringToAddressA`` and, if that succeeds,
        only ``AF_INET6`` results are returned (16 bytes).

        Raises ``socket.error`` when the conversion call reports failure or
        when the family is neither ``AF_INET`` nor ``AF_INET6``.
        """
        # IPv4 never needs the WSA call.
        if address_family == socket.AF_INET:
            return socket.inet_aton(ip_string)

        sock_addr = sockaddr()
        sock_addr.sa_family = address_family
        size = ctypes.c_int(ctypes.sizeof(sock_addr))

        status = WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(sock_addr),
            ctypes.byref(size)
        )
        if status != 0:
            raise socket.error(ctypes.FormatError())

        if address_family != socket.AF_INET6:
            raise socket.error('unknown address family')

        return ctypes.string_at(sock_addr.ipv6_addr, 16)
项目:routersploit    作者:reverse-shell    | 项目源码 | 文件源码
def ipv4(address):
    """Validate an IPv4 option value and return it.

    Any ``http://`` / ``https://`` text is removed first. The remainder is
    checked with ``socket.inet_pton``; when that function is missing, the
    check falls back to ``socket.inet_aton`` plus a three-dot test.

    Raises ``OptionValidationError`` when the value is not accepted.
    """
    failure_text = "Option have to be valid IP address."
    address = address.replace("http://", "").replace("https://", "")

    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # No inet_pton available: inet_aton accepts short forms, so the dot
        # count is checked as well.
        try:
            socket.inet_aton(address)
        except socket.error:
            raise OptionValidationError(failure_text)

        if address.count('.') != 3:
            raise OptionValidationError(failure_text)
        return address
    except socket.error:
        raise OptionValidationError(failure_text)

    return address
项目:Tenable.io-SDK-for-Python    作者:tenable    | 项目源码 | 文件源码
def is_ipv4(value):
    """Report whether ``value`` is a valid IPv4 address.

        :param value: The text to check.
        :return: True when the text is accepted as IPv4, False otherwise.
    """
    try:
        socket.inet_pton(socket.AF_INET, value)
    except AttributeError:
        # inet_pton is missing: use inet_aton and insist on three dots,
        # because inet_aton also accepts abbreviated forms.
        try:
            socket.inet_aton(value)
        except socket.error:
            return False
        else:
            return value.count('.') == 3
    except socket.error:
        # Rejected by inet_pton.
        return False
    else:
        return True
项目:swtoolz-core    作者:xcme    | 项目源码 | 文件源码
def is_valid_ipv4_address(address):
    """Return True when ``address`` is a dotted-quad IPv4 address.

    ``socket.inet_pton`` is preferred; if the socket module does not provide
    it, ``socket.inet_aton`` is used together with a three-dot check.
    """
    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # Fallback path for a socket module without inet_pton.
        try:
            socket.inet_aton(address)
        except socket.error:
            return False
        dot_count = address.count('.')
        return dot_count == 3
    except socket.error:
        # inet_pton rejected the text.
        return False

    return True


# ??????? ??? ??????????? ?????????? ???????????? ? OID. ??????????????? ???????? '%s' ?? ????????? ? ??????? ????? '0' (?????? ????????)
# ????? ??????? ???????? ??? {?} ?? ??????? ? ?? ??????? ? ???????
# ??????, ??? ?????????? ?????????????? ??????? ? ??? ??? ??????? ??? ????, ?? ???? ??????? ????? ?????????? :)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def ipv4(address):
    """Check that an option value is an IPv4 address and hand it back.

    ``http://`` and ``https://`` are stripped out of the value before it is
    validated with ``socket.inet_pton`` (or, when that is unavailable, with
    ``socket.inet_aton`` and a three-dot check).

    Raises ``OptionValidationError`` for anything that is not accepted.
    """
    invalid_msg = "Option have to be valid IP address."
    address = address.replace("http://", "").replace("https://", "")

    try:
        socket.inet_pton(socket.AF_INET, address)
    except AttributeError:
        # Fallback when inet_pton does not exist in the socket module.
        try:
            socket.inet_aton(address)
        except socket.error:
            raise OptionValidationError(invalid_msg)

        if address.count('.') != 3:
            raise OptionValidationError(invalid_msg)
        return address
    except socket.error:
        raise OptionValidationError(invalid_msg)

    return address
项目:pyShadowsocks    作者:FTwOoO    | 项目源码 | 文件源码
def _pack_addr_bytes_from(self, addr_type, addr):
        """Serialize an address as the address-type byte followed by its payload.

        :param addr_type: one of ``constants.SOCKS5_ADDRTYPE_IPV4``,
            ``constants.SOCKS5_ADDRTYPE_IPV6`` or ``constants.SOCKS5_ADDRTYPE_HOST``.
        :param addr: textual IP address, or a host name for the HOST type.
        :return: ``bytes`` made of the type byte and the packed address; for a
            host name the payload is a one-byte length followed by the name.
        """
        addr_bytes = None
        if addr_type in [constants.SOCKS5_ADDRTYPE_IPV4, constants.SOCKS5_ADDRTYPE_IPV6]:
            family = {constants.SOCKS5_ADDRTYPE_IPV4: socket.AF_INET,
                      constants.SOCKS5_ADDRTYPE_IPV6: socket.AF_INET6}[addr_type]
            addr_bytes = socket.inet_pton(family, addr)

        elif addr_type == constants.SOCKS5_ADDRTYPE_HOST:
            # The length prefix is a single raw byte counting the *encoded*
            # name.  Building it with chr(n).encode('utf-8') yields two bytes
            # for n >= 128, so the byte is constructed directly instead.
            host_bytes = addr[:255].encode('utf-8')[:255]
            addr_bytes = bytes(bytearray([len(host_bytes)])) + host_bytes

        # An unsupported addr_type leaves addr_bytes as None and fails on the
        # concatenation below.
        addr_bytes = bytes(bytearray([addr_type])) + addr_bytes
        return addr_bytes
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def is_fdqn(self):
        """Return True when the target looks like a host name rather than an IP.

        The target is treated as IPv4 if ``socket.inet_aton`` accepts it, and
        as IPv6 if it contains a ``':'`` (a host name cannot contain one, and
        this avoids relying on ``socket.inet_pton``). Anything else is
        reported as a fully qualified domain name.
        """
        try:
            socket.inet_aton(self.__target)
        except Exception:
            # Not an IPv4 address.  ``Exception`` rather than a bare except so
            # KeyboardInterrupt / SystemExit are not swallowed.
            try:
                self.__target.index(':')
            except Exception:
                # No ':' either, so not IPv6: it is a host name.
                return True
        return False
项目:tidb-ansible    作者:pingcap    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    Each address is packed with inet_pton, split into 4-byte words, and
    every word is passed through socket.ntohl before being rendered as
    eight upper-case hex digits.

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of (address family, converted hex string) tuples; empty when
        host is None.
    """
    if host is None:
        return []

    converted = []
    for family, ip in _convert_host_to_ip(host):
        network_hex = binascii.b2a_hex(socket.inet_pton(family, ip))
        words = []
        for offset in range(0, len(network_hex), 8):
            word = int(network_hex[offset:offset + 8], base=16)
            words.append("%08X" % socket.ntohl(word))
        converted.append((family, "".join(words)))
    return converted
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def str_to_int(addr, flags=0):
    """
    Convert an IPv4 dotted decimal string to its unsigned integer value.

    :param addr: An IPv4 dotted decimal address in string form.

    :param flags: bit flags selecting the parsing rules. With ZEROFILL each
        octet is re-rendered through int() first; with INET_PTON the
        inet_pton parser is used instead of inet_aton.

    :return: The equivalent unsigned integer for a given IPv4 address.

    :raises AddrFormatError: when the parser rejects the address.
    """
    if flags & ZEROFILL:
        addr = '.'.join('%d' % int(octet) for octet in addr.split('.'))

    try:
        packed = _inet_pton(AF_INET, addr) if flags & INET_PTON else _inet_aton(addr)
        return _struct.unpack('>I', packed)[0]
    except Exception:
        raise AddrFormatError('%r is not a valid IPv4 address string!' % addr)
项目:autoreg    作者:pbeyssac    | 项目源码 | 文件源码
def checkip(ip):
  """Return True if ip parses as IPv6 (when it contains ':') or as IPv4."""
  family = socket.AF_INET6 if ':' in ip else socket.AF_INET
  try:
    socket.inet_pton(family, ip)
  except socket.error:
    return False
  else:
    return True

#
# Regexp for a valid FQDN:
#  - only letters, digits, '-'
#  - no '-' at the beginning or end of a label
#  - at least one '.'
#  - no '.' at the beginning or end
#
项目:fdslight    作者:fdslight    | 项目源码 | 文件源码
def __init__(self, subnet, prefix, is_ipv6=False):
        """Record the subnet, prefix and address family for this instance.

        The subnet text is packed with ``socket.inet_pton`` and converted by
        ``utils.bytes2number``; that number is stored both as the current
        maximum address number and as the subnet number.
        """
        self.__no_use_iplist_num = []
        self.__subnet = subnet
        self.__prefix = prefix
        self.__is_ipv6 = is_ipv6

        family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
        self.__fa = family
        self.__cur_max_ipaddr_num = utils.bytes2number(socket.inet_pton(family, subnet))

        if is_ipv6:
            self.__prefix_num = utils.calc_net_prefix_num(prefix, is_ipv6=True)
        else:
            self.__prefix_num = utils.calc_net_prefix_num(prefix)

        self.__subnet_num = self.__cur_max_ipaddr_num
        return
项目:fdslight    作者:fdslight    | 项目源码 | 文件源码
def calc_subnet(ipaddr, prefix, is_ipv6=False):
    """Return the network address of ``ipaddr`` for the given prefix length.

    A full-length prefix (32 for IPv4, 128 for IPv6) returns ``ipaddr``
    unchanged. Otherwise the whole leading bytes are copied, the next byte
    is masked to its leading ``prefix % 8`` bits, and the rest stay zero.
    """
    full_length = 128 if is_ipv6 else 32
    if prefix == full_length:
        return ipaddr

    whole_bytes = int(prefix / 8)
    extra_bits = prefix % 8

    family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
    packed = socket.inet_pton(family, ipaddr)
    masked = list(bytes(16 if is_ipv6 else 4))

    masked[0:whole_bytes] = packed[0:whole_bytes]
    # Mask with the top ``extra_bits`` bits set (0 when there are none).
    partial_mask = sum(2 ** (8 - bit) for bit in range(1, extra_bits + 1))
    masked[whole_bytes] = packed[whole_bytes] & partial_mask

    return socket.inet_ntop(family, bytes(masked))
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _convert_host_to_hex(host):
    """
    Convert the provided host to the format in /proc/net/tcp*

    For every (family, ip) pair produced by _convert_host_to_ip, the
    address is packed with inet_pton and each 4-byte word is run through
    socket.ntohl and written as eight upper-case hex digits.

    Args:
        host: String with either hostname, IPv4, or IPv6 address

    Returns:
        List of tuples containing address family and the converted hex
        string; an empty list when host is None.
    """
    hex_addresses = []
    if host is None:
        return hex_addresses

    for family, ip in _convert_host_to_ip(host):
        packed_hex = binascii.b2a_hex(socket.inet_pton(family, ip))
        chunks = [packed_hex[start:start + 8] for start in range(0, len(packed_hex), 8)]
        host_order = "".join("%08X" % socket.ntohl(int(chunk, base=16)) for chunk in chunks)
        hex_addresses.append((family, host_order))
    return hex_addresses
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def is_valid_address(cls, ip):
        """Return True if ``ip`` is a valid IPv4/IPv6 address, optionally with ``/prefix``.

        A value with four dot-separated parts is checked as IPv4 (prefix
        0-32); anything else is checked as IPv6 (prefix 0-128). When no
        ``/prefix`` is given only the address itself is validated.
        """
        if ip.count('/') == 1:
            ip_address, netmask = ip.split('/')
            # The prefix arrives as text; it must be a plain non-negative
            # integer before it can be range-checked.
            if not netmask.isdigit():
                return False
            netmask = int(netmask)
        else:
            # No prefix supplied: previously ``netmask`` was left unbound and
            # the range check below raised instead of returning a result.
            ip_address, netmask = ip, None

        if len(ip_address.split('.')) == 4:
            family, max_prefix = socket.AF_INET, 32
        else:
            family, max_prefix = socket.AF_INET6, 128

        try:
            socket.inet_pton(family, ip_address)
        except (socket.error, ValueError):
            return False

        return netmask is None or 0 <= netmask <= max_prefix
项目:python-cookbook-3rd    作者:tuanavu    | 项目源码 | 文件源码
def cidr_range(cidr_address):
    """Yield, in order, the text form of every address in a CIDR block.

    The family is IPv6 when the input contains ':' and IPv4 otherwise. The
    host bits of the supplied address are cleared before counting starts.
    """
    family = AF_INET6 if ':' in cidr_address else AF_INET
    address, maskstr = cidr_address.split('/')
    maskbits = int(maskstr)

    # Packed form of the base address and its width in bytes
    packed = inet_pton(family, address)
    width = len(packed)

    # Number of addresses in the block; its negation is the network mask
    count = 2 ** (width * 8 - maskbits)
    network = int.from_bytes(packed, 'big') & -count

    for offset in range(count):
        current = network + offset
        yield inet_ntop(family, current.to_bytes(width, 'big'))
项目:NeoAlchemy    作者:TwoBitAlchemist    | 项目源码 | 文件源码
def IPv6(ip_addr):
    """
    Validate a value as an IPv6 address and return it as a string.

    The value is converted with str() and checked with socket.inet_pton.
    ValueError is raised when the address is rejected, and also when
    inet_pton is not available in the socket module.
    """
    text = str(ip_addr)

    try:
        socket.inet_pton(socket.AF_INET6, text)
    except AttributeError:
        raise ValueError('IPv6 validation unavailable on this platform.')
    except socket.error:
        raise ValueError('Invalid IPv6 address: %s' % text)

    return text
项目:heartbreaker    作者:lokori    | 项目源码 | 文件源码
def addrparse(addr_str):
  """Split an ``[address:]port`` string into ``(domain, addr, port)``.

  - ``None`` gives ``(AF_INET, "0.0.0.0", 0)``.
  - No colon: the string is a port on ``0.0.0.0``.
  - One colon: ``ipv4:port``.
  - More than one colon: IPv6, with the port after the last colon.

  The process exits through ``sys.exit`` with a message when the address
  or the port is invalid.
  """
  if addr_str is None:
    domain = socket.AF_INET
    addr, port = "0.0.0.0", 0
  elif addr_str.count(':') == 0:
    # Port only given
    domain = socket.AF_INET
    addr, port = "0.0.0.0", addr_str
  elif addr_str.count(':') == 1:
    # IPv4 address and port
    domain = socket.AF_INET
    addr, port = addr_str.rsplit(':', 1)
  else:
    domain = socket.AF_INET6
    addr, port = addr_str.rsplit(':', 1)

  # ``except Exception`` rather than a bare except, so an interrupt is not
  # reported as a bad address or port.
  try:
    socket.inet_pton(domain, addr)
  except Exception:
    sys.exit("Invalid address %s" % addr)
  try:
    port = int(port)
  except Exception:
    sys.exit("Invalid port '%s'" % port)

  return domain, addr, port
项目:dmpt    作者:sistason    | 项目源码 | 文件源码
def __init__(self, destination, routing_specifications, options):
        """Set the destination, open a UDP socket, and register routing specs.

        ``destination`` is either a bare host (paired with ``self.port``) or
        ``host:port`` text, split at the last colon. Each entry of
        ``routing_specifications`` is passed to
        ``self.assign_loadbalancer_to_specs``.
        """
        LoadbalancerControl.__init__(self, options)

        # Use default port, if none specified
        # NOTE(review): the two branches store different shapes: a tuple with
        # self.port versus a two-item list whose port is still a string.
        # NOTE(review): a bare IPv6 literal such as "::1" contains ':' and is
        # therefore split at its last colon - confirm callers always add a port.
        if not ":" in destination:
            self.destination = (destination, self.port)
        else:
            self.destination = destination.rsplit(':',1)

        # Is destination a v6-address?
        # The socket creation is inside the try as well, so any failure there
        # (not only a rejected address) falls back to an AF_INET socket.
        try:
            socket.inet_pton(socket.AF_INET6, self.destination[0])
            self.set_socket(socket.socket(socket.AF_INET6, socket.SOCK_DGRAM))
        except:
            self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_DGRAM))


        for routing_specs in routing_specifications:
            self.assign_loadbalancer_to_specs(routing_specs)
项目:openbmp-mrt    作者:OpenBMP    | 项目源码 | 文件源码
def encode_attr_mp_nexthop(nexthop):
    """ Encode a next-hop string as the reduced MP_REACH attribute TLV

        Only next-hop information is written (per MRT RFC6396): the
        attribute type, a length byte (address length + 1), the next-hop
        address length (1 octet) and the packed next-hop address.

        :param nexthop:      String value for the next-hop

        :return: BGP attribute TLV ready to be written
    """
    # A ':' in the text selects IPv6 (16 bytes); otherwise IPv4 (4 bytes)
    is_v6 = ':' in nexthop
    afi = socket.AF_INET6 if is_v6 else socket.AF_INET
    nh_len = 16 if is_v6 else 4

    lengths = pack('!BB', nh_len + 1, nh_len)
    return ATTR_TYPE_MP_REACH + lengths + socket.inet_pton(afi, nexthop)
项目:openbmp-mrt    作者:OpenBMP    | 项目源码 | 文件源码
def encode_attr_cluster_list(cluster_list):
    """ Encode a cluster list string as a BGP CLUSTER_LIST attribute TLV

        Every space-separated cluster id is packed as an IPv4 address and
        the packed ids are concatenated after the type and a one-byte length.

        :param cluster_list:      String space delimited list of cluster id's

        :return: BGP attribute TLV ready to be written, or '' if anything
                 fails while encoding
    """
    try:
        packed_ids = [socket.inet_pton(socket.AF_INET, cluster_id)
                      for cluster_id in cluster_list.split(' ')]
        encoded = "".join(packed_ids)

        return ATTR_TYPE_CLUSTER_LIST + pack('!B', len(encoded)) + encoded

    except:
        # Best effort: any failure yields an empty attribute.
        return ''
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def is_valid_ipv4(ip_addr):
    """
    Return the IPv4 address as a 4-byte buffer in network order.

    A 4-byte ``bytes`` value is returned untouched, an ``int`` is first
    rendered as dotted text, and text is packed with inet_pton (inet_aton
    when inet_pton is missing). A rejected address raises
    CTRexPacketBuildException.
    """
    kind = type(ip_addr)
    if kind is bytes and len(ip_addr) == 4:
        return ip_addr

    if kind is int:
        ip_addr = socket.inet_ntoa(struct.pack("!I", ip_addr))

    try:
        packed = socket.inet_pton(socket.AF_INET, ip_addr)
    except AttributeError:
        # inet_pton is not available here
        return socket.inet_aton(ip_addr)
    except socket.error:
        # not a valid address
        raise CTRexPacketBuildException(-10, "Not valid ipv4 format")
    return packed
项目:vcdriver    作者:Lantero    | 项目源码 | 文件源码
def validate_ipv4(ip):
    """
    Check whether a value is an IPv4 address
    :param ip: The value to check; it is converted with str() first

    :return: True if it's valid for ipv4, False otherwise
    """
    candidate = str(ip)
    try:
        socket.inet_pton(socket.AF_INET, candidate)
    except AttributeError:
        # No inet_pton: accept whatever inet_aton accepts.
        try:
            socket.inet_aton(candidate)
        except socket.error:
            return False
    except socket.error:
        return False
    return True
项目:cidr-tools    作者:silverwind    | 项目源码 | 文件源码
def str_to_int(addr, flags=0):
    """
    Turn an IPv4 dotted decimal string into an unsigned integer.

    :param addr: An IPv4 dotted decimal address in string form.

    :param flags: bit flags choosing the interpretation rules. ZEROFILL
        normalises every octet through int() before parsing; INET_PTON
        selects the inet_pton parser rather than inet_aton.

    :return: The equivalent unsigned integer for a given IPv4 address.

    :raises AddrFormatError: if the chosen parser rejects the string.
    """
    if flags & ZEROFILL:
        octets = ['%d' % int(part) for part in addr.split('.')]
        addr = '.'.join(octets)

    try:
        if flags & INET_PTON:
            packed = _inet_pton(AF_INET, addr)
        else:
            packed = _inet_aton(addr)
        (value,) = _struct.unpack('>I', packed)
        return value
    except Exception:
        raise AddrFormatError('%r is not a valid IPv4 address string!' % addr)
项目:pycrate    作者:ANSSI-FR    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        # allow the 'src' / 'dst' values to be given as IPv6 text: any value
        # whose length is not 16 is passed through inet_pton, and left as-is
        # when that conversion fails
        if 'val' in kwargs:
            values = kwargs['val']
            for field in ('src', 'dst'):
                if field in values and len(values[field]) != 16:
                    try:
                        values[field] = inet_pton(AF_INET6, values[field])
                    except:
                        pass
        Envelope.__init__(self, *args, **kwargs)
        # automate 'plen' and 'next' unless the caller supplied them
        if 'val' not in kwargs or 'plen' not in kwargs['val']:
            self[3].set_valauto(self._set_plen_val)
        if 'val' not in kwargs or 'next' not in kwargs['val']:
            self[4].set_valauto(self._set_next_val)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testIPv6toString(self):
        try:
            from socket import inet_pton, AF_INET6, has_ipv6
            if not has_ipv6:
                self.skipTest('IPv6 not available')
        except ImportError:
            self.skipTest('could not import needed symbols from socket')
        f = lambda a: inet_pton(AF_INET6, a)

        self.assertEqual('\x00' * 16, f('::'))
        self.assertEqual('\x00' * 16, f('0::0'))
        self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
        self.assertEqual(
            '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
        )
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testIPv6toString(self):
        try:
            from socket import inet_pton, AF_INET6, has_ipv6
            if not has_ipv6:
                self.skipTest('IPv6 not available')
        except ImportError:
            self.skipTest('could not import needed symbols from socket')
        f = lambda a: inet_pton(AF_INET6, a)

        self.assertEqual('\x00' * 16, f('::'))
        self.assertEqual('\x00' * 16, f('0::0'))
        self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
        self.assertEqual(
            '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
        )
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def get_if_raw_addr(ifname):
    """Returns the IPv4 address configured on 'ifname', packed with inet_pton.

    The address is taken from the first ifconfig output line that contains
    "netmask". Raises Scapy_Exception when ifconfig cannot be started or
    when no such line is found.
    """

    # Get ifconfig output
    # NOTE(review): the command line is built by string interpolation and run
    # through a shell by os.popen, so 'ifname' must come from a trusted source.
    try:
        fd = os.popen("%s %s" % (conf.prog.ifconfig, ifname))
    except OSError as msg:
        raise Scapy_Exception("Failed to execute ifconfig: (%s)" % msg)

    # Read everything, then always release the pipe.
    try:
        lines = fd.readlines()
    finally:
        fd.close()

    # Get IPv4 addresses
    addresses = [l for l in lines if l.find("netmask") >= 0]
    if not addresses:
        raise Scapy_Exception("No IPv4 address found on %s !" % ifname)

    # Pack the first address
    address = addresses[0].split(' ')[1]
    return socket.inet_pton(socket.AF_INET, address)
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """
    Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
    of the neighbor with specified IPv6 address addr. 'src' address is
    used as source of the message. Message is sent on iface. By default,
    timeout waiting for an answer is 1 second.

    If no answer is gathered, None is returned. Else, the answer is
    returned (ethernet frame).
    """

    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)
    p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
    # Honour the caller's timeout; it used to be hard-coded to 1 here, which
    # silently ignored the parameter.
    res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
               chainCC=chainCC)

    return res
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def i2m(self, pkt, x):
        """Return the packed IPv6 address ``x``, sized according to ``pkt.len``.

        ``x`` defaults to ``"::"`` (and the length to 1 when ``pkt.len`` is
        also unset). With no length the full 16 bytes are returned; lengths
        0 and 1 give an empty value, 2 and 3 give the first ``8*(len-1)``
        bytes, and larger lengths append ``8*(len-3)`` zero bytes.
        """
        l = pkt.len

        if x is None:
            x = "::"
            if l is None:
                l = 1
        x = inet_pton(socket.AF_INET6, x)

        if l is None:
            return x
        if l in [0, 1]:
            # Byte-string literals keep the return type the same as the packed
            # address, so this also works where str and bytes differ.
            return b""
        if l in [2, 3]:
            return x[:8*(l-1)]

        return x + b'\x00'*8*(l-3)
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def _resolve_one(self, ip):
        """
        Overloaded resolver: for a 6to4 or Teredo address the lookup is
        done on the embedded IPv4 address; any other address is looked up
        as given. The original ``ip`` is what gets returned with the result.
        """
        if in6_isaddr6to4(ip):
            # 6to4: bytes 2..5 of the packed address are the embedded IPv4
            packed = inet_pton(socket.AF_INET6, ip)
            lookup_addr = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # Teredo: use the third field of the extracted info
            lookup_addr = teredoAddrExtractInfo(ip)[2]
        else:
            lookup_addr = ip

        resolved = AS_resolver_riswhois._resolve_one(self, lookup_addr)
        _, asn, desc = resolved

        return ip, asn, desc
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
    """Pack a textual IP address using ``WSAStringToAddressA``.

    Returns 4 bytes for ``AF_INET`` and 16 bytes for ``AF_INET6``. Raises
    ``socket.error`` when the conversion call reports failure or when the
    family is neither of those two.
    """
    sock_addr = sockaddr()
    sock_addr.sa_family = address_family
    size = ctypes.c_int(ctypes.sizeof(sock_addr))

    status = WSAStringToAddressA(
        ip_string,
        address_family,
        None,
        ctypes.byref(sock_addr),
        ctypes.byref(size)
    )
    if status != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        packed = ctypes.string_at(sock_addr.ipv4_addr, 4)
    elif address_family == socket.AF_INET6:
        packed = ctypes.string_at(sock_addr.ipv6_addr, 16)
    else:
        raise socket.error('unknown address family')
    return packed
项目:punter    作者:nethunteros    | 项目源码 | 文件源码
def ip_to_integer(ip_address):
    """
    Converts an IP address expressed as a string to its
    representation as an integer value and returns a tuple
    (ip_integer, version), with version being the IP version
    (either 4 or 6).

    Both IPv4 addresses (e.g. "192.168.1.1") and IPv6 addresses
    (e.g. "2a02:a448:ddb0::") are accepted.

    Raises ValueError when the input parses as neither.
    """

    # try parsing the IP address first as IPv4, then as IPv6
    for family, version in ((socket.AF_INET, 4), (socket.AF_INET6, 6)):
        # Only the parse itself is expected to fail; catch just the errors it
        # can produce instead of hiding everything behind a bare except.
        try:
            packed = socket.inet_pton(family, ip_address)
        except (socket.error, TypeError, ValueError, AttributeError):
            continue

        return (int(binascii.hexlify(packed), 16), version)

    raise ValueError("invalid IP address")
项目:CVE-2017-7494    作者:joxeankoret    | 项目源码 | 文件源码
def is_fdqn(self):
        """Tell whether the target is a host name instead of an IP address.

        An IPv4 target is one ``socket.inet_aton`` accepts; an IPv6 target is
        recognised by the presence of ``':'`` (never part of a host name, and
        this does not depend on ``socket.inet_pton``). If it is neither, the
        target is reported as a fully qualified domain name.
        """
        try:
            socket.inet_aton(self.__target)
        except Exception:
            # Not IPv4.  Catch ``Exception`` instead of everything so that
            # KeyboardInterrupt / SystemExit still propagate.
            try:
                self.__target.index(':')
            except Exception:
                # No ':' found, so it is not IPv6 either: a host name.
                return True
        return False
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def compressSourceAddr(self, ipv6):
        """Compute, store and return the compressed form of ``ipv6.src``.

        Depending on ``self.sac`` and ``self.sam`` the packed source address
        is kept whole, reduced to its last 8 or last 2 bytes, or replaced by
        16 zero bytes. The kept bytes are left-padded with zeros to 16 bytes
        and the textual form is stored in ``self.sourceAddr``.
        """
        tmp_ip = socket.inet_pton(socket.AF_INET6, ipv6.src)

        if self.sac == 0:
            if self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]
            # sam 0x0 and 0x3 leave the full address untouched
        else:  # self.sac == 1
            if self.sam == 0x0:
                tmp_ip = b"\x00"*16
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]

        # Byte-string padding: the packed address is bytes, so padding with a
        # text literal fails wherever str and bytes are distinct types.
        padding = b"\x00"*(16-len(tmp_ip))
        self.sourceAddr = socket.inet_ntop(socket.AF_INET6, padding + tmp_ip)
        return self.sourceAddr
项目:powerstager    作者:z0noxz    | 项目源码 | 文件源码
def is_ipv4_address(address):
        """Return True when ``address`` is a dotted-quad IPv4 address.

        ``socket.inet_pton`` is used when available. Without it the check
        falls back to ``socket.inet_aton`` and then requires exactly four
        numeric parts, each in the range 0-255.
        """
        try:
            socket.inet_pton(socket.AF_INET, address)
        except AttributeError:
            try:
                socket.inet_aton(address)
            except socket.error:
                return False
            # inet_aton also accepts short forms such as "10.1", so the parts
            # are checked explicitly.  (The old code compared the list of
            # valid parts to the integer 4, which is never equal.)
            octets = address.split(".")
            return len(octets) == 4 and all(
                part.isdigit() and int(part) <= 255 for part in octets
            )
        except socket.error:
            return False

        return True


# Class for printing and styling text to terminal
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        """
        Write the host and port of ``addr`` to ``file`` in SOCKS5 form and
        return the (possibly resolved or normalised) ``(host, port)`` tuple.

        An IP literal is written as type 0x01 (IPv4) or 0x04 (IPv6) plus the
        packed address. A name is written as type 0x03 with a one-byte
        length when remote DNS is enabled in ``self.proxy``; otherwise it is
        resolved locally and the first result is written as an IP address.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy
        family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}

        # If the given destination address is an IP address, we'll
        # use the IP address request even if remote resolving was specified.
        # Detect whether the address is IPv4/6 directly.
        for family in (socket.AF_INET, socket.AF_INET6):
            try:
                addr_bytes = socket.inet_pton(family, host)
                file.write(family_to_byte[family] + addr_bytes)
                host = socket.inet_ntop(family, addr_bytes)
                file.write(struct.pack(">H", port))
                return host, port
            except socket.error:
                continue

        # Well it's not an IP number, so it's probably a DNS name.
        if rdns:
            # Resolve remotely
            host_bytes = host.encode("idna")
            # The length prefix must be exactly one raw byte.  Encoding
            # chr(n) as text produces two bytes once n reaches 128, which
            # corrupts the request for long names.
            file.write(b"\x03" + struct.pack("B", len(host_bytes)) + host_bytes)
        else:
            # Resolve locally
            addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
            # We can't really work out what IP is reachable, so just pick the
            # first.
            target_addr = addresses[0]
            family = target_addr[0]
            host = target_addr[4][0]

            addr_bytes = socket.inet_pton(family, host)
            file.write(family_to_byte[family] + addr_bytes)
            host = socket.inet_ntop(family, addr_bytes)
        file.write(struct.pack(">H", port))
        return host, port
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __host_per_rfc_2732(host):
    "Return host wrapped in [] if it is an IPv6 address (RFC 2732 URL form), else unchanged"

    try:
        socket.inet_pton(socket.AF_INET6, host)
    except socket.error:
        # Not an IPv6 address: host names and IPv4 pass through untouched
        return host
    else:
        return "[%s]" % (host)
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def inet_pton(family, addr):
    """Hand-written replacement for ``socket.inet_pton``.

    ``addr`` is first normalised with ``to_str``. IPv4 is delegated to
    ``socket.inet_aton``; IPv6 text is parsed here group by group.
    Raises ``RuntimeError`` for any other family.
    """
    addr = to_str(addr)
    if family == socket.AF_INET:
        return socket.inet_aton(addr)
    elif family == socket.AF_INET6:
        if '.' in addr:  # a v4 addr
            # Embedded dotted quad after the last ':': pack it, render the
            # four bytes as two hex groups, and re-parse the rewritten text.
            v4addr = addr[addr.rindex(':') + 1:]
            v4addr = socket.inet_aton(v4addr)
            # NOTE(review): ord(x) assumes iterating the packed value yields
            # one-character strings - confirm the interpreter this targets.
            v4addr = ['%02X' % ord(x) for x in v4addr]
            v4addr.insert(2, ':')
            newaddr = addr[:addr.rindex(':') + 1] + ''.join(v4addr)
            return inet_pton(family, newaddr)
        dbyts = [0] * 8  # 8 groups
        grps = addr.split(':')
        # Fill groups from the left until the first empty group (the "::"
        # gap), then fill from the right until an empty group is met again.
        for i, v in enumerate(grps):
            if v:
                dbyts[i] = int(v, 16)
            else:
                for j, w in enumerate(grps[::-1]):
                    if w:
                        dbyts[7 - j] = int(w, 16)
                    else:
                        break
                break
        # Each 16-bit group becomes two characters, high byte first.
        # NOTE(review): joining chr() results with b'' presumably relies on a
        # byte-oriented chr - verify against the module's compat helpers.
        return b''.join((chr(i // 256) + chr(i % 256)) for i in dbyts)
    else:
        raise RuntimeError("What family?")
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def is_ip(address):
    """Return the address family that parses ``address``, or False if none does.

    IPv4 is tried before IPv6. A value that is not ``str`` is decoded as
    UTF-8 before parsing.
    """
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            if type(address) != str:
                address = address.decode('utf8')
            inet_pton(family, address)
        except (TypeError, ValueError, OSError, IOError):
            # Not valid for this family; try the next one.
            continue
        else:
            return family
    return False
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def patch_socket():
    """Install this module's inet_pton / inet_ntop on ``socket`` where missing.

    An attribute that already exists on the ``socket`` module is left alone.
    """
    if not hasattr(socket, 'inet_pton'):
        setattr(socket, 'inet_pton', inet_pton)

    if not hasattr(socket, 'inet_ntop'):
        setattr(socket, 'inet_ntop', inet_ntop)
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def pack_addr(address):
    """Pack an address as a type byte followed by its payload.

    IPv4 text becomes ``b'\\x01'`` + 4 packed bytes and IPv6 text becomes
    ``b'\\x04'`` + 16 packed bytes. Anything else is treated as a host name:
    ``b'\\x03'``, a length character, then the name cut to 255 items.
    """
    address_str = to_str(address)
    type_bytes = ((socket.AF_INET, b'\x01'), (socket.AF_INET6, b'\x04'))
    for family, type_byte in type_bytes:
        try:
            packed = socket.inet_pton(family, address_str)
        except (TypeError, ValueError, OSError, IOError):
            continue
        return type_byte + packed
    if len(address) > 255:
        address = address[:255]  # TODO
    return b'\x03' + chr(len(address)) + address
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def add_network(self, addr):
        """Parse ``addr`` (``ip`` or ``ip/prefix``) and store it as a network.

        The address is converted to an integer and shifted right so that
        only the network part remains; ``(ip, prefix_size)`` is appended to
        the IPv4 or IPv6 list, where ``prefix_size`` is the number of bits
        shifted away. Without an explicit prefix the trailing zero bits of
        the address are dropped. An empty string is ignored.

        Raises ``Exception`` for text that is not a valid address or prefix.
        """
        # Value comparisons (==, !=) instead of identity tests against
        # literals, which only worked through interpreter caching.
        if addr == "":
            return
        block = addr.split('/')
        addr_family = is_ip(block[0])
        addr_len = IPNetwork.ADDRLENGTH[addr_family]
        if addr_family == socket.AF_INET:
            ip, = struct.unpack("!I", socket.inet_aton(block[0]))
        elif addr_family == socket.AF_INET6:
            hi, lo = struct.unpack("!QQ", inet_pton(addr_family, block[0]))
            ip = (hi << 64) | lo
        else:
            raise Exception("Not a valid CIDR notation: %s" % addr)
        if len(block) == 1:
            prefix_size = 0
            while (ip & 1) == 0 and ip != 0:
                ip >>= 1
                prefix_size += 1
            # logging.warning is the supported spelling of the deprecated
            # logging.warn alias.
            logging.warning("You did't specify CIDR routing prefix size for %s, "
                            "implicit treated as %s/%d" % (addr, addr, addr_len))
        elif block[1].isdigit() and int(block[1]) <= addr_len:
            prefix_size = addr_len - int(block[1])
            ip >>= prefix_size
        else:
            raise Exception("Not a valid CIDR notation: %s" % addr)
        if addr_family == socket.AF_INET:
            self._network_list_v4.append((ip, prefix_size))
        else:
            self._network_list_v6.append((ip, prefix_size))
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def test_inet_conv():
    """Round-trip one IPv4 and one IPv6 address through inet_pton / inet_ntop."""
    samples = (
        (socket.AF_INET, b'8.8.4.4'),
        (socket.AF_INET6, b'2404:6800:4005:805::1011'),
    )
    for family, text in samples:
        packed = inet_pton(family, text)
        assert inet_ntop(family, packed) == text
项目:libbgp    作者:smartbgp    | 项目源码 | 文件源码
def pack(data):
        """Pack an IP address string; four dot-separated parts mean IPv4, otherwise IPv6."""
        if len(data.split('.')) == 4:
            family = socket.AF_INET
        else:
            family = socket.AF_INET6
        return socket.inet_pton(family, data)
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def addr_to_dbus(addr, family):
        """Convert an address string to a D-Bus value.

        IPv4 becomes a ``dbus.UInt32`` read from the packed bytes in native
        byte order; any other family becomes a ``dbus.ByteArray`` of the
        packed bytes.
        """
        packed = socket.inet_pton(family, addr)
        if family != socket.AF_INET:
            return dbus.ByteArray(packed)
        (as_uint,) = struct.unpack('I', packed)
        return dbus.UInt32(as_uint)
项目:twampy    作者:nokia    | 项目源码 | 文件源码
def reqSession(self, sender="", s_port=20001, receiver="", r_port=20002, startTime=0, timeOut=3, dscp=0, padding=0):
        """Send a Request-Session message and report whether it was accepted.

        The request is packed with ``struct``, sent with ``self.send`` and the
        reply is read with ``self.receive``. Returns True when the first byte
        of the reply is 0, otherwise logs the code and returns False.
        """
        # The DSCP value is placed in the top byte of the 32-bit Type-P field.
        typeP = dscp << 24

        # A non-zero start time is taken relative to now(), plus TIMEOFFSET.
        if startTime != 0:
            startTime += now() + TIMEOFFSET

        # The second packed byte is 4 or 6, presumably the IP version.
        # An empty sender or "::" sends all-zero address fields; otherwise
        # both sender and receiver are packed into 16-byte fields.
        if sender == "":
            request = struct.pack('!4B L L H H 13L 4ILQ4L', 5, 4, 0, 0, 0, 0, s_port, r_port, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
        elif sender == "::":
            request = struct.pack('!4B L L H H 13L 4ILQ4L', 5, 6, 0, 0, 0, 0, s_port, r_port, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
        elif ':' in sender:
            s = socket.inet_pton(socket.AF_INET6, sender)
            r = socket.inet_pton(socket.AF_INET6, receiver)
            request = struct.pack('!4B L L H H 16s 16s 4L L 4ILQ4L', 5, 6, 0, 0, 0, 0, s_port, r_port, s, r, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
        else:
            # IPv4 addresses are 4 bytes; '16s' pads them with zero bytes.
            s = socket.inet_pton(socket.AF_INET, sender)
            r = socket.inet_pton(socket.AF_INET, receiver)
            request = struct.pack('!4B L L H H 16s 16s 4L L 4ILQ4L', 5, 4, 0, 0, 0, 0, s_port, r_port, s, r, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)

        log.info("CTRL.TX <<Request Session>>")
        self.send(request)
        log.info("CTRL.RX <<Session Accept>>")
        data = self.receive()

        # NOTE(review): ord(data[0]) assumes indexing the reply yields a
        # one-character string - confirm what self.receive() returns.
        rval = ord(data[0])
        if rval != 0:
            log.critical("ERROR CODE %d in <<Session Accept>>", rval)
            return False
        return True
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def _parse_ip(string, families=(socket.AF_INET, socket.AF_INET6)):
    """Return the normalised text form of ``string`` for the first family that
    parses it, or None when no family in ``families`` does."""
    for family in families:
        try:
            packed = socket.inet_pton(family, string)
            return socket.inet_ntop(family, packed)
        except (ValueError, socket.error):
            continue
    return None
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def _nibbles(ipv6, _hex="0123456789abcdef"):
    """Return the 32 hex digits of an IPv6 address as a list, most significant first.

    ``ipv6`` is the textual address; ``_hex`` is the digit lookup table.
    """
    result = []
    # bytearray yields integers on every Python version, whereas calling
    # ord() on each element fails when iterating bytes already gives ints.
    for num in bytearray(socket.inet_pton(socket.AF_INET6, ipv6)):
        result.append(_hex[num >> 4])
        result.append(_hex[num & 0xf])
    return result
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def parse_ipv4(ip):
    """Return the IPv4 address ``ip`` (surrounding whitespace ignored) as an
    unsigned integer, or None when it is not a valid IPv4 address."""
    try:
        packed = inet_pton(AF_INET, ip.strip())
    except (error, UnicodeEncodeError):
        return None

    return struct.unpack("!I", packed)[0]
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def parse_ipv6(ip):
    """Return the IPv6 address ``ip`` (surrounding whitespace ignored) as a
    128-bit integer, or None when it is not a valid IPv6 address."""
    try:
        packed = inet_pton(AF_INET6, ip.strip())
    except (error, UnicodeEncodeError):
        return None

    # Two big-endian 64-bit halves combined into one integer.
    upper, lower = struct.unpack("!QQ", packed)
    return lower | (upper << 64)