我们从 Python 开源项目中提取了以下 8 个代码示例，用于说明如何使用 zmq.FORWARDER。
def execute_command_forwarder():
    """Configure and start a zmq FORWARDER device from command-line arguments.

    Parses the sub-command arguments with the streamer parser, bootstraps the
    configuration through ``bootstrap_conf_with_gevent``, then builds a
    ``Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)`` whose input side is bound to
    ``args.subscriber`` and whose output side is bound to ``args.publisher``.
    A short banner is printed before the device is started.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    # An empty prefix subscribes the SUB side to every incoming message.
    device.setsockopt_in(zmq.SUBSCRIBE, b'')

    # High-water marks are optional; only apply them when a truthy value
    # was supplied on the command line.
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)

    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    # Single-argument print(...) with %-formatting emits the same text as the
    # former Python 2 print statements, yet is also valid Python 3 syntax.
    print("oldspeak forwarder started")
    print("date %s" % (datetime.utcnow().isoformat(),))
    print("subscriber %s" % (args.subscriber,))
    print("publisher %s" % (args.publisher,))

    device.start()
def test_device_types(self):
    """Verify that a Device reports the device type it was constructed with."""
    candidate_types = (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE)
    for expected_type in candidate_types:
        device = devices.Device(expected_type, zmq.PAIR, zmq.PAIR)
        self.assertEqual(device.device_type, expected_type)
        # Drop the reference before the next iteration creates a new device.
        del device
def start_dispatch_thread():
    """Create and start the module-level FORWARDER ThreadDevice once.

    Does nothing when ``INITED`` is already truthy. Otherwise it stores a new
    ``zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)`` in
    ``DISPATCHER``, binds its input side to ``INTERNAL_SOCKET``, connects its
    output side to ``CHANGES_SOCKET``, starts it, and finally sets ``INITED``.
    """
    global INITED, DISPATCHER

    if not INITED:
        # Publish the device in the global first, then configure it through
        # a local alias.
        DISPATCHER = forwarder = zmq.devices.ThreadDevice(
            zmq.FORWARDER, zmq.XSUB, zmq.XPUB
        )
        forwarder.bind_in(INTERNAL_SOCKET)
        forwarder.connect_out(CHANGES_SOCKET)
        forwarder.setsockopt_in(zmq.IDENTITY, b'XSUB')
        forwarder.setsockopt_out(zmq.IDENTITY, b'XPUB')
        forwarder.start()

        # Fix weird nosetests problems. TODO: find and fix underlying problem
        sleep(0.01)
        INITED = True