我们从 Python 开源项目中提取了以下 20 个代码示例,用于说明如何使用 zmq.RCVTIMEO。
def setUp(self):
    """ Build the mocked Supvisors and the internal publisher / subscriber pair. """
    from supvisors.supvisorszmq import (InternalEventPublisher,
                                        InternalEventSubscriber)
    # addresses and ports are taken from the dummy Supvisors
    supvisors = self.supvisors = MockedSupvisors()
    # publisher side
    self.publisher = InternalEventPublisher(
        supvisors.address_mapper.local_address,
        supvisors.options.internal_port,
        supvisors.logger)
    # subscriber side
    self.subscriber = InternalEventSubscriber(
        supvisors.address_mapper.addresses,
        supvisors.options.internal_port)
    # the sockets are blocking by design; bound the reception with a
    # timeout so that a failure cannot hang the unit test
    subscriber_socket = self.subscriber.socket
    subscriber_socket.setsockopt(zmq.RCVTIMEO, 1000)
    # the publisher does not wait for subscribers to be connected,
    # so leave some time for the connections to be established
    time.sleep(1)
def start(self, socket):
    """
    Begin monitoring ``socket`` from a background thread.

    Does nothing when the listening flag is already set.

    :param socket: Socket to monitor.
    """
    # A listener is already running: keep a single thread.
    if self.monitor_listening.is_set():
        return

    # The monitor socket gets a receive timeout so the loop below wakes up
    # regularly even when no event arrives.
    mon_sock = socket.get_monitor_socket(events=self.events)
    mon_sock.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
    self.monitor_listening.set()

    def event_listener(monitor_listening):
        while monitor_listening.is_set():
            try:
                event = recv_monitor_message(mon_sock)
                # A closed socket ends the loop after this notification.
                if event["event"] == zmq.EVENT_CLOSED:
                    monitor_listening.clear()
                self._notify_listeners(event)
            except zmq.Again:
                # Receive timed out: send a ``None`` heartbeat so time based
                # listeners do not need a thread of their own.
                self._notify_listeners(None)

        # Loop finished: release the monitor socket.
        socket.disable_monitor()
        mon_sock.close()

    listener_thread = threading.Thread(target=event_listener,
                                       args=(self.monitor_listening,))
    # Daemon thread, so a missing disconnect cannot keep the process alive.
    listener_thread.daemon = True
    self.monitor_thread = listener_thread
    self.monitor_thread.start()
def setUp(self):
    """ Build the mocked Supvisors and the request pusher / puller pair. """
    from supvisors.supvisorszmq import RequestPusher, RequestPuller
    # addresses and ports are taken from the dummy Supvisors
    self.supvisors = MockedSupvisors()
    # pusher and puller under test
    self.pusher = RequestPusher(self.supvisors.logger)
    self.puller = RequestPuller()
    # the sockets are blocking by design; bound emission and reception
    # with a timeout so that a failure cannot hang the unit test
    # NOTE(review): both timeouts are applied to the puller socket; the
    # emission timeout may have been intended for the pusher - confirm
    for option in (zmq.SNDTIMEO, zmq.RCVTIMEO):
        self.puller.socket.setsockopt(option, 1000)
def setUp(self):
    """ Build the mocked Supvisors, the event publisher / subscriber pair
    and the payloads used by the tests. """
    from supvisors.supvisorszmq import EventPublisher, EventSubscriber
    # addresses and ports are taken from the dummy Supvisors
    supvisors = self.supvisors = MockedSupvisors()
    # publisher and subscriber under test; the subscriber works on the
    # shared ZeroMQ context instance
    self.publisher = EventPublisher(supvisors.options.event_port,
                                    supvisors.logger)
    self.subscriber = EventSubscriber(zmq.Context.instance(),
                                      supvisors.options.event_port,
                                      supvisors.logger)
    # WARN: no subscription is set on this subscriber here
    # a test that adds one has to sleep a little afterwards so that
    # PyZMQ has time to handle it
    # WARN: the socket is blocking by design; bound the reception with a
    # timeout so that a failure cannot hang the unit test
    self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
    # test payloads, one per kind of event
    self.supvisors_payload = Payload({'state': 'running',
                                      'version': '1.0'})
    self.address_payload = Payload({'state': 'silent',
                                    'name': 'cliche01',
                                    'date': 1234})
    self.application_payload = Payload({'state': 'starting',
                                        'name': 'supvisors'})
    self.process_payload = Payload({'state': 'running',
                                    'process_name': 'plugin',
                                    'application_name': 'supvisors',
                                    'date': 1230})
    self.event_payload = Payload({'state': 20,
                                  'name': 'plugin',
                                  'group': 'supvisors',
                                  'now': 1230})
