我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用gevent.queue.get()。
def test_fetch_and_prepare_message(mocker):
    """Verify that ``fetch_and_prepare_message`` drains the shared message queue.

    ``message_send_enqueue`` is patched out before the sender module's names
    are imported, so the function under test never calls the real enqueue.
    """
    mocker.patch('iris.bin.sender.message_send_enqueue')
    from iris.bin.sender import (
        fetch_and_prepare_message, message_queue, per_mode_send_queues
    )

    # Seed the inbound queue with exactly one message, then process it.
    inbound = {'message_id': 1234, 'plan_id': None}
    init_queue_with_item(message_queue, inbound)
    fetch_and_prepare_message()
    assert message_queue.qsize() == 0

    # NOTE(review): the send queue is re-seeded by the test itself here, so
    # the assertions below hold regardless of what the sender enqueued.
    email_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())
    outbound = {'message_id': 1234, 'plan_id': None}
    init_queue_with_item(email_queue, outbound)

    assert message_queue.qsize() == 0
    assert email_queue.qsize() == 1
    m = email_queue.get()
    assert m['message_id'] == 1234
def getLocalIp(self):
    """Read ``local_ip`` from ``conf/sysconf.ini``, trying up to three times.

    Each attempt first sleeps for three seconds, then re-reads the ini file
    under ``_agentBasePath``.  Returns the configured value as soon as a
    truthy one is found, or ``None`` if every attempt came up empty.
    """
    local_ip = None
    for _attempt in range(3):
        # Wait before every read, including the first one.
        gevent.sleep(3)

        parser = ConfigParser.ConfigParser()
        parser.optionxform = str  # keep option names case-sensitive
        ini_path = os.path.join(_agentBasePath, "conf", "sysconf.ini")

        if os.path.exists(ini_path):
            parser.read(ini_path)
            if parser.has_section('sys'):
                local_ip = parser.get('sys', 'local_ip')

        if local_ip:
            break
        logger.error('not found local_ip, will retry')
    return local_ip
def handle_customize(self):
    """Start the local report listeners and the per-group senders, then wait.

    Spawns one greenlet each for ``localReport`` and ``localJsonReport``, adds
    the jobs returned by ``send_to_server_groups`` and blocks in
    ``gevent.joinall`` until all of them finish.
    """
    self.generate_uuid()
    # NOTE: local-IP discovery via getLocalIp() (with exit on failure) is
    # currently disabled in this method.
    server_groups = self.conf.get('report', 'server_groups')

    job_list = [
        gevent.spawn(self.localReport),
        gevent.spawn(self.localJsonReport),
    ]
    sender_jobs = self.send_to_server_groups(
        server_groups,
        self.config["linger_ms"],
        self.config["max_queued_messages"],
    )
    job_list.extend(sender_jobs)

    gevent.joinall(job_list)
def processJsonRep(self, socket, address):
    """Handle one local JSON report connection until it fails or closes.

    Every received payload is pushed onto ``_reportQueue`` without blocking.
    The peer gets response code 0 on success, or the ``QueueFull`` error code
    when the queue has no room.  A non-zero receive code or any unexpected
    exception closes the socket and ends the handler.

    :param socket: the accepted client socket.
    :param address: the peer address (not used by this handler).
    """
    client_id = self.conf.get('base', 'client_id')
    jsonSocket = jsonSession(socket=socket, org=client_id)
    while True:
        try:
            code, data = jsonSocket.recv()
            if code != 0:
                logger.error("local receive error (%s %s)"%(code, data))
                socket.close()
                break

            try:
                _reportQueue.put_nowait(data)
            except gevent.queue.Full:
                # Tell the client the report was dropped, then keep serving.
                logger.error("report queue is full")
                jsonSocket.send_response(conf.global_vars.ErrCode.QueueFull, 'ok')
                continue

            jsonSocket.send_response(0, 'ok')
        except Exception as e:
            logger.error("uncaught error, e={}, traceback={}".format(e, traceback.format_exc()))
            socket.close()
            break
def processRep(self, socket, address):
    """Handle one local ``pbSession`` report connection until it fails or closes.

    Payloads are received undecoded (``decode=False``) and pushed onto
    ``_reportQueue`` without blocking.  The peer gets response code 0 on
    success, or the ``QueueFull`` error code when the queue has no room.
    A non-zero receive code or any unexpected exception closes the socket and
    ends the handler.

    :param socket: the accepted client socket.
    :param address: the peer address (not used by this handler).
    """
    client_id = self.conf.get('base', 'client_id')
    pbSocket = pbSession(socket=socket, org=client_id)
    while True:
        try:
            code, data = pbSocket.recv(decode=False)
            if code != 0:
                # An ordinary disconnect is not worth an error log entry.
                if "connection closed" not in data:
                    logger.error("local receive error (%s %s)"%(code, data))
                socket.close()
                break

            try:
                _reportQueue.put_nowait(data)
            except gevent.queue.Full:
                # Tell the client the report was dropped, then keep serving.
                logger.error("report queue is full")
                pbSocket.send_response(conf.global_vars.ErrCode.QueueFull, 'ok')
                continue

            pbSocket.send_response(0, 'ok')
        except Exception as e:
            logger.error("uncaught error, e={}, traceback={}".format(e, traceback.format_exc()))
            socket.close()
            break
