我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.LINGER。
def disconnect(self): if self.is_connected: if self.socket: self.socket.setsockopt(zmq.LINGER, 0) if self.poller: self.poller.unregister(self.socket) self.socket.close() if self.context: self.context.term() self.is_connected = False
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG): if not os.path.exists(data_dir) or not os.path.isdir(data_dir): raise Exception("Datadir %s is not a valid directory" % data_dir) self.worker_id = binascii.hexlify(os.urandom(8)) self.node_name = socket.gethostname() self.data_dir = data_dir self.data_files = set() context = zmq.Context() self.socket = context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.identity = self.worker_id self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.redis_server = redis.from_url(redis_url) self.controllers = {} # Keep a dict of timestamps when you last spoke to controllers self.check_controllers() self.last_wrm = 0 self.start_time = time.time() self.logger = bqueryd.logger.getChild('worker ' + self.worker_id) self.logger.setLevel(loglevel) self.msg_count = 0 signal.signal(signal.SIGTERM, self.term_signal())
def brute_zmq(host, port=5555, user=None, password=None, db=0): context = zmq.Context() # Configure socket = context.socket(zmq.SUB) socket.setsockopt(zmq.SUBSCRIBE, b"") # All topics socket.setsockopt(zmq.LINGER, 0) # All topics socket.RCVTIMEO = 1000 # timeout: 1 sec # Connect socket.connect("tcp://%s:%s" % (host, port)) # Try to receive try: socket.recv() return True except Exception: return False finally: socket.close()
def handle_zmq(host, port=5555, extra_config=None): # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port)) context = zmq.Context() # Configure socket = context.socket(zmq.SUB) socket.setsockopt(zmq.SUBSCRIBE, b"") # All topics socket.setsockopt(zmq.LINGER, 0) # All topics socket.RCVTIMEO = 1000 # timeout: 1 sec # Connect socket.connect("tcp://%s:%s" % (host, port)) # Try to receive try: socket.recv() return True except Exception: return False finally: socket.close()
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None): self.bind_address = bind_address self.loop = loop self.context = zmq.asyncio.Context() self.poll_timeout = poll_timeout self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, linger) self.in_poller = zmq.asyncio.Poller() self.in_poller.register(self.socket, zmq.POLLIN) log.info('Bound to: ' + self.bind_address) self.socket.bind(self.bind_address) self._kill = False
def _frame_worker(self): if(getattr(self, '_frame_class', None)): ctx = zmq.Context.instance() skt = ctx.socket(zmq.SUB) skt.connect("tcp://%s:27185" % self._moku._ip) skt.setsockopt_string(zmq.SUBSCRIBE, u'') skt.setsockopt(zmq.RCVHWM, 8) skt.setsockopt(zmq.LINGER, 5000) fr = self._frame_class(**self._frame_kwargs) try: while self._running: if skt in zmq.select([skt], [], [], 1.0)[0]: d = skt.recv() fr.add_packet(d) if fr._complete: self._queue.put_nowait(fr) fr = self._frame_class(**self._frame_kwargs) finally: skt.close()
def reset_socket(self): # Close things if necessary if self.pubsock is not None: self.pubsock.close() print("Nutmeg connecting") print("\tPublishing to:", self.pub_address) self.pubsock = self.context.socket(zmq.PUB) # self.socket.setsockopt(zmq.LINGER, 0) self.pubsock.connect(self.pub_address) if not self.sub_running: self.sub_running = True self._subscribe() else: self.reset_sub = True # # Last time a disconnection occurred # self.disconnected_t = time.time() self.running = True self._poke_server()
def zcreate_pipe(ctx, hwm=1000): backend = zsocket.ZSocket(ctx, zmq.PAIR) frontend = zsocket.ZSocket(ctx, zmq.PAIR) backend.set_hwm(hwm) frontend.set_hwm(hwm) # close immediately on shutdown backend.setsockopt(zmq.LINGER, 0) frontend.setsockopt(zmq.LINGER, 0) endpoint = "inproc://zactor-%04x-%04x\n"\ %(random.randint(0, 0x10000), random.randint(0, 0x10000)) while True: try: frontend.bind(endpoint) except: endpoint = "inproc://zactor-%04x-%04x\n"\ %(random.randint(0, 0x10000), random.randint(0, 0x10000)) else: break backend.connect(endpoint) return (frontend, backend)
def serve_data(ds, addr): ctx = zmq.Context() socket = ctx.socket(zmq.PUSH) socket.set_hwm(10) socket.bind(addr) ds = RepeatedData(ds, -1) try: ds.reset_state() logger.info("Serving data at {}".format(addr)) while True: for dp in ds.get_data(): socket.send(dumps(dp), copy=False) finally: socket.setsockopt(zmq.LINGER, 0) socket.close() if not ctx.closed: ctx.destroy(0)
def socket_fitness(self, chrom): if self.socket.closed: self.socket = self.context.socket(zmq.REQ) self.socket.bind(self.socket_port) self.poll.register(self.socket, zmq.POLLIN) self.socket.send_string(';'.join([ self.func.get_Driving(), self.func.get_Follower(), self.func.get_Link(), self.func.get_Target(), self.func.get_ExpressionName(), self.func.get_Expression(), ','.join(["{}:{}".format(e[0], e[1]) for e in self.targetPath]), ','.join([str(e) for e in chrom]) ])) while True: socks = dict(self.poll.poll(100)) if socks.get(self.socket)==zmq.POLLIN: return float(self.socket.recv().decode('utf-8')) else: self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() self.poll.unregister(self.socket) return self.func(chrom)
def test_bad_sockopts(self): """Test that appropriate errors are raised on bad socket options""" s = self.context.socket(zmq.PUB) self.sockets.append(s) s.setsockopt(zmq.LINGER, 0) # unrecognized int sockopts pass through to libzmq, and should raise EINVAL self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5) self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999) # but only int sockopts are allowed through this way, otherwise raise a TypeError self.assertRaises(TypeError, s.setsockopt, 9999, b"5") # some sockopts are valid in general, but not on every socket: self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
def test_sockopt_roundtrip(self): "test set/getsockopt roundtrip." p = self.context.socket(zmq.PUB) self.sockets.append(p) p.setsockopt(zmq.LINGER, 11) self.assertEqual(p.getsockopt(zmq.LINGER), 11)
def test_attr(self): """set setting/getting sockopts as attributes""" s = self.context.socket(zmq.DEALER) self.sockets.append(s) linger = 10 s.linger = linger self.assertEqual(linger, s.linger) self.assertEqual(linger, s.getsockopt(zmq.LINGER)) self.assertEqual(s.fd, s.getsockopt(zmq.FD))
def test_term_hang(self): rep,req = self.create_bound_pair(zmq.ROUTER, zmq.DEALER) req.setsockopt(zmq.LINGER, 0) req.send(b'hello', copy=False) req.close() rep.close() self.context.term()
def test_default_mq_args(self): self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB) dev.setsockopt_in(zmq.LINGER, 0) dev.setsockopt_out(zmq.LINGER, 0) dev.setsockopt_mon(zmq.LINGER, 0) # this will raise if default args are wrong dev.start() self.teardown_device()
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'): """Create a bound socket pair using a random port.""" s1 = self.context.socket(type1) s1.setsockopt(zmq.LINGER, 0) port = s1.bind_to_random_port(interface) s2 = self.context.socket(type2) s2.setsockopt(zmq.LINGER, 0) s2.connect('%s:%s' % (interface, port)) self.sockets.extend([s1,s2]) return s1, s2
def connect(self): """Bind or connect to ZMQ socket. Requires package zmq.""" context = zmq.Context() self.socket = context.socket(self.socket_type) self.socket.setsockopt(zmq.LINGER, 1) host = 'tcp://{}:{}'.format(self.address, self.port) if self.socket_type == zmq.REP: self.socket.bind(host) else: self.socket.connect(host) print('python thread connected to ' + host)
def setup_socket(self): """Sets up the ZMQ socket.""" context = zmq.Context() # The component inheriting from BaseComponent should self.socket.connect # with the appropriate address. self.socket = context.socket(zmq.REQ) # LINGER sets a timeout for socket.send. self.socket.setsockopt(zmq.LINGER, 0) # RCVTIME0 sets a timeout for socket.recv. self.socket.setsockopt(zmq.RCVTIMEO, 500) # milliseconds
def test_forwarder(forwarder, tcp_sender, context): """Monitor should correctly send data""" sender = tcp_sender mon = context.socket(zmq.SUB) mon.setsockopt_string(zmq.SUBSCRIBE, "") mon.setsockopt(zmq.LINGER, 0) mon.connect("tcp://localhost:6500") recv = context.socket(zmq.SUB) recv.setsockopt_string(zmq.SUBSCRIBE, "") recv.setsockopt(zmq.LINGER, 0) recv.connect("tcp://localhost:6002") server_address = ('localhost', 6001) sender.connect(server_address) # waiting for warmup time.sleep(1) sender.sendall(b"test test") data = mon.recv() assert data is not None sender.sendall(b"test test") data = recv.recv() assert data is not None sender.close() forwarder.terminate()
def sender(context): s = context.socket(zmq.PUSH) s.setsockopt(zmq.LINGER, 0) s.bind("tcp://*:6800") time.sleep(1) return s
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO): self.redis_url = redis_url self.redis_server = redis.from_url(redis_url) self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) self.socket.setsockopt(zmq.LINGER, 500) self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1) # Paranoid for debugging purposes self.socket.setsockopt(zmq.SNDTIMEO, 1000) # Short timeout self.poller = zmq.Poller() self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT) self.node_name = socket.gethostname() self.address = bind_to_random_port(self.socket, 'tcp://' + get_my_ip(), min_port=14300, max_port=14399, max_tries=100) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address'), 'w') as F: F.write(self.address) with open(os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'), 'w') as F: F.write(str(os.getpid())) self.logger = bqueryd.logger.getChild('controller').getChild(self.address) self.logger.setLevel(loglevel) self.msg_count_in = 0 self.rpc_results = [] # buffer of results that are ready to be returned to callers self.rpc_segments = {} # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs self.worker_map = {} # maintain a list of connected workers TODO get rid of unresponsive ones... self.files_map = {} # shows on which workers a file is available on self.worker_out_messages = {None: []} # A dict of buffers, used to round-robin based on message affinity self.worker_out_messages_sequence = [None] # used to round-robin the outgoing messages self.is_running = True self.last_heartbeat = 0 self.others = {} # A dict of other Controllers running on other DQE nodes self.start_time = time.time()
def connect_socket(self): reply = None for c in self.controllers: self.logger.debug('Establishing socket connection to %s' % c) tmp_sock = self.context.socket(zmq.REQ) tmp_sock.setsockopt(zmq.RCVTIMEO, 2000) tmp_sock.setsockopt(zmq.LINGER, 0) tmp_sock.identity = self.identity tmp_sock.connect(c) # first ping the controller to see if it responds at all msg = RPCMessage({'payload': 'ping'}) tmp_sock.send_json(msg) try: reply = msg_factory(tmp_sock.recv_json()) self.address = c break except: traceback.print_exc() continue if reply: # Now set the timeout to the actual requested self.logger.debug("Connection OK, setting network timeout to %s milliseconds", self.timeout*1000) self.controller = tmp_sock self.controller.setsockopt(zmq.RCVTIMEO, self.timeout*1000) else: raise Exception('No controller connection')
def test_sockopts(self): """setting socket options with ctx attributes""" ctx = self.Context() ctx.linger = 5 self.assertEqual(ctx.linger, 5) s = ctx.socket(zmq.REQ) self.assertEqual(s.linger, 5) self.assertEqual(s.getsockopt(zmq.LINGER), 5) s.close() # check that subscribe doesn't get set on sockets that don't subscribe: ctx.subscribe = b'' s = ctx.socket(zmq.REQ) s.close() ctx.term()