我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用kombu.Exchange()。
def setUp(self): self.exchange = Exchange(EXCHANGE, type="topic") self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*", exchange=self.exchange, durable=True) self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web", exchange=self.exchange) self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created", exchange=self.exchange) weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange) with self._create_connection() as connection: self.result_queue(connection).declare() self.result_queue(connection).purge() self.web_harvest_queue(connection).declare() self.web_harvest_queue(connection).purge() self.warc_created_queue(connection).declare() self.warc_created_queue(connection).purge() # avoid raise NOT_FOUND error 404 weibo_harvester_queue(connection).declare() weibo_harvester_queue(connection).purge() self.path = None
def __init__(self, client, name, routing_key=None, logger=None): """Initialization :param client: instance of client :type client: Client :param name: name of exchange :type name: str :param routing_key: routing key to queue :type routing_key: str or None """ self.client = client self.name = name self.routing_key = routing_key if logger is None: logger = _logger # pragma: no cover self.logger = logger self.logger.debug('Exchange "%s" built, routing_key: %s', self.name, self.routing_key if not self.routing_key is None else '')
def declare_exchange(self, name, type='direct', queues=None, **options): """Create or update exchange :param name: name of exchange :type name: str :param type: type of exchange - direct, fanout, topic, match :type type: str :param queues: list of queues with routing keys: [[queue_name, routing_key], [queue_name, routing_key], ...] :type queues: list, None or tuple :param options: additional options for Exchange creation """ if queues is None: queues = [] # pragma: no cover with self.connections[self.connection].acquire() as conn: exchange = Exchange(name, type=type, channel=conn, **options) exchange.declare() self.exchanges[name] = exchange for q_name, routing_key in queues: queue = Queue(name=q_name, channel=conn) queue.declare() queue.bind_to(exchange=name, routing_key=routing_key) self.logger.debug('Queue "%s" with routing_key "%s" was bond to exchange "%s"', q_name, routing_key if routing_key else q_name, name)
def _connect(self): self._conn = kombu.Connection( self._hostname, connect_timeout=self._connect_timeout ) self._channel = self._conn.channel() self._exchange = kombu.Exchange( name=self._exchange_name, channel=self._channel, durable=False ) self._callback_queue = self._create_callback_queue( self._channel, self._exchange )
def consume(self): def worker(event, message): page_id = event['recipient']['id'] bot_class, bot_args = self.get_bot(page_id) p = Process(target=spawn_bot_amqp, args=(bot_class, bot_args, self.transport, self.send_exchange, self.send_queue, event, message)) p.start() def stop_worker(signum, frame): p.terminate() p.join() signal.signal(signal.SIGTERM, stop_worker) exchange = Exchange(self.exchange, 'direct', durable=True) queue = Queue(self.queue, exchange=exchange, routing_key=self.queue) with Connection(self.transport) as conn: with conn.Consumer(queue, callbacks=[worker]) as consumer: while True: conn.drain_events()
def sender(self): def worker(event, message): p = Process(target=spawn_send_message_worker, args=(event, message)) p.start() def stop_worker(signum, frame): p.terminate() p.join() signal.signal(signal.SIGTERM, stop_worker) exchange = Exchange(self.send_exchange, 'direct', durable=True) queue = Queue(self.send_queue, exchange=exchange, routing_key=self.send_queue) with Connection(self.send_transport) as conn: with conn.Consumer(queue, callbacks=[worker]) as consumer: while True: conn.drain_events()
def connect(self, exchange, channel): # pragma: no cover """ Readies the StorageNotify for publishing notification messages by setting up a kombu.Producer. :param exchange: The exchange for publishing notifications. :type exchange: kombu.Exchange :param channel: The channel to bind to. :type channel: kombu.transport.base.StdChannel """ name = self.__class__.__name__ self.logger.debug('Connecting {}'.format(name)) self._queue = kombu.Queue(exchange=exchange, channel=channel) self._queue.declare() self._producer = kombu.Producer(channel, exchange)
def _publish(self, event, model_instance): """ Internal function to publish "created", "deleted", and "updated" notification messages. :param event: The event name ("created", "deleted", or "updated") :type event: str :param model_instance: The model instance upon which the event occurred :type model_instance: commissaire.model.Model """ class_name = model_instance.__class__.__name__ body = { 'event': event, 'class': class_name, 'model': model_instance.to_dict_safe() } routing_key = 'notify.storage.{}.{}'.format(class_name, event) if self._producer: self.logger.debug('Publish "{}": {}'.format(routing_key, body)) self._producer.publish( body, routing_key, kombu.Exchange.TRANSIENT_DELIVERY_MODE) else: # This means the connect() method was not called. self.logger.warn('Not publishing "%s"', routing_key)
def _configure_dead_exchange(self, connection): def declare_dead_queue(): channel = connection.channel() dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(), type='direct', channel=channel) dead_queue = Queue(name=config.rabbitmq_dead_queue(), routing_key=config.rabbitmq_routing_key(), exchange=dead_exchange, channel=channel) dead_queue.declare() return dead_exchange def error_callback(exception, interval): logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' % (interval, exception)) declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback, interval_start=0, interval_step=5, interval_max=30) return declare_dead_queue()
def send(self, topic, message): """Publishes a pulse message to the proper exchange.""" if not message: Log.error("Expecting a message") message._prepare() if not self.connection: self.connect() producer = Producer( channel=self.connection, exchange=Exchange(self.settings.exchange, type='topic'), routing_key=topic ) # The message is actually a simple envelope format with a payload and # some metadata. final_data = Data( payload=message.data, _meta=set_default({ 'exchange': self.settings.exchange, 'routing_key': message.routing_key, 'serializer': self.settings.serializer, 'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))), 'count': self.count }, message.metadata) ) producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer) self.count += 1
def get_consumers(self, consumer, channel): consumers = [] exchanges = DNS_CONF["exchanges"] exchanges = exchanges.split(",") for exch in exchanges: exchange = Exchange(exch, type="topic", durable=False) queue = Queue(DNS_CONF["queue_name"], exchange, routing_key=DNS_CONF["routing_key"], durable=False, auto_delete=True, no_ack=True) consumers.append(consumer(queue, callbacks=[self.on_message])) return consumers
def __init__(self, connection: Connection, logger: logging.Logger=None) -> None: self._amqp_uri = connection.amqp_uri self._cnx = BrokerConnection(hostname=connection.amqp_uri) self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable) self._logger = logger or logging.getLogger(__name__)
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration, logger: logging.Logger=None) -> None: self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable) self._routing_key = configuration.routing_key self._amqp_uri = connection.amqp_uri self._queue_name = configuration.queue_name self._routing_key = configuration.routing_key self._prefetch_count = configuration.prefetch_count self._is_durable = configuration.is_durable self._message_factory = ArameMessageFactory() self._logger = logger or logging.getLogger(__name__) self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key) self._msg = None # Kombu Message self._message = None # Brightside Message # TODO: Need to fix the argument types with default types issue
def build_queues(cls): default_exchange = Exchange(cls.default_name, type='direct') queues = [ Queue(cls.default_name, default_exchange, routing_key=cls.default_name), Queue(cls.system_name, default_exchange, routing_key=cls.system_name) ] return tuple(set(queues))
def delete_exchange(self, name): """Delete exchange by name :param name: name of exchange :type name: str """ with self.connections[self.connection].acquire() as conn: exchange = self.exchanges.pop(name, Exchange(name, channel=conn)) exchange.delete() self.logger.debug('Exchange "%s" was deleted', name)
def _create_service_queues(self, services, Consumer, channel): """ Creates necessary AMQP queues, one per service. """ log.debug('Declaring exchange %s', self.exchange) exchange = kombu.Exchange( self.exchange, channel=channel, durable=False ) exchange.declare() # channel.exchange_declare(exchange=self.exchange) queues = [] for service in services.values(): queue_name = '{}_service_{}'.format(self.exchange, service.name) log.debug('Declaring service queue %s', queue_name) queue = kombu.Queue( channel=channel, name=queue_name, exchange=exchange, routing_key=queue_name, exclusive=False, durable=False, # channel=channel ) queue.declare() queues.append(queue) # channel.queue_delete(queue=queue) # channel.queue_declare(queue=queue, auto_delete=True) # channel.queue_bind(queue, self.exchange) # channel.basic_consume(self._on_message, queue=queue, no_ack=False) consumer = Consumer( # self.connection, queues=queues, on_message=self._on_message, no_ack=False ) # consumer.consume(no_ack=False) return consumer
def _exchange(self): return kombu.Exchange(self.channel, type='fanout', durable=False)
def get_consumers(self, consumer, channel): exchange = Exchange(self.nova_exchange, type="topic", durable=False) queue = Queue(self.queue_name, exchange, routing_key=self.routing_key, durable=False, auto_delete=True, no_ack=True) return [consumer(queue, callbacks=[self.handle_notification])]
def _make_publisher(self): bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config) bus_connection = Connection(bus_url) bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type']) bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True) bus_marshaler = Marshaler(self._uuid) return Publisher(bus_producer, bus_marshaler)
def __init__(self, global_config): self._events_pubsub = Pubsub() self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**global_config['bus']) self._exchange = Exchange(global_config['bus']['exchange_name'], type=global_config['bus']['exchange_type']) self._queue = kombu.Queue(exclusive=True) self._is_running = False
def _make_publisher(self): bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config) bus_connection = Connection(bus_url) same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False} bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'], **same_exchange_arguments_as_collectd) bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True) bus_marshaler = CollectdMarshaler(self._uuid) return Publisher(bus_producer, bus_marshaler)
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None, queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True): self.hosts_conf = hosts_conf self.hosts = self.create_hosts() self.connection = Connection(self.hosts) self.task_exchange = Exchange(name=exchange_name, type=exchange_type, arguments=exchange_arguments) self.task_queues = [Queue(name=queue_name, exchange=self.task_exchange, routing_key=routing_key, queue_arguments=queue_arguments)] self.callback = callback self.no_ack = no_ack
def queue_send(self, recipient, message=None, sender_action=None): exchange = Exchange(self.send_exchange, 'direct', durable=True) queue = Queue(self.send_queue, exchange=exchange, routing_key=self.send_queue) with Connection(self.send_transport) as conn: producer = conn.Producer(serializer='json') event = { 'recipient': recipient, 'message': message, 'sender_action': sender_action, 'page_access_token': self.page_access_token } producer.publish(event, exchange=exchange, routing_key=queue.routing_key, declare=[queue])
def queue_events(self, events): exchange = Exchange(self.exchange, 'direct', durable=True) queue = Queue(self.queue, exchange=exchange, routing_key=self.queue) with Connection(self.transport) as conn: producer = conn.Producer(serializer='json') for event in events: producer.publish(event, exchange=exchange, routing_key=queue.routing_key, declare=[queue])
def _queue(self, conn=None): exchange = kombu.Exchange(self.channel, type='fanout', durable=False) queue = kombu.Queue(str(uuid.uuid4()), exchange) return queue
def connect(self): """ 'Connects' to the bus. :returns: The same instance. :rtype: commissaire_http.bus.Bus """ if self.connection is not None: self.logger.warn('Bus already connected.') return self self.connection = Connection(self.connection_url) self._channel = self.connection.channel() self._exchange = Exchange( self.exchange_name, type='topic').bind(self._channel) self._exchange.declare() # Create queues self._queues = [] for kwargs in self.qkwargs: queue = Queue(**kwargs) queue.exchange = self._exchange queue = queue.bind(self._channel) self._queues.append(queue) self.logger.debug('Created queue %s', queue.as_dict()) # Create producer for publishing on topics self.producer = Producer(self._channel, self._exchange) self.logger.debug('Bus connection finished') return self
def _configure_retry_exchanges(self, connection): def declare_queues(): channel = connection.channel() almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(), type='direct', channel=channel) retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(), type='direct', channel=channel) retry_queue = Queue(name=config.rabbitmq_retry_queue(), exchange=retry_exchange, routing_key=config.rabbitmq_routing_key(), queue_arguments=self._get_queue_arguments(), channel=channel) almanach_queue = Queue(name=config.rabbitmq_queue(), exchange=almanach_exchange, durable=False, routing_key=config.rabbitmq_routing_key(), channel=channel) retry_queue.declare() almanach_queue.declare() return retry_exchange def error_callback(exception, interval): logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r' % (interval, exception)) declare_queues = connection.ensure(connection, declare_queues, errback=error_callback, interval_start=0, interval_step=5, interval_max=30) return declare_queues()
def __init__(self, exchange_name, broker_url, mode=ASYNC): """?????? Args: exchange_name (string): ???? broker_url (string): ???? mode (int): ?? """ self.exchange_name = exchange_name self.broker_url = broker_url self.mode = mode self.exchange = Exchange(exchange_name, type='direct') self.connection = Connection(broker_url)
def __init__(self, connection): self.connection = connection self.uuid = uuid4() self.broadcast_exchange = Exchange('BROADCAST', type='topic', passive=True) self.p2p_exchange = Exchange('P2P', type='topic', passive=True)
def get_consumers(self, consumer, channel): exchange = Exchange(exchange_name, type="topic", durable=True) queue = Queue(queue_name, exchange, routing_key=routing_key, durable=True) return [consumer(queue, callbacks=[self.on_message])]
def create_partitioned_queues(name): exchange = Exchange(name, type='direct') for num in range(1): CELERY_QUEUES.append(Queue( '{0}-{1}'.format(name, num), exchange=exchange, ))
def get_celery_queues(): global _celery_queues if not _celery_queues: _celery_queues = [ Queue(q, Exchange(q), routing_key=q) for q in ASSEMBL_CELERY_APPS] return _celery_queues
def _create_fanout_exchange(self, Consumer, channel): """ Creates a fanout queue to accept notifications. """ # declare_kwargs = dict( # exchange='{}_fanout'.format(self.exchange) # ) # if PIKA_VERSION >= StrictVersion('0.10.0'): # kwarg = 'exchange_type' # else: # kwarg = 'type' # declare_kwargs[kwarg] = 'fanout' exchange_name = '{}_fanout'.format(self.exchange) log.debug('Declaring fanout exchange %s', exchange_name) exchange = kombu.Exchange( name=exchange_name, channel=channel, durable=False, type='fanout' ) exchange.declare() queue_name = 'fanout_callback_{}'.format(uuid.uuid4()) log.debug('Declaring fanout queue %s', queue_name) queue = kombu.Queue( name=queue_name, exchange=exchange, exclusive=True, durable=False, channel=channel ) queue.declare() consumer = Consumer( # self.connection, queues=[queue], on_message=self._on_broadcast, no_ack=True # no_ack=True ) # consumer.consume(no_ack=True) # channel.exchange_declare(**declare_kwargs) # fanout_queue = channel.queue_declare(exclusive=True) # channel.queue_bind(exchange='{}_fanout'.format(self.exchange), queue=fanout_queue.method.queue) # channel.basic_consume(self._on_broadcast, queue=fanout_queue.method.queue, no_ack=True) return consumer
def __init__( self, exchange_name, connection_url, qkwargs, config_file=None): """ Initializes a new Service instance. :param exchange_name: Name of the topic exchange. :type exchange_name: str :param connection_url: Kombu connection url. :type connection_url: str :param qkwargs: One or more dicts keyword arguments for queue creation :type qkwargs: list :param config_file: Path to the configuration file location. :type config_file: str or None """ name = self.__class__.__name__ self.logger = logging.getLogger(name) self.logger.debug('Initializing {}'.format(name)) # If we are given no default, use the global one # Read the configuration file self._config_data = read_config_file( config_file, self._default_config_file) if connection_url is None and 'bus_uri' in self._config_data: connection_url = self._config_data.get('bus_uri') self.logger.debug( 'Using connection_url=%s from config file', connection_url) if exchange_name is None and 'exchange_name' in self._config_data: self.logger.debug( 'Using exchange_name=%s from config file', exchange_name) exchange_name = self._config_data.get('bus_exchange') self.connection = Connection(connection_url) self._channel = self.connection.default_channel self._exchange = Exchange( exchange_name, type='topic').bind(self._channel) self._exchange.declare() # Set up queues self._queues = [] for kwargs in qkwargs: queue = Queue(**kwargs) queue.exchange = self._exchange queue = queue.bind(self._channel) self._queues.append(queue) self.logger.debug(queue.as_dict()) # Create producer for publishing on topics self.producer = Producer(self._channel, self._exchange) self.logger.debug('Initializing of {} finished'.format(name))
def base_app(): """Flask application fixture without InvenioStats.""" from invenio_stats.config import STATS_EVENTS instance_path = tempfile.mkdtemp() app_ = Flask('testapp', instance_path=instance_path) stats_events = {'file-download': deepcopy(STATS_EVENTS['file-download'])} stats_events.update({'event_{}'.format(idx): {} for idx in range(5)}) app_.config.update(dict( CELERY_ALWAYS_EAGER=True, CELERY_TASK_ALWAYS_EAGER=True, CELERY_CACHE_BACKEND='memory', CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_TASK_EAGER_PROPAGATES=True, CELERY_RESULT_BACKEND='cache', SQLALCHEMY_DATABASE_URI=os.environ.get( 'SQLALCHEMY_DATABASE_URI', 'sqlite://'), SQLALCHEMY_TRACK_MODIFICATIONS=True, TESTING=True, OAUTH2SERVER_CLIENT_ID_SALT_LEN=64, OAUTH2SERVER_CLIENT_SECRET_SALT_LEN=60, OAUTH2SERVER_TOKEN_PERSONAL_SALT_LEN=60, STATS_MQ_EXCHANGE=Exchange( 'test_events', type='direct', delivery_mode='transient', # in-memory queue durable=True, ), SECRET_KEY='asecretkey', SERVER_NAME='localhost', STATS_QUERIES={'bucket-file-download-histogram': {}, 'bucket-file-download-total': {}, 'test-query': {}, 'test-query2': {}}, STATS_EVENTS=stats_events, STATS_AGGREGATIONS={'file-download-agg': {}} )) FlaskCeleryExt(app_) InvenioAccounts(app_) InvenioAccountsREST(app_) InvenioDB(app_) InvenioRecords(app_) InvenioFilesREST(app_) InvenioPIDStore(app_) InvenioQueues(app_) InvenioOAuth2Server(app_) InvenioOAuth2ServerREST(app_) InvenioSearch(app_, entry_point_group=None) with app_.app_context(): yield app_ shutil.rmtree(instance_path)
def setup_entities(self): """ declare all required entities no advanced error handling yet (like error on declaration with altered properties etc) """ # return if already inited if self._service_inited: return # setup exchange self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE), type='topic', durable=True) # setup durable queues self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE), exchange=self._booking_exchange, routing_key=constants.WORK_QUEUE + ".#", durable=True) self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE), exchange=self._booking_exchange, routing_key=constants.RETRY_QUEUE + ".#", durable=True) self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE), exchange=self._booking_exchange, routing_key=constants.DEAD_LETTER_QUEUE + ".#", durable=True) # a buffer queue is needed by error-queue-consumer to temp-buffer msgs for processing # this is to handle retry loop which may cause between retry-queue and work-queue. # todo: Need to implement an alternive as this has a copy overhead # which can be significant when the error queue is large self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE), exchange=self._booking_exchange, routing_key='buffer.#', durable=True) # todo: do we need to make confirm_publish configurable? self._conn = Connection(self.get_config().rabbitmq_url, transport_options={'confirm_publish': True}) # declare all the exchanges and queues needed (declare, not overwrite existing) for entity in [self._booking_exchange, self.work_queue, self.retry_queue, self.dlq_queue, self.buffer_queue]: entity.maybe_bind(self._conn) entity.declare() # setup producer to push to error and dlqs self._producer = Producer(channel=self._conn.channel(), exchange=self._booking_exchange) self._service_inited = True