def setup_socket(self):
    """Create the REQ socket used by the component.

    The socket is only created and configured here; the component
    inheriting from BaseComponent is expected to call
    ``self.socket.connect`` with the appropriate address.
    """
    self.socket = zmq.Context().socket(zmq.REQ)
    socket_options = (
        # LINGER 0: pending messages are discarded when the socket is closed.
        (zmq.LINGER, 0),
        # RCVTIMEO: upper bound, in milliseconds, for socket.recv.
        (zmq.RCVTIMEO, 500),
    )
    for option, value in socket_options:
        self.socket.setsockopt(option, value)
def connect_socket(self):
    """Connect to the first controller that answers a ping.

    Each address in ``self.controllers`` is tried in order with a REQ socket
    and a 2 second receive timeout. The first one whose reply can be decoded
    becomes ``self.controller`` (its address is stored in ``self.address``)
    and its receive timeout is raised to ``self.timeout`` seconds.

    Sockets of controllers that did not answer are closed before moving on,
    so failed attempts no longer accumulate open sockets.

    :raises Exception: when no controller produced a reply.
    """
    reply = None
    for c in self.controllers:
        self.logger.debug('Establishing socket connection to %s', c)
        tmp_sock = self.context.socket(zmq.REQ)
        tmp_sock.setsockopt(zmq.RCVTIMEO, 2000)
        tmp_sock.setsockopt(zmq.LINGER, 0)
        tmp_sock.identity = self.identity
        tmp_sock.connect(c)
        # first ping the controller to see if it responds at all
        msg = RPCMessage({'payload': 'ping'})
        tmp_sock.send_json(msg)
        try:
            reply = msg_factory(tmp_sock.recv_json())
        except Exception:
            # Best effort: report the failure and try the next controller.
            # LINGER is 0, so closing drops the unanswered ping immediately.
            traceback.print_exc()
            tmp_sock.close()
            continue
        self.address = c
        break

    if reply:
        # Now set the timeout to the actual requested; socket options of
        # this kind take an integer number of milliseconds.
        timeout_ms = int(self.timeout * 1000)
        self.logger.debug("Connection OK, setting network timeout to %s milliseconds", timeout_ms)
        self.controller = tmp_sock
        self.controller.setsockopt(zmq.RCVTIMEO, timeout_ms)
    else:
        raise Exception('No controller connection')
def connect(self, server=None, port=None):
    """Open the REQ connection to the server and verify it with a ping.

    An existing connection is dropped first. ``server`` and ``port``
    replace the stored values only when they are truthy.

    Returns ``RC_ERR`` when the endpoint is rejected by ZMQ, the failed
    ping result when the server does not answer, ``RC_OK`` otherwise.
    """
    if self.connected:
        self.disconnect()

    self.context = zmq.Context()

    self.server = server or self.server
    self.port = port or self.port

    # Socket to talk to server
    self.transport = "tcp://{0}:{1}".format(self.server, self.port)
    self.socket = self.context.socket(zmq.REQ)

    try:
        self.socket.connect(self.transport)
    except zmq.error.ZMQError as e:
        return RC_ERR("ZMQ Error: Bad server or port name: " + str(e))

    # Same bound, in milliseconds, for sending and receiving.
    for direction in (zmq.SNDTIMEO, zmq.RCVTIMEO):
        self.socket.setsockopt(direction, 10000)

    # Flag is raised before the ping and lowered again if the ping fails.
    self.connected = True
    rc = self.invoke_rpc_method('ping', api_class=None)
    if rc:
        return RC_OK()

    self.connected = False
    return rc
