我有一个重试5次的Kafka使用者,并且我正在使用带有重试模板的Spring Kafka。现在,如果所有重试都失败了,那么在这种情况下如何确认工作。另外,如果我将确认模式设置为手动,那么如何确认这些消息
消费者
@Bean("kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setRetryTemplate(retryTemplate); factory.setRecoveryCallback(context -> { log.error("Maximum retry policy has been reached {}", context.getAttribute("record")); Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT); ack.acknowledge(); return null; }); factory.setConcurrency(Integer.parseInt(kafkaConcurrency)); return factory; }
卡夫卡听众
@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory") public void recieveSegmentService(String KafkaPayload, Acknowledgment acknowledgment) throws Exception { KafkaSegmentTrigger kafkaSegmentTrigger; kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class); log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger); processMessage(kafkaSegmentTrigger); acknowledgment.acknowledge(); }
如果您有一个RecoveryCallback在重试用尽时可以处理记录的,并且您没有使用手动方式,则容器将提交偏移量。
RecoveryCallback
如果您使用的是MANUAL ACK,则可以在恢复回调中提交偏移量。
传递给回调的上下文对象具有一个属性RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT(“确认”)。
RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT
它还具有CONTEXT_CONSUMER和CONTEXT_RECORD。
CONTEXT_CONSUMER
CONTEXT_RECORD