Python kombu 模块,Producer() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用kombu.Producer()

项目:isc    作者:and3rson    | 项目源码 | 文件源码
def run(self):
        """Dispatch queued outgoing requests until the running flag is cleared.

        Sets ``self._is_running`` and loops while it stays true. On each pass:

        * if the consumer is not connected, sleep for half a second, log a
          debug message and start the next pass;
        * otherwise build a ``kombu.Producer`` on the consumer's channel, take
          one request from ``self._out_queue`` (waiting at most 0.5 seconds)
          and hand it to ``self._dispatch_request``.

        A request whose dispatch raises is logged at debug level and handed
        back to ``self.enqueue``.
        """
        self._is_running = True
        while self._is_running:
            if not self.consumer.is_connected():
                sleep(0.5)
                log.debug('Waiting for consumer to be ready...')
                continue

            publisher = kombu.Producer(self.consumer._channel, on_return=self.consumer._handle_return)
            try:
                pending = self._out_queue.get(timeout=0.5)
                try:
                    self._dispatch_request(pending, publisher)
                except Exception as error:
                    # Any dispatch failure hands the request back to enqueue().
                    log.debug('Failed to dispatch request, re-enqueueing again, error was: {}'.format(
                        str(error)
                    ))
                    self.enqueue(pending)
            except Empty:
                # Nothing arrived within the timeout; go round again.
                continue
项目:commissaire    作者:projectatomic    | 项目源码 | 文件源码
def connect(self, exchange, channel):  # pragma: no cover
        """
        Prepares this object for publishing notification messages: declares
        a kombu.Queue on the given exchange and channel, then creates the
        kombu.Producer used for publishing.

        :param exchange: The exchange for publishing notifications.
        :type exchange: kombu.Exchange
        :param channel: The channel to bind to.
        :type channel: kombu.transport.base.StdChannel
        """
        self.logger.debug('Connecting {}'.format(self.__class__.__name__))

        notify_queue = kombu.Queue(exchange=exchange, channel=channel)
        self._queue = notify_queue
        notify_queue.declare()

        self._producer = kombu.Producer(channel, exchange)
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def send(self, topic, message):
        """Publish a pulse message to the configured topic exchange.

        The message is wrapped in an envelope holding its payload plus a
        ``_meta`` section (exchange, routing key, serializer, send time and
        running message count) combined with ``message.metadata`` through
        ``set_default``. ``self.count`` is incremented after publishing.

        :param topic: Routing key the producer is created with.
        :param message: Message to send; a falsy value is reported through
            ``Log.error``.
        """
        if not message:
            Log.error("Expecting a message")

        message._prepare()

        if not self.connection:
            self.connect()

        settings = self.settings
        publisher = Producer(
            channel=self.connection,
            exchange=Exchange(settings.exchange, type='topic'),
            routing_key=topic
        )

        # Envelope format: the payload plus a block of metadata about the send.
        payload = message.data
        sent_at = time_to_string(datetime.datetime.now(timezone(settings.broker_timezone)))
        meta = set_default({
            'exchange': settings.exchange,
            'routing_key': message.routing_key,
            'serializer': settings.serializer,
            'sent': sent_at,
            'count': self.count
        }, message.metadata)
        envelope = Data(payload=payload, _meta=meta)

        publisher.publish(jsons.scrub(envelope), serializer=settings.serializer)
        self.count += 1
项目:Brightside    作者:BrighterCommand    | 项目源码 | 文件源码
def send(self, message: BrightsideMessage):
        """Publish ``message`` to the broker, retrying through ``conn.ensure``.

        A connection is acquired from the ``connections`` pool for
        ``self._cnx``, a ``Producer`` is opened on it, and the publish call is
        wrapped with ``conn.ensure`` using a copy of ``self.RETRY_OPTIONS``
        plus an ``errback`` that logs each failed attempt.

        :param message: The message to send. Its ``body.bytes`` becomes the
            payload and ``header.topic`` the routing key.
        """
        # Bind the logger to a local so the nested functions below can use it
        # from their enclosing scope.
        logger = self._logger

        def _build_message_header(msg: BrightsideMessage) -> Dict:
            return KombuMessageFactory(msg).create_message_header()

        def _publish(sender: Producer) -> None:
            logger.debug("Send message {body} to broker {amqpuri} with routing key {routing_key}"
                         .format(body=message, amqpuri=self._amqp_uri, routing_key=message.header.topic))
            sender.publish(message.body.bytes,
                           headers=_build_message_header(message),
                           exchange=self._exchange,
                           content_type="text/plain",
                           routing_key=message.header.topic,
                           declare=[self._exchange])

        def _error_callback(e, interval) -> None:
            # logging interpolates positional arguments with %-formatting, so
            # the placeholders must be %s for the error and interval to appear.
            logger.debug('Publishing error: %s. Will retry in %s seconds', e, interval)

        self._logger.debug("Connect to broker {amqpuri}".format(amqpuri=self._amqp_uri))

        with connections[self._cnx].acquire(block=True) as conn:
            with Producer(conn) as producer:
                # Copy so the shared retry options are not mutated per call.
                ensure_kwargs = self.RETRY_OPTIONS.copy()
                ensure_kwargs['errback'] = _error_callback
                safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
                safe_publish(producer)
