I start Zookeeper manually, then the Kafka server, and finally the Kafka-Rest server, each with its respective properties file. Next, I deploy my Spring Boot application on Tomcat.
In the Tomcat log trace the following error appears, and my application fails to start:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Error log
25-Dec-2017 15:00:32.508 SEVERE [localhost-startStop-1] org.apache.catalina.core.ContainerBase.addChildInternal ContainerBase.addChild: start:
 org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/spring-kafka-webhook-service-0.0.1-SNAPSHOT]]
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:167)
    at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:752)
    at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:728)
    at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:734)
    at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:986)
    at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1857)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:880)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.finishRefresh(EmbeddedWebApplicationContext.java:144)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
    at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
    at org.springframework.boot.web.support.SpringBootServletInitializer.run(SpringBootServletInitializer.java:154)
    at org.springframework.boot.web.support.SpringBootServletInitializer.createRootApplicationContext(SpringBootServletInitializer.java:134)
    at org.springframework.boot.web.support.SpringBootServletInitializer.onStartup(SpringBootServletInitializer.java:87)
    at org.springframework.web.SpringServletContainerInitializer.onStartup(SpringServletContainerInitializer.java:169)
    at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5196)
    at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
    ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73)
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202)
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:126)
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287)
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:175)
    ... 27 more
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/ClusterResourceListener
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at org.apache.catalina.loader.WebappClassLoaderBase.findClassInternal(WebappClassLoaderBase.java:2283)
    at org.apache.catalina.loader.WebappClassLoaderBase.findClass(WebappClassLoaderBase.java:811)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1260)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:643)
    ... 39 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1291)
    at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1119)
    ... 51 more
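When a trace nests several "Caused by" sections like the one above, the innermost one (here the ClassNotFoundException) is usually the place to start. As a rough illustration only, the helper below walks a cause chain to its end; the class name `RootCause` and the sample exceptions in `main` are made up for this sketch and are not part of the application.

```java
final class RootCause {

    // Follows getCause() until there is no further cause and returns the last throwable.
    static Throwable of(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Hypothetical chain that mimics the nesting seen in the Tomcat log.
        Throwable nested = new IllegalStateException("Failed to start bean",
                new RuntimeException("Failed to construct kafka consumer",
                        new ClassNotFoundException("org.apache.kafka.common.ClusterResourceListener")));
        System.out.println(RootCause.of(nested));
    }
}
```

Logging the result of such a helper next to the full trace can make the real failure easier to spot in long Tomcat logs.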
Receiver class
public class InventoryEventReceiver {

    private static final Logger log = LoggerFactory.getLogger(InventoryEventReceiver.class);

    private CountDownLatch latch = new CountDownLatch(1);

    public CountDownLatch getLatch() {
        return latch;
    }

    @KafkaListener(topics="inventory", containerFactory="kafkaListenerContainerFactory")
    public void listenWithHeaders(
            @Payload InventoryEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.OFFSET) String offset
            ) {

        System.out.println("EVENT HAS BEEN RECEIVED by listenWithHeaders(InventoryEvent)");
        System.out.println(event.toString());

        log.info(System.currentTimeMillis() + "-- Received Event :\"" + event + "\" from partition:offset -- " + partition + ":" + offset + " for topic : " + topic);

        String urlForInventoryListeners = "http://localhost:8080/" + topic + "/listeners";
        OutputStream os = null;
        try {
            URL objectUrl = new URL(urlForInventoryListeners);
            HttpURLConnection con = (HttpURLConnection) objectUrl.openConnection();
            con.setRequestMethod("POST");
            con.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
            con.setRequestProperty("topic", topic);
            Gson gson = new Gson();
            String eventJson = gson.toJson(event);
            con.setDoOutput(true);
            os = con.getOutputStream();
            os.write(eventJson.getBytes("UTF-8"));
            System.out.println("Event sent to " + objectUrl);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        } finally {
            try {
                os.close();
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println(e.getMessage());
            }
        }
        latch.countDown();
    }
}
Receiver configuration class
@Configuration
@EnableKafka
public class InventoryReceiverConfig {

    @Autowired
    private KafkaConfig kafkaConfig;

    @Bean
    public static ConsumerFactory<String, InventoryEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(InventoryEvent.class));
    }

    @Bean
    public static ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, InventoryEvent> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        containerFactory.setConcurrency(3);
        containerFactory.getContainerProperties().setPollTimeout(3000);
        return containerFactory;
    }

    @Bean
    public static Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"inventory_consumers");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor");
        return consumerProps;
    }

    @Bean
    public InventoryEventReceiver receiver() {
        return new InventoryEventReceiver();
    }
}
My cluster properties files server.properties, consumer.properties and kafka-rest.properties are as follows:
server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

##################### Confluent Proactive Support ######################

# If set to true, and confluent-support-metrics package is installed
# then the feature to collect and report support metrics
# ("Metrics") is enabled. If set to false, the feature is disabled.
#
confluent.support.metrics.enable=true

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# The customer ID under which support metrics will be collected and
# reported.
#
# When the customer ID is set to "anonymous" (the default), then only a
# reduced set of metrics is being collected and reported.
#
# Confluent customers
# -------------------
# If you are a Confluent customer, then you should replace the default
# value with your actual Confluent customer ID. Doing so will ensure
# that additional support metrics will be collected and reported.
#
confluent.support.customer.id=anonymous
consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group,inventory_consumers

#consumer timeout
#consumer.timeout.ms=5000
kafka-rest.properties
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081
zookeeper.connect=localhost:2181
#
# Configure interceptor classes for sending consumer and producer metrics to Confluent Control Center
# Make sure that monitoring-interceptors-<version>.jar is on the Java class path
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.psl.kafka.spring</groupId>
    <artifactId>spring-kafka-webhook-service</artifactId>
    <packaging>war</packaging>

    <name>spring-kafka-webhook-service</name>
    <description>Spring Kafka Webhook Service</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <org.springframework-version>5.0.0.RELEASE</org.springframework-version>
        <org.springframework.security-version>4.0.1.RELEASE</org.springframework.security-version>
        <org.aspectj-version>1.8.11</org.aspectj-version>
        <org.slf4j-version>1.7.12</org.slf4j-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.2</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>monitoring-interceptors</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <version>0.0.1-SNAPSHOT</version>
</project>
My Receiver and Sender classes are not annotated with any annotation such as @Component or @Service. Does that make any difference?
@Configuration
public class InventorySenderConfig

@Configuration
@EnableKafka
public class InventoryReceiverConfig

@Component
public class KafkaConfig

@Configuration
public class ProducingChannelConfig

@Configuration
public class ConsumingChannelConfig

@RestController
public class KafkaWebhookController

@Service("webhookService")
public class KafkaServiceImpl

@EnableIntegration
@SpringBootApplication
@ComponentScan("com.psl.kafka")
public class SpringKafkaWebhookServiceApplication extends SpringBootServletInitializer
These are my class annotations. Do they look fine, or do I need to change something?
New build error after updating the Kafka version to 0.10.1.1
2017-12-26 13:11:44.490 INFO 13444 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1
2017-12-26 13:11:44.490 INFO 13444 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247
2017-12-26 13:12:44.499 ERROR 13444 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='inventory-events' and payload='Hello Spring Integration Kafka 0!' to topic inventory:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2017-12-26 13:12:44.501 WARN 13444 --- [ main] o.a.k.c.p.i.ProducerInterceptors : Error executing interceptor onAcknowledgement callback

java.lang.IllegalStateException: clusterResource is not defined
    at io.confluent.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:174) ~[monitoring-interceptors-3.1.1.jar:na]
    at io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor.onAcknowledgement(MonitoringProducerInterceptor.java:59) ~[monitoring-interceptors-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerInterceptors.onSendError(ProducerInterceptors.java:116) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:489) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:436) [kafka-clients-0.10.1.1.jar:na]
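"Failed to update metadata after 60000 ms" often means the producer never reached a broker, although the log alone does not prove that this is what happened here. As a first check, and only as a sketch, the snippet below tests whether a TCP connection to the broker address can be opened at all. The class name `BrokerReachability` is made up for this example; `localhost:9092` is taken from the configuration shown earlier, and the 2000 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerReachability {

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Address assumed from advertised.listeners in server.properties.
        System.out.println("localhost:9092 reachable: " + canConnect("localhost", 9092, 2000));
    }
}
```

A successful connection only shows that something is listening on the port; it does not confirm that the topic exists or that the advertised listener is correct.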
Do I need to define all the Interceptor classes that I have added as configuration in the ProducingChannelConfig, ConsumingChannelConfig and InventoryReceiverConfig classes?
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.ClusterResourceListener
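You are missing the kafka-clients jar on the classpath. What are you using for dependency management? Maven and Gradle should put this jar on the classpath automatically. Note that a kafka-clients jar that is present but too old gives the same error: the pom.xml pins version 0.10.0.1, and ClusterResourceListener was only added in 0.10.1.0.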
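To see what the running application can actually load, a small probe such as the following may help. It is a sketch only: the class name `ClasspathProbe` and its methods are invented for this example, and in a Tomcat deployment it would need to run inside the web application so that the webapp class loader is the one being checked.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

final class ClasspathProbe {

    // Returns true if the named class can be loaded by the given class loader.
    static boolean isPresent(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns the jar or directory a class was loaded from, if the JVM exposes it.
    static Optional<URL> locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? Optional.empty() : Optional.ofNullable(source.getLocation());
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String name = "org.apache.kafka.common.ClusterResourceListener";
        System.out.println(name + " present: " + isPresent(name, loader));
    }
}
```

If the class is reported as absent, calling `locate` on a class that does load from kafka-clients (for example `KafkaConsumer.class`) shows which jar, and therefore which version, ended up in WEB-INF/lib.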