我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用twisted.internet.reactor.connectUNIXDatagram()。
def testExchange(self): clientaddr = self.mktemp() serveraddr = self.mktemp() sp = ServerProto() cp = ClientProto() s = reactor.listenUNIXDatagram(serveraddr, sp) c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr) d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted]) def write(ignored): cp.transport.write("hi") return defer.gatherResults([sp.deferredGotWhat, cp.deferredGotBack]) def cleanup(ignored): d1 = defer.maybeDeferred(s.stopListening) d1.addCallback(lambda x : os.unlink(clientaddr)) d2 = defer.maybeDeferred(c.stopListening) d2.addCallback(lambda x : os.unlink(serveraddr)) return defer.gatherResults([d1, d2]) def _cbTestExchange(ignored): self.failUnlessEqual("hi", sp.gotwhat) self.failUnlessEqual(clientaddr, sp.gotfrom) self.failUnlessEqual("hi back", cp.gotback) d.addCallback(write) d.addCallback(cleanup) d.addCallback(_cbTestExchange) return d
def openClientMode(self, iface=''): try: self._lport = reactor.connectUNIXDatagram(iface, self) except Exception: raise error.CarrierError(sys.exc_info()[1]) return self
def test_exchange(self): """ Test that a datagram can be sent to and received by a server and vice versa. """ clientaddr = self.mktemp() serveraddr = self.mktemp() sp = ServerProto() cp = ClientProto() s = reactor.listenUNIXDatagram(serveraddr, sp) self.addCleanup(s.stopListening) c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr) self.addCleanup(c.stopListening) d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted]) def write(ignored): cp.transport.write(b"hi") return defer.gatherResults([sp.deferredGotWhat, cp.deferredGotBack]) def _cbTestExchange(ignored): self.assertEqual(b"hi", sp.gotwhat) self.assertEqual(clientaddr, sp.gotfrom) self.assertEqual(b"hi back", cp.gotback) d.addCallback(write) d.addCallback(_cbTestExchange) return d