我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 zmq.Socket()。
def poll(self, timeout=None):
    """Wait for I/O on the registered 0MQ sockets or native fds.

    Parameters
    ----------
    timeout : float, int
        Maximum wait in milliseconds, the same unit ``select.poll()``
        uses.  ``None`` means no timeout (wait indefinitely); ``None`` and
        negative values are both forwarded to ``zmq_poll`` as ``-1``.
        A float is truncated to an int.

    Returns
    -------
    events : list of tuples
        The ready events as ``(socket, event)`` pairs.  The first element
        is the 0MQ Socket or integer fd, the second is the poll event mask
        (POLLIN, POLLOUT).  ``dict(poller.poll())`` is a common way to turn
        the result into a ``socket : event`` mapping.
    """
    wait_forever = timeout is None or timeout < 0
    if wait_forever:
        timeout_ms = -1
    elif isinstance(timeout, float):
        # Fractional milliseconds are dropped.
        timeout_ms = int(timeout)
    else:
        timeout_ms = timeout
    return zmq_poll(self.sockets, timeout=timeout_ms)
def main():
    """Subscribe to the ``pupil`` topic and print every message forever.

    A SUB socket is connected to ``ipc_sub_url``.  A monitor socket is used
    to block until the connection is reported as established; it is then
    switched off because it is not needed afterwards.  Each message is read
    as two frames: a topic string followed by a payload that is decoded
    with ``serializer.loads``.

    This function never returns; it loops until the process is interrupted
    or a receive call raises.
    """
    context = zmq.Context()
    socket = zmq.Socket(context, zmq.SUB)
    monitor = socket.get_monitor_socket()
    socket.connect(ipc_sub_url)

    # Block until the monitor reports EVENT_CONNECTED.  Every other event
    # (e.g. EVENT_CONNECT_DELAYED) is simply skipped.
    while recv_monitor_message(monitor)['event'] != zmq.EVENT_CONNECTED:
        pass
    # The monitor only served the connection handshake; stop it so that it
    # does not stay active for the lifetime of the socket.
    socket.disable_monitor()
    print('connected')

    socket.subscribe('pupil')
    while True:
        topic = socket.recv_string()
        payload = serializer.loads(socket.recv(), encoding='utf-8')
        print(topic, payload)
def __init__(self, ctx, url, topics=(), block_until_connected=True):
    """Create a SUB socket, connect it to *url* and subscribe to *topics*.

    Parameters
    ----------
    ctx : zmq.Context
        Context the socket is created in.
    url : str
        Endpoint to connect to.
    topics : iterable of str
        Topics passed one by one to ``self.subscribe``.  A bare ``str`` is
        rejected by an assertion (it would otherwise be iterated character
        by character).
    block_until_connected : bool
        When true, wait on a monitor socket until the connection has been
        made before returning.

    Raises
    ------
    Exception
        If, while blocking, the monitor reports an event other than
        ``EVENT_CONNECTED`` or ``EVENT_CONNECT_DELAYED``.
    """
    self.socket = zmq.Socket(ctx, zmq.SUB)
    assert type(topics) != str

    if not block_until_connected:
        self.socket.connect(url)
    else:
        # Watch the socket's events so we can tell when the connection is up.
        monitor = self.socket.get_monitor_socket()
        self.socket.connect(url)
        connected = False
        while not connected:
            event = recv_monitor_message(monitor)['event']
            if event == zmq.EVENT_CONNECTED:
                connected = True
            elif event != zmq.EVENT_CONNECT_DELAYED:
                raise Exception("ZMQ connection failed")
        self.socket.disable_monitor()

    for topic in topics:
        self.subscribe(topic)
