The following 46 code examples, extracted from open-source Python projects, illustrate how to use kombu.Connection().
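Before the extracted examples, here is a minimal, self-contained sketch of the basic pattern they all build on: construct a Connection from a broker URL and use it as a context manager. The URL below is an assumption (a local RabbitMQ with default credentials); adjust it for your environment.

# Minimal sketch (not from the examples below): round-trip one message
# through a SimpleQueue on an assumed local broker.
from kombu import Connection

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('demo')   # declares the queue if needed
    simple_queue.put({'hello': 'world'})      # JSON-serialized by default
    message = simple_queue.get(block=True, timeout=5)
    print(message.payload)                    # {'hello': 'world'}
    message.ack()                             # remove the message from the queue
    simple_queue.close()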
def publisher(nameko_config, **kwargs):
    """ Return a function that sends AMQP messages. """

    def publish(payload, routing_key, exchange=None):
        """ Dispatch a message with `payload` """
        conn = Connection(nameko_config[AMQP_URI_CONFIG_KEY])

        with connections[conn].acquire(block=True) as connection:
            if exchange is not None:  # pragma: no cover
                exchange.maybe_bind(connection)
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    payload,
                    routing_key=routing_key,
                    exchange=exchange,
                    **kwargs
                )

    return publish
def publish_message(rabbit_config):
    def publish(exchange, payload, routing_key=None, serializer="json", **kwargs):
        conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])

        with connections[conn].acquire(block=True) as connection:
            exchange.maybe_bind(connection)
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    payload,
                    exchange=exchange,
                    routing_key=routing_key,
                    serializer=serializer,
                    **kwargs
                )

    return publish
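The two examples above lean on kombu's global resource pools, whose imports the extraction dropped. A hedged sketch of the same pattern: `connections` and `producers` from `kombu.pools` are registries keyed by a Connection instance, so a freshly built Connection with the same settings still reuses pooled channels and producers. The URL and exchange name below are assumptions.

from kombu import Connection, Exchange
from kombu.pools import connections, producers

conn = Connection('amqp://guest:guest@localhost:5672//')  # assumed URL
exchange = Exchange('demo_events', type='topic')          # hypothetical exchange

with producers[conn].acquire(block=True) as producer:
    producer.publish(
        {'event': 'created'},
        exchange=exchange,
        routing_key='demo.created',
        declare=[exchange],   # declare the exchange before publishing
    )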
def deadlettering_exchange(self, rabbit_config, exchange, queue):
    conn = Connection(rabbit_config[AMQP_URI_CONFIG_KEY])

    with connections[conn].acquire(block=True) as connection:
        deadletter_exchange = Exchange(name="deadletter", type="topic")
        deadletter_exchange.maybe_bind(connection)
        deadletter_exchange.declare()

        deadletter_queue = Queue(
            name="deadletter",
            exchange=deadletter_exchange,
            routing_key="#",
            queue_arguments={
                'x-dead-letter-exchange': exchange.name
            }
        )
        deadletter_queue.maybe_bind(connection)
        deadletter_queue.declare()

    return deadletter_exchange
def _get_connection(self, connection):
    """Create connection strategy

    :param connection: connection for broker
    :type connection: str, None, kombu.connections.Connection, dict
    :return: instance of kombu.connections.Connection
    :rtype: Connection
    """
    if not connection:
        connection = self.default_connection  # pragma: no cover
    if isinstance(connection, str):
        connection = {'hostname': connection}
    if isinstance(connection, dict):
        connection = Connection(**connection)
    return connection
def _get_connection(self, connection):
    """Create connection strategy

    :param connection: connection for broker
    :type connection: str, None, kombu.connections.Connection, dict
    :return: instance of kombu.connections.Connection
    :rtype: Connection
    """
    if not connection:
        connection = self.connection  # pragma: no cover
    if isinstance(connection, str):
        connection = {'hostname': connection}
    if isinstance(connection, dict):
        connection = Connection(**connection)
    return connection
def _connect(self):
    self._conn = kombu.Connection(
        self._hostname,
        connect_timeout=self._connect_timeout
    )
    self._channel = self._conn.channel()
    self._exchange = kombu.Exchange(
        name=self._exchange_name,
        channel=self._channel,
        durable=False
    )
    self._callback_queue = self._create_callback_queue(
        self._channel,
        self._exchange
    )
def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True):
    """
    Constructor for KombuQueue

    url: http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls
    maxsize: an integer that sets the upperbound limit on the number of items
             that can be placed in the queue.
    """
    self.name = name
    self.conn = Connection(url)
    self.queue = self.conn.SimpleQueue(self.name, no_ack=True,
                                       serializer='umsgpack')

    self.maxsize = maxsize
    self.lazy_limit = lazy_limit
    if self.lazy_limit and self.maxsize:
        self.qsize_diff_limit = int(self.maxsize * 0.1)
    else:
        self.qsize_diff_limit = 0
    self.qsize_diff = 0
def on_consume_ready(self, connection, channel, consumers):  # pragma: no cover
    """
    Called when the service is ready to consume messages.

    :param connection: The current connection instance.
    :type connection: kombu.Connection
    :param channel: The current channel.
    :type channel: kombu.transport.*.Channel
    :param consumers: A list of consumers.
    :type consumers: list
    """
    self.logger.info('Ready to consume')
    if self.logger.level == logging.DEBUG:
        queue_names = []
        for consumer in consumers:
            queue_names += [x.name for x in consumer.queues]
        self.logger.debug(
            'Consuming via connection "{}" and channel "{}" on '
            'the following queues: "{}"'.format(
                connection.as_uri(), channel, '", "'.join(queue_names)))
def consume(self):
    def worker(event, message):
        page_id = event['recipient']['id']
        bot_class, bot_args = self.get_bot(page_id)
        p = Process(target=spawn_bot_amqp,
                    args=(bot_class, bot_args, self.transport,
                          self.send_exchange, self.send_queue,
                          event, message))
        p.start()

    def stop_worker(signum, frame):
        p.terminate()
        p.join()

    signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
    with Connection(self.transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
def sender(self):
    def worker(event, message):
        p = Process(target=spawn_send_message_worker, args=(event, message))
        p.start()

    def stop_worker(signum, frame):
        p.terminate()
        p.join()

    signal.signal(signal.SIGTERM, stop_worker)

    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange,
                  routing_key=self.send_queue)
    with Connection(self.send_transport) as conn:
        with conn.Consumer(queue, callbacks=[worker]) as consumer:
            while True:
                conn.drain_events()
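The two consumer loops above block forever in drain_events(). A compact, hedged variant of the same pattern: drain_events() accepts a timeout, and catching socket.timeout is the usual way to make the loop interruptible. Broker URL, exchange, and queue names here are assumptions.

import socket
from kombu import Connection, Consumer, Exchange, Queue

exchange = Exchange('demo_events', 'direct', durable=True)  # hypothetical
queue = Queue('demo_queue', exchange=exchange, routing_key='demo_queue')

def handle(body, message):
    print('received:', body)
    message.ack()

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with Consumer(conn, [queue], callbacks=[handle]):
        while True:
            try:
                conn.drain_events(timeout=1)
            except socket.timeout:
                break  # no message within a second; stop this sketch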
def publish(config):
    conn = Connection(config[AMQP_URI_CONFIG_KEY])

    def publish(payload, routing_key, exchange=None, **kwargs):
        """Publish an AMQP message."""
        with kombu_connections[conn].acquire(block=True) as connection:
            if exchange is not None:
                exchange.maybe_bind(connection)
            with producers[conn].acquire(block=True) as producer:
                producer.publish(
                    payload,
                    exchange=exchange,
                    serializer='json',
                    routing_key=routing_key,
                    **kwargs)

    return publish
def connect(self):
    if not self.connection:
        self.connection = Connection(
            hostname=self.settings.host,
            port=self.settings.port,
            userid=self.settings.user,
            password=self.settings.password,
            virtual_host=self.settings.vhost,
            ssl=self.settings.ssl
        )
def republish(self, backoff_exc, message, target_queue):
    expiration = backoff_exc.next(message, self.exchange.name)
    queue = self.make_queue(expiration)

    # republish to appropriate backoff queue
    amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
    with get_producer(amqp_uri) as producer:
        properties = message.properties.copy()
        headers = properties.pop('application_headers')
        headers['backoff'] = expiration
        expiration_seconds = float(expiration) / 1000

        # force redeclaration; the publisher will skip declaration if
        # the entity has previously been declared by the same connection
        conn = Connection(amqp_uri)
        maybe_declare(queue, conn, retry=True, **DEFAULT_RETRY_POLICY)

        producer.publish(
            message.body,
            headers=headers,
            exchange=self.exchange,
            routing_key=target_queue,
            expiration=expiration_seconds,
            retry=True,
            retry_policy=DEFAULT_RETRY_POLICY,
            declare=[queue.exchange, queue],
            **properties
        )
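The "force redeclaration" comment above relies on a kombu detail worth spelling out: maybe_declare() caches which entities a given connection has already declared and skips the broker round-trip on repeats, which is why the example builds a fresh Connection. A hedged sketch of that behavior; the URI and entity names are assumptions.

from kombu import Connection, Exchange, Queue
from kombu.common import maybe_declare

amqp_uri = 'amqp://guest:guest@localhost:5672//'  # assumed URI
queue = Queue('backoff--1000', Exchange('backoff', type='topic'),
              routing_key='#')                    # hypothetical entities

with Connection(amqp_uri) as conn:
    maybe_declare(queue, conn, retry=True)  # declares exchange + queue
    maybe_declare(queue, conn)              # cached: no broker round-trip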
def _create_connection():
    return Connection(hostname="mq",
                      userid=tests.mq_username,
                      password=tests.mq_password)
def __init__(self, ip, port, user='rabbitmq', password='password'):
    c = kombu.Connection("amqp://{0}:{1}@{2}:{3}//".format(user, password,
                                                           ip, port))
    c.connect()
    self.ch = c.channel()
def __init__(self, connection='amqp:///', name=None, logger=None, limit=None):
    """Initialization of Client instance

    :param connection: connection for broker
    :type connection: str, None, kombu.connections.Connection, dict
    """
    self.connection = self._get_connection(connection)
    self.exchanges = {}
    if name is None:
        try:
            name = '<client: {}>'.format(self.connection.as_uri())
        except:  # pragma: no cover
            # Errors with filesystem transport
            name = '<client: {}>'.format(self.connection.transport_cls)
    if logger is None:
        logger = get_logger(__name__)
    self.logger = InstanceLogger(self, logger)
    self.name = name
    self.logger.debug('%s built', self.name)
    if limit is None:
        # Set limit as global kombu limit.
        limit = pools.get_limit()
    self.limit = limit
    self.connections = pools.Connections(self.limit)
def __init__(self, connection='amqp:///', logger=None, timeout=10, name=None):
    """Initialization

    :param connection: connection for queues broker
    :type connection: str, None, dict or Connection
    :param logger: logging instance
    :type logger: Logger
    :param timeout: sleep interval for the event loop, default = 10
    :type timeout: None, int or float
    """
    if logger is None:
        logger = _logger
    self.logger = InstanceLogger(self, logger)
    self.connection = self._get_connection(connection)
    self.timeout = timeout
    self.consumers = []
    if name is None:
        try:
            name = '<microservice: {}>'.format(self.connection.as_uri())
        except:  # pragma: no cover
            name = '<microservice: {}>'.format(
                self.connection.transport_cls)  # pragma: no cover
    self.name = name
    self._stop = False
    self._stopped = False
def drain_events(self, infinity=True):
    with nested(*self.consumers):
        while not self._stop:
            try:
                self.connection.drain_events(timeout=self.timeout)
            except socket.timeout:
                if not infinity:
                    break
            except Exception as e:
                if not self.connection.connected and not self._stop:
                    self.logger.error(
                        'Connection to mq has broken off. Try to reconnect')
                    self.connect()
                    self.revive()
                    break
                elif not self._stop and not isinstance(e, HandlerError):
                    self.logger.exception(
                        'Something wrong! Try to restart the loop')
                    self.revive()
                    break
                elif isinstance(e, HandlerError):
                    pass
                else:  # pragma: no cover
                    self.logger.exception('Unknown error')  # pragma: no cover

    if self._stop:
        self._stopped = True
        self.logger.info('Stopped draining events.')
def _connection(self):
    return kombu.Connection(self.url)
def on_connection_revived(self):  # pragma: no cover
    """
    Called when a reconnection occurs.
    """
    self.logger.info('Connection (re)established')
def on_consume_end(self, connection, channel):  # pragma: no cover
    """
    Called when the service stops consuming.

    :param connection: The current connection instance.
    :type connection: kombu.Connection
    :param channel: The current channel.
    :type channel: kombu.transport.*.Channel
    """
    self.logger.warn('Consuming has ended')
def __init__(self, aws_connection, transport='amqp'):
    self.ec2_conn = aws_connection
    self.broker_uri = \
        "{transport}://{username}:{password}@{rabbit_host}:{rabbit_port}"\
        .format(transport=transport,
                username=CONF.rabbit_userid,
                password=CONF.rabbit_password,
                rabbit_host=CONF.rabbit_host,
                rabbit_port=CONF.rabbit_port)
    self.connection = Connection(self.broker_uri)
def sync_get(name, interval=0.5):
    with Connection(BROKER_URI) as conn:
        publish(conn, name)

    while 1:
        rs = r.get(name)
        if rs and Backend.from_json(cPickle.loads(rs)).status == SUCCESS:
            break
        time.sleep(interval)

    item = Backend.get(name)
    return item.result
def main():
    disconnect()
    connect('zhihulive')
    with Connection(BROKER_URI) as conn:
        consumer(conn, [process_task])
def action_proc_remove(config):
    log.warning(" - Trying to connect with server...")
    url = '%s://%s' % (config.broker_type, config.target)
    with Connection(url) as conn:
        in_queue = conn.SimpleQueue('celery')

        # Get remote process
        for _ in get_remote_messages(config, in_queue, False):
            pass

    log.error(" - All tasks removed from '%s'" % config.target)
def _make_publisher(self):
    bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
    bus_connection = Connection(bus_url)
    bus_exchange = Exchange(self.config['exchange_name'],
                            type=self.config['exchange_type'])
    bus_producer = Producer(bus_connection, exchange=bus_exchange,
                            auto_declare=True)
    bus_marshaler = Marshaler(self._uuid)
    return Publisher(bus_producer, bus_marshaler)
def run(self):
    logger.info("Running AMQP consumer")
    with Connection(self._bus_url) as connection:
        self.connection = connection
        super(CoreBusConsumer, self).run()
def _make_publisher(self):
    bus_url = 'amqp://{username}:{password}@{host}:{port}//'.format(**self.config)
    bus_connection = Connection(bus_url)
    same_exchange_arguments_as_collectd = {'arguments': {'auto_delete': True},
                                           'durable': False}
    bus_exchange = Exchange(self.config['exchange_name'],
                            type=self.config['exchange_type'],
                            **same_exchange_arguments_as_collectd)
    bus_producer = Producer(bus_connection, exchange=bus_exchange,
                            auto_declare=True)
    bus_marshaler = CollectdMarshaler(self._uuid)
    return Publisher(bus_producer, bus_marshaler)
def listen_events(self, routing_key, exchange=BUS_EXCHANGE_XIVO):
    with Connection(self._url) as conn:
        queue = Queue(BUS_QUEUE_NAME,
                      exchange=exchange,
                      routing_key=routing_key,
                      channel=conn.channel())
        queue.declare()
        queue.purge()
        self.bus_queue = queue
def _drain_events(self, on_event):
    if not hasattr(self, 'bus_queue'):
        raise Exception('You must listen for events before consuming them')

    with Connection(self._url) as conn:
        with Consumer(conn, self.bus_queue, callbacks=[on_event]):
            try:
                while True:
                    conn.drain_events(timeout=0.5)
            except TimeoutError:
                pass
def __init__(self, hosts_conf, exchange_name='', exchange_type='',
             exchange_arguments=None, queue_name='', routing_key='',
             queue_arguments=None, callback=None, no_ack=True):
    self.hosts_conf = hosts_conf
    self.hosts = self.create_hosts()
    self.connection = Connection(self.hosts)
    self.task_exchange = Exchange(name=exchange_name,
                                  type=exchange_type,
                                  arguments=exchange_arguments)
    self.task_queues = [Queue(name=queue_name,
                              exchange=self.task_exchange,
                              routing_key=routing_key,
                              queue_arguments=queue_arguments)]
    self.callback = callback
    self.no_ack = no_ack
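The example above passes self.hosts (built from a hosts configuration) straight to Connection. A hedged sketch of the failover idea behind that: kombu accepts multiple broker URLs in one semicolon-separated string and tries the alternates when the first host is unreachable. The host names are assumptions.

from kombu import Connection

conn = Connection(
    'amqp://guest:guest@mq1.example.com:5672//;'
    'amqp://guest:guest@mq2.example.com:5672//',
    failover_strategy='round-robin',
)
conn.ensure_connection(max_retries=3)  # walks the alternates on failure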
def queue_send(self, recipient, message=None, sender_action=None):
    exchange = Exchange(self.send_exchange, 'direct', durable=True)
    queue = Queue(self.send_queue, exchange=exchange,
                  routing_key=self.send_queue)
    with Connection(self.send_transport) as conn:
        producer = conn.Producer(serializer='json')
        event = {
            'recipient': recipient,
            'message': message,
            'sender_action': sender_action,
            'page_access_token': self.page_access_token
        }
        producer.publish(event, exchange=exchange,
                         routing_key=queue.routing_key, declare=[queue])
def queue_events(self, events):
    exchange = Exchange(self.exchange, 'direct', durable=True)
    queue = Queue(self.queue, exchange=exchange, routing_key=self.queue)
    with Connection(self.transport) as conn:
        producer = conn.Producer(serializer='json')
        for event in events:
            producer.publish(event, exchange=exchange,
                             routing_key=queue.routing_key, declare=[queue])
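Both publish examples above use the declare= keyword, which is worth calling out: entities listed there are declared (idempotently) before the message is published, so the queue exists and the message is routable even if no consumer has started yet. A minimal hedged sketch; names and URL are assumptions.

from kombu import Connection, Exchange, Queue

exchange = Exchange('demo_send', 'direct', durable=True)  # assumed names
queue = Queue('demo_send', exchange=exchange, routing_key='demo_send')

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish({'text': 'hi'}, exchange=exchange,
                     routing_key=queue.routing_key,
                     declare=[queue])  # exchange + queue declared first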
def _listen(self):
    reader_conn = kombu.Connection(self.url)
    reader_queue = self._queue(reader_conn)
    with reader_conn.SimpleQueue(reader_queue) as queue:
        while True:
            message = queue.get(block=True)
            message.ack()
            yield message.payload
def taste_soup(queue, broker_url):
    try:
        with Connection(broker_url) as conn:
            q = conn.SimpleQueue(queue)
            return q.qsize()
    except Exception as e:
        return 0
def connect(self):
    """
    'Connects' to the bus.

    :returns: The same instance.
    :rtype: commissaire_http.bus.Bus
    """
    if self.connection is not None:
        self.logger.warn('Bus already connected.')
        return self

    self.connection = Connection(self.connection_url)
    self._channel = self.connection.channel()
    self._exchange = Exchange(
        self.exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Create queues
    self._queues = []
    for kwargs in self.qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug('Created queue %s', queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Bus connection finished')
    return self
def __init__(self):
    self._controller = Controller(DatabaseAdapter())
    _connection = Connection(config.rabbitmq_url(), heartbeat=540)
    retry_adapter = RetryAdapter(_connection)
    self._busAdapter = BusAdapter(self._controller, _connection, retry_adapter)
def setup_connection_mock(self):
    mocks.Transport.recoverable_connection_errors = \
        pyamqp.Transport.recoverable_connection_errors
    self.connection = flexmock(Connection(transport=mocks.Transport))
    self.channel_mock = flexmock(self.connection.default_channel)
    self.connection.should_receive('channel').and_return(self.channel_mock)
def test_declare_retry_exchanges_retries_if_it_fails(self):
    connection = flexmock(Connection(transport=mocks.Transport))
    connection.should_receive('_establish_connection').times(3)\
        .and_raise(IOError)\
        .and_raise(IOError)\
        .and_return(connection.transport.establish_connection())

    self.retry_adapter = RetryAdapter(connection)
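The two tests above inject a mock transport via Connection(transport=...). A related broker-less testing trick (a sketch, not from that project): kombu ships an in-memory transport, so unit tests can exercise publish/consume paths without any broker running.

from kombu import Connection

with Connection('memory://') as conn:
    q = conn.SimpleQueue('test_queue')
    q.put({'ok': True})
    assert q.get(block=True, timeout=1).payload == {'ok': True}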
def __init__(self, exchange_name, broker_url, mode=ASYNC):
    """Constructor.

    Args:
        exchange_name (string): name of the exchange
        broker_url (string): broker URL
        mode (int): publishing mode
    """
    self.exchange_name = exchange_name
    self.broker_url = broker_url
    self.mode = mode
    self.exchange = Exchange(exchange_name, type='direct')
    self.connection = Connection(broker_url)
def __init__(self, broker_url, queue_name, fetch_count=10):
    """Constructor.

    Args:
        broker_url (string): broker URL
        queue_name (string): name of the queue to consume from
        fetch_count (int): number of messages to prefetch
    """
    self.queue_name = queue_name
    self.broker_url = broker_url
    self.fetch_count = fetch_count
    self.connection = Connection(broker_url)
    self.queue = Queue(queue_name)
def start_connection(self):
    """
    Reset the connection to RabbitMQ.
    :return:
    """
    logger = logging.getLogger(self.__class__.__name__)
    logger.info("starting new rabbit mq connection")

    # TODO: do we need to make confirm_publish configurable?
    self._conn = Connection(self.get_config().rabbitmq_url,
                            transport_options={'confirm_publish': True})

    # setup producer to push to error and dlqs
    self._producer = Producer(channel=self._conn.channel(),
                              exchange=self._orchestrator.get_exchange())
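A note on the transport_options used above, with a minimal hedged sketch: transport_options is passed through to the underlying transport, and for the py-amqp transport 'confirm_publish' enables RabbitMQ publisher confirms, so publish() waits for the broker to acknowledge the message. The URL is an assumption.

from kombu import Connection

conn = Connection(
    'amqp://guest:guest@localhost:5672//',          # assumed URL
    transport_options={'confirm_publish': True},    # wait for broker acks
)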
def main():
    parser = cli_parser()
    opts, _ = parser.parse_args(sys.argv)

    if not opts.password:
        logger.error('Password required, see help (-h)')
        sys.exit(-1)
    if not opts.domain:
        logger.error('yourdomain.efflux.io required, see help (-h)')
        sys.exit(-1)
    if not opts.token:
        logger.error('API token required, see help (-h)')
        sys.exit(-1)

    auth = 'amqp://{}:{}@{}:{}'.format(
        opts.username, opts.password, opts.host, opts.port
    )

    if opts.mode == 'json':
        efflux = CBEventHandler(opts.domain, opts.token)
        events = [
            'watchlist.hit.process'
        ]
    elif opts.mode == 'protobuf':
        events = [
            'ingress.event.netconn',
            'ingress.event.procstart'
        ]
        if opts.output == 'api':
            efflux = CBProtobufHandler(opts.domain, opts.token, mode='post')
        elif opts.output == 'file':
            efflux = CBProtobufHandler(opts.domain, opts.token, mode='file')
            efflux.set_logfile(path='/Users/jtm/telemetry/cb.log')

    with Connection(auth) as connection:
        CarbonBlackConsumer(
            connection, efflux.handle_event, events=events).run()
def __init__(self, exchange_name, connection_url, qkwargs, config_file=None):
    """
    Initializes a new Service instance.

    :param exchange_name: Name of the topic exchange.
    :type exchange_name: str
    :param connection_url: Kombu connection url.
    :type connection_url: str
    :param qkwargs: One or more dicts keyword arguments for queue creation
    :type qkwargs: list
    :param config_file: Path to the configuration file location.
    :type config_file: str or None
    """
    name = self.__class__.__name__
    self.logger = logging.getLogger(name)
    self.logger.debug('Initializing {}'.format(name))

    # If we are given no default, use the global one
    # Read the configuration file
    self._config_data = read_config_file(
        config_file, self._default_config_file)

    if connection_url is None and 'bus_uri' in self._config_data:
        connection_url = self._config_data.get('bus_uri')
        self.logger.debug(
            'Using connection_url=%s from config file', connection_url)
    if exchange_name is None and 'exchange_name' in self._config_data:
        self.logger.debug(
            'Using exchange_name=%s from config file', exchange_name)
        exchange_name = self._config_data.get('bus_exchange')

    self.connection = Connection(connection_url)
    self._channel = self.connection.default_channel
    self._exchange = Exchange(
        exchange_name, type='topic').bind(self._channel)
    self._exchange.declare()

    # Set up queues
    self._queues = []
    for kwargs in qkwargs:
        queue = Queue(**kwargs)
        queue.exchange = self._exchange
        queue = queue.bind(self._channel)
        self._queues.append(queue)
        self.logger.debug(queue.as_dict())

    # Create producer for publishing on topics
    self.producer = Producer(self._channel, self._exchange)
    self.logger.debug('Initializing of {} finished'.format(name))
def action_task_inject_process(config):
    if config.function_files is None:
        log.error(" - input .json file with process files is needed")
        return

    # Load process information
    with open(config.function_files, "r") as f:
        f_info = json.load(f)

    log.error(" - Building process...")

    # Search and inject process
    injections = []
    for p in f_info:
        parameters = OrderedDict({x["param_position"]: x["param_value"]
                                  for x in p['parameters']})

        # Fill process information
        inject_process = {
            "args": [y for x, y in six.iteritems(parameters)],
            "callbacks": None,
            "chord": None,
            "errbacks": None,
            "eta": None,
            "expires": None,
            "id": uuid.uuid1(),
            "kwargs": {},
            "retries": 0,
            "task": p["function"],
            "taskset": None,
            "timelimit": [None, None],
            "utc": True
        }
        injections.append(inject_process)

    # Re-inject messages
    log.warning(" - Trying to connect with server...")
    url = '%s://%s' % (config.broker_type, config.target)
    with Connection(url) as conn:
        in_queue = conn.SimpleQueue('celery')

        log.error(" - Sending processes to '%s'" % config.target)
        for i, e in enumerate(injections, 1):
            log.warning("    %s) %s" % (i, e['task']))
            in_queue.put(e, serializer="pickle")
def action_proc_list_tasks(config):
    log.warning(" - Trying to connect with server...")
    url = '%s://%s' % (config.broker_type, config.target)
    with Connection(url) as conn:
        in_queue = conn.SimpleQueue('celery')
        process_info = {}

        # Get remote process
        first_msg = True
        while 1:
            for remote_process, remote_args, _ in list_remote_process(config,
                                                                      in_queue):
                if remote_process not in process_info:
                    process_info[remote_process] = remote_args

            if config.no_stream is False and not process_info:
                if first_msg is True:
                    log.error(" -> No messages found. Waiting ...")
                    first_msg = False
                sleep(0.1)
            else:
                break

    # Try to identify parameter types and display info
    log.error(" - Remote process found:")
    for p, v in six.iteritems(process_info):
        log.error("     -> %s (%s)" % (
            p,
            ", ".join("param_%s:%s" % (i, get_param_type(x))
                      for i, x in enumerate(v))
        ))

    # Export to template enabled?
    if config.template is not None:
        log.warning(" - Building template...")
        export_data = export_process(process_info, config)

        # Save template: build path in current dir
        export_path = os.path.abspath(config.template)
        if ".json" not in export_path:
            export_path += ".json"

        # Dump json
        json.dump(export_data, open(export_path, "w"))
        log.error(" - Template saved at: '%s'" % export_path)