我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 zmq.DEALER。
def serviceA(context=None):
    """Run worker "A": answer requests on a DEALER socket until told to stop.

    Connects a DEALER socket with identity ``b'A'`` to
    ``tcp://localhost:5560`` and loops on incoming messages:

    * ``b"Service A"`` -- sleep for up to half a second, then send a reply.
    * ``b"END"`` -- leave the loop.
    * anything else -- print an error and leave the loop.

    :param context: zmq context to reuse; when falsy, the shared
        ``zmq.Context.instance()`` is used instead.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # identify worker
        service.setsockopt(zmq.IDENTITY, b'A')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                print("Service A got:")
                print(message)
            # recv() returns bytes, so compare against bytes literals; a
            # str literal never compares equal to bytes on Python 3.
            if message == b"Service A":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service A did your laundry")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket on every exit path, including exceptions.
        service.close()
def _execute_command(self, command):
    """Send *command* to the next job server, chosen round-robin.

    Logs an error and returns when ``self.job_servers`` is empty. Otherwise
    a DEALER socket identified by this process's pid is connected to the
    selected server, the JSON-encoded command is sent as the second frame
    of a two-frame message, and ``self.response_handler`` is registered as
    the receive callback on a ``ZMQStream`` wrapping the socket.

    :param command: value stored under the ``'command'`` key of the JSON
        payload.
    """
    servers = self.job_servers
    if len(servers) == 0:
        app_log.error('there is no job server')
        return

    # Pick the current server, then advance the index with wrap-around.
    target = servers[self.job_server_index]
    self.job_server_index = (self.job_server_index + 1) % len(servers)

    sock = zmq.Context.instance().socket(zmq.DEALER)
    sock.linger = 1000
    sock.identity = bytes(str(os.getpid()), 'ascii')

    # A server registered with the wildcard address is reached via localhost.
    ip = target['ip']
    host = 'localhost' if ip == '*' else ip
    url = 'tcp://{0}:{1}'.format(host, target['zmq_port'])
    app_log.info('connect %s', url)
    sock.connect(url)

    payload = json_encode({'command': command})
    app_log.info('command: %s', payload)
    sock.send_multipart([b'0', bytes(payload, 'ascii')])

    ZMQStream(sock).on_recv(self.response_handler)
def serviceB(context=None):
    """Run worker "B": answer requests on a DEALER socket until told to stop.

    Connects a DEALER socket with identity ``b'B'`` to
    ``tcp://localhost:5560`` and loops on incoming messages:

    * ``b"Service B"`` -- sleep for up to half a second, then send a reply.
    * ``b"END"`` -- leave the loop.
    * anything else -- print an error and leave the loop.

    :param context: zmq context to reuse; when falsy, the shared
        ``zmq.Context.instance()`` is used instead.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # identify worker
        service.setsockopt(zmq.IDENTITY, b'B')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                print("Service B got:")
                print(message)
            # recv() returns bytes, so compare against bytes literals; a
            # str literal never compares equal to bytes on Python 3.
            if message == b"Service B":
                # do some work
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service B cleaned your room")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket on every exit path, including exceptions.
        service.close()
def test_hwm(self):
    """Setting ``hwm`` to 100 reads back as 100 on several socket types.

    When libzmq's major version is 3 or later, ``sndhwm`` and ``rcvhwm``
    are also checked; an ``AttributeError`` from either check is ignored.
    """
    split_hwm = zmq.zmq_version_info()[0] >= 3
    socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
    for socket_type in socket_types:
        sock = self.context.socket(socket_type)
        sock.hwm = 100
        self.assertEqual(sock.hwm, 100)
        if split_hwm:
            for option in ('sndhwm', 'rcvhwm'):
                try:
                    self.assertEqual(getattr(sock, option), 100)
                except AttributeError:
                    pass
        sock.close()