def register(self, socket, address, alias=None, handler=None):
    """
    Record a socket under its alias, its address and itself.

    Parameters
    ----------
    socket : zmq.Socket
        Socket to register.
    address
        Address associated with the socket.  It must not be registered
        already.  Its ``kind`` attribute is read when a handler is given.
    alias : optional
        Additional lookup key.  Falls back to `address` when empty.
    handler : optional
        When given, the socket is added to the poller for ``POLLIN`` and
        the handler is installed: through ``self.subscribe`` for 'SUB' and
        'SYNC_SUB' kinds, through ``self._set_handler`` otherwise.
    """
    assert not self.registered(address), \
        'Socket is already registered!'
    alias = alias or address

    # Both lookup tables accept the alias, the address or the socket as key.
    for key in (alias, address, socket):
        self.socket[key] = socket
        self.address[key] = address

    if handler is None:
        return
    self.poller.register(socket, zmq.POLLIN)
    if address.kind in ('SUB', 'SYNC_SUB'):
        self.subscribe(socket, handler)
    else:
        self._set_handler(socket, handler)
def _set_handler(self, socket, handler, update=False):
    """
    Install or extend the handler(s) stored for a socket.

    Parameters
    ----------
    socket : zmq.Socket
        Socket whose handler(s) are being set.
    handler : function(s)
        Handler(s) for the socket; may also be a list or a dictionary.
        The value is passed through ``self._curated_handlers`` before
        being stored.
    update : bool
        When true, merge into the existing entry with ``.update()``; a
        missing entry is created instead.  When false, the entry is
        replaced.
    """
    if not update:
        self.handler[socket] = self._curated_handlers(handler)
        return
    try:
        self.handler[socket].update(self._curated_handlers(handler))
    except KeyError:
        # Nothing stored for this socket yet: start a fresh entry.
        self.handler[socket] = self._curated_handlers(handler)
def _process_single_event(self, socket):
    """
    Receive one message from a socket and route it by address kind.

    Parameters
    ----------
    socket : zmq.Socket
        Socket that generated the event.  One message is read from it and
        its address is looked up in ``self.address``.
    """
    payload = socket.recv()
    addr = self.address[socket]
    kind = addr.kind

    if kind == 'SUB':
        self._process_sub_event(socket, addr, payload)
        return
    if kind == 'PULL':
        self._process_pull_event(socket, addr, payload)
        return
    if kind == 'REP':
        self._process_rep_event(socket, addr, payload)
        return
    # Every other kind is delegated to the "complex" dispatcher.
    self._process_single_event_complex(addr, socket, payload)
def _process_single_event_complex(self, address, socket, data):
    """
    Route an event that arrived on a complex socket (channel).

    Parameters
    ----------
    address : AgentAddress or AgentChannel
        Agent address or channel associated to the socket.
    socket : zmq.Socket
        Socket that generated the event.
    data
        Data received in the socket.

    Raises
    ------
    NotImplementedError
        If the kind is neither 'ASYNC_REP' nor 'PULL_SYNC_PUB'.
    """
    kind = address.kind
    if kind == 'ASYNC_REP':
        self._process_async_rep_event(socket, address, data)
        return
    if kind == 'PULL_SYNC_PUB':
        # This handler takes the address's `channel`, not the address.
        self._process_sync_pub_event(socket, address.channel, data)
        return
    raise NotImplementedError('Unsupported kind %s!' % address.kind)
def _process_pull_event(self, socket, addr, data):
    """
    Deserialize a PULL message and hand it to the socket's handler(s).

    Parameters
    ----------
    socket : zmq.Socket
        Socket that generated the event.
    addr : AgentAddress
        AgentAddress associated with the socket that generated the event.
        Its ``serializer`` is used to decode the message.
    data : bytes
        Data received on the socket.
    """
    message = deserialize_message(message=data, serializer=addr.serializer)
    registered = self.handler[socket]
    if isinstance(registered, (list, dict, tuple)):
        # Containers are iterated as they are (a dict yields its keys).
        callbacks = registered
    else:
        callbacks = [registered]
    for callback in callbacks:
        callback(self, message)
def test_shadow_pyczmq(self):
    """Sockets created by pyczmq can be shadowed and used for a round trip.

    Skipped when pyczmq cannot be imported.
    """
    try:
        from pyczmq import zctx, zsocket
    except Exception:
        raise SkipTest("Requires pyczmq")

    ctx = zctx.new()
    push_handle = zsocket.new(ctx, zmq.PUSH)
    pull_handle = zsocket.new(ctx, zmq.PULL)
    # Wrap the pyczmq-created sockets in pyzmq Socket objects.
    sender = zmq.Socket.shadow(push_handle)
    receiver = zmq.Socket.shadow(pull_handle)

    endpoint = "inproc://a"
    sender.bind(endpoint)
    receiver.connect(endpoint)
    sender.send(b'hi')
    self.assertEqual(self.recv(receiver), b'hi')
