Python socket 模块,herror() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.herror()

项目:SupercomputerInABriefcase    作者:SupercomputerInABriefcase    | 项目源码 | 文件源码
def discover(data: ConnectionData) -> None:
    assert isinstance(data, ConnectionData)
    ip_net, iface = data
    try:
        ans, unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip_net), iface=iface, timeout=2, verbose=False)
        for s, r in ans:
            line = r.sprintf("%Ether.src%  %ARP.psrc%")
            try:
                hostname = socket.gethostbyaddr(r.psrc)
                line += '  ' + hostname[0]
            except socket.herror:
                pass
            print(line)
    except PermissionError:
        print('Cannot execute necessary code, did you run as root?')
        sys.exit(1)
    except:
        raise
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_ips_from_url(url):
    """
        Retrieve IPs from url

        :param str url: The url to resolve
        :rtype: list
        :return: the list of resolved IP address for given url
    """
    try:
        parsed = urlparse(url)
        if parsed.hostname:
            socket.setdefaulttimeout(5)
            ips = socket.gethostbyname_ex(parsed.hostname)[2]
            return ips
    except (ValueError, socket.error, socket.gaierror, socket.herror, socket.timeout):
        pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            pop = poplib.POP3(ipaddr[0])
            pop.user(user[:-1])
            pop.pass_(value)
            print "\t\nLogin successful:",value, user
            print pop.stat()
            pop.quit()
            work.join()
            sys.exit(2)
        except(poplib.error_proto, socket.gaierror, socket.error, socket.herror), msg: 
            #print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def sendchk(listindex, host, user, password):   # seperated function for checking
    try:
        smtp = smtplib.SMTP(host)
        smtp.login(user, password)
        code = smtp.ehlo()[0]
        if not (200 <= code <= 299):
            code = smtp.helo()[0]
            if not (200 <= code <= 299):
                raise SMTPHeloError(code, resp)
        smtp.sendmail(fromaddr, toaddr, message)
        print "\n\t[!] Email Sent Successfully:",host, user, password
        print "\t[!] Message Sent Successfully\n"
        LSstring = host+":"+user+":"+password+"\n"
        nList.append(LSstring)      # special list for AMS file ID's
        LFile = open(output, "a")
        LFile.write(LSstring)       # save working host/usr/pass to file
        LFile.close()
        AMSout = open("AMSlist.txt", "a")
        AMSout.write("[Server"+str(nList.index(LSstring))+"]\nName="+str(host)+"\nPort=25\nUserID=User\nBccSize=50\nUserName="+str(user)+"\nPassword="+str(password)+"\nAuthType=0\n\n")
        smtp.quit()
    except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg:
        print "[-] Login Failed:", host, user, password
        pass
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def get_name_from_ip(self, addr):  # pylint: disable=no-self-use
        """Returns a reverse dns name if available.

        :param addr: IP Address
        :type addr: ~.common.Addr

        :returns: name or empty string if name cannot be determined
        :rtype: str

        """
        # If it isn't a private IP, do a reverse DNS lookup
        if not common.private_ips_regex.match(addr.get_addr()):
            try:
                socket.inet_aton(addr.get_addr())
                return socket.gethostbyaddr(addr.get_addr())[0]
            except (socket.error, socket.herror, socket.timeout):
                pass

        return ""
项目:legos.nettools    作者:Legobot    | 项目源码 | 文件源码
def run(self):
        #do pings
        for x in range(0, self.repeat):
            self.one_ping(self.ip, self.port, self.identifier, self.sequence, self.ttl, self.timeout)
            self.sequence += 1
            if x != self.repeat -1:
                time.sleep(self.sleep)
        #count packet loss
        self.result['packet_loss'] /= self.repeat
        #try to get hostname
        try:
            self.result['hostname'] = socket.gethostbyaddr(self.ip)[0]
        except socket.herror:
            self.result['hostname'] = None
        #calculate averate time
        if len(self.result['times']) != 0:
            self.result['avg_time'] = sum(self.result['times']) / len(self.result['times'])
            #and calculate mdev
            mean = sum([float(x) for x in self.result['times']]) / len(self.result['times'])
            self.result['mdev'] = sum([abs(x - mean) for x in self.result['times']]) / len(self.result['times'])

        return self.result
