I have a simple Java producer, shown below:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main(String[] args) throws Exception {
        // Declared as KafkaProducer: this class is itself named Producer,
        // so that name cannot also refer to Kafka's generic Producer interface
        KafkaProducer<String, byte[]> producer = createProducer();
        for (int i = 0; i < 3000; i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get(); // block until the broker acknowledges the send
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static KafkaProducer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Snappy compression; compression.type is the KafkaProducer setting
        props.put("compression.type", "snappy");
        return new KafkaProducer<>(props);
    }
}
I am trying to read this data with the following consumer:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main(String[] args) throws Exception {
        // Declared as KafkaConsumer: this class is itself named Consumer
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(KafkaConsumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;
        int noRecordsCount = 0;
        int stopCount = 1000;
        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                // Process the record
                System.out.printf("\nConsumer Record:(%s, %s, %s)",
                        record.key(), new String(record.value()), record.topic());
            });
            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static KafkaConsumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Create the consumer using props.
        final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}
However, the consumer does not read any messages from Kafka. If I add the following inside start():
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());
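then the consumer starts reading the topic. However, every time the consumer restarts, it reads the messages from the beginning of the topic, which I don't want. If I add the following configuration when creating the consumer: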
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
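then it reads messages from the topic, but if the consumer restarts before it has processed all the messages, it does not read the unprocessed ones.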
Can someone tell me what is going wrong and how I can fix it?
The Kafka broker and ZooKeeper are running with the default configuration.
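Much of the behaviour described above follows from how a consumer group picks its starting offset for a partition. The sketch below is a hypothetical in-memory model of that rule, not Kafka code: StartPosition and resolve are invented names. It assumes that a valid committed offset for the group always takes precedence, and that auto.offset.reset (default "latest") is consulted only when the group has no committed offset or the committed offset is out of range.

```java
// Hypothetical model (not the Kafka API) of where a consumer group starts
// reading one partition after it is assigned.
class StartPosition {

    /**
     * committed:   the group's committed offset, or null if it has none
     * resetPolicy: value of auto.offset.reset ("earliest" or "latest")
     * logStart:    first offset still stored in the partition
     * logEnd:      offset the next produced record will get
     */
    static long resolve(Long committed, String resetPolicy, long logStart, long logEnd) {
        if (committed != null && committed >= logStart && committed <= logEnd) {
            return committed; // a valid committed offset wins; the policy is ignored
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart; // re-read everything still in the log
            case "latest":
                return logEnd;   // only records produced from now on
            default:
                // "none" (or anything else) is treated as an error in this model
                throw new IllegalArgumentException("no usable offset, policy " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "latest", 0, 3000));   // 3000
        System.out.println(resolve(null, "earliest", 0, 3000)); // 0
        System.out.println(resolve(500L, "earliest", 0, 3000)); // 500
    }
}
```

In this model, a brand-new group with the default policy starts at 3000, after the messages the producer has already sent, and once the group has committed offset 500 the reset policy no longer affects where it resumes.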
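Your call to commitSync() acknowledges all the messages in the batch returned by the last poll(), rather than acknowledging each message as it is processed, which I think is what you are trying to do.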
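A toy in-memory model can make the difference in commit granularity concrete. It is not the Kafka API: CommitGranularityDemo and committedAtCrash are hypothetical names. It assumes a single partition, fixed-size batches standing in for poll(), and a consumer that crashes after processing a given number of records; the return value is the committed offset a restarted consumer would resume from.

```java
// Hypothetical model (not the Kafka API): one partition with offsets
// startOffset..logEnd-1 and a single committed offset for the group.
class CommitGranularityDemo {

    /**
     * Fetches batches of batchSize records (one "poll()" each), processes them
     * one at a time, and crashes right after crashAfter records have been
     * processed. Returns the committed offset at the moment of the crash.
     */
    static long committedAtCrash(long startOffset, long logEnd, int batchSize,
                                 int crashAfter, boolean commitPerRecord) {
        long committed = startOffset;
        long position = startOffset;
        int processed = 0;
        while (position < logEnd) {
            long batchEnd = Math.min(position + batchSize, logEnd);
            for (long offset = position; offset < batchEnd; offset++) {
                processed++;                // "process" the record
                if (commitPerRecord) {
                    committed = offset + 1; // commit the next offset to read
                }
                if (processed == crashAfter) {
                    return committed;       // crash: nothing further is committed
                }
            }
            position = batchEnd;
            if (!commitPerRecord) {
                committed = batchEnd;       // like commitSync() after the whole batch
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        // 3000 records, batches of 500, crash after 700 records are processed
        System.out.println(committedAtCrash(0, 3000, 500, 700, false)); // 500
        System.out.println(committedAtCrash(0, 3000, 500, 700, true));  // 700
    }
}
```

With batch commits, the crash leaves the committed offset at 500, so records 500 to 699 are delivered again after the restart; committing after each record resumes at 700, at the cost of one synchronous commit per message.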
From the KafkaConsumer documentation:
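“The above example uses commitSync to mark all received records as committed. In some cases you may wish to have even finer control over which records have been committed by specifying an offset explicitly. In the example below we commit offset after we finish handling the records in each partition.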
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}
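Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed.”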
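Applied to a whole batch, the same rule gives one offset per partition: the last processed offset plus one. The sketch below models only that arithmetic; Rec, OffsetsToCommit and nextOffsets are hypothetical stand-ins, and with the real client the result would be a Map<TopicPartition, OffsetAndMetadata> passed to commitSync(offsets).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecord: only a partition number and an offset.
class Rec {
    final int partition;
    final long offset;

    Rec(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

class OffsetsToCommit {

    /**
     * For each partition in a processed batch, returns the offset to commit:
     * the last processed offset plus one. Assumes records of one partition
     * appear in increasing offset order, as they do within a Kafka partition.
     */
    static Map<Integer, Long> nextOffsets(List<Rec> batch) {
        Map<Integer, Long> toCommit = new LinkedHashMap<>();
        for (Rec r : batch) {
            // later records of the same partition overwrite earlier entries
            toCommit.put(r.partition, r.offset + 1);
        }
        return toCommit;
    }

    public static void main(String[] args) {
        List<Rec> batch = new ArrayList<>();
        batch.add(new Rec(0, 41));
        batch.add(new Rec(1, 7));
        batch.add(new Rec(0, 42));
        System.out.println(nextOffsets(batch)); // {0=43, 1=8}
    }
}
```

Passing the whole map in one commitSync call records the same positions once the batch has been handled, with one request instead of one per partition; the per-partition loop in the documentation example, by contrast, saves progress earlier if processing fails partway through the batch.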