def bounce(self, server, client, test_metadata=True):
    """Round-trip a random two-frame message: client -> server -> client.

    :param server: socket that receives the message and echoes it back.
    :param client: socket that sends the message and receives the echo.
    :param test_metadata: when true (and not running on PyPy), also check
        the ``User-Id``, ``Hello`` and ``Socket-Type`` metadata of every
        received frame; a ``zmq.ZMQVersionError`` raised during that check
        is ignored.
    """
    payload = [os.urandom(64), os.urandom(64)]
    client.send_multipart(payload)

    frames = self.recv_multipart(server, copy=False)
    received = [frame.bytes for frame in frames]

    try:
        if test_metadata and not PYPY:
            for frame in frames:
                self.assertEqual(frame.get('User-Id'), 'anonymous')
                self.assertEqual(frame.get('Hello'), 'World')
                self.assertEqual(frame['Socket-Type'], 'DEALER')
    except zmq.ZMQVersionError:
        pass

    self.assertEqual(received, payload)

    # Echo the message back and confirm the client sees the original.
    server.send_multipart(received)
    echoed = self.recv_multipart(client)
    self.assertEqual(echoed, payload)
def skip_plain_inauth(self):
    """test PLAIN failed authentication

    The client uses an incorrect password; the server's ``recv`` is then
    expected to fail with ``EAGAIN`` after a 250 ms receive timeout.
    """
    server = self.socket(zmq.DEALER)
    server.identity = b'IDENT'
    client = self.socket(zmq.DEALER)
    self.sockets.extend([server, client])

    client.plain_username = USER
    client.plain_password = b'incorrect'
    server.plain_server = True
    for sock in (server, client):
        self.assertEqual(sock.mechanism, zmq.PLAIN)

    self.start_zap()

    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    endpoint = "%s:%i" % (iface, port)
    client.connect(endpoint)
    client.send(b'ping')

    server.rcvtimeo = 250
    self.assertRaisesErrno(zmq.EAGAIN, server.recv)
    self.stop_zap()
def flash(self):
    """Refresh per-process sockets and rebuild ``self.context``.

    When the stored pid differs from the current process id, the pid is
    updated and both zmq sockets are recreated (sockets cannot be shared
    between processes). A new context is then obtained from
    ``main_context``; if ``self.main_param`` is set, the YAML file it names
    (relative to ``self.main_folder``) is loaded into ``ctx.params``.
    """
    # NOTE(review): the original indentation was lost; the context refresh
    # below is assumed to run unconditionally, not only on a pid change --
    # confirm against the upstream source.
    current_pid = str(os.getpid())
    if self.pid != current_pid:
        # reset process pid
        self.pid = current_pid
        # update zmq sockets
        # (could not share a socket between different processes)
        self.zmq_socket = zmq.Context().socket(zmq.REQ)
        self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)

    # update context
    ctx = main_context(self.main_file, self.main_folder)
    if self.main_param is not None:
        main_config_path = os.path.join(self.main_folder, self.main_param)
        # Context manager closes the file handle; the original left it open.
        with open(main_config_path, 'r') as config_file:
            # NOTE(review): yaml.load without an explicit Loader can build
            # arbitrary objects from untrusted input and is rejected by
            # newer PyYAML releases; consider yaml.safe_load if the config
            # only holds plain data.
            params = yaml.load(config_file)
        ctx.params = params
    self.context = ctx
def __init__(self, port, pipeline=100, host='localhost', log_file=None):
    """Connect a DEALER socket to ``tcp://host:port`` and reset the log file.

    :param port: TCP port to connect to (formatted with ``%d``).
    :param pipeline: value assigned to the socket's ``hwm`` option.
    :param host: host name or address to connect to.
    :param log_file: optional log path. When given it is made absolute, its
        parent directory is created if missing, and an existing file at
        that path is removed.
    """
    # noinspection PyUnresolvedReferences
    self.socket = zmq.Context.instance().socket(zmq.DEALER)
    self.socket.hwm = pipeline
    self.socket.connect('tcp://%s:%d' % (host, port))
    self._log_file = log_file

    self.poller = zmq.Poller()
    # noinspection PyUnresolvedReferences
    self.poller.register(self.socket, zmq.POLLIN)

    if self._log_file is None:
        return

    self._log_file = os.path.abspath(self._log_file)

    # Make sure the directory that will hold the log exists.
    parent_dir = os.path.dirname(self._log_file)
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    # Start from a clean log.
    if os.path.exists(self._log_file):
        os.remove(self._log_file)
