The following are 16 code examples, extracted from open source Python projects, that illustrate how to use kombu.Producer().
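Before the project examples, a minimal self-contained sketch of the basic publish flow may help; the broker URL, exchange name, and routing key below are placeholders, and a running AMQP broker on localhost is assumed:

import kombu

# Placeholder broker URL and entity names; adjust for your environment.
connection = kombu.Connection('amqp://guest:guest@localhost:5672//')
channel = connection.channel()

exchange = kombu.Exchange('demo_exchange', type='topic')
producer = kombu.Producer(channel, exchange=exchange, routing_key='demo.key')

# declare=[...] ensures the exchange exists before publishing.
producer.publish({'hello': 'world'}, serializer='json', declare=[exchange])

connection.release()
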
def run(self):
    self._is_running = True
    while self._is_running:
        if self.consumer.is_connected():
            producer = kombu.Producer(self.consumer._channel,
                                      on_return=self.consumer._handle_return)
            try:
                queued_request = self._out_queue.get(timeout=0.5)
                if True:
                    # with kombu.producers[self.consumer.get_connection()].acquire(block=True) as producer:
                    #     producer.on_return = print
                    try:
                        self._dispatch_request(queued_request, producer)
                    except Exception as e:  # except ConnectionResetError:
                        log.debug('Failed to dispatch request, re-enqueueing again, error was: {}'.format(str(e)))
                        self.enqueue(queued_request)
            except Empty:
                continue
        else:
            sleep(0.5)
            log.debug('Waiting for consumer to be ready...')

def connect(self, exchange, channel):  # pragma: no cover
    """
    Readies the StorageNotify for publishing notification messages by
    setting up a kombu.Producer.

    :param exchange: The exchange for publishing notifications.
    :type exchange: kombu.Exchange
    :param channel: The channel to bind to.
    :type channel: kombu.transport.base.StdChannel
    """
    name = self.__class__.__name__
    self.logger.debug('Connecting {}'.format(name))

    self._queue = kombu.Queue(exchange=exchange, channel=channel)
    self._queue.declare()
    self._producer = kombu.Producer(channel, exchange)

def send(self, topic, message):
    """Publishes a pulse message to the proper exchange."""
    if not message:
        Log.error("Expecting a message")

    message._prepare()

    if not self.connection:
        self.connect()

    producer = Producer(
        channel=self.connection,
        exchange=Exchange(self.settings.exchange, type='topic'),
        routing_key=topic
    )

    # The message is actually a simple envelope format with a payload and
    # some metadata.
    final_data = Data(
        payload=message.data,
        _meta=set_default({
            'exchange': self.settings.exchange,
            'routing_key': message.routing_key,
            'serializer': self.settings.serializer,
            'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
            'count': self.count
        }, message.metadata)
    )

    producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
    self.count += 1

def send(self, message: BrightsideMessage):
    # we want to expose our logger to the functions defined in inner scope, so put it in their outer scope
    logger = self._logger

    def _build_message_header(msg: BrightsideMessage) -> Dict:
        return KombuMessageFactory(msg).create_message_header()

    def _publish(sender: Producer) -> None:
        logger.debug("Send message {body} to broker {amqpuri} with routing key {routing_key}"
                     .format(body=message, amqpuri=self._amqp_uri, routing_key=message.header.topic))
        sender.publish(message.body.bytes,
                       headers=_build_message_header(message),
                       exchange=self._exchange,
                       content_type="text/plain",
                       routing_key=message.header.topic,
                       declare=[self._exchange])

    def _error_callback(e, interval) -> None:
        logger.debug('Publishing error: {e}. Will retry in {interval} seconds'.format(e=e, interval=interval))

    self._logger.debug("Connect to broker {amqpuri}".format(amqpuri=self._amqp_uri))
    with connections[self._cnx].acquire(block=True) as conn:
        with Producer(conn) as producer:
            ensure_kwargs = self.RETRY_OPTIONS.copy()
            ensure_kwargs['errback'] = _error_callback
            safe_publish = conn.ensure(producer, _publish, **ensure_kwargs)
            safe_publish(producer)

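The conn.ensure() wrapper above is kombu's standard way to retry a publish across connection failures. A stripped-down sketch of the same pattern, with an illustrative broker URL, exchange name, and retry policy not taken from the original project:

from kombu import Connection, Exchange, Producer

exchange = Exchange('demo_exchange', type='topic')  # illustrative name

def errback(exc, interval):
    print('Publish failed: %r, retrying in %ss' % (exc, interval))

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    producer = Producer(conn)
    # ensure() returns a wrapper that re-establishes the connection and
    # re-runs producer.publish if the broker connection is lost.
    safe_publish = conn.ensure(producer, producer.publish,
                               errback=errback, max_retries=3)
    safe_publish({'event': 'demo'}, exchange=exchange,
                 routing_key='demo.key', declare=[exchange])
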
def publish_message_to_queue(self, queue):
    uid = utils.generate_uuid()
    producer = kombu.Producer(channel=self.ch, routing_key=queue)
    producer.publish(uid)
    return {'queue': queue, 'id': uid}

def _make_publisher(self):
    bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
    bus_connection = Connection(bus_url)
    bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'])
    bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
    bus_marshaler = Marshaler(self._uuid)
    return Publisher(bus_producer, bus_marshaler)

def _make_publisher(self):
    bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
    bus_connection = Connection(bus_url)
    same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False}
    bus_exchange = Exchange(self.config['exchange_name'],
                            type=self.config['exchange_type'],
                            **same_exchange_arguments_as_collectd)
    bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
    bus_marshaler = CollectdMarshaler(self._uuid)
    return Publisher(bus_producer, bus_marshaler)

def send_event(self, event, routing_key):
    with Connection(self._url) as connection:
        producer = Producer(connection, exchange=BUS_EXCHANGE_XIVO, auto_declare=True)
        producer.publish(json.dumps(event),
                         routing_key=routing_key,
                         content_type='application/json')

def connect(self):
    """
    'Connects' to the bus.

    :returns: The same instance.
    :rtype: commissaire_http.bus.Bus
    """
    if self.connection is not None:
        self.logger.warn('Bus already connected.')
        return self

    self.connection = Connection(self.connection_url)
    self._channel = self.connection.channel()
    self._exchange = Exchange(self.exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Create queues
    self._queues = []
    for kwargs in self.qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug('Created queue %s', queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)

    self.logger.debug('Bus connection finished')
    return self

def __init__(self, connection):
    self.connection = connection
    retry_exchange = self._configure_retry_exchanges(self.connection)
    dead_exchange = self._configure_dead_exchange(self.connection)
    self._retry_producer = Producer(self.connection, exchange=retry_exchange)
    self._dead_producer = Producer(self.connection, exchange=dead_exchange)

def Producer(self, *args, **kwargs):
    kwargs['deadline_pulse_url'] = self.app.conf.get('deadline_pulse_url')
    print("deadline_pulse_url %s" % kwargs['deadline_pulse_url'])
    kwargs['deadline_mongo_url'] = self.mongo_url
    return DeadlineProducer(*args, **kwargs)

def producer_pool(self):
    if self._producer_pool is None:
        self._producer_pool = kombu.pools.producers[
            self.app.connection_for_write()]
        self._producer_pool.limit = self.app.pool.limit
        # TODO: submit this patch to celery:
        self._producer_pool.Producer = self.Producer
    return self._producer_pool

# tasks ---
# FIXME: look into global tasks, which get added to all apps automatically

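The kombu.pools.producers registry used above is the library's global producer pool. A minimal sketch of acquiring a pooled producer (the connection URL and routing key are placeholders):

import kombu
from kombu.pools import producers

connection = kombu.Connection('amqp://guest:guest@localhost:5672//')

# Acquire a producer from the global pool; it is returned to the pool
# when the block exits, so channels are reused between publishes.
with producers[connection].acquire(block=True) as producer:
    producer.publish({'event': 'demo'}, routing_key='demo.key')

Pooling avoids re-creating a channel for every publish, which matters in high-throughput services.
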
def start_connection(self):
    """
    reset the connection to rabbit mq

    :return:
    """
    logger = logging.getLogger(self.__class__.__name__)
    logger.info("starting new rabbit mq connection")

    # todo: do we need to make confirm_publish configurable?
    self._conn = Connection(self.get_config().rabbitmq_url,
                            transport_options={'confirm_publish': True})

    # setup producer to push to error and dlqs
    self._producer = Producer(channel=self._conn.channel(),
                              exchange=self._orchestrator.get_exchange())

def test_search_timeline(self):
    self.path = "/sfm-data/collection_set/test_collection/test_3"
    harvest_msg = {
        "id": "test:3",
        "type": "weibo_timeline",
        "path": self.path,
        "credentials": {
            "access_token": tests.WEIBO_ACCESS_TOKEN
        },
        "collection_set": {
            "id": "test_collection_set"
        },
        "collection": {
            "id": "test_collection"
        },
        "options": {
            "web_resources": True,
            "image_sizes": ["Thumbnail", "Medium", "Large"]
        }
    }
    with self._create_connection() as connection:
        bound_exchange = self.exchange(connection)
        producer = Producer(connection, exchange=bound_exchange)
        producer.publish(harvest_msg, routing_key="harvest.start.weibo.weibo_timeline")

        # Now wait for status message.
        status_msg = self._wait_for_message(self.result_queue, connection)
        # Matching ids
        self.assertEqual("test:3", status_msg["id"])
        # Running
        self.assertEqual(STATUS_RUNNING, status_msg["status"])

        # Another running message
        status_msg = self._wait_for_message(self.result_queue, connection)
        self.assertEqual(STATUS_RUNNING, status_msg["status"])

        # Now wait for result message.
        result_msg = self._wait_for_message(self.result_queue, connection)
        # Matching ids
        self.assertEqual("test:3", result_msg["id"])
        # Success
        self.assertEqual(STATUS_SUCCESS, result_msg["status"])
        # Some weibo posts
        self.assertTrue(result_msg["stats"][date.today().isoformat()]["weibos"])

        # Web harvest message.
        web_harvest_msg = self._wait_for_message(self.web_harvest_queue, connection)
        # Some seeds
        self.assertTrue(len(web_harvest_msg["seeds"]))

        # Warc created message.
        warc_msg = self._wait_for_message(self.warc_created_queue, connection)
        # check path exists
        self.assertTrue(os.path.isfile(warc_msg["warc"]["path"]))

def __init__(self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    :param exchange_name: Name of the topic exchange.
    :type exchange_name: str
    :param connection_url: Kombu connection url.
    :type connection_url: str
    :param qkwargs: One or more dicts of keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing {}'.format(name))

    # If we are given no default, use the global one
    # Read the configuration file
    self._config_data = read_config_file(config_file, self._default_config_file)

    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug('Using connection_url=%s from config file', connection_url)
    if exchange_name is None and 'exchange_name' in self._config_data:
        exchange_name = self._config_data.get('exchange_name')
        self.logger.debug('Using exchange_name=%s from config file', exchange_name)

    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Set up queues
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)

    self.logger.debug('Initializing of {} finished'.format(name))

def setup_entities(self):
    """
    declare all required entities
    no advanced error handling yet (like error on declaration with altered properties etc)
    """
    # return if already inited
    if self._service_inited:
        return

    # setup exchange
    self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
                                      type='topic',
                                      durable=True)

    # setup durable queues
    self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
                            exchange=self._booking_exchange,
                            routing_key=constants.WORK_QUEUE + ".#",
                            durable=True)

    self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
                             exchange=self._booking_exchange,
                             routing_key=constants.RETRY_QUEUE + ".#",
                             durable=True)

    self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
                           exchange=self._booking_exchange,
                           routing_key=constants.DEAD_LETTER_QUEUE + ".#",
                           durable=True)

    # a buffer queue is needed by error-queue-consumer to temp-buffer msgs for processing
    # this is to handle the retry loop which may occur between retry-queue and work-queue.
    # todo: Need to implement an alternative as this has a copy overhead
    # which can be significant when the error queue is large
    self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
                              exchange=self._booking_exchange,
                              routing_key='buffer.#',
                              durable=True)

    # todo: do we need to make confirm_publish configurable?
    self._conn = Connection(self.get_config().rabbitmq_url,
                            transport_options={'confirm_publish': True})

    # declare all the exchanges and queues needed (declare, not overwrite existing)
    for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
                   self.dlq_queue, self.buffer_queue]:
        entity.maybe_bind(self._conn)
        entity.declare()

    # setup producer to push to error and dlqs
    self._producer = Producer(channel=self._conn.channel(),
                              exchange=self._booking_exchange)

    self._service_inited = True