Python socket 模块,getaddrinfo() 实例源码


项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
        if node.find('*') < 0:
                info = socket.getaddrinfo(node, None)[0]
                ip_addr = info[4][0]
                if info[0] == socket.AF_INET6:
                    ip_addr = re.sub(r'^0*', '', ip_addr)
                    ip_addr = re.sub(r':0*', ':', ip_addr)
                    ip_addr = re.sub(r'::+', '::', ip_addr)
                node = ip_addr
                node = ''

        if node:
            self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
            logger.warning('node "%s" is invalid', node)
            self.ip_rex = ''
        self.platform = platform.lower()
        self.cpus = cpus
        self.memory = memory
        self.disk = disk
项目:tsproxy    作者:WPO-Foundation    | 项目源码 | 文件源码
def run(self):
    global lock, background_activity_count
      logging.debug('[{0:d}] AsyncDNS - calling getaddrinfo for {1}:{2:d}'.format(self.client_id, self.hostname, self.port))
      addresses = socket.getaddrinfo(self.hostname, self.port)'[{0:d}] Resolving {1}:{2:d} Completed'.format(self.client_id, self.hostname, self.port))
      addresses = ()'[{0:d}] Resolving {1}:{2:d} Failed'.format(self.client_id, self.hostname, self.port))
    message = {'message': 'resolved', 'connection': self.client_id, 'addresses': addresses, 'localhost': self.is_localhost}
    self.result_pipe.SendMessage(message, False)
    if background_activity_count > 0:
      background_activity_count -= 1
    # open and close a local socket which will interrupt the long polling loop to process the message
    s = socket.socket()
    s.connect((server.ipaddr, server.port))

#   TCP Client
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None):
   = host
            self.port = port
            self.keyfile = keyfile
            self.certfile = certfile
            self.buffer = ""
            msg = "getaddrinfo returns an empty list"
            self.sock = None
            for res in socket.getaddrinfo(, self.port, 0, socket.SOCK_STREAM):
                af, socktype, proto, canonname, sa = res
                    self.sock = socket.socket(af, socktype, proto)
                except socket.error, msg:
                    if self.sock:
                    self.sock = None
            if not self.sock:
                raise socket.error, msg
            self.file = self.sock.makefile('rb')
            self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
            self._debugging = 0
            self.welcome = self._getresp()
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self, hosts):
        The `hosts` parameter should be a sequence of hosts to permit
        connections to.
        msg = ('WhiteListRoundRobinPolicy is deprecated. '
               'It will be removed in 4.0. '
               'It can effectively be reimplemented using HostFilterPolicy.')
        warn(msg, DeprecationWarning)
        # DeprecationWarnings are silent by default so we also log the message

        self._allowed_hosts = hosts
        self._allowed_hosts_resolved = [endpoint[4][0] for a in self._allowed_hosts
                                        for endpoint in socket.getaddrinfo(a, None, socket.AF_UNSPEC, socket.SOCK_STREAM)]

项目:Projects    作者:it2school    | 项目源码 | 文件源码
def create_connection(address):
    host, port = address
    err = None
    for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
        af, socktype, proto, _, sa = res
        sock = None
            sock = socket.socket(af, socktype, proto)
            return sock

        except socket.error as e:
            err = e
            if sock is not None:

    if err is not None:
        raise err  # pylint: disable=raising-bad-type
        raise socket.error("getaddrinfo returns an empty list")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def select_ip_version(host, port):
    """Returns AF_INET4 or AF_INET6 depending on where to connect to."""
    # disabled due to problems with current ipv6 implementations
    # and various operating systems.  Probably this code also is
    # not supposed to work, but I can't come up with any other
    # ways to implement this.
    # try:
    #     info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    #                               socket.SOCK_STREAM, 0,
    #                               socket.AI_PASSIVE)
    #     if info:
    #         return info[0][0]
    # except socket.gaierror:
    #     pass
    if ':' in host and hasattr(socket, 'AF_INET6'):
        return socket.AF_INET6
    return socket.AF_INET
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def _socket_bind_addr(self, sock, af):
        bind_addr = ''
        if self._bind and af == socket.AF_INET:
            bind_addr = self._bind
        elif self._bindv6 and af == socket.AF_INET6:
            bind_addr = self._bindv6
            bind_addr = self._accept_address[0]

        bind_addr = bind_addr.replace("::ffff:", "")
        if bind_addr in self._ignore_bind_list:
            bind_addr = None
        if bind_addr:
            local_addrs = socket.getaddrinfo(bind_addr, 0, 0, socket.SOCK_STREAM, socket.SOL_TCP)
            if local_addrs[0][0] == af:
                logging.debug("bind %s" % (bind_addr,))
                    sock.bind((bind_addr, 0))
                except Exception as e:
                    logging.warn("bind %s fail" % (bind_addr,))
