我有一个Spring Boot尝试读取本地Kafka主题的应用。
Spring Boot
Kafka
该应用程序开始旋转,暂停2分钟,并引发以下错误:
19-11-10 14:49:24.700 INFO 20476 --- [ restartedMain] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1573390164696 2019-11-10 14:51:24.793 WARN 20476 --- [ restartedMain] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [XYZ] is/are not present and missingTopicsFatal is true 2019-11-10 14:51:24.795 INFO 20476 --- [ restartedMain] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler' 2019-11-10 14:51:24.796 INFO 20476 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor' 2019-11-10 14:51:24.810 INFO 20476 --- [ restartedMain] o.apache.catalina.core.StandardService : Stopping service [Tomcat] 2019-11-10 14:51:24.837 INFO 20476 --- [ restartedMain] ConditionEvaluationReportLoggingListener : Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled. 2019-11-10 14:51:24.844 ERROR 20476 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: Topic(s) [XYZ] is/are not present and missingTopicsFatal is true at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.0.RELEASE.jar:2.2.0.RELEASE] at com.example.main(NyApplication.java:38) [classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_161] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_161] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_161] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_161] at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.2.0.RELEASE.jar:2.2.0.RELEASE] Caused by: java.lang.IllegalStateException: Topic(s) [ZYX] is/are not present and missingTopicsFatal is true at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:366) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:323) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:309) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:256) ~[spring-kafka-2.3.3.RELEASE.jar:2.3.3.RELEASE] at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.1.RELEASE.jar:5.2.1.RELEASE] ... 19 common frames omitted `
将kafka在本地计算机上的码头工人运行:
kafka
docker run --detach --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.1.89 --env ADVERVTISED_PORT=9092 --env AUTO.CREATE.TOPICS.ENABLE spotify/kafka
何时bash放入容器
bash
docker exec -it 1234567896b5 bash
我可以看到主题:
$/opt/kafka_2.11-0.10.1.0/bin# ./kafka-topics.sh --list --zookeeper localhost:2181 $XYZ
该主题是使用以下命令创建的:
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 3 --topic XYZ
我application.yml在应用程序的中拥有此功能(也尝试使用端口9092):
application.yml
spring: kafka: consumer: bootstrap-servers: localhost:2181 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: localhost:2181 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
您的配置使用的是Zookeeper,而不是Kafka作为引导服务器
使用端口9092,而不是2181
另外,该Spotify容器未维护,因此我建议找到一个更新的容器