def __init__(self, targname, cfg, isServer=False): self.targname = targname self.cfg = cfg self.isServer = isServer self.fnCallName = '' self.ctx = zmq.Context() self.ctx.linger = 100 if not self.isServer: self.sock = self.ctx.socket(zmq.DEALER) self.sock.linger = 100 self.sock.connect('tcp://%s:%s' % (self.cfg['server'],self.cfg.get('port',7677))) # this times out with EINVAL when no internet self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) else: self.sock = self.ctx.socket(zmq.ROUTER) self.sock.linger = 100 self.sock.bind('tcp://*:%s' % (self.cfg.get('port',7677))) self.poller = zmq.Poller() self.poller.register(self.sock, zmq.POLLIN) self.be = GetBackend(self.cfg['backend'])(self.targname, self.cfg) self.inTime = time.time() self.inactiveLimit = int(self.cfg.get('inactivelimit',0)) print 'inactivelimit ',self.inactiveLimit
def do_send(self, filename):
    """
    Transfer one built file to the master and answer with "SENT".

    Protocol context: after a successful build that generated files
    (reported in a "BUILT" message), the master replies "SEND" *filename*
    to request the named file. The transfer runs on a separate socket with
    its own protocol (see :meth:`builder.PiWheelsPackage.transfer`). When
    it finishes, ``['SENT']`` is returned as the reply to the master.

    An ``IndexError`` is raised when no built file matches *filename*.
    """
    assert self.slave_id is not None, 'Send before hello'
    assert self.builder, 'Send before build / after failed build'
    assert self.builder.status, 'Send after failed build'

    matching = [f for f in self.builder.files if f.filename == filename]
    pkg = matching[0]
    self.logger.info('Sending %s to master on localhost', pkg.filename)

    queue = zmq.Context.instance().socket(zmq.DEALER)
    queue.ipv6 = True
    queue.hwm = 10
    queue.connect('tcp://{master}:5556'.format(master=self.config.master))
    try:
        pkg.transfer(queue, self.slave_id)
    finally:
        # Always release the transfer socket, even if the transfer fails.
        queue.close()
    return ['SENT']
def do_send(builder, filename):
    """
    Send the built file named *filename* to the master on localhost.

    Handles sending files when requested by :func:`do_import`. The file is
    transferred with slave id ``0`` over a DEALER socket that is closed
    afterwards. An ``IndexError`` is raised when *builder* has no file
    with that name.
    """
    logging.info('Sending %s to master', filename)
    matching = [f for f in builder.files if f.filename == filename]
    pkg = matching[0]

    queue = zmq.Context.instance().socket(zmq.DEALER)
    queue.ipv6 = True
    queue.hwm = 10
    # NOTE: The following assumes that we're running on the master; this
    # *should* be the case (it's risky to run the importer on a tcp queue)
    # but there's no guarantee of this.
    queue.connect('tcp://localhost:5556')
    try:
        pkg.transfer(queue, 0)
    finally:
        queue.close()
def test_getsockopt_events(self):
    """``zmq.EVENTS`` agrees with the poller for POLLOUT and POLLIN.

    Uses a bound DEALER/DEALER pair: the first socket must poll writable,
    and after it sends an empty message the second must poll readable.
    """
    writer, reader, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
    eventlet.sleep()

    out_poller = zmq.Poller()
    out_poller.register(writer, zmq.POLLOUT)
    ready = out_poller.poll(100)
    self.assertEqual(len(ready), 1)
    writer_events = writer.getsockopt(zmq.EVENTS)
    self.assertEqual(writer_events & zmq.POLLOUT, zmq.POLLOUT)

    writer.send(b'')

    in_poller = zmq.Poller()
    in_poller.register(reader, zmq.POLLIN)
    ready = in_poller.poll(100)
    self.assertEqual(len(ready), 1)
    reader_events = reader.getsockopt(zmq.EVENTS)
    self.assertEqual(reader_events & zmq.POLLIN, zmq.POLLIN)