def __init__(self, listeners: List[MessageListener] = None,
             on_finish: Callable[[int], None] = lambda return_code: None):
    """Starts an apart-core command and starts listening for zmq messages on this new thread

    :param listeners: initial message listeners; ``None`` means no listeners
    :param on_finish: callback stored on the instance as ``self.on_finish``

    apart-core is launched directly when running as root or when
    ``APART_PARTCLONE_CMD`` is set, and through ``pkexec`` otherwise. If the
    launched executable cannot be found, an error naming that executable is
    printed to stderr, the zmq context is destroyed and the process exits
    with status 1.
    """
    Thread.__init__(self, name='apart-core-runner')
    self.ipc_address = 'ipc:///tmp/apart-gtk-{}.ipc'.format(uuid.uuid4())
    self.zmq_context = zmq.Context()
    self.socket = self.zmq_context.socket(zmq.PAIR)
    self.socket.setsockopt(zmq.RCVTIMEO, 100)
    self.socket.bind(self.ipc_address)

    self.on_finish = on_finish
    self.listeners = listeners or []  # List[MessageListener]

    if LOG_MESSAGES:
        self.register(MessageListener(lambda msg: print('apart-core ->\n {}'.format(str(msg)))))

    # Current default is apart-core binary stored in the directory above these sources
    apart_core_cmd = os.environ.get('APART_GTK_CORE_CMD') or \
        os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/../apart-core')

    # Decide once how apart-core is launched, so the error message below
    # always names the executable that was actually looked up.
    run_directly = os.geteuid() == 0 or os.environ.get('APART_PARTCLONE_CMD')
    launch_cmd = [apart_core_cmd, self.ipc_address]
    if not run_directly:
        launch_cmd.insert(0, 'pkexec')

    try:
        self.process = subprocess.Popen(launch_cmd)
    except FileNotFoundError:
        if run_directly:
            print('apart-core command not found at \'' + apart_core_cmd + '\'', file=sys.stderr)
        else:
            print('pkexec command not found, install polkit or run as root', file=sys.stderr)
        self.zmq_context.destroy()
        sys.exit(1)

    self.start()
def start(self):
    '''
    Create the zmq context and socket of the consumer, connect the socket
    and apply the optional socket settings.
    '''
    # The port part is left out of the URI when no port is configured.
    if self.port:
        zmq_uri = '{protocol}://{address}:{port}'.format(
            protocol=self.protocol,
            address=self.address,
            port=self.port
        )
    else:
        zmq_uri = '{protocol}://{address}'.format(
            protocol=self.protocol,
            address=self.address
        )
    log.debug('ZMQ URI: %s', zmq_uri)
    self.ctx = zmq.Context()
    # Unknown socket type names fall back to PULL.
    skt_type = getattr(zmq, self.type, zmq.PULL)
    self.sub = self.ctx.socket(skt_type)
    self.sub.connect(zmq_uri)
    if self.hwm is not None:
        # zmq builds that do not expose HWM get RCVHWM instead.
        try:
            self.sub.setsockopt(zmq.HWM, self.hwm)
        except AttributeError:
            self.sub.setsockopt(zmq.RCVHWM, self.hwm)
    # Remaining options are applied only when a value is configured:
    # (log message, zmq option name, configured value).
    optional_settings = (
        ('Setting RCVTIMEO to %d', 'RCVTIMEO', self.recvtimeout),
        ('Setting TCP_KEEPALIVE to %d', 'TCP_KEEPALIVE', self.keepalive),
        ('Setting TCP_KEEPALIVE_IDLE to %d', 'TCP_KEEPALIVE_IDLE', self.keepalive_idle),
        ('Setting TCP_KEEPALIVE_INTVL to %d', 'TCP_KEEPALIVE_INTVL', self.keepalive_interval),
    )
    for message, option_name, value in optional_settings:
        if value is None:
            continue
        log.debug(message, value)
        self.sub.setsockopt(getattr(zmq, option_name), value)
