我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用zmq.IDENTITY。
def serviceA(context=None):
    """Run worker "A": serve requests arriving on an identified DEALER socket.

    The socket is given the identity ``b'A'`` and connected to
    ``tcp://localhost:5560``.  Every received message is printed while
    holding ``myLock``.  A ``b"Service A"`` message is answered after a short
    random delay, ``b"END"`` stops the loop, and any other message is
    reported as a routing error and also stops the loop.

    :param context: optional zmq context to reuse; when falsy the shared
        ``zmq.Context.instance()`` is used.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    # identify worker (must be set before connecting)
    service.setsockopt(zmq.IDENTITY, b'A')
    service.connect("tcp://localhost:5560")
    try:
        while True:
            message = service.recv()
            with myLock:
                # Single-argument print(...) produces the same output on
                # Python 2 (print statement) and Python 3 (print function).
                print("Service A got:")
                print(message)
            # recv() yields bytes; bytes literals compare correctly on both
            # Python 2 (where bytes is str) and Python 3.
            if message == b"Service A":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service A did your laundry")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Always release the socket so it is not leaked when the loop ends
        # or an exception escapes.
        service.close()
def serviceB(context=None):
    """Run worker "B": serve requests arriving on an identified DEALER socket.

    The socket is given the identity ``b'B'`` and connected to
    ``tcp://localhost:5560``.  Every received message is printed while
    holding ``myLock``.  A ``b"Service B"`` message is answered after a short
    random delay, ``b"END"`` stops the loop, and any other message is
    reported as a routing error and also stops the loop.

    :param context: optional zmq context to reuse; when falsy the shared
        ``zmq.Context.instance()`` is used.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    # identify worker (must be set before connecting)
    service.setsockopt(zmq.IDENTITY, b'B')
    service.connect("tcp://localhost:5560")
    try:
        while True:
            message = service.recv()
            with myLock:
                # Single-argument print(...) produces the same output on
                # Python 2 (print statement) and Python 3 (print function).
                print("Service B got:")
                print(message)
            # recv() yields bytes; bytes literals compare correctly on both
            # Python 2 (where bytes is str) and Python 3.
            if message == b"Service B":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service B cleaned your room")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Always release the socket so it is not leaked when the loop ends
        # or an exception escapes.
        service.close()
def _setup_sockets(self):
    """Create the in/out sockets via ``Device`` and add the monitor socket.

    Returns a 3-tuple ``(ins, outs, mons)`` where the first two come from
    ``Device._setup_sockets`` and the third is a new socket of type
    ``self.mon_type`` configured from ``self._mon_sockopts``,
    ``self._mon_binds`` and ``self._mon_connects``.
    """
    in_sock, out_sock = Device._setup_sockets(self)
    monitor = self._context.socket(self.mon_type)

    # Options go first: some (e.g. zmq.IDENTITY) must be set before the
    # socket is bound or connected.
    for option, setting in self._mon_sockopts:
        monitor.setsockopt(option, setting)

    for endpoint in self._mon_binds:
        monitor.bind(endpoint)

    for endpoint in self._mon_connects:
        monitor.connect(endpoint)

    return in_sock, out_sock, monitor
def test_unicode_sockopts(self):
    """Check unicode handling of socket options and of unicode send/recv."""
    text = "tést"
    if str is not unicode:
        # byte-string literal: turn it into a unicode object first
        text = text.decode('utf8')
    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    self.assertEqual(sub.send_unicode, sub.send_unicode)
    self.assertEqual(pub.recv_unicode, pub.recv_unicode)

    # Plain setsockopt must reject unicode; the *_unicode variants accept it.
    self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, text)
    self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, text)
    sub.setsockopt_unicode(zmq.IDENTITY, text, 'utf16')
    self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, text)
    sub.setsockopt_unicode(zmq.SUBSCRIBE, text)
    self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)

    # The identity read back as bytes and as unicode must agree.
    raw_identity = sub.getsockopt(zmq.IDENTITY)
    decoded_identity = raw_identity.decode('utf16')
    unicode_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
    self.assertEqual(decoded_identity, unicode_identity)

    time.sleep(0.1)  # wait for connection/subscription
    doubled = text * 2
    pub.send_unicode(text, zmq.SNDMORE)
    pub.send_unicode(doubled, encoding='latin-1')
    self.assertEqual(text, sub.recv_unicode())
    self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
