Python socket 模块,AF_INET6 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.AF_INET6

项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
        """Record the node pattern and resource attributes.

        node: host name / IP address, or a pattern containing '*'. When it
            has no '*', it is resolved with ``socket.getaddrinfo`` and the
            first returned address is used; for IPv6 results, leading zeros
            are stripped and runs of ':' are collapsed to '::'.
        platform: stored lower-cased in ``self.platform``.
        cpus, memory, disk: stored unchanged.

        ``self.ip_rex`` is the resulting address with '.' escaped and '*'
        turned into '.*', or '' (with a warning logged) when the node could
        not be resolved or was empty.
        """
        # Keep the caller's value: 'node' is overwritten below, and the
        # warning should show what was actually asked for.
        requested = node
        if node.find('*') < 0:
            try:
                info = socket.getaddrinfo(node, None)[0]
                ip_addr = info[4][0]
                if info[0] == socket.AF_INET6:
                    ip_addr = re.sub(r'^0*', '', ip_addr)
                    ip_addr = re.sub(r':0*', ':', ip_addr)
                    ip_addr = re.sub(r'::+', '::', ip_addr)
                node = ip_addr
            except Exception:
                # Best effort: any resolution failure marks the node invalid.
                node = ''

        if node:
            self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
        else:
            logger.warning('node "%s" is invalid', requested)
            self.ip_rex = ''
        self.platform = platform.lower()
        self.cpus = cpus
        self.memory = memory
        self.disk = disk