def test_zmq_socket_uses_timeout(self, mock_zmq_context):
    """ControlClient applies its timeout as both send and receive timeout."""
    timeout = 100
    ControlClient(host='127.0.0.1', port='10002', timeout=timeout)

    expected_calls = [
        call().setsockopt(option, timeout)
        for option in (zmq.SNDTIMEO, zmq.RCVTIMEO)
    ]
    mock_zmq_context.assert_has_calls(expected_calls)
def test_connection(self, mock_context):
    """Starting the server creates a REP socket, sets its receive timeout
    and binds it to the configured endpoint."""
    cs = ControlServer(None, connection_string='127.0.0.1:10001')
    cs.start_server()

    created_socket = call().socket()
    expected_calls = [
        call(),
        call().socket(zmq.REP),
        created_socket.setsockopt(zmq.RCVTIMEO, 100),
        created_socket.bind('tcp://127.0.0.1:10001'),
    ]
    mock_context.assert_has_calls(expected_calls)
def test_server_can_only_be_started_once(self, mock_context):
    """Calling start_server twice still shows the expected setup sequence
    on the mocked context."""
    server = ControlServer(None, connection_string='127.0.0.1:10000')
    for _ in range(2):
        server.start_server()

    created_socket = call().socket()
    expected_calls = [
        call(),
        call().socket(zmq.REP),
        created_socket.setsockopt(zmq.RCVTIMEO, 100),
        created_socket.bind('tcp://127.0.0.1:10000'),
    ]
    mock_context.assert_has_calls(expected_calls)
def start_server(self):
    """
    Create the REP socket, bind it to the configured host and port and
    start listening. Does nothing when the socket already exists.
    """
    if self._socket is not None:
        return

    endpoint = 'tcp://{0}:{1}'.format(self.host, self.port)

    rep_socket = zmq.Context().socket(zmq.REP)
    self._socket = rep_socket
    # Receive calls give up after 100 ms instead of blocking forever.
    rep_socket.setsockopt(zmq.RCVTIMEO, 100)
    rep_socket.bind(endpoint)

    self.log.info('Listening on %s:%s', self.host, self.port)
def _get_zmq_req_socket(self):
    """Return a REQ socket created from a fresh, pre-configured context.

    The options are set on the context before the socket is created:
    request correlation and relaxed mode are switched on, ``self.timeout``
    is used for both send and receive, and linger is 0.
    """
    context = zmq.Context()
    context_options = (
        (zmq.REQ_CORRELATE, 1),
        (zmq.REQ_RELAXED, 1),
        (zmq.SNDTIMEO, self.timeout),
        (zmq.RCVTIMEO, self.timeout),
        (zmq.LINGER, 0),
    )
    for option, value in context_options:
        context.setsockopt(option, value)
    return context.socket(zmq.REQ)
def _set_timeout(self, short=True, seconds=None):
    """Set the send and receive timeouts of the connection socket.

    :param short: when no explicit ``seconds`` is given, ``True`` selects a
        5000 ms base timeout and ``False`` doubles it.
    :param seconds: explicit base timeout in seconds; may be fractional.

    The send timeout is the base value and the receive timeout is twice the
    base value. The base is truncated to whole milliseconds because the
    timeout socket options take integer values.
    """
    if seconds is not None:
        base = int(seconds * 1000)
    else:
        base = 5000
        # NOTE(review): the doubling is assumed to apply only to the default
        # base, not to an explicit ``seconds`` value - confirm.
        if not short:
            base *= 2

    self._conn.setsockopt(zmq.SNDTIMEO, base)  # A send should always be quick
    self._conn.setsockopt(zmq.RCVTIMEO, 2 * base)  # A receive might need to wait on processing
def test_recv_timeout():
    """A SUB socket with RCVTIMEO set must fail recv() with an error that
    eventlet recognises as a timeout.

    See https://github.com/eventlet/eventlet/issues/282
    """
    with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
        sub.setsockopt(zmq.RCVTIMEO, 100)
        try:
            # The silent 1 second eventlet timeout is only a safety net.
            with eventlet.Timeout(1, False):
                sub.recv()
            # recv() is expected to raise before this point.
            assert False
        except zmq.ZMQError as err:
            assert eventlet.is_timeout(err)