项目:Network-Scanner-PORT-ALIVE    作者:C3s1um133    | 项目源码 | 文件源码
def scan_and_print_neighbors(net, interface, timeout=1):
    global ips_o
    print_fmt("\n\033[94m[ARP]\033[0m %s sur %s" % (net, interface))
    try:
        ans, unans = scapy.layers.l2.arping(net, iface=interface, timeout=timeout, verbose=False)
        for s, r in ans.res:
            line = r.sprintf("%Ether.src%  %ARP.psrc%")
            ips_o.append(line.split(' ')[2])
            line = mac_address_id(line)
            try:
                hostname = socket.gethostbyaddr(r.psrc)
                line += " " + hostname[0]
            except socket.herror:
                pass
            except KeyboardInterrupt:
                print '\033[91m[-]\033[0m L\'utilisateur a choisi l\'interruption du process.'
                break
            logger.info("\033[96m[ONLINE]\033[0m " + line)
    except socket.error as e:
        if e.errno == errno.EPERM:
            logger.error("\033[91m[-]\033[0m %s. Vous n'etes pas root?", e.strerror)
        else:
            raise
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def whois(url, command=False):
    # clean domain to expose netloc
    ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
    if ip_match:
        domain = url
        try:
            result = socket.gethostbyaddr(url)
        except socket.herror as e:
            pass
        else:
            domain = result[0]
    else:
        domain = extract_domain(url)
    if command:
        # try native whois command
        r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
        text = r.stdout.read()
    else:
        # try builtin client
        nic_client = NICClient()
        text = nic_client.whois_lookup(None, domain, 0)
    return WhoisEntry.load(domain, text)
项目:Belati    作者:aancw    | 项目源码 | 文件源码
def whois(url, command=False):
    # clean domain to expose netloc
    ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
    if ip_match:
        domain = url
        try:
            result = socket.gethostbyaddr(url)
        except socket.herror as e:
            pass
        else:
            domain = result[0]
    else:
        domain = extract_domain(url)
    if command:
        # try native whois command
        r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
        text = r.stdout.read()
    else:
        # try builtin client
        nic_client = NICClient()
        text = nic_client.whois_lookup(None, domain, 0)
    return WhoisEntry.load(domain, text)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def get_name_from_ip(self, addr):  # pylint: disable=no-self-use
        """Returns a reverse dns name if available.

        :param addr: IP Address
        :type addr: ~.common.Addr

        :returns: name or empty string if name cannot be determined
        :rtype: str

        """
        # If it isn't a private IP, do a reverse DNS lookup
        if not common.private_ips_regex.match(addr.get_addr()):
            try:
                socket.inet_aton(addr.get_addr())
                return socket.gethostbyaddr(addr.get_addr())[0]
            except (socket.error, socket.herror, socket.timeout):
                pass

        return ""