def test_cpu_usage_after_pub_send_or_dealer_recv(self):
    """Idle CPU usage stays low after a PUB send and after a DEALER recv.

    Regression check for zmq eating CPU in those two situations; same as
    https://bitbucket.org/eventlet/eventlet/issue/128
    """
    # Case 1: PUB send.
    publisher, subscriber, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    eventlet.sleep()
    publisher.send(b'test_send')
    tests.check_idle_cpu_usage(0.2, 0.1)

    # Case 2: DEALER recv.
    sender, receiver, _port = self.create_bound_pair(zmq.DEALER, zmq.DEALER)
    eventlet.sleep()
    sender.send(b'test_recv')
    received = receiver.recv()
    self.assertEqual(received, b'test_recv')
    tests.check_idle_cpu_usage(0.2, 0.1)
async def _send(self, ident, message):
    """
    (asyncio coroutine)

    Log the outgoing message, then send it as a two-frame multipart
    message ``[ident, serialized message]`` and return the awaited result
    of ``send_multipart``.

    Declared with ``async def`` because the body uses ``await``, which is
    a syntax error inside a plain ``def``.

    :param message (sawtooth_sdk.protobuf.Message)
    :param ident (str) the identity of the zmq.DEALER to send to
    """
    LOGGER.debug(
        "Sending %s(%s) to %s",
        str(to_protobuf_class(message.message_type).__name__),
        str(message.message_type),
        str(ident)
    )

    return await self._socket.send_multipart([
        ident,
        message.SerializeToString()
    ])
def __init__(self, url):
    """Create the DEALER socket and its helper objects for *url*.

    The socket identity is the first 16 bytes of a random UUID's hex
    form. A message router is shared by the receiver and sender helpers.
    The receive task and all monitoring attributes start as ``None``.

    :param url: stored as ``self._url``; it is not connected to here.
    """
    self._url = url
    self._ctx = Context.instance()

    dealer = self._ctx.socket(zmq.DEALER)
    dealer.identity = uuid.uuid4().hex.encode()[0:16]
    self._socket = dealer

    router = _MessageRouter()
    self._msg_router = router
    self._receiver = _Receiver(dealer, router)
    self._sender = _Sender(dealer, router)

    self._connection_state_listeners = {}
    self._recv_task = None

    # Monitoring properties
    self._monitor_sock = self._monitor_fd = self._monitor_task = None
def run(self):
    """Exchange states and actions with the peer in an endless loop.

    Each iteration pushes ``(identity, state, reward, isOver)`` on the
    PUSH socket connected to ``self.c2s``, receives an action from the
    DEALER socket connected to ``self.s2c``, and applies that action to
    the player. The loop never returns.
    """
    player = self._build_player()
    context = zmq.Context()

    push_sock = context.socket(zmq.PUSH)
    push_sock.setsockopt(zmq.IDENTITY, self.identity)
    push_sock.set_hwm(2)
    push_sock.connect(self.c2s)

    # No high-water mark is set on this socket.
    action_sock = context.socket(zmq.DEALER)
    action_sock.setsockopt(zmq.IDENTITY, self.identity)
    action_sock.connect(self.s2c)

    state = player.current_state()
    reward = 0
    isOver = False
    while True:
        outgoing = dumps((self.identity, state, reward, isOver))
        push_sock.send(outgoing, copy=False)
        incoming = action_sock.recv(copy=False)
        action = loads(incoming.bytes)
        reward, isOver = player.action(action)
        state = player.current_state()
def start(self):
    """Bind the job server's DEALER socket and run the IO loop.

    When ``self.port`` is 0 a random port is bound and stored in
    ``self.zmq_port``; otherwise the configured port is bound and stored.
    The server-info file is written before the loop starts and removed
    when the loop exits (and again at interpreter exit via ``atexit``).
    """
    self.pid = os.getpid()
    context = zmq.Context.instance()
    zmq_sock = context.socket(zmq.DEALER)
    zmq_sock.linger = 1000
    zmq_sock.identity = bytes(str(self.pid), 'ascii')
    if self.port == 0:
        self.zmq_port = zmq_sock.bind_to_random_port('tcp://{0}'.format(self.ip))
    else:
        # bind() does not return the port number, so record the configured
        # port explicitly instead of storing bind()'s return value.
        zmq_sock.bind('tcp://{0}:{1}'.format(self.ip, self.port))
        self.zmq_port = self.port
    self.zmq_stream = zmqstream.ZMQStream(zmq_sock)
    self.zmq_stream.on_recv(self.request_handler)

    self.log_format = (u'%(color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d '
                       u'%(name)s-{0}]%(end_color)s %(message)s').format(self.pid)
    self.log.info('start %s', self)

    self.write_server_info_file()
    atexit.register(self.remove_server_info_file)

    self.io_loop = ioloop.IOLoop.current()
    try:
        self.io_loop.start()
    except KeyboardInterrupt:
        self.log.info('JobServer interrupted...')
    finally:
        self.remove_server_info_file()
