The following code examples, extracted from open-source Python projects, illustrate how to use kombu.Queue().
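Before the project-specific examples, here is a minimal sketch of the typical kombu.Queue() workflow: declare a queue bound to an exchange, publish to it, then consume from it. The broker URL, exchange, and queue names below are placeholders for illustration, not taken from any of the projects.

from kombu import Connection, Exchange, Queue

# A direct exchange and a queue bound to it by a routing key.
exchange = Exchange('demo_exchange', type='direct')
queue = Queue('demo_queue', exchange=exchange, routing_key='demo_key')

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    # Publishing with declare=[queue] ensures the exchange, the queue,
    # and the binding all exist on the broker before the message is sent.
    producer = conn.Producer(serializer='json')
    producer.publish({'hello': 'world'},
                     exchange=exchange, routing_key='demo_key',
                     declare=[queue])

    # Consume one message and acknowledge it.
    def on_message(body, message):
        print('received:', body)
        message.ack()

    with conn.Consumer(queue, callbacks=[on_message]) as consumer:
        conn.drain_events(timeout=5)

Most snippets below follow some variation of this pattern: build Queue objects (optionally bound to a channel), declare them, and hand them to a Consumer or Producer.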
def setUp(self):
    self.exchange = Exchange(EXCHANGE, type="topic")
    self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*",
                              exchange=self.exchange, durable=True)
    self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
                                   exchange=self.exchange)
    self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created",
                                    exchange=self.exchange)
    weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
    with self._create_connection() as connection:
        self.result_queue(connection).declare()
        self.result_queue(connection).purge()
        self.web_harvest_queue(connection).declare()
        self.web_harvest_queue(connection).purge()
        self.warc_created_queue(connection).declare()
        self.warc_created_queue(connection).purge()
        # Declare and purge the harvester queue to avoid a 404 NOT_FOUND error.
        weibo_harvester_queue(connection).declare()
        weibo_harvester_queue(connection).purge()
    self.path = None
def check_queue_message(self, message):
    q = kombu.Queue(message['queue'], channel=self.ch)
    msg = q.get(True)
    assert msg.body in message['id'], \
        "Message body is {}, expected {}".format(msg.body, message['id'])
def declare_exchange(self, name, type='direct', queues=None, **options):
    """Create or update exchange

    :param name: name of exchange
    :type name: str
    :param type: type of exchange - direct, fanout, topic, match
    :type type: str
    :param queues: list of queues with routing keys:
        [[queue_name, routing_key], [queue_name, routing_key], ...]
    :type queues: list, None or tuple
    :param options: additional options for Exchange creation
    """
    if queues is None:
        queues = []  # pragma: no cover

    with self.connections[self.connection].acquire() as conn:
        exchange = Exchange(name, type=type, channel=conn, **options)
        exchange.declare()
        self.exchanges[name] = exchange
        for q_name, routing_key in queues:
            queue = Queue(name=q_name, channel=conn)
            queue.declare()
            queue.bind_to(exchange=name, routing_key=routing_key)
            self.logger.debug('Queue "%s" with routing_key "%s" was bound to exchange "%s"',
                              q_name, routing_key if routing_key else q_name, name)
def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0, prefetch_count=0, **kwargs):
    """Add queue rule to Microservice

    :param prefetch_count: count of messages for getting from mq
    :param prefetch_size: size in bytes for getting data from mq
    :param handler: function for handling messages
    :param autoack: if True message.ack() after callback
    :type handler: callable object
    :param name: name of queue
    :type name: str
    """
    rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
    consumer = Consumer(self.connection, queues=[Queue(rule.name)],
                        callbacks=[rule.callback], auto_declare=True)
    consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
    self.consumers.append(consumer)
    self.logger.debug('Rule "%s" added!', rule.name)
def consume(self):
    def worker(event, message):
        page_id = event['recipient']['id']
        bot_class, bot_args = self.get_bot(page_id)
        p = Process(target=spawn_bot_amqp,
                    args=(bot_class, bot_args, self.transport,
                          self.send_exchange, self.send_queue, event, message))
        p.start()

    def stop_worker(signum, frame):
        p.terminate()
        p.join()

    signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)

    with Connection(self.transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
def sender(self):
    def worker(event, message):
        p = Process(target=spawn_send_message_worker, args=(event, message))
        p.start()

    def stop_worker(signum, frame):
        p.terminate()
        p.join()

    signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange, routing_key=self.send_queue)

    with Connection(self.send_transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
def connect(self, exchange, channel):  # pragma: no cover
    """
    Readies the StorageNotify for publishing notification messages by
    setting up a kombu.Producer.

    :param exchange: The exchange for publishing notifications.
    :type exchange: kombu.Exchange
    :param channel: The channel to bind to.
    :type channel: kombu.transport.base.StdChannel
    """
    name = self.__class__.__name__
    self.logger.debug('Connecting {}'.format(name))

    self._queue = kombu.Queue(exchange=exchange, channel=channel)
    self._queue.declare()
    self._producer = kombu.Producer(channel, exchange)
def _configure_dead_exchange(self, connection):
    def declare_dead_queue():
        channel = connection.channel()
        dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(),
                                 type='direct',
                                 channel=channel)
        dead_queue = Queue(name=config.rabbitmq_dead_queue(),
                           routing_key=config.rabbitmq_routing_key(),
                           exchange=dead_exchange,
                           channel=channel)
        dead_queue.declare()
        return dead_exchange

    def error_callback(exception, interval):
        logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' %
                      (interval, exception))

    declare_dead_queue = connection.ensure(connection, declare_dead_queue,
                                           errback=error_callback,
                                           interval_start=0,
                                           interval_step=5,
                                           interval_max=30)
    return declare_dead_queue()
def get_consumers(self, consumer, channel):
    consumers = []
    exchanges = DNS_CONF["exchanges"]
    exchanges = exchanges.split(",")
    for exch in exchanges:
        exchange = Exchange(exch, type="topic", durable=False)
        queue = Queue(DNS_CONF["queue_name"], exchange,
                      routing_key=DNS_CONF["routing_key"],
                      durable=False, auto_delete=True, no_ack=True)
        consumers.append(consumer(queue, callbacks=[self.on_message]))
    return consumers
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration,
             logger: logging.Logger=None) -> None:
    self._exchange = Exchange(connection.exchange, type=connection.exchange_type,
                              durable=connection.is_durable)
    self._routing_key = configuration.routing_key
    self._amqp_uri = connection.amqp_uri
    self._queue_name = configuration.queue_name
    self._prefetch_count = configuration.prefetch_count
    self._is_durable = configuration.is_durable
    self._message_factory = ArameMessageFactory()
    self._logger = logger or logging.getLogger(__name__)
    self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key)
    self._msg = None  # Kombu Message
    self._message = None  # Brightside Message

    # TODO: Need to fix the argument types with default types issue
def _create_queues(self, queue_list):
    for topic in queue_list:
        self.queues.append(
            Queue(
                name=self.queue_name,
                exchange=self.exchange,
                routing_key=topic,
                durable=False,
                exclusive=True,
                no_ack=True
            )
        )
def create_queue(self):
    test_queue = 'test-rabbit-{}'.format(utils.rand_name())
    q = kombu.Queue(test_queue, channel=self.ch, durable=False,
                    queue_arguments={"x-expires": 15 * 60 * 1000})
    q.declare()
    return test_queue
def delete_queue(self, queue):
    q = kombu.Queue(queue, channel=self.ch)
    q.delete()
def __init__(self, client, name, logger=None):
    """Initialization

    :param client: instance of client
    :type client: Client
    :param name: name of queue
    :type name: str
    """
    self.client = client
    self.name = name
    if logger is None:
        logger = _logger  # pragma: no cover
    self.logger = logger
    self.logger.debug('Queue "%s" built', self.name)
def purge_queue(self, name):
    """Remove all messages from queue

    :param name: name of queue
    :type name: str
    """
    connections = pools.Connections(self.limit)
    with connections[self.connection].acquire() as conn:
        Queue(name=name, channel=conn).purge()
        self.logger.debug('Queue "%s" was purged', name)
def delete_queue(self, name):
    """Delete queue by name

    :param name: name of queue
    :type name: str
    """
    with self.connections[self.connection].acquire() as conn:
        Queue(name=name, channel=conn).delete()
        self.logger.debug('Queue "%s" was deleted', name)
def _create_callback_queue(self, channel, exchange):
    name = 'response-{}'.format(uuid.uuid4())
    callback_queue = kombu.Queue(
        name=name,
        exchange=exchange,
        routing_key=name,
        exclusive=True,
        channel=self._channel
    )
    callback_queue.declare()
    return callback_queue
def __init__(self, consumer):
    self.consumer = proxy(consumer)
    super(PublisherThread, self).__init__()
    self.daemon = True
    self._out_queue = Queue()
    self._is_running = False
def _create_service_queues(self, services, Consumer, channel):
    """ Creates necessary AMQP queues, one per service. """
    log.debug('Declaring exchange %s', self.exchange)
    exchange = kombu.Exchange(self.exchange, channel=channel, durable=False)
    exchange.declare()

    queues = []
    for service in services.values():
        queue_name = '{}_service_{}'.format(self.exchange, service.name)
        log.debug('Declaring service queue %s', queue_name)
        queue = kombu.Queue(
            channel=channel,
            name=queue_name,
            exchange=exchange,
            routing_key=queue_name,
            exclusive=False,
            durable=False
        )
        queue.declare()
        queues.append(queue)

    consumer = Consumer(
        queues=queues,
        on_message=self._on_message,
        no_ack=False
    )
    return consumer
def _queue(self):
    queue_name = 'flask-socketio.' + str(uuid.uuid4())
    return kombu.Queue(queue_name, self._exchange(),
                       queue_arguments={'x-expires': 300000})
def get_consumers(self, consumer, channel):
    exchange = Exchange(self.nova_exchange, type="topic", durable=False)
    queue = Queue(self.queue_name, exchange, routing_key=self.routing_key,
                  durable=False, auto_delete=True, no_ack=True)
    return [consumer(queue, callbacks=[self.handle_notification])]
def __init__(self, global_config):
    self._events_pubsub = Pubsub()
    self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**global_config['bus'])
    self._exchange = Exchange(global_config['bus']['exchange_name'],
                              type=global_config['bus']['exchange_type'])
    self._queue = kombu.Queue(exclusive=True)
    self._is_running = False
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
    with Connection(self._url) as conn:
        queue = Queue(BUS_QUEUE_NAME, exchange=exchange,
                      routing_key=routing_key, channel=conn.channel())
        queue.declare()
        queue.purge()
        self.bus_queue = queue
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
             queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
    self.hosts_conf = hosts_conf
    self.hosts = self.create_hosts()
    self.connection = Connection(self.hosts)
    self.task_exchange = Exchange(name=exchange_name, type=exchange_type,
                                  arguments=exchange_arguments)
    self.task_queues = [Queue(name=queue_name, exchange=self.task_exchange,
                              routing_key=routing_key, queue_arguments=queue_arguments)]
    self.callback = callback
    self.no_ack = no_ack
def queue_size(self, queue_list, queue_arguments=None):
    result = dict()
    for i in queue_list:
        queue_size = self.connection.SimpleQueue(
            name=Queue(name=i, queue_arguments=queue_arguments)).qsize()
        result[i] = queue_size
    return result
def queue_send(self, recipient, message=None, sender_action=None):
    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange, routing_key=self.send_queue)

    with Connection(self.send_transport) as conn:
        producer = conn.Producer(serializer='json')
        event = {
            'recipient': recipient,
            'message': message,
            'sender_action': sender_action,
            'page_access_token': self.page_access_token
        }
        producer.publish(event, exchange=exchange,
                         routing_key=queue.routing_key, declare=[queue])
def queue_events(self, events):
    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)

    with Connection(self.transport) as conn:
        producer = conn.Producer(serializer='json')
        for event in events:
            producer.publish(event, exchange=exchange,
                             routing_key=queue.routing_key, declare=[queue])
def _queue(self, conn=None):
    exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
    queue = kombu.Queue(str(uuid.uuid4()), exchange)
    return queue
def get_consumers(self, Consumer, channel):
    """
    Returns a list of kombu.Consumer instances to service all registered
    notification callbacks.

    If using the kombu.mixin.ConsumerMixin mixin class, these instances
    should be included in its get_consumers() method.

    :param Consumer: Message consumer class.
    :type Consumer: class
    :param channel: An open channel.
    :type channel: kombu.transport.*.Channel
    :returns: A list of consumer instances
    :rtype: [kombu.Consumer, ....]
    """
    consumer_list = []
    exchange = self.bus_mixin.producer.exchange
    for routing_key, callbacks in self.notify_callbacks.items():
        queue = kombu.Queue(
            exchange=exchange,
            routing_key=routing_key)
        consumer = Consumer(
            queues=queue,
            callbacks=callbacks)
        consumer_list.append(consumer)
        self.bus_mixin.logger.info(
            'Listening for "%s" notifications', routing_key)
    return consumer_list
def connect(self):
    """
    'Connects' to the bus.

    :returns: The same instance.
    :rtype: commissaire_http.bus.Bus
    """
    if self.connection is not None:
        self.logger.warn('Bus already connected.')
        return self

    self.connection = Connection(self.connection_url)
    self._channel = self.connection.channel()
    self._exchange = Exchange(self.exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Create queues
    self._queues = []
    for kwargs in self.qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug('Created queue %s', queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)

    self.logger.debug('Bus connection finished')
    return self
def get_consumers(self, consumer, channel):
    queue = kombu.Queue(config.rabbitmq_queue(),
                        routing_key=config.rabbitmq_routing_key())
    return [consumer([queue], callbacks=[self.on_message], auto_declare=False)]
def _configure_retry_exchanges(self, connection):
    def declare_queues():
        channel = connection.channel()
        almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(),
                                     type='direct',
                                     channel=channel)
        retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(),
                                  type='direct',
                                  channel=channel)
        retry_queue = Queue(name=config.rabbitmq_retry_queue(),
                            exchange=retry_exchange,
                            routing_key=config.rabbitmq_routing_key(),
                            queue_arguments=self._get_queue_arguments(),
                            channel=channel)
        almanach_queue = Queue(name=config.rabbitmq_queue(),
                               exchange=almanach_exchange,
                               durable=False,
                               routing_key=config.rabbitmq_routing_key(),
                               channel=channel)
        retry_queue.declare()
        almanach_queue.declare()
        return retry_exchange

    def error_callback(exception, interval):
        logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r' %
                      (interval, exception))

    declare_queues = connection.ensure(connection, declare_queues,
                                       errback=error_callback,
                                       interval_start=0,
                                       interval_step=5,
                                       interval_max=30)
    return declare_queues()
def __init__(self, broker_url, queue_name, fetch_count=10):
    """Initialize the consumer.

    Args:
        broker_url (string): URL of the broker to connect to
        queue_name (string): name of the queue to consume from
        fetch_count (int): number of messages to prefetch
    """
    self.queue_name = queue_name
    self.broker_url = broker_url
    self.fetch_count = fetch_count
    self.connection = Connection(broker_url)
    self.queue = Queue(queue_name)
def setup(self):
    super(ReplyConsumer, self).setup()
    config = self.container.config

    # Declare consumer queue for this service in the current region.
    self.queue = Queue(
        exchange=orders_exchange,
        routing_key='{}_{}'.format(
            config['REGION'], ROUTING_KEY_CALCULATE_TAXES
        ),
        name='fed.{}_{}'.format(
            config['REGION'], ROUTING_KEY_CALCULATE_TAXES
        )
    )

    # Bind federated queues in all regions to the `orders` exchange
    # with the correct routing key.
    with get_connection(config[AMQP_URI_CONFIG_KEY]) as connection:
        maybe_declare(orders_exchange, connection)
        self._bind_queues_in_for_all_regions(
            ROUTING_KEY_CALCULATE_TAXES, connection
        )
        self._bind_queues_in_for_all_regions(
            ROUTING_KEY_CALCULATE_TAXES_REPLY, connection
        )
def _bind_queues_in_for_all_regions(self, routing_key, connection):
    for region in REGIONS:
        maybe_declare(Queue(
            exchange=orders_exchange,
            routing_key='{}_{}'.format(region, routing_key),
            name='fed.{}_{}'.format(region, routing_key)
        ), connection)
def setup(self):
    reply_queue_name = "{}_{}".format(
        self.container.config['REGION'],
        ROUTING_KEY_CALCULATE_TAXES_REPLY
    )
    queue = Queue(
        exchange=orders_exchange,
        routing_key=reply_queue_name,
        name='fed.{}'.format(reply_queue_name)
    )
    self.queue = queue
    super(DynamicConsumer, self).setup()
def get_consumers(self, consumer, channel):
    api_event_queue = Queue(
        "zstack.ui.api.event.%s" % self.uuid,
        exchange=self.broadcast_exchange,
        routing_key="key.event.API.API_EVENT",
        auto_delete=True)

    canonical_event_queue = Queue(
        "zstack.ui.canonical.event.%s" % self.uuid,
        exchange=self.broadcast_exchange,
        routing_key="key.event.LOCAL.canonicalEvent",
        auto_delete=True)

    # The reply queue is bound to the p2p exchange through several
    # binding patterns rather than a single exchange/routing key pair.
    reply_queue_name = "zstack.ui.message.%s" % self.uuid
    reply_queue = Queue(
        reply_queue_name,
        [binding(self.p2p_exchange, "zstack.message.vmInstance.#"),
         binding(self.p2p_exchange, "zstack.message.ecs.vm.#"),
         binding(self.p2p_exchange, "zstack.message.aliyun.sdk.#")],
        auto_delete=True)

    return [
        consumer(
            queues=[canonical_event_queue],
            callbacks=[self.on_canonical_event]),
        consumer(
            queues=[api_event_queue],
            callbacks=[self.on_api_event]),
        consumer(
            queues=[reply_queue],
            callbacks=[self.on_message])
    ]
def get_consumers(self, consumer, channel):
    exchange = Exchange(exchange_name, type="topic", durable=True)
    queue = Queue(queue_name, exchange, routing_key=routing_key, durable=True)
    return [consumer(queue, callbacks=[self.on_message])]
def create_partitioned_queues(name):
    exchange = Exchange(name, type='direct')
    for num in range(1):
        CELERY_QUEUES.append(Queue(
            '{0}-{1}'.format(name, num),
            exchange=exchange,
        ))
def _create_fanout_exchange(self, Consumer, channel):
    """ Creates a fanout queue to accept notifications. """
    exchange_name = '{}_fanout'.format(self.exchange)
    log.debug('Declaring fanout exchange %s', exchange_name)
    exchange = kombu.Exchange(
        name=exchange_name,
        channel=channel,
        durable=False,
        type='fanout'
    )
    exchange.declare()

    queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
    log.debug('Declaring fanout queue %s', queue_name)
    queue = kombu.Queue(
        name=queue_name,
        exchange=exchange,
        exclusive=True,
        durable=False,
        channel=channel
    )
    queue.declare()

    consumer = Consumer(
        queues=[queue],
        on_message=self._on_broadcast,
        no_ack=True
    )
    return consumer
def __init__(self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    :param exchange_name: Name of the topic exchange.
    :type exchange_name: str
    :param connection_url: Kombu connection url.
    :type connection_url: str
    :param qkwargs: One or more dicts of keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing {}'.format(name))

    # If we are given no default, use the global one
    # Read the configuration file
    self._config_data = read_config_file(config_file, self._default_config_file)

    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug(
            'Using connection_url=%s from config file', connection_url)
    if exchange_name is None and 'exchange_name' in self._config_data:
        exchange_name = self._config_data.get('bus_exchange')
        self.logger.debug(
            'Using exchange_name=%s from config file', exchange_name)

    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Set up queues
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)

    self.logger.debug('Initializing of {} finished'.format(name))
def setup_entities(self):
    """
    Declare all required entities.
    No advanced error handling yet (like error on declaration with
    altered properties etc).
    """
    # return if already inited
    if self._service_inited:
        return

    # setup exchange
    self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
                                      type='topic',
                                      durable=True)

    # setup durable queues
    self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
                            exchange=self._booking_exchange,
                            routing_key=constants.WORK_QUEUE + ".#",
                            durable=True)

    self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
                             exchange=self._booking_exchange,
                             routing_key=constants.RETRY_QUEUE + ".#",
                             durable=True)

    self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
                           exchange=self._booking_exchange,
                           routing_key=constants.DEAD_LETTER_QUEUE + ".#",
                           durable=True)

    # A buffer queue is needed by the error-queue-consumer to temporarily
    # buffer messages for processing. This handles the retry loop that may
    # occur between the retry queue and the work queue.
    # todo: Need to implement an alternative, as this has a copy overhead
    # which can be significant when the error queue is large
    self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
                              exchange=self._booking_exchange,
                              routing_key='buffer.#',
                              durable=True)

    # todo: do we need to make confirm_publish configurable?
    self._conn = Connection(self.get_config().rabbitmq_url,
                            transport_options={'confirm_publish': True})

    # declare all the exchanges and queues needed (declare, not overwrite existing)
    for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
                   self.dlq_queue, self.buffer_queue]:
        entity.maybe_bind(self._conn)
        entity.declare()

    # setup producer to push to error and dlqs
    self._producer = Producer(channel=self._conn.channel(),
                              exchange=self._booking_exchange)

    self._service_inited = True