项目:Project-ET    作者:p0rt22    | 项目源码 | 文件源码
def traceIP(target):
        try:
            base = GeoIP('GeoLiteCity.dat')
            data = base.record_by_addr(target)
            dnsName = socket.gethostbyaddr(target)[0]
            formatedData = '''IP: {}
City: {}
State/Province: {}
Country: {}
Continent: {}
Zip/Postal code: {}
Timezone: {}
Latitude: {}
Longitude: {}
DNS name: {}'''.format(target, data['city'], data['region_code'], data['country_name'], data['continent'], data['postal_code'], data['time_zone'], str(data['latitude']), str(data['longitude']), dnsName)
            print formatedData
            # compares target to database and print results to console

            askSave = raw_input('Save data? Y/n: ').lower()
            if askSave == 'y':
                ipFileName = raw_input('Filename: ')

                with open(ipFileName, 'w') as fileName:
                    fileName.write(formatedData)

                print 'Output saved as {}'.format(ipFileName)

            else:
                pass
            # asks user if they want to save the output

            pause()
            main()

        except socket.herror:
            pass
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def _test_vnc(host, port, timeout=3):
    """
    Test VNC connection.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)

    try:
        sock.connect((host, port))

        if sock.recv(1024).startswith('RFB'):
            return True
    except (socket.error, socket.timeout, socket.herror, socket.gaierror) as err:
        logger.warning('Error "%s" when testing VNC on "%s:%s"', err, host, port)

    finally:
        sock.close()

    return False
项目:ssh    作者:GDGVIT    | 项目源码 | 文件源码
def TCP_connect(ip, port_number, delay, hosts):
    host_name = 'Unknown'
    TCPsock = socket.socket()
    TCPsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    TCPsock.settimeout(delay)
    try:
        host_name = socket.gethostbyaddr(ip)[0]
        # print(host_name)
    except socket.herror:
        pass
    try:
        TCPsock.connect((ip, port_number))
        if 'SSH' in str(TCPsock.recv(256)):
            hosts.append((host_name, ip))
            # print(host_name,ip)
    except (OSError, ConnectionRefusedError):
        pass
项目:MgmtAppForLinuxMachines_flask    作者:yugokato    | 项目源码 | 文件源码
def is_aws(ipaddr):
        if Validation.check_internet_connection():
            if ipaddr in Validation.__aws_cache:
                return True
            elif ipaddr in Validation.__no_aws_cache:
                return False
            else:   # if ipaddr not in chache
                try:
                    result = socket.gethostbyaddr(ipaddr)
                    for line in result:
                        if 'compute.amazonaws.com' in line:
                            Validation.__aws_cache.append(ipaddr)
                            return True
                except socket.herror:
                    Validation.__no_aws_cache.append(ipaddr)
        return False
项目:smarthome    作者:smarthomeNG    | 项目源码 | 文件源码
def get_local_hostname(self):
        """
        Returns the local hostname under which the webinterface can be reached

        :return: fully qualified hostname 
        :rtype: str
        """
        import socket
        try:
            return socket.gethostbyaddr(self.get_local_ip_address())[0] # can fail with default /etc/hosts
        except socket.herror:
            try:
                return socket.gethostbyaddr("127.0.1.1")[0] # in debian based systems hostname is assigned to "127.0.1.1" by default
            except socket.herror:
                try:
                    return socket.gethostbyaddr("127.0.0.1")[0] # 'localhost' in most cases
                except socket.herror:
                    return "localhost"  # should not happen
项目:oecluster    作者:OECFHTW    | 项目源码 | 文件源码
def scan_network(self):
        logger.info('scanning Network')
        network = self._config_reader.get_config_section("Networking")['network']
        netmask = self._config_reader.get_config_section("Networking")['netmask']
        my_net = ipaddress.ip_network(network+'/'+netmask)
        host_list = dict()
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(2.0)

        for ip in my_net:
            try:
                # print(ip)
                host = self._generate_host(ip)
                if host is not None:
                    host_list[ip] = host
            except socket.herror as ex:
                pass
        return host_list
项目:oecluster    作者:OECFHTW    | 项目源码 | 文件源码
def fill_host_list(self, ip_list):
        host_list = dict()

        # Use UDP.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.settimeout(2.0)

        for ip in ip_list:
            try:
                ip = ipaddress.ip_address(ip)
                host = self._generate_host(ip)
                if host is not None:
                    host_list[ip] = host
            except socket.herror as ex:
                pass

        return host_list
项目:oecluster    作者:OECFHTW    | 项目源码 | 文件源码
def _generate_host(self, ip):
        host = None

        # Use UDP. socket.SOCK_DGRAM
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(2.0)

        try:
            host_name, alias, ipaddr = socket.gethostbyaddr(str(ip))
            host = Node.Node(ip, host_name)
        except socket.herror as ex:
            pass

        return host

    # Properties
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def lookup_ip_address(self, ip):
        """
        Perform a reverse DNS lookup for ip.

        Uses self.cache to speed up results when the same ip is
        lookup up several times during one session.

        :param ip: IP address to look up
        """

        if ip is None:
            return None
        if ip not in self.cache:
            try:
                self.cache[ip] = gethostbyaddr(ip)[0]
            except (herror, gaierror):
                # if lookup fails, return the input address
                self.cache[ip] = ip

        return self.cache[ip]
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def hostname(ip):
    """
    Performs a DNS reverse lookup for an IP address and caches the result in
    a global variable, which is really, really stupid.

    :param ip: And IP address string.
    :returns: A hostname string or a False value if the lookup failed.

    """
    addr = unicode(ip)
    if addr in _cached_hostname:
        return _cached_hostname[addr]

    try:
        dns = gethostbyaddr(addr)
    except herror:
        return False

    _cached_hostname[addr] = dns[0]
    return dns[0]
项目:xf    作者:red-green    | 项目源码 | 文件源码
def listen():
    port = input('port? ')
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    try:
        while True:
            sock.bind(('',port))
            sock.listen(1)
            conn,addr = sock.accept()
            while True:
                sys.stdout.write(conn.recv(4096))
                time.sleep(0.005)
    except (socket.herror,socket.gaierror,socket.timeout) as e:
        print 'could not connect: ',e
    except KeyboardInterrupt:
        print 'disconnecting'
        sock.close()
项目:xf    作者:red-green    | 项目源码 | 文件源码
def telnetbf():
    host = raw_input('host or ip to connect to> ')
    port = input('port to connect to> ')
    pwlist = raw_input('password list> ').strip()
    prompt = raw_input('the password prompt (the cue to send the password, usually "Password: ")> ')
    incorrect = raw_input('first thing the server says if password is incorrect (cue to disconnect and try another)> ')
    def try_pass(pw):
        try:
            tel = telnetlib.Telnet()
            tel.open(host,port)
            tel.read_until(prompt,5)
            tel.write(pw)
            tel.write('\n')
            ret = tel.read_until(incorrect,3)
            tel.close()
            if incorrect in ret:
                return False
            return True
        except (socket.error, socket.herror, socket.gaierror, socket.timeout, telnetlib.EOFError):
            return False
    zc = zippycrack(try_pass,pwlist,num_threads=4,cont=False)
    zc.run()
项目:hachoir3    作者:vstinner    | 项目源码 | 文件源码
def ip2name(addr):
    if not ip2name.resolve:
        return addr
    try:
        if addr in ip2name.cache:
            return ip2name.cache[addr]
        # FIXME: Workaround Python bug
        # Need double try/except to catch the bug
        try:
            name = gethostbyaddr(addr)[0]
        except KeyboardInterrupt:
            raise
    except (socket_host_error, ValueError):
        name = addr
    except (socket_host_error, KeyboardInterrupt, ValueError):
        ip2name.resolve = False
        name = addr
    ip2name.cache[addr] = name
    return name
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def ip2name(addr):
    if not ip2name.resolve:
        return addr
    try:
        if addr in ip2name.cache:
            return ip2name.cache[addr]
        # FIXME: Workaround Python bug
        # Need double try/except to catch the bug
        try:
            name = gethostbyaddr(addr)[0]
        except KeyboardInterrupt:
            raise
    except (socket_host_error, ValueError):
        name = addr
    except (socket_host_error, KeyboardInterrupt, ValueError):
        ip2name.resolve = False
        name = addr
    ip2name.cache[addr] = name
    return name
项目:webdigger    作者:rajeshmajumdar    | 项目源码 | 文件源码
def whois(url, command=False):
    # clean domain to expose netloc
    ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
    if ip_match:
        domain = url
        try:
            result = socket.gethostbyaddr(url)
        except socket.herror as e:
            pass
        else:
            domain = result[0]
    else:
        domain = extract_domain(url)
    if command:
        # try native whois command
        r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
        text = r.stdout.read()
    else:
        # try builtin client
        nic_client = NICClient()
        text = nic_client.whois_lookup(None, domain, 0)
    return WhoisEntry.load(domain, text)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def host_cache_lookup(ip):
        try:
                return host_cache[ip]
        except KeyError:
                pass

        try:
                hname = socket.gethostbyaddr(ip)[0]
                host_cache[ip] = hname

                host_cache_add()

                return host_cache[ip]
        except socket.herror:
                pass

        host_cache[ip] = ip
        return host_cache[ip]

# Countries we're not allowed to report on (iran, north korea)
项目:data-flow-graph    作者:macbre    | 项目源码 | 文件源码
def normalize_host(ip):
    def resolve_host(ip):
        logger = logging.getLogger('resolve_host')

        try:
            hostname = gethostbyaddr(ip)[0]
        except herror as e:
            # socket.herror: [Errno 1] Unknown host
            logger.error("Unable to resolve %s: %s", ip, e)
            return ip

        hostname = hostname.split('.')[0]
        logger.info("%s is known as %s", ip, hostname)

        if not hostname.startswith('ap-'):
            return hostname
        else:
            # ap-s200 -> ap-s*
            return re.sub(r'\d+$', '*', hostname)

    if ip not in hosts_cache:
        hosts_cache[ip] = resolve_host(ip)

    return hosts_cache[ip]
项目:subdomain_finder    作者:anarcoder    | 项目源码 | 文件源码
def whois(url, command=False):
    # clean domain to expose netloc
    ip_match = re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", url)
    if ip_match:
        domain = url
        try:
            result = socket.gethostbyaddr(url)
        except socket.herror as e:
            pass
        else:
            domain = result[0]
    else:
        domain = extract_domain(url)
    if command:
        # try native whois command
        r = subprocess.Popen(['whois', domain], stdout=subprocess.PIPE)
        text = r.stdout.read()
    else:
        # try builtin client
        nic_client = NICClient()
        text = nic_client.whois_lookup(None, domain, 0)
    return WhoisEntry.load(domain, text)
项目:IP    作者:DannyMcwaves    | 项目源码 | 文件源码
def state(self):
        """
        :return: The current state of the ip address.
        """
        connect = socket.socket()
        try:
            stat = connect.connect_ex((self.address, 80))
            if stat == 0:
                return "HOST IS UP"
            else:
                return "HOST IS DOWN"
        except socket.gaierror as gaer:
            return gaer.args[1]
        except socket.herror as her:
            return her.args[1]
        except socket.error as err:
            return err.args[1]
        finally:
            connect.shutdown(socket.SHUT_RDWR)
            connect.close()
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value, user = getword()
        try:
            print "-"*12
            print "User:",user,"Password:",value
            smtp = smtplib.SMTP(sys.argv[1])
            smtp.login(user, value)
            print "\t\nLogin successful:",user, value
            smtp.quit()
            work.join()
            sys.exit(2)
        except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg: 
            #print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value, user = getword()
        for ip in completed:
            print "-"*12
            print "[+] IP: "+ip
            try:
                print "User:",user,"Password:",value
                smtp = smtplib.SMTP(ip)
                smtp.login(user, value)
                print "\t\n[!] Login successful:",user, value
                logger.write("[!] Found: " + ip + " " + str(user) + ":" + str(value) + "\n")
                smtp.quit()
                sys.exit(2)
            except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg: 
                pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value, user = getword()
        try:
            print "-"*12
            print "User:",user,"Password:",value
            n = nntplib.NNTP(sys.argv[1],int(sys.argv[2]),user,value)
            print "\t\nLogin successful:",value, user
            n.quit()
            work.join()
            sys.exit(2)
        except(nntplib.NNTPError, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            n = nntplib.NNTP(ip,119,user,value)
            print "\t\nLogin successful:",user, value
            n.quit()
            work.join()
            sys.exit(2)
        except(nntplib.NNTPError, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            M = imaplib.IMAP4(ipaddr[0])
            M = login(user[:-1], value)
            print "\t\nLogin successful:",user, value
            M.close()
            M.logout()
            work.join()
            sys.exit(2)
        except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            smtp = smtplib.SMTP(ipaddr[0])
            smtp.login(user[:-1], value)
            print "\t\nLogin successful:",user, value
            smtp.quit()
            work.join()
            sys.exit(2)
        except(socket.gaierror, socket.error, socket.herror, smtplib.SMTPException), msg: 
            #print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value = getword()
        try:
            print "-"*12
            print "User:",user[:-1],"Password:",value
            n = nntplib.NNTP(ipaddr[0],119,user,value)
            print "\t\nLogin successful:",user, value
            n.quit()
            work.join()
            sys.exit(2)
        except (nntplib.NNTPError, socket.gaierror, socket.error, socket.herror), msg: 
            #print "An error occurred:", msg
            pass
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def run(self):
        value, user = getword()
        try:
            print "-"*12
            print "User:",user,"Password:",value
            M = imaplib.IMAP4(sys.argv[1])
            M = login(user, value)
            print "\t\nLogin successful:",user, value 
            M.close()
            M.logout()
            work.join()
            sys.exit(2)
        except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: 
            print "An error occurred:", msg
            pass
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def get_all_names(self):
        """Returns all names found in the Nginx Configuration.

        :returns: All ServerNames, ServerAliases, and reverse DNS entries for
                  virtual host addresses
        :rtype: set

        """
        all_names = set()

        for vhost in self.parser.get_vhosts():
            all_names.update(vhost.names)

            for addr in vhost.addrs:
                host = addr.get_addr()
                if common.hostname_regex.match(host):
                    # If it's a hostname, add it to the names.
                    all_names.add(host)
                elif not common.private_ips_regex.match(host):
                    # If it isn't a private IP, do a reverse DNS lookup
                    # TODO: IPv6 support
                    try:
                        socket.inet_aton(host)
                        all_names.add(socket.gethostbyaddr(host)[0])
                    except (socket.error, socket.herror, socket.timeout):
                        continue

        return util.get_filtered_names(all_names)
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def run(self):
        """
            Infinite loop fetching email , lauching < 10 threads pushing email
            to storage service and worker
        """
        try:
            self._imap_conn = get_imap_connection(host=HOST, port=PORT, user=USER, passwd=PASS)
        except (IMAP4.error, IMAP4.abort, IMAP4.readonly) as ex:
            Logger.error(unicode('Error in IMAP connection - %s' % (str(ex))))
            return
        except (socket.error, socket.gaierror, socket.herror, socket.timeout) as ex:
            Logger.error(unicode('Error in IMAP connection - %s' % (str(ex))))
            return

        pool = ThreadPool(5)

        while True:
            try:
                messages = self.get_messages()
            except (socket.error, socket.gaierror, socket.herror,
                    socket.timeout, ssl.SSLError) as ex:
                Logger.debug(unicode('Error in IMAP connection - %s' % (str(ex))))
                return

            if not messages:
                sleep(1)
                continue

            uids = messages.keys()
            args = [(uid, messages) for uid in uids]
            pool.map(push_email, args)
            pool.wait_completion()

            for uid in uids:
                if uid not in messages:
                    self._imap_conn.uid('store', uid, '+FLAGS', r'(\Deleted)')

            self._imap_conn.expunge()
            sleep(1)
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def get_ips_from_fqdn(fqdn):
    """
        Retrieve IPs from FQDN

        :param str fqdn: The FQDN to resolve
        :rtype: list
        :return: the list of resolved IP address for given FQDN
    """
    try:
        socket.setdefaulttimeout(5)
        ips = socket.gethostbyname_ex(fqdn)[2]
        return ips
    except (ValueError, socket.error, socket.gaierror, socket.herror, socket.timeout):
        return None
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def run():
    try:
        querly = socket.gethostbyaddr(variables['target'][0])
        printSuccess("resolved hostname: "+ querly[0])
        return querly[0]
    except(socket.herror):
        printError("unknown host")
        return ModuleError("unknown host")
    except(socket.gaierror):
        printError("name or service not known")
        return ModuleError("name or service not known")
项目:manyfaced-honeypot    作者:Zloool    | 项目源码 | 文件源码
def __init__(self, ip, raw_request, timestamp, parsed_request, is_detected,
                 hostname):
        self.ip = ip
        self.raw_request = raw_request
        self.timestamp = timestamp
        self.path = ""
        self.command = ""
        self.version = ""
        self.ua = ""
        self.headers = ""
        self.country = ""
        self.continent = ""
        self.timezone = ""
        self.dns_name = ""
        self.tracert = ""  # TODO
        if hasattr(parsed_request, 'path'):
            self.path = parsed_request.path
        if parsed_request.command is not None:
            self.command = parsed_request.command
        if hasattr(parsed_request, 'request_version'):
            self.version = parsed_request.request_version
        if hasattr(parsed_request, 'headers'):
            self.headers = parsed_request.headers
            if 'user-agent' in parsed_request.headers:
                self.ua = parsed_request.headers['user-agent']
        self.isDetected = is_detected
        self.hostname = hostname
        location = geolite2.lookup(ip)
        if location is not None:
            self.country = location.country
            self.continent = location.continent
            self.timezone = location.timezone
        try:
            self.dns_name = socket.gethostbyaddr(ip)[0]
        except socket.herror:
            pass
项目:Theseus    作者:Dylan-halls    | 项目源码 | 文件源码
def arp_spoof(self, tm):
            arp = Arp_Spoof(args.iface)
            try:
                log.status("{} ({}) is at {}".format(socket.gethostbyaddr(args.target)[0], args.target, tm))
            except socket.herror:
                log.warn("{} is at {}".format(args.target, tm))

            ajobs = []
            victim_thread = multiprocessing.Process(target=arp.poison_victim, args=(args.target, args.gateway, int(verbose), args.iface, tm))
            ajobs.append(victim_thread)
            victim_thread.start()
            try:
                vname = socket.gethostbyaddr(args.target)[0]
                vname = vname.replace('.home', " ")
                log.status("Started attack on {}".format(vname))
            except socket.herror:
                log.warn("Started attack on {}".format(args.target))

            target_thread = multiprocessing.Process(target=arp.poison_router, args=(args.gateway, args.target, int(verbose), args.iface, tm))
            ajobs.append(victim_thread)
            target_thread.start()
            try:
                rname = socket.gethostbyaddr(args.gateway)[0]
                rname = rname.replace('.home', " ")
                log.status("Started attack on {}".format(rname))
            except socket.herror:
                log.warn("Started attack on {}".format(args.target))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testSocketError(self):
        # Testing socket module exceptions
        def raise_error(*args, **kwargs):
            raise socket.error
        def raise_herror(*args, **kwargs):
            raise socket.herror
        def raise_gaierror(*args, **kwargs):
            raise socket.gaierror
        self.assertRaises(socket.error, raise_error,
                              "Error raising socket exception.")
        self.assertRaises(socket.error, raise_herror,
                              "Error raising socket exception.")
        self.assertRaises(socket.error, raise_gaierror,
                              "Error raising socket exception.")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testExceptionTree(self):
        self.assertTrue(issubclass(socket.error, Exception))
        self.assertTrue(issubclass(socket.herror, socket.error))
        self.assertTrue(issubclass(socket.gaierror, socket.error))
        self.assertTrue(issubclass(socket.timeout, socket.error))
项目:attacks-pages-collector    作者:ifreddyrondon    | 项目源码 | 文件源码
def get_host(ip):
    attempts = 5
    host = "undefined"
    while attempts:
        try:
            data = socket.gethostbyaddr(ip)
            host = data[0]
            break
        except socket.herror:
            attempts -= 1

    return host
项目:attacks-pages-collector    作者:ifreddyrondon    | 项目源码 | 文件源码
def get_host(ip):
    attempts = 5
    host = "undefined"
    while attempts:
        try:
            data = socket.gethostbyaddr(ip)
            host = data[0]
            break
        except socket.herror:
            attempts -= 1

    return host
项目:attacks-pages-collector    作者:ifreddyrondon    | 项目源码 | 文件源码
def get_ip(name):
    attempts = 5
    ip = "undefined"
    while attempts:
        try:
            data = socket.gethostbyname_ex(name)
            ip = data[2][0]
            break
        except (socket.herror, socket.gaierror):
            attempts -= 1

    return ip
项目:attacks-pages-collector    作者:ifreddyrondon    | 项目源码 | 文件源码
def get_host(ip):
    attempts = 5
    host = "undefined"
    while attempts:
        try:
            data = socket.gethostbyaddr(ip)
            host = data[0]
            break
        except socket.herror:
            attempts -= 1

    return host
项目:attacks-pages-collector    作者:ifreddyrondon    | 项目源码 | 文件源码
def get_host(ip):
    attempts = 5
    host = "undefined"
    while attempts:
        try:
            data = socket.gethostbyaddr(ip)
            host = data[0]
            break
        except socket.herror:
            attempts -= 1

    return host