def poll(self, timeout=None):
    """Poll the registered 0MQ or native fds for I/O.

    Parameters
    ----------
    timeout : float, int
        The timeout in milliseconds, for compatibility with
        ``select.poll()``.  If None, there is no timeout (infinite);
        negative values are treated the same way.  Floats are truncated
        to an int before being handed to ``zmq_poll``.

    Returns
    -------
    events : list of tuples
        The events that are ready to be processed, as ``(socket, event)``
        tuples: the 0MQ Socket or integer fd first, the poll event mask
        (POLLIN, POLLOUT) second.  ``events = dict(poller.poll())`` turns
        the list into a mapping of ``socket : event``.
    """
    if timeout is None or timeout < 0:
        # -1 stands for "no timeout".
        return zmq_poll(self.sockets, timeout=-1)
    if isinstance(timeout, float):
        timeout = int(timeout)
    return zmq_poll(self.sockets, timeout=timeout)
def register(self, queue, handler, flags=zmq.POLLIN):
    """
    Add *queue* to the task's poller and remember its *handler*.

    The queue is registered with the poller for the given *flags* and the
    handler is stored in ``self.handlers`` under the queue.  The handler is
    expected to take a single argument, the queue itself.

    :param zmq.Socket queue:
        The queue to poll.

    :param handler:
        The function or method to call when a message with matching
        *flags* arrives in *queue*.

    :param int flags:
        The poller flags to match (defaults to ``POLLIN``).
    """
    poller = self.poller
    poller.register(queue, flags)
    # Stored only after the poller accepted the queue.
    self.handlers[queue] = handler
def dump(msg_or_socket):
    """Print every frame of a multipart message, one frame per line.

    Parameters
    ----------
    msg_or_socket : zmq.Socket or iterable of bytes
        Either a socket, from which one multipart message is received, or
        an already received sequence of frames.

    Each frame is printed as ``[NNN] <content>`` where ``NNN`` is the frame
    length.  Frames that decode as ASCII are shown as text; all others are
    shown as ``0x`` followed by their hexadecimal representation.
    """
    if isinstance(msg_or_socket, zmq.Socket):
        # It's a socket: read the current message from it.
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket

    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end=' ')
        try:
            print(part.decode('ascii'))
        except UnicodeDecodeError:
            # Not plain text: fall back to a hex dump of the frame.
            print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))
def __init__(self, interface_or_socket, context=None):
    """Set up the logging handler around a PUB socket.

    Parameters
    ----------
    interface_or_socket : zmq.Socket or str
        An existing socket to use as is, or an interface to bind a new
        PUB socket to.
    context : zmq.Context, optional
        Context used to create the new socket.  Ignored when a socket is
        passed in (its own context is used); a new context is created when
        this is omitted.
    """
    logging.Handler.__init__(self)
    if not isinstance(interface_or_socket, zmq.Socket):
        # An interface was given: create and bind our own PUB socket.
        self.ctx = context or zmq.Context()
        self.socket = self.ctx.socket(zmq.PUB)
        self.socket.bind(interface_or_socket)
        return
    self.socket = interface_or_socket
    self.ctx = self.socket.context
