Python socket 模块,getdefaulttimeout() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用socket.getdefaulttimeout()

项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def checkTraficLight(self):
        """Detach this callback from the activity timer, re-arm it and continue.

        The method removes itself from ``self.activityTimer.callback``,
        restarts the timer with ``start(100, False)`` and then either shows
        the disclaimer or opens the update confirmation box.

        NOTE(review): ``default`` is hard-wired to ``True`` in this version,
        so the confirmation-box branch is currently unreachable and nothing
        touches the network while the 3 second default timeout is active.
        """
        self.activityTimer.callback.remove(self.checkTraficLight)
        self.activityTimer.start(100, False)

        # Only ``socket`` is needed here; the previously imported ``urlopen``
        # and ``os`` were never used (and ``from urllib import urlopen``
        # fails on Python 3).
        import socket
        currentTimeoutDefault = socket.getdefaulttimeout()
        socket.setdefaulttimeout(3)
        message = ""
        picon = None
        default = True
        # Put the process-wide default back exactly as it was found.
        socket.setdefaulttimeout(currentTimeoutDefault)
        if default:
            self.showDisclaimer()
        else:
            message += "\n" + _("Do you want to update your receiver?")
            self.session.openWithCallback(self.startActualUpdate, MessageBox, message, default = default, picon = picon)
项目:enigma2    作者:OpenLD    | 项目源码 | 文件源码
def ipkgCallback(self, event, param):
        """React to an ipkg event and record whether updates were found.

        Only ``IpkgComponent.EVENT_DONE`` is handled:

        * while ``self.updating`` is set, the flag is cleared and an upgrade
          list is requested;
        * when the finished command was the upgrade-list request, the number
          of fetched packages is stored in ``self.total_packages`` and the
          ``config.softwareupdate`` flags are updated;
        * any other finished command marks "no update found".

        ``param`` is accepted but not used by this handler.
        """
        if event == IpkgComponent.EVENT_DONE:
            if self.updating:
                self.updating = False
                self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
            elif self.ipkg.currentCommand == IpkgComponent.CMD_UPGRADE_LIST:
                self.total_packages = len(self.ipkg.getFetchedList())
                print ('[OnlineVersionCheck] %s Updates available' % self.total_packages)
                if self.total_packages:
                    from urllib import urlopen
                    import socket
                    currentTimeoutDefault = socket.getdefaulttimeout()
                    socket.setdefaulttimeout(3)
                    try:
                        config.softwareupdate.updatefound.setValue(True)
                        try:
                            config.softwareupdate.updateisunstable.setValue(urlopen("http://odisealinux.com/feeds/" + getImageVersion() + "/status").read())
                        except Exception:
                            # Best effort: if the status cannot be fetched or
                            # stored, fall back to the value 1.
                            config.softwareupdate.updateisunstable.setValue(1)
                    finally:
                        # Always restore the process-wide default timeout,
                        # even if one of the config setters raises.
                        socket.setdefaulttimeout(currentTimeoutDefault)
                else:
                    config.softwareupdate.updatefound.setValue(False)
            else:
                config.softwareupdate.updatefound.setValue(False)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def build_http():
  """Create an httplib2.Http object with a timeout always set.

  The process-wide socket default timeout is used when one is configured;
  otherwise DEFAULT_HTTP_TIMEOUT_SEC is used. To choose a different value,
  call

    socket.setdefaulttimeout(timeout_in_sec)

  before calling this function.

  Returns:
    A httplib2.Http object constructed with the selected timeout.
  """
  http_timeout = socket.getdefaulttimeout()
  if http_timeout is None:
    # No global default configured: fall back to the module constant.
    http_timeout = DEFAULT_HTTP_TIMEOUT_SEC
  return httplib2.Http(timeout=http_timeout)
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def __recv_msg_compat(sock,size,timeout):   # compatibility implementation for non-MSG_WAITALL / M2Crypto
    """Receive exactly ``size`` bytes from ``sock`` by looping over ``recv``.

    Data is read in chunks of at most 60000 bytes (a rather arbitrary limit
    that avoids memory/buffer problems on certain OSes -- VAX/VMS, Windows)
    and joined into one byte string. ``timeout`` is accepted for signature
    compatibility but is not used here.

    Raises:
        ConnectionClosedError: when ``recv`` returns no data before ``size``
            bytes arrived. The data received so far is attached to the
            exception as ``partialMsg``. A dedicated message is raised for
            sockets exposing ``pending`` (m2crypto SSL) while a default
            socket timeout is set.
    """
    msglen=0
    msglist=[]
    while msglen<size:
        chunk=sock.recv(min(60000,size-msglen))
        if not chunk:
            # m2crypto ssl sockets have problems with a defaulttimeout
            if hasattr(sock,'pending') and socket.getdefaulttimeout() is not None:
                raise ConnectionClosedError("m2crypto SSL can't be used when socket.setdefaulttimeout() has been set")
            err = ConnectionClosedError('connection lost')
            # b'' is plain str on Python 2 and bytes on Python 3, matching
            # what recv() returns on either version.
            err.partialMsg=b''.join(msglist)    # store the message that was received until now
            raise err
        msglist.append(chunk)
        msglen+=len(chunk)
    return b''.join(msglist)


