我们从 Python 开源项目中，提取了以下 12 个代码示例，用于说明如何使用 kafka.SimpleConsumer()。
def test_simple_consumer_leader_change(self):
    """Expect a metadata refresh when fetches answer NotLeaderForPartition.

    The mocked client answers fetch requests with a payload carrying the
    ``NotLeaderForPartitionError`` error code. ``get_messages`` must not
    raise, and the client must have been asked to reset and reload its
    topic metadata at least once.
    """
    mock_client = MagicMock()
    consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def not_leader(request):
        # Echo the request's topic/partition back with the error code set.
        error_code = NotLeaderForPartitionError.errno
        return FetchResponsePayload(
            request.topic, request.partition, error_code, -1, ())

    # NOTE(review): fail_requests_factory is defined outside this view;
    # presumably it turns the builder into a send_fetch_request stand-in.
    failing_fetch = self.fail_requests_factory(not_leader)
    mock_client.send_fetch_request.side_effect = failing_fetch

    # Must complete without raising.
    consumer.get_messages(20)

    # The consumer should have refreshed the client's metadata.
    reset_calls = mock_client.reset_topic_metadata.call_count
    load_calls = mock_client.load_metadata_for_topics.call_count
    self.assertGreaterEqual(reset_calls, 1)
    self.assertGreaterEqual(load_calls, 1)
def test_simple_consumer_unknown_topic_partition(self):
    """Expect UnknownTopicOrPartitionError to propagate out of get_messages.

    The mocked client answers fetch requests with a payload carrying the
    ``UnknownTopicOrPartitionError`` error code; the consumer is expected
    to raise that error rather than swallow it.
    """
    mock_client = MagicMock()
    consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def unknown_topic_partition(request):
        # Echo the request's topic/partition back with the error code set.
        error_code = UnknownTopicOrPartitionError.errno
        return FetchResponsePayload(
            request.topic, request.partition, error_code, -1, ())

    # NOTE(review): fail_requests_factory is defined outside this view;
    # presumably it turns the builder into a send_fetch_request stand-in.
    failing_fetch = self.fail_requests_factory(unknown_topic_partition)
    mock_client.send_fetch_request.side_effect = failing_fetch

    # Unlike the leader-change case, this error is expected to be raised.
    with self.assertRaises(UnknownTopicOrPartitionError):
        consumer.get_messages(20)
