Python twisted.internet.reactor 模块,disconnectAll() 实例源码

我们从Python开源项目中,提取了以下2个代码示例,用于说明如何使用twisted.internet.reactor.disconnectAll()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_directConnectionLostCall(self):
        """
        If C{connectionLost} is called directly on a port object, it succeeds
        (and doesn't expect the presence of a C{deferred} attribute).

        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown.
        """
        serverFactory = MyServerFactory()
        port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1")
        portNumber = port.getHost().port
        port.connectionLost(None)

        client = MyClientFactory()
        serverFactory.protocolConnectionMade = defer.Deferred()
        client.protocolConnectionMade = defer.Deferred()
        reactor.connectTCP("127.0.0.1", portNumber, client)
        def check(ign):
            client.reason.trap(error.ConnectionRefusedError)
        return client.failDeferred.addCallback(check)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exceptInConnectionLostCall(self):
        """
        If C{connectionLost} is called directory on a port object and that the
        server factory raises an exception in C{stopFactory}, the exception is
        passed through to the caller.

        C{connectionLost} is called by L{reactor.disconnectAll} at shutdown.
        """
        serverFactory = MyServerFactory()
        def raiseException():
            raise RuntimeError("An error")
        serverFactory.stopFactory = raiseException
        port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1")
        self.assertRaises(RuntimeError, port.connectionLost, None)