项目:spoon    作者:SpamExperts    | 项目源码 | 文件源码
def __init__(self, address):
        """Prepare the server for ``address`` without binding it yet.

        The address family is taken from the host part of ``address``: a
        colon selects IPv6, anything else IPv4. The parent initializer is
        called with ``bind_and_activate=False``; ``load_config()`` and
        ``_setup_socket()`` run afterwards. Handlers for the reload and
        shutdown signals are installed last, for each signal that is set.
        """
        self.log = logging.getLogger(self.server_logger)
        self.socket = None
        host_is_ipv6 = ":" in address[0]
        self.address_family = (
            socket.AF_INET6 if host_is_ipv6 else socket.AF_INET)
        self.log.debug("Listening on %s", address)

        super(_SpoonMixIn, self).__init__(
            address, self.handler_klass, bind_and_activate=False)
        self.load_config()
        self._setup_socket()

        # Signal handlers go in only after the socket has been set up.
        reload_signal = self.signal_reload
        if reload_signal is not None:
            signal.signal(reload_signal, self.reload_handler)
        shutdown_signal = self.signal_shutdown
        if shutdown_signal is not None:
            signal.signal(shutdown_signal, self.shutdown_handler)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_has_bwctl(host, timeout=5, bind=None):
    """
    Determine if a host is running the BWCTL daemon

    A TCP connection to port 4823 on ``host`` (``None`` means localhost)
    is attempted over IPv4 and then over IPv6.  Returns True as soon as
    one attempt connects and False if neither does.

    ``timeout`` is the per-attempt socket timeout.  ``bind`` is an optional
    local address to bind to; when None, the PSCHEDULER_LEAD_BIND_HACK
    environment variable is consulted instead.
    """

    # Null implies localhost
    if host is None:
        host = "localhost"

    # HACK: BWTCLBC
    # If the environment says to bind to a certain address, do it.
    if bind is None:
        bind = os.environ.get('PSCHEDULER_LEAD_BIND_HACK', None)

    for family in [socket.AF_INET, socket.AF_INET6]:
        try:
            with closing(socket.socket(family, socket.SOCK_STREAM)) as sock:
                if bind is not None:
                    sock.bind((bind, 0))
                sock.settimeout(timeout)
                # connect_ex() reports a failed connection as a non-zero
                # code rather than raising, so a failure must fall through
                # to the next family instead of ending the search.
                if sock.connect_ex((host, 4823)) == 0:
                    return True
        except socket.error:
            pass

    return False
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def source_interface(address, port=80, ip_version=None):
    """Figure out what local interface is being used to
    reach an address.

    A UDP socket (IPv6 when ``ip_version`` is 6, IPv4 otherwise) is
    connected to ``(address, port)`` and its local address is looked up
    with ``address_interface``.

    Returns a tuple of (address, interface_name), or (None, None) when
    the connect fails or no interface name is found.
    """

    family = socket.AF_INET6 if ip_version == 6 else socket.AF_INET
    sock = socket.socket(family, socket.SOCK_DGRAM)
    # The finally clause closes the socket on every path, including the
    # early return taken when connect() fails.
    try:
        try:
            sock.connect((address, port))
        except socket.error:
            return (None, None)
        interface_address = sock.getsockname()[0]
    finally:
        sock.close()

    interface_name = address_interface(interface_address, ip_version=ip_version)

    if interface_name:
        return (interface_address, interface_name)

    return (None, None)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def inet_pton(address_family, ip_string):
        """Convert a textual IP address to its packed binary form.

        IPv4 is handled directly by ``socket.inet_aton``.  Any other family
        is passed to ``WSAStringToAddressA`` (presumably the Winsock call;
        it is defined outside this block), and the 16 bytes of the IPv6
        address are returned for ``AF_INET6``.

        Raises ``socket.error`` when the conversion call reports failure or
        when the family is neither ``AF_INET`` nor ``AF_INET6``.
        """
        if address_family == socket.AF_INET:
            return socket.inet_aton(ip_string)

        sa = sockaddr()
        sa.sa_family = address_family
        sa_len = ctypes.c_int(ctypes.sizeof(sa))

        status = WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(sa),
            ctypes.byref(sa_len),
        )
        if status != 0:
            raise socket.error(ctypes.FormatError())

        if address_family != socket.AF_INET6:
            raise socket.error('unknown address family')
        return ctypes.string_at(sa.ipv6_addr, 16)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Pick the socket address family to use for *host*.

    A host containing a colon is treated as an IPv6 literal and yields
    ``socket.AF_INET6`` when this Python build exposes that constant;
    everything else yields ``socket.AF_INET``.  ``port`` is accepted but
    not consulted.
    """
    # A getaddrinfo()-based lookup used to live here; it was disabled due
    # to problems with IPv6 implementations on various operating systems.
    looks_like_ipv6 = ':' in host
    if looks_like_ipv6 and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    else:
        family = socket.AF_INET
    return family
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def _socket_bind_addr(self, sock, af):
        """Bind ``sock`` to the configured local address for family ``af``.

        The address is ``self._bind`` for IPv4 or ``self._bindv6`` for IPv6
        when set, otherwise the host of ``self._accept_address``.  A
        "::ffff:" prefix is stripped, and addresses listed in
        ``self._ignore_bind_list`` are skipped.  The bind is attempted only
        when the address resolves to family ``af``; a failed bind is logged
        and otherwise ignored.
        """
        if self._bind and af == socket.AF_INET:
            bind_addr = self._bind
        elif self._bindv6 and af == socket.AF_INET6:
            bind_addr = self._bindv6
        else:
            bind_addr = self._accept_address[0]

        bind_addr = bind_addr.replace("::ffff:", "")
        if bind_addr in self._ignore_bind_list:
            bind_addr = None
        if bind_addr:
            local_addrs = socket.getaddrinfo(bind_addr, 0, 0, socket.SOCK_STREAM, socket.SOL_TCP)
            if local_addrs[0][0] == af:
                logging.debug("bind %s" % (bind_addr,))
                try:
                    sock.bind((bind_addr, 0))
                except Exception:
                    # logging.warn is a deprecated alias (removed in 3.13).
                    logging.warning("bind %s fail" % (bind_addr,))
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def _socket_bind_addr(self, sock, af):
        """Bind the UDP socket ``sock`` to the configured local address.

        The address is ``self._bind`` for IPv4 or ``self._bindv6`` for
        IPv6; when neither applies nothing is bound.  A "::ffff:" prefix is
        stripped, and addresses listed in ``self._ignore_bind_list`` are
        skipped.  The bind is attempted only when the address resolves to
        family ``af``; a failed bind is logged and otherwise ignored.
        """
        bind_addr = ''
        if self._bind and af == socket.AF_INET:
            bind_addr = self._bind
        elif self._bindv6 and af == socket.AF_INET6:
            bind_addr = self._bindv6

        bind_addr = bind_addr.replace("::ffff:", "")
        if bind_addr in self._ignore_bind_list:
            bind_addr = None
        if bind_addr:
            local_addrs = socket.getaddrinfo(bind_addr, 0, 0, socket.SOCK_DGRAM, socket.SOL_UDP)
            if local_addrs[0][0] == af:
                logging.debug("bind %s" % (bind_addr,))
                try:
                    sock.bind((bind_addr, 0))
                except Exception:
                    # logging.warn is a deprecated alias (removed in 3.13).
                    logging.warning("bind %s fail" % (bind_addr,))
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Pick the socket address family to use for *host*.

    A host containing a colon is treated as an IPv6 literal and yields
    ``socket.AF_INET6`` when this Python build exposes that constant;
    everything else yields ``socket.AF_INET``.  ``port`` is accepted but
    not consulted.
    """
    # A getaddrinfo()-based lookup used to live here; it was disabled due
    # to problems with IPv6 implementations on various operating systems.
    looks_like_ipv6 = ':' in host
    if looks_like_ipv6 and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    else:
        family = socket.AF_INET
    return family
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_ipv6(self):
        """Fetch from a server bound to ``::1`` with and without IPv6 allowed.

        The test returns early when binding fails with EAI_ADDRFAMILY.
        Otherwise a fetch with ``allow_ipv6=False`` must produce code 599
        and a default fetch must return the "Hello world!" body.
        """
        try:
            listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
            [sock] = listeners
            port = sock.getsockname()[1]
            self.http_server.add_socket(sock)
        except socket.gaierror as exc:
            if exc.args[0] != socket.EAI_ADDRFAMILY:
                raise
            # python supports ipv6, but it's not configured on the network
            # interface, so skip this test.
            return
        url = '%s://[::1]:%d/hello' % (self.get_protocol(), port)

        # ipv6 is currently enabled by default but can be disabled
        self.http_client.fetch(url, self.stop, allow_ipv6=False)
        refused = self.wait()
        self.assertEqual(refused.code, 599)

        self.http_client.fetch(url, self.stop)
        served = self.wait()
        self.assertEqual(served.body, b"Hello world!")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, stream, address, protocol):
        """Capture the connection details of ``stream``.

        Stores ``address``, the socket family (None when the stream has no
        socket), the remote IP, and the protocol.  The protocol is the
        given ``protocol`` if truthy, else "https" for an SSLIOStream and
        "http" otherwise.  The initial remote IP and protocol are also kept
        in ``_orig_remote_ip`` / ``_orig_protocol``.
        """
        self.address = address
        # Remember the family now: it is still needed to interpret
        # self.address once the stream is closed and its socket is None.
        sock = stream.socket
        self.address_family = sock.family if sock is not None else None

        # Only IP sockets carry a usable host in address[0]; for Unix (or
        # other) sockets the remote address is faked.
        is_ip_family = self.address_family in (socket.AF_INET, socket.AF_INET6)
        if is_ip_family and address is not None:
            self.remote_ip = address[0]
        else:
            self.remote_ip = '0.0.0.0'

        if not protocol:
            if isinstance(stream, iostream.SSLIOStream):
                protocol = "https"
            else:
                protocol = "http"
        self.protocol = protocol

        self._orig_remote_ip = self.remote_ip
        self._orig_protocol = self.protocol
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_ipv6(self):
        """Fetch from a server bound to ``::1`` with and without IPv6 allowed.

        The test returns early when binding fails with EAI_ADDRFAMILY.
        Otherwise a fetch with ``allow_ipv6=False`` must produce code 599
        and a default fetch must return the "Hello world!" body.
        """
        try:
            listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
            [sock] = listeners
            port = sock.getsockname()[1]
            self.http_server.add_socket(sock)
        except socket.gaierror as exc:
            if exc.args[0] != socket.EAI_ADDRFAMILY:
                raise
            # python supports ipv6, but it's not configured on the network
            # interface, so skip this test.
            return
        url = '%s://[::1]:%d/hello' % (self.get_protocol(), port)

        # ipv6 is currently enabled by default but can be disabled
        self.http_client.fetch(url, self.stop, allow_ipv6=False)
        refused = self.wait()
        self.assertEqual(refused.code, 599)

        self.http_client.fetch(url, self.stop)
        served = self.wait()
        self.assertEqual(served.body, b"Hello world!")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, stream, address, protocol):
        """Capture the connection details of ``stream``.

        Stores ``address``, the socket family (None when the stream has no
        socket), the remote IP, and the protocol.  The protocol is the
        given ``protocol`` if truthy, else "https" for an SSLIOStream and
        "http" otherwise.  The initial remote IP and protocol are also kept
        in ``_orig_remote_ip`` / ``_orig_protocol``.
        """
        self.address = address
        # Remember the family now: it is still needed to interpret
        # self.address once the stream is closed and its socket is None.
        sock = stream.socket
        self.address_family = sock.family if sock is not None else None

        # Only IP sockets carry a usable host in address[0]; for Unix (or
        # other) sockets the remote address is faked.
        is_ip_family = self.address_family in (socket.AF_INET, socket.AF_INET6)
        if is_ip_family and address is not None:
            self.remote_ip = address[0]
        else:
            self.remote_ip = '0.0.0.0'

        if not protocol:
            if isinstance(stream, iostream.SSLIOStream):
                protocol = "https"
            else:
                protocol = "http"
        self.protocol = protocol

        self._orig_remote_ip = self.remote_ip
        self._orig_protocol = self.protocol
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_ipv6(self):
        """Fetch from a server bound to ``::1`` with and without IPv6 allowed.

        The test returns early when binding fails with EAI_ADDRFAMILY.
        Otherwise a fetch with ``allow_ipv6=False`` must produce code 599
        and a default fetch must return the "Hello world!" body.
        """
        try:
            listeners = bind_sockets(None, '::1', family=socket.AF_INET6)
            [sock] = listeners
            port = sock.getsockname()[1]
            self.http_server.add_socket(sock)
        except socket.gaierror as exc:
            if exc.args[0] != socket.EAI_ADDRFAMILY:
                raise
            # python supports ipv6, but it's not configured on the network
            # interface, so skip this test.
            return
        url = '%s://[::1]:%d/hello' % (self.get_protocol(), port)

        # ipv6 is currently enabled by default but can be disabled
        self.http_client.fetch(url, self.stop, allow_ipv6=False)
        refused = self.wait()
        self.assertEqual(refused.code, 599)

        self.http_client.fetch(url, self.stop)
        served = self.wait()
        self.assertEqual(served.body, b"Hello world!")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, stream, address, protocol):
        """Capture the connection details of ``stream``.

        Stores ``address``, the socket family (None when the stream has no
        socket), the remote IP, and the protocol.  The protocol is the
        given ``protocol`` if truthy, else "https" for an SSLIOStream and
        "http" otherwise.  The initial remote IP and protocol are also kept
        in ``_orig_remote_ip`` / ``_orig_protocol``.
        """
        self.address = address
        # Remember the family now: it is still needed to interpret
        # self.address once the stream is closed and its socket is None.
        sock = stream.socket
        self.address_family = sock.family if sock is not None else None

        # Only IP sockets carry a usable host in address[0]; for Unix (or
        # other) sockets the remote address is faked.
        is_ip_family = self.address_family in (socket.AF_INET, socket.AF_INET6)
        if is_ip_family and address is not None:
            self.remote_ip = address[0]
        else:
            self.remote_ip = '0.0.0.0'

        if not protocol:
            if isinstance(stream, iostream.SSLIOStream):
                protocol = "https"
            else:
                protocol = "http"
        self.protocol = protocol

        self._orig_remote_ip = self.remote_ip
        self._orig_protocol = self.protocol
