我们从 Python 开源项目中，提取了以下 12 个代码示例，用于说明如何使用 kombu.Consumer()。
def purge(self, timeout: int = 5) -> None:
    """Purge ``self._queue`` through a pooled broker connection.

    The purge call is wrapped with ``conn.ensure`` using a copy of
    ``self.RETRY_OPTIONS`` plus an errback that logs each failure.
    On success ``self._message`` is reset to ``None``.

    :param timeout: accepted for interface compatibility; this
        implementation does not read it.
    """

    def _log_retry(exc, interval):
        # Errback handed to ``conn.ensure``.
        self._logger.error(
            'Purging error: %s, will retry triggering in %s seconds',
            exc, interval, exc_info=True,
        )

    # NOTE(review): the argument actually passed below is the ``Consumer``
    # created in this method, although it is annotated BrightsideConsumer.
    def _do_purge(cnsmr: BrightsideConsumer):
        cnsmr.purge()
        self._message = None

    broker = BrokerConnection(hostname=self._amqp_uri)
    with connections[broker].acquire(block=True) as conn:
        self._logger.debug('Got connection: %s', conn.as_uri())
        # NOTE(review): ``_do_purge`` is also registered as the message
        # callback; kombu calls callbacks with (body, message), which this
        # one-argument helper would not accept -- confirm no delivery can
        # reach it while the purge is in flight.
        with Consumer(conn, queues=[self._queue], callbacks=[_do_purge]) as consumer:
            retry_policy = self.RETRY_OPTIONS.copy()
            retry_policy['errback'] = _log_retry
            conn.ensure(consumer, _do_purge, **retry_policy)(consumer)
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                   prefetch_count=0, **kwargs):
    """Register a queue rule and its consumer on this Microservice.

    :param handler: function for handling messages
    :type handler: callable object
    :param name: name of queue
    :type name: str
    :param autoack: if True message.ack() after callback
    :param prefetch_size: size in bytes for getting data from mq
    :param prefetch_count: count of messages for getting from mq
    :param kwargs: extra keyword arguments forwarded to ``Rule``
    """
    rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
    rule_queue = Queue(rule.name)
    consumer = Consumer(
        self.connection,
        queues=[rule_queue],
        callbacks=[rule.callback],
        auto_declare=True,
    )
    consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
    self.consumers.append(consumer)
    self.logger.debug('Rule "%s" added!', rule.name)
def _start_consuming(self):
    """Consume from the callback queue until ``self._is_running`` is falsy.

    This function is blocking. Messages are delivered to
    ``self._on_message`` and are not acknowledged (``no_ack=True``).
    The connection is polled in 0.5 second slices so that a change of
    ``self._is_running`` is noticed between polls.
    """
    consumer = kombu.Consumer(
        self._conn,
        queues=[self._callback_queue],
        on_message=self._on_message,
        no_ack=True,
    )
    # Entering the consumer starts consumption (same as calling
    # ``consume()``); leaving it cancels the subscription, so the consumer
    # no longer stays registered on the channel once the loop stops or
    # ``drain_events`` raises.
    with consumer:
        while self._is_running:
            try:
                self._conn.drain_events(timeout=0.5)
            except socket.timeout:
                # Nothing arrived in this slice; re-check the running flag.
                continue