项目:Mk3-Firmware    作者:emfcamp    | 项目源码 | 文件源码
def get_NTP_time():
    NTP_QUERY = bytearray(48)
    NTP_QUERY[0] = 0x1b
    addr = socket.getaddrinfo(NTP_HOST, NTP_PORT)[0][-1]
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.sendto(NTP_QUERY, addr)

    # Setting timeout for receiving data. Because we're using UDP,
    # there's no need for a timeout on send.
    msg = None
        msg = s.recv(48)
    except OSError:

    if msg is None:
        return None

    import struct
    val = struct.unpack("!I", msg[40:44])[0]
    return val - NTP_DELTA
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def is_valid_ip(ip):
    """Returns true if the given string is a well-formed IP address.

    Supports IPv4 and IPv6.
    if not ip or '\x00' in ip:
        # getaddrinfo resolves empty strings to localhost, and truncates
        # on zero bytes.
        return False
        res = socket.getaddrinfo(ip, 0, socket.AF_UNSPEC,
                                 0, socket.AI_NUMERICHOST)
        return bool(res)
    except socket.gaierror as e:
        if e.args[0] == socket.EAI_NONAME:
            return False
    return True
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def _get_receivers_addresses(self):
        """Retrieve all the addresses associated with a hostname.

        It will return in priority the address of the last known receiver which
        accepted the previous check.
        receivers = socket.getaddrinfo(
            self.config['server'], self.config['port'],
        # Only keep the actual address
        addresses = [r[4][0] for r in receivers]
            addresses = [self._last_good_receiver_address] + addresses
        except ValueError:
        return addresses
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def request(self, check_config):
        domain = check_config.get('domain')
        if check_config.get('ip_version') == 6:
            af = socket.AF_INET6
        elif check_config.get('ip_version') == 4:
            af = socket.AF_INET
            af = 0

            result = socket.getaddrinfo(domain, 0, af)
        except Exception as e:
            return Plugin.STATUS_CRIT, '{}'.format(e)

        ips = [ip[4][0] for ip in result]
        return (
            'Domain was resolved with {}'.format(', '.join(ips))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
项目:shadowsocksr    作者:shadowsocksr-backup    | 项目源码 | 文件源码
def _create_remote_socket(self, ip, port):
        addrs = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM, socket.SOL_TCP)
        if len(addrs) == 0:
            raise Exception("getaddrinfo failed for %s:%d" % (ip, port))
        af, socktype, proto, canonname, sa = addrs[0]
        if self._forbidden_iplist:
            if common.to_str(sa[0]) in self._forbidden_iplist:
                raise Exception('IP %s is in forbidden list, reject' %
        remote_sock = socket.socket(af, socktype, proto)
        self._remote_sock = remote_sock

        self._fd_to_handlers[remote_sock.fileno()] = self

        remote_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        return remote_sock
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_connection_timeout(self, m_socket):
        # Ensure that the socket is closed on timeout
        sock = mock.Mock()
        m_socket.socket.return_value = sock

        def getaddrinfo(*args, **kw):
            fut = asyncio.Future(loop=self.loop)
            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
                    ('', 80))
            return fut
        self.loop.getaddrinfo = getaddrinfo

        with mock.patch.object(self.loop, 'sock_connect',
            coro = self.loop.create_connection(MyProto, '', 80)
            with self.assertRaises(asyncio.TimeoutError):
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_connection_connect_err(self):
        def getaddrinfo(*args, **kw):
            yield from []
            return [(2, 1, 6, '', ('', 80))]

        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(MyProto, '', 80)
            OSError, self.loop.run_until_complete, coro)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_connection_multiple(self):
        def getaddrinfo(*args, **kw):
            return [(2, 1, 6, '', ('', 80)),
                    (2, 1, 6, '', ('', 80))]

        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)

        self.loop.getaddrinfo = getaddrinfo_task
        self.loop.sock_connect = mock.Mock()
        self.loop.sock_connect.side_effect = OSError

        coro = self.loop.create_connection(
            MyProto, '', 80, family=socket.AF_INET)
        with self.assertRaises(OSError):
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_connection_no_local_addr(self):
        def getaddrinfo(host, *args, **kw):
            if host == '':
                return [(2, 1, 6, '', ('', 80)),
                        (2, 1, 6, '', ('', 80))]
                return []

        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)
        self.loop.getaddrinfo = getaddrinfo_task

        coro = self.loop.create_connection(
            MyProto, '', 80, family=socket.AF_INET,
            local_addr=(None, 8080))
            OSError, self.loop.run_until_complete, coro)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_create_server_empty_host(self):
        # if host is empty string use None instead
        host = object()

        def getaddrinfo(*args, **kw):
            nonlocal host
            host = args[0]
            yield from []

        def getaddrinfo_task(*args, **kwds):
            return asyncio.Task(getaddrinfo(*args, **kwds), loop=self.loop)

        self.loop.getaddrinfo = getaddrinfo_task
        fut = self.loop.create_server(MyProto, '', 0)
        self.assertRaises(OSError, self.loop.run_until_complete, fut)
