编辑仅供参考:gitHub工作示例
我正在互联网上搜索,找不到嵌入式Kafka测试的工作示例。
我的设置是:
请帮我。大部分是配置过度或工程过度的示例。我相信这可以简单完成。谢谢你们!
@Controller public class KafkaController { private static final Logger LOG = getLogger(KafkaController.class); @KafkaListener(topics = "test.kafka.topic") public void receiveDunningHead(final String payload) { LOG.debug("Receiving event with payload [{}]", payload); //I will do database stuff here which i could check in db for testing } }
私有静态字符串SENDER_TOPIC =“ test.kafka.topic”;
@ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC); @Test public void testSend() throws InterruptedException, ExecutionException { Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps); producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get(); producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get(); producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get(); Thread.sleep(10000); }
嵌入式Kafka测试适用于以下配置,
测试课注释
@EnableKafka @SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config @EmbeddedKafka( partitions = 1, controlledShutdown = false, brokerProperties = { "listeners=PLAINTEXT://localhost:3333", "port=3333" }) public class KafkaConsumerTest { @Autowired KafkaEmbedded kafkaEmbeded; @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
注释之前的设置方法
@Before public void setUp() throws Exception { for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) { ContainerTestUtils.waitForAssignment(messageListenerContainer, kafkaEmbeded.getPartitionsPerTopic()); } }
注意:我不是@ClassRule用来创建嵌入式Kafka而是自动装配 @Autowired embeddedKafka
@ClassRule
@Autowired embeddedKafka
@Test public void testReceive() throws Exception { kafkaTemplate.send(topic, data); }
希望这可以帮助!
编辑:测试配置类标记为 @TestConfiguration
@TestConfiguration
@TestConfiguration public class TestConfig { @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded)); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); kafkaTemplate.setDefaultTopic(topic); return kafkaTemplate; }
现在@Test方法将自动连接KafkaTemplate并用于发送消息
@Test
kafkaTemplate.send(topic, data);
用上面的行更新了答案代码块