def zthread_fork(ctx, func, *args, **kwargs):
    """
    Create an attached thread. An attached thread gets a ctx and a PAIR
    pipe back to its parent. It must monitor its pipe, and exit if the
    pipe becomes unreadable.

    ``func`` is called in the new thread as ``func(ctx, pipe, *args,
    **kwargs)`` where ``pipe`` is the child end of the pair.

    Returns the parent end of the pipe.
    """
    def _new_pipe_end():
        # Both ends of the pipe share exactly the same configuration.
        end = ctx.socket(zmq.PAIR)
        end.setsockopt(zmq.LINGER, 0)
        end.setsockopt(zmq.RCVHWM, 100)
        end.setsockopt(zmq.SNDHWM, 100)
        end.setsockopt(zmq.SNDTIMEO, 5000)
        end.setsockopt(zmq.RCVTIMEO, 5000)
        return end

    a = _new_pipe_end()
    b = _new_pipe_end()
    # Random endpoint name; hexlify returns bytes, so decode it to keep the
    # name a plain hex string.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode("ascii")
    a.bind(iface)
    b.connect(iface)

    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()

    return a
def _run (self):
    """Receive loop of the subscriber thread.

    Subscribes to every message, connects to ``self.tr`` and, while
    ``self.active`` is true, receives JSON strings, stores their ``data``
    in ``self.raw_snapshot`` under their ``name`` and dispatches them.
    The event handler is notified once when data starts arriving and once
    when a receive times out after data had been seen.
    """
    # socket must be created on the same thread
    self.socket.setsockopt(zmq.SUBSCRIBE, b'')
    # receive gives up after 5000 ms so liveness can be re-evaluated
    self.socket.setsockopt(zmq.RCVTIMEO, 5000)
    self.socket.connect(self.tr)

    # tracks whether the last receive attempt produced data
    got_data = False

    self.monitor.reset()

    while self.active:
        try:

            with self.monitor:
                line = self.socket.recv_string()

            self.monitor.on_recv_msg(line)

            self.last_data_recv_ts = time.time()

            # signal once
            if not got_data:
                self.event_handler.on_async_alive()
                got_data = True

        # got a timeout - mark as not alive and retry
        except zmq.Again:
            # signal once
            if got_data:
                self.event_handler.on_async_dead()
                got_data = False

            continue

        except zmq.ContextTerminated:
            # outside thread signaled us to exit
            assert(not self.active)
            break

        # each message is a JSON object with 'name', 'data' and 'type'
        # keys and an optional 'baseline' key
        msg = json.loads(line)

        name = msg['name']
        data = msg['data']
        type = msg['type']
        baseline = msg.get('baseline', False)

        self.raw_snapshot[name] = data

        self.__dispatch(name, type, data, baseline)

    # closing of socket must be from the same thread
    self.socket.close(linger = 0)
