我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.Poller()。
def receive_message(self, event, event_data, listener_data): """ Receives a messages from another processes. :param * event: Not used. :param * event_data: Not used. :param * listener_data: Not used. """ del event, event_data, listener_data # Make a poller for all incoming sockets. poller = zmq.Poller() for socket in self.__end_points.values(): if socket.type in [zmq.PULL, zmq.REP]: poller.register(socket, zmq.POLLIN) # Wait for socket is ready for reading. socks = dict(poller.poll()) for name, socket in self.__end_points.items(): if socket in socks: self._receive_message(name, socket) # ------------------------------------------------------------------------------------------------------------------
def no_barking(self, seconds): """ During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on the socket. This method prevent incoming sockets barking that the are ready the for reading. :param int seconds: The number of seconds the give the other ZMQ thread to start up. """ sleep(seconds) for _ in range(1, len(self.end_points)): poller = zmq.Poller() for socket in self.end_points.values(): if socket.type in [zmq.PULL, zmq.REP]: poller.register(socket, zmq.POLLIN) poller.poll(1) # ----------------------------------------------------------------------------------------------------------------------
def run(self): """ Start the Authentication Agent thread task """ self.authenticator.start() zap = self.authenticator.zap_socket poller = zmq.Poller() poller.register(self.pipe, zmq.POLLIN) poller.register(zap, zmq.POLLIN) while True: try: socks = dict(poller.poll()) except zmq.ZMQError: break # interrupted if self.pipe in socks and socks[self.pipe] == zmq.POLLIN: terminate = self._handle_pipe() if terminate: break if zap in socks and socks[zap] == zmq.POLLIN: self._handle_zap() self.pipe.close() self.authenticator.stop()
def test_timeout(self): """make sure Poller.poll timeout has the right units (milliseconds).""" s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) poller = self.Poller() poller.register(s1, zmq.POLLIN) tic = time.time() evt = poller.poll(.005) toc = time.time() self.assertTrue(toc-tic < 0.1) tic = time.time() evt = poller.poll(5) toc = time.time() self.assertTrue(toc-tic < 0.1) self.assertTrue(toc-tic > .001) tic = time.time() evt = poller.poll(500) toc = time.time() self.assertTrue(toc-tic < 1) self.assertTrue(toc-tic > 0.1)
def __init__(self, socket, io_loop=None): self.socket = socket self.io_loop = io_loop or IOLoop.instance() self.poller = zmq.Poller() self._send_queue = Queue() self._recv_callback = None self._send_callback = None self._close_callback = None self._recv_copy = False self._flushed = False self._state = self.io_loop.ERROR self._init_io_state() # shortcircuit some socket methods self.bind = self.socket.bind self.bind_to_random_port = self.socket.bind_to_random_port self.connect = self.socket.connect self.setsockopt = self.socket.setsockopt self.getsockopt = self.socket.getsockopt self.setsockopt_string = self.socket.setsockopt_string self.getsockopt_string = self.socket.getsockopt_string self.setsockopt_unicode = self.socket.setsockopt_unicode self.getsockopt_unicode = self.socket.getsockopt_unicode
def connect(self): self.context = zmq.Context() if not self.context: raise RuntimeError('Failed to create ZMQ context!') self.socket = self.context.socket(zmq.REQ) if not self.socket: raise RuntimeError('Failed to create ZMQ socket!') self.socket.connect(self.endpoint) self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) self.is_connected = True
def connect(self): self.context = zmq.Context() if not self.context: raise RuntimeError('Failed to create ZMQ context!') self.socket = self.context.socket(zmq.PULL) if not self.socket: raise RuntimeError('Failed to create ZMQ socket!') self.socket.bind(self.endpoint) self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) self.is_connected = True
def connect(self): self.context = zmq.Context() if not self.context: raise RuntimeError('Failed to create ZMQ context!') self.socket = self.context.socket(zmq.REP) if not self.socket: raise RuntimeError('Failed to create ZMQ socket!') self.socket.bind(self.endpoint) self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) self.is_connected = True
def run(self): """ Contents of the infinite loop. """ # Create zmq sockets sockets = SupvisorsZmq(self.supvisors) # create poller poller = zmq.Poller() # register sockets poller.register(sockets.internal_subscriber.socket, zmq.POLLIN) poller.register(sockets.puller.socket, zmq.POLLIN) # poll events forever while not self.stopping(): socks = dict(poller.poll(500)) # test stop condition again: if Supervisor is stopping, # any XML-RPC call would block this thread, and the other # because of the join if not self.stopping(): self.check_requests(sockets, socks) self.check_events(sockets.internal_subscriber, socks) # close resources gracefully poller.unregister(sockets.puller.socket) poller.unregister(sockets.internal_subscriber.socket) sockets.close()
def support_test_send_to_multiple_addresses(self, address1, address2): poller = zmq.Poller() socket1 = self.context.socket(roles['listener']) socket2 = self.context.socket(roles['listener']) try: socket1.bind("tcp://%s" % address1) socket2.bind("tcp://%s" % address2) poller.register(socket1, zmq.POLLIN) poller.register(socket2, zmq.POLLIN) polled = dict(poller.poll(2000)) if socket1 in polled: socket1.recv() socket1.send(nw0.sockets._serialise(address1)) elif socket2 in polled: socket2.recv() socket2.send(nw0.sockets._serialise(address2)) else: raise RuntimeError("Nothing found") finally: socket1.close() socket2.close()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG): if not os.path.exists(data_dir) or not os.path.isdir(data_dir): raise Exception("Datadir %s is not a valid directory" % data_dir) self.worker_id = binascii.hexlify(os.urandom(8)) self.node_name = socket.gethostname() self.data_dir = data_dir self.data_files = set() context = zmq.Context() self.socket = context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.identity = self.worker_id self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.redis_server = redis.from_url(redis_url) self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers self.check_controllers() self.last_wrm = 0 self.start_time = time.time() self.logger = bqueryd.logger.getChild('worker ' + self.worker_id) self.logger.setLevel(loglevel) self.msg_count = 0 signal.signal(signal.SIGTERM, self.term_signal())
def __init__(self, server_ip, server_port, task_id='', debug=False): if debug: l.setLevel(logging.DEBUG) l.debug("Hydra Analyser initiated...") self.server_ip = server_ip self.port = server_port self.task_id = task_id self.data = {} # This is where all received data will be stored self.context = zmq.Context.instance() self.poller = zmq.Poller() self.req_msg = hdaemon_pb2.CommandMessage() self.resp_msg = hdaemon_pb2.ResponseMessage() l.debug("Connecting to server at [%s:%s]", self.server_ip, self.port) self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://%s:%s" % (self.server_ip, self.port)) l.debug("Connected...")
def run(self): self.log.debug("Broker starts XPUB:{}, XSUB:{}" .format(self.xpub_url, self.xsub_url)) # self.proxy.start() poller = zmq.Poller() poller.register(self.xpub, zmq.POLLIN) poller.register(self.xsub, zmq.POLLIN) self.running = True while self.running: events = dict(poller.poll(1000)) if self.xpub in events: message = self.xpub.recv_multipart() self.log.debug("subscription message: {}".format(message[0])) self.xsub.send_multipart(message) if self.xsub in events: message = self.xsub.recv_multipart() self.log.debug("publishing message: {}".format(message)) self.xpub.send_multipart(message)
def __init__(self, port, pipeline=100, host='localhost', log_file=None): """Create a new ZMQDealer object. """ context = zmq.Context.instance() # noinspection PyUnresolvedReferences self.socket = context.socket(zmq.DEALER) self.socket.hwm = pipeline self.socket.connect('tcp://%s:%d' % (host, port)) self._log_file = log_file self.poller = zmq.Poller() # noinspection PyUnresolvedReferences self.poller.register(self.socket, zmq.POLLIN) if self._log_file is not None: self._log_file = os.path.abspath(self._log_file) # If log file directory does not exists, create it log_dir = os.path.dirname(self._log_file) if not os.path.exists(log_dir): os.makedirs(log_dir) # clears any existing log if os.path.exists(self._log_file): os.remove(self._log_file)
def __init__(self, targname, cfg, isServer=False): self.targname = targname self.cfg = cfg self.isServer = isServer self.fnCallName = '' self.ctx = zmq.Context() self.ctx.linger = 100 if not self.isServer: self.sock = self.ctx.socket(zmq.DEALER) self.sock.linger = 100 self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) else: self.sock = self.ctx.socket(zmq.ROUTER) self.sock.linger = 100 self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677))) self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg) self.inTime = time.time() self.inactiveLimit = int(self.cfg.get('inactivelimit',0)) print 'inactivelimit ',self.inactiveLimit
def ready(self, name, polling_mechanism, timeout=None): """Polls all sockets and checks if the socket with the given name is ready for either ``zmq.POLLIN`` or ``zmq.POLLOUT``. returns the socket if available, or ``None`` :param socket_name: the socket name :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT`` :param timeout: the polling timeout in miliseconds that will be passed to ``zmq.Poller().poll()`` (optional, defaults to ``core.DEFAULT_POLLING_TIMEOUT``) """ socket = self.get_by_name(name) available_mechanism = self.engage(timeout is None and self.timeout or timeout).pop(socket, None) if polling_mechanism == available_mechanism: return socket
def wait_until_ready(self, name, polling_mechanism, timeout=None, polling_timeout=None): """Briefly waits until the socket is ready to be used, yields to other greenlets until the socket becomes available. returns the socket if available within the given timeout, or ``None`` :param socket_name: the socket name :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT`` :param timeout: the timeout in seconds (accepts float) in which it should wait for the socket to become available (optional, defaults to ``core.DEFAULT_TIMEOUT_IN_SECONDS``) :param polling_timeout: the polling timeout in miliseconds that will be passed to ``zmq.Poller().poll()``. (optional, defaults to ``core.DEFAULT_POLLING_TIMEOUT``) """ timeout = timeout is None and self.timeout or timeout polling_timeout = polling_timeout is None and self.polling_timeout or polling_timeout start = time.time() current = start while current - start < timeout: self.engage(polling_timeout) socket = self.ready(name, polling_mechanism, timeout=timeout) current = time.time() if socket: return socket
def _send_raw(self, serialized): self.create_socket() self._socket.send_string(serialized, zmq.NOBLOCK) poller = zmq.Poller() poller.register(self._socket, zmq.POLLIN) if poller.poll(self._timeout * 1000): msg = self._socket.recv() self.on_message(msg) self.cleanup_socket() else: self._transport.log("Peer " + self._address + " timed out.") self.cleanup_socket() self._transport.remove_peer(self._address)
def zmq_request(self, msg_type, msg_content, timeout=__DEFAULT_REQUEST_TIMEOUT): # new socket to talk to server self.__socket = zmq.Context().socket(zmq.REQ) self.__socket.connect("tcp://localhost:" + ZMQPort.RQ) # init poller and register to socket that web can poll socket to check is it has messages poller = zmq.Poller() poller.register(self.__socket, zmq.POLLIN) send_flatbuf_msg(self.__socket, msg_type, msg_content) reqs = 0 while reqs * self.__POLL_INTERVAL <= timeout: socks = dict(poller.poll(self.__POLL_INTERVAL)) if self.__socket in socks and socks[self.__socket] == zmq.POLLIN: msg = self.__socket.recv() msgObj = TransMsg.GetRootAsTransMsg(msg, 0) return msgObj.Content() reqs = reqs + 1 return False
def test_getsockopt_events(self): sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) eventlet.sleep() poll_out = zmq.Poller() poll_out.register(sock1, zmq.POLLOUT) sock_map = poll_out.poll(100) self.assertEqual(len(sock_map), 1) events = sock1.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT) sock1.send(b'') poll_in = zmq.Poller() poll_in.register(sock2, zmq.POLLIN) sock_map = poll_in.poll(100) self.assertEqual(len(sock_map), 1) events = sock2.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def __init__(self, *args, **kwargs): super(ZMQPlaybook, self).__init__(*args, **kwargs) self._sockets = [] self._hooks = {} self.context = kwargs.get("context", zmq.Context.instance()) self.socket = self.context.socket(zmq.SUB) self.socket_dir = tempfile.mkdtemp() self._env['DAUBER_SOCKET_URI'] = "ipc://{}/dauber.socket".format(self.socket_dir) self.socket.connect(self._env['DAUBER_SOCKET_URI']) self._env['DAUBER_CONTROL_SOCKET_URI'] = "ipc://{}/control.socket".format(self.socket_dir) self.poller = zmq.Poller() self._register_socket(self.socket, self.__class__._zmq_socket_handler) self.add_callback_plugin_dir( pr.resource_filename(__name__, 'ansible/callback_plugins')) self.logger.setLevel(logging.WARNING) self.verbosity = 4
def _wait_for_goahead(self): control_socket = self.context.socket(zmq.REP) control_socket.bind(os.environ['DAUBER_CONTROL_SOCKET_URI']) poller = zmq.Poller() poller.register(control_socket) timeout = 500 t_last = time.time() while (time.time() - t_last) < timeout: ready = dict(poller.poll(10)) if ready.get(control_socket): control_socket.recv() control_socket.send(b'') break self.socket.send_multipart(['hello', b'']) t_last = time.time() assert (time.time() - t_last) < timeout, \ "Timed out before recieving a signal to continue" del control_socket
def monitored_queue(in_socket, out_socket, mon_socket, in_prefix=b'in', out_prefix=b'out'): swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER poller = zmq.Poller() poller.register(in_socket, zmq.POLLIN) poller.register(out_socket, zmq.POLLIN) while True: events = dict(poller.poll()) if in_socket in events: _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids) if out_socket in events: _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
def test_retry_poll(self): x, y = self.create_bound_pair() poller = zmq.Poller() poller.register(x, zmq.POLLIN) self.alarm() def send(): time.sleep(2 * self.signal_delay) y.send(b('ping')) t = Thread(target=send) t.start() evts = dict(poller.poll(2 * self.timeout_ms)) t.join() assert x in evts assert self.timer_fired x.recv()
def test_pair(self): s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) # Sleep to allow sockets to connect. wait() poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, zmq.POLLIN|zmq.POLLOUT) # Poll result should contain both sockets socks = dict(poller.poll()) # Now make sure that both are send ready. self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(socks[s2], zmq.POLLOUT) # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN s1.send(b'msg1') s2.send(b'msg2') wait() socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN) self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN) # Make sure that both are in POLLOUT after recv. s1.recv() s2.recv() socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(socks[s2], zmq.POLLOUT) poller.unregister(s1) poller.unregister(s2) # Wait for everything to finish. wait()
def test_pubsub(self): s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB) s2.setsockopt(zmq.SUBSCRIBE, b'') # Sleep to allow sockets to connect. wait() poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, zmq.POLLIN) # Now make sure that both are send ready. socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(s2 in socks, 0) # Make sure that s1 stays in POLLOUT after a send. s1.send(b'msg1') socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) # Make sure that s2 is POLLIN after waiting. wait() socks = dict(poller.poll()) self.assertEqual(socks[s2], zmq.POLLIN) # Make sure that s2 goes into 0 after recv. s2.recv() socks = dict(poller.poll()) self.assertEqual(s2 in socks, 0) poller.unregister(s1) poller.unregister(s2) # Wait for everything to finish. wait()
def test_wakeup(self): s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) poller = self.Poller() poller.register(s2, zmq.POLLIN) tic = time.time() r = gevent.spawn(lambda: poller.poll(10000)) s = gevent.spawn(lambda: s1.send(b'msg1')) r.join() toc = time.time() self.assertTrue(toc-tic < 1)
def __init__(self): super().__init__() # this maps file descriptors to keys self._fd_to_key = {} # read-only mapping returned by get_map() self._map = _SelectorMapping(self) self._zmq_poller = _zmq.Poller()
def thread_loop(self, context, pipe): poller = zmq.Poller() ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url) poller.register(pipe, zmq.POLLIN) remote_socket = None while True: items = dict(poller.poll()) if pipe in items: cmd = pipe.recv_string() if cmd == 'Exit': break elif cmd == 'Bind': new_url = pipe.recv_string() if remote_socket: poller.unregister(remote_socket) remote_socket.close(linger=0) try: remote_socket = context.socket(zmq.REP) remote_socket.bind(new_url) except zmq.ZMQError as e: remote_socket = None pipe.send_string("Error", flags=zmq.SNDMORE) pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e)) else: pipe.send_string("Bind OK", flags=zmq.SNDMORE) # `.last_endpoint` is already of type `bytes` pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b"")) poller.register(remote_socket, zmq.POLLIN) if remote_socket in items: self.on_recv(remote_socket, ipc_pub) self.thread_pipe = None
def __init__(self, host, port=7772): QtCore.QObject.__init__(self) self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) self.socket.connect("tcp://{}:{}".format(host, port)) self.socket.setsockopt_string(zmq.SUBSCRIBE, "data") self.socket.setsockopt_string(zmq.SUBSCRIBE, "done") self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN) self.running = True
def open_connection(self, address, status_port, data_port): self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000) socket = self.context.socket(zmq.DEALER) socket.identity = "Matplotlib_Qt_Client".encode() socket.connect("tcp://{}:{}".format(address, status_port)) socket.send(b"WHATSUP") poller = zmq.Poller() poller.register(socket, zmq.POLLOUT) time.sleep(0.1) evts = dict(poller.poll(100)) if socket in evts: try: reply, desc = [e.decode() for e in socket.recv_multipart()] desc = json.loads(desc) self.statusBar().showMessage("Connection established. Pulling plot information.", 2000) except: self.statusBar().showMessage("Could not connect to server.", 2000) return else: self.statusBar().showMessage("Server did not respond.", 2000) socket.close() self.construct_plots(desc) # Actual data listener if self.listener_thread: self.Datalistener.running = False self.listener_thread.quit() self.listener_thread.wait() self.listener_thread = QtCore.QThread() self.Datalistener = DataListener(address, data_port) self.Datalistener.moveToThread(self.listener_thread) self.listener_thread.started.connect(self.Datalistener.loop) self.Datalistener.message.connect(self.data_signal_received) self.Datalistener.finished.connect(self.stop_listening) QtCore.QTimer.singleShot(0, self.listener_thread.start)
def run(self): """ Main loop of the thread. """ # create event socket self.subscriber = EventSubscriber(self.zmq_context, self.event_port, self.logger) self.configure() # create poller and register event subscriber poller = zmq.Poller() poller.register(self.subscriber.socket, zmq.POLLIN) # poll events every seconds self.logger.info('entering main loop') while not self.stop_event.is_set(): socks = dict(poller.poll(self._Poll_timeout)) # check if something happened on the socket if self.subscriber.socket in socks and \ socks[self.subscriber.socket] == zmq.POLLIN: self.logger.debug('got message on subscriber') try: message = self.subscriber.receive() except Exception, e: self.logger.error( 'failed to get data from subscriber: {}'.format( e.message)) else: if message[0] == EventHeaders.SUPVISORS: self.on_supvisors_status(message[1]) elif message[0] == EventHeaders.ADDRESS: self.on_address_status(message[1]) elif message[0] == EventHeaders.APPLICATION: self.on_application_status(message[1]) elif message[0] == EventHeaders.PROCESS_EVENT: self.on_process_event(message[1]) elif message[0] == EventHeaders.PROCESS_STATUS: self.on_process_status(message[1]) self.logger.warn('exiting main loop') self.subscriber.close()
def _receive_with_timeout(self, socket, timeout_s, use_multipart=False): """Check for socket activity and either return what's received on the socket or time out if timeout_s expires without anything on the socket. This is implemented in loops of self.try_length_ms milliseconds to allow Ctrl-C handling to take place. """ if timeout_s is config.FOREVER: timeout_ms = config.FOREVER else: timeout_ms = int(1000 * timeout_s) poller = zmq.Poller() poller.register(socket, zmq.POLLIN) ms_so_far = 0 try: for interval_ms in self.intervals_ms(timeout_ms): sockets = dict(poller.poll(interval_ms)) ms_so_far += interval_ms if socket in sockets: if use_multipart: return socket.recv_multipart() else: return socket.recv() else: raise core.SocketTimedOutError(timeout_s) except KeyboardInterrupt: raise core.SocketInterruptedError(ms_so_far / 1000.0)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO): self.redis_url = redis_url self.redis_server = redis.from_url(redis_url) self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.node_name = socket.gethostname() self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399, max_tries=100) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F: F.write(self.address) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F: F.write(str(os.getpid())) self.logger = bqueryd.logger.getChild('controller').getChild(self.address) self.logger.setLevel(loglevel) self.msg_count_in = 0 self.rpc_results = [] # buffer of results that are ready to be returned to callers self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones... self.files_map = {} # shows on which workers a file is available on self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages self.is_running = True self.last_heartbeat = 0 self.others = {} # A dict of other Controllers running on other DQE nodes self.start_time = time.time()
def test_no_events(self): s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, 0) self.assertTrue(s1 in poller) self.assertFalse(s2 in poller) poller.register(s1, 0) self.assertFalse(s1 in poller)