def test_attr(self):
    """set setting/getting sockopts as attributes

    ``linger`` set through the attribute must read back through both the
    attribute and ``getsockopt``; ``fd`` must match ``getsockopt(zmq.FD)``.
    """
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    expected_linger = 10
    sock.linger = expected_linger
    self.assertEqual(expected_linger, sock.linger)
    self.assertEqual(expected_linger, sock.getsockopt(zmq.LINGER))
    self.assertEqual(sock.fd, sock.getsockopt(zmq.FD))
def test_bad_attr(self):
    """Setting or getting an unknown socket attribute raises AttributeError."""
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    try:
        setattr(sock, 'apple', 'foo')
    except AttributeError:
        pass
    else:
        self.fail("bad setattr should have raised AttributeError")

    try:
        getattr(sock, 'apple')
    except AttributeError:
        pass
    else:
        self.fail("bad getattr should have raised AttributeError")
def test_router_dealer(self):
    """A single DEALER frame arrives at the ROUTER as two frames.

    After the first received frame ``rcvmore`` must be true; the second
    frame must equal the sent payload and leave ``rcvmore`` false.
    """
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

    sent = b'message1'
    dealer.send(sent)

    # First frame; its content is not checked.
    self.recv(router)
    has_more = router.rcvmore
    self.assertEqual(has_more, True)

    received = self.recv(router)
    self.assertEqual(sent, received)
    has_more = router.rcvmore
    self.assertEqual(has_more, False)
def test_default_mq_args(self):
    """A ThreadMonitoredQueue built with default prefixes starts cleanly."""
    dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = dev
    dev.setsockopt_in(zmq.LINGER, 0)
    dev.setsockopt_out(zmq.LINGER, 0)
    dev.setsockopt_mon(zmq.LINGER, 0)
    # start() is the actual check: it raises if the default args are wrong
    dev.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    """``devices.monitoredqueue`` raises TypeError for unicode in/out args.

    Three real sockets are created and registered for clean-up, but only
    the PUB socket is passed on; the first two arguments are unicode
    strings.
    """
    router = self.context.socket(zmq.ROUTER)
    dealer = self.context.socket(zmq.DEALER)
    mons = self.context.socket(zmq.PUB)
    self.sockets.extend([router, dealer, mons])

    bad_in = unicode('in')
    bad_out = unicode('out')
    self.assertRaises(TypeError, devices.monitoredqueue, bad_in, bad_out, mons)
def test_plain(self):
    """test PLAIN authentication

    Checks the PLAIN-related socket options before and after they are
    set, then bounces a message between the two DEALER sockets while the
    ZAP handler is running.
    """
    server = self.socket(zmq.DEALER)
    server.identity = b'IDENT'
    client = self.socket(zmq.DEALER)

    # Credentials are empty until assigned.
    self.assertEqual(client.plain_username, b'')
    self.assertEqual(client.plain_password, b'')
    client.plain_username = USER
    client.plain_password = PASS
    self.assertEqual(client.getsockopt(zmq.PLAIN_USERNAME), USER)
    self.assertEqual(client.getsockopt(zmq.PLAIN_PASSWORD), PASS)

    for sock in (client, server):
        self.assertEqual(sock.plain_server, 0)
    server.plain_server = True
    for sock in (server, client):
        self.assertEqual(sock.mechanism, zmq.PLAIN)

    assert not client.plain_server
    assert server.plain_server

    self.start_zap()

    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    endpoint = "%s:%i" % (iface, port)
    client.connect(endpoint)
    self.bounce(server, client)
    self.stop_zap()