项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def compat_socket_create_connection(address, timeout, source_address=None):
        host, port = address
        err = None
        for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            sock = None
                sock = socket.socket(af, socktype, proto)
                if source_address:
                return sock
            except socket.error as _:
                err = _
                if sock is not None:
        if err is not None:
            raise err
            raise socket.error("getaddrinfo returns an empty list")
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
项目:ShadowSocks    作者:immqy    | 项目源码 | 文件源码
def _create_remote_socket(self, ip, port):
        addrs = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM,
        if len(addrs) == 0:
            raise Exception("getaddrinfo failed for %s:%d" % (ip, port))
        af, socktype, proto, canonname, sa = addrs[0]
        if self._forbidden_iplist:
            if common.to_str(sa[0]) in self._forbidden_iplist:
                raise Exception('IP %s is in forbidden list, reject' %
        remote_sock = socket.socket(af, socktype, proto)
        self._remote_sock = remote_sock
        self._fd_to_handlers[remote_sock.fileno()] = self
        remote_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        return remote_sock
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def _load_from_socket(port, serializer):
    sock = None
    # Support for both IPv4 and IPv6.
    # On most of IPv6-ready systems, IPv6 will take precedence.
    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = socket.socket(af, socktype, proto)
        except socket.error:
            sock = None
    if not sock:
        raise Exception("could not open socket")
        rf = sock.makefile("rb", 65536)
        for item in serializer.load_stream(rf):
            yield item
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def resolveHost6(self, hostname):
        if hostname in self.resolveCache6:
            self.sf.debug("Returning IPv6 cached result for " + hostname + " (" +
                          str(self.resolveCache6[hostname]) + ")")
            return self.resolveCache6[hostname]

            addrs = list()
            res = socket.getaddrinfo(hostname, None, socket.AF_INET6)
            for addr in res:
                if addr[4][0] not in addrs:
            if len(addrs) < 1:
                return None
            self.resolveCache6[hostname] = addrs
            self.sf.debug("Resolved " + hostname + " to IPv6: " + str(addrs))
            return addrs
        except BaseException as e:
            self.sf.debug("Unable to IPv6 resolve " + hostname + " (" + str(e) + ")")
            return list()

    # Process a host/IP, parentEvent is the event that represents this entity
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def override_system_resolver(resolver=None):
    """Override the system resolver routines in the socket module with
    versions which use dnspython's resolver.

    This can be useful in testing situations where you want to control
    the resolution behavior of python code without having to change
    the system's resolver settings (e.g. /etc/resolv.conf).

    The resolver to use may be specified; if it's not, the default
    resolver will be used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    if resolver is None:
        resolver = get_default_resolver()
    global _resolver
    _resolver = resolver
    socket.getaddrinfo = _getaddrinfo
    socket.getnameinfo = _getnameinfo
    socket.getfqdn = _getfqdn
    socket.gethostbyname = _gethostbyname
    socket.gethostbyname_ex = _gethostbyname_ex
    socket.gethostbyaddr = _gethostbyaddr
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def create(self):
            msg = "getaddrinfo returns an empty list"
            for res in socket.getaddrinfo(self.target_ip, self.target_port, 0, socket.SOCK_DGRAM):
                af, socktype, proto, canonname, sa = res
                    sock = socket.socket(af, socktype, proto)
                except socket.error, msg:
                    if sock:
                        sock = None
                if not sock:
                    raise socket.error, msg
        except socket.error:
            raise RuntimeError,'[+] Cannot connect to %s:%d\n[+] port might not be up' % (self.target_ip, self.target_port)
        return sock
项目:arithmancer    作者:google    | 项目源码 | 文件源码
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def _setup_connection(self, dstaddr, timeout=None):
        port = randint(10000, 60000)
        af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
        has_bind = 1
        for _i in range(0, 10):
            # We try to bind to a port for 10 tries
                s.bind((INADDR_ANY, randint(10000, 60000)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                has_bind = 1
            except socket.error:
        if not has_bind:
            raise NetBIOSError, ('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
        self.__sock = s
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
项目:gimel    作者:Alephbet    | 项目源码 | 文件源码
def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
        for res in socket.getaddrinfo(, self.port, 0,
            family, socktype, proto, canonname, socket_address = res
            sock = None
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)

                # set the socket_connect_timeout before we connect

                # connect

                # set the socket_timeout now that we're connected
                return sock

            except socket.error as _:
                err = _
                if sock is not None:

        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def makeport(self):
        '''Create a new socket and send a PORT command for it.'''
        msg = "getaddrinfo returns an empty list"
        sock = None
        for res in socket.getaddrinfo(None, 0,, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
            af, socktype, proto, canonname, sa = res
                sock = socket.socket(af, socktype, proto)
            except socket.error, msg:
                if sock:
                sock = None
        if not sock:
            raise socket.error, msg
        port = sock.getsockname()[1] # Get proper port
        host = self.sock.getsockname()[0] # Get proper host
        if == socket.AF_INET:
            resp = self.sendport(host, port)
            resp = self.sendeprt(host, port)
        if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
        return sock
项目:LFISuite    作者:D35m0nd142    | 项目源码 | 文件源码
def _write_SOCKS5_address(self, addr, file):
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy
        family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}

        # If the given destination address is an IP address, we'll
        # use the IP address request even if remote resolving was specified.
        # Detect whether the address is IPv4/6 directly.
        for family in (socket.AF_INET, socket.AF_INET6):
                addr_bytes = socket.inet_pton(family, host)
                file.write(family_to_byte[family] + addr_bytes)
                host = socket.inet_ntop(family, addr_bytes)
                file.write(struct.pack(">H", port))
                return host, port
            except socket.error:

        # Well it's not an IP number, so it's probably a DNS name.
        if rdns:
            # Resolve remotely
            host_bytes = host.encode("idna")
            file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
            # Resolve locally
            addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
            # We can't really work out what IP is reachable, so just pick the
            # first.
            target_addr = addresses[0]
            family = target_addr[0]
            host = target_addr[4][0]

            addr_bytes = socket.inet_pton(family, host)
            file.write(family_to_byte[family] + addr_bytes)
            host = socket.inet_ntop(family, addr_bytes)
        file.write(struct.pack(">H", port))
        return host, port
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def __dns_resolve_host(host, ip_version, timeout):
    Resolve a host using the system's facilities
    family = socket.AF_INET if ip_version == 4 else socket.AF_INET6

    def proc(host, family, queue):
            queue.put(socket.getaddrinfo(host, 0, family))
        except socket.gaierror as ex:
            # TODO: Would be nice if we could isolate just the not
            # found error.
        except socket.timeout:
            # Don't care, we just want the queue to be empty if
            # there's an error.

    queue = Queue.Queue()
    thread = threading.Thread(target=proc, args=(host, family, queue))
        results = queue.get(True, timeout)
        if len(results) == 0:
            return None
        family, socktype, proto, canonname, sockaddr = results[0]
    except Queue.Empty:
        return None

    # NOTE: Don't make any attempt to kill the thread, as it will get
    # Python all confused if it holds the GIL.

    (ip) = sockaddr
    return str(ip[0])
项目:deb-python-pyngus    作者:openstack    | 项目源码 | 文件源码
def connect_socket(host, port, blocking=True):
    """Create a TCP connection to the server."""
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
    if not blocking:
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
    return my_socket
项目:deb-python-pyngus    作者:openstack    | 项目源码 | 文件源码
def server_socket(host, port, backlog=10):
    """Create a TCP listening socket for a server."""
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
    my_socket.setblocking(0)  # 0=non-blocking
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
    return my_socket
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_error_multiple(self):
        if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2:
            raise unittest.SkipTest('localhost only resolves one address')
        cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043,
                          connect_timeout=10, protocol_version=PROTOCOL_VERSION)
        self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error',
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def _connect_socket(self):
        sockerr = None
        addresses = socket.getaddrinfo(, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        if not addresses:
            raise ConnectionException("getaddrinfo returned empty list for %s" % (,))
        for (af, socktype, proto, canonname, sockaddr) in addresses:
                self._socket = self._socket_impl.socket(af, socktype, proto)
                if self.ssl_options:
                    if not self._ssl_impl:
                        raise RuntimeError("This version of Python was not compiled with SSL support")
                    self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
                if self._check_hostname:
                sockerr = None
            except socket.error as err:
                if self._socket:
                    self._socket = None
                sockerr = err

        if sockerr:
            raise socket.error(sockerr.errno, "Tried connecting to %s. Last error: %s" % ([a[4] for a in addresses], sockerr.strerror or sockerr))

        if self.sockopts:
            for args in self.sockopts:
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def translate(self, addr):
        Reverse DNS the public broadcast_address, then lookup that hostname to get the AWS-resolved IP, which
        will point to the private IP address within the same datacenter.
        # get family of this address so we translate to the same
        family = socket.getaddrinfo(addr, 0, socket.AF_UNSPEC, socket.SOCK_STREAM)[0][0]
        host = socket.getfqdn(addr)
        for a in socket.getaddrinfo(host, 0, family, socket.SOCK_STREAM):
                return a[4][0]
            except Exception:
        return addr