我正在开发Spring Boot + Apache Kafka + Apache Zookeeper示例。我已经Apache Zookeeper and Apache Kafka在本地Windows机器上安装/设置了。我从链接中获取了参考:https : //www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm并按原样执行了代码:
Spring Boot + Apache Kafka + Apache Zookeeper
Apache Zookeeper and Apache Kafka
设置:https://medium.com/@shaaslam/installing-apache-kafka-on- windows-495f6f2fd3c8
错误:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE] at com.tutorialspoint.SpringKafkaTutorialspointApplication.main(SpringKafkaTutorialspointApplication.java:21) [classes/:na] Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Zookeeper服务器的日志:
2018-12-14 22:16:35,352 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60197 (no session established for client) 2018-12-14 22:16:36,260 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60200 2018-12-14 22:16:36,260 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:36,262 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60200 (no session established for client) 2018-12-14 22:16:37,473 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60204 2018-12-14 22:16:37,473 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:37,476 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60204 (no session established for client) 2018-12-14 22:16:38,383 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60207 2018-12-14 22:16:38,384 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:38,388 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60207 (no session established for client) 2018-12-14 22:16:39,494 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60211 2018-12-14 22:16:39,494 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:39,497 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60211 (no session established for client) 2018-12-14 22:16:40,506 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60214 2018-12-14 22:16:40,507 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:40,509 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60214 (no session established for client) 2018-12-14 22:16:41,519 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60217 2018-12-14 22:16:41,519 [myid:] - WARN [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null 2018-12-14 22:16:41,525 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60217 (no session established for client)
KafkaProducerConfig.java
@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory(){ Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
KafkaConsumerConfig.java
@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
SpringKafkaTutorialspointApplication.java
@SpringBootApplication public class SpringKafkaTutorialspointApplication implements CommandLineRunner{ @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("tutorialspoint", message); } public static void main(String[] args) { SpringApplication.run(SpringKafkaTutorialspointApplication.class, args); } @Override public void run(String... args) throws Exception { sendMessage("Hi Welcome to Spring For Apache Kafka @@@@@@@@@"); } @KafkaListener(topics = "tutorialspoint", groupId = "group-id") public void listen(String message) { System.out.println("Received Messasge in group - group-id: " + message); } }
注意:请让我知道是否需要其他详细信息。
有任何快速帮助吗?
花了很多小时后,我发现我也应该在下面使用它KafkaConsumerConfig。
KafkaConsumerConfig
通过以下更改,代码可以正常工作。
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");