def test_curve(self):
    """test CURVE encryption

    Skipped when enabling ``curve_server`` fails with ``EINVAL``. Any
    other ``ZMQError`` from that step is re-raised instead of being
    silently ignored.
    """
    server = self.socket(zmq.DEALER)
    server.identity = b'IDENT'
    client = self.socket(zmq.DEALER)
    self.sockets.extend([server, client])
    try:
        server.curve_server = True
    except zmq.ZMQError as e:
        # will raise EINVAL if not linked against libsodium
        if e.errno == zmq.EINVAL:
            raise SkipTest("CURVE unsupported")
        # Unexpected failure: do not carry on with a half-configured socket.
        raise

    server_public, server_secret = zmq.curve_keypair()
    client_public, client_secret = zmq.curve_keypair()

    server.curve_secretkey = server_secret
    server.curve_publickey = server_public
    client.curve_serverkey = server_public
    client.curve_publickey = client_public
    client.curve_secretkey = client_secret

    self.assertEqual(server.mechanism, zmq.CURVE)
    self.assertEqual(client.mechanism, zmq.CURVE)

    self.assertEqual(server.get(zmq.CURVE_SERVER), True)
    self.assertEqual(client.get(zmq.CURVE_SERVER), False)

    self.start_zap()

    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    client.connect("%s:%i" % (iface, port))
    self.bounce(server, client)
    self.stop_zap()
def __init__(self, uri=None):
    """Create a DEALER socket and connect it to *uri*.

    :param uri: endpoint to connect to. When omitted (``None``), the
        content of ``~/.dh_uri`` is read at call time and used unchanged.

    The default used to be computed in the signature, which opened and
    read the file once at definition time, never closed the handle, and
    made the definition itself fail when the file or ``HOME`` was missing.
    """
    if uri is None:
        default_path = os.path.join(os.path.expanduser("~"), ".dh_uri")
        with open(default_path, "r") as uri_file:
            uri = uri_file.read()
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.DEALER)
    self.socket.connect(uri)
def run(self):
    """Connect to the controller and run the IO loop until it stops.

    A DEALER socket is connected to ``self.controller_uri`` and wrapped in
    a ``ZMQStream`` that delivers messages to ``self.on_rcv``.
    ``self.on_start`` is queued as a loop callback and ``self.on_ping`` is
    scheduled with a period of 1000. A ``KeyboardInterrupt`` raised by the
    loop triggers ``self.shutdown()``.
    """
    # NOTE(review): the original indentation was lost; the final
    # ioloop.close() is assumed to run after the try/except in every case --
    # confirm against the upstream source.
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.DEALER)
    self.socket.connect(self.controller_uri)

    self.stream = ZMQStream(self.socket)
    self.stream.on_recv(self.on_rcv)

    self.ioloop = ioloop.IOLoop.instance()
    self.ioloop.add_callback(self.on_start)
    pinger = tornado.ioloop.PeriodicCallback(self.on_ping, 1000)
    pinger.start()

    try:
        self.ioloop.start()
    except KeyboardInterrupt:
        self.shutdown()
    self.ioloop.close()
def open_connection(self, address, status_port, data_port):
    """Fetch the plot description from the server and start the data listener.

    Sends ``WHATSUP`` on a temporary DEALER socket connected to
    ``address:status_port`` and expects a two-frame reply whose second
    frame is a JSON description. On success the plots are constructed and
    a ``DataListener`` for ``address:data_port`` is started in a new
    ``QThread``, replacing any listener that is already running.

    On failure (no poll event, or a reply that cannot be received or
    parsed) a status-bar message is shown and the method returns without
    touching the plots or the listener. The temporary socket is closed on
    every path.

    :param address: server host name or address.
    :param status_port: port used for the description request.
    :param data_port: port handed to the ``DataListener``.
    """
    self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000)
    socket = self.context.socket(zmq.DEALER)
    try:
        socket.identity = "Matplotlib_Qt_Client".encode()
        socket.connect("tcp://{}:{}".format(address, status_port))
        socket.send(b"WHATSUP")

        poller = zmq.Poller()
        # NOTE(review): this polls for POLLOUT although the next step is a
        # receive; POLLIN may have been intended -- confirm before changing,
        # since it would alter how long a slow server is waited for.
        poller.register(socket, zmq.POLLOUT)
        time.sleep(0.1)
        evts = dict(poller.poll(100))
        if socket not in evts:
            self.statusBar().showMessage("Server did not respond.", 2000)
            # Without this return, `desc` would be unbound further down.
            return

        try:
            reply, desc = [e.decode() for e in socket.recv_multipart()]
            desc = json.loads(desc)
        except Exception:
            self.statusBar().showMessage("Could not connect to server.", 2000)
            return
        self.statusBar().showMessage("Connection established. Pulling plot information.", 2000)
    finally:
        socket.close()

    self.construct_plots(desc)

    # Actual data listener
    if self.listener_thread:
        # Stop the previous listener before replacing it.
        self.Datalistener.running = False
        self.listener_thread.quit()
        self.listener_thread.wait()

    self.listener_thread = QtCore.QThread()
    self.Datalistener = DataListener(address, data_port)
    self.Datalistener.moveToThread(self.listener_thread)
    self.listener_thread.started.connect(self.Datalistener.loop)
    self.Datalistener.message.connect(self.data_signal_received)
    self.Datalistener.finished.connect(self.stop_listening)

    QtCore.QTimer.singleShot(0, self.listener_thread.start)