def _get_descriptors(self):
    """Return a three-element tuple of descriptor lists for
    gevent.select.select.

    Returns
    -------
    (rlist, wlist, xlist) : tuple of lists
        File descriptors to watch for reading, writing and errors.

    Raises
    ------
    ValueError
        If an object's ``fileno()`` fails or does not yield an integer.
    TypeError
        If a registered object is neither a 0MQ socket, an integer fd nor
        an object with a ``fileno()`` method.
    """
    rlist = []
    wlist = []
    xlist = []

    for socket, flags in self.sockets:
        if isinstance(socket, zmq.Socket):
            # A 0MQ socket contributes its FD to the read list only,
            # whatever flags it was registered with.
            rlist.append(socket.getsockopt(zmq.FD))
            continue
        elif isinstance(socket, int):
            fd = socket
        elif hasattr(socket, 'fileno'):
            try:
                fd = int(socket.fileno())
            except Exception:
                # Narrower than a bare ``except`` so KeyboardInterrupt and
                # SystemExit are not turned into a ValueError.
                raise ValueError('fileno() must return an valid integer fd')
        else:
            raise TypeError('Socket must be a 0MQ socket, an integer fd '
                            'or have a fileno() method: %r' % socket)

        if flags & zmq.POLLIN:
            rlist.append(fd)
        if flags & zmq.POLLOUT:
            wlist.append(fd)
        if flags & zmq.POLLERR:
            xlist.append(fd)

    return (rlist, wlist, xlist)
def test_subclass(self):
    """Socket subclasses can assign attributes declared on the class."""
    class S(zmq.Socket):
        a = None

        def __init__(self, *args, **kwargs):
            # The attribute is set before the base class is initialized.
            self.a = -1
            super(S, self).__init__(*args, **kwargs)

    sock = S(self.context, zmq.REP)
    self.sockets.append(sock)
    self.assertEqual(sock.a, -1)

    sock.a = 1
    self.assertEqual(sock.a, 1)
    value = sock.a
    self.assertEqual(value, 1)
def test_shadow(self):
    """Shadow sockets share the underlying handle and can carry traffic."""
    endpoint = "tcp://127.0.0.1:5555"

    push = self.socket(zmq.PUSH)
    push.bind(endpoint)
    push_shadow = zmq.Socket.shadow(push.underlying)
    self.assertEqual(push.underlying, push_shadow.underlying)

    pull = self.socket(zmq.PULL)
    pull_shadow = zmq.Socket.shadow(pull.underlying)
    self.assertNotEqual(pull.underlying, push.underlying)
    self.assertEqual(pull.underlying, pull_shadow.underlying)

    # Send and receive through the shadows only.
    pull_shadow.connect(endpoint)
    sent = b'hi'
    push_shadow.send(sent)
    self.assertEqual(self.recv(pull_shadow), sent)
def register(self, socket, flags=POLLIN|POLLOUT):
    """p.register(socket, flags=POLLIN|POLLOUT)

    Start, change or stop I/O monitoring for a 0MQ socket or native fd.

    ``register(s, 0)`` is equivalent to ``unregister(s)``.

    Parameters
    ----------
    socket : zmq.Socket or native socket
        A zmq.Socket or any Python object having a ``fileno()``
        method that returns a valid file descriptor.
    flags : int
        The events to watch for.  Can be POLLIN, POLLOUT or
        POLLIN|POLLOUT.  With ``flags=0`` a registered socket is
        unregistered and an unknown socket is ignored.
    """
    known = socket in self._map
    if not flags:
        # No events requested: drop the socket if we have it.
        if known:
            self.unregister(socket)
        return

    if known:
        # Already registered: overwrite its flags in place.
        position = self._map[socket]
        self.sockets[position] = (socket, flags)
    else:
        position = len(self.sockets)
        self.sockets.append((socket, flags))
        self._map[socket] = position
def unregister(self, socket):
    """Stop monitoring a 0MQ socket or native fd.

    Parameters
    ----------
    socket : Socket
        The socket instance to stop polling.

    Raises
    ------
    KeyError
        If the socket is not registered.
    """
    position = self._map.pop(socket)
    self.sockets.pop(position)
    # Everything stored after the removed entry moved one slot to the left.
    for shifted, _flags in self.sockets[position:]:
        self._map[shifted] -= 1
def __init__(self, context, socket_type, io_loop=None):
    """Initialize the socket and its asynchronous bookkeeping.

    Parameters
    ----------
    context
        Passed to the base class initializer.
    socket_type
        Passed to the base class initializer.
    io_loop : optional
        Loop to attach to.  ``self._default_loop()`` is used when none
        is given.
    """
    super(_AsyncSocket, self).__init__(context, socket_type)
    if io_loop:
        self.io_loop = io_loop
    else:
        self.io_loop = self._default_loop()
    # Pending receive and send futures, and the current state value.
    self._recv_futures, self._send_futures = [], []
    self._state = 0
    # A second Socket object wrapping the same underlying socket.
    self._shadow_sock = _zmq.Socket.shadow(self.underlying)
    self._init_io_state()
