我使用Spring Boot 2.0.6 开发了一个@KafkaListener带有ConsumerAwareRebalanceListener接口的标签。我实现了该onPartitionsAssigned方法,在该方法中,将偏移的时间倒回固定的时间,例如60秒。
@KafkaListener
ConsumerAwareRebalanceListener
onPartitionsAssigned
到目前为止,一切都很好。
如何使用Spring Kafka给我的工具测试上述用例?我以为我需要启动一个Kafka代理(例如EmbeddedKafka),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取了过去60秒内到达的消息。
EmbeddedKafka
有人可以帮我吗?我在Google上搜索了一下,但没有找到任何东西。非常感谢。
该@KafkaListener有:
/** * The unique identifier of the container managing for this endpoint. * <p>If none is specified an auto-generated one is provided. * @return the {@code id} for the container managing for this endpoint. * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String) */ String id() default "";
属性,因此您可以MessageListenerContainer通过提到的对其进行访问,您可以将其KafkaListenerEndpointRegistry简单地@Autowired放入基于Spring Testing Framework的测试类中。然后,你才能真正stop()和start()那MessageListenerContainer在您的测试方法。
MessageListenerContainer
KafkaListenerEndpointRegistry
@Autowired
stop()
start()
也要注意如何@KafkaListener也有一个autoStartup()属性。
autoStartup()