我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.ROUTER。
def test_hwm(self): zmq3 = zmq.zmq_version_info()[0] >= 3 for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER): s = self.context.socket(stype) s.hwm = 100 self.assertEqual(s.hwm, 100) if zmq3: try: self.assertEqual(s.sndhwm, 100) except AttributeError: pass try: self.assertEqual(s.rcvhwm, 100) except AttributeError: pass s.close()
def run(self): self._loop = zmq.asyncio.ZMQEventLoop() asyncio.set_event_loop(self._loop) self.context = zmq.asyncio.Context() self.status_sock = self.context.socket(zmq.ROUTER) self.data_sock = self.context.socket(zmq.PUB) self.status_sock.bind("tcp://*:%s" % self.status_port) self.data_sock.bind("tcp://*:%s" % self.data_port) self.poller = zmq.asyncio.Poller() self.poller.register(self.status_sock, zmq.POLLIN) self._loop.create_task(self.poll_sockets()) try: self._loop.run_forever() finally: self.status_sock.close() self.data_sock.close() self.context.destroy()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG): if not os.path.exists(data_dir) or not os.path.isdir(data_dir): raise Exception("Datadir %s is not a valid directory" % data_dir) self.worker_id = binascii.hexlify(os.urandom(8)) self.node_name = socket.gethostname() self.data_dir = data_dir self.data_files = set() context = zmq.Context() self.socket = context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.identity = self.worker_id self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.redis_server = redis.from_url(redis_url) self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers self.check_controllers() self.last_wrm = 0 self.start_time = time.time() self.logger = bqueryd.logger.getChild('worker ' + self.worker_id) self.logger.setLevel(loglevel) self.msg_count = 0 signal.signal(signal.SIGTERM, self.term_signal())
def handle_in(self): self.msg_count_in += 1 data = self.socket.recv_multipart() binary, sender = None, None # initialise outside for edge cases if len(data) == 3: if data[1] == '': # This is a RPC call from a zmq.REQ socket sender, _blank, msg_buf = data self.handle_rpc(sender, msg_factory(msg_buf)) return sender, msg_buf, binary = data elif len(data) == 2: # This is an internode call from another zmq.ROUTER, a Controller or Worker sender, msg_buf = data msg = msg_factory(msg_buf) if binary: msg['data'] = binary if sender in self.others: self.handle_peer(sender, msg) else: self.handle_worker(sender, msg)
def __init__(self, port=None, min_port=5000, max_port=9999, pipeline=100, log_file=None): """Create a new ZMQDealer object. """ context = zmq.Context.instance() # noinspection PyUnresolvedReferences self.socket = context.socket(zmq.ROUTER) self.socket.hwm = pipeline if port is not None: self.socket.bind('tcp://*:%d' % port) self.port = port else: self.port = self.socket.bind_to_random_port('tcp://*', min_port=min_port, max_port=max_port) self.addr = None self._log_file = log_file if self._log_file is not None: self._log_file = os.path.abspath(self._log_file) # If log file directory does not exists, create it log_dir = os.path.dirname(self._log_file) if not os.path.exists(log_dir): os.makedirs(log_dir) # clears any existing log if os.path.exists(self._log_file): os.remove(self._log_file)
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None): self.bind_address = bind_address self.loop = loop self.context = zmq.asyncio.Context() self.poll_timeout = poll_timeout self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, linger) self.in_poller = zmq.asyncio.Poller() self.in_poller.register(self.socket, zmq.POLLIN) log.info('Bound to: ' + self.bind_address) self.socket.bind(self.bind_address) self._kill = False
def __init__(self, targname, cfg, isServer=False): self.targname = targname self.cfg = cfg self.isServer = isServer self.fnCallName = '' self.ctx = zmq.Context() self.ctx.linger = 100 if not self.isServer: self.sock = self.ctx.socket(zmq.DEALER) self.sock.linger = 100 self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) else: self.sock = self.ctx.socket(zmq.ROUTER) self.sock.linger = 100 self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677))) self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg) self.inTime = time.time() self.inactiveLimit = int(self.cfg.get('inactivelimit',0)) print 'inactivelimit ',self.inactiveLimit
def __init__(self, address, port, logger): """ Initialize new instance with given address and port. :param address: String representation of IP address to listen to or a hostname. :param port: String port where to listen. :param logger: System logger """ self._logger = logger context = zmq.Context() self._receiver = context.socket(zmq.ROUTER) self._receiver.setsockopt(zmq.IDENTITY, b"recodex-monitor") address = "tcp://{}:{}".format(address, port) self._receiver.bind(address) self._logger.info("zeromq server initialized at {}".format(address))
def __init__(self, name, send_qsize=0, mode='ipc'): self._name = name self._conn_info = None self._context_lock = threading.Lock() self._context = zmq.Context() self._tosock = self._context.socket(zmq.ROUTER) self._frsock = self._context.socket(zmq.PULL) self._tosock.set_hwm(10) self._frsock.set_hwm(10) self._dispatcher = CallbackManager() self._send_queue = queue.Queue(maxsize=send_qsize) self._rcv_thread = None self._snd_thread = None self._mode = mode assert mode in ('ipc', 'tcp')
def __init__(self, config): super().__init__(config) self.paused = False slave_queue = self.ctx.socket(zmq.ROUTER) slave_queue.ipv6 = True slave_queue.bind(config.slave_queue) self.register(slave_queue, self.handle_slave) self.status_queue = self.ctx.socket(zmq.PUSH) self.status_queue.hwm = 10 self.status_queue.connect(const.INT_STATUS_QUEUE) SlaveState.status_queue = self.status_queue self.builds_queue = self.ctx.socket(zmq.REQ) self.builds_queue.hwm = 1 self.builds_queue.connect(config.builds_queue) self.index_queue = self.ctx.socket(zmq.PUSH) self.index_queue.hwm = 10 self.index_queue.connect(config.index_queue) self.db = DbClient(config) self.fs = FsClient(config) self.slaves = {} self.pypi_simple = config.pypi_simple
def __init__(self, config): super().__init__(config) self.output_path = Path(config.output_path) TransferState.output_path = self.output_path file_queue = self.ctx.socket(zmq.ROUTER) file_queue.ipv6 = True file_queue.hwm = TransferState.pipeline_size * 50 file_queue.bind(config.file_queue) fs_queue = self.ctx.socket(zmq.REP) fs_queue.hwm = 1 fs_queue.bind(config.fs_queue) self.register(file_queue, self.handle_file) self.register(fs_queue, self.handle_fs_request) self.pending = {} # keyed by slave_id self.active = {} # keyed by slave address self.complete = {} # keyed by slave_id
def _receive_message(self): """ Internal coroutine for receiving messages """ while True: try: if self._socket.getsockopt(zmq.TYPE) == zmq.ROUTER: zmq_identity, msg_bytes = \ yield from self._socket.recv_multipart() self._received_from_identity(zmq_identity) self._dispatcher_queue.put_nowait( (zmq_identity, msg_bytes)) else: msg_bytes = yield from self._socket.recv() self._last_message_time = time.time() self._dispatcher_queue.put_nowait((None, msg_bytes)) except CancelledError: # The concurrent.futures.CancelledError is caught by asyncio # when the Task associated with the coroutine is cancelled. # The raise is required to stop this component. raise except Exception as e: # pylint: disable=broad-except LOGGER.exception("Received a message on address %s that " "caused an error: %s", self._address, e)
def open(self): # noinspection PyUnresolvedReferences self.listener = self.ctx.socket(zmq.ROUTER) # noinspection PyUnresolvedReferences # self.poller.register(self.listener, test.POLLIN) public, secret = self.selfEncKeys self.listener.curve_secretkey = secret self.listener.curve_publickey = public self.listener.curve_server = True self.listener.identity = self.publicKey logger.debug( '{} will bind its listener at {}'.format(self, self.ha[1])) set_keepalive(self.listener, self.config) set_zmq_internal_queue_length(self.listener, self.config) self.listener.bind( '{protocol}://*:{port}'.format( port=self.ha[1], protocol=ZMQ_NETWORK_PROTOCOL) )
def monitored_queue(in_socket, out_socket, mon_socket, in_prefix=b'in', out_prefix=b'out'): swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER poller = zmq.Poller() poller.register(in_socket, zmq.POLLIN) poller.register(out_socket, zmq.POLLIN) while True: events = dict(poller.poll()) if in_socket in events: _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids) if out_socket in events: _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_term_hang(self): rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER) req.setsockopt(zmq.LINGER, 0) req.send(b'hello', copy=False) req.close() rep.close() self.context.term()
def test_default_mq_args(self): self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB) dev.setsockopt_in(zmq.LINGER, 0) dev.setsockopt_out(zmq.LINGER, 0) dev.setsockopt_mon(zmq.LINGER, 0) # this will raise if default args are wrong dev.start() self.teardown_device()
def test_mq_check_prefix(self): ins = self.context.socket(zmq.ROUTER) outs = self.context.socket(zmq.DEALER) mons = self.context.socket(zmq.PUB) self.sockets.extend([ins, outs, mons]) ins = unicode('in') outs = unicode('out') self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)
def Server(context): context = context or zmq.Context().instance() # Socket facing clients frontend = context.socket(zmq.ROUTER) frontend.bind("tcp://*:5559") # Socket facing services backend = context.socket(zmq.ROUTER) backend.bind("tcp://*:5560") print "zmq server running on localhost:5559/5560" poll_workers = zmq.Poller() poll_workers.register(backend, zmq.POLLIN) poll_both = zmq.Poller() poll_both.register(backend, zmq.POLLIN) poll_both.register(frontend, zmq.POLLIN) clients = [[]] while True: if clients: sockets = dict(poll_both.poll()) else: sockets = dict(poll_workers.poll()) if sockets.get(frontend) == ZMQ.POLLIN: clientRequest = frontend.recv_multipart() clients.append(clientRequest[0]) #push client into queue if sockets.get(backend) == ZMQ.POLLIN: #workers want data msg = backend.recv_multipart() workerIdentity = msg[0] clientIdentity,request = clients.pop(0) workRequest = [workerIdentity, '',request] backend.send_multipart(workRequest) yelpResponse = backend.recv_multipart()[2] #fulfill frontend request frontendResponse = [clientIdentity,'',yelpResponse] frontend.send_multipart(frontendResponse)
def worker(client,location,query): """use the yelp api to find the desired place at a location""" #reuse context if it exists, otherwise make a new one context = context or zmq.Context.instance() service = context.socket(zmq.ROUTER) #identify worker service.setsockopt(zmq.IDENTITY,b'A') service.connect("tcp://localhost:5560") while True: #send our identity service.send('') message = service.recv() with myLock: print "yelp worker got:" print message if message != "": response = queryYelp(client, request) service.send(response) elif message == "END": break # else: # with myLock: # print "the server has the wrong identities!" # break
def __init__(self, app, ctx, port=6606): self.app = app self.ctx = ctx self.publisher = self.ctx.socket(zmq.PUB) self.publisher.bind("tcp://*:" + str(port)) self.snapshot = ctx.socket(zmq.ROUTER) self.snapshot.bind("tcp://*:" + str(port+2)) self.snapshot = ZMQStream(self.snapshot) self.snapshot.on_recv(self.handle_snapshot) self.seqNr = 0
def __init__(self, cfg, target=None, args=[]): (self.target,self.args) = (target,args) # Set up server socket self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) self.port = self.socket.bind_to_random_port('tcp://*', min_port=cfg["port_min"], max_port=cfg["port_max"], max_tries=100) self.uri = "tcp://127.0.0.1:%d"%(self.port) print "Controller listening on %s"%(self.uri) self.start()
def _memoryTask(settings, logger,master, url_setFrontend, url_getFrontend, url_getBackend, url_setBackend): from Cache import Slab, CacheSlubLRU # grab settings slabSize = settings.getSlabSize() preallocatedPool = settings.getPreallocatedPool() getterNumber = settings.getGetterThreadNumber() # initialize cache cache = CacheSlubLRU(preallocatedPool , slabSize, logger) #set as 10 mega, 1 mega per slab #log logger.debug("Memory Process initialized:" + str(preallocatedPool) + "B, get# = " + str(getterNumber)) # Prepare our context and sockets context = zmq.Context.instance() # Socket to talk to get socketGetFrontend = context.socket(zmq.ROUTER) socketGetFrontend.bind(url_getFrontend) # Socket to talk to workers socketGetBackend = context.socket(zmq.DEALER) socketGetBackend.bind(url_getBackend) timing = {} timing["getters"] = [] timing["setters"] = [-1] Thread(name='MemoryGetProxy',target=_proxyThread, args=(logger, master, socketGetFrontend, socketGetBackend, url_getFrontend, url_getBackend)).start() for i in range(getterNumber): timing["getters"].append(-1) th = Thread(name='MemoryGetter',target=_getThread, args=(i,logger, settings, cache,master,url_getBackend, timing)) th.start() slaveSetQueue = Queue.Queue() hostState = {} hostState["current"] = None Thread(name='MemoryPerformanceMetricator',target=_memoryMetricatorThread, args=(logger, cache, settings, master, timing)).start() Thread(name='MemorySlaveSetter',target=_setToSlaveThread, args=(logger,settings, cache,master,url_getBackend, slaveSetQueue, hostState)).start() _setThread(logger, settings, cache,master,url_setFrontend,slaveSetQueue, hostState, timing)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO): self.redis_url = redis_url self.redis_server = redis.from_url(redis_url) self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.node_name = socket.gethostname() self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399, max_tries=100) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F: F.write(self.address) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F: F.write(str(os.getpid())) self.logger = bqueryd.logger.getChild('controller').getChild(self.address) self.logger.setLevel(loglevel) self.msg_count_in = 0 self.rpc_results = [] # buffer of results that are ready to be returned to callers self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones... self.files_map = {} # shows on which workers a file is available on self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages self.is_running = True self.last_heartbeat = 0 self.others = {} # A dict of other Controllers running on other DQE nodes self.start_time = time.time()
def test_router_dealer(self): router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER) msg1 = b'message1' dealer.send(msg1) ident = self.recv(router) more = router.rcvmore self.assertEqual(more, True) msg2 = self.recv(router) self.assertEqual(msg1, msg2) more = router.rcvmore self.assertEqual(more, False)