我们从 Python 开源项目中，提取了以下 14 个代码示例，用于说明如何使用 zmq.proxy()。
def router_main(_, pidx, args):
    """Run a blocking PULL-to-PUSH proxy until it is interrupted.

    Binds a PULL socket on ``tcp://*:5000`` and a PUSH socket on
    ``ipc://example-events`` and hands both to ``zmq.proxy()``.

    :param _: ignored.
    :param pidx: forwarded to ``get_logger`` along with the logger name.
    :param args: not used by this function.
    :raises Exception: errors raised while creating or binding the sockets
        propagate to the caller (after cleanup); errors raised while the
        proxy is running are logged instead.
    """
    log = get_logger('examples.zmqserver.extra', pidx)
    ctx = zmq.Context()
    ctx.linger = 0
    in_sock = None
    out_sock = None
    try:
        in_sock = ctx.socket(zmq.PULL)
        in_sock.bind('tcp://*:5000')
        out_sock = ctx.socket(zmq.PUSH)
        out_sock.bind('ipc://example-events')
        try:
            log.info('router proxy started')
            zmq.proxy(in_sock, out_sock)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the proxy.
            pass
        except Exception:
            # Deliberately not a bare ``except``: SystemExit and
            # GeneratorExit must keep propagating.
            log.exception('unexpected error')
        finally:
            log.info('router proxy terminated')
    finally:
        # Runs even when socket creation or bind() failed, so neither the
        # sockets nor the context are leaked on a setup error.
        for sock in (in_sock, out_sock):
            if sock is not None:
                sock.close()
        ctx.term()
def run(self):
    """Relay multipart messages between the XPUB and XSUB sockets.

    Sets ``self.running`` to True and loops until it becomes falsy.
    Whatever arrives on ``self.xpub`` is sent out on ``self.xsub`` and
    whatever arrives on ``self.xsub`` is sent out on ``self.xpub``.
    """
    self.log.debug("Broker starts XPUB:{}, XSUB:{}"
                   .format(self.xpub_url, self.xsub_url))

    poller = zmq.Poller()
    for sock in (self.xpub, self.xsub):
        poller.register(sock, zmq.POLLIN)

    # Polling with a timeout (rather than blocking forever) lets the loop
    # re-check ``self.running`` between polls.
    poll_timeout = 1000

    self.running = True
    while self.running:
        ready = dict(poller.poll(poll_timeout))

        if self.xpub in ready:
            frames = self.xpub.recv_multipart()
            self.log.debug("subscription message: {}".format(frames[0]))
            self.xsub.send_multipart(frames)

        if self.xsub in ready:
            frames = self.xsub.recv_multipart()
            self.log.debug("publishing message: {}".format(frames))
            self.xpub.send_multipart(frames)
def run_device(self):
    """Run ``zmq.proxy`` over the three sockets from ``_setup_sockets``.

    ``_setup_sockets()`` must return exactly three values; they are passed
    to ``zmq.proxy`` positionally, in the order returned.
    """
    frontend, backend, monitor = self._setup_sockets()
    zmq.proxy(frontend, backend, monitor)
def _proxyThread(logger, master, frontend, backend, url_frontend, url_backend):
    """Log the route being served, then proxy ``frontend`` to ``backend``.

    :param logger: object whose ``debug`` method receives the route message.
    :param master: accepted but not used by this function.
    :param frontend: first socket handed to ``zmq.proxy``.
    :param backend: second socket handed to ``zmq.proxy``.
    :param url_frontend: frontend address, used only in the log message.
    :param url_backend: backend address, used only in the log message.
    """
    route = "".join(("Routing from ", url_frontend, " to ", url_backend))
    logger.debug(route)
    zmq.proxy(frontend, backend)
def __init__(self, xpub="tcp://127.0.0.1:8990", xsub="tcp://127.0.0.1:8989"):
    """Create the broker's context and bind its XPUB and XSUB sockets.

    :param xpub: address the XPUB socket is bound to.
    :param xsub: address the XSUB socket is bound to.
    """
    # Logger is named "<module>.<class name>" of the concrete class.
    cls = self.__class__
    logger_name = "{module}.{name}".format(module=cls.__module__,
                                           name=cls.__name__)
    self.log = logging.getLogger(logger_name)
    super(Broker, self).__init__()

    self.running = False
    self.xpub_url = xpub
    self.xsub_url = xsub

    self.ctx = zmq.Context()

    self.xpub = self.ctx.socket(zmq.XPUB)
    self.xpub.bind(self.xpub_url)

    self.xsub = self.ctx.socket(zmq.XSUB)
    self.xsub.bind(self.xsub_url)