def enqueue(self, queue_event_list, max_queued_messages):
    """Fan every message from ``_reportQueue`` out to all per-group queues.

    Returns immediately when ``queue_event_list`` is empty; otherwise loops
    forever.  Each message is offered to every queue without blocking (a full
    queue only logs an error), and a queue's flush event is set once that
    queue holds at least ``max_queued_messages`` entries.

    :param queue_event_list: iterable of ``(queue, flush_ready_event)`` pairs.
    :param max_queued_messages: queue size at which a flush is requested.
    """
    if len(queue_event_list) == 0:
        return

    while True:
        try:
            task_msg = _reportQueue.get()
            if not task_msg:
                continue

            # The last three fields of the first record identify the message.
            dataid, org, _ip = task_msg[0][-3:]
            logger.debug('recv msg, org: %s dataid: %s' %(org, dataid))

            for target_queue, flush_ready_event in queue_event_list:
                if target_queue.full():
                    logger.error("queue full")
                else:
                    target_queue.put_nowait(task_msg)

                backlog_reached = target_queue.qsize() >= max_queued_messages
                if backlog_reached and not flush_ready_event.is_set():
                    flush_ready_event.set()
        except Exception as e:
            logger.error(e)
def on_message_received(self, namespace, message):
    """Deliver ``message`` to every consumer queue registered for ``namespace``.

    Messages of at least ``MIN_COMPRESS_SIZE`` are compressed once up front,
    so all consumers receive the same ``Message`` object carrying both the raw
    and the compressed form.  Delivery is timed under the ``"dispatch"`` metric.
    """
    consumers = self.consumers.get(namespace, [])

    # Compress large payloads a single time rather than once per consumer.
    compressed = (
        make_compressed_frame(message, COMPRESSOR)
        if len(message) >= MIN_COMPRESS_SIZE
        else None
    )
    message = Message(compressed=compressed, raw=message)

    with self.metrics.timer("dispatch"):
        for consumer in consumers:
            consumer.put(message)
def listen(self, namespace, max_timeout):
    """Subscribe to ``namespace`` and yield its messages as they arrive.

    A fresh queue is registered for every namespace produced by
    ``_walk_namespace_hierarchy`` (after stripping trailing slashes).  When
    nothing arrives within the jittered timeout, ``None`` is yielded instead
    so callers can perform periodic work such as sending PINGs.

    The generator never finishes on its own: iterate over it and break out
    (or close it) to deregister; the queue is then removed from every
    namespace it was registered under.
    """
    inbox = gevent.queue.Queue()
    namespace = namespace.rstrip("/")
    for ns in _walk_namespace_hierarchy(namespace):
        self.consumers.setdefault(ns, []).append(inbox)

    try:
        while True:
            # Randomise the wait so that many listeners do not wake together.
            jitter = random.uniform(0, max_timeout / 2)
            timeout = max_timeout - jitter
            try:
                yield inbox.get(block=True, timeout=timeout)
            except gevent.queue.Empty:
                yield None

            # Give other greenlets a turn instead of spinning.
            gevent.sleep()
    finally:
        for ns in _walk_namespace_hierarchy(namespace):
            registered = self.consumers[ns]
            registered.remove(inbox)
            if not registered:
                del self.consumers[ns]
def _create_greenlet_worker(self, queue):
    """Spawn and return a greenlet that runs callables taken from ``queue``.

    The worker runs each item it pulls until it receives the ``_STOP``
    sentinel.  An exception raised by a callable is logged and does not stop
    the worker.
    """
    def greenlet_worker():
        while True:
            try:
                task = queue.get()
                if task is _STOP:
                    return
                task()
            except Empty:
                continue
            except Exception as exc:
                log.warning("Exception in worker greenlet")
                log.exception(exc)

    return gevent.spawn(greenlet_worker)
def init_queue_with_item(queue, item=None):
    """Empty ``queue`` and then, if ``item`` is truthy, enqueue it.

    Afterwards the queue holds either nothing or exactly ``item``.
    """
    # Discard whatever earlier code left behind.
    while True:
        if queue.qsize() <= 0:
            break
        queue.get()

    if not item:
        return
    queue.put(item)
