我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用zmq.EVENTS。
def test_getsockopt_events(self): sock1, sock2, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER) eventlet.sleep() poll_out = zmq.Poller() poll_out.register(sock1, zmq.POLLOUT) sock_map = poll_out.poll(100) self.assertEqual(len(sock_map), 1) events = sock1.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLOUT, zmq.POLLOUT) sock1.send(b'') poll_in = zmq.Poller() poll_in.register(sock2, zmq.POLLIN) sock_map = poll_in.poll(100) self.assertEqual(len(sock_map), 1) events = sock2.getsockopt(zmq.EVENTS) self.assertEqual(events & zmq.POLLIN, zmq.POLLIN)
def __state_changed(self, event=None, _evtype=None): if self.closed: self.__cleanup_events() return try: # avoid triggering __state_changed from inside __state_changed events = super(_Socket, self).getsockopt(zmq.EVENTS) except zmq.ZMQError as exc: self.__writable.set_exception(exc) self.__readable.set_exception(exc) else: if events & zmq.POLLOUT: self.__writable.set() if events & zmq.POLLIN: self.__readable.set()
def _wait_write(self): assert self.__writable.ready(), "Only one greenlet can be waiting on this event" self.__writable = AsyncResult() # timeout is because libzmq cannot be trusted to properly signal a new send event: # this is effectively a maximum poll interval of 1s tic = time.time() dt = self._gevent_bug_timeout if dt: timeout = gevent.Timeout(seconds=dt) else: timeout = None try: if timeout: timeout.start() self.__writable.get(block=True) except gevent.Timeout as t: if t is not timeout: raise toc = time.time() # gevent bug: get can raise timeout even on clean return # don't display zmq bug warning for gevent bug (this is getting ridiculous) if self._debug_gevent and timeout and toc-tic > dt and \ self.getsockopt(zmq.EVENTS) & zmq.POLLOUT: print("BUG: gevent may have missed a libzmq send event on %i!" % self.FD, file=sys.stderr) finally: if timeout: timeout.cancel() self.__writable.set()
def _wait_read(self): assert self.__readable.ready(), "Only one greenlet can be waiting on this event" self.__readable = AsyncResult() # timeout is because libzmq cannot always be trusted to play nice with libevent. # I can only confirm that this actually happens for send, but lets be symmetrical # with our dirty hacks. # this is effectively a maximum poll interval of 1s tic = time.time() dt = self._gevent_bug_timeout if dt: timeout = gevent.Timeout(seconds=dt) else: timeout = None try: if timeout: timeout.start() self.__readable.get(block=True) except gevent.Timeout as t: if t is not timeout: raise toc = time.time() # gevent bug: get can raise timeout even on clean return # don't display zmq bug warning for gevent bug (this is getting ridiculous) if self._debug_gevent and timeout and toc-tic > dt and \ self.getsockopt(zmq.EVENTS) & zmq.POLLIN: print("BUG: gevent may have missed a libzmq recv event on %i!" % self.FD, file=sys.stderr) finally: if timeout: timeout.cancel() self.__readable.set()
def get(self, opt): """trigger state_changed on getsockopt(EVENTS)""" if opt in TIMEOS: warnings.warn("TIMEO socket options have no effect in zmq.green", UserWarning) optval = super(_Socket, self).get(opt) if opt == zmq.EVENTS: self.__state_changed() return optval
def new_data(self): return self.socket.get(zmq.EVENTS)
def checkForMessage(self, socket): """ Check on socket activity if there is a complete ZMQ message. @param socket: ZMQ socket """ logging.debug( "Check: {0!s}".format(self.readnotifier.socket())) self.readnotifier.setEnabled(False) check = True try: while check: events = self.socket.get(zmq.EVENTS) check = events & zmq.POLLIN logging.debug( "EVENTS: {0!s}".format(events)) if check: try: msg = self.socket.recv_multipart(zmq.NOBLOCK) except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: # state changed since poll event pass else: logging.info( "RECV Error: {0!s}".format(zmq.strerror(e.errno))) else: logging.debug( "MSG: {0!s} {1!s}".format(self.readnotifier.socket(), msg)) self.sigMsgRecvd.emit(msg) except: pass else: self.readnotifier.setEnabled(True)
def test_int_sockopts(self): "test integer sockopts" v = zmq.zmq_version_info() if v < (3,0): default_hwm = 0 else: default_hwm = 1000 p,s = self.create_bound_pair(zmq.PUB, zmq.SUB) p.setsockopt(zmq.LINGER, 0) self.assertEqual(p.getsockopt(zmq.LINGER), 0) p.setsockopt(zmq.LINGER, -1) self.assertEqual(p.getsockopt(zmq.LINGER), -1) self.assertEqual(p.hwm, default_hwm) p.hwm = 11 self.assertEqual(p.hwm, 11) # p.setsockopt(zmq.EVENTS, zmq.POLLIN) self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT) self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1) self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type) self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB) self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type) self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB) # check for overflow / wrong type: errors = [] backref = {} constants = zmq.constants for name in constants.__all__: value = getattr(constants, name) if isinstance(value, int): backref[value] = name for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts): sopt = backref[opt] if sopt.startswith(( 'ROUTER', 'XPUB', 'TCP', 'FAIL', 'REQ_', 'CURVE_', 'PROBE_ROUTER', 'IPC_FILTER', 'GSSAPI', 'STREAM_', )): # some sockopts are write-only continue try: n = p.getsockopt(opt) except zmq.ZMQError as e: errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e)) else: if n > 2**31: errors.append("getsockopt(zmq.%s) returned a ridiculous value." " It is probably the wrong type."%sopt) if errors: self.fail('\n'.join([''] + errors))
def test_int_sockopts(self): "test integer sockopts" v = zmq.zmq_version_info() if v < (3,0): default_hwm = 0 else: default_hwm = 1000 p,s = self.create_bound_pair(zmq.PUB, zmq.SUB) p.setsockopt(zmq.LINGER, 0) self.assertEqual(p.getsockopt(zmq.LINGER), 0) p.setsockopt(zmq.LINGER, -1) self.assertEqual(p.getsockopt(zmq.LINGER), -1) self.assertEqual(p.hwm, default_hwm) p.hwm = 11 self.assertEqual(p.hwm, 11) # p.setsockopt(zmq.EVENTS, zmq.POLLIN) self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT) self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1) self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type) self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB) self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type) self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB) # check for overflow / wrong type: errors = [] backref = {} constants = zmq.constants for name in constants.__all__: value = getattr(constants, name) if isinstance(value, int): backref[value] = name for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts): sopt = backref[opt] if sopt.startswith(( 'ROUTER', 'XPUB', 'TCP', 'FAIL', 'REQ_', 'CURVE_', 'PROBE_ROUTER', 'IPC_FILTER', 'GSSAPI', )): # some sockopts are write-only continue try: n = p.getsockopt(opt) except zmq.ZMQError as e: errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e)) else: if n > 2**31: errors.append("getsockopt(zmq.%s) returned a ridiculous value." " It is probably the wrong type."%sopt) if errors: self.fail('\n'.join([''] + errors))