Python zmq 模块,eventloop 子包实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 zmq.eventloop(注意:它是一个子包,而不是可调用的函数)。

项目:eventdriventalk    作者:cachedout    | 项目源码 | 文件源码
def __init__(self, opts=None):
        """Create the publisher's sockets and attach them to the IOLoop.

        A PUB socket is bound on TCP and a PULL socket is bound on IPC. Both
        are wrapped in a ZMQStream on the shared IOLoop instance, and every
        message received on the PULL stream is handed to ``self.republish``.

        Args:
            opts: Options to use. When ``None``, they are obtained from
                ``self.process_config(CONFIG_LOCATION)``.
        """
        self.opts = (
            self.process_config(CONFIG_LOCATION) if opts is None else opts
        )

        # Outbound side: PUB socket bound on the loopback interface.
        self.ctx = zmq.Context()
        self.pub_socket = self.ctx.socket(zmq.PUB)
        self.pub_socket.bind('tcp://127.0.0.1:2000')

        self.loop = zmq.eventloop.IOLoop.instance()
        stream_cls = zmq.eventloop.zmqstream.ZMQStream
        self.pub_stream = stream_cls(self.pub_socket, self.loop)

        # Inbound side: PULL socket over IPC to listen to the reactor.
        self.pull_socket = self.ctx.socket(zmq.PULL)
        self.pull_socket.bind('ipc:///tmp/reactor.ipc')
        self.pull_stream = stream_cls(self.pull_socket, self.loop)

        # Everything pulled in is passed to republish().
        self.pull_stream.on_recv(self.republish)
项目:eventdriventalk    作者:cachedout    | 项目源码 | 文件源码
def __init__(self, opts=None):
        """Create the SUB socket, attach it to the IOLoop and load actions.

        Args:
            opts: Options to use. When ``None``, they are obtained from
                ``self.process_config(CONFIG_LOCATION)``.
        """
        self.opts = (
            self.process_config(CONFIG_LOCATION) if opts is None else opts
        )

        # ZeroMQ setup: a SUB socket connected to the local publisher port.
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.SUB)
        self.socket.connect('tcp://localhost:2000')

        self.loop = zmq.eventloop.IOLoop.instance()
        stream_cls = zmq.eventloop.zmqstream.ZMQStream
        self.stream = stream_cls(self.socket, self.loop)
        # NOTE(review): ``act`` is not defined in this method; presumably a
        # module-level callable -- confirm it exists where this is defined.
        self.stream.on_recv(act)

        # Load up actions.
        # NOTE(review): the path below is spelled 'eventdrivetalk' -- confirm
        # this is the intended directory name.
        self.actions = loader.load_actions(
            self.opts, '/home/mp/devel/eventdrivetalk/actions'
        )
项目:odr-stream-router    作者:digris    | 项目源码 | 文件源码
def run(self):
        """Wire every input to the router and run the IOLoop until stopped.

        For each entry in ``self._inputs``, ``self.route`` is registered as
        the stream's ``on_recv_stream`` callback with that entry bound as the
        ``input`` keyword argument. If a telnet server is configured it is
        started, and then the IOLoop is started.

        This call blocks for as long as the IOLoop is running and returns
        once the loop has been stopped.
        """
        for source in self._inputs:
            source.stream.on_recv_stream(partial(self.route, input=source))

        if self._telnet_server:
            self._telnet_server.start()

        # Start the tornado ioloop
        # http://pyzmq.readthedocs.io/en/latest/eventloop.html#futures-and-coroutines
        #
        # start() itself blocks until the loop is stopped, so no extra
        # keep-alive loop is needed afterwards. The former ``while True: pass``
        # only spun at full CPU after shutdown and kept run() from returning.
        ioloop.IOLoop.instance().start()
项目:eventdriventalk    作者:cachedout    | 项目源码 | 文件源码
def __init__(self, opts=None):
        """Create the reactor's sockets, attach them to the IOLoop, load plugins.

        A PULL socket is bound on TCP and its stream delivers messages to
        ``self.stream_decode``. A PUSH socket is connected over IPC and also
        wrapped in a stream. Finally the actions, registers and rules are
        loaded through ``loader``.

        Args:
            opts: Options to use. When ``None``, they are obtained from
                ``self.process_config(CONFIG_LOCATION)``.
        """
        if opts is None:
            self.opts = self.process_config(CONFIG_LOCATION)
        else:
            self.opts = opts

        # An unconditional ``return`` used to sit here, which made all of the
        # setup below unreachable and left the instance with only ``opts``.

        # General setup of ZeroMQ
        self.ctx = zmq.Context()
        self.loop = zmq.eventloop.IOLoop.instance()

        # Begin setup of PULL socket
        self.pull_socket = self.ctx.socket(zmq.PULL)
        self.pull_socket.bind('tcp://127.0.0.1:2001')

        self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)
        self.pull_stream.on_recv(self.stream_decode)

        # Begin setup of PUSH socket for IPC to publisher
        self.push_socket = self.ctx.socket(zmq.PUSH)
        self.push_socket.connect('ipc:///tmp/reactor.ipc')

        self.push_stream = zmq.eventloop.zmqstream.ZMQStream(self.push_socket, self.loop)

        self.actions = loader.load_actions(self.opts, '/home/mp/devel/eventdriventalk/actions')
        self.registers = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/registers')
        # NOTE(review): rules are loaded with load_registers(), not a
        # rules-specific loader -- confirm this is intentional.
        self.rules = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/rules')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_eventloop(self):
        """Check that the zmq.eventloop modules can be imported."""
        import zmq.eventloop
        from zmq.eventloop import ioloop, zmqstream
        from zmq.eventloop.minitornado.platform import auto
        # Imported last, so ``ioloop`` ends up bound to the minitornado module.
        from zmq.eventloop.minitornado import ioloop