def get(self, option):
    """Read socket option ``option`` through ``zmq_getsockopt``.

    The value is converted by ``value_from_opt_pointer``.  For options listed
    in ``zmq.constants.bytes_sockopts`` other than ``zmq.IDENTITY``, one
    trailing NUL byte is removed from the result if present.
    """
    pointers = new_pointer_from_opt(option, length=255)
    value_ptr = pointers[0]
    size_ptr = pointers[1]

    _retry_sys_call(C.zmq_getsockopt,
                    self._zmq_socket,
                    option,
                    value_ptr,
                    size_ptr)

    result = value_from_opt_pointer(option, value_ptr, size_ptr[0])

    # IDENTITY is excluded so that an identity ending in NUL is returned
    # unmodified.
    strippable = (option != zmq.IDENTITY
                  and option in zmq.constants.bytes_sockopts)
    if strippable and result.endswith(b'\0'):
        return result[:-1]
    return result
def __init__(self, address, port, logger):
    """
    Create the ROUTER socket and bind it to the given TCP endpoint.

    :param address: String representation of IP address to listen to or a hostname.
    :param port: String port where to listen.
    :param logger: System logger
    """
    self._logger = logger

    receiver = zmq.Context().socket(zmq.ROUTER)
    # The identity is set before bind().
    receiver.setsockopt(zmq.IDENTITY, b"recodex-monitor")
    self._receiver = receiver

    endpoint = "tcp://{}:{}".format(address, port)
    self._receiver.bind(endpoint)
    self._logger.info("zeromq server initialized at {}".format(endpoint))
def run(self):
    """Drive the player forever, exchanging transitions and actions over zmq.

    A PUSH socket (connected to ``self.c2s``) sends
    ``(identity, state, reward, isOver)`` tuples serialized with ``dumps``;
    a DEALER socket (connected to ``self.s2c``) receives the next action,
    deserialized with ``loads``.  Both sockets carry ``self.identity``.
    The loop never terminates on its own.
    """
    player = self._build_player()
    ctx = zmq.Context()

    push_sock = ctx.socket(zmq.PUSH)
    push_sock.setsockopt(zmq.IDENTITY, self.identity)
    push_sock.set_hwm(2)
    push_sock.connect(self.c2s)

    dealer_sock = ctx.socket(zmq.DEALER)
    dealer_sock.setsockopt(zmq.IDENTITY, self.identity)
    dealer_sock.connect(self.s2c)

    state = player.current_state()
    reward = 0
    is_over = False
    while True:
        payload = dumps((self.identity, state, reward, is_over))
        push_sock.send(payload, copy=False)

        reply = dealer_sock.recv(copy=False)
        action = loads(reply.bytes)

        reward, is_over = player.action(action)
        state = player.current_state()
def set_id(zsocket):
    """Set simple random printable identity on socket.

    The identity has the form ``xxxx-xxxx`` where each ``xxxx`` is a random
    four-digit lowercase hexadecimal number.

    :param zsocket: socket object providing ``setsockopt_string``.
    """
    # randint() includes BOTH endpoints, so the upper bound must be 0xFFFF:
    # 0x10000 would occasionally render as five hex digits ("10000").
    identity = u"%04x-%04x" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)
def test_dir(self):
    """dir() of a socket lists both methods and socket-option names."""
    context = self.Context()
    sock = context.socket(zmq.PUB)
    for attr_name in ('send', 'IDENTITY', 'AFFINITY', 'FD'):
        self.assertTrue(attr_name in dir(sock))
    sock.close()
    context.term()
def worker(client, location, query):
    """use the yelp api to find the desired place at a location

    Connects a ROUTER socket with identity ``b'A'`` to
    ``tcp://localhost:5560`` and loops: send an empty frame, receive a
    message, print it under ``myLock``, then either stop on ``b"END"`` or
    answer a non-empty message with the result of ``queryYelp``.

    :param client: passed through to ``queryYelp``.
    :param location: currently unused by the body.
    :param query: currently unused by the body.
    """
    # This function has no ``context`` parameter, so the original
    # ``context = context or ...`` raised UnboundLocalError; use the shared
    # instance directly.
    context = zmq.Context.instance()
    service = context.socket(zmq.ROUTER)
    # identify worker
    service.setsockopt(zmq.IDENTITY, b'A')
    service.connect("tcp://localhost:5560")
    try:
        while True:
            # send our identity
            service.send(b'')
            message = service.recv()
            with myLock:
                # Single-argument print(...) behaves the same on Python 2
                # and Python 3.
                print("yelp worker got:")
                print(message)
            # Check for the terminator first: "END" is itself non-empty, so
            # testing ``message != ""`` first made this branch unreachable.
            if message == b"END":
                break
            if message != b"":
                # NOTE(review): ``request`` is not defined in this function;
                # presumably it should be built from ``location``/``query``
                # or from ``message`` -- confirm against queryYelp's
                # signature.
                response = queryYelp(client, request)
                service.send(response)
    finally:
        # Release the socket when the loop ends or an error escapes.
        service.close()
