Python kombu module: Exchange() example source code

The following 37 code examples, extracted from open-source Python projects, illustrate how to use kombu.Exchange().
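
As a quick orientation before the project snippets below, here is a minimal standalone sketch (not taken from any of the listed projects) of the typical kombu.Exchange() pattern: declare an exchange, bind a queue to it, and publish through a producer. The broker URL, exchange name, queue name, and routing key are placeholders.

from kombu import Connection, Exchange, Queue, Producer

# Placeholder names and broker URL; adjust for your environment.
exchange = Exchange('example_exchange', type='direct', durable=True)
queue = Queue('example_queue', exchange=exchange, routing_key='example.key')

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    # Bind the entities to the channel and declare them on the broker.
    exchange(channel).declare()
    queue(channel).declare()

    # Publish a JSON message through the exchange with the queue's routing key.
    producer = Producer(channel, exchange=exchange)
    producer.publish({'hello': 'world'},
                     routing_key='example.key',
                     serializer='json')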

Project: sfm-weibo-harvester    Author: gwu-libraries
def setUp(self):
        self.exchange = Exchange(EXCHANGE, type="topic")
        self.result_queue = Queue(name="result_queue", routing_key="harvest.status.weibo.*", exchange=self.exchange,
                                  durable=True)
        self.web_harvest_queue = Queue(name="web_harvest_queue", routing_key="harvest.start.web",
                                       exchange=self.exchange)
        self.warc_created_queue = Queue(name="warc_created_queue", routing_key="warc_created", exchange=self.exchange)
        weibo_harvester_queue = Queue(name="weibo_harvester", exchange=self.exchange)
        with self._create_connection() as connection:
            self.result_queue(connection).declare()
            self.result_queue(connection).purge()
            self.web_harvest_queue(connection).declare()
            self.web_harvest_queue(connection).purge()
            self.warc_created_queue(connection).declare()
            self.warc_created_queue(connection).purge()
            # avoid raising a NOT_FOUND (404) error
            weibo_harvester_queue(connection).declare()
            weibo_harvester_queue(connection).purge()

        self.path = None
Project: microservices    Author: viatoriche
def __init__(self, client, name, routing_key=None, logger=None):
        """Initialization

        :param client: instance of client
        :type client: Client
        :param name: name of exchange
        :type name: str
        :param routing_key: routing key to queue
        :type routing_key: str or None
        """
        self.client = client
        self.name = name
        self.routing_key = routing_key
        if logger is None:
            logger = _logger  # pragma: no cover
        self.logger = logger
        self.logger.debug('Exchange "%s" built, routing_key: %s', self.name,
                          self.routing_key if self.routing_key is not None else '')
Project: microservices    Author: viatoriche
def declare_exchange(self, name, type='direct', queues=None, **options):
        """Create or update exchange

        :param name: name of exchange
        :type name: str
        :param type: type of exchange - direct, fanout, topic, match
        :type type: str
        :param queues: list of queues with routing keys: [[queue_name, routing_key], [queue_name, routing_key], ...]
        :type queues: list, None or tuple
        :param options: additional options for Exchange creation
        """
        if queues is None:
            queues = []  # pragma: no cover

        with self.connections[self.connection].acquire() as conn:
            exchange = Exchange(name, type=type, channel=conn, **options)
            exchange.declare()
            self.exchanges[name] = exchange
            for q_name, routing_key in queues:
                queue = Queue(name=q_name, channel=conn)
                queue.declare()
                queue.bind_to(exchange=name, routing_key=routing_key)
                self.logger.debug('Queue "%s" with routing_key "%s" was bound to exchange "%s"', q_name,
                                  routing_key if routing_key else q_name, name)
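
For illustration only, here is a standalone sketch (not part of the microservices project) of the same pattern the declare_exchange() method above implements, using plain kombu calls: declare an exchange on a channel, then declare each queue from a [[queue_name, routing_key], ...] list and bind it to the exchange. The broker URL, exchange name, and queue names are placeholders.

from kombu import Connection, Exchange, Queue

# [[queue_name, routing_key], ...] as described in the docstring above; names are placeholders.
queues = [['email_queue', 'notify.email'], ['sms_queue', 'notify.sms']]

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    exchange = Exchange('notifications', type='direct', channel=channel)
    exchange.declare()
    for q_name, routing_key in queues:
        queue = Queue(name=q_name, channel=channel)
        queue.declare()
        # Bind the freshly declared queue to the exchange by name.
        queue.bind_to(exchange='notifications', routing_key=routing_key)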
Project: isc    Author: and3rson
def _connect(self):
        self._conn = kombu.Connection(
            self._hostname,
            connect_timeout=self._connect_timeout
        )

        self._channel = self._conn.channel()

        self._exchange = kombu.Exchange(
            name=self._exchange_name,
            channel=self._channel,
            durable=False
        )

        self._callback_queue = self._create_callback_queue(
            self._channel,
            self._exchange
        )
Project: koslab.messengerbot    Author: koslab
def consume(self):
        def worker(event, message):
            page_id = event['recipient']['id']
            bot_class, bot_args = self.get_bot(page_id)
            p = Process(target=spawn_bot_amqp, args=(bot_class, bot_args, 
                        self.transport, self.send_exchange, 
                        self.send_queue, event, message))
            p.start()
            def stop_worker(signum, frame):
                p.terminate()
                p.join()
            signal.signal(signal.SIGTERM, stop_worker)

        exchange = Exchange(self.exchange, 'direct', durable=True)
        queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
        with Connection(self.transport) as conn:
            with conn.Consumer(queue, callbacks=[worker]) as consumer:
                while True:
                    conn.drain_events()
Project: koslab.messengerbot    Author: koslab
def sender(self):
        def worker(event, message):
            p = Process(target=spawn_send_message_worker, args=(event, message))
            p.start()
            def stop_worker(signum, frame):
                p.terminate()
                p.join()
            signal.signal(signal.SIGTERM, stop_worker)


        exchange = Exchange(self.send_exchange, 'direct', durable=True)
        queue = Queue(self.send_queue, exchange=exchange, 
                        routing_key=self.send_queue)
        with Connection(self.send_transport) as conn:
            with conn.Consumer(queue, callbacks=[worker]) as consumer:
                while True:
                    conn.drain_events()
Project: commissaire    Author: projectatomic
def connect(self, exchange, channel):  # pragma: no cover
        """
        Readies the StorageNotify for publishing notification messages by
        setting up a kombu.Producer.

        :param exchange: The exchange for publishing notifications.
        :type exchange: kombu.Exchange
        :param channel: The channel to bind to.
        :type channel: kombu.transport.base.StdChannel
        """
        name = self.__class__.__name__
        self.logger.debug('Connecting {}'.format(name))

        self._queue = kombu.Queue(exchange=exchange, channel=channel)
        self._queue.declare()

        self._producer = kombu.Producer(channel, exchange)
Project: commissaire    Author: projectatomic
def _publish(self, event, model_instance):
        """
        Internal function to publish "created", "deleted", and "updated"
        notification messages.

        :param event: The event name ("created", "deleted", or "updated")
        :type event: str
        :param model_instance: The model instance upon which the event occurred
        :type model_instance: commissaire.model.Model
        """
        class_name = model_instance.__class__.__name__
        body = {
            'event': event,
            'class': class_name,
            'model': model_instance.to_dict_safe()
        }
        routing_key = 'notify.storage.{}.{}'.format(class_name, event)
        if self._producer:
            self.logger.debug('Publish "{}": {}'.format(routing_key, body))
            self._producer.publish(
                body, routing_key,
                kombu.Exchange.TRANSIENT_DELIVERY_MODE)
        else:
            # This means the connect() method was not called.
            self.logger.warn('Not publishing "%s"', routing_key)
Project: almanach    Author: internap
def _configure_dead_exchange(self, connection):
        def declare_dead_queue():
            channel = connection.channel()
            dead_exchange = Exchange(name=config.rabbitmq_dead_exchange(),
                                     type='direct',
                                     channel=channel)
            dead_queue = Queue(name=config.rabbitmq_dead_queue(),
                               routing_key=config.rabbitmq_routing_key(),
                               exchange=dead_exchange,
                               channel=channel)

            dead_queue.declare()

            return dead_exchange

        def error_callback(exception, interval):
            logging.error('Failed to declare dead queue and exchange, retrying in %d seconds. %r' %
                          (interval, exception))

        declare_dead_queue = connection.ensure(connection, declare_dead_queue, errback=error_callback,
                                               interval_start=0, interval_step=5, interval_max=30)
        return declare_dead_queue()
Project: jx-sqlite    Author: mozilla
def send(self, topic, message):
        """Publishes a pulse message to the proper exchange."""

        if not message:
            Log.error("Expecting a message")

        message._prepare()

        if not self.connection:
            self.connect()

        producer = Producer(
            channel=self.connection,
            exchange=Exchange(self.settings.exchange, type='topic'),
            routing_key=topic
        )

        # The message is actually a simple envelope format with a payload and
        # some metadata.
        final_data = Data(
            payload=message.data,
            _meta=set_default({
                'exchange': self.settings.exchange,
                'routing_key': message.routing_key,
                'serializer': self.settings.serializer,
                'sent': time_to_string(datetime.datetime.now(timezone(self.settings.broker_timezone))),
                'count': self.count
            }, message.metadata)
        )

        producer.publish(jsons.scrub(final_data), serializer=self.settings.serializer)
        self.count += 1
Project: fuel-plugin-dns-updater    Author: openstack
def get_consumers(self, consumer, channel):
        consumers = []
        exchanges = DNS_CONF["exchanges"]
        exchanges = exchanges.split(",")
        for exch in exchanges:
            exchange = Exchange(exch, type="topic", durable=False)
            queue = Queue(DNS_CONF["queue_name"], exchange,
                routing_key=DNS_CONF["routing_key"],
                durable=False, auto_delete=True, no_ack=True)
            consumers.append(consumer(queue, callbacks=[self.on_message]))
        return consumers
Project: Brightside    Author: BrighterCommand
def __init__(self, connection: Connection, logger: logging.Logger=None) -> None:
        self._amqp_uri = connection.amqp_uri
        self._cnx = BrokerConnection(hostname=connection.amqp_uri)
        self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
        self._logger = logger or logging.getLogger(__name__)
Project: Brightside    Author: BrighterCommand
def __init__(self, connection: Connection, configuration: BrightsideConsumerConfiguration, logger: logging.Logger=None) -> None:
        self._exchange = Exchange(connection.exchange, type=connection.exchange_type, durable=connection.is_durable)
        self._routing_key = configuration.routing_key
        self._amqp_uri = connection.amqp_uri
        self._queue_name = configuration.queue_name
        self._routing_key = configuration.routing_key
        self._prefetch_count = configuration.prefetch_count
        self._is_durable = configuration.is_durable
        self._message_factory = ArameMessageFactory()
        self._logger = logger or logging.getLogger(__name__)
        self._queue = Queue(self._queue_name, exchange=self._exchange, routing_key=self._routing_key)
        self._msg = None  # Kombu Message
        self._message = None  # Brightside Message

        # TODO: Need to fix the argument types with default types issue
Project: flask-boilerplate    Author: MarcFord
def build_queues(cls):
        default_exchange = Exchange(cls.default_name, type='direct')

        queues = [
            Queue(cls.default_name, default_exchange, routing_key=cls.default_name),
            Queue(cls.system_name, default_exchange, routing_key=cls.system_name)
        ]
        return tuple(set(queues))
Project: microservices    Author: viatoriche
def delete_exchange(self, name):
        """Delete exchange by name

        :param name: name of exchange
        :type name: str
        """
        with self.connections[self.connection].acquire() as conn:
            exchange = self.exchanges.pop(name, Exchange(name, channel=conn))
            exchange.delete()
            self.logger.debug('Exchange "%s" was deleted', name)
Project: isc    Author: and3rson
def _create_service_queues(self, services, Consumer, channel):
        """
        Creates necessary AMQP queues, one per service.
        """
        log.debug('Declaring exchange %s', self.exchange)
        exchange = kombu.Exchange(
            self.exchange,
            channel=channel,
            durable=False
        )
        exchange.declare()
        # channel.exchange_declare(exchange=self.exchange)
        queues = []
        for service in services.values():
            queue_name = '{}_service_{}'.format(self.exchange, service.name)
            log.debug('Declaring service queue %s', queue_name)
            queue = kombu.Queue(
                channel=channel,
                name=queue_name,
                exchange=exchange,
                routing_key=queue_name,
                exclusive=False,
                durable=False,
                # channel=channel
            )
            queue.declare()
            queues.append(queue)
            # channel.queue_delete(queue=queue)
            # channel.queue_declare(queue=queue, auto_delete=True)
            # channel.queue_bind(queue, self.exchange)
            # channel.basic_consume(self._on_message, queue=queue, no_ack=False)
        consumer = Consumer(
            # self.connection,
            queues=queues,
            on_message=self._on_message,
            no_ack=False
        )
        # consumer.consume(no_ack=False)
        return consumer
Project: RealtimePythonChat    Author: quangtqag
def _exchange(self):
        return kombu.Exchange(self.channel, type='fanout', durable=False)
Project: omni    Author: openstack
def get_consumers(self, consumer, channel):
        exchange = Exchange(self.nova_exchange, type="topic", durable=False)
        queue = Queue(self.queue_name, exchange, routing_key=self.routing_key,
                      durable=False, auto_delete=True, no_ack=True)
        return [consumer(queue, callbacks=[self.handle_notification])]
Project: xivo-ctid-ng    Author: wazo-pbx
def _make_publisher(self):
        bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
        bus_connection = Connection(bus_url)
        bus_exchange = Exchange(self.config['exchange_name'], type=self.config['exchange_type'])
        bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
        bus_marshaler = Marshaler(self._uuid)
        return Publisher(bus_producer, bus_marshaler)
Project: xivo-ctid-ng    Author: wazo-pbx
def __init__(self, global_config):
        self._events_pubsub = Pubsub()

        self._bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**global_config['bus'])
        self._exchange = Exchange(global_config['bus']['exchange_name'],
                                  type=global_config['bus']['exchange_type'])
        self._queue = kombu.Queue(exclusive=True)
        self._is_running = False
Project: xivo-ctid-ng    Author: wazo-pbx
def _make_publisher(self):
        bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
        bus_connection = Connection(bus_url)
        same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True}, 'durable': False}
        bus_exchange = Exchange(self.config['exchange_name'],
                                type=self.config['exchange_type'],
                                **same_exchange_arguments_as_collectd)
        bus_producer = Producer(bus_connection, exchange=bus_exchange, auto_declare=True)
        bus_marshaler = CollectdMarshaler(self._uuid)
        return Publisher(bus_producer, bus_marshaler)
Project: PythonSkillTree    Author: w4n9H
def __init__(self, hosts_conf, exchange_name='', exchange_type='', exchange_arguments=None,
                 queue_name='', routing_key='', queue_arguments=None, callback=None, no_ack=True):
        self.hosts_conf = hosts_conf
        self.hosts = self.create_hosts()
        self.connection = Connection(self.hosts)
        self.task_exchange = Exchange(name=exchange_name, type=exchange_type, arguments=exchange_arguments)
        self.task_queues = [Queue(name=queue_name, exchange=self.task_exchange, routing_key=routing_key,
                                  queue_arguments=queue_arguments)]
        self.callback = callback
        self.no_ack = no_ack
Project: koslab.messengerbot    Author: koslab
def queue_send(self, recipient, message=None, sender_action=None):
        exchange = Exchange(self.send_exchange, 'direct', durable=True)
        queue = Queue(self.send_queue, exchange=exchange, 
                        routing_key=self.send_queue)
        with Connection(self.send_transport) as conn:
            producer = conn.Producer(serializer='json')
            event = {
                'recipient': recipient,
                'message': message,
                'sender_action': sender_action,
                'page_access_token': self.page_access_token
            }
            producer.publish(event, exchange=exchange, 
                                routing_key=queue.routing_key,
                                declare=[queue])
Project: koslab.messengerbot    Author: koslab
def queue_events(self, events):
        exchange = Exchange(self.exchange, 'direct', durable=True)
        queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
        with Connection(self.transport) as conn:
            producer = conn.Producer(serializer='json')
            for event in events:
                producer.publish(event, exchange=exchange, 
                        routing_key=queue.routing_key,
                        declare=[queue])
Project: remoteControlPPT    Author: htwenning
def _queue(self, conn=None):
        exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
        queue = kombu.Queue(str(uuid.uuid4()), exchange)
        return queue
Project: commissaire-http    Author: projectatomic
def connect(self):
        """
        'Connects' to the bus.

        :returns: The same instance.
        :rtype: commissaire_http.bus.Bus
        """
        if self.connection is not None:
            self.logger.warn('Bus already connected.')
            return self

        self.connection = Connection(self.connection_url)
        self._channel = self.connection.channel()
        self._exchange = Exchange(
            self.exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Create queues
        self._queues = []
        for kwargs in self.qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug('Created queue %s', queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Bus connection finished')
        return self
Project: almanach    Author: internap
def _configure_retry_exchanges(self, connection):
        def declare_queues():
            channel = connection.channel()
            almanach_exchange = Exchange(name=config.rabbitmq_retry_return_exchange(),
                                         type='direct',
                                         channel=channel)
            retry_exchange = Exchange(name=config.rabbitmq_retry_exchange(),
                                      type='direct',
                                      channel=channel)
            retry_queue = Queue(name=config.rabbitmq_retry_queue(),
                                exchange=retry_exchange,
                                routing_key=config.rabbitmq_routing_key(),
                                queue_arguments=self._get_queue_arguments(),
                                channel=channel)
            almanach_queue = Queue(name=config.rabbitmq_queue(),
                                   exchange=almanach_exchange,
                                   durable=False,
                                   routing_key=config.rabbitmq_routing_key(),
                                   channel=channel)

            retry_queue.declare()
            almanach_queue.declare()

            return retry_exchange

        def error_callback(exception, interval):
            logging.error('Failed to declare queues and exchanges, retrying in %d seconds. %r' % (interval, exception))

        declare_queues = connection.ensure(connection, declare_queues, errback=error_callback,
                                           interval_start=0, interval_step=5, interval_max=30)
        return declare_queues()
Project: github_spider    Author: LiuRoy
def __init__(self, exchange_name, broker_url, mode=ASYNC):
        """??????

        Args:
            exchange_name (string): ????
            broker_url (string): ????
            mode (int): ??
        """
        self.exchange_name = exchange_name
        self.broker_url = broker_url
        self.mode = mode

        self.exchange = Exchange(exchange_name, type='direct')
        self.connection = Connection(broker_url)
Project: Flask-SocketIO    Author: cutedogspark
def _queue(self, conn=None):
        exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
        queue = kombu.Queue(str(uuid.uuid4()), exchange)
        return queue
Project: Charlie    Author: nxintech
def __init__(self, connection):
        self.connection = connection

        self.uuid = uuid4()
        self.broadcast_exchange = Exchange('BROADCAST', type='topic', passive=True)
        self.p2p_exchange = Exchange('P2P', type='topic', passive=True)
Project: Charlie    Author: nxintech
def get_consumers(self, consumer, channel):
        exchange = Exchange(exchange_name, type="topic", durable=True)
        queue = Queue(queue_name, exchange, routing_key=routing_key, durable=True)
        return [consumer(queue, callbacks=[self.on_message])]
Project: Sentry    Author: NetEaseGame
def create_partitioned_queues(name):
    exchange = Exchange(name, type='direct')
    for num in range(1):
        CELERY_QUEUES.append(Queue(
            '{0}-{1}'.format(name, num),
            exchange=exchange,
        ))
Project: idealoom    Author: conversence
def get_celery_queues():
    global _celery_queues
    if not _celery_queues:
        _celery_queues = [
            Queue(q, Exchange(q), routing_key=q)
            for q in ASSEMBL_CELERY_APPS]
    return _celery_queues
Project: isc    Author: and3rson
def _create_fanout_exchange(self, Consumer, channel):
        """
        Creates a fanout queue to accept notifications.
        """
        # declare_kwargs = dict(
        #     exchange='{}_fanout'.format(self.exchange)
        # )
        # if PIKA_VERSION >= StrictVersion('0.10.0'):
        #     kwarg = 'exchange_type'
        # else:
        #     kwarg = 'type'
        # declare_kwargs[kwarg] = 'fanout'
        exchange_name = '{}_fanout'.format(self.exchange)
        log.debug('Declaring fanout exchange %s', exchange_name)
        exchange = kombu.Exchange(
            name=exchange_name,
            channel=channel,
            durable=False,
            type='fanout'
        )
        exchange.declare()
        queue_name = 'fanout_callback_{}'.format(uuid.uuid4())
        log.debug('Declaring fanout queue %s', queue_name)
        queue = kombu.Queue(
            name=queue_name,
            exchange=exchange,
            exclusive=True,
            durable=False,
            channel=channel
        )
        queue.declare()
        consumer = Consumer(
            # self.connection,
            queues=[queue],
            on_message=self._on_broadcast,
            no_ack=True
            # no_ack=True
        )
        # consumer.consume(no_ack=True)
        # channel.exchange_declare(**declare_kwargs)
        # fanout_queue = channel.queue_declare(exclusive=True)
        # channel.queue_bind(exchange='{}_fanout'.format(self.exchange), queue=fanout_queue.method.queue)

        # channel.basic_consume(self._on_broadcast, queue=fanout_queue.method.queue, no_ack=True)
        return consumer
Project: commissaire-service    Author: projectatomic
def __init__(
            self, exchange_name, connection_url, qkwargs, config_file=None):
        """
        Initializes a new Service instance.

        :param exchange_name: Name of the topic exchange.
        :type exchange_name: str
        :param connection_url: Kombu connection url.
        :type connection_url: str
        :param qkwargs: One or more dicts of keyword arguments for queue creation
        :type qkwargs: list
        :param config_file: Path to the configuration file location.
        :type config_file: str or None
        """
        name = self.__class__.__name__
        self.logger = logging.getLogger(name)
        self.logger.debug('Initializing {}'.format(name))

        # If we are given no default, use the global one
        # Read the configuration file
        self._config_data = read_config_file(
            config_file, self._default_config_file)

        if connection_url is None and 'bus_uri' in self._config_data:
            connection_url = self._config_data.get('bus_uri')
            self.logger.debug(
                'Using connection_url=%s from config file', connection_url)
        if exchange_name is None and 'exchange_name' in self._config_data:
            exchange_name = self._config_data.get('bus_exchange')
            self.logger.debug(
                'Using exchange_name=%s from config file', exchange_name)

        self.connection = Connection(connection_url)
        self._channel = self.connection.default_channel
        self._exchange = Exchange(
            exchange_name, type='topic').bind(self._channel)
        self._exchange.declare()

        # Set up queues
        self._queues = []
        for kwargs in qkwargs:
            queue = Queue(**kwargs)
            queue.exchange = self._exchange
            queue = queue.bind(self._channel)
            self._queues.append(queue)
            self.logger.debug(queue.as_dict())

        # Create producer for publishing on topics
        self.producer = Producer(self._channel, self._exchange)
        self.logger.debug('Initializing of {} finished'.format(name))
Project: invenio-stats    Author: inveniosoftware
def base_app():
    """Flask application fixture without InvenioStats."""
    from invenio_stats.config import STATS_EVENTS
    instance_path = tempfile.mkdtemp()
    app_ = Flask('testapp', instance_path=instance_path)
    stats_events = {'file-download': deepcopy(STATS_EVENTS['file-download'])}
    stats_events.update({'event_{}'.format(idx): {} for idx in range(5)})
    app_.config.update(dict(
        CELERY_ALWAYS_EAGER=True,
        CELERY_TASK_ALWAYS_EAGER=True,
        CELERY_CACHE_BACKEND='memory',
        CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
        CELERY_TASK_EAGER_PROPAGATES=True,
        CELERY_RESULT_BACKEND='cache',
        SQLALCHEMY_DATABASE_URI=os.environ.get(
            'SQLALCHEMY_DATABASE_URI', 'sqlite://'),
        SQLALCHEMY_TRACK_MODIFICATIONS=True,
        TESTING=True,
        OAUTH2SERVER_CLIENT_ID_SALT_LEN=64,
        OAUTH2SERVER_CLIENT_SECRET_SALT_LEN=60,
        OAUTH2SERVER_TOKEN_PERSONAL_SALT_LEN=60,
        STATS_MQ_EXCHANGE=Exchange(
            'test_events',
            type='direct',
            delivery_mode='transient',  # in-memory queue
            durable=True,
        ),
        SECRET_KEY='asecretkey',
        SERVER_NAME='localhost',
        STATS_QUERIES={'bucket-file-download-histogram': {},
                       'bucket-file-download-total': {},
                       'test-query': {},
                       'test-query2': {}},
        STATS_EVENTS=stats_events,
        STATS_AGGREGATIONS={'file-download-agg': {}}
    ))
    FlaskCeleryExt(app_)
    InvenioAccounts(app_)
    InvenioAccountsREST(app_)
    InvenioDB(app_)
    InvenioRecords(app_)
    InvenioFilesREST(app_)
    InvenioPIDStore(app_)
    InvenioQueues(app_)
    InvenioOAuth2Server(app_)
    InvenioOAuth2ServerREST(app_)
    InvenioSearch(app_, entry_point_group=None)
    with app_.app_context():
        yield app_
    shutil.rmtree(instance_path)
Project: EasyJobLite    Author: treebohotels
def setup_entities(self):
        """
        declare all required entities
        no advanced error handling yet (e.g. errors on declaration with altered properties)
        """
        # return if already inited
        if self._service_inited:
            return

        # setup exchange
        self._booking_exchange = Exchange(self._config.get_mq_config(constants.EXCHANGE),
                                          type='topic',
                                          durable=True)

        # setup durable queues
        self.work_queue = Queue(self._config.get_mq_config(constants.WORK_QUEUE),
                                exchange=self._booking_exchange,
                                routing_key=constants.WORK_QUEUE + ".#",
                                durable=True)

        self.retry_queue = Queue(self._config.get_mq_config(constants.RETRY_QUEUE),
                                 exchange=self._booking_exchange,
                                 routing_key=constants.RETRY_QUEUE + ".#",
                                 durable=True)

        self.dlq_queue = Queue(self._config.get_mq_config(constants.DEAD_LETTER_QUEUE),
                               exchange=self._booking_exchange,
                               routing_key=constants.DEAD_LETTER_QUEUE + ".#",
                               durable=True)

        # a buffer queue is needed by the error-queue consumer to temporarily buffer messages for processing;
        # this handles the retry loop that may occur between the retry-queue and the work-queue.
        # todo: Need to implement an alternative as this has a copy overhead
        #  which can be significant when the error queue is large
        self.buffer_queue = Queue(name=self._config.get_mq_config(constants.BUFFER_QUEUE),
                                  exchange=self._booking_exchange,
                                  routing_key='buffer.#',
                                  durable=True)

        # todo: do we need to make confirm_publish configurable?
        self._conn = Connection(self.get_config().rabbitmq_url,
                                transport_options={'confirm_publish': True})

        # declare all the exchanges and queues needed (declare, not overwrite existing)
        for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
                       self.dlq_queue, self.buffer_queue]:
            entity.maybe_bind(self._conn)
            entity.declare()

        # setup producer to push to error and dlqs
        self._producer = Producer(channel=self._conn.channel(),
                                  exchange=self._booking_exchange)

        self._service_inited = True