我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.POLLOUT。
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG): if not os.path.exists(data_dir) or not os.path.isdir(data_dir): raise Exception("Datadir %s is not a valid directory" % data_dir) self.worker_id = binascii.hexlify(os.urandom(8)) self.node_name = socket.gethostname() self.data_dir = data_dir self.data_files = set() context = zmq.Context() self.socket = context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.identity = self.worker_id self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.redis_server = redis.from_url(redis_url) self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers self.check_controllers() self.last_wrm = 0 self.start_time = time.time() self.logger = bqueryd.logger.getChild('worker ' + self.worker_id) self.logger.setLevel(loglevel) self.msg_count = 0 signal.signal(signal.SIGTERM, self.term_signal())
def go(self): self.logger.info('[#############################>. Starting .<#############################]') while self.is_running: try: time.sleep(0.001) self.heartbeat() self.free_dead_workers() for sock, event in self.poller.poll(timeout=POLLING_TIMEOUT): if event & zmq.POLLIN: self.handle_in() if event & zmq.POLLOUT: self.handle_out() self.process_sink_results() except KeyboardInterrupt: self.logger.debug('Keyboard Interrupt') self.kill() except: self.logger.error("Exception %s" % traceback.format_exc()) self.logger.info('Stopping') for x in (os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address')): if os.path.exists(x): os.remove(x)
def ensure_and_bind(self, socket_name, socket_type, address, polling_mechanism): """Ensure that a socket exists, that is *binded* to the given address and that is registered with the given polling mechanism. This method is a handy replacement for calling ``.get_or_create()``, ``.bind()`` and then ``.engage()``. returns the socket itself. :param socket_name: the socket name :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...) :param address: a valid zeromq address (i.e: inproc://whatevs) :param polling_mechanism: ``zmq.POLLIN``, ``zmq.POLLOUT`` or ``zmq.POLLIN | zmq.POLLOUT`` """ self.get_or_create(socket_name, socket_type, polling_mechanism) socket = self.bind(socket_name, address, polling_mechanism) self.engage() return socket
def ready(self, name, polling_mechanism, timeout=None): """Polls all sockets and checks if the socket with the given name is ready for either ``zmq.POLLIN`` or ``zmq.POLLOUT``. returns the socket if available, or ``None`` :param socket_name: the socket name :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT`` :param timeout: the polling timeout in miliseconds that will be passed to ``zmq.Poller().poll()`` (optional, defaults to ``core.DEFAULT_POLLING_TIMEOUT``) """ socket = self.get_by_name(name) available_mechanism = self.engage(timeout is None and self.timeout or timeout).pop(socket, None) if polling_mechanism == available_mechanism: return socket
def wait_until_ready(self, name, polling_mechanism, timeout=None, polling_timeout=None): """Briefly waits until the socket is ready to be used, yields to other greenlets until the socket becomes available. returns the socket if available within the given timeout, or ``None`` :param socket_name: the socket name :param polling_mechanism: either ``zmq.POLLIN`` or ``zmq.POLLOUT`` :param timeout: the timeout in seconds (accepts float) in which it should wait for the socket to become available (optional, defaults to ``core.DEFAULT_TIMEOUT_IN_SECONDS``) :param polling_timeout: the polling timeout in miliseconds that will be passed to ``zmq.Poller().poll()``. (optional, defaults to ``core.DEFAULT_POLLING_TIMEOUT``) """ timeout = timeout is None and self.timeout or timeout polling_timeout = polling_timeout is None and self.polling_timeout or polling_timeout start = time.time() current = start while current - start < timeout: self.engage(polling_timeout) socket = self.ready(name, polling_mechanism, timeout=timeout) current = time.time() if socket: return socket
def get_or_create(self, name, socket_type, polling_mechanism): """ensure that a socket exists and is registered with a given polling_mechanism (POLLIN, POLLOUT or both) returns the socket itself. :param name: the socket name :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...) :param polling_mechanism: one of (``zmq.POLLIN``, ``zmq.POLLOUT``, ``zmq.POLLIN | zmq.POLLOUT``) """ if name not in self.sockets: self.create(name, socket_type) socket = self.get_by_name(name) self.register_socket(socket, polling_mechanism) return socket
def get_log_handler(self, socket_name, topic_name='logs'): """returns an instance of :py:class:ZMQPubHandler attached to a previously-created socket. :param socket_name: the name of the socket, previously created with :py:meth:SocketManager.create :param topic_name: the name of the topic in which the logs will be PUBlished **Example:** :: >>> import zmq >>> from agentzero.core import SocketManager >>> >>> sockets = SocketManager() >>> sockets.ensure_and_bind('logs', zmq.PUB, 'tcp://*:6000', zmq.POLLOUT) >>> app_logger = sockets.get_logger('logs', logger_name='myapp')) >>> app_logger.info("Server is up!") >>> try: url = sockets.recv_safe('download_queue') ... requests.get(url) ... except: ... app_logger.exception('failed to download url: %s', url) """ return ZMQPubHandler(self, socket_name, topic_name)
def test_getsockopt_events(self): sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) eventlet.sleep() poll_out = zmq.Poller() poll_out.register(sock1, zmq.POLLOUT) sock_map = poll_out.poll(100) self.assertEqual(len(sock_map), 1) events = sock1.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT) sock1.send(b'') poll_in = zmq.Poller() poll_in.register(sock2, zmq.POLLIN) sock_map = poll_in.poll(100) self.assertEqual(len(sock_map), 1) events = sock2.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def _get_descriptors(self): """Returns three elements tuple with socket descriptors ready for gevent.select.select """ rlist = [] wlist = [] xlist = [] for socket, flags in self.sockets: if isinstance(socket, zmq.Socket): rlist.append(socket.getsockopt(zmq.FD)) continue elif isinstance(socket, int): fd = socket elif hasattr(socket, 'fileno'): try: fd = int(socket.fileno()) except: raise ValueError('fileno() must return an valid integer fd') else: raise TypeError('Socket must be a 0MQ socket, an integer fd ' 'or have a fileno() method: %r' % socket) if flags & zmq.POLLIN: rlist.append(fd) if flags & zmq.POLLOUT: wlist.append(fd) if flags & zmq.POLLERR: xlist.append(fd) return (rlist, wlist, xlist)
def __state_changed(self, event=None, _evtype=None): if self.closed: self.__cleanup_events() return try: # avoid triggering __state_changed from inside __state_changed events = super(_Socket, self).getsockopt(zmq.EVENTS) except zmq.ZMQError as exc: self.__writable.set_exception(exc) self.__readable.set_exception(exc) else: if events & zmq.POLLOUT: self.__writable.set() if events & zmq.POLLIN: self.__readable.set()
def _wait_write(self): assert self.__writable.ready(), "Only one greenlet can be waiting on this event" self.__writable = AsyncResult() # timeout is because libzmq cannot be trusted to properly signal a new send event: # this is effectively a maximum poll interval of 1s tic = time.time() dt = self._gevent_bug_timeout if dt: timeout = gevent.Timeout(seconds=dt) else: timeout = None try: if timeout: timeout.start() self.__writable.get(block=True) except gevent.Timeout as t: if t is not timeout: raise toc = time.time() # gevent bug: get can raise timeout even on clean return # don't display zmq bug warning for gevent bug (this is getting ridiculous) if self._debug_gevent and timeout and toc-tic > dt and \ self.getsockopt(zmq.EVENTS) & zmq.POLLOUT: print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr) finally: if timeout: timeout.cancel() self.__writable.set()
def test_poll(self): a,b = self.create_bound_pair() tic = time.time() evt = a.poll(50) self.assertEqual(evt, 0) evt = a.poll(50, zmq.POLLOUT) self.assertEqual(evt, zmq.POLLOUT) msg = b'hi' a.send(msg) evt = b.poll(50) self.assertEqual(evt, zmq.POLLIN) msg2 = self.recv(b) evt = b.poll(50) self.assertEqual(evt, 0) self.assertEqual(msg2, msg)
def test_no_events(self): s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, 0) self.assertTrue(s1 in poller) self.assertFalse(s2 in poller) poller.register(s1, 0) self.assertFalse(s1 in poller)
def test_pubsub(self): s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB) s2.setsockopt(zmq.SUBSCRIBE, b'') # Sleep to allow sockets to connect. wait() poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, zmq.POLLIN) # Now make sure that both are send ready. socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(s2 in socks, 0) # Make sure that s1 stays in POLLOUT after a send. s1.send(b'msg1') socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) # Make sure that s2 is POLLIN after waiting. wait() socks = dict(poller.poll()) self.assertEqual(socks[s2], zmq.POLLIN) # Make sure that s2 goes into 0 after recv. s2.recv() socks = dict(poller.poll()) self.assertEqual(s2 in socks, 0) poller.unregister(s1) poller.unregister(s2) # Wait for everything to finish. wait()
def open_connection(self, address, status_port, data_port): self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000) socket = self.context.socket(zmq.DEALER) socket.identity = "Matplotlib_Qt_Client".encode() socket.connect("tcp://{}:{}".format(address, status_port)) socket.send(b"WHATSUP") poller = zmq.Poller() poller.register(socket, zmq.POLLOUT) time.sleep(0.1) evts = dict(poller.poll(100)) if socket in evts: try: reply, desc = [e.decode() for e in socket.recv_multipart()] desc = json.loads(desc) self.statusBar().showMessage("Connection established. Pulling plot information.", 2000) except: self.statusBar().showMessage("Could not connect to server.", 2000) return else: self.statusBar().showMessage("Server did not respond.", 2000) socket.close() self.construct_plots(desc) # Actual data listener if self.listener_thread: self.Datalistener.running = False self.listener_thread.quit() self.listener_thread.wait() self.listener_thread = QtCore.QThread() self.Datalistener = DataListener(address, data_port) self.Datalistener.moveToThread(self.listener_thread) self.listener_thread.started.connect(self.Datalistener.loop) self.Datalistener.message.connect(self.data_signal_received) self.Datalistener.finished.connect(self.stop_listening) QtCore.QTimer.singleShot(0, self.listener_thread.start)
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO): self.redis_url = redis_url self.redis_server = redis.from_url(redis_url) self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.node_name = socket.gethostname() self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399, max_tries=100) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F: F.write(self.address) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F: F.write(str(os.getpid())) self.logger = bqueryd.logger.getChild('controller').getChild(self.address) self.logger.setLevel(loglevel) self.msg_count_in = 0 self.rpc_results = [] # buffer of results that are ready to be returned to callers self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones... self.files_map = {} # shows on which workers a file is available on self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages self.is_running = True self.last_heartbeat = 0 self.others = {} # A dict of other Controllers running on other DQE nodes self.start_time = time.time()
def test_pair(self): s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) # Sleep to allow sockets to connect. wait() poller = self.Poller() poller.register(s1, zmq.POLLIN|zmq.POLLOUT) poller.register(s2, zmq.POLLIN|zmq.POLLOUT) # Poll result should contain both sockets socks = dict(poller.poll()) # Now make sure that both are send ready. self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(socks[s2], zmq.POLLOUT) # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN s1.send(b'msg1') s2.send(b'msg2') wait() socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN) self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN) # Make sure that both are in POLLOUT after recv. s1.recv() s2.recv() socks = dict(poller.poll()) self.assertEqual(socks[s1], zmq.POLLOUT) self.assertEqual(socks[s2], zmq.POLLOUT) poller.unregister(s1) poller.unregister(s2) # Wait for everything to finish. wait()
def send_safe(self, name, data, *args, **kw): """serializes the data with the configured ``serialization_backend``, waits for the socket to become available, then sends it over through the provided socket name. returns ``True`` if the message was sent, or ``False`` if the socket never became available. .. note:: you can safely use this function without waiting for a socket to become ready, as it already does it for you. raises SocketNotFound when the socket name is wrong. :param name: the name of the socket where data should be sent through :param data: the data to be serialized then sent :param ``*args``: args to be passed to wait_until_ready :param ``**kw``: kwargs to be passed to wait_until_ready """ socket = self.wait_until_ready(name, self.zmq.POLLOUT, *args, **kw) if not socket: return False payload = self.serialization_backend.pack(data) socket.send(payload) return True