def _memoryTask(settings, logger,master, url_setFrontend, url_getFrontend, url_getBackend, url_setBackend):
    """Build the cache, start its worker threads, then run the set loop.

    Threads started here: the get proxy (ROUTER frontend / DEALER
    backend), one getter per ``settings.getGetterThreadNumber()``, the
    performance metricator and the slave setter. ``_setThread`` is then
    called directly in the calling thread.

    ``url_setBackend`` is accepted but not used inside this function.
    """
    from Cache import Slab, CacheSlubLRU

    # grab settings
    slab_size = settings.getSlabSize()
    pool_size = settings.getPreallocatedPool()
    getter_count = settings.getGetterThreadNumber()

    # initialize cache
    cache = CacheSlubLRU(pool_size, slab_size, logger)

    # log
    logger.debug("Memory Process initialized:" + str(pool_size) + "B, get# = " + str(getter_count))

    # Prepare our context and sockets
    context = zmq.Context.instance()
    # Socket to talk to get
    get_frontend = context.socket(zmq.ROUTER)
    get_frontend.bind(url_getFrontend)
    # Socket to talk to workers
    get_backend = context.socket(zmq.DEALER)
    get_backend.bind(url_getBackend)

    # Shared timing record handed to the getter, metricator and set threads.
    timing = {"getters": [], "setters": [-1]}

    proxy_args = (logger, master, get_frontend, get_backend, url_getFrontend, url_getBackend)
    Thread(name='MemoryGetProxy', target=_proxyThread, args=proxy_args).start()

    for index in range(getter_count):
        # Each getter gets its own timing slot before its thread starts.
        timing["getters"].append(-1)
        getter_args = (index, logger, settings, cache, master, url_getBackend, timing)
        Thread(name='MemoryGetter', target=_getThread, args=getter_args).start()

    slave_set_queue = Queue.Queue()
    host_state = {"current": None}

    metric_args = (logger, cache, settings, master, timing)
    Thread(name='MemoryPerformanceMetricator', target=_memoryMetricatorThread, args=metric_args).start()

    setter_args = (logger, settings, cache, master, url_getBackend, slave_set_queue, host_state)
    Thread(name='MemorySlaveSetter', target=_setToSlaveThread, args=setter_args).start()

    _setThread(logger, settings, cache, master, url_setFrontend, slave_set_queue, host_state, timing)
def test_term_hang(self):
    """Terminate the context after a zero-linger DEALER send and close.

    The DEALER sends one message with LINGER set to 0, both sockets are
    closed, and ``context.term()`` is then called.
    """
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_null(self):
    """test NULL (default) security

    Both sockets must report the NULL mechanism and ``plain_server == 0``
    before a message is bounced without metadata checks.
    """
    server = self.socket(zmq.DEALER)
    client = self.socket(zmq.DEALER)

    # The client option is read via its upper-case name, as in the original.
    self.assertEqual(client.MECHANISM, zmq.NULL)
    self.assertEqual(server.mechanism, zmq.NULL)
    for sock in (client, server):
        self.assertEqual(sock.plain_server, 0)

    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    endpoint = "%s:%i" % (iface, port)
    client.connect(endpoint)
    self.bounce(server, client, False)