def test_handle_api_request_v0_send(mocker):
    """Check that a ``v0/send`` RPC request puts one message on the email queue.

    The request is a msgpack-encoded ``fake_notification`` returned by a mocked
    socket; the queued message's subject must read
    ``"[<application>] <subject>"``.
    """
    from iris.bin.sender import message_send_enqueue
    from iris.sender.rpc import handle_api_request, send_funcs
    from iris.sender.shared import per_mode_send_queues

    send_funcs['message_send_enqueue'] = message_send_enqueue
    email_queue = per_mode_send_queues.setdefault('email', gevent.queue.Queue())

    # Role expansion is stubbed to return the target unchanged.
    mocker.patch('iris.sender.cache.targets_for_role', lambda role, target: [target])
    mocker.patch('iris.bin.sender.db')
    mocker.patch('iris.metrics.stats')
    mocker.patch('iris.bin.sender.set_target_contact').return_value = True

    mock_address = mocker.MagicMock()
    mock_socket = mocker.MagicMock()
    request = {
        'endpoint': 'v0/send',
        'data': fake_notification,
    }
    mock_socket.recv.return_value = msgpack.packb(request)

    # Start from an empty queue so the size assertion below is meaningful.
    while send_queue_not_empty(email_queue):
        email_queue.get()

    handle_api_request(mock_socket, mock_address)

    assert email_queue.qsize() == 1
    m = email_queue.get()
    assert m['subject'] == '[%s] %s' % (fake_notification['application'],
                                        fake_notification['subject'])


def send_queue_not_empty(q):
    """Return True while ``q`` still reports queued items."""
    return q.qsize() > 0
def localJsonReport(self):
    """Serve the local JSON report endpoint; blocks in ``serve_forever``.

    On Windows the server listens on TCP ``127.0.0.1`` at the port configured
    as ``report.local_json_port``.  On every other platform it listens on the
    unix socket ``localJsonReport.sock`` under ``_agentBasePath``.  Incoming
    connections are handled by ``processJsonRep``.
    """
    import platform

    if platform.system() == 'Windows':
        # A TCP bind address needs an integer port; the config lookup may
        # return text, so coerce it (a no-op when it is already an int).
        rep_port = int(self.conf.get('report', 'local_json_port'))
        server = StreamServer(('127.0.0.1', rep_port), self.processJsonRep)
        server.serve_forever()
    else:
        from libs.unixSocket import bind_unix_listener
        unix_sock_name = os.path.join(_agentBasePath, 'localJsonReport.sock')
        server = StreamServer(bind_unix_listener(unix_sock_name), self.processJsonRep)
        # NOTE(review): 0o777 lets every local user connect to this socket;
        # confirm that is intended before tightening.
        os.chmod(unix_sock_name, 0o777)
        server.serve_forever()

#@profile
def sendToServer(self, group_name, server_list, local_queue, flush_ready_event, linger_ms, max_queued_messages):
    """Forward batches from ``local_queue`` to a report server, forever.

    Each batch comes from ``batch_fetch``.  Sending is retried without a limit
    until ``batch_send_data`` returns 0.  After a failed send or an unexpected
    exception, the old session is closed and a server is picked again through
    ``get_report_server`` before the next send; a failed connect is followed by
    a three-second pause.
    """
    connected = False
    server = None
    while True:
        try:
            batch = self.batch_fetch(local_queue, flush_ready_event, linger_ms, max_queued_messages)
            if not batch:
                continue

            # Keep trying until this batch has been delivered.
            while True:
                if not connected:
                    # Drop the stale session before choosing a server again.
                    if server is not None:
                        server.session.close()
                    server = self.get_report_server(group_name, server_list)
                    if server.connect() != 0:
                        gevent.sleep(3)
                        continue
                    connected = True

                ret = server.batch_send_data(batch)
                if ret == 0:
                    break
                logger.error("send msg error!, ret={}".format(ret))
                connected = False
        except Exception as e:
            connected = False
            logger.error("Uncaught error here! e={}, traceback={}".format(e, traceback.format_exc()))

#@profile
def batch_fetch(self, queue, event, linger_ms, max_queued_messages):
    """Return everything currently in ``queue`` as a list.

    When the queue holds fewer than ``max_queued_messages`` items, first wait
    up to ``linger_ms`` milliseconds for ``event`` to be set, giving more
    messages a chance to accumulate.  The event is cleared before draining.

    :param queue: queue to drain; needs ``qsize()`` and ``get()``.
    :param event: flush signal; needs ``wait()``, ``is_set()`` and ``clear()``.
    :param linger_ms: maximum time to wait for the flush signal, in milliseconds.
    :param max_queued_messages: backlog size at which the wait is skipped.
    :returns: list of the drained messages, possibly empty.
    """
    if queue.qsize() < max_queued_messages:
        # Float division: on Python 2, ``linger_ms / 1000`` truncates any
        # sub-second linger to 0, turning the wait into a busy loop.
        event.wait(linger_ms / 1000.0)

    if event.is_set():
        event.clear()

    batch_msgs = [queue.get() for _ in range(queue.qsize())]
    return batch_msgs

#@profile