Python kafka module: KafkaClient() usage examples

We extracted the following 25 code examples from open-source Python projects to illustrate how to use kafka.KafkaClient().

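For orientation, here is a minimal sketch of the lifecycle most of these examples share: create a client from a broker list, read cluster metadata, and close it. The broker address is a placeholder, and the snippet assumes the legacy kafka-python API (this KafkaClient was later renamed SimpleClient).

from kafka import KafkaClient

# Sketch only; "localhost:9092" is a placeholder broker address.
client = KafkaClient("localhost:9092")
# topic_partitions maps topic -> {partition: metadata}, loaded at connect time.
print(client.topic_partitions)
# Close the connections to all brokers.
client.close()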
Project: data_pipeline | Author: Yelp
def get_kafka_connection(cls, timeout_seconds=15):
        """Returns a kafka connection, waiting timeout_seconds for the container
        to come up.

        Args:
            timeout_seconds: how long (in seconds) to keep retrying the connection
        """
        end_time = time.time() + timeout_seconds
        logger.info("Getting connection to Kafka container on yocalhost")
        while end_time > time.time():
            try:
                return KafkaClient(get_config().cluster_config.broker_list)
            except KafkaUnavailableError:
                logger.info("Kafka not yet available, waiting...")
                time.sleep(0.1)
        raise KafkaUnavailableError()
Project: kafka-utils | Author: Yelp
def get_partition_leaders(cluster_config):
    """Return the current leaders of all partitions. Partitions are
    returned as a "topic-partition" string.

    :param cluster_config: the cluster
    :type cluster_config: kafka_utils.utils.config.ClusterConfig
    :returns: leaders for partitions
    :rtype: map of ("topic-partition", broker_id) pairs
    """
    client = KafkaClient(cluster_config.broker_list)
    result = {}
    for topic, topic_data in six.iteritems(client.topic_partitions):
        for partition, p_data in six.iteritems(topic_data):
            topic_partition = topic + "-" + str(partition)
            result[topic_partition] = p_data.leader
    return result
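A hedged usage sketch: the ClusterConfig import path comes from the docstring above, and the constructor arguments mirror the pattern used in the test examples later on this page; broker and ZooKeeper addresses are placeholders.

from kafka_utils.utils.config import ClusterConfig

# Sketch only; addresses are placeholders.
cluster_config = ClusterConfig(None, None, ["localhost:9092"], "localhost:2181")
for topic_partition, broker_id in get_partition_leaders(cluster_config).items():
    print(topic_partition, "->", broker_id)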
Project: yelp_kafka | Author: Yelp
def test_commit_message_zk(self, config):
        if getattr(KafkaClient, 'send_offset_commit_request_kafka', None) is None:
            return

        with mock_kafka() as (mock_client, mock_consumer):
            config._config['offset_storage'] = 'zookeeper'
            consumer = KafkaSimpleConsumer('test_topic', config)
            consumer.connect()

            actual = consumer.commit_message(
                Message(0, 100, 'mykey', 'myvalue'),
            )

            assert actual is True
            mock_client.return_value.send_offset_commit_request \
                .assert_called_once_with(
                    'test_group'.encode(),
                    [OffsetCommitRequest('test_topic'.encode(), 0, 100, None)],
                )
Project: yelp_kafka | Author: Yelp
def test_commit_message_kafka(self, config):
        if getattr(KafkaClient, 'send_offset_commit_request_kafka', None) is None:
            return

        with mock_kafka() as (mock_client, mock_consumer):
            config._config['offset_storage'] = 'kafka'
            consumer = KafkaSimpleConsumer('test_topic', config)
            consumer.connect()

            actual = consumer.commit_message(
                Message(0, 100, 'mykey', 'myvalue'),
            )

            assert actual is True
            assert not mock_client.return_value.send_offset_commit_request.called
            mock_client.return_value.send_offset_commit_request_kafka \
                .assert_called_once_with(
                    'test_group'.encode(),
                    [OffsetCommitRequest('test_topic'.encode(), 0, 100, None)],
                )
Project: yelp_kafka | Author: Yelp
def connect(self):
        """ Connect to kafka and create a consumer.
        It uses config parameters to create a kafka-python
        KafkaClient and SimpleConsumer.
        """
        # Instantiate a kafka client connected to kafka.
        self.client = KafkaClient(
            self.config.broker_list,
            client_id=self.config.client_id
        )

        # Create a kafka SimpleConsumer.
        self.kafka_consumer = SimpleConsumer(
            client=self.client, topic=self.topic, partitions=self.partitions,
            **self.config.get_simple_consumer_args()
        )
        self.log.debug(
            "Connected to kafka. Topic %s, partitions %s, %s",
            self.topic,
            self.partitions,
            ','.join(['{0} {1}'.format(k, v) for k, v in
                      six.iteritems(self.config.get_simple_consumer_args())])
        )
        self.kafka_consumer.provide_partition_info()
Project: yelp_kafka | Author: Yelp
def discover_topics(cluster):
    """Get all the topics in a cluster

    :param cluster: config of the cluster to get topics from
    :type cluster: ClusterConfig
    :returns: a dict <topic>: <[partitions]>
    :raises DiscoveryError: upon failure to request topics from kafka
    """
    client = KafkaClient(cluster.broker_list)
    try:
        topics = get_kafka_topics(client)
        return {topic.decode(): partitions
                for topic, partitions in six.iteritems(topics)}
    except Exception:
        log.exception(
            "Topics discovery failed for %s",
            cluster.broker_list
        )
        raise DiscoveryError("Failed to get topics information from "
                             "{cluster}".format(cluster=cluster))
Project: yelp_kafka | Author: Yelp
def get_kafka_connection(cluster_type, client_id, **kwargs):
    """Get a kafka connection for the local region kafka cluster at Yelp.

    :param cluster_type: kafka cluster type (ex.'scribe' or 'standard').
    :type cluster_type: string
    :param client_id: client_id to be used to connect to kafka.
    :type client_id: string
    :param kwargs: parameters to pass along when creating the KafkaClient instance.
    :returns: KafkaClient
    :raises DiscoveryError: :py:class:`yelp_kafka.error.DiscoveryError` upon failure connecting to a cluster.
    """
    cluster = get_region_cluster(cluster_type, client_id)
    try:
        return KafkaClient(cluster.broker_list, client_id=client_id, **kwargs)
    except Exception:
        log.exception(
            "Connection to kafka cluster %s using broker list %s failed",
            cluster.name,
            cluster.broker_list
        )
        raise DiscoveryError("Failed to connect to cluster {0}".format(
            cluster.name))
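Usage sketch; the cluster type comes from the docstring's examples and the client_id is illustrative:

client = get_kafka_connection('standard', client_id='my-service')
try:
    print(client.topic_partitions)   # use the connection
finally:
    client.close()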
Project: data_pipeline | Author: Yelp
def test_get_kafka_connection(containers):
    """
    Asserts that the method returns a working kafka client connection.
    """
    kafka_connection = containers.get_kafka_connection(timeout_seconds=1)
    assert isinstance(kafka_connection, KafkaClient)
Project: data_pipeline | Author: Yelp
def setup_capture_new_messages_consumer(topic):
    """Seeks to the tail of the topic then returns a function that can
    consume messages from that point.
    """
    kafka = KafkaClient(get_config().cluster_config.broker_list)
    group = str('data_pipeline_clientlib_test')
    consumer = SimpleConsumer(kafka, group, topic, max_buffer_size=_ONE_MEGABYTE)
    consumer.seek(0, 2)  # seek to the tail: offset 0 relative to the end of the log (whence=2)

    yield consumer

    kafka.close()
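Because this function yields, it is intended to be wrapped, e.g. as a pytest fixture or a context manager, so that the client is closed after use. A minimal sketch of the context-manager route, with a placeholder topic name:

from contextlib import contextmanager

capture_new_messages = contextmanager(setup_capture_new_messages_consumer)

with capture_new_messages('my_topic') as consumer:
    # Only messages published after the seek-to-tail above are returned.
    messages = consumer.get_messages(count=10, timeout=1)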
Project: data_pipeline | Author: Yelp
def kafka_client(self):
        """ Returns the `KafkaClient` object."""
        return KafkaClient(self._region_cluster_config.broker_list)
Project: data_pipeline | Author: Yelp
def _configure_tools(self):
        load_default_config(
            self.options.config_file,
            self.options.env_config_file
        )

        # We set up logging early since we want it available for _setup_topics
        self._setup_logging()

        self.kafka_client = KafkaClient(get_config().cluster_config.broker_list)

        self._setup_topics()
        if len(self.topic_to_offsets_map) == 0:
            self.option_parser.error("At least one topic must be specified.")

        if self.options.start_timestamp is not None and self.options.start_timestamp >= int(time.time()):
            self.option_parser.error("--start-timestamp should not be later than current time")

        if self.options.start_timestamp is not None and self.options.end_timestamp and (
            self.options.start_timestamp > self.options.end_timestamp
        ):
            self.option_parser.error("--end-timestamp must not be smaller than --start-timestamp")

        if self.options.all_fields:
            self.options.fields = self._public_message_field_names

        self._verify_offset_ranges()
Project: data_pipeline | Author: Yelp
def __init__(self, producer_position_callback, dry_run=False):
        self.producer_position_callback = producer_position_callback
        self.dry_run = dry_run
        self.kafka_client = KafkaClient(get_config().cluster_config.broker_list)
        self.position_data_tracker = PositionDataTracker()
        self._reset_message_buffer()
        self.skip_messages_with_pii = get_config().skip_messages_with_pii
        self._publish_retry_policy = RetryPolicy(
            ExpBackoffPolicy(with_jitter=True),
            max_retry_count=get_config().producer_max_publish_retry_count
        )
        self._automatic_flush_enabled = True
Project: Twitter-and-IMDB-Sentimental-Analytics | Author: abhinandanramesh
def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()
        client = KafkaClient("localhost:9092")
        # NOTE: the 'async' kwarg is the legacy kafka-python API;
        # 'async' became a reserved word in Python 3.7.
        self.producer = SimpleProducer(client, async=True,
                          batch_send_every_n=1000,
                          batch_send_every_t=10)
Project: kzmonitor | Author: tqlihuiqi
def __init__(self, broker):
        self.broker = broker
        self.client = KafkaClient(broker, timeout=3)
Project: ALBATROSS | Author: KVSDURGASURESH
def _python_kafka_listtopics(self):
        """
            Return a list of the topics in the kafka cluster
        """
        client = KafkaClient(hosts=self.brokers)
        return list(client.topics)
Project: jtyd_python_spider | Author: xtuyaowu
def __init__(self, name, host='web14', port=51092, **kwargs):
        QueueBase.QueueBase.__init__(self, name, host, port)
        self.__queue = []
        self.__kafka = KafkaClient('%s:%d' % (host, port))
        self.__producer = SimpleProducer(self.__kafka, async=kwargs.get('async', False))
        self.__producer.client.ensure_topic_exists(self.name)
        self.__consumer = SimpleConsumer(self.__kafka, self.name + '_consumer', self.name, auto_commit_every_n=1)
Project: Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka | Author: sridharswamy
def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()
        client = KafkaClient("localhost:9092")
        # NOTE: the 'async' kwarg is the legacy kafka-python API;
        # 'async' became a reserved word in Python 3.7.
        self.producer = SimpleProducer(client, async=True,
                          batch_send_every_n=1000,
                          batch_send_every_t=10)
Project: kafka_extract | Author: tqlihuiqi
def handler(self):
        """ ????Kafka??Topic???Partition??Logsize, ?Logsize??LevelDB
            ????Logsize??????retention_day?????????
        """

        clusters = base.config["collector"]["clusters"]

        for cluster, metric in clusters.items():
            client = KafkaClient(metric["brokers"], timeout=3)

            for topic in metric["topics"]:
                partitions = client.get_partition_ids_for_topic(topic)
                payload = [ OffsetRequestPayload(topic, p, -1, 1) for p in partitions ]
                logsize = { p.partition: p.offsets[0] for p in client.send_offset_request(payload) }

                if logsize:
                    key = str(int(time.time())).encode("utf-8")
                    value = json.dumps(logsize).encode("utf-8")

                    db = base.init_leveldb(cluster=cluster, topic=topic)
                    db.Put(key, value)
                    deadline = base.config["collector"]["clusters"][cluster]["retention_hour"] * 3600

                    for key, _ in db.RangeIter():
                        if time.time() - int(key) > deadline:
                            db.Delete(key)
                        else:
                            break

            client.close()
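A companion sketch for reading the stored samples back. It relies only on calls the collector itself uses (base.init_leveldb, RangeIter, json); the cluster and topic names are placeholders:

db = base.init_leveldb(cluster="my_cluster", topic="my_topic")
for key, value in db.RangeIter():
    timestamp = int(key)                          # keys are str(unix_time), as written above
    logsize = json.loads(value.decode("utf-8"))   # {partition: offset}
    print(timestamp, logsize)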
Project: tsd-helpers | Author: robinhood
def make_kafka_producer(kafka_znode):
    kafka_brokers = get_kafka_brokers(kafka_znode)
    kafka_client = KafkaClient(kafka_brokers)
    return SimpleProducer(
        kafka_client,
        async=False,
        req_acks=1,
        random_start=True
    )
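Usage sketch; the znode path and topic name are placeholders:

producer = make_kafka_producer("/kafka/mycluster")
producer.send_messages("my_topic", b"hello")   # SimpleProducer API, as in the other examples here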
Project: yelp_kafka | Author: Yelp
def test_simple_consumer():
    topic = create_random_topic(1, 1)

    messages = [str(i).encode("UTF-8") for i in range(100)]

    cluster_config = ClusterConfig(None, None, [KAFKA_URL], ZOOKEEPER_URL)
    producer = YelpKafkaSimpleProducer(
        cluster_config=cluster_config,
        report_metrics=False,
        client=KafkaClient(KAFKA_URL),
    )
    producer.send_messages(topic, *messages)

    config = KafkaConsumerConfig(
        'test',
        cluster_config,
        auto_offset_reset='smallest',
        auto_commit=False,
        consumer_timeout_ms=1000
    )
    consumer = KafkaSimpleConsumer(topic, config)

    with consumer:
        for expected_offset in range(100):
            message = consumer.get_message()
            assert message.offset == expected_offset
            assert message.partition == 0
            assert message.value == str(expected_offset).encode("UTF-8")
Project: yelp_kafka | Author: Yelp
def mock_kafka():
    with mock.patch('yelp_kafka.consumer.KafkaClient', autospec=True) as mock_client:
        with mock.patch('yelp_kafka.consumer.SimpleConsumer', autospec=True) as mock_consumer:
            mock_consumer.return_value.auto_commit = True
            yield mock_client, mock_consumer
Project: yelp_kafka | Author: Yelp
def close(self):
        """Disconnect from kafka.
        If auto_commit is enabled commit offsets before disconnecting.
        """
        if self.kafka_consumer.auto_commit is True:
            try:
                self.commit()
            except:
                self.log.exception("Commit error. "
                                   "Offsets may not have been committed")
        # Close all the connections to kafka brokers. KafkaClient open
        # connections to all the partition leaders.
        self.client.close()
Project: yelp_kafka | Author: Yelp
def get_all_kafka_connections(cluster_type, client_id, **kwargs):
    """Get a kafka connection for each available kafka cluster at Yelp.

    :param cluster_type: kafka cluster type (ex.'scribe' or 'standard').
    :type cluster_type: string
    :param client_id: client_id to be used to connect to kafka.
    :type client_id: string
    :param kwargs: parameters to pass along when creating the KafkaClient instance.
    :returns: list (cluster_name, KafkaClient)
    :raises DiscoveryError: :py:class:`yelp_kafka.error.DiscoveryError` upon failure connecting to a cluster.

    .. note:: This function creates a KafkaClient for each cluster in a region and tries to connect to it. If a cluster is not available, the function fails and closes all previously opened connections.
    """

    clusters = get_all_clusters(cluster_type, client_id)
    connected_clusters = []
    for cluster in clusters:
        try:
            client = KafkaClient(cluster.broker_list, client_id=client_id, **kwargs)
            connected_clusters.append((cluster.name, client))
        except Exception:
            log.exception(
                "Connection to kafka cluster %s using broker list %s failed",
                cluster.name,
                cluster.broker_list
            )
            for _, client in connected_clusters:
                client.close()
            raise DiscoveryError("Failed to connect to cluster {0}".format(
                cluster.name))
    return connected_clusters
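Usage sketch: iterate over every regional cluster, then make sure each client is closed (cluster type and client_id are illustrative):

connections = get_all_kafka_connections('standard', client_id='my-service')
try:
    for cluster_name, client in connections:
        print(cluster_name)   # one connected KafkaClient per cluster
finally:
    for _, client in connections:
        client.close()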
Project: log_to_kafka | Author: ShichaoMa
def __init__(self, settings):
        self.settings = settings
        self.client = KafkaClient(settings.get("KAFKA_HOSTS"))
        self.producer = SimpleProducer(self.client)
        self.producer.send_messages = failedpayloads_wrapper(
            settings.get("KAFKA_RETRY_TIME", 5))(self.producer.send_messages)
        super(KafkaHandler, self).__init__()
Project: yelp_kafka | Author: Yelp
def run_kafka_consumer_group_test(num_consumers, num_partitions):
    topic = create_random_topic(1, num_partitions)
    cluster_config = ClusterConfig(None, None, [KAFKA_URL], ZOOKEEPER_URL)
    config = KafkaConsumerConfig(
        'test',
        cluster_config,
        auto_offset_reset='smallest',
        partitioner_cooldown=5,
        auto_commit_interval_messages=1,
    )

    queue = Queue()

    def create_consumer():
        def consume():
            consumer = KafkaConsumerGroup([topic], config)
            with consumer:
                while True:
                    try:
                        message = consumer.next()
                        queue.put(message)
                        consumer.task_done(message)
                    except ConsumerTimeout:
                        return

        p = Process(target=consume)
        p.daemon = True
        return p

    consumer_processes = [create_consumer() for _ in range(num_consumers)]

    for consumer_process in consumer_processes:
        consumer_process.start()

    producer = YelpKafkaSimpleProducer(
        cluster_config=cluster_config,
        report_metrics=False,
        client=KafkaClient(KAFKA_URL),
    )
    for i in range(100):
        producer.send_messages(topic, str(i).encode("UTF-8"))

    # wait until all 100 messages have been consumed
    while queue.qsize() < 100:
        time.sleep(0.1)

    received_messages = []
    while True:
        try:
            message = queue.get(block=True, timeout=0.5)
        except Empty:
            break
        received_messages.append(int(message.value))

    assert list(range(100)) == sorted(received_messages)