项目:fuel-ccp-tests    作者:openstack    | 项目源码 | 文件源码
def publish_message_to_queue(self, queue):
        """Publish a freshly generated UUID using ``queue`` as the routing key.

        :param queue: Routing key the producer is created with.
        :returns: A dict with the given ``queue`` and the published ``id``.
        """
        message_id = utils.generate_uuid()
        kombu.Producer(channel=self.ch, routing_key=queue).publish(message_id)
        return dict(queue=queue, id=message_id)
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def _make_publisher(self):
        """Build a ``Publisher`` from the bus settings in ``self.config``.

        The AMQP URL is formatted from the ``username``, ``password``,
        ``host`` and ``port`` entries; the exchange comes from
        ``exchange_name`` and ``exchange_type``.

        :returns: A ``Publisher`` wrapping the producer and a ``Marshaler``
            created from ``self._uuid``.
        """
        settings = self.config
        connection = Connection('amqp://{username}:{password}@{host}:{port}//'.format(**settings))
        exchange = Exchange(settings['exchange_name'], type=settings['exchange_type'])
        producer = Producer(connection, exchange=exchange, auto_declare=True)
        return Publisher(producer, Marshaler(self._uuid))
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def _make_publisher(self):
        """Build a ``Publisher`` that marshals messages with ``CollectdMarshaler``.

        The AMQP URL is formatted from the ``username``, ``password``,
        ``host`` and ``port`` entries of ``self.config``. The exchange is
        created non-durable with an ``auto_delete`` argument; the original
        variable name indicates these options are meant to match collectd's.

        :returns: A ``Publisher`` wrapping the producer and a
            ``CollectdMarshaler`` created from ``self._uuid``.
        """
        settings = self.config
        connection = Connection('amqp://{username}:{password}@{host}:{port}//'.format(**settings))
        collectd_exchange_options = dict(arguments={'auto_delete': True}, durable=False)
        exchange = Exchange(
            settings['exchange_name'],
            type=settings['exchange_type'],
            **collectd_exchange_options
        )
        producer = Producer(connection, exchange=exchange, auto_declare=True)
        return Publisher(producer, CollectdMarshaler(self._uuid))
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def send_event(self, event, routing_key):
        """Publish ``event`` as JSON on ``BUS_EXCHANGE_XIVO``.

        A connection to ``self._url`` is opened for this one publish and
        released when the ``with`` block exits.

        :param event: Object serialised with ``json.dumps``.
        :param routing_key: Routing key to publish with.
        """
        with Connection(self._url) as bus_connection:
            publisher = Producer(bus_connection, exchange=BUS_EXCHANGE_XIVO, auto_declare=True)
            body = json.dumps(event)
            publisher.publish(body, routing_key=routing_key, content_type='application/json')
