The following 24 code examples, extracted from open source Python projects, illustrate how to use kafka.KafkaClient().
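Before the extracted examples, here is a minimal sketch of the pattern they all share: instantiate the client against a broker list, use it, then close it. This assumes the legacy kafka-python KafkaClient (the pre-1.0 simple client, later renamed SimpleClient) that these examples were written against; the broker address is a placeholder.

from kafka import KafkaClient

client = KafkaClient('localhost:9092')  # "host:port" broker list; placeholder address
try:
    # topic_partitions is populated from cluster metadata on connect
    print(client.topic_partitions)
finally:
    client.close()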
def get_kafka_connection(cls, timeout_seconds=15):
    """Returns a kafka connection, waiting timeout_seconds for the container
    to come up.

    Args:
        timeout_seconds: Retry time (seconds) to get a kafka connection
    """
    end_time = time.time() + timeout_seconds
    logger.info("Getting connection to Kafka container on yocalhost")
    while end_time > time.time():
        try:
            return KafkaClient(get_config().cluster_config.broker_list)
        except KafkaUnavailableError:
            logger.info("Kafka not yet available, waiting...")
            time.sleep(0.1)
    raise KafkaUnavailableError()
def get_partition_leaders(cluster_config):
    """Return the current leaders of all partitions. Partitions are returned
    as a "topic-partition" string.

    :param cluster_config: the cluster
    :type cluster_config: kafka_utils.utils.config.ClusterConfig
    :returns: leaders for partitions
    :rtype: map of ("topic-partition", broker_id) pairs
    """
    client = KafkaClient(cluster_config.broker_list)
    result = {}
    for topic, topic_data in six.iteritems(client.topic_partitions):
        for partition, p_data in six.iteritems(topic_data):
            topic_partition = topic + "-" + str(partition)
            result[topic_partition] = p_data.leader
    return result
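A hypothetical call of the helper above; cluster_config would be a kafka_utils ClusterConfig supplied by the caller's environment.

# Hypothetical usage; cluster_config comes from the caller's environment.
leaders = get_partition_leaders(cluster_config)
for topic_partition, broker_id in sorted(leaders.items()):
    print('{0} is led by broker {1}'.format(topic_partition, broker_id))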
def test_commit_message_zk(self, config):
    if getattr(KafkaClient, 'send_offset_commit_request_kafka', None) is None:
        return
    with mock_kafka() as (mock_client, mock_consumer):
        config._config['offset_storage'] = 'zookeeper'
        consumer = KafkaSimpleConsumer('test_topic', config)
        consumer.connect()
        actual = consumer.commit_message(
            Message(0, 100, 'mykey', 'myvalue'),
        )
        assert actual is True
        mock_client.return_value.send_offset_commit_request \
            .assert_called_once_with(
                'test_group'.encode(),
                [OffsetCommitRequest('test_topic'.encode(), 0, 100, None)],
            )
def test_commit_message_kafka(self, config):
    if getattr(KafkaClient, 'send_offset_commit_request_kafka', None) is None:
        return
    with mock_kafka() as (mock_client, mock_consumer):
        config._config['offset_storage'] = 'kafka'
        consumer = KafkaSimpleConsumer('test_topic', config)
        consumer.connect()
        actual = consumer.commit_message(
            Message(0, 100, 'mykey', 'myvalue'),
        )
        assert actual is True
        assert not mock_client.return_value.send_offset_commit_request.called
        mock_client.return_value.send_offset_commit_request_kafka \
            .assert_called_once_with(
                'test_group'.encode(),
                [OffsetCommitRequest('test_topic'.encode(), 0, 100, None)],
            )
def connect(self):
    """Connect to kafka and create a consumer.

    It uses config parameters to create a kafka-python KafkaClient and
    SimpleConsumer.
    """
    # Instantiate a kafka client connected to kafka.
    self.client = KafkaClient(
        self.config.broker_list,
        client_id=self.config.client_id
    )
    # Create a kafka SimpleConsumer.
    self.kafka_consumer = SimpleConsumer(
        client=self.client,
        topic=self.topic,
        partitions=self.partitions,
        **self.config.get_simple_consumer_args()
    )
    self.log.debug(
        "Connected to kafka. Topic %s, partitions %s, %s",
        self.topic,
        self.partitions,
        ','.join(['{0} {1}'.format(k, v) for k, v in
                  six.iteritems(self.config.get_simple_consumer_args())])
    )
    self.kafka_consumer.provide_partition_info()
def discover_topics(cluster):
    """Get all the topics in a cluster

    :param cluster: config of the cluster to get topics from
    :type cluster: ClusterConfig
    :returns: a dict <topic>: <[partitions]>
    :raises DiscoveryError: upon failure to request topics from kafka
    """
    client = KafkaClient(cluster.broker_list)
    try:
        topics = get_kafka_topics(client)
        return dict([(topic.decode(), partitions)
                     for topic, partitions in six.iteritems(topics)])
    except:
        log.exception(
            "Topics discovery failed for %s",
            cluster.broker_list
        )
        raise DiscoveryError("Failed to get topics information from "
                             "{cluster}".format(cluster=cluster))
def get_kafka_connection(cluster_type, client_id, **kwargs):
    """Get a kafka connection for the local region kafka cluster at Yelp.

    :param cluster_type: kafka cluster type (ex.'scribe' or 'standard').
    :type cluster_type: string
    :param client_id: client_id to be used to connect to kafka.
    :type client_id: string
    :param kwargs: parameters to pass along when creating the KafkaClient
        instance.
    :returns: KafkaClient
    :raises DiscoveryError: :py:class:`yelp_kafka.error.DiscoveryError` upon
        failure connecting to a cluster.
    """
    cluster = get_region_cluster(cluster_type, client_id)
    try:
        return KafkaClient(cluster.broker_list, client_id=client_id, **kwargs)
    except:
        log.exception(
            "Connection to kafka cluster %s using broker list %s failed",
            cluster.name,
            cluster.broker_list
        )
        raise DiscoveryError("Failed to connect to cluster {0}".format(
            cluster.name))
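A hypothetical call of the discovery helper above; the cluster type and client id are placeholders, and the returned client is the caller's to close.

client = get_kafka_connection('standard', client_id='my-service')  # placeholders
try:
    print(client.topic_partitions)
finally:
    client.close()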
def test_get_kafka_connection(containers):
    """
    Asserts that the method returns a working kafka client connection.
    """
    kafka_connection = containers.get_kafka_connection(timeout_seconds=1)
    assert isinstance(kafka_connection, KafkaClient)
def setup_capture_new_messages_consumer(topic):
    """Seeks to the tail of the topic then returns a function that can
    consume messages from that point.
    """
    kafka = KafkaClient(get_config().cluster_config.broker_list)
    group = str('data_pipeline_clientlib_test')
    consumer = SimpleConsumer(kafka, group, topic, max_buffer_size=_ONE_MEGABYTE)
    consumer.seek(0, 2)  # seek to tail: offset 0 relative to whence=2 (the tail)

    yield consumer

    kafka.close()
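Since the function yields, it is presumably decorated with contextlib.contextmanager in its original module (an assumption here), which would make it usable like this; get_messages is the legacy SimpleConsumer batch-fetch call, and the topic name is a placeholder.

# Hypothetical usage, assuming @contextlib.contextmanager decorates the
# generator above in its original module.
with setup_capture_new_messages_consumer('my_topic') as consumer:
    # fetch up to 10 of the messages published after the seek to the tail
    for offset_and_message in consumer.get_messages(count=10, timeout=5):
        print(offset_and_message.offset, offset_and_message.message.value)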
def kafka_client(self):
    """ Returns the `KafkaClient` object."""
    return KafkaClient(self._region_cluster_config.broker_list)
def _configure_tools(self):
    load_default_config(
        self.options.config_file,
        self.options.env_config_file
    )

    # We setup logging 'early' since we want it available for setup_topics
    self._setup_logging()

    self.kafka_client = KafkaClient(get_config().cluster_config.broker_list)

    self._setup_topics()
    if len(self.topic_to_offsets_map) == 0:
        self.option_parser.error("At least one topic must be specified.")

    if self.options.start_timestamp is not None and self.options.start_timestamp >= int(time.time()):
        self.option_parser.error("--start-timestamp should not be later than current time")

    if self.options.start_timestamp is not None and self.options.end_timestamp and (
        self.options.start_timestamp > self.options.end_timestamp
    ):
        self.option_parser.error("--end-timestamp must not be smaller than --start-timestamp")

    if self.options.all_fields:
        self.options.fields = self._public_message_field_names

    self._verify_offset_ranges()
def __init__(self, producer_position_callback, dry_run=False):
    self.producer_position_callback = producer_position_callback
    self.dry_run = dry_run
    self.kafka_client = KafkaClient(get_config().cluster_config.broker_list)
    self.position_data_tracker = PositionDataTracker()
    self._reset_message_buffer()
    self.skip_messages_with_pii = get_config().skip_messages_with_pii
    self._publish_retry_policy = RetryPolicy(
        ExpBackoffPolicy(with_jitter=True),
        max_retry_count=get_config().producer_max_publish_retry_count
    )
    self._automatic_flush_enabled = True
def __init__(self, api):
    self.api = api
    # NOTE: passing the parent class to super() skips
    # tweepy.StreamListener.__init__ in the MRO; kept as in the original.
    super(tweepy.StreamListener, self).__init__()
    client = KafkaClient("localhost:9092")
    self.producer = SimpleProducer(client, async=True,
                                   batch_send_every_n=1000,
                                   batch_send_every_t=10)
def __init__(self, broker):
    self.broker = broker
    self.client = KafkaClient(broker, timeout=3)
def _python_kafka_listtopics(self):
    """
    Return list of the topics of the kafka cluster
    """
    client = KafkaClient(hosts=self.brokers)
    topics = client.topics
    topic_list = list()
    for topic in topics:
        topic_list.append(topic)
    return topic_list
def __init__(self, name, host='web14', port=51092, **kwargs):
    QueueBase.QueueBase.__init__(self, name, host, port)
    self.__queue = []
    self.__kafka = KafkaClient('%s:%d' % (host, port))
    self.__producer = SimpleProducer(self.__kafka, async=kwargs.get('async', False))
    self.__producer.client.ensure_topic_exists(self.name)
    self.__consumer = SimpleConsumer(self.__kafka, self.name + '_consumer',
                                     self.name, auto_commit_every_n=1)
def handler(self):
    """
    Fetch the logsize of every partition of each configured Kafka topic
    and write it into LevelDB; logsize records older than retention_hour
    are purged.
    """
    clusters = base.config["collector"]["clusters"]
    for cluster, metric in clusters.items():
        client = KafkaClient(metric["brokers"], timeout=3)
        for topic in metric["topics"]:
            partitions = client.get_partition_ids_for_topic(topic)
            payload = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions]
            logsize = {p.partition: p.offsets[0]
                       for p in client.send_offset_request(payload)}
            if logsize:
                key = str(int(time.time())).encode("utf-8")
                value = json.dumps(logsize).encode("utf-8")
                db = base.init_leveldb(cluster=cluster, topic=topic)
                db.Put(key, value)
                deadline = base.config["collector"]["clusters"][cluster]["retention_hour"] * 3600
                for key, _ in db.RangeIter():
                    if time.time() - int(key) > deadline:
                        db.Delete(key)
                    else:
                        break
        client.close()
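The offset-fetch pattern used above reduces to a minimal, standalone sketch. It assumes the legacy kafka-python simple-client API (exposed as kafka.KafkaClient in the versions these examples target); the broker address and topic name are placeholders.

from kafka import KafkaClient
from kafka.common import OffsetRequestPayload

client = KafkaClient('localhost:9092', timeout=3)
partitions = client.get_partition_ids_for_topic('my_topic')
# -1 requests the latest offset (the "logsize"); the trailing 1 caps the
# response at a single offset per partition.
payload = [OffsetRequestPayload('my_topic', p, -1, 1) for p in partitions]
logsize = {resp.partition: resp.offsets[0]
           for resp in client.send_offset_request(payload)}
client.close()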
def make_kafka_producer(kafka_znode):
    kafka_brokers = get_kafka_brokers(kafka_znode)
    kafka_client = KafkaClient(kafka_brokers)
    return SimpleProducer(
        kafka_client,
        async=False,
        req_acks=1,
        random_start=True
    )
def test_simple_consumer():
    topic = create_random_topic(1, 1)

    messages = [str(i).encode("UTF-8") for i in range(100)]

    cluster_config = ClusterConfig(None, None, [KAFKA_URL], ZOOKEEPER_URL)
    producer = YelpKafkaSimpleProducer(
        cluster_config=cluster_config,
        report_metrics=False,
        client=KafkaClient(KAFKA_URL),
    )
    producer.send_messages(topic, *messages)

    config = KafkaConsumerConfig(
        'test',
        cluster_config,
        auto_offset_reset='smallest',
        auto_commit=False,
        consumer_timeout_ms=1000
    )
    consumer = KafkaSimpleConsumer(topic, config)

    with consumer:
        for expected_offset in range(100):
            message = consumer.get_message()
            assert message.offset == expected_offset
            assert message.partition == 0
            assert message.value == str(expected_offset).encode("UTF-8")
def mock_kafka():
    with mock.patch('yelp_kafka.consumer.KafkaClient', autospec=True) as mock_client:
        with mock.patch('yelp_kafka.consumer.SimpleConsumer', autospec=True) as mock_consumer:
            mock_consumer.return_value.auto_commit = True
            yield mock_client, mock_consumer
def close(self):
    """Disconnect from kafka.

    If auto_commit is enabled commit offsets before disconnecting.
    """
    if self.kafka_consumer.auto_commit is True:
        try:
            self.commit()
        except:
            self.log.exception("Commit error. "
                               "Offsets may not have been committed")
    # Close all the connections to kafka brokers. KafkaClient opens
    # connections to all the partition leaders.
    self.client.close()
def get_all_kafka_connections(cluster_type, client_id, **kwargs):
    """Get a kafka connection for each available kafka cluster at Yelp.

    :param cluster_type: kafka cluster type (ex.'scribe' or 'standard').
    :type cluster_type: string
    :param client_id: client_id to be used to connect to kafka.
    :type client_id: string
    :param kwargs: parameters to pass along when creating the KafkaClient
        instance.
    :returns: list (cluster_name, KafkaClient)
    :raises DiscoveryError: :py:class:`yelp_kafka.error.DiscoveryError` upon
        failure connecting to a cluster.

    .. note:: This function creates a KafkaClient for each cluster in a region
       and tries to connect to it. If a cluster is not available it fails and
       closes all the previous connections.
    """
    clusters = get_all_clusters(cluster_type, client_id)
    connected_clusters = []
    for cluster in clusters:
        try:
            client = KafkaClient(cluster.broker_list, client_id=client_id, **kwargs)
            connected_clusters.append((cluster.name, client))
        except:
            log.exception(
                "Connection to kafka cluster %s using broker list %s failed",
                cluster.name,
                cluster.broker_list
            )
            for _, client in connected_clusters:
                client.close()
            raise DiscoveryError("Failed to connect to cluster {0}".format(
                cluster.name))
    return connected_clusters
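A hypothetical use of the all-clusters helper above, making sure every returned client is eventually closed; cluster type and client id are placeholders.

connections = get_all_kafka_connections('scribe', client_id='my-service')
try:
    for cluster_name, client in connections:
        print(cluster_name, len(client.topic_partitions))
finally:
    for _, client in connections:
        client.close()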
def __init__(self, settings):
    self.settings = settings
    self.client = KafkaClient(settings.get("KAFKA_HOSTS"))
    self.producer = SimpleProducer(self.client)
    self.producer.send_messages = failedpayloads_wrapper(
        settings.get("KAFKA_RETRY_TIME", 5))(self.producer.send_messages)
    super(KafkaHandler, self).__init__()
def run_kafka_consumer_group_test(num_consumers, num_partitions):
    topic = create_random_topic(1, num_partitions)
    cluster_config = ClusterConfig(None, None, [KAFKA_URL], ZOOKEEPER_URL)
    config = KafkaConsumerConfig(
        'test',
        cluster_config,
        auto_offset_reset='smallest',
        partitioner_cooldown=5,
        auto_commit_interval_messages=1,
    )

    queue = Queue()

    def create_consumer():
        def consume():
            consumer = KafkaConsumerGroup([topic], config)
            with consumer:
                while True:
                    try:
                        message = consumer.next()
                        queue.put(message)
                        consumer.task_done(message)
                    except ConsumerTimeout:
                        return

        p = Process(target=consume)
        p.daemon = True
        return p

    consumer_processes = [create_consumer() for _ in range(num_consumers)]

    for consumer_process in consumer_processes:
        consumer_process.start()

    producer = YelpKafkaSimpleProducer(
        cluster_config=cluster_config,
        report_metrics=False,
        client=KafkaClient(KAFKA_URL),
    )
    for i in range(100):
        producer.send_messages(topic, str(i).encode("UTF-8"))

    # wait until all 100 messages have been consumed
    while queue.qsize() < 100:
        time.sleep(0.1)

    received_messages = []
    while True:
        try:
            message = queue.get(block=True, timeout=0.5)
        except Empty:
            break
        received_messages.append(int(message.value))

    assert [i for i in range(100)] == sorted(received_messages)