小编典典

如何测试ConsumerAwareRebalanceListener?

spring-boot

我使用Spring Boot 2.0.6
开发了一个@KafkaListener带有ConsumerAwareRebalanceListener接口的标签。我实现了该onPartitionsAssigned方法,在该方法中,将偏移的时间倒回固定的时间,例如60秒。

到目前为止,一切都很好。

如何使用Spring
Kafka给我的工具测试上述用例?我以为我需要启动一个Kafka代理(例如EmbeddedKafka),然后停止侦听器,然后再次重新启动它,以测试它是否再次读取了过去60秒内到达的消息。

有人可以帮我吗?我在Google上搜索了一下,但没有找到任何东西。非常感谢。


阅读 851

收藏
2020-05-30

共1个答案

小编典典

@KafkaListener有:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

属性,因此您可以MessageListenerContainer通过提到的对其进行访问,您可以将其KafkaListenerEndpointRegistry简单地@Autowired放入基于Spring
Testing
Framework的测试类中。然后,你才能真正stop()start()MessageListenerContainer在您的测试方法。

也要注意如何@KafkaListener也有一个autoStartup()属性。

2020-05-30