Python socket 模块,getfqdn() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用socket.getfqdn()

项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def __init__(self, server, conn, addr):
        """Initialise per-connection SMTP channel state and send the banner.

        Stores the server, connection and peer address, resets the
        command/data parsing state, then pushes the ``220`` greeting and
        switches the line terminator to CRLF.

        NOTE: Python 2 code (uses the ``print >>`` statement form).
        """
        asynchat.async_chat.__init__(self, conn)
        # Connection bookkeeping
        self.__server, self.__conn, self.__addr = server, conn, addr
        # Parsing / session state, starting in the COMMAND state
        self.__line = []
        self.__state = self.COMMAND
        self.__greeting = 0
        self.__mailfrom = None
        self.__rcpttos = []
        self.__data = ''
        # Local name used in the greeting, and the remote endpoint
        self.__fqdn = socket.getfqdn()
        self.__peer = conn.getpeername()
        print >> DEBUGSTREAM, 'Peer:', repr(self.__peer)
        banner = '220 %s %s' % (self.__fqdn, __version__)
        self.push(banner)
        self.set_terminator('\r\n')

    # Overrides base class for convenience
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, server_address):
        """Open the listening socket on *server_address* and record where it bound.

        Sets ``listen_socket``, ``server_name``, ``server_port`` and an empty
        ``headers_set`` list on the instance.
        """
        sock = socket.socket(self.address_family, self.socket_type)
        self.listen_socket = sock
        # Allow the same address to be reused
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Bind, then start listening with the configured backlog
        sock.bind(server_address)
        sock.listen(self.request_queue_size)
        # Only the first two items of the bound address (host, port) are used
        bound_host, bound_port = sock.getsockname()[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port
        # Headers set by the Web framework / Web application
        self.headers_set = []
项目:jenkins-epo    作者:peopledoc    | 项目源码 | 文件源码
def generate_comment(self):
        """Render the ``HELP`` template with extension docs and runtime details.

        The help text is the cleaned class docstrings of all bot extensions,
        joined by blank lines; extensions without a docstring are skipped.
        """
        raw_docs = (ext.__class__.__doc__ for ext in self.bot.extensions)
        sections = [inspect.cleandoc(text) for text in raw_docs if text]
        extension_names = sorted(self.bot.extensions_map.keys())
        mentions = sorted('@' + m for m in self.current.help_mentions)
        values = dict(
            extensions=','.join(extension_names),
            help='\n\n'.join(sections),
            host=socket.getfqdn(),
            me=self.current.head.repository.SETTINGS.NAME,
            mentions=', '.join(mentions),
            software=self.DISTRIBUTION.project_name,
            version=self.DISTRIBUTION.version,
        )
        return self.HELP % values
项目:python-mysql-pool    作者:LuciferJack    | 项目源码 | 文件源码
def report_failure(self, server_uuid, errno):
        """Report failure to Fabric

        Sends a ``threat.report_failure`` call for the MySQL server
        identified by server_uuid, but only when error reporting is enabled
        and errno is listed in REPORT_ERRORS or REPORT_ERRORS_EXTRA.
        Faults and socket errors raised while reporting are logged at debug
        level and otherwise ignored.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        reportable = errno in REPORT_ERRORS or errno in REPORT_ERRORS_EXTRA
        if not reportable:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric = self.get_instance()
        try:
            response = fabric.execute('threat', 'report_failure',
                                      server_uuid, current_host, errno)
            FabricResponse(response)
        except (Fault, socket.error) as exc:
            # Not requiring further action
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
项目:spiderfoot    作者:wi-fi-analyzer    | 项目源码 | 文件源码
def override_system_resolver(resolver=None):
    """Replace the socket module's name-resolution functions with the
    dnspython-backed versions defined in this module.

    Handy in tests that need to control how Python code resolves names
    without touching the system resolver configuration
    (e.g. /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # (socket attribute, replacement) pairs, applied in this order
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def __init__(self, regr, key, meta=None):
        """Store the registration and key, and derive metadata and an id.

        When *meta* is None a new ``Meta`` is created with the current UTC
        time (microseconds zeroed) and this host's FQDN. ``id`` is the MD5
        hex digest of the key's PEM-encoded SubjectPublicKeyInfo.
        """
        self.key = key
        self.regr = regr
        if meta is None:
            # pyrfc3339 drops microseconds, make sure __eq__ is sane
            now_utc = datetime.datetime.now(tz=pytz.UTC)
            meta = self.Meta(
                creation_dt=now_utc.replace(microsecond=0),
                creation_host=socket.getfqdn())
        self.meta = meta

        public_pem = self.key.key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
        self.id = hashlib.md5(public_pem).hexdigest()
        # Implementation note: Email? Multiple accounts can have the
        # same email address. Registration URI? Assigned by the
        # server, not guaranteed to be stable over time, nor
        # canonical URI can be generated. ACME protocol doesn't allow
        # account key (and thus its fingerprint) to be updated...
项目:dlbench    作者:hclhkbu    | 项目源码 | 文件源码
def get_host_ip(hostIP=None):
    """Return the name or address this host should be reached at.

    :param hostIP: ``None`` or ``'auto'`` (both treated as ``'ip'``),
        ``'dns'`` to get the fully qualified domain name, ``'ip'`` to get an
        IPv4 address. Any other value is returned unchanged.
    :return: the FQDN, an IPv4 address string, or *hostIP* itself.
    """
    if hostIP is None or hostIP == 'auto':
        hostIP = 'ip'

    if hostIP == 'dns':
        return socket.getfqdn()
    if hostIP != 'ip':
        # Caller supplied an explicit value; pass it through untouched.
        return hostIP

    try:
        address = socket.gethostbyname(socket.getfqdn())
    except socket.gaierror:
        logging.warning('gethostbyname(socket.getfqdn()) failed... trying on hostname()')
        address = socket.gethostbyname(socket.gethostname())
    if address.startswith("127."):
        # The name resolved to loopback: connect a UDP socket towards an
        # arbitrary address and read back the local address that was chosen.
        probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # doesn't have to be reachable
            probe.connect(('10.255.255.255', 1))
            address = probe.getsockname()[0]
        finally:
            # Always release the descriptor, even if connect() fails.
            probe.close()
    return address
项目:importacsv    作者:rasertux    | 项目源码 | 文件源码
def report_failure(self, server_uuid, errno):
        """Report failure to Fabric

        Sends a ``threat.report_failure`` call for the MySQL server
        identified by server_uuid, but only when error reporting is enabled
        and errno is listed in REPORT_ERRORS or REPORT_ERRORS_EXTRA.
        Faults and socket errors raised while reporting are logged at debug
        level and otherwise ignored.
        """
        if not self._report_errors:
            return

        errno = int(errno)
        current_host = socket.getfqdn()

        reportable = errno in REPORT_ERRORS or errno in REPORT_ERRORS_EXTRA
        if not reportable:
            return

        _LOGGER.debug("Reporting error %d of server %s", errno,
                      server_uuid)
        fabric = self.get_instance()
        try:
            response = fabric.execute('threat', 'report_failure',
                                      server_uuid, current_host, errno)
            FabricResponse(response)
        except (Fault, socket.error) as exc:
            # Not requiring further action
            _LOGGER.debug("Failed reporting server to Fabric (%s)",
                          str(exc))
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def override_system_resolver(resolver=None):
    """Replace the socket module's name-resolution functions with the
    dnspython-backed versions defined in this module.

    Handy in tests that need to control how Python code resolves names
    without touching the system resolver configuration
    (e.g. /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # (socket attribute, replacement) pairs, applied in this order
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __init__(self, client, path, num_clients, identifier=None):
        """Create a Double Barrier

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The barrier path to use.
        :param num_clients: How many clients must enter the barrier to
                            proceed.
        :type num_clients: int
        :param identifier: An identifier to use for this member of the
                           barrier when participating. Defaults to the
                           hostname + process id.

        """
        self.client, self.path, self.num_clients = client, path, num_clients
        if not identifier:
            # Fall back to "<fqdn>-<pid>" for this process
            identifier = '%s-%s' % (socket.getfqdn(), os.getpid())
        self._identifier = identifier
        self.participating = False
        self.assured_path = False
        # Random node name, and the full path built from it
        self.node_name = uuid.uuid4().hex
        self.create_path = "/".join((self.path, self.node_name))
项目:floto    作者:babbel    | 项目源码 | 文件源码
def __init__(self, swf=None, identity=None):
        """Base class for deciders.
        Parameters
        ----------
        swf: floto.api.Swf
            The SWF client, if swf is None an instance is created
        identity: str
            Identity to use; when falsy, the FQDN of this host's name is used
        """
        # Per-decision state, filled in later
        self.task_token = None
        self.last_response = None
        self.history = None
        self.decisions = []
        self.run_id = None
        self.workflow_id = None
        self.domain = None
        if not identity:
            identity = socket.getfqdn(socket.gethostname())
        self.identity = identity

        if not swf:
            swf = floto.api.Swf()
        self.swf = swf
        self.task_list = None

        self.terminate_workflow = False
        self.terminate_decider = False

        self._separate_process = None
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def select_playbook(self, path):
        """Pick the playbook file to run from the directory *path*.

        If a playbook was named as the first argument, only that file is
        tried. Otherwise ``<fqdn>.yml``, ``<short hostname>.yml`` and the
        default playbook are tried in that order and the first one accepted
        by ``try_playbook`` wins.

        Returns the playbook path, or None (after a warning) when nothing
        usable was found.
        """
        if self.args and self.args[0] is not None:
            explicit = os.path.join(path, self.args[0])
            status = self.try_playbook(explicit)
            if status == 0:
                return explicit
            display.warning("%s: %s" % (explicit, self.PLAYBOOK_ERRORS[status]))
            return None

        hostname = socket.getfqdn()
        candidates = (
            os.path.join(path, hostname + '.yml'),
            os.path.join(path, hostname.split('.')[0] + '.yml'),
            os.path.join(path, self.DEFAULT_PLAYBOOK),
        )
        failures = []
        for candidate in candidates:
            status = self.try_playbook(candidate)
            if status == 0:
                return candidate
            failures.append("%s: %s" % (candidate, self.PLAYBOOK_ERRORS[status]))
        # None of the candidates worked: report every failure at once
        display.warning("\n".join(failures))
        return None
项目:opsmgr    作者:open-power-ref-design-toolkit    | 项目源码 | 文件源码
def _check_address(address):
    """Return an ``(ipv4, hostname)`` pair for *address*.

    When ``is_valid_address`` accepts *address* it is used as the IPv4
    value and a reverse lookup supplies the hostname; otherwise *address*
    is expanded with ``socket.getfqdn`` and resolved forward. Whichever
    lookup fails leaves its half of the pair as an empty string.
    """
    if is_valid_address(address):
        try:
            return address, socket.gethostbyaddr(address)[0]
        except Exception:
            return address, ""  # no DNS

    hostname = socket.getfqdn(address)
    try:
        ipv4 = socket.gethostbyname(hostname)
    except Exception:
        ipv4 = ""  # host not valid or offline
    return ipv4, hostname
项目:twitter-message-bus    作者:clickyotomy    | 项目源码 | 文件源码
def post(content, token=None, username=None, public=False, debug=False):
    '''
    Post a gist on GitHub.

    The gist has a single file named ``message`` holding *content* (an empty
    string when *content* is None). Its description carries a random SHA-1
    hex digest, this host's FQDN, the user name and the current UTC time.

    Returns ``(gist id, digest)`` when the response contains an ``id``,
    otherwise ``(None, None)``.
    '''
    nonce = hashlib.sha1(os.urandom(16)).hexdigest()
    if username is None:
        username = getuser()
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    description = ('{hash} (twitter-message-bus); from {host} by {user} '
                   'at {time} UTC.').format(hash=nonce, host=getfqdn(),
                                            user=username, time=timestamp)

    body = '' if content is None else content
    payload = json.dumps({
        'files': {'message': {'content': body}},
        'public': public,
        'description': description,
    })

    response = github(http='post', uri='gists', token=token, payload=payload,
                      debug=debug)
    if 'id' in response:
        return (response['id'], nonce)
    return (None, None)
项目:virtual-core    作者:tiagoantao    | 项目源码 | 文件源码
def welcome(run=0):
    """Show the welcome form, or store its values and move on.

    With ``run == 0`` the form is pre-filled from the ``General`` config
    section. Otherwise the submitted form is read; if it has every required
    field it is saved to ``General`` and the user is redirected to
    ``determine_ssh_status``. In all other cases the template is rendered,
    with ``host`` defaulting to this machine's FQDN.
    """
    if run == 0:
        form = wizard.config.get('General', {})
    else:
        form = _delist(request.form)
        required = ['host', 'country', 'state', 'locality',
                    'orgname', 'orgunit', 'commonname',
                    'email']
        if _has_all_parameters(form, required):
            wizard.change_config('General', **form)
            return redirect(url_for('determine_ssh_status', run=0))
    if 'host' not in form:
        form['host'] = socket.getfqdn()

    # Form values take precedence over the ``run`` entry on key clashes
    all_params = {'run': run}
    all_params.update(form)
    return render_template('welcome.html',
                           menu_options=wizard.get_available_options(),
                           **all_params)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testHostnameRes(self):
        """Check that getfqdn() of our IP is one of the names known for this host.

        Returns silently (without failing) when forward or reverse lookup of
        the local host name raises ``socket.error``.
        """
        # Testing hostname resolution mechanisms
        local_name = socket.gethostname()
        try:
            address = socket.gethostbyname(local_name)
        except socket.error:
            # Probably name lookup wasn't set up right; skip this test
            return
        self.assertTrue('.' in address, "Error resolving host to ip.")
        try:
            canonical, aliases, _addresses = socket.gethostbyaddr(address)
        except socket.error:
            # Probably a similar problem as above; skip this test
            return
        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(address)
        if fqhn in known_names:
            return
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        """Start a DebuggingServer in a background thread for the test.

        The real ``socket.getfqdn``, ``sys.stdout`` and ``smtpd.DEBUGSTREAM``
        are saved on the instance before being replaced (presumably so that
        tearDown can restore them - not visible here).
        """
        # Swap in the mock getfqdn, remembering the real one
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt, self.client_evt = threading.Event(), threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()
项目:landscape-client    作者:CanonicalLtd    | 项目源码 | 文件源码
def get_fqdn():
    """
    Return the current fqdn of the machine, trying hard to return a meaningful
    name.

    The plain C{getfqdn} result is used unless it contains "localhost" (a
    NetworkManager bug seems to make it return localhost6.localdomain6 for
    machines without a domain since Maverick). In that case the canonical
    name reported by C{getaddrinfo} is tried, and finally the bare hostname.
    """
    name = socket.getfqdn()
    if "localhost" not in name:
        return name

    # Try the heavy artillery
    addrinfo = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET,
                                  socket.SOCK_DGRAM, socket.IPPROTO_IP,
                                  socket.AI_CANONNAME)
    name = addrinfo[0][3]
    if "localhost" not in name:
        return name

    # Another fallback
    return socket.gethostname()
项目:strack_python_api    作者:cine-use    | 项目源码 | 文件源码
def get_unique_code():
        """Return the unique code for this machine, creating it on first use.

        Lookup order:

        1. the ``STRACK_UNIQUE_CODE`` environment variable, if non-empty;
        2. the ``unique`` cache file under ``STRACK_USER_PATH``;
        3. otherwise the MD5 hex digest of this host's resolved IP address,
           which is then written to the cache file.

        In cases 2 and 3 the code is also stored in the environment variable.
        """
        # get code from env
        code_from_env = os.environ.get("STRACK_UNIQUE_CODE")
        if code_from_env:
            return code_from_env
        # read code from cache
        unique_code_cache_file = os.path.join(STRACK_USER_PATH, "unique")
        if os.path.isfile(unique_code_cache_file):
            with open(unique_code_cache_file) as f:
                code_from_file = f.read()
            os.environ.update({"STRACK_UNIQUE_CODE": code_from_file})
            return code_from_file
        # make dir
        if not os.path.isdir(STRACK_USER_PATH):
            os.makedirs(STRACK_USER_PATH)
        # generate the code
        computer_name = socket.getfqdn(socket.gethostname())
        ip = socket.gethostbyname(computer_name)
        # md5() needs bytes: hashing a text string raises TypeError on
        # Python 3, so encode the address first.
        unique_code = md5(ip.encode("utf-8")).hexdigest()
        # save cache
        os.environ.update({"STRACK_UNIQUE_CODE": unique_code})
        with open(unique_code_cache_file, "w") as f:
            f.write(unique_code)
        return unique_code
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def perform_krb181_workaround():
    """Renew the Kerberos ticket cache to work around a Kerberos 1.8.1 issue.

    Runs ``<kinit_path> -c <ccache> -R``. If the command exits non-zero, an
    error explaining how to check ticket renewability is logged and the
    process exits with kinit's return code.
    """
    cmdv = [configuration.get('kerberos', 'kinit_path'),
            "-c", configuration.get('kerberos', 'ccache'),
            "-R"]  # Renew ticket_cache

    log.info("Renewing kerberos ticket to work around kerberos 1.8.1: " +
             " ".join(cmdv))

    ret = subprocess.call(cmdv, close_fds=True)

    if ret != 0:
        principal = "%s/%s" % (configuration.get('kerberos', 'principal'), socket.getfqdn())
        # The hint below must show the credential cache path, so read the
        # 'ccache' option here (not 'principal').
        fmt_dict = dict(princ=principal,
                        ccache=configuration.get('kerberos', 'ccache'))
        log.error("Couldn't renew kerberos ticket in order to work around "
                  "Kerberos 1.8.1 issue. Please check that the ticket for "
                  "'%(princ)s' is still renewable:\n"
                  "  $ kinit -f -c %(ccache)s\n"
                  "If the 'renew until' date is the same as the 'valid starting' "
                  "date, the ticket cannot be renewed. Please check your KDC "
                  "configuration, and the ticket renewal policy (maxrenewlife) "
                  "for the '%(princ)s' and `krbtgt' principals." % fmt_dict)
        sys.exit(ret)
项目:certbot    作者:nikoloskii    | 项目源码 | 文件源码
def __init__(self, regr, key, meta=None):
        """Store the registration and key, and derive metadata and an id.

        When *meta* is None a new ``Meta`` is created with the current UTC
        time (microseconds zeroed) and this host's FQDN. ``id`` is the MD5
        hex digest of the key's PEM-encoded SubjectPublicKeyInfo.
        """
        self.key = key
        self.regr = regr
        if meta is None:
            # pyrfc3339 drops microseconds, make sure __eq__ is sane
            now_utc = datetime.datetime.now(tz=pytz.UTC)
            meta = self.Meta(
                creation_dt=now_utc.replace(microsecond=0),
                creation_host=socket.getfqdn())
        self.meta = meta

        public_pem = self.key.key.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
        self.id = hashlib.md5(public_pem).hexdigest()
        # Implementation note: Email? Multiple accounts can have the
        # same email address. Registration URI? Assigned by the
        # server, not guaranteed to be stable over time, nor
        # canonical URI can be generated. ACME protocol doesn't allow
        # account key (and thus its fingerprint) to be updated...
项目:osp-scraper    作者:opensyllabus    | 项目源码 | 文件源码
def update_warc_info_from_spider(record, spider):
    """Update a WARC warcinfo record's payload with details of a scrapy Spider.

    The payload lists the software name, this host's FQDN, and the spider's
    name, run id, revision and JSON-encoded parameters.
    """
    # make empty header object to use for fields
    # XXX WARCHeader messes up capitalization here
    fields = warc.WARCHeader({}, defaults=False)
    entries = (
        ('software', 'osp_scraper'),
        ('hostname', socket.getfqdn()),
        ('x-spider-name', spider.name),
        ('x-spider-run-id', spider.run_id),
        ('x-spider-revision', git_revision),
        ('x-spider-parameters', json.dumps(spider.get_parameters())),
    )
    for field_name, field_value in entries:
        fields[field_name] = field_value

    # Serialise the fields (no version line, no trailing CRLF) as the payload
    buf = BytesIO()
    fields.write_to(buf, version_line=False, extra_crlf=False)
    record.update_payload(buf.getvalue())
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_login(self):
        """Add two servers, then check the example API client lists the same FQDNs."""
        servers = config['lustre_servers']
        self.hosts = self.add_hosts([
            servers[0]['address'],
            servers[1]['address']])

        # Chroma puts its FQDN in the manager certificate, but the test config may
        # be pointing to localhost: if this is the case, substitute the FQDN in the
        # URL so that the client can validate the certificate.
        manager = config['chroma_managers'][0]
        url = manager['server_http_url']
        parsed = urlparse.urlparse(url)
        if parsed.hostname == 'localhost':
            parts = list(parsed)
            parts[1] = parts[1].replace("localhost", socket.getfqdn())
            url = urlparse.urlunparse(tuple(parts))

        example_api_client.setup_ca(url)
        user = manager['users'][0]
        hosts = example_api_client.list_hosts(url,
                                              user['username'],
                                              user['password'])
        expected_fqdns = [h['fqdn'] for h in self.hosts]
        self.assertListEqual(hosts, expected_fqdns)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testHostnameRes(self):
        """Check that getfqdn() of our IP is one of the names known for this host.

        The test is skipped when forward or reverse lookup of the local host
        name raises ``socket.error``.
        """
        # Testing hostname resolution mechanisms
        local_name = socket.gethostname()
        try:
            address = socket.gethostbyname(local_name)
        except socket.error:
            # Probably name lookup wasn't set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertTrue('.' in address, "Error resolving host to ip.")
        try:
            canonical, aliases, _addresses = socket.gethostbyaddr(address)
        except socket.error:
            # Probably a similar problem as above; skip this test
            self.skipTest('address lookup failure')
        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(address)
        if fqhn not in known_names:
            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testHostnameRes(self):
        """Check that getfqdn() of our IP is one of the names known for this host.

        The test is skipped when forward or reverse lookup of the local host
        name raises ``socket.error``.
        """
        # Testing hostname resolution mechanisms
        local_name = socket.gethostname()
        try:
            address = socket.gethostbyname(local_name)
        except socket.error:
            # Probably name lookup wasn't set up right; skip this test
            self.skipTest('name lookup failure')
        self.assertTrue('.' in address, "Error resolving host to ip.")
        try:
            canonical, aliases, _addresses = socket.gethostbyaddr(address)
        except socket.error:
            # Probably a similar problem as above; skip this test
            self.skipTest('address lookup failure')
        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(address)
        if fqhn not in known_names:
            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
项目:munch-core    作者:crunchmail    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        """Normalise the status fields, fill creation defaults, then save.

        For an object without a primary key, a missing ``creation_date``
        becomes the current time and a missing ``source_hostname`` becomes
        this host's FQDN. After saving a new object whose status is DROPPED
        or BOUNCED, ``should_optout(create=True)`` is called.
        """
        is_new = not self.pk

        self.update_mail_status()

        if self.status_code:
            # Keep only the part of the code matched by the RFC 3463 regex
            match = rfc3463_regex.match(self.status_code)
            if match:
                self.status_code = match.group(0)

        if not self.pk:
            if not self.creation_date:
                self.creation_date = timezone.now()
            if not self.source_hostname:
                self.source_hostname = socket.getfqdn()
        super().save(*args, **kwargs)

        if is_new and self.status in (self.DROPPED, self.BOUNCED):
            self.should_optout(create=True)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def override_system_resolver(resolver=None):
    """Replace the socket module's name-resolution functions with the
    dnspython-backed versions defined in this module.

    Handy in tests that need to control how Python code resolves names
    without touching the system resolver configuration
    (e.g. /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # (socket attribute, replacement) pairs, applied in this order
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
项目:of    作者:OptimalBPM    | 项目源码 | 文件源码
def store_process_system_document(_process_id, _name, _parent_id=None):
    """
    Creates a process instance structure, automatically sets systemPid, spawnedBy, host and spawnedWhen

    :param _process_id: Value stored as the document's "_id"
    :param _name: Value stored as the document's "name"
    :param _parent_id: If truthy, stored as "parent_id"
    :return: The process document as a dict
    """
    _struct = {
        "_id": _process_id,
        "systemPid": os.getpid(),
        "spawnedBy": get_current_login(),
        "name": _name,
        "host": socket.getfqdn(),
        "spawnedWhen": str(datetime.datetime.utcnow()),
        "schemaRef": "ref://of.process.system"
    }
    if _parent_id:
        # Store the id itself; a trailing comma here would wrap it in a
        # one-element tuple.
        _struct["parent_id"] = _parent_id

    return _struct
项目:python_programing    作者:lzhaoyang    | 项目源码 | 文件源码
def __init__(self, server_address):
        """Open the listening socket on *server_address* and record where it bound.

        Sets ``listen_socket``, ``server_name``, ``server_port`` and an empty
        ``headers_set`` list on the instance.
        """
        sock = socket.socket(self.address_family, self.socket_type)
        self.listen_socket = sock
        # allow the same socket address to be reused
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # bind to the address, then start listening
        sock.bind(server_address)
        sock.listen(self.request_queue_size)
        # only the first two items of the bound address (host, port) are used
        bound_host, bound_port = sock.getsockname()[:2]
        self.server_name = socket.getfqdn(bound_host)
        self.server_port = bound_port
        # headers set by web frameworks/applications
        self.headers_set = []
项目:cheapstream    作者:miltador    | 项目源码 | 文件源码
def override_system_resolver(resolver=None):
    """Replace the socket module's name-resolution functions with the
    dnspython-backed versions defined in this module.

    Handy in tests that need to control how Python code resolves names
    without touching the system resolver configuration
    (e.g. /etc/resolv.conf).

    When no resolver is given, the default resolver is used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # (socket attribute, replacement) pairs, applied in this order
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attr_name, replacement in replacements:
        setattr(socket, attr_name, replacement)
项目:cmdb    作者:jonmsawyer    | 项目源码 | 文件源码
def register(args, config):
    """Register this host with the CMDB and print the API key it returns.

    POSTs the lower-cased FQDN as JSON to ``/clients/register/``, checks the
    decoded response for an error, then prints the key together with the
    path of the agent's ``config.py``. *args* is not used here.
    """
    path = '/clients/register/'
    my_hostname = socket.getfqdn().lower()
    agent_root_dir = os.path.dirname(os.path.dirname(__file__))
    url = lib.lib.get_config_url(config, path)
    payload = json.dumps({'fqdn': my_hostname})
    req = requests.post(url, data=payload)
    obj = json.loads(lib.lib.get_response_body(req))
    lib.lib.check_for_error(obj)

    config_path = os.path.join(agent_root_dir, 'config.py')
    print('Host `{}` successfully registered with CMDB!'.format(my_hostname))
    print()
    print('API Key: {}'.format(obj.get('api_key')))
    print()
    print('!!! WARNING: save this API Key in your conifg file: `{}`'.format(
        config_path
    ))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testHostnameRes(self):
        """Check that getfqdn() of our IP is one of the names known for this host.

        Returns silently (without failing) when forward or reverse lookup of
        the local host name raises ``socket.error``.
        """
        # Testing hostname resolution mechanisms
        local_name = socket.gethostname()
        try:
            address = socket.gethostbyname(local_name)
        except socket.error:
            # Probably name lookup wasn't set up right; skip this test
            return
        self.assertTrue('.' in address, "Error resolving host to ip.")
        try:
            canonical, aliases, _addresses = socket.gethostbyaddr(address)
        except socket.error:
            # Probably a similar problem as above; skip this test
            return
        known_names = [local_name, canonical] + aliases
        fqhn = socket.getfqdn(address)
        if fqhn in known_names:
            return
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(known_names)))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        """Start a DebuggingServer in a background thread for the test.

        The real ``socket.getfqdn``, ``sys.stdout`` and ``smtpd.DEBUGSTREAM``
        are saved on the instance before being replaced (presumably so that
        tearDown can restore them - not visible here).
        """
        # Swap in the mock getfqdn, remembering the real one
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = sys.stdout = io.StringIO()

        self.serv_evt, self.client_evt = threading.Event(), threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        """Start a SimSMTPServer in a background thread for the test.

        The real ``socket.getfqdn`` is saved on the instance before the mock
        replaces it (presumably so that tearDown can restore it - not
        visible here).
        """
        # Swap in the mock getfqdn, remembering the real one
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self.serv_evt, self.client_evt = threading.Event(), threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        self.thread = threading.Thread(
            target=debugging_server,
            args=(self.serv, self.serv_evt, self.client_evt))
        self.thread.start()

        # wait until server thread has assigned a port number
        self.serv_evt.wait()
        self.serv_evt.clear()
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Set up coordination state and start the periodic leader check."""
        super(TransformService, self).__init__()

        self.coordinator = None
        self.group = CONF.service.coordinator_group

        # A unique name used for establishing election candidacy
        self.my_host_name = socket.getfqdn()

        # periodic check, run at the configured election polling frequency
        leader_check = loopingcall.FixedIntervalLoopingCall(
            self.periodic_leader_check)
        polling_interval = float(CONF.service.election_polling_frequency)
        leader_check.start(interval=polling_interval)
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def _connect(self):
        """Open the paramiko transport to the URL's host and start an SFTP client.

        The port falls back to ``_DEFAULT_PORT`` when the URL has none, and
        the GSS host is the FQDN of the URL's host.
        """
        port = self.url.port or self._DEFAULT_PORT
        self._transport = paramiko.Transport((self.url.host, port))
        self._transport.connect(
            self._get_hostkey(),
            self.url.user,
            self.url.password,
            gss_host=socket.getfqdn(self.url.host),
            gss_auth=self.GSS_AUTH,
            gss_kex=self.GSS_KEX,
        )
        self.__client = paramiko.SFTPClient.from_transport(self._transport)
        # chdir() is called without a path
        self.__client.chdir()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def __init__(self, server, conn, addr):
        """Initialise per-connection SMTP channel state and send the banner.

        Stores the server, connection and peer address, resets the
        command/data parsing state, then pushes the ``220`` greeting and
        switches the line terminator to CRLF. If the peer name cannot be
        read the channel is closed; only ENOTCONN is swallowed.

        NOTE: Python 2 code (``print >>`` statement, indexable exceptions).
        """
        asynchat.async_chat.__init__(self, conn)
        # Connection bookkeeping
        self.__server, self.__conn, self.__addr = server, conn, addr
        # Parsing / session state, starting in the COMMAND state
        self.__line = []
        self.__state = self.COMMAND
        self.__greeting = 0
        self.__mailfrom = None
        self.__rcpttos = []
        self.__data = ''
        self.__fqdn = socket.getfqdn()
        try:
            self.__peer = conn.getpeername()
        except socket.error as err:
            # a race condition  may occur if the other end is closing
            # before we can get the peername
            self.close()
            if err[0] == errno.ENOTCONN:
                return
            raise
        print >> DEBUGSTREAM, 'Peer:', repr(self.__peer)
        banner = '220 %s %s' % (self.__fqdn, __version__)
        self.push(banner)
        self.set_terminator('\r\n')

    # Overrides base class for convenience