def test_identity(self):
    """An identity ending in NUL bytes is read back unchanged."""
    sock = self.context.socket(zmq.PULL)
    self.sockets.append(sock)
    expected = b'identity\0\0'
    sock.identity = expected
    self.assertEqual(sock.get(zmq.IDENTITY), expected)
def test_init(self, mock_context):
    """ServerConnection creates, identifies and binds a ROUTER socket."""
    logger = MagicMock()
    receiver = MagicMock()
    context_instance = MagicMock()
    context_instance.socket.return_value = receiver
    mock_context.return_value = context_instance

    ServerConnection("ip_address", 1025, logger)

    mock_context.assert_called_once_with()
    context_instance.socket.assert_called_once_with(zmq.ROUTER)
    receiver.setsockopt.assert_called_once_with(zmq.IDENTITY,
                                                b"recodex-monitor")
    receiver.bind.assert_called_once_with("tcp://ip_address:1025")
def initialize(self):
    """Create the zmq context plus the PUSH and DEALER sockets and connect them.

    Both sockets get ``self.identity``; the PUSH socket additionally gets a
    high-water mark of 2.  The PUSH socket connects to
    ``self._conn_info[0]`` and the DEALER socket to ``self._conn_info[1]``.
    """
    ctx = zmq.Context()
    self._context = ctx
    self._tosock = ctx.socket(zmq.PUSH)
    self._frsock = ctx.socket(zmq.DEALER)

    # Identities are assigned before any connect() call.
    for sock in (self._tosock, self._frsock):
        sock.setsockopt(zmq.IDENTITY, self.identity)

    self._tosock.set_hwm(2)

    self._tosock.connect(self._conn_info[0])
    self._frsock.connect(self._conn_info[1])
def _setup_ipc(self):
    '''
    Create the DEALER socket connected to the device IPC and the PUSH
    socket connected to the publisher IPC, applying the configured
    high-water mark to both.
    '''
    self.ctx = zmq.Context()

    # --- device IPC side -------------------------------------------------
    log.debug('Creating the dealer IPC for %s', self._name)
    dealer = self.ctx.socket(zmq.DEALER)
    self.sub = dealer
    # The identity is passed as-is on Python 2 and as UTF-8 bytes on
    # Python 3.
    if six.PY2:
        dealer.setsockopt(zmq.IDENTITY, self._name)
    elif six.PY3:
        dealer.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
    try:
        # zmq 2
        dealer.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        dealer.setsockopt(zmq.RCVHWM, self.opts['hwm'])
    dealer.connect(DEV_IPC_URL)

    # --- publisher IPC side ----------------------------------------------
    pusher = self.ctx.socket(zmq.PUSH)
    self.pub = pusher
    pusher.connect(PUB_IPC_URL)
    try:
        # zmq 2
        pusher.setsockopt(zmq.HWM, self.opts['hwm'])
    except AttributeError:
        # zmq 3
        pusher.setsockopt(zmq.SNDHWM, self.opts['hwm'])
