Python kombu 模块,Consumer() 实例源码

我们从 Python 开源项目中提取了以下 12 个代码示例，用于说明如何使用 kombu.Consumer()。

项目:Brightside    作者:BrighterCommand    | 项目源码 | 文件源码
def purge(self, timeout: int = 5) -> None:
        """Purge the messages waiting on ``self._queue``.

        A connection for ``self._amqp_uri`` is acquired from ``connections``,
        a ``Consumer`` is opened on the queue, and the purge is executed
        through ``conn.ensure`` using ``self.RETRY_OPTIONS`` plus an errback
        that logs every failed attempt.  ``self._message`` is reset to
        ``None`` once the consumer's ``purge()`` call returns.

        :param timeout: accepted as part of the signature; this method does
            not read it.
        """

        def _do_purge(target: BrightsideConsumer):
            target.purge()
            self._message = None

        def _log_purge_failure(exc, interval):
            self._logger.error('Purging error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)

        broker = BrokerConnection(hostname=self._amqp_uri)
        with connections[broker].acquire(block=True) as conn:
            self._logger.debug('Got connection: %s', conn.as_uri())
            with Consumer(conn, queues=[self._queue], callbacks=[_do_purge]) as consumer:
                # Start from the shared retry settings and add our errback.
                retry_options = dict(self.RETRY_OPTIONS, errback=_log_purge_failure)
                guarded_purge = conn.ensure(consumer, _do_purge, **retry_options)
                guarded_purge(consumer)
项目:microservices    作者:viatoriche    | 项目源码 | 文件源码
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                       prefetch_count=0, **kwargs):
        """Register a queue rule, and the consumer serving it, on this Microservice.

        A ``Rule`` is built from ``name`` and ``handler``; a consumer is then
        created on a queue named after the rule with the rule's callback, the
        QoS limits are applied to it, and it is stored in ``self.consumers``.

        :param handler: function for handling messages
        :type handler: callable object
        :param name: name of queue
        :type name: str
        :param autoack: if True message.ack() after callback
        :param prefetch_size: size in bytes for getting data from mq
        :param prefetch_count: count of messages for getting from mq
        :param kwargs: extra keyword arguments forwarded to ``Rule``
        """
        queue_rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
        rule_consumer = Consumer(
            self.connection,
            queues=[Queue(queue_rule.name)],
            callbacks=[queue_rule.callback],
            auto_declare=True,
        )
        rule_consumer.qos(prefetch_size=prefetch_size,
                          prefetch_count=prefetch_count)
        self.consumers.append(rule_consumer)
        self.logger.debug('Rule "%s" added!', queue_rule.name)
项目:isc    作者:and3rson    | 项目源码 | 文件源码
def _start_consuming(self):
        """
        Consume messages from ``self._callback_queue``.

        Blocks the caller: events are drained in 0.5-second slices for as
        long as ``self._is_running`` stays truthy.
        """
        callback_consumer = kombu.Consumer(
            self._conn,
            queues=[self._callback_queue],
            on_message=self._on_message,
            no_ack=True,
        )
        callback_consumer.consume()

        while self._is_running:
            try:
                self._conn.drain_events(timeout=0.5)
            except socket.timeout:
                # Nothing arrived in this slice; loop to re-check the flag.
                pass