项目:my-weather-indicator    作者:atareao    | 项目源码 | 文件源码
def postprocess(self, name, val):
        """Convert raw fields of a 'GetSettings' result via ``fixups``.

        For any other ``name`` the value is returned untouched.  For
        'GetSettings', ``val`` is modified in place: the wireless SSID, the
        MAC-style keys of every section, and the addresses/routes/dns lists
        of the 'ipv4' and 'ipv6' sections are passed through the matching
        ``fixups`` helpers.  Returns ``val``.
        """
        if name != 'GetSettings':
            return val

        wireless = val.get('802-11-wireless', {})
        if 'ssid' in wireless:
            wireless['ssid'] = fixups.ssid_to_python(wireless['ssid'])

        mac_keys = ('mac-address', 'cloned-mac-address', 'bssid')
        for key in val:
            section = val[key]
            for mac_key in mac_keys:
                if mac_key in section:
                    section[mac_key] = fixups.mac_to_python(section[mac_key])

        if 'ipv4' in val:
            ip4 = val['ipv4']
            ip4['addresses'] = [fixups.addrconf_to_python(item, socket.AF_INET)
                                for item in ip4['addresses']]
            ip4['routes'] = [fixups.route_to_python(item, socket.AF_INET)
                             for item in ip4['routes']]
            ip4['dns'] = [fixups.addr_to_python(item, socket.AF_INET)
                          for item in ip4['dns']]
        if 'ipv6' in val:
            ip6 = val['ipv6']
            ip6['addresses'] = [fixups.addrconf_to_python(item, socket.AF_INET6)
                                for item in ip6['addresses']]
            ip6['routes'] = [fixups.route_to_python(item, socket.AF_INET6)
                             for item in ip6['routes']]
            ip6['dns'] = [fixups.addr_to_python(item, socket.AF_INET6)
                          for item in ip6['dns']]
        return val
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def request(self, check_config):
        """Resolve the configured domain and report the outcome.

        ``check_config['domain']`` is resolved with ``socket.getaddrinfo``,
        restricted to IPv6 or IPv4 when ``ip_version`` is 6 or 4 and
        unrestricted otherwise.  Returns ``(Plugin.STATUS_OK, message)``
        listing the resolved addresses, or ``(Plugin.STATUS_CRIT, error)``
        when the lookup raises.
        """
        hostname = check_config.get('domain')
        version = check_config.get('ip_version')
        if version == 6:
            family = socket.AF_INET6
        elif version == 4:
            family = socket.AF_INET
        else:
            family = 0

        try:
            answers = socket.getaddrinfo(hostname, 0, family)
        except Exception as exc:
            return Plugin.STATUS_CRIT, '{}'.format(exc)

        resolved = ', '.join([entry[4][0] for entry in answers])
        message = 'Domain was resolved with {}'.format(resolved)
        return (Plugin.STATUS_OK, message)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Pick the socket address family to use for *host*.

    A host containing a colon is treated as an IPv6 literal and yields
    ``socket.AF_INET6`` when this Python build exposes that constant;
    everything else yields ``socket.AF_INET``.  ``port`` is accepted but
    not consulted.
    """
    # A getaddrinfo()-based lookup used to live here; it was disabled due
    # to problems with IPv6 implementations on various operating systems.
    looks_like_ipv6 = ':' in host
    if looks_like_ipv6 and hasattr(socket, 'AF_INET6'):
        family = socket.AF_INET6
    else:
        family = socket.AF_INET
    return family
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """
    Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
    of the neighbor with specified IPv6 address addr. 'src' address is
    used as source of the message. Message is sent on iface. By default,
    timeout waiting for an answer is 1 second.

    If no answer is gathered, None is returned. Else, the answer is
    returned (ethernet frame).
    """

    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)
    p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
    # Pass the caller's timeout through; it used to be hard-coded to 1,
    # which silently ignored the 'timeout' argument.
    res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
               chainCC=chainCC)

    return res
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def getfield(self, pkt, s):
        """Split a run of 16-byte IPv6 addresses off the front of ``s``.

        When ``self.length_from`` is set, only that many leading bytes are
        parsed; otherwise, when ``self.count_from`` is set, at most that
        many addresses are parsed.  Returns ``(remaining, addresses)``
        where ``addresses`` holds the textual form of each parsed chunk.
        """
        byte_limit = count_limit = None
        if self.length_from is not None:
            byte_limit = self.length_from(pkt)
        elif self.count_from is not None:
            count_limit = self.count_from(pkt)

        tail = ""
        pending = s
        if byte_limit is not None:
            pending, tail = s[:byte_limit], s[byte_limit:]

        addresses = []
        while pending and (count_limit is None or count_limit > 0):
            if count_limit is not None:
                count_limit -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + tail, addresses
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def i2m(self, pkt, x):
        """Encode address ``x`` to a byte string sized according to ``pkt.len``.

        ``None`` is treated as "::" (and a missing ``pkt.len`` as 1 in that
        case).  With no ``pkt.len`` the full packed address is returned;
        0 or 1 gives an empty string; 2 or 3 gives the first
        ``8 * (len - 1)`` bytes; larger values append ``8 * (len - 3)``
        zero bytes to the packed address.
        """
        units = pkt.len

        if x is None:
            x = "::"
            if units is None:
                units = 1
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            return packed
        if units in (0, 1):
            return ""
        if units in (2, 3):
            return packed[:8 * (units - 1)]

        padding = '\x00' * 8 * (units - 3)
        return packed + padding
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def h2i(self, pkt, x):
        """Convert ``x`` into an internal ``(kind, value)`` pair.

        A tuple whose first item is an int is assumed to be in internal
        form already and is returned as is.  Otherwise ``x`` is tried as an
        IPv6 address (kind 0), then as an IPv4 address (kind 2), and
        finally treated as a DNS name (kind 1, converted with
        ``names2dnsrepr``; ``None`` becomes the empty name).
        """
        # isinstance() is required here: 'x is tuple' compared x with the
        # tuple type itself and was never true for an actual tuple.
        if isinstance(x, tuple) and len(x) > 0 and type(x[0]) is int:
            return x

        try:  # Try IPv6
            inet_pton(socket.AF_INET6, x)
            return (0, x)
        except Exception:
            pass

        try:  # Try IPv4
            inet_pton(socket.AF_INET, x)
            return (2, x)
        except Exception:
            pass

        # Try DNS
        if x is None:
            x = ""
        return (1, names2dnsrepr(x))
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def _resolve_one(self, ip):
        """
        Overloaded version that performs the Whois resolution on the
        embedded IPv4 address when ``ip`` is a 6to4 or Teredo address.
        Otherwise, the native IPv6 address is passed.

        Returns ``(ip, asn, desc)`` with the original ``ip``.
        """
        lookup_addr = ip
        if in6_isaddr6to4(ip):
            # for 6to4, use the embedded address (bytes 2..5)
            packed = inet_pton(socket.AF_INET6, ip)
            lookup_addr = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # for Teredo, use the mapped address
            lookup_addr = teredoAddrExtractInfo(ip)[2]

        resolved = AS_resolver_riswhois._resolve_one(self, lookup_addr)
        _, asn, desc = resolved

        return ip, asn, desc
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def ifchange(self, iff, addr):
        """Rewrite the routes of interface ``iff`` for a new address.

        ``addr`` is "address" or "address/plen" (plen defaults to 128).
        Every route on ``iff`` gets the new prefix length and source
        address; routes whose gateway is '::' also get the new network.
        The route cache is invalidated and the neighbor cache flushed.
        """
        pieces = addr.split("/") + ["128"]
        the_addr = pieces[0]
        the_plen = int(pieces[1])

        packed = inet_pton(socket.AF_INET6, the_addr)
        mask = in6_cidr2mask(the_plen)
        the_net = inet_ntop(socket.AF_INET6, in6_and(mask, packed))

        for idx, route in enumerate(self.routes):
            net, _old_plen, gw, iface, _old_src = route
            if iface != iff:
                continue
            new_net = the_net if gw == '::' else net
            self.routes[idx] = (new_net, the_plen, gw, iface, the_addr)
        self.invalidate_cache()
        ip6_neigh_cache.flush()
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def ifadd(self, iff, addr):
        """
        Add an interface 'iff' with provided address into routing table.

        Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into
            Scapy6 internal routing table:

            Destination           Next Hop  iface  Def src @
            2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1

            prefix length value can be omitted. In that case, a value of 128
            will be used.
        """
        pieces = addr.split("/") + ["128"]
        address = in6_ptop(pieces[0])
        plen = int(pieces[1])

        packed = inet_pton(socket.AF_INET6, address)
        mask = in6_cidr2mask(plen)
        prefix = inet_ntop(socket.AF_INET6, in6_and(mask, packed))

        self.invalidate_cache()
        new_route = (prefix, plen, '::', iff, [address])
        self.routes.append(new_route)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def inet_pton(family, text):
    """Turn a textual network address into its binary form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param text: the textual address
    @type text: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_aton(text)
    if family == AF_INET6:
        return dns.ipv6.inet_aton(text)
    raise NotImplementedError
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def inet_ntop(family, address):
    """Turn a binary network address into its textual form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param address: the binary address
    @type address: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_ntoa(address)
    if family == AF_INET6:
        return dns.ipv6.inet_ntoa(address)
    raise NotImplementedError
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def af_for_address(text):
    """Determine the address family of a textual-form network address.

    The text is tried as an IPv4 address first and as an IPv6 address
    second.

    @param text: the textual address
    @type text: string
    @raises ValueError: the address family cannot be determined from the input.
    @rtype: int
    """
    # 'except Exception' rather than a bare except, so KeyboardInterrupt
    # and SystemExit are not turned into a parse failure.
    try:
        dns.ipv4.inet_aton(text)
        return AF_INET
    except Exception:
        pass
    try:
        dns.ipv6.inet_aton(text)
        return AF_INET6
    except Exception:
        raise ValueError
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _gethostbyaddr(ip):
    """Reverse-resolve ``ip`` and return ``(canonical, aliases, addresses)``.

    The address is treated as IPv6 when ``dns.ipv6.inet_aton`` accepts it
    and as IPv4 otherwise.  The name found by ``_getnameinfo`` is resolved
    again with ``_getaddrinfo`` to obtain the canonical name and the
    address list.  ``aliases`` is always an empty list.
    """
    try:
        dns.ipv6.inet_aton(ip)
        sockaddr = (ip, 80, 0, 0)
        family = socket.AF_INET6
    except Exception:
        # Not parseable as IPv6: fall back to an IPv4 socket address.
        sockaddr = (ip, 80)
        family = socket.AF_INET
    (name, _port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
    tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP,
                          socket.AI_CANONNAME)
    canonical = tuples[0][3]
    addresses = [item[4][0] for item in tuples]
    # XXX we just ignore aliases
    aliases = []
    return (canonical, aliases, addresses)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testNToP(self):
        """Check ``inet_ntop`` against known IPv6 and IPv4 conversions."""
        from twisted.python.compat import inet_ntop

        f = lambda a: inet_ntop(socket.AF_INET6, a)
        g = lambda a: inet_ntop(socket.AF_INET, a)

        # (converter, expected text, packed input), in the original order.
        cases = [
            (f, '::', '\x00' * 16),
            (f, '::1', '\x00' * 15 + '\x01'),
            (f, 'aef:b01:506:1001:ffff:9997:55:170',
             '\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70'),
            (g, '1.0.1.0', '\x01\x00\x01\x00'),
            (g, '170.85.170.85', '\xaa\x55\xaa\x55'),
            (g, '255.255.255.255', '\xff\xff\xff\xff'),
            (f, '100::', '\x01' + '\x00' * 15),
            (f, '100::1', '\x01' + '\x00' * 14 + '\x01'),
        ]
        for convert, expected, packed in cases:
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(expected, convert(packed))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def m2i(self, pkt, s):
        """Decode record data ``s`` according to ``pkt.type``.

        Type 1 (A) and 28 (AAAA) are converted with ``inet_ntop``, type 12
        (PTR) is read with ``DNSgetstr``, and type 16 (TXT) has its
        length-prefixed strings concatenated.  Other types are returned
        unchanged.
        """
        family = None
        rtype = pkt.type
        if rtype == 1: # A
            family = socket.AF_INET
        elif rtype == 28: # AAAA
            family = socket.AF_INET6
        elif rtype == 12: # PTR
            s = DNSgetstr(s, 0)[0]
        elif rtype == 16: # TXT
            # RDATA is a list of strings, each one preceded by a byte
            # holding the size of the string that follows.
            pieces = []
            rest = s
            while rest:
                piece_len = struct.unpack("!B", rest[0])[0] + 1
                if piece_len > len(rest):
                    warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (piece_len, len(rest)))
                pieces.append(rest[1:piece_len])
                rest = rest[piece_len:]
            s = "".join(pieces)

        if family is not None:
            s = inet_ntop(family, s)
        return s
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def i2m(self, pkt, s):
        """Encode record data ``s`` according to ``pkt.type``.

        Types 2-5 (NS, MD, MF, CNAME) become length-prefixed labels ending
        in a zero byte.  Type 1 (A) and 28 (AAAA) are packed with
        ``inet_aton`` / ``inet_pton``, and type 16 (TXT) is split into
        length-prefixed strings of at most 255 bytes; for these three an
        empty ``s`` is returned unchanged.  Other types pass through.
        """
        rtype = pkt.type

        if rtype in (2, 3, 4, 5): # NS, MD, MF, CNAME
            s = "".join(chr(len(label)) + label for label in s.split("."))
            if ord(s[-1]):
                s += "\x00"
            return s

        # Every remaining branch leaves an empty value untouched.
        if not s:
            return s

        if rtype == 1: # A
            return inet_aton(s)
        if rtype == 28: # AAAA
            return inet_pton(socket.AF_INET6, s)
        if rtype == 16: # TXT
            # The string is split into pieces, each preceded by its size.
            encoded = ""
            while len(s) >= 255:
                encoded += "\xff" + s[:255]
                s = s[255:]
            # What is left is shorter than 255 bytes
            if len(s):
                encoded += struct.pack("!B", len(s)) + s
            return encoded
        return s
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """
    Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
    of the neighbor with specified IPv6 address addr. 'src' address is
    used as source of the message. Message is sent on iface. By default,
    timeout waiting for an answer is 1 second.

    If no answer is gathered, None is returned. Else, the answer is
    returned (ethernet frame).
    """

    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)
    p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
    # Pass the caller's timeout through; it used to be hard-coded to 1,
    # which silently ignored the 'timeout' argument.
    res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
               chainCC=chainCC)

    return res
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getfield(self, pkt, s):
        """Split a run of 16-byte IPv6 addresses off the front of ``s``.

        When ``self.length_from`` is set, only that many leading bytes are
        parsed; otherwise, when ``self.count_from`` is set, at most that
        many addresses are parsed.  Returns ``(remaining, addresses)``
        where ``addresses`` holds the textual form of each parsed chunk.
        """
        byte_limit = count_limit = None
        if self.length_from is not None:
            byte_limit = self.length_from(pkt)
        elif self.count_from is not None:
            count_limit = self.count_from(pkt)

        tail = ""
        pending = s
        if byte_limit is not None:
            pending, tail = s[:byte_limit], s[byte_limit:]

        addresses = []
        while pending and (count_limit is None or count_limit > 0):
            if count_limit is not None:
                count_limit -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + tail, addresses
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def i2m(self, pkt, x):
        """Encode address ``x`` to a byte string sized according to ``pkt.len``.

        ``None`` is treated as "::" (and a missing ``pkt.len`` as 1 in that
        case).  With no ``pkt.len`` the full packed address is returned;
        0 or 1 gives an empty string; 2 or 3 gives the first
        ``8 * (len - 1)`` bytes; larger values append ``8 * (len - 3)``
        zero bytes to the packed address.
        """
        units = pkt.len

        if x is None:
            x = "::"
            if units is None:
                units = 1
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            return packed
        if units in (0, 1):
            return ""
        if units in (2, 3):
            return packed[:8 * (units - 1)]

        padding = '\x00' * 8 * (units - 3)
        return packed + padding
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def h2i(self, pkt, x):
        """Convert ``x`` into an internal ``(kind, value)`` pair.

        A tuple whose first item is an int is assumed to be in internal
        form already and is returned as is.  Otherwise ``x`` is tried as an
        IPv6 address (kind 0), then as an IPv4 address (kind 2), and
        finally treated as a DNS name (kind 1, converted with
        ``names2dnsrepr``; ``None`` becomes the empty name).
        """
        # isinstance() is required here: 'x is tuple' compared x with the
        # tuple type itself and was never true for an actual tuple.
        if isinstance(x, tuple) and len(x) > 0 and type(x[0]) is int:
            return x

        try:  # Try IPv6
            inet_pton(socket.AF_INET6, x)
            return (0, x)
        except Exception:
            pass

        try:  # Try IPv4
            inet_pton(socket.AF_INET, x)
            return (2, x)
        except Exception:
            pass

        # Try DNS
        if x is None:
            x = ""
        return (1, names2dnsrepr(x))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _resolve_one(self, ip):
        """
        Overloaded version that performs the Whois resolution on the
        embedded IPv4 address when ``ip`` is a 6to4 or Teredo address.
        Otherwise, the native IPv6 address is passed.

        Returns ``(ip, asn, desc)`` with the original ``ip``.
        """
        lookup_addr = ip
        if in6_isaddr6to4(ip):
            # for 6to4, use the embedded address (bytes 2..5)
            packed = inet_pton(socket.AF_INET6, ip)
            lookup_addr = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # for Teredo, use the mapped address
            lookup_addr = teredoAddrExtractInfo(ip)[2]

        resolved = AS_resolver_riswhois._resolve_one(self, lookup_addr)
        _, asn, desc = resolved

        return ip, asn, desc
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def hashret(self): # we filter on peer address field
        """Return ``self.peeraddr`` packed with ``inet_pton`` as an IPv6 address."""
        peer = self.peeraddr
        return inet_pton(socket.AF_INET6, peer)

#####################################################################
# Sent between Relay Agents and Servers.
# Normally it must include a "Relay Message Option";
# it may include others.
# The values of the hop-count, link-addr and peer-addr fields
# are copied from the associated Relay-Forward message, for session tracking.
# For now, as described in the comment, hashret
# is limited to the content of the peer address field.
# See section 7.2 of RFC 3315.

# Relay-Reply Message
# - sent by servers to relay agents
# - if the solicit message was received in a Relay-Forward message,
# the server constructs a relay-reply message with the Advertise
# message in the payload of a relay-message. cf page 37/101. This
# message is sent in unicast to the relay agent, using the IP address
# found in the IP source of the received packet.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def ifchange(self, iff, addr):
        """Rewrite the routes of interface ``iff`` for a new address.

        ``addr`` is "address" or "address/plen" (plen defaults to 128).
        Every route on ``iff`` gets the new prefix length and source
        address; routes whose gateway is '::' also get the new network.
        The route cache is invalidated and the neighbor cache flushed.
        """
        pieces = addr.split("/") + ["128"]
        the_addr = pieces[0]
        the_plen = int(pieces[1])

        packed = inet_pton(socket.AF_INET6, the_addr)
        mask = in6_cidr2mask(the_plen)
        the_net = inet_ntop(socket.AF_INET6, in6_and(mask, packed))

        for idx, route in enumerate(self.routes):
            net, _old_plen, gw, iface, _old_src = route
            if iface != iff:
                continue
            new_net = the_net if gw == '::' else net
            self.routes[idx] = (new_net, the_plen, gw, iface, the_addr)
        self.invalidate_cache()
        ip6_neigh_cache.flush()
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def inet_pton(family, text):
    """Turn a textual network address into its binary form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param text: the textual address
    @type text: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_aton(text)
    if family == AF_INET6:
        return dns.ipv6.inet_aton(text)
    raise NotImplementedError
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def inet_ntop(family, address):
    """Turn a binary network address into its textual form.

    @param family: the address family (AF_INET or AF_INET6)
    @type family: int
    @param address: the binary address
    @type address: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_ntoa(address)
    if family == AF_INET6:
        return dns.ipv6.inet_ntoa(address)
    raise NotImplementedError
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def af_for_address(text):
    """Determine the address family of a textual-form network address.

    The text is first parsed as an IPv4 address and, if that fails, as an
    IPv6 address.

    @param text: the textual address
    @type text: string
    @raises ValueError: the address family cannot be determined from the input.
    @rtype: int
    """
    # ``except Exception`` rather than a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not swallowed by the probing.
    try:
        dns.ipv4.inet_aton(text)
    except Exception:
        # Not IPv4; fall through and try IPv6.
        pass
    else:
        return AF_INET

    try:
        dns.ipv6.inet_aton(text)
    except Exception:
        raise ValueError
    return AF_INET6
项目:llk    作者:Tycx2ry    | 项目源码 | 文件源码
def _gethostbyaddr(ip):
    """Look up the host name and addresses for the textual address *ip*.

    The address is treated as IPv6 when ``dns.ipv6.inet_aton`` accepts it
    and as IPv4 otherwise.  The name is obtained from ``_getnameinfo`` with
    ``NI_NAMEREQD`` and then passed to ``_getaddrinfo`` for the same family.

    Returns a ``(canonical, aliases, addresses)`` tuple; ``aliases`` is
    always an empty list.  An IndexError is raised if ``_getaddrinfo``
    returns no entries.
    """
    # ``except Exception`` rather than a bare ``except`` so that
    # KeyboardInterrupt and SystemExit are not mistaken for "not IPv6".
    try:
        dns.ipv6.inet_aton(ip)
    except Exception:
        sockaddr = (ip, 80)
        family = socket.AF_INET
    else:
        sockaddr = (ip, 80, 0, 0)
        family = socket.AF_INET6

    # Only the name is needed; the port part of the result is discarded.
    name = _getnameinfo(sockaddr, socket.NI_NAMEREQD)[0]
    tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP,
                          socket.AI_CANONNAME)
    canonical = tuples[0][3]
    addresses = [item[4][0] for item in tuples]
    # XXX we just ignore aliases
    aliases = []
    return (canonical, aliases, addresses)
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def _read_socks5_address(self):
        """Read an address/port pair from a SOCKS5 reply.

        Generator-style coroutine: every ``yield`` hands the result of
        ``self.read_bytes(n)`` to the caller and resumes with the bytes that
        were read.  The first byte is the address type:

        * ``b"\\x01"`` -- 4 bytes follow, converted with ``inet_ntoa``
        * ``b"\\x03"`` -- one length byte follows, then that many bytes,
          returned as read
        * ``b"\\x04"`` -- 16 bytes follow, converted with ``inet_ntop``

        Two more bytes are then read and unpacked as a big-endian unsigned
        short port.  The result ``(addr, port)`` is delivered through
        ``gen.Return``.

        Raises GeneralProxyError for any other address type byte.
        """
        atyp = yield self.read_bytes(1)
        if atyp == b"\x01":
            data = yield self.read_bytes(4)
            addr = socket.inet_ntoa(data)
        elif atyp == b"\x03":
            length = yield self.read_bytes(1)
            # The length arrives as a one-byte string; read_bytes needs the
            # count as an integer, so convert it before the next read.
            addr = yield self.read_bytes(ord(length))
        elif atyp == b"\x04":
            data = yield self.read_bytes(16)
            addr = socket.inet_ntop(socket.AF_INET6, data)
        else:
            raise GeneralProxyError("SOCKS5 proxy server sent invalid data")

        data = yield self.read_bytes(2)
        port = struct.unpack(">H", data)[0]
        raise gen.Return((addr, port))
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Return the socket address family to use for *host*.

    The result is ``socket.AF_INET6`` when *host* contains a colon and the
    socket module provides that constant, otherwise ``socket.AF_INET``.
    *port* is accepted but not used.
    """
    # A getaddrinfo()-based detection (AF_UNSPEC, SOCK_STREAM, AI_PASSIVE)
    # used to live here; it was disabled because of problems with IPv6
    # implementations on various operating systems, so only the textual
    # form of the host is inspected.
    wants_ipv6 = ':' in host and hasattr(socket, 'AF_INET6')
    return socket.AF_INET6 if wants_ipv6 else socket.AF_INET
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Pick ``socket.AF_INET`` or ``socket.AF_INET6`` for *host*.

    Hosts without a colon, or platforms whose socket module lacks
    ``AF_INET6``, yield ``socket.AF_INET``; everything else yields
    ``socket.AF_INET6``.  *port* is accepted but not used.
    """
    # Resolving the host through socket.getaddrinfo() was the earlier
    # approach; it was switched off due to problems with IPv6
    # implementations on various operating systems.
    if ':' not in host or not hasattr(socket, 'AF_INET6'):
        return socket.AF_INET
    return socket.AF_INET6
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_sock_connect_address(self):
        """sock_connect() must reject an unresolved address in debug mode.

        For every address family / socket type combination, connecting a
        non-blocking socket to a host name has to fail with a ValueError
        whose message contains 'address must be resolved'.
        """
        # The resolved-address check is only performed in debug mode.
        self.loop.set_debug(True)

        unresolved = ('www.python.org', 80)
        cases = [(socket.AF_INET, unresolved)]
        if support.IPV6_ENABLED:
            # Both the 2-tuple and the 4-tuple IPv6 address forms are checked.
            cases.append((socket.AF_INET6, unresolved))
            cases.append((socket.AF_INET6, unresolved + (0, 0)))

        socket_types = (socket.SOCK_STREAM, socket.SOCK_DGRAM)
        for family, address in cases:
            for sock_type in socket_types:
                with socket.socket(family, sock_type) as sock:
                    sock.setblocking(False)
                    pending = self.loop.sock_connect(sock, address)
                    with self.assertRaises(ValueError) as caught:
                        self.loop.run_until_complete(pending)
                    message = str(caught.exception)
                    self.assertIn('address must be resolved', message)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Choose the address family for connecting to *host*.

    Starts from ``socket.AF_INET`` and switches to ``socket.AF_INET6`` only
    when *host* contains a colon and the socket module defines that
    constant.  *port* is accepted but not used.
    """
    # An attempt to ask socket.getaddrinfo() for the family was disabled
    # because of problems with IPv6 implementations on various operating
    # systems; the colon check below is the remaining heuristic.
    family = socket.AF_INET
    if ':' in host:
        family = getattr(socket, 'AF_INET6', family)
    return family
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Return ``socket.AF_INET6`` or ``socket.AF_INET`` depending on *host*.

    A colon in *host* selects ``socket.AF_INET6`` provided the socket module
    has that attribute; in every other case ``socket.AF_INET`` is returned.
    *port* is accepted but not used.
    """
    # Detection via socket.getaddrinfo() was disabled due to problems with
    # IPv6 implementations on various operating systems, so the decision
    # is made from the host string alone.
    if ':' in host:
        if hasattr(socket, 'AF_INET6'):
            return socket.AF_INET6
    return socket.AF_INET
项目:Fuk    作者:r4gnax    | 项目源码 | 文件源码
def makesock(self):
        """Create ``self.sock`` from the connection settings on the instance.

        Reads ``self.config["ipv6"]``, ``self.proxy``, ``self.timeout``,
        ``self.vhost`` and ``self.config["ssl"]`` and stores the resulting
        socket object on ``self.sock``.
        """
        # The address family follows the "ipv6" config flag.
        if self.config["ipv6"]:
            family = socket.AF_INET6
        else:
            family = socket.AF_INET

        if self.proxy is not None:
            # self.proxy must split on ":" into exactly two parts
            # (server, port); anything else raises ValueError on unpacking.
            # NOTE(review): proxy_port stays a string here -- confirm that
            # setproxy() accepts a non-integer port.
            proxy_serv, proxy_port = self.proxy.split(":")
            self.sock = socks.socket(family, socket.SOCK_STREAM)
            # NOTE(review): the settimeout() call further down is applied to
            # this socket as well and presumably replaces the non-blocking
            # mode set here -- confirm which mode is intended.
            self.sock.setblocking(0)
            self.sock.setproxy(socks.PROXY_TYPE_SOCKS5, proxy_serv, proxy_port)
        else:
            self.sock = socket.socket(family, socket.SOCK_STREAM)

        self.sock.settimeout(self.timeout)
        if self.vhost is not None:
            # Bind the local end to self.vhost; port 0 is passed as the port.
            self.sock.bind((self.vhost, 0))
        if self.config["ssl"]:
            # The socket is wrapped before any connect call in this method.
            self.sock = ssl.wrap_socket(self.sock)