def receive(self, timeout: int) -> BrightsideMessage:
    """Wait up to ``timeout`` for one message on ``self._queue``.

    ``self._message`` starts as an empty ``MT_NONE`` message. If a delivery
    reaches the consumer callback it is replaced by the message built by
    ``self._message_factory``, and the raw kombu message is kept on
    ``self._msg``. A drain timeout is swallowed, so the placeholder is
    returned when nothing arrives.

    :param timeout: timeout passed to ``drain_events``
    :returns: the received message, or the ``MT_NONE`` placeholder
    :raises ChannelFailureException: when draining fails with one of the
        kombu errors listed in ``_drain``
    """
    placeholder_header = BrightsideMessageHeader(uuid4(), "", BrightsideMessageType.MT_NONE)
    self._message = BrightsideMessage(placeholder_header, BrightsideMessageBody(""))

    def _on_retry(exc, interval: int) -> None:
        # Errback handed to ``conn.ensure``.
        self._logger.error(
            'Draining error: %s, will retry triggering in %s seconds',
            exc, interval, exc_info=True,
        )

    def _on_message(body: str, msg: KombuMessage) -> None:
        # Consumer callback: remember the raw message and convert it.
        self._logger.debug(
            "Monitoring event received at: %s headers: %s payload: %s",
            datetime.utcnow().isoformat(), msg.headers, body,
        )
        self._msg = msg
        self._message = self._message_factory.create_message(msg)

    def _drain(cnx: BrokerConnection, timesup: int) -> None:
        try:
            cnx.drain_events(timeout=timesup)
        except kombu_exceptions.TimeoutError:
            # No delivery within the window; keep the placeholder message.
            return
        except (kombu_exceptions.ChannelLimitExceeded,
                kombu_exceptions.ConnectionLimitExceeded,
                kombu_exceptions.OperationalError,
                kombu_exceptions.NotBoundError,
                kombu_exceptions.MessageStateError,
                kombu_exceptions.LimitExceeded) as err:
            raise ChannelFailureException(
                "Error connecting to RabbitMQ, see inner exception for details", err)

    broker = BrokerConnection(hostname=self._amqp_uri)
    with connections[broker].acquire(block=True) as conn:
        self._logger.debug('Got connection: %s', conn.as_uri())
        with Consumer(conn, queues=[self._queue], callbacks=[_on_message]) as consumer:
            consumer.qos(prefetch_count=1)
            retry_policy = self.RETRY_OPTIONS.copy()
            retry_policy['errback'] = _on_retry
            conn.ensure(consumer, _drain, **retry_policy)(conn, timeout)

    return self._message
def get_consumers(self, Consumer, channel):
    """Return the service-queue consumer followed by the fanout consumer.

    Both are built by this object's helpers from the given ``Consumer``
    class and ``channel``; the service consumer is created first.
    """
    service_consumer = self._create_service_queues(self.services, Consumer, channel)
    fanout_consumer = self._create_fanout_exchange(Consumer, channel)
    return [service_consumer, fanout_consumer]
def _create_service_queues(self, services, Consumer, channel):
    """
    Creates necessary AMQP queues, one per service.

    Declares the non-durable exchange named ``self.exchange``, then one
    non-durable, non-exclusive queue per entry in ``services`` bound to it
    with the queue name as routing key. Returns a single ``Consumer`` over
    all of those queues that dispatches to ``self._on_message`` with
    ``no_ack=False``.
    """
    log.debug('Declaring exchange %s', self.exchange)
    exchange = kombu.Exchange(self.exchange, channel=channel, durable=False)
    exchange.declare()

    def _declare_queue_for(service):
        # Queue name doubles as the routing key.
        name = '{}_service_{}'.format(self.exchange, service.name)
        log.debug('Declaring service queue %s', name)
        service_queue = kombu.Queue(
            channel=channel,
            name=name,
            exchange=exchange,
            routing_key=name,
            exclusive=False,
            durable=False,
        )
        service_queue.declare()
        return service_queue

    declared = [_declare_queue_for(service) for service in services.values()]
    return Consumer(queues=declared, on_message=self._on_message, no_ack=False)
def _drain_events(self, on_event): if not hasattr(self, 'bus_queue'): raise Exception('You must listen for events before consuming them') with Connection(self._url) as conn: with Consumer(conn, self.bus_queue, callbacks=[on_event]): try: while True: conn.drain_events(timeout=0.5) except TimeoutError: pass
def get_consumers(self, Consumer, channel):
    """
    Returns a list of kombu.Consumer instances to service all registered
    notification callbacks. If using the kombu.mixin.ConsumerMixin mixin
    class, these instances should be included in its get_consumers() method.

    One consumer is created per routing key in ``self.notify_callbacks``,
    each on its own queue bound to the bus producer's exchange.

    :param Consumer: Message consumer class.
    :type Consumer: class
    :param channel: An open channel.
    :type channel: kombu.transport.*.Channel
    :returns: A list of consumer instances
    :rtype: [kombu.Consumer, ....]
    """
    exchange = self.bus_mixin.producer.exchange

    def _consumer_for(routing_key, callbacks):
        # Build the queue/consumer pair for one routing key and log it.
        notify_queue = kombu.Queue(exchange=exchange, routing_key=routing_key)
        notify_consumer = Consumer(queues=notify_queue, callbacks=callbacks)
        self.bus_mixin.logger.info(
            'Listening for "%s" notifications', routing_key)
        return notify_consumer

    return [
        _consumer_for(routing_key, callbacks)
        for routing_key, callbacks in self.notify_callbacks.items()
    ]