def _socket_class(self, socket_type):
    """Build a ``Socket`` of *socket_type* bound to this object's io_loop.

    The new socket receives ``self`` as its first argument and
    ``self.io_loop`` as its ``io_loop``.
    """
    loop = self.io_loop
    return Socket(self, socket_type, io_loop=loop)
def __init__(self):
    """Start Pupil Capture, subscribe to its data and open the LSL inlet.

    Steps, in order:

    1. Launch ``pupil_capture.alternate_launch`` in a child process and
       read its messages from a queue until one contains ``'EYE_READY'``.
       A message containing ``'tcp'`` is stored as ``self.ipc_sub_url``.
    2. Connect a SUB socket to that URL, blocking until the monitor
       reports the connection, then subscribe to the ``'pupil'`` topic.
    3. Resolve the LSL stream named ``LSL_STREAM_NAME`` and open an inlet
       on the first match.

    Raises
    ------
    ValueError
        If no LSL stream named ``LSL_STREAM_NAME`` is found.
    """
    pupil_queue = Queue()
    self.pupil_proc = Process(target=pupil_capture.alternate_launch,
                              args=((pupil_queue), ))
    self.pupil_proc.start()
    while True:
        pupil_msg = pupil_queue.get()
        print(pupil_msg)
        if 'tcp' in pupil_msg:
            self.ipc_sub_url = pupil_msg
        if 'EYE_READY' in pupil_msg:
            break

    context = zmq.Context()
    self.socket = zmq.Socket(context, zmq.SUB)
    monitor = self.socket.get_monitor_socket()
    self.socket.connect(self.ipc_sub_url)
    # Block until the monitor reports EVENT_CONNECTED.  Every other event
    # (e.g. EVENT_CONNECT_DELAYED) is simply skipped.
    while recv_monitor_message(monitor)['event'] != zmq.EVENT_CONNECTED:
        pass
    # The monitor only served the connection handshake; stop it so that it
    # does not stay active for the lifetime of the socket.
    self.socket.disable_monitor()
    print('Capturing from pupil on url %s.' % self.ipc_sub_url)
    self.socket.subscribe('pupil')

    # setup LSL
    streams = resolve_byprop('name', LSL_STREAM_NAME, timeout=2.5)
    try:
        self.inlet = StreamInlet(streams[0])
    except IndexError:
        raise ValueError('Make sure stream name="%s", is opened first.'
                         % LSL_STREAM_NAME)

    self.running = True
    self.samples = []
    # LSL and pupil samples are synchronized to local_clock(), which is the
    # runtime on this slave, not the host
def thread_loop(self, context, pipe):
    """Serve commands from *pipe* and requests from a rebindable REP socket.

    The loop polls *pipe* and, once bound, a REP socket.  Commands read
    from *pipe*:

    * ``'Exit'`` -- leave the loop.
    * ``'Bind'`` -- followed by a second frame holding the new URL.  Any
      current REP socket is unregistered and closed, then a new one is
      bound.  The reply on *pipe* is either ``"Bind OK"`` plus the bound
      endpoint without its ``tcp://`` prefix, or ``"Error"`` plus a
      description.

    Incoming requests on the REP socket are handed to ``self.on_recv``
    together with the message dispatcher.  ``self.thread_pipe`` is reset to
    ``None`` when the loop ends.
    """
    poller = zmq.Poller()
    ipc_pub = zmq_tools.Msg_Dispatcher(context, self.g_pool.ipc_push_url)
    poller.register(pipe, zmq.POLLIN)
    remote_socket = None  # no REP socket until the first successful 'Bind'

    while True:
        items = dict(poller.poll())
        if pipe in items:
            cmd = pipe.recv_string()
            if cmd == 'Exit':
                break
            elif cmd == 'Bind':
                new_url = pipe.recv_string()
                # Drop the previous REP socket before binding a new one.
                if remote_socket:
                    poller.unregister(remote_socket)
                    remote_socket.close(linger=0)
                try:
                    remote_socket = context.socket(zmq.REP)
                    remote_socket.bind(new_url)
                except zmq.ZMQError as e:
                    # Binding failed: run without a REP socket and report why.
                    remote_socket = None
                    pipe.send_string("Error", flags=zmq.SNDMORE)
                    pipe.send_string("Could not bind to Socket: {}. Reason: {}".format(new_url, e))
                else:
                    pipe.send_string("Bind OK", flags=zmq.SNDMORE)
                    # `.last_endpoint` is already of type `bytes`
                    pipe.send(remote_socket.last_endpoint.replace(b"tcp://", b""))
                    poller.register(remote_socket, zmq.POLLIN)
        # `items` comes from the poll at the top of this iteration, so a
        # socket bound just above is not in it yet.
        if remote_socket in items:
            self.on_recv(remote_socket, ipc_pub)
    self.thread_pipe = None