def test_simple_consumer_commit_does_not_raise(self):
    """Expect commit() to return a falsy value when the commit request fails.

    The mocked client serves offset-fetch requests normally but raises
    ``FailedPayloadsError`` on every offset-commit request. ``commit``
    must absorb that failure and report it through its return value.
    """
    mock_client = MagicMock()
    mock_client.get_partition_ids_for_topic.return_value = [0, 1]

    def mock_offset_fetch_request(group, payloads, **kwargs):
        # One fetch response per requested payload.
        responses = []
        for payload in payloads:
            responses.append(OffsetFetchResponsePayload(
                payload.topic, payload.partition, 0, b'', 0))
        return responses

    def mock_offset_commit_request(group, payloads, **kwargs):
        # Every commit attempt fails, reporting the first payload.
        raise FailedPayloadsError(payloads[0])

    # Both stubs are installed before the consumer is constructed, as in
    # the original ordering.
    mock_client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
    mock_client.send_offset_commit_request.side_effect = mock_offset_commit_request

    consumer = SimpleConsumer(
        mock_client,
        group='foobar',
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    # Pretend there are uncommitted messages (mocks the internal commit check).
    consumer.count_since_commit = 10

    # Must not raise; the failure is reported through the return value.
    committed = consumer.commit(partitions=[0, 1])
    self.assertFalse(committed)
def connect(self):
    """Open a Kafka connection and build the consumer on top of it.

    Creates a kafka-python ``KafkaClient`` from the configured broker list
    and client id, stores it on ``self.client``, then creates a
    ``SimpleConsumer`` for ``self.topic`` / ``self.partitions`` using the
    extra arguments supplied by ``self.config.get_simple_consumer_args()``
    and stores it on ``self.kafka_consumer``.
    """
    config = self.config

    # Kafka client connected to the configured brokers.
    self.client = KafkaClient(config.broker_list, client_id=config.client_id)

    # SimpleConsumer bound to this object's topic and partitions.
    self.kafka_consumer = SimpleConsumer(
        client=self.client,
        topic=self.topic,
        partitions=self.partitions,
        **self.config.get_simple_consumer_args()
    )

    # Render the consumer arguments as "key value" pairs for the debug log.
    rendered_pairs = []
    for key, value in six.iteritems(self.config.get_simple_consumer_args()):
        rendered_pairs.append('{0} {1}'.format(key, value))
    self.log.debug(
        "Connected to kafka. Topic %s, partitions %s, %s",
        self.topic,
        self.partitions,
        ','.join(rendered_pairs),
    )

    self.kafka_consumer.provide_partition_info()
def setup_capture_new_messages_consumer(topic):
    """Yield a consumer positioned at the tail of ``topic``.

    A Kafka client is opened, a ``SimpleConsumer`` is created for the given
    topic and seeked to the tail, and the consumer is yielded so the caller
    can read only messages published from that point on. The client is
    closed when the generator is resumed or torn down, including when an
    exception is raised while the consumer is in use.

    NOTE(review): this is a generator; presumably it is wrapped by a
    context-manager or fixture decorator outside this view.

    :param topic: name of the topic to consume from.
    """
    kafka = KafkaClient(get_config().cluster_config.broker_list)
    try:
        group = str('data_pipeline_clientlib_test')
        consumer = SimpleConsumer(kafka, group, topic, max_buffer_size=_ONE_MEGABYTE)
        consumer.seek(0, 2)  # seek to tail, 0 is the offset, and 2 is the tail
        yield consumer
    finally:
        # Always release the connection, even if setup or the caller's
        # code raised.
        kafka.close()
def __init__(self, name, host='web14', port=51092, **kwargs):
    """Set up a Kafka-backed queue with one producer and one consumer.

    :param name: queue name, forwarded to ``QueueBase.QueueBase.__init__``.
    :param host: Kafka broker host.
    :param port: Kafka broker port.
    :param kwargs: optional settings; only ``async`` is read here
        (defaults to ``False``) and forwarded to the producer.
    """
    QueueBase.QueueBase.__init__(self, name, host, port)
    self.__queue = []
    self.__kafka = KafkaClient('%s:%d' % (host, port))

    # `async` is a reserved word from Python 3.7 on, so writing it as a
    # literal keyword argument is a SyntaxError. Dict unpacking passes the
    # very same keyword to SimpleProducer and parses on every version.
    producer_options = {'async': kwargs.get('async', False)}
    self.__producer = SimpleProducer(self.__kafka, **producer_options)

    # NOTE(review): self.name is presumably set by QueueBase.__init__ and
    # doubles as the topic name — confirm.
    self.__producer.client.ensure_topic_exists(self.name)
    self.__consumer = SimpleConsumer(
        self.__kafka,
        self.name + '_consumer',
        self.name,
        auto_commit_every_n=1,
    )
def test_non_integer_partitions(self):
    """Expect an AssertionError when a partition id is given as a string."""
    string_partitions = ['0']
    with self.assertRaises(AssertionError):
        SimpleConsumer(MagicMock(), 'group', 'topic', partitions=string_partitions)
def test_simple_consumer_failed_payloads(self):
    """Expect get_messages to tolerate FailedPayloadsError fetch results.

    The mocked client produces a ``FailedPayloadsError`` for fetch
    requests; the call to ``get_messages`` must still complete without
    raising.
    """
    mock_client = MagicMock()
    consumer = SimpleConsumer(
        mock_client,
        group=None,
        topic='topic',
        partitions=[0, 1],
        auto_commit=False,
    )

    def failed_payloads(payload):
        # The error object is returned, not raised, for the factory to use.
        return FailedPayloadsError(payload)

    # NOTE(review): fail_requests_factory is defined outside this view;
    # presumably it turns the builder into a send_fetch_request stand-in.
    failing_fetch = self.fail_requests_factory(failed_payloads)
    mock_client.send_fetch_request.side_effect = failing_fetch

    # Must complete without raising.
    consumer.get_messages(5)
def test_switch_leader_simple_consumer(self):
    """Check that a SimpleConsumer keeps working after the leader is killed.

    Messages are produced, consumed once, the leader for partition 0 of the
    topic is killed, and the consumer is asked for messages again. The test
    passes as long as neither ``get_messages`` call raises.
    """
    # `async` is a reserved word from Python 3.7 on, so writing it as a
    # literal keyword argument is a SyntaxError. Dict unpacking passes the
    # same keyword to Producer and parses on every version.
    producer = Producer(self.client, **{'async': False})
    consumer = SimpleConsumer(
        self.client,
        None,
        self.topic,
        partitions=None,
        auto_commit=False,
        iter_timeout=10,
    )

    # NOTE(review): the trailing 0 and 2 are presumably the partition and
    # the number of messages — confirm against _send_random_messages.
    self._send_random_messages(producer, self.topic, 0, 2)
    consumer.get_messages()

    self._kill_leader(self.topic, 0)

    # Must still succeed once leadership has moved.
    consumer.get_messages()
def assert_message_count(self, topic, check_count, timeout=10, partitions=None, at_least=False):
    """Fail the test unless the topic holds the expected number of pending messages.

    Polls ``consumer.pending(partitions)`` every half second until the
    count reaches ``check_count`` or ``timeout`` seconds have elapsed.

    :param topic: topic to inspect.
    :param check_count: number of pending messages expected.
    :param timeout: polling budget in seconds; also passed to the consumer
        as ``iter_timeout``.
    :param partitions: partitions to inspect; passed to the consumer and
        to ``pending``.
    :param at_least: when true, more than ``check_count`` pending messages
        is accepted.
    :returns: ``True`` when the check passes; otherwise ``self.fail`` is
        called.
    """
    hosts = ','.join('%s:%d' % (broker.host, broker.port) for broker in self.brokers)

    client = SimpleClient(hosts, timeout=2)
    try:
        consumer = SimpleConsumer(client, None, topic,
                                  partitions=partitions,
                                  auto_commit=False,
                                  iter_timeout=timeout)
        try:
            started_at = time.time()
            pending = -1
            while pending < check_count and (time.time() - started_at < timeout):
                try:
                    pending = consumer.pending(partitions)
                except FailedPayloadsError:
                    # Deliberate best-effort: keep polling until the timeout.
                    pass
                time.sleep(0.5)
        finally:
            # Stop the consumer even if polling raised unexpectedly.
            consumer.stop()
    finally:
        # Close the client even if the consumer could not be created.
        client.close()

    if pending < check_count:
        self.fail('Too few pending messages: found %d, expected %d' %
                  (pending, check_count))
    elif pending > check_count and not at_least:
        self.fail('Too many pending messages: found %d, expected %d' %
                  (pending, check_count))
    return True
def get_consumer(containers, topic):
    """Build a SimpleConsumer for ``topic`` on the containers' Kafka connection.

    :param containers: object exposing ``get_kafka_connection()``.
    :param topic: topic the consumer reads from.
    :returns: a ``SimpleConsumer`` in the ``replication_handler_itest`` group.
    """
    connection = containers.get_kafka_connection()
    consumer_group = str('replication_handler_itest')
    return SimpleConsumer(connection, consumer_group, topic)
def get_message(self, block=True, timeout=0.1):
    """Fetch one message from Kafka and repackage it as a ``Message``.

    Accepts the same arguments as ``get_message`` on kafka-python's
    SimpleConsumer and forwards them unchanged.

    :param block: If True, the API will block till at least a message is
        fetched.
    :type block: boolean
    :param timeout: If block is True, the function will block for the
        specified time (in seconds). If None, it will block forever.
    :returns: a Kafka message, or None when the fetch timed out
    :rtype: Message namedtuple, which consists of: partition number,
        offset, key, and message value
    """
    result = self.kafka_consumer.get_message(block, timeout)

    # A timed-out fetch comes back as None; pass that through.
    if result is None:
        return None

    # The consumer hands back a (partition, message) pair; the message is
    # indexed as [0] for the offset and [1] for the key/value holder.
    partition, offset_and_payload = result
    offset = offset_and_payload[0]
    payload = offset_and_payload[1]
    return Message(
        partition=partition,
        offset=offset,
        key=payload.key,
        value=payload.value,
    )