我正在用Java编写一个Kafka流应用程序,该应用程序将接受由连接器创建的输入主题,该连接器将模式注册表和avro用于键和值转换器。连接器产生以下架构:
key-schema: "int" value-schema:{ "type": "record", "name": "User", "fields": [ {"name": "firstname", "type": "string"}, {"name": "lastname", "type": "string"} ]}
实际上,有几个主题,键模式始终是“ int”,而值模式始终是某种记录(用户,产品等)。我的代码包含以下定义
Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl); Serde<User> userSerde = new SpecificAvroSerde<>(); userSerde.configure(serdeConfig, false);
最初,我尝试使用类似的内容来使用该主题, Consumed.with(Serdes.Integer(), userSerde);但由于Serdes.Integer()期望使用4个字节对整数进行编码,但avro使用可变长度编码,因此无法使用。使用Consumed.with(Serdes.Bytes(), userSerde);工作,但我真的想要int而不是字节,所以我将代码更改为此
Consumed.with(Serdes.Bytes(), userSerde)
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer() KafkaAvroSerializer keySerializer = new KafkaAvroSerializer(); keyDeserializer.configure(serdeConfig, true); keySerializer.configure(serdeConfig, true); Serde<Integer> keySerde = (Serde<Integer>)(Serde)Serdes.serdeFrom(keySerializer, keyDeserializer);
这使编译器产生警告(它不喜欢(Serde<Integer>)(Serde)强制转换),但允许我使用
(Serde<Integer>)(Serde)
Consumed.with(keySerde, userSerde);并获得一个整数作为关键字。这工作得很好,我的应用程序表现出预期(很棒!!!)。但是现在我想为键/值定义默认Serde,但我无法使其正常工作。
Consumed.with(keySerde, userSerde)
设置默认值serde很简单:
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
但是我不知道如何定义默认的密钥序列。
我试过了
1.streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName()); 产生运行时错误:找不到org.apache.kafka.common.serialization.Serdes $ WrapperSerde的公共无参数构造函数 2. streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); 产生运行时错误:无法将java.lang.Integer强制转换为org.apache.avro.specific.SpecificRecord 我想念什么?谢谢。
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerde.getClass().getName())
org.apache.kafka.common.serialization.Serdes $ WrapperSerde
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class)
java.lang.Integer
org.apache.avro.specific.SpecificRecord
更新 (5.5及更高版本)
Confluent版本5.5通过PrimitiveAvroSerde(cf. https://github.com/confluentinc/schema-registry/blob/5.5.x/avro-serde/src/main/java/io/confluent/kafka/streams添加了对原始Avro类型的本地支持/serdes/avro/PrimitiveAvroSerde.java)
原始答案 (版本5.4及更高版本):
这是一个已知问题。原始Avro类型无法与Confluent的AvroSerdes配合使用,因为Serdes只能与GenericAvroRecord和一起SpecificAvroRecord使用。
比较https://github.com/confluentinc/schema-registry/tree/master/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro。
因此,基于KafkaAvroSerializer和构建您自己的SerdeKafkaAvroDeserializer是正确的方法。为了能够将其作为默认Serde传递到配置中,您不能使用,Serdes.serdeFrom因为由于泛型类型擦除而导致类型信息丢失。
但是,您可以实现自己的扩展Serde接口的类,并将自定义类传递给config:
public class MySerde extends Serde<Integer> { // use KafkaAvroSerializer and KafkaAvroDeserializer and cast `Object` to `Integer` } config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MySerde.class);
感谢@Matthias J. Sax的提示,我想发布解决方案周围的工作。请免费进行改进。
import java.util.Collections; import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; public class GenericPrimitiveAvroSerDe<T> implements Serde<T> { private final Serde<Object> inner; /** * Constructor used by Kafka Streams. */ public GenericPrimitiveAvroSerDe() { inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer()); } public GenericPrimitiveAvroSerDe(SchemaRegistryClient client) { this(client, Collections.emptyMap()); } public GenericPrimitiveAvroSerDe(SchemaRegistryClient client, Map<String, ?> props) { inner = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client, props)); } @Override public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) { inner.serializer().configure(serdeConfig, isSerdeForRecordKeys); inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys); } @Override public void close() { // TODO Auto-generated method stub inner.serializer().close(); inner.deserializer().close(); } @SuppressWarnings("unchecked") @Override public Serializer<T> serializer() { // TODO Auto-generated method stub Object obj = inner.serializer(); return (Serializer<T>) obj; } @SuppressWarnings("unchecked") @Override public Deserializer<T> deserializer() { // TODO Auto-generated method stub Object obj = inner.deserializer(); return (Deserializer<T>) obj; } }
用作默认流配置:
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,GenericPrimitiveAvroSerDe.class);
覆盖默认值:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); final GenericPrimitiveAvroSerDe<String> keyGenericAvroSerde = new GenericPrimitiveAvroSerDe<String>(); keyGenericAvroSerde.configure(serdeConfig, true); // `true` for record keys final GenericPrimitiveAvroSerDe<Long> valueGenericAvroSerde = new GenericPrimitiveAvroSerDe<Long>(); valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values