Python twisted.internet.reactor 模块,listenUNIXDatagram() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用twisted.internet.reactor.listenUNIXDatagram()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _reprTest(self, serverProto, protocolName):
        """
        Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
        port when used with the given protocol.
        """
        filename = self.mktemp()
        unixPort = reactor.listenUNIXDatagram(filename, serverProto)

        connectedString = "<%s on %r>" % (protocolName, filename)
        self.assertEqual(repr(unixPort), connectedString)
        self.assertEqual(str(unixPort), connectedString)

        stopDeferred = defer.maybeDeferred(unixPort.stopListening)
        def stoppedListening(ign):
            unconnectedString = "<%s (not listening)>" % (protocolName,)
            self.assertEqual(repr(unixPort), unconnectedString)
            self.assertEqual(str(unixPort), unconnectedString)
        stopDeferred.addCallback(stoppedListening)
        return stopDeferred
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_reprWithClassicProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        classic protocol class being used and the filename on which the port is
        listening or indicates that the port is not listening.
        """
        class ClassicProtocol:
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(ClassicProtocol, types.ClassType)

        return self._reprTest(
            ClassicProtocol(), "twisted.test.test_unix.ClassicProtocol")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_reprWithNewStyleProtocol(self):
        """
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        new-style protocol class being used and the filename on which the port
        is listening or indicates that the port is not listening.
        """
        class NewStyleProtocol(object):
            def makeConnection(self, transport):
                pass

            def doStop(self):
                pass

        # Sanity check
        self.assertIsInstance(NewStyleProtocol, type)

        return self._reprTest(
            NewStyleProtocol(), "twisted.test.test_unix.NewStyleProtocol")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testExchange(self):
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def cleanup(ignored):
            d1 = defer.maybeDeferred(s.stopListening)
            d1.addCallback(lambda x : os.unlink(clientaddr))
            d2 = defer.maybeDeferred(c.stopListening)
            d2.addCallback(lambda x : os.unlink(serveraddr))
            return defer.gatherResults([d1, d2])

        def _cbTestExchange(ignored):
            self.failUnlessEqual("hi", sp.gotwhat)
            self.failUnlessEqual(clientaddr, sp.gotfrom)
            self.failUnlessEqual("hi back", cp.gotback)

        d.addCallback(write)
        d.addCallback(cleanup)
        d.addCallback(_cbTestExchange)
        return d
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testCannotListen(self):
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
        s.stopListening()
        os.unlink(addr)
    # test connecting to bound and connected (somewhere else) address
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testRepr(self):
        filename = self.mktemp()
        f = ServerProto()
        p = reactor.listenUNIXDatagram(filename, f)
        self.failIf(str(p).find(filename) == -1)

        def stoppedListening(ign):
            self.failIf(str(p).find(filename) != -1)

        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
项目:pysnmp    作者:etingof    | 项目源码 | 文件源码
def openServerMode(self, iface):
        try:
            self._lport = reactor.listenUNIXDatagram(iface, self)
        except Exception:
            raise error.CarrierError(sys.exc_info()[1])

        return self
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testExchange(self):
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress = clientaddr)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write("hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def cleanup(ignored):
            d1 = defer.maybeDeferred(s.stopListening)
            d1.addCallback(lambda x : os.unlink(clientaddr))
            d2 = defer.maybeDeferred(c.stopListening)
            d2.addCallback(lambda x : os.unlink(serveraddr))
            return defer.gatherResults([d1, d2])

        def _cbTestExchange(ignored):
            self.failUnlessEqual("hi", sp.gotwhat)
            self.failUnlessEqual(clientaddr, sp.gotfrom)
            self.failUnlessEqual("hi back", cp.gotback)

        d.addCallback(write)
        d.addCallback(cleanup)
        d.addCallback(_cbTestExchange)
        return d
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testCannotListen(self):
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        self.failUnlessRaises(error.CannotListenError, reactor.listenUNIXDatagram, addr, p)
        s.stopListening()
        os.unlink(addr)
    # test connecting to bound and connected (somewhere else) address
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testRepr(self):
        filename = self.mktemp()
        f = ServerProto()
        p = reactor.listenUNIXDatagram(filename, f)
        self.failIf(str(p).find(filename) == -1)

        def stoppedListening(ign):
            self.failIf(str(p).find(filename) != -1)

        return defer.maybeDeferred(p.stopListening).addCallback(stoppedListening)
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def openServerMode(self, iface=None):
        try:
            self._lport = reactor.listenUNIXDatagram(iface, self)
        except Exception:
            raise error.CarrierError(sys.exc_info()[1])

        return self
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_exchange(self):
        """
        Test that a datagram can be sent to and received by a server and vice
        versa.
        """
        clientaddr = self.mktemp()
        serveraddr = self.mktemp()
        sp = ServerProto()
        cp = ClientProto()
        s = reactor.listenUNIXDatagram(serveraddr, sp)
        self.addCleanup(s.stopListening)
        c = reactor.connectUNIXDatagram(serveraddr, cp, bindAddress=clientaddr)
        self.addCleanup(c.stopListening)

        d = defer.gatherResults([sp.deferredStarted, cp.deferredStarted])
        def write(ignored):
            cp.transport.write(b"hi")
            return defer.gatherResults([sp.deferredGotWhat,
                                        cp.deferredGotBack])

        def _cbTestExchange(ignored):
            self.assertEqual(b"hi", sp.gotwhat)
            self.assertEqual(clientaddr, sp.gotfrom)
            self.assertEqual(b"hi back", cp.gotback)

        d.addCallback(write)
        d.addCallback(_cbTestExchange)
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cannotListen(self):
        """
        L{IReactorUNIXDatagram.listenUNIXDatagram} raises
        L{error.CannotListenError} if the unix socket specified is already in
        use.
        """
        addr = self.mktemp()
        p = ServerProto()
        s = reactor.listenUNIXDatagram(addr, p)
        self.assertRaises(error.CannotListenError,
                          reactor.listenUNIXDatagram, addr, p)
        s.stopListening()
        os.unlink(addr)

    # test connecting to bound and connected (somewhere else) address
项目:PyQYT    作者:collinsctk    | 项目源码 | 文件源码
def openServerMode(self, iface=None):
        try:
            self._lport = reactor.listenUNIXDatagram(iface, self)
        except Exception:
            raise error.CarrierError(sys.exc_info()[1])

        return self