我正在使用此docker- compose设置在本地设置Kafka:https : //github.com/wurstmeister/kafka- docker/
docker-compose up 工作正常,通过shell创建主题工作正常。
docker-compose up
现在我尝试通过以下方式连接到Kafka spring-kafka:2.1.0.RELEASE
spring-kafka:2.1.0.RELEASE
启动Spring应用程序时,它会打印正确版本的Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
我尝试发送这样的消息
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
在客户端发送失败
UnknownServerException: The server experienced an unexpected error when processing the request
在服务器控制台中,我收到消息 Magic v1不支持记录头
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Magic v1 does not support record headers
谷歌搜索表明版本冲突,但该版本似乎合适(org.apache.kafka:kafka-clients:1.0.0在类路径中)。
org.apache.kafka:kafka-clients:1.0.0
有什么线索吗?谢谢!
编辑:我缩小了问题的根源。发送纯字符串是可行的,但是通过JsonSerializer发送Json会导致给定的问题。这是我的生产者配置的内容:
@Value("\${kafka.bootstrap-servers}") lateinit var bootstrapServers: String @Bean fun producerConfigs(): Map<String, Any> = HashMap<String, Any>().apply { // list of host:port pairs used for establishing the initial connections to the Kakfa cluster put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java) } @Bean fun producerFactory(): ProducerFactory<String, MyClass> = DefaultKafkaProducerFactory(producerConfigs()) @Bean fun kafkaTemplate(): KafkaTemplate<String, MyClass> = KafkaTemplate(producerFactory())
解决了。问题既不是代理,也不是某些docker缓存,也不是Spring应用程序。
问题是控制台使用者,我并行使用它进行调试。这是一个“老”消费者kafka-console-consumer.sh --topic=topic --zookeeper=...
kafka-console-consumer.sh --topic=topic --zookeeper=...
它实际上在启动时会显示警告: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
--bootstrap-server应该使用带有选项的“新”使用者(尤其是在将Kafka 1.0与JsonSerializer一起使用时)。注意:在这里使用老消费者确实会影响生产者。
--bootstrap-server