# Send a message over a socket. Raises ConnectionClosedError if the msg
# couldn't be sent (the connection has probably been lost then).
# We need this because 'send' isn't guaranteed to send all desired
# bytes in one call, for instance, when network load is high.
项目:enigma2    作者:Openeight    | 项目源码 | 文件源码
def getLatestImageTimestamp(self):
        """Return the latest image timestamp as formatted text, or "" on failure.

        The timestamp is downloaded from an openpli.org URL built from the
        device model and the image version, parsed as an integer epoch value
        and formatted with the (translated) pattern "%Y-%m-%d %H:%M". If the
        first download attempt fails, it is retried once with an unverified
        SSL context. Any remaining error yields an empty string.

        The process-wide default socket timeout is set to 3 seconds for the
        duration of the call and restored afterwards.
        """
        currentTimeoutDefault = socket.getdefaulttimeout()
        socket.setdefaulttimeout(3)
        try:
            # TODO: Use Twisted's URL fetcher, urlopen is evil. And it can
            # run in parallel to the package update.
            from datetime import datetime
            imageVersion = about.getImageTypeString().split(" ")[1]
            versionFormat = "%.1f" if int(imageVersion) < 5 else "%s"
            imageVersion = versionFormat % int(imageVersion)
            url = "http://openpli.org/download/timestamp/%s~%s" % (HardwareInfo().get_device_model(), imageVersion)
            try:
                latestImageTimestamp = datetime.fromtimestamp(int(urlopen(url, timeout=5).read())).strftime(_("%Y-%m-%d %H:%M"))
            except Exception:
                # OpenPli 5.0 uses python 2.7.11 and here we need to bypass the certificate check
                from ssl import _create_unverified_context
                latestImageTimestamp = datetime.fromtimestamp(int(urlopen(url, timeout=5, context=_create_unverified_context()).read())).strftime(_("%Y-%m-%d %H:%M"))
        except Exception:
            # Best effort: callers get an empty string when nothing usable
            # could be determined.
            latestImageTimestamp = ""
        finally:
            # Restore the global default no matter how the lookup ended.
            socket.setdefaulttimeout(currentTimeoutDefault)
        return latestImageTimestamp
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def build_http():
  """Create an httplib2.Http object with a timeout always set.

  The process-wide socket default timeout is used when one is configured;
  otherwise DEFAULT_HTTP_TIMEOUT_SEC is used. To choose a different value,
  call

    socket.setdefaulttimeout(timeout_in_sec)

  before calling this function.

  Returns:
    A httplib2.Http object constructed with the selected timeout.
  """
  http_timeout = socket.getdefaulttimeout()
  if http_timeout is None:
    # No global default configured: fall back to the module constant.
    http_timeout = DEFAULT_HTTP_TIMEOUT_SEC
  return httplib2.Http(timeout=http_timeout)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_timeout(self):
        """Exchange values over a Pipe and a Listener with a 0.1s default timeout.

        The default socket timeout is lowered to 0.1 seconds, a child process
        running ``self._test_timeout`` is started with the child pipe end and
        the listener address, and the test expects 123 on the pipe followed
        by 456 on the accepted connection. The previous default timeout is
        restored in all cases.
        """
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent_end, child_end = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout,
                args=(child_end, listener.address),
            )
            worker.start()
            # The parent does not use the child's end of the pipe.
            child_end.close()
            self.assertEqual(parent_end.recv(), 123)
            parent_end.close()
            connection = listener.accept()
            self.assertEqual(connection.recv(), 456)
            connection.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _new_fixed_fetch(validate_certificate):
        """Build a ``fetch`` wrapper bound to ``validate_certificate``.

        The returned ``fixed_fetch`` forwards all of its arguments to
        ``fetch``. When no ``deadline`` is given it uses the process-wide
        default socket timeout, or 5 when that default is unset (or zero).
        """
        def fixed_fetch(url, payload=None, method="GET", headers={},
                        allow_truncated=False, follow_redirects=True,
                        deadline=None):
            # The shared ``headers`` default is only forwarded; this wrapper
            # itself never mutates it.
            effective_deadline = deadline
            if effective_deadline is None:
                effective_deadline = socket.getdefaulttimeout() or 5
            return fetch(
                url,
                payload=payload,
                method=method,
                headers=headers,
                allow_truncated=allow_truncated,
                follow_redirects=follow_redirects,
                deadline=effective_deadline,
                validate_certificate=validate_certificate,
            )
        return fixed_fetch
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return wrapper
    return decorator
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return wrapper
    return decorator
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return wrapper
    return decorator
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return wrapper
    return decorator
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:quantrocket-client    作者:quantrocket-llc    | 项目源码 | 文件源码
def emit(self, record):
        """Emit ``record`` through the parent handler with a 3 second timeout.

        The process-wide default socket timeout is set to 3 seconds around
        the parent class's ``emit`` and restored afterwards, including when
        that call raises.
        """
        orig_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(3)
        try:
            super(_ImpatientHttpHandler, self).emit(record)
        finally:
            # Without this, an exception from the parent emit() would leave
            # the 3 second default in place for the whole process.
            socket.setdefaulttimeout(orig_timeout)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _new_fixed_fetch(validate_certificate):
    """Build a ``fetch`` wrapper bound to ``validate_certificate``.

    The returned ``fixed_fetch`` forwards all of its arguments to ``fetch``.
    When no ``deadline`` is given it passes the process-wide default socket
    timeout instead (which may itself be ``None``).
    """
    def fixed_fetch(url, payload=None, method="GET", headers={},
                    allow_truncated=False, follow_redirects=True,
                    deadline=None):
        # The shared ``headers`` default is only forwarded; this wrapper
        # itself never mutates it.
        effective_deadline = deadline
        if effective_deadline is None:
            effective_deadline = socket.getdefaulttimeout()
        return fetch(
            url,
            payload=payload,
            method=method,
            headers=headers,
            allow_truncated=allow_truncated,
            follow_redirects=follow_redirects,
            deadline=effective_deadline,
            validate_certificate=validate_certificate,
        )
    return fixed_fetch
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:httplib2    作者:httplib2    | 项目源码 | 文件源码
def _new_fixed_fetch(validate_certificate):
    """Build a ``fetch`` wrapper bound to ``validate_certificate``.

    The returned ``fixed_fetch`` forwards all of its arguments to ``fetch``.
    When no ``deadline`` is given it passes the process-wide default socket
    timeout instead (which may itself be ``None``).
    """
    def fixed_fetch(url, payload=None, method="GET", headers={},
                    allow_truncated=False, follow_redirects=True,
                    deadline=None):
        # The shared ``headers`` default is only forwarded; this wrapper
        # itself never mutates it.
        effective_deadline = deadline
        if effective_deadline is None:
            effective_deadline = socket.getdefaulttimeout()
        return fetch(
            url,
            payload=payload,
            method=method,
            headers=headers,
            allow_truncated=allow_truncated,
            follow_redirects=follow_redirects,
            deadline=effective_deadline,
            validate_certificate=validate_certificate,
        )
    return fixed_fetch
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:ascii-art-py    作者:blinglnav    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def _new_fixed_fetch(validate_certificate):
        """Build a ``fetch`` wrapper bound to ``validate_certificate``.

        The returned ``fixed_fetch`` forwards all of its arguments to
        ``fetch``. When no ``deadline`` is given it uses the process-wide
        default socket timeout, or 5 when that default is unset (or zero).
        """
        def fixed_fetch(url, payload=None, method="GET", headers={},
                        allow_truncated=False, follow_redirects=True,
                        deadline=None):
            # The shared ``headers`` default is only forwarded; this wrapper
            # itself never mutates it.
            effective_deadline = deadline
            if effective_deadline is None:
                effective_deadline = socket.getdefaulttimeout() or 5
            return fetch(
                url,
                payload=payload,
                method=method,
                headers=headers,
                allow_truncated=allow_truncated,
                follow_redirects=follow_redirects,
                deadline=effective_deadline,
                validate_certificate=validate_certificate,
            )
        return fixed_fetch
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:ivaochdoc    作者:ivaoch    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:bpy_lambda    作者:bcongdon    | 项目源码 | 文件源码
def __init__(self, timeout = None):
            """Store the requested timeout and the current default socket timeout.

            ``timeout`` is kept as ``self.timeout``; the process-wide default
            socket timeout at construction time is kept as
            ``self.old_timeout``.
            """
            self.timeout = timeout
            # Snapshot of the global default as it is right now.
            self.old_timeout = socket.getdefaulttimeout()
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:django    作者:alexsukhrin    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)

        return wrapper

    return decorator
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def socket_timeout(timeout=15):
    """Return a decorator that runs a callable under a temporary socket timeout.

    Each call of the decorated function saves the process-wide default
    socket timeout, sets it to ``timeout``, invokes the function and then
    restores the saved value, whether the function returns or raises.
    """
    # Local import so this block does not depend on the file's import list.
    import functools

    def decorator(func):
        # wraps() keeps the decorated function's __name__ and __doc__ instead
        # of exposing the wrapper's.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            old_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(timeout)
            try:
                return func(*args, **kwargs)
            finally:
                socket.setdefaulttimeout(old_timeout)
        return wrapper
    return decorator
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def socket_timeout(seconds=15):
    """Generator that applies a temporary process-wide default socket timeout.

    When first advanced it records the current default timeout, sets the
    default to ``seconds`` and yields once. When it is resumed, closed or
    thrown into, the recorded timeout is put back.

    NOTE(review): no decorator is visible on this definition; presumably it
    is meant to be driven through ``contextlib.contextmanager`` -- confirm
    against the call sites.
    """
    saved_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(seconds)
        yield
    finally:
        # Runs on normal resumption as well as on close()/throw().
        socket.setdefaulttimeout(saved_timeout)