项目:Brightside    作者:BrighterCommand    | 项目源码 | 文件源码
def receive(self, timeout: int) -> BrightsideMessage:
        """Drain the queue once and return whatever message was delivered.

        ``self._message`` is first reset to an empty ``MT_NONE`` placeholder.
        A connection for ``self._amqp_uri`` is acquired from ``connections``,
        a ``Consumer`` with ``prefetch_count=1`` is opened on ``self._queue``
        and ``drain_events`` is called once through ``conn.ensure`` (using
        ``self.RETRY_OPTIONS`` plus a logging errback).  If the consumer
        callback fires, the raw kombu message is kept on ``self._msg`` and the
        converted message replaces the placeholder.

        :param timeout: passed to ``drain_events`` as its timeout.
        :return: ``self._message`` -- the message produced by
            ``self._message_factory``, or the ``MT_NONE`` placeholder when the
            drain timed out without a delivery.
        :raises ChannelFailureException: when draining fails with one of the
            kombu errors listed below; the kombu error is attached both as an
            argument and as the exception's ``__cause__``.
        """

        self._message = BrightsideMessage(BrightsideMessageHeader(uuid4(), "", BrightsideMessageType.MT_NONE), BrightsideMessageBody(""))

        def _consume(cnx: BrokerConnection, timesup: int) -> None:
            try:
                cnx.drain_events(timeout=timesup)
            except kombu_exceptions.TimeoutError:
                # A timeout just means nothing arrived; keep the placeholder.
                pass
            except(kombu_exceptions.ChannelLimitExceeded,
                   kombu_exceptions.ConnectionLimitExceeded,
                   kombu_exceptions.OperationalError,
                   kombu_exceptions.NotBoundError,
                   kombu_exceptions.MessageStateError,
                   kombu_exceptions.LimitExceeded) as err:
                # Chain explicitly so tracebacks show the broker error as the cause.
                raise ChannelFailureException("Error connecting to RabbitMQ, see inner exception for details", err) from err

        def _consume_errors(exc, interval: int)-> None:
            self._logger.error('Draining error: %s, will retry triggering in %s seconds', exc, interval, exc_info=True)

        def _read_message(body: str, msg: KombuMessage) -> None:
            self._logger.debug("Monitoring event received at: %s headers: %s payload: %s", datetime.utcnow().isoformat(), msg.headers, body)
            # Keep the raw kombu message on the instance alongside the converted one.
            self._msg = msg
            self._message = self._message_factory.create_message(msg)

        connection = BrokerConnection(hostname=self._amqp_uri)
        with connections[connection].acquire(block=True) as conn:
            self._logger.debug('Got connection: %s', conn.as_uri())
            with Consumer(conn, queues=[self._queue], callbacks=[_read_message]) as consumer:
                consumer.qos(prefetch_count=1)
                ensure_kwargs = self.RETRY_OPTIONS.copy()
                ensure_kwargs['errback'] = _consume_errors
                safe_drain = conn.ensure(consumer, _consume, **ensure_kwargs)
                safe_drain(conn, timeout)

        return self._message
项目:isc    作者:and3rson    | 项目源码 | 文件源码
def get_consumers(self, Consumer, channel):
        """
        Return the two consumers for this instance: the one built by
        ``_create_service_queues`` first, then the one built by
        ``_create_fanout_exchange``.
        """
        service_consumer = self._create_service_queues(self.services, Consumer, channel)
        fanout_consumer = self._create_fanout_exchange(Consumer, channel)
        return [service_consumer, fanout_consumer]
项目:isc    作者:and3rson    | 项目源码 | 文件源码
def _create_service_queues(self, services, Consumer, channel):
        """
        Declare the exchange and one AMQP queue per service, then return a
        single consumer covering all of those queues.

        Each queue is named ``<exchange>_service_<service name>`` and uses
        that same string as its routing key.  The consumer dispatches to
        ``self._on_message`` and is created with ``no_ack=False``.
        """
        log.debug('Declaring exchange %s', self.exchange)
        service_exchange = kombu.Exchange(self.exchange, channel=channel, durable=False)
        service_exchange.declare()

        declared_queues = []
        for svc in services.values():
            svc_queue_name = '{}_service_{}'.format(self.exchange, svc.name)
            log.debug('Declaring service queue %s', svc_queue_name)
            svc_queue = kombu.Queue(
                name=svc_queue_name,
                routing_key=svc_queue_name,
                exchange=service_exchange,
                channel=channel,
                exclusive=False,
                durable=False,
            )
            svc_queue.declare()
            declared_queues.append(svc_queue)

        return Consumer(
            queues=declared_queues,
            on_message=self._on_message,
            no_ack=False,
        )
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def _drain_events(self, on_event):
        """Feed queued bus events to ``on_event`` until a drain times out.

        Opens a connection to ``self._url`` and a consumer on
        ``self.bus_queue``, then drains events in 0.5-second slices; the
        first ``TimeoutError`` ends the loop.

        :param on_event: callback registered on the consumer.
        :raises Exception: if ``self`` has no ``bus_queue`` attribute.
        """
        if not hasattr(self, 'bus_queue'):
            raise Exception('You must listen for events before consuming them')
        with Connection(self._url) as bus_connection, \
                Consumer(bus_connection, self.bus_queue, callbacks=[on_event]):
            try:
                while True:
                    bus_connection.drain_events(timeout=0.5)
            except TimeoutError:
                # A timed-out drain is the signal to stop.
                pass
项目:commissaire    作者:projectatomic    | 项目源码 | 文件源码
def get_consumers(self, Consumer, channel):
        """
        Build one consumer per registered notification routing key.

        Every entry in ``self.notify_callbacks`` yields a queue bound to the
        bus producer's exchange with that routing key, and a consumer whose
        callbacks are the ones registered for the key.

        If using the kombu.mixin.ConsumerMixin mixin class, these instances
        should be included in its get_consumers() method.

        :param Consumer: Message consumer class.
        :type Consumer: class
        :param channel: An open channel (not read by this method).
        :type channel: kombu.transport.*.Channel
        :returns: A list of consumer instances
        :rtype: [kombu.Consumer, ....]
        """
        bus_exchange = self.bus_mixin.producer.exchange
        notification_consumers = []
        for key, handlers in self.notify_callbacks.items():
            key_queue = kombu.Queue(exchange=bus_exchange, routing_key=key)
            notification_consumers.append(
                Consumer(queues=key_queue, callbacks=handlers))
            self.bus_mixin.logger.info(
                'Listening for "%s" notifications', key)
        return notification_consumers
