我们从Python开源项目中,提取了以下2个代码示例,用于说明如何使用twisted.internet.reactor.disconnectAll()。
def test_directConnectionLostCall(self): """ If C{connectionLost} is called directly on a port object, it succeeds (and doesn't expect the presence of a C{deferred} attribute). C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. """ serverFactory = MyServerFactory() port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") portNumber = port.getHost().port port.connectionLost(None) client = MyClientFactory() serverFactory.protocolConnectionMade = defer.Deferred() client.protocolConnectionMade = defer.Deferred() reactor.connectTCP("127.0.0.1", portNumber, client) def check(ign): client.reason.trap(error.ConnectionRefusedError) return client.failDeferred.addCallback(check)
def test_exceptInConnectionLostCall(self): """ If C{connectionLost} is called directory on a port object and that the server factory raises an exception in C{stopFactory}, the exception is passed through to the caller. C{connectionLost} is called by L{reactor.disconnectAll} at shutdown. """ serverFactory = MyServerFactory() def raiseException(): raise RuntimeError("An error") serverFactory.stopFactory = raiseException port = reactor.listenTCP(0, serverFactory, interface="127.0.0.1") self.assertRaises(RuntimeError, port.connectionLost, None)