def __init__(self, host, address, log_address):
    """Set up the RPC endpoint and report its address to the creator.

    :param host: host part of the TCP endpoint the rpc socket is bound to
        (a port is picked by ZMQ).
    :param address: address of the temporary PAIR socket on which the
        actual rpc address is sent back as ``{'address': ...}``.
    :param log_address: address handed to ``get_log``; must not be ``None``.
    :raises NotImplementedError: when ``log_address`` is ``None``.
    """
    object.__init__(self)

    if log_address is None:
        raise NotImplementedError()  # TODO remove
    self.logger = get_log(log_address, name=__name__)

    # TODO find proper space to define following class
    class Encoder(json.JSONEncoder):

        # ``self_`` is the encoder; ``self`` is the enclosing object.
        def default(self_, obj):
            if obj is None:
                # Defer to the base implementation; it needs the encoder
                # instance as its first argument.
                return json.JSONEncoder.default(self_, obj)
            # Anything that is not a proxy yet is wrapped before encoding.
            if not isinstance(obj, Proxy):
                obj = self.wrap_proxy(obj)
            return obj.encode()

    self.encoder = Encoder
    self.context = zmq.Context()

    # TODO connect tmp socket
    self.logger.debug("connect tmp socket at {a}".format(a=address))
    tmp_socket = self.context.socket(zmq.PAIR)
    tmp_socket.connect(address)

    # TODO bind rpc socket
    transport = 'tcp'
    port = '*'  # wildcard: the port is chosen at bind time
    endpoint = '{h}:{p}'.format(h=host, p=port)
    rpc_address = '{t}://{e}'.format(t=transport, e=endpoint)
    self.logger.debug("bind rpc socket at {a}".format(a=rpc_address))
    self.socket = self.context.socket(zmq.PAIR)
    # self.socket.setsockopt(zmq.RCVTIMEO, 10000)
    self.socket.bind(rpc_address)
    # LAST_ENDPOINT holds the endpoint actually bound, port included.
    self.address = self.socket.getsockopt(zmq.LAST_ENDPOINT)
    self.logger.debug("rpc socket binded at {a}".format(a=self.address))

    # TODO send rpc address
    self.logger.debug("send back rpc address")
    message = {
        'address': self.address,
    }
    # NOTE(review): the temporary socket is not closed explicitly - confirm
    # whether that is intended.
    tmp_socket.send_json(message)

    self.last_obj_id = -1
    self.objs = {}
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
def __init__(self, ip_addr, load_instruments=None, force=False):
    """Create a connection to the Moku:Lab unit at the given IP address

    An encrypted (CURVE) connection is tried first. If it times out and
    *force* is set, an unencrypted connection is tried instead; without
    *force* the timeout is reported and re-raised.

    :type ip_addr: string
    :param ip_addr: The address to connect to. This should be in IPv4 dotted notation.

    :type load_instruments: bool or None
    :param load_instruments: Leave default (*None*) unless you know what you're doing.

    :type force: bool
    :param force: Ignore firmware and network compatibility checks and force the instrument
        to deploy. This is dangerous on many levels, leave *False* unless you know what you're doing.

    :raises zmq.error.Again: if the first connection attempt times out and *force* is not set.
    :raises MokuException: if *force* is not set and the firmware build is reported as incompatible.
    """
    self._ip = ip_addr
    self._seq = 0
    self._instrument = None
    self._known_mokus = []

    self._ctx = zmq.Context.instance()
    self._conn_lock = threading.RLock()

    try:
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        # Fresh client key pair; the server key is loaded from the '000' certificate in data_folder.
        self._conn.curve_publickey, self._conn.curve_secretkey = zmq.curve_keypair()
        self._conn.curve_serverkey, _ = zmq.auth.load_certificate(os.path.join(data_folder, '000'))
        self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

        # Getting the serial should be fairly quick; it's a simple operation. More importantly we
        # don't wait to block the fall-back operation for too long
        self._conn.setsockopt(zmq.SNDTIMEO, 1000)
        self._conn.setsockopt(zmq.RCVTIMEO, 1000)

        self.serial = self.get_serial()
        # The serial was read within the short timeout; switch to the regular timeouts.
        self._set_timeout()
    except zmq.error.Again:
        if not force:
            print("Connection failed, either the Moku cannot be reached or the firmware is out of date")
            raise

        # If we're force-connecting, try falling back to non-encrypted.
        # NOTE(review): the socket created in the try block is replaced here without an
        # explicit close - confirm whether it should be closed first.
        self._conn = self._ctx.socket(zmq.REQ)
        self._conn.setsockopt(zmq.LINGER, 5000)
        self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

        self._set_timeout()

        self.serial = self.get_serial()

    self.name = None
    self.led = None
    self.led_colours = None

    # Check that pymoku is compatible with the Moku:Lab's firmware version
    if not force:
        build = self.get_firmware_build()
        if cp.firmware_is_compatible(build) == False: # Might be None = unknown, don't print that.
            raise MokuException("The connected Moku appears to be incompatible with this version of pymoku. Please run 'moku --ip={} firmware check_compat' for more information.".format(self._ip))

    # An explicit load_instruments value wins; otherwise it is True exactly when the boot mode is 'normal'.
    self.load_instruments = load_instruments if load_instruments is not None else self.get_bootmode() == 'normal'