def _start_mpbs_broker(self):
    """Run the Multi-Publisher-Multi-Subscriber broker.

    Binds a SUB socket (with an empty subscription prefix) on
    ``self.frontend`` and a PUB socket on ``self.backend``, then hands
    both to ``zmq.proxy``.
    """
    ctx = zmq.Context()

    subscriber = ctx.socket(zmq.SUB)  # @UndefinedVariable
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")  # @UndefinedVariable
    subscriber.bind(self.frontend)

    publisher = ctx.socket(zmq.PUB)  # @UndefinedVariable
    publisher.bind(self.backend)

    zmq.proxy(subscriber, publisher)  # @UndefinedVariable
def _start_mpup_broker(self):
    """Run the Multi-Pusher-Multi-Puller broker.

    Binds a PULL socket on ``self.frontend`` and a PUSH socket on
    ``self.backend``, then hands both to ``zmq.proxy``.
    """
    ctx = zmq.Context()

    puller = ctx.socket(zmq.PULL)  # @UndefinedVariable
    puller.bind(self.frontend)

    pusher = ctx.socket(zmq.PUSH)  # @UndefinedVariable
    pusher.bind(self.backend)

    zmq.proxy(puller, pusher)  # @UndefinedVariable
def _start_mrer_broker(self):
    """Run the Multi-Req-Multi-Rep broker.

    Binds a ROUTER socket on ``self.frontend`` and a DEALER socket on
    ``self.backend``, then hands both to ``zmq.proxy``.
    """
    ctx = zmq.Context()

    router = ctx.socket(zmq.ROUTER)  # @UndefinedVariable
    router.bind(self.frontend)

    dealer = ctx.socket(zmq.DEALER)  # @UndefinedVariable
    dealer.bind(self.backend)

    zmq.proxy(router, dealer)  # @UndefinedVariable
def _resolve_socket_type(requested, default_name, role):
    """Return the ``zmq`` socket-type constant named by ``requested``.

    :param requested: socket type name (case-insensitive), or None to use
        the default.
    :param str default_name: name of the ``zmq`` constant used when
        ``requested`` is None or cannot be resolved.
    :param str role: "frontend" or "backend"; used only in the warning text.
    """
    if requested is not None:
        try:
            return getattr(zmq, requested.upper())
        except AttributeError:
            # Report the value the caller actually supplied, not the
            # default that replaces it.
            log.warning("Bad {} type provided :{}\nForwarder will use default {} type".format(
                role, requested, default_name))
    # Looked up lazily so the default constant is only touched when needed.
    return getattr(zmq, default_name)


def start_forwarder(pub_port, receive_port, mon_port=None,
                    backend_socket=None, frontend_socket=None):
    """Start a zeromq proxy for forwarding messages from TCP socket to zmq PUB socket

    The frontend, backend and (optional) monitor sockets are all bound on
    ``tcp://*:<port>`` and handed to ``zmq.proxy``, which runs in the
    calling thread.

    :param int pub_port: port number to use for publishing messages to workers
    :param int receive_port: port number to use for receiving messages
    :param int mon_port (optional): port to use for monitor socket
    :param str backend_socket (optional): socket type to use for backend
        socket; defaults to STREAM, also used (with a warning) when the
        name is not an attribute of ``zmq``
    :param str frontend_socket (optional): socket type to use for frontend
        socket; defaults to PUB, also used (with a warning) when the name
        is not an attribute of ``zmq``
    """
    context = zmq.Context()

    frontend = context.socket(
        _resolve_socket_type(frontend_socket, "PUB", "frontend"))
    frontend.bind("tcp://*:{}".format(pub_port))

    backend = context.socket(
        _resolve_socket_type(backend_socket, "STREAM", "backend"))
    backend.bind("tcp://*:{}".format(receive_port))

    monitor = None
    if mon_port is not None:
        monitor = context.socket(zmq.PUB)
        monitor.bind("tcp://*:{}".format(mon_port))

    log.info("Starting forwarder")
    if monitor is None:
        log.info("frontend: {}\tbackend: {}".format(pub_port, receive_port))
        zmq.proxy(frontend, backend)
    else:
        log.info("frontend: {}\tbackend: {}\tmonitor: {}".format(pub_port, receive_port, mon_port))
        zmq.proxy(frontend, backend, monitor)