def set_socket_option(self, name, option, value):
    """Apply ``setsockopt(option, value)`` to the socket registered as ``name``.

    :param name: the name of the socket to configure
    :param option: the option constant from the ``zmq`` module
    :param value: the value to set for that option

    Some commonly used options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
    * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    self.get_by_name(name).setsockopt(option, value)
def create(self, name, socket_type):
    """Creates a named socket by type. Can raise a SocketAlreadyExists.

    The new socket is stored in ``self.sockets`` under ``name`` and is given
    a freshly generated UUID as its ``zmq.IDENTITY``.

    Returns the socket itself

    :param name: the socket name
    :param socket_type: a valid socket type (i.e: ``zmq.REQ``, ``zmq.PUB``, ``zmq.PAIR``, ...)
    :raises SocketAlreadyExists: if ``name`` is already registered
    """
    if name in self.sockets:
        raise SocketAlreadyExists(self, name)

    self.sockets[name] = self.context.socket(socket_type)

    # zmq.IDENTITY must be given as bytes: on Python 3 ``str(uuid4())`` is
    # unicode text, which setsockopt() rejects with TypeError.  The UUID
    # text is pure ASCII, so encoding is lossless, and on Python 2 it
    # yields an equal byte string.
    identity = str(uuid4()).encode('ascii')
    self.set_socket_option(name, zmq.IDENTITY, identity)
    return self.get_by_name(name)
def run(self):
    """Build the player, open the PUSH socket and stream serialized datapoints.

    The socket carries ``self.identity``, has a high-water mark of 5 and is
    connected to ``self.pipe_c2s``.  After ``self._prepare()`` every item
    yielded by ``self.get_data()`` is serialized with ``dumps`` and sent.
    """
    self.player = self._build_player()
    self.ctx = zmq.Context()

    push_sock = self.ctx.socket(zmq.PUSH)
    self.c2s_socket = push_sock
    push_sock.setsockopt(zmq.IDENTITY, self.identity)
    push_sock.set_hwm(5)
    push_sock.connect(self.pipe_c2s)

    self._prepare()
    for datapoint in self.get_data():
        serialized = dumps(datapoint)
        self.c2s_socket.send(serialized, copy=False)
def start_dispatch_thread():
    """Start the XSUB/XPUB forwarder device once; later calls do nothing.

    The device is stored in the module global ``DISPATCHER``; its input side
    is set up with ``bind_in(INTERNAL_SOCKET)`` and its output side with
    ``connect_out(CHANGES_SOCKET)``.  ``INITED`` is set to ``True`` after the
    device has been started.
    """
    global INITED, DISPATCHER
    if not INITED:
        device = zmq.devices.ThreadDevice(zmq.FORWARDER, zmq.XSUB, zmq.XPUB)
        DISPATCHER = device
        device.bind_in(INTERNAL_SOCKET)
        device.connect_out(CHANGES_SOCKET)
        device.setsockopt_in(zmq.IDENTITY, b'XSUB')
        device.setsockopt_out(zmq.IDENTITY, b'XPUB')
        device.start()
        # Fix weird nosetests problems. TODO: find and fix underlying problem
        sleep(0.01)
        INITED = True
def test_tracker(self):
    """test the MessageTracker object for tracking when zmq is done with a buffer"""
    addr = 'tcp://127.0.0.1'
    # Bind a throwaway socket to pick a random port, then close it so the
    # PAIR sockets below can use that endpoint.
    a = self.context.socket(zmq.PUB)
    port = a.bind_to_random_port(addr)
    a.close()
    iface = "%s:%i" % (addr, port)
    a = self.context.socket(zmq.PAIR)
    b = self.context.socket(zmq.PAIR)
    self.sockets.extend([a, b])
    a.connect(iface)
    time.sleep(0.1)

    # Nothing is bound on the other side yet, so the trackers of messages
    # sent now are expected to stay "not done".
    p1 = a.send(b'something', copy=False, track=True)
    self.assertTrue(isinstance(p1, zmq.MessageTracker))
    self.assertFalse(p1.done)
    p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
    # assertTrue instead of the deprecated alias assert_, which was removed
    # from unittest in Python 3.12.
    self.assertTrue(isinstance(p2, zmq.MessageTracker))
    self.assertEqual(p2.done, False)
    self.assertEqual(p1.done, False)

    # Once the peer binds and receives, each tracker should complete;
    # poll for up to ~1 second.
    b.bind(iface)
    msg = b.recv_multipart()
    for i in range(10):
        if p1.done:
            break
        time.sleep(0.1)
    self.assertEqual(p1.done, True)
    self.assertEqual(msg, [b'something'])
    msg = b.recv_multipart()
    for i in range(10):
        if p2.done:
            break
        time.sleep(0.1)
    self.assertEqual(p2.done, True)
    self.assertEqual(msg, [b'something', b'else'])

    # A tracked Frame sent twice: its tracker is expected to remain
    # "not done" while the Frame object ``m`` is still referenced.
    m = zmq.Frame(b"again", track=True)
    self.assertEqual(m.tracker.done, False)
    p1 = a.send(m, copy=False)
    p2 = a.send(m, copy=False)
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    msg = b.recv_multipart()
    self.assertEqual(m.tracker.done, False)
    self.assertEqual(msg, [b'again'])
    self.assertEqual(p1.done, False)
    self.assertEqual(p2.done, False)

    # Drop the Frame; afterwards both send trackers should complete.
    pm = m.tracker
    del m
    for i in range(10):
        if p1.done:
            break
        time.sleep(0.1)
    self.assertEqual(p1.done, True)
    self.assertEqual(p2.done, True)

    # Requesting tracking for a Frame created with track=False is an error.
    m = zmq.Frame(b'something', track=False)
    self.assertRaises(ValueError, a.send, m, copy=False, track=True)