def Consumer(self, *args, **kwargs):
    """Build the consumer to use, announcing the choice on stdout.

    When ``self.get_messages()`` returns something other than ``None`` a
    ``DeadlineConsumer`` is created with those messages as its first
    argument; otherwise a plain ``kombu.Consumer`` is created. The remaining
    positional and keyword arguments are forwarded unchanged.
    """
    messages = self.get_messages()
    if messages is None:
        print("USING KOMBU CONSUMER")
        return kombu.Consumer(*args, **kwargs)

    print("USING DEADLINE CONSUMER")
    return DeadlineConsumer(messages, *args, **kwargs)
def start_rmq_consume(self):
    """
    Start consuming from rmq.

    Opens a new channel on ``self._conn``, stores a consumer for
    ``self._from_queue`` on ``self._queue_consumer`` with
    ``self.process_message`` as its callback, and starts it.
    """
    logging.getLogger(self.__class__.__name__).info("starting rabbit mq consumer")

    # The consumer listens on the from_queue only.
    self._queue_consumer = Consumer(
        channel=self._conn.channel(),
        queues=[self._from_queue],
        callbacks=[self.process_message],
    )
    self._queue_consumer.consume()
def _shovel_to_buffer(self, from_queue):
    """
    poor man's alternative to the shovel plugin

    Consumes ``from_queue`` on a fresh channel, handing every message to
    ``self._shoveller``, until the queue looks empty: either a 1 second
    drain times out or the socket reports that the read would block.
    The consumer is cancelled afterwards.

    :param from_queue: shovel messages from which queue? entity.Queue object
    """
    # Function-scope import so this block does not rely on a file-level
    # ``import errno``.
    import errno

    logger = logging.getLogger(self.__class__.__name__)
    logger.info("shovelling all messages from error queue to buffer queue")
    channel = self._conn.channel()

    # prep a consumer for the from_queue only
    queue_consumer = Consumer(channel=channel,
                              queues=[from_queue],
                              callbacks=[self._shoveller])
    queue_consumer.consume()

    # EAGAIN is 35 only on macOS/BSD (11 on Linux, where 35 is EDEADLK), so
    # compare against the platform's own constants instead of a literal.
    would_block = (errno.EAGAIN, errno.EWOULDBLOCK)

    # finally drain all the work items from error-queue into shoveller
    while True:
        try:
            self._conn.drain_events(timeout=1)
        except socket.timeout:
            logger.debug("No more work-items in {q}".format(q=from_queue.name))
            break
        except socket.error as e:
            # we don't care about EAGAIN, since we had intentionally started
            # a non-blocking conn
            if e.errno in would_block:
                logger.debug("{q} is empty".format(q=from_queue.name))
                break
            # Any other socket error is ignored and the drain is retried,
            # as before.

    # disconnect
    queue_consumer.cancel()
def _create_fanout_exchange(self, Consumer, channel):
    """
    Creates a fanout queue to accept notifications.

    Declares the non-durable fanout exchange ``<self.exchange>_fanout`` and
    an exclusive, non-durable queue with a fresh uuid4-based name bound to
    it. Returns a ``Consumer`` on that queue that dispatches to
    ``self._on_broadcast`` with ``no_ack=True``.
    """
    fanout_name = '{}_fanout'.format(self.exchange)
    log.debug('Declaring fanout exchange %s', fanout_name)
    fanout = kombu.Exchange(
        name=fanout_name,
        channel=channel,
        durable=False,
        type='fanout',
    )
    fanout.declare()

    callback_queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
    log.debug('Declaring fanout queue %s', callback_queue_name)
    callback_queue = kombu.Queue(
        name=callback_queue_name,
        exchange=fanout,
        exclusive=True,
        durable=False,
        channel=channel,
    )
    callback_queue.declare()

    return Consumer(
        queues=[callback_queue],
        on_message=self._on_broadcast,
        no_ack=True,
    )