项目:commissaire-http    作者:projectatomic    | 项目源码 | 文件源码
def connect(self):
        """
        'Connects' to the bus.

        Opens the connection and a channel, declares the topic exchange,
        binds one queue per entry in ``self.qkwargs`` and creates the
        producer. If a connection already exists, a warning is logged and
        nothing else is done.

        :returns: The same instance.
        :rtype: commissaire_http.bus.Bus
        """
        if self.connection is not None:
            # Logger.warn is a deprecated alias; Logger.warning is the
            # supported spelling and raises no DeprecationWarning.
            self.logger.warning('Bus already connected.')
            return self

        self.connection = Connection(self.connection_url)
        self._channel = self.connection.channel()
        self._exchange = Exchange(
            self.exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Create queues
        self._queues = []
        for kwargs in self.qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug('Created queue %s', queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Bus connection finished')
        return self
项目:almanach    作者:internap    | 项目源码 | 文件源码
def __init__(self, connection):
        """Store the connection and create the retry and dead producers.

        Both exchanges are configured first, through
        ``_configure_retry_exchanges`` and ``_configure_dead_exchange``, and
        then one ``Producer`` is created per exchange.

        :param connection: Connection passed to the exchange configuration
            helpers and to both producers.
        """
        self.connection = connection
        retry_exchange, dead_exchange = (
            self._configure_retry_exchanges(connection),
            self._configure_dead_exchange(connection),
        )
        self._retry_producer = Producer(connection, exchange=retry_exchange)
        self._dead_producer = Producer(connection, exchange=dead_exchange)
项目:celery-deadline    作者:chadrik    | 项目源码 | 文件源码
def Producer(self, *args, **kwargs):
        """Create a ``DeadlineProducer`` with the Deadline URLs added.

        ``deadline_pulse_url`` is read from the app configuration (and
        printed) and ``deadline_mongo_url`` from ``self.mongo_url``; both
        overwrite any value the caller passed under the same keyword.

        :returns: The new ``DeadlineProducer``.
        """
        pulse_url = self.app.conf.get('deadline_pulse_url')
        kwargs['deadline_pulse_url'] = pulse_url
        print("deadline_pulse_url %s" % pulse_url)
        kwargs['deadline_mongo_url'] = self.mongo_url
        return DeadlineProducer(*args, **kwargs)
项目:celery-deadline    作者:chadrik    | 项目源码 | 文件源码
def producer_pool(self):
        """Return the producer pool, creating and caching it on first use.

        On first use the pool is looked up in ``kombu.pools.producers`` for
        the app's write connection, its ``limit`` is copied from the app
        pool, and its ``Producer`` factory is replaced with ``self.Producer``.
        """
        if self._producer_pool is not None:
            return self._producer_pool

        pool = kombu.pools.producers[self.app.connection_for_write()]
        self._producer_pool = pool
        pool.limit = self.app.pool.limit
        # TODO: submit this patch to celery:
        pool.Producer = self.Producer
        return pool


# tasks ---

# FIXME: look into global tasks, which get added to all apps automatically
项目:EasyJobLite    作者:treebohotels    | 项目源码 | 文件源码
def start_connection(self):
        """
        Open a new RabbitMQ connection and create a producer on a fresh
        channel of it, bound to the orchestrator's exchange.

        :return: None
        """
        log = logging.getLogger(self.__class__.__name__)
        log.info("starting new rabbit mq connection")

        # todo: do we need to make confirm_publish configurable?
        connection = Connection(
            self.get_config().rabbitmq_url,
            transport_options=dict(confirm_publish=True),
        )
        self._conn = connection

        # producer used to push to the error queue and dead-letter queues
        self._producer = Producer(
            channel=connection.channel(),
            exchange=self._orchestrator.get_exchange(),
        )
项目:sfm-weibo-harvester    作者:gwu-libraries    | 项目源码 | 文件源码
def test_search_timeline(self):
        """Run a ``weibo_timeline`` harvest and check each message it produces.

        Publishes a harvest-start message, then reads messages in order: two
        running status messages and a success result from the result queue,
        a web harvest message with seeds, and a WARC-created message whose
        path must exist on disk.
        """
        self.path = "/sfm-data/collection_set/test_collection/test_3"
        credentials = {
            "access_token": tests.WEIBO_ACCESS_TOKEN
        }
        options = {
            "web_resources": True,
            "image_sizes": [
                "Thumbnail",
                "Medium",
                "Large"
            ]
        }
        harvest_msg = {
            "id": "test:3",
            "type": "weibo_timeline",
            "path": self.path,
            "credentials": credentials,
            "collection_set": {"id": "test_collection_set"},
            "collection": {"id": "test_collection"},
            "options": options
        }
        with self._create_connection() as connection:
            publisher = Producer(connection, exchange=self.exchange(connection))
            publisher.publish(harvest_msg, routing_key="harvest.start.weibo.weibo_timeline")

            # First status message: ids match and the harvest is running.
            first_status = self._wait_for_message(self.result_queue, connection)
            self.assertEqual("test:3", first_status["id"])
            self.assertEqual(STATUS_RUNNING, first_status["status"])

            # Second status message: still running.
            second_status = self._wait_for_message(self.result_queue, connection)
            self.assertEqual(STATUS_RUNNING, second_status["status"])

            # Result message: ids match and the harvest succeeded.
            result = self._wait_for_message(self.result_queue, connection)
            self.assertEqual("test:3", result["id"])
            self.assertEqual(STATUS_SUCCESS, result["status"])

            # Today's stats must report some weibo posts.
            today_key = date.today().isoformat()
            self.assertTrue(result["stats"][today_key]["weibos"])

            # Web harvest message must carry some seeds.
            web_harvest = self._wait_for_message(self.web_harvest_queue, connection)
            self.assertTrue(len(web_harvest["seeds"]))

            # WARC-created message must point at an existing file.
            warc_created = self._wait_for_message(self.warc_created_queue, connection)
            self.assertTrue(os.path.isfile(warc_created["warc"]["path"]))
项目:commissaire-service    作者:projectatomic    | 项目源码 | 文件源码
def __init__(
            self, exchange_name, connection_url, qkwargs, config_file=None):
        """
        Initializes a new Service instance.

        Arguments passed as ``None`` fall back to the configuration file:
        ``bus_uri`` supplies the connection url and ``bus_exchange`` the
        exchange name.

        :param exchange_name: Name of the topic exchange.
        :type exchange_name: str
        :param connection_url: Kombu connection url.
        :type connection_url: str
        :param qkwargs: One or more dicts keyword arguments for queue creation
        :type qkwargs: list
        :param config_file: Path to the configuration file location.
        :type config_file: str or None
        """
        name = self.__class__.__name__
        self.logger = logging.getLogger(name)
        self.logger.debug('Initializing {}'.format(name))

        # If we are given no default, use the global one
        # Read the configuration file
        self._config_data = read_config_file(
            config_file, self._default_config_file)

        if connection_url is None and 'bus_uri' in self._config_data:
            connection_url = self._config_data.get('bus_uri')
            self.logger.debug(
                'Using connection_url=%s from config file', connection_url)
        # The presence check and the lookup must use the same key, and the
        # log call must follow the assignment so it reports the value in use.
        if exchange_name is None and 'bus_exchange' in self._config_data:
            exchange_name = self._config_data.get('bus_exchange')
            self.logger.debug(
                'Using exchange_name=%s from config file', exchange_name)

        self.connection = Connection(connection_url)
        self._channel = self.connection.default_channel
        self._exchange = Exchange(
            exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Set up queues
        self._queues = []
        for kwargs in qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug(queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Initializing of {} finished'.format(name))
项目:EasyJobLite    作者:treebohotels    | 项目源码 | 文件源码
def setup_entities(self):
        """
        Declare the exchange and queues and create the producer.

        Does nothing when ``self._service_inited`` is already set. Otherwise
        it builds the durable topic exchange and the work, retry,
        dead-letter and buffer queues, opens the connection, declares every
        entity on it, creates the producer and sets ``_service_inited``.
        No advanced error handling yet (e.g. declaration with altered
        properties).
        """
        # nothing to do when already initialised
        if self._service_inited:
            return

        mq_name = self._config.get_mq_config

        # durable topic exchange shared by every queue below
        exchange = Exchange(mq_name(constants.EXCHANGE), type='topic', durable=True)
        self._booking_exchange = exchange

        # durable queues, each routed by its own "<name>.#" pattern
        self.work_queue = Queue(
            mq_name(constants.WORK_QUEUE),
            exchange=exchange,
            routing_key=constants.WORK_QUEUE + ".#",
            durable=True,
        )
        self.retry_queue = Queue(
            mq_name(constants.RETRY_QUEUE),
            exchange=exchange,
            routing_key=constants.RETRY_QUEUE + ".#",
            durable=True,
        )
        self.dlq_queue = Queue(
            mq_name(constants.DEAD_LETTER_QUEUE),
            exchange=exchange,
            routing_key=constants.DEAD_LETTER_QUEUE + ".#",
            durable=True,
        )

        # The error-queue consumer needs a buffer queue to hold messages
        # temporarily while processing; this handles the retry loop that can
        # arise between the retry queue and the work queue.
        # todo: Need to implement an alternative as this has a copy overhead
        #  which can be significant when the error queue is large
        self.buffer_queue = Queue(
            name=mq_name(constants.BUFFER_QUEUE),
            exchange=exchange,
            routing_key='buffer.#',
            durable=True,
        )

        # todo: do we need to make confirm_publish configurable?
        self._conn = Connection(
            self.get_config().rabbitmq_url,
            transport_options={'confirm_publish': True},
        )

        # bind each entity to the connection, then declare it
        entities = (
            self._booking_exchange,
            self.work_queue,
            self.retry_queue,
            self.dlq_queue,
            self.buffer_queue,
        )
        for entity in entities:
            entity.maybe_bind(self._conn)
            entity.declare()

        # producer used to push to the error queue and dead-letter queues
        self._producer = Producer(
            channel=self._conn.channel(),
            exchange=self._booking_exchange,
        )

        self._service_inited = True