项目:celery-deadline    作者:chadrik    | 项目源码 | 文件源码
def Consumer(self, *args, **kwargs):
        """Pick the consumer implementation based on ``self.get_messages()``.

        Returns a ``DeadlineConsumer`` wrapping the messages when
        ``get_messages()`` yields something other than ``None``; otherwise
        falls back to a plain ``kombu.Consumer``.  All positional and keyword
        arguments are forwarded, and the choice is printed to stdout.
        """
        pending = self.get_messages()
        if pending is None:
            print("USING KOMBU CONSUMER")
            return kombu.Consumer(*args, **kwargs)
        print("USING DEADLINE CONSUMER")
        return DeadlineConsumer(pending, *args, **kwargs)
项目:EasyJobLite    作者:treebohotels    | 项目源码 | 文件源码
def start_rmq_consume(self):
        """
        Open a channel on ``self._conn`` and start consuming ``self._from_queue``.

        The consumer is stored on ``self._queue_consumer`` and uses
        ``self.process_message`` as its callback.
        :return: None
        """
        logging.getLogger(self.__class__.__name__).info("starting rabbit mq consumer")

        # Only the from_queue is consumed here.
        self._queue_consumer = Consumer(
            channel=self._conn.channel(),
            queues=[self._from_queue],
            callbacks=[self.process_message],
        )
        self._queue_consumer.consume()
项目:EasyJobLite    作者:treebohotels    | 项目源码 | 文件源码
def _shovel_to_buffer(self, from_queue):
        """
        Poor man's alternative to the shovel plugin.

        Consumes ``from_queue`` on a fresh channel, handing every delivered
        message to ``self._shoveller``, and keeps draining in 1-second slices
        until a drain times out or the socket reports that it would block.
        The consumer is always cancelled on the way out, including when
        draining raises.

        :param from_queue: shovel messages from which queue? entity.Queue object
        """
        # Local import: only this method needs the errno constants.
        import errno

        logger = logging.getLogger(self.__class__.__name__)
        logger.info("shovelling all messages from error queue to buffer queue")

        channel = self._conn.channel()

        # prep a consumer for the from_queue only
        queue_consumer = Consumer(channel=channel,
                                  queues=[from_queue],
                                  callbacks=[self._shoveller])
        queue_consumer.consume()

        # The numeric value of EAGAIN differs per platform (35 on macOS/BSD,
        # 11 on Linux), so compare against the symbolic constants.
        would_block = {errno.EAGAIN, errno.EWOULDBLOCK}

        try:
            # finally drain all the work items from error-queue into shoveller
            while True:
                try:
                    self._conn.drain_events(timeout=1)

                except socket.timeout:
                    logger.debug("No more work-items in {q}".format(q=from_queue.name))
                    break

                except socket.error as e:
                    # we don't care about EAGAIN, since we had intentionally started a non-blocking conn
                    if e.errno in would_block:
                        msg = "{q} is empty".format(q=from_queue.name)
                        logger.debug(msg)
                        break
                    # Other socket errors are still tolerated (the loop
                    # retries, as before) but are no longer silent.
                    logger.warning("socket error while draining %s: %s",
                                   from_queue.name, e)
        finally:
            # disconnect
            queue_consumer.cancel()
项目:isc    作者:and3rson    | 项目源码 | 文件源码
def _create_fanout_exchange(self, Consumer, channel):
        """
        Declare the ``<exchange>_fanout`` exchange and a uniquely named
        exclusive queue bound to it, and return a consumer for that queue.

        The queue name is ``fanout_callback_<uuid4>``.  The consumer
        dispatches to ``self._on_broadcast`` and is created with
        ``no_ack=True``.
        """
        fanout_name = '{}_fanout'.format(self.exchange)
        log.debug('Declaring fanout exchange %s', fanout_name)
        fanout_exchange = kombu.Exchange(
            name=fanout_name,
            type='fanout',
            channel=channel,
            durable=False,
        )
        fanout_exchange.declare()

        callback_queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
        log.debug('Declaring fanout queue %s', callback_queue_name)
        callback_queue = kombu.Queue(
            name=callback_queue_name,
            exchange=fanout_exchange,
            channel=channel,
            exclusive=True,
            durable=False,
        )
        callback_queue.declare()

        return Consumer(
            queues=[callback_queue],
            on_message=self._on_broadcast,
            no_ack=True,
        )