def __init__(self, ctx, url):
    """Create a PUB socket in *ctx* and connect it to *url*.

    The socket is stored as ``self.socket``.
    """
    publisher = zmq.Socket(ctx, zmq.PUB)
    self.socket = publisher
    publisher.connect(url)
def _bind_socket(self, socket, addr=None, transport=None):
    """
    Bind a socket for the given transport and return the bound address.

    Parameters
    ----------
    socket : zmq.Socket
        Socket to bind.
    addr : str, default is None
        The address to bind to.  For 'tcp' without a port, a random port
        on ``self.host`` is used.  For other transports a unique
        identifier is generated when no address is given.
    transport : str, AgentAddressTransport, default is None
        Transport protocol.

    Returns
    -------
    addr : str
        The address the socket was bound to.  For 'ipc' this is the
        address joined onto ``config['IPC_DIR']``.
    """
    if transport != 'tcp':
        if not addr:
            addr = str(unique_identifier())
        if transport == 'ipc':
            # IPC endpoints live under the configured IPC directory.
            addr = config['IPC_DIR'] / addr
        socket.bind('%s://%s' % (transport, addr))
        return addr

    _host, port = address_to_host_port(addr)
    if port:
        socket.bind('tcp://%s' % (addr))
        return addr

    # No port requested: bind to a random port on our own host.
    uri = 'tcp://%s' % self.host
    port = socket.bind_to_random_port(uri)
    return self.host + ':' + str(port)
def _process_async_rep_event(self, socket, channel, data):
    """
    Handle a request received on an ASYNC_REP socket and send the reply.

    The message is unpacked into the requester's address UUID, the
    request UUID, the payload and the requester's address.  The reply is
    sent to the twin of that address, connecting to the address first
    when its twin is not registered yet.

    A generator-function handler yields the reply; the rest of its body
    is executed after the reply has been sent.

    Parameters
    ----------
    socket : zmq.Socket
        Socket that generated the event.
    channel : AgentChannel
        AgentChannel associated with the socket that generated the event.
    data : bytes
        Data received on the socket.
    """
    unpacked = deserialize_message(message=data,
                                   serializer=channel.serializer)
    address_uuid, request_uuid, payload, address = unpacked

    client_address = address.twin()
    if not self.registered(client_address):
        self.connect(address)

    handler = self.handler[socket]
    generator = None
    if inspect.isgeneratorfunction(handler):
        generator = handler(self, payload)
        reply = next(generator)
    else:
        reply = handler(self, payload)

    self.send(client_address, (address_uuid, request_uuid, reply))
    if generator is not None:
        # Run the part of the handler that follows its `yield`.
        execute_code_after_yield(generator)
def _process_sync_pub_event(self, socket, channel, data):
    """
    Handle a request received on a SYNC_PUB socket and publish the reply.

    The message is unpacked into the requester's address UUID, the
    request UUID and the payload.  The reply is published on the channel
    with the address UUID as topic and ``general=False``.

    A generator-function handler yields the reply; the rest of its body
    is executed after the reply has been published.

    Parameters
    ----------
    socket : zmq.Socket
        Socket that generated the event.
    channel : AgentChannel
        AgentChannel associated with the socket that generated the event.
    data : bytes
        Data received on the socket.
    """
    unpacked = deserialize_message(message=data,
                                   serializer=channel.serializer)
    address_uuid, request_uuid, payload = unpacked

    handler = self.handler[socket]
    pending = None
    if inspect.isgeneratorfunction(handler):
        pending = handler(self, payload)
        reply = next(pending)
    else:
        reply = handler(self, payload)

    response = (address_uuid, request_uuid, reply)
    self._send_channel_sync_pub(channel=channel,
                                message=response,
                                topic=address_uuid,
                                general=False)
    if pending is not None:
        # Run the part of the handler that follows its `yield`.
        execute_code_after_yield(pending)
def _process_sub_event(self, socket, addr, data):
    """
    Dispatch a SUB message to every handler whose topic matches.

    A topic matches when the raw data starts with it.  Handlers taking
    two parameters are called with ``(self, message)``; handlers taking
    three also receive the topic.  Handlers with any other number of
    parameters are not called.

    Parameters
    ----------
    socket : zmq.Socket
        Socket that generated the event.
    addr : AgentAddress
        AgentAddress associated with the socket that generated the event.
    data : bytes
        Data received on the socket.
    """
    topic_handlers = self.handler[socket]
    message = self._process_sub_message(addr.serializer, data)

    for topic in topic_handlers:
        if data.startswith(topic):
            callback = topic_handlers[topic]
            arity = len(inspect.signature(callback).parameters)
            # Pick the call signature (with or without the topic).
            if arity == 2:
                args = (self, message)
            elif arity == 3:
                args = (self, message, topic)
            else:
                continue
            callback(*args)
def get_unique_external_zmq_sockets(self):
    """
    Return the set of non-internal zmq.Socket objects in `self.socket`.

    `self.socket` holds several entries per socket (keyed by the socket
    itself and by its AgentAddress, for example), so the values are
    de-duplicated.  An entry is skipped when its key is:

    - a zmq socket object,
    - an AgentAddress whose `address` is a reserved name, or
    - a reserved name itself.

    The reserved (internal) names are:

    - loopback
    - _loopback_safe
    - inproc://loopback
    - inproc://_loopback_safe
    """
    reserved = ('loopback', '_loopback_safe',
                'inproc://loopback', 'inproc://_loopback_safe')

    unique = set()
    for key, sock in self.socket.items():
        if isinstance(key, zmq.sugar.socket.Socket):
            continue
        is_internal = (
            (isinstance(key, AgentAddress) and key.address in reserved)
            or key in reserved
        )
        if is_internal:
            continue
        unique.add(sock)
    return unique
def recv_messages(zmq_subscriber, timeout_count, message_count):
    """Test utility function.

    Subscriber thread body that counts ZMQ messages using non-blocking
    receives.  It stops once the expected number of messages has arrived
    or after too many consecutive receives came back empty.

    Args:
        zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
        timeout_count (int): No. of consecutive failed receives until exit.
        message_count (int): No. of messages expected to be received.

    Returns:
        (int) Number of messages received.

    Raises:
        zmq.ZMQError: Any receive error other than ``EAGAIN``.

    """
    # pylint: disable=E1101
    consecutive_misses = 0  # Receives in a row that returned no message.
    received = 0  # Total number of messages received.
    while consecutive_misses < timeout_count:
        try:
            zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
        except zmq.ZMQError as error:
            # EAGAIN means "nothing to read right now"; count it as a miss.
            if error.errno != zmq.EAGAIN:
                raise
            consecutive_misses += 1
        else:
            consecutive_misses = 0
            received += 1
            if received == message_count:
                break
        time.sleep(1e-6)
    return received
def __init__(self, *args, **kwargs):
    """Initialize the socket and record the thread that created it.

    All arguments are forwarded unchanged to ``zmq.Socket.__init__``.
    """
    zmq.Socket.__init__(self, *args, **kwargs)
    # Remember the creating thread.  The value is written straight into
    # the instance dictionary rather than through normal attribute
    # assignment (presumably to bypass the base class's attribute
    # handling -- confirm).
    creator = threading.current_thread()
    vars(self)['_thread'] = creator