public KmqClient(KmqConfig config, KafkaClients clients, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer, long msgPollTimeout) { this.config = config; this.msgPollTimeout = msgPollTimeout; this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer); // Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition. this.markerProducer = clients.createProducer( MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class, Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class)); LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId())); msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic())); }
@Override @SuppressWarnings("unchecked") public void afterPropertiesSet() throws Exception { if (topics == null && topicPatternString == null) { throw new IllegalArgumentException("topic info must not be null"); } Assert.notEmpty(configs, "configs must not be null"); Assert.notNull(payloadListener, "payloadListener must be null"); String valueDeserializerKlass = (String) configs.get("value.deserializer"); configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs); Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass); valueDeserializer.configure(configs, false); if (topics != null) { listenableConsumer = new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer); } else { listenableConsumer = new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer); } if (payloadListener != null) { listenableConsumer.addListener(payloadListener); } listenableConsumer.start(); }
/** * Builds a Deserializer of T with the passed stateless function and no configure or close implementations */ public static <T> Deserializer<T> buildBasicDeserializer(final DeserializeFunc<T> deserializeFunc) { return new Deserializer<T>() { @Override public void configure(final Map<String, ?> configs, final boolean isKey) { } @Override public T deserialize(final String topic, final byte[] bData) { return deserializeFunc.deserialize(topic, bData); } @Override public void close() { } }; }
/** * * @param connector * @param topics * @param processThreads */ @SuppressWarnings("unchecked") public OldApiTopicConsumer(ConsumerContext context) { this.consumerContext = context; try { Class<?> deserializerClass = Class .forName(context.getProperties().getProperty("value.deserializer")); deserializer = (Deserializer<Object>) deserializerClass.newInstance(); } catch (Exception e) { } this.connector = kafka.consumer.Consumer .createJavaConsumerConnector(new ConsumerConfig(context.getProperties())); int poolSize = consumerContext.getMessageHandlers().size(); this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, poolSize, new StandardThreadFactory("KafkaFetcher")); this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(), 30, TimeUnit.SECONDS, context.getMaxProcessThreads(), new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy()); logger.info( "Kafka Conumer ThreadPool initialized,fetchPool Size:{},defalutProcessPool Size:{} ", poolSize, context.getMaxProcessThreads()); }
@Override public void configure(Map<String, ?> configs, boolean isKey) { if (inner == null) { String propertyName = isKey ? "key.deserializer.inner.class" : "value.deserializer.inner.class"; Object innerDeserializerClass = configs.get(propertyName); propertyName = (innerDeserializerClass == null) ? "deserializer.inner.class" : propertyName; String value = null; try { value = (String) configs.get(propertyName); inner = Deserializer.class.cast(Utils.newInstance(value, Deserializer.class)); inner.configure(configs, isKey); } catch (ClassNotFoundException e) { throw new ConfigException(propertyName, value, "Class " + value + " could not be found."); } } }
@SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); this.context = context; // if deserializers are null, get the default ones from the context if (this.keyDeserializer == null) this.keyDeserializer = ensureExtended((Deserializer<K>) context.keySerde().deserializer()); if (this.valDeserializer == null) this.valDeserializer = ensureExtended((Deserializer<V>) context.valueSerde().deserializer()); // if value deserializers are for {@code Change} values, set the inner deserializer when necessary if (this.valDeserializer instanceof ChangedDeserializer && ((ChangedDeserializer) this.valDeserializer).inner() == null) ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer()); }
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived( consumerProperties, outputTopic, numMessages, 60 * 1000); }
private List<String> receiveMessages(final Deserializer<?> valueDeserializer, final int numMessages, final String topic) throws InterruptedException { final Properties config = new Properties(); config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test"); config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived( config, topic, numMessages, 60 * 1000); Collections.sort(received); return received; }
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer, final int numMessages) throws InterruptedException { final Properties consumerProperties = new Properties(); consumerProperties .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo); consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, outputTopic, numMessages, 60 * 1000); }
@Test public void testWindowedDeserializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].deserializer.inner.class takes precedence over serializer.inner.class WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer"); windowedDeserializer.configure(props, true); Deserializer<?> inner = windowedDeserializer.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner); assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer); // test deserializer.inner.class props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.remove("key.deserializer.inner.class"); props.remove("value.deserializer.inner.class"); WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>(); windowedDeserializer1.configure(props, false); Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer(); assertNotNull("Inner deserializer should be not null", inner1); assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer); }
private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) { return new Fetcher<>(consumerClient, minBytes, maxBytes, maxWaitMs, fetchSize, maxPollRecords, true, // check crc keyDeserializer, valueDeserializer, metadata, subscriptions, metrics, metricsRegistry, time, retryBackoffMs, isolationLevel); }
@SuppressWarnings("unchecked") void init(ServletContext context) { String serializedConfig = context.getInitParameter(ConfigUtils.class.getName() + ".serialized"); Objects.requireNonNull(serializedConfig); this.config = ConfigUtils.deserialize(serializedConfig); this.updateTopic = config.getString("oryx.update-topic.message.topic"); this.maxMessageSize = config.getInt("oryx.update-topic.message.max-size"); this.updateTopicLockMaster = config.getString("oryx.update-topic.lock.master"); this.updateTopicBroker = config.getString("oryx.update-topic.broker"); this.readOnly = config.getBoolean("oryx.serving.api.read-only"); if (!readOnly) { this.inputTopic = config.getString("oryx.input-topic.message.topic"); this.inputTopicLockMaster = config.getString("oryx.input-topic.lock.master"); this.inputTopicBroker = config.getString("oryx.input-topic.broker"); } this.modelManagerClassName = config.getString("oryx.serving.model-manager-class"); this.updateDecoderClass = (Class<? extends Deserializer<U>>) ClassUtils.loadClass( config.getString("oryx.update-topic.message.decoder-class"), Deserializer.class); Preconditions.checkArgument(maxMessageSize > 0); }
@Test public void testSingleMessageSegment() { // Create serializer/deserializers. Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer(); Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer(); byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes()); MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer); MessageAssembler.AssembleResult assembleResult = messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes); assertNotNull(assembleResult.messageBytes()); assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0"); assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0"); }
@Test public void testSerde() { Serializer<String> stringSerializer = new StringSerializer(); Deserializer<String> stringDeserializer = new StringDeserializer(); Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer(); Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer(); String s = LiKafkaClientsTestUtils.getRandomString(100); assertEquals(s.length(), 100); byte[] stringBytes = stringSerializer.serialize("topic", s); assertEquals(stringBytes.length, 100); LargeMessageSegment segment = new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes)); // String bytes + segment header byte[] serializedSegment = segmentSerializer.serialize("topic", segment); assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4); LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment); assertEquals(deserializedSegment.messageId, segment.messageId); assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes); assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments); assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber); assertEquals(deserializedSegment.payload.limit(), 100); String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray()); assertEquals(deserializedString.length(), s.length()); }
@Override public Deserializer<T> deserializer() { return new Deserializer<T>() { @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public T deserialize(String topic, byte[] data) { T result; try { result = mapper.readValue(data, cls); } catch (Exception e) { throw new SerializationException(e); } return result; } @Override public void close() { } }; }
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId, Class<? extends Deserializer<K>> keyDeserializer, Class<? extends Deserializer<V>> valueDeserializer) { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("enable.auto.commit", "false"); props.put("key.deserializer", keyDeserializer.getName()); props.put("value.deserializer", valueDeserializer.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); if (groupId != null) { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); } return new KafkaConsumer<>(props); }
public static <K, V> Payload<K, V> decodePayload(Deserializer<V> valueDeserializer, ConsumerRecord<K, byte[]> originConsumerRecord) { TracingHeader tracingHeader = null; ConsumerRecord<K, V> dataRecord = null; boolean sampled = false; byte[] data = originConsumerRecord.value(); byte[] vData = null; if (data.length <= HEADER_LENGTH) { vData = data; } else { ByteBuffer byteBuf = ByteBuffer.wrap(data); short magic = byteBuf.getShort(0); short tpLen = byteBuf.getShort(2); if (magic == MAGIC && tpLen == TracingHeader.LENGTH) { byte[] tpBytes = new byte[tpLen]; System.arraycopy(byteBuf.array(), HEADER_LENGTH, tpBytes, 0, tpLen); tracingHeader = TracingHeader.fromBytes(tpBytes); sampled = true; int dataOffset = tpLen + HEADER_LENGTH; vData = new byte[byteBuf.array().length - dataOffset]; System.arraycopy(byteBuf.array(), dataOffset, vData, 0, vData.length); } else { vData = data; } } dataRecord = new ConsumerRecord<>(originConsumerRecord.topic(), originConsumerRecord.partition(), originConsumerRecord.offset(), originConsumerRecord.key(), valueDeserializer.deserialize(originConsumerRecord.topic(), vData)); return new Payload<>(tracingHeader, dataRecord, sampled); }
public ListenableTracingConsumer(Consumer<K, byte[]> delegate, Collection<String> topics, Deserializer<V> valueDeserializer) { super(delegate); this.delegate = delegate; this.topics = topics; this.topicPattern = null; this.valueDeserializer = valueDeserializer; }
public ListenableTracingConsumer(Consumer<K, byte[]> delegate, Pattern topicPattern, Deserializer<V> valueDeserializer) { super(delegate); this.delegate = delegate; this.topicPattern = topicPattern; this.topics = null; this.valueDeserializer = valueDeserializer; }
/** * Return Kafka Consumer configured to consume from internal Kafka Server. * @param <K> Type of message key * @param <V> Type of message value * @param keyDeserializer Class of deserializer to be used for keys. * @param valueDeserializer Class of deserializer to be used for values. * @return KafkaProducer configured to produce into Test server. */ public <K, V> KafkaConsumer<K, V> getKafkaConsumer( final Class<? extends Deserializer<K>> keyDeserializer, final Class<? extends Deserializer<V>> valueDeserializer) { // Build config Map<String, Object> kafkaConsumerConfig = buildDefaultClientConfig(); kafkaConsumerConfig.put("key.deserializer", keyDeserializer); kafkaConsumerConfig.put("value.deserializer", valueDeserializer); kafkaConsumerConfig.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor"); // Create and return Consumer. return new KafkaConsumer<>(kafkaConsumerConfig); }
/** * Constructor. * @param keyDeserializerClass Class for deserializer for keys. * @param valueDeserializerClass Class for deserializer for values. */ private DeserializerConfig( final Class<? extends Deserializer> keyDeserializerClass, final Map<String, String> keyDeserializerOptions, final Class<? extends Deserializer> valueDeserializerClass, final Map<String, String> valueDeserializerOptions ) { this.keyDeserializerClass = keyDeserializerClass; this.keyDeserializerOptions = new HashMap<>(); this.keyDeserializerOptions.putAll(keyDeserializerOptions); this.valueDeserializerClass = valueDeserializerClass; this.valueDeserializerOptions = new HashMap<>(); this.valueDeserializerOptions.putAll(valueDeserializerOptions); }
/** * Constructor. */ public WebKafkaConsumerFactory( final PluginFactory<Deserializer> deserializerPluginFactory, final PluginFactory<RecordFilter> recordFilterPluginFactory, final SecretManager secretManager, final KafkaConsumerFactory kafkaConsumerFactory) { this.deserializerPluginFactory = deserializerPluginFactory; this.recordFilterPluginFactory = recordFilterPluginFactory; this.secretManager = secretManager; this.kafkaConsumerFactory = kafkaConsumerFactory; }
private Class<? extends Deserializer> getDeserializerClass(final MessageFormat messageFormat) { try { if (messageFormat.isDefaultFormat()) { return deserializerPluginFactory.getPluginClass(messageFormat.getClasspath()); } else { return deserializerPluginFactory.getPluginClass(messageFormat.getJar(), messageFormat.getClasspath()); } } catch (final LoaderException exception) { throw new RuntimeException(exception.getMessage(), exception); } }
/** * Test creating a Deserializer. */ @Test public void testWithDeserializer() throws LoaderException { final String jarFilename = "testPlugins.jar"; final String classPath = "examples.deserializer.ExampleDeserializer"; // Find jar on filesystem. final URL jar = getClass().getClassLoader().getResource("testDeserializer/" + jarFilename); final String jarPath = new File(jar.getFile()).getParent(); // Create factory final PluginFactory<Deserializer> factory = new PluginFactory<>(jarPath, Deserializer.class); final Path pathForJar = factory.getPathForJar(jarFilename); // Validate path is correct assertEquals("Has expected Path", jar.getPath(), pathForJar.toString()); // Get class instance final Class<? extends Deserializer> pluginFilterClass = factory.getPluginClass(jarFilename, classPath); // Validate assertNotNull(pluginFilterClass); assertEquals("Has expected name", classPath, pluginFilterClass.getName()); assertTrue("Validate came from correct class loader", pluginFilterClass.getClassLoader() instanceof PluginClassLoader); // Crete filter instance final Deserializer deserializer = factory.getPlugin(jarFilename, classPath); assertNotNull(deserializer); assertEquals("Has correct name", classPath, deserializer.getClass().getName()); // Call method on interface final String value = "MyValue"; final String result = (String) deserializer.deserialize("MyTopic", value.getBytes(StandardCharsets.UTF_8)); }
/** * Tests loading a deserializer not from an external jar. */ @Test public void testLoadingDefaultDeserializer() throws LoaderException { final String classPath = StringDeserializer.class.getName(); // Create factory final PluginFactory<Deserializer> factory = new PluginFactory<>("/tmp", Deserializer.class); // Get class instance final Class<? extends Deserializer> pluginFilterClass = factory.getPluginClass(classPath); // Validate assertNotNull(pluginFilterClass); assertEquals("Has expected name", classPath, pluginFilterClass.getName()); }
private WebKafkaConsumerFactory createDefaultFactory() { final PluginFactory<Deserializer> deserializerPluginFactory = new PluginFactory<>("not/used", Deserializer.class); final PluginFactory<RecordFilter> filterPluginFactoryPluginFactory = new PluginFactory<>("not/used", RecordFilter.class); final SecretManager secretManager = new SecretManager("Passphrase"); final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used"); return new WebKafkaConsumerFactory( deserializerPluginFactory, filterPluginFactoryPluginFactory, secretManager, kafkaConsumerFactory ); }
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.serializer", JsonPOJOSerializer.class.getName()); props.put("value.deserializer", JsonPOJODeserializer.class.getName()); Map<String, Object> serdeProps = new HashMap<>(); serdeProps.put("JsonPOJOClass", Messung.class); final Serializer<Messung> serializer = new JsonPOJOSerializer<>(); serializer.configure(serdeProps, false); final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>(); deserializer.configure(serdeProps, false); final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream(Serdes.String(), serde, "produktion") .filter( (k,v) -> v.type.equals("Biogas")) .to(Serdes.String(), serde,"produktion2"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); }
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream, Serde<K1> keySerde, Serde<V1> valSerde, final String topicNamePrefix) { Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null; Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null; Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null; Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null; String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name; String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX; String sinkName = stream.topology.newName(SINK_NAME); String filterName = stream.topology.newName(FILTER_NAME); String sourceName = stream.topology.newName(SOURCE_NAME); stream.topology.addInternalTopic(repartitionTopic); stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() { @Override public boolean test(final K1 key, final V1 value) { return key != null; } }, false), stream.name); stream.topology.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, filterName); stream.topology.addSource(sourceName, keyDeserializer, valDeserializer, repartitionTopic); return sourceName; }
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final StateStoreSupplier<KeyValueStore> storeSupplier) { String sinkName = topology.newName(KStreamImpl.SINK_NAME); String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); String funcName = topology.newName(functionName); String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX; Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer(); Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer(); Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); // read the intermediate topic topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(funcName, aggregateSupplier, sourceName); topology.addStateStore(storeSupplier, funcName); // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable); }
public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) { super(name); this.topics = topics; this.timestampExtractor = timestampExtractor; this.keyDeserializer = ensureExtended(keyDeserializer); this.valDeserializer = ensureExtended(valDeserializer); }
private SourceNodeFactory(final String name, final String[] topics, final Pattern pattern, final TimestampExtractor timestampExtractor, final Deserializer<?> keyDeserializer, final Deserializer<?> valDeserializer) { super(name); this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>(); this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; this.timestampExtractor = timestampExtractor; }
/** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. * * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; * acceptable values are earliest or latest. * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param timestampExtractor the stateless timestamp extractor used for this source, * if not specified the default extractor defined in the configs will be used * @param keyDeserializer key deserializer used to read this source, if not specified the default * key deserializer defined in the configs will be used * @param valDeserializer value deserializer used to read this source, * if not specified the default value deserializer defined in the configs will be used * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { if (topics.length == 0) { throw new TopologyBuilderException("You must provide at least one topic"); } Objects.requireNonNull(name, "name must not be null"); if (nodeFactories.containsKey(name)) throw new TopologyBuilderException("Processor " + name + " is already added."); for (String topic : topics) { Objects.requireNonNull(topic, "topic names cannot be null"); validateTopicNotAlreadyRegistered(topic); maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic); sourceTopicNames.add(topic); } nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer)); nodeToSourceTopics.put(name, Arrays.asList(topics)); nodeGrouper.add(name); return this; }
/** * Add a new source that consumes from topics matching the given pattern * and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. The provided * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for * topics that share the same key-value data format. * * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; * acceptable values are earliest or latest * @param name the unique name of the source used to reference this node when * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. * @param timestampExtractor the stateless timestamp extractor used for this source, * if not specified the default extractor defined in the configs will be used * @param keyDeserializer key deserializer used to read this source, if not specified the default * key deserializer defined in the configs will be used * @param valDeserializer value deserializer used to read this source, * if not specified the default value deserializer defined in the configs will be used * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { Objects.requireNonNull(topicPattern, "topicPattern can't be null"); Objects.requireNonNull(name, "name can't be null"); if (nodeFactories.containsKey(name)) { throw new TopologyBuilderException("Processor " + name + " is already added."); } for (String sourceTopicName : sourceTopicNames) { if (topicPattern.matcher(sourceTopicName).matches()) { throw new TopologyBuilderException("Pattern " + topicPattern + " will match a topic that has already been registered by another source."); } } maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern); nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer)); nodeToSourcePatterns.put(name, topicPattern); nodeGrouper.add(name); return this; }
/** * Read the next record from the given topic. These records were output by the topology during the previous calls to * {@link #process(String, byte[], byte[])}. * * @param topic the name of the topic * @param keyDeserializer the deserializer for the key type * @param valueDeserializer the deserializer for the value type * @return the next record on that topic, or null if there is no record available */ public <K, V> ProducerRecord<K, V> readOutput(final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) { final ProducerRecord<byte[], byte[]> record = readOutput(topic); if (record == null) { return null; } final K key = keyDeserializer.deserialize(record.topic(), record.key()); final V value = valueDeserializer.deserialize(record.topic(), record.value()); return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value); }
public Fetcher(ConsumerNetworkClient client, int minBytes, int maxBytes, int maxWaitMs, int fetchSize, int maxPollRecords, boolean checkCrcs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, FetcherMetricsRegistry metricsRegistry, Time time, long retryBackoffMs, IsolationLevel isolationLevel) { this.time = time; this.client = client; this.metadata = metadata; this.subscriptions = subscriptions; this.minBytes = minBytes; this.maxBytes = maxBytes; this.maxWaitMs = maxWaitMs; this.fetchSize = fetchSize; this.maxPollRecords = maxPollRecords; this.checkCrcs = checkCrcs; this.keyDeserializer = ensureExtended(keyDeserializer); this.valueDeserializer = ensureExtended(valueDeserializer); this.completedFetches = new ConcurrentLinkedQueue<>(); this.sensors = new FetchManagerMetrics(metrics, metricsRegistry); this.retryBackoffMs = retryBackoffMs; this.isolationLevel = isolationLevel; subscriptions.addListener(this); }
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) { Map<String, Object> newConfigs = new HashMap<String, Object>(); newConfigs.putAll(configs); if (keyDeserializer != null) newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass()); if (valueDeserializer != null) newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); return newConfigs; }
public static Properties addDeserializerToConfig(Properties properties, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) { Properties newProperties = new Properties(); newProperties.putAll(properties); if (keyDeserializer != null) newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName()); if (valueDeserializer != null) newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName()); return newProperties; }
KafkaConsumer(String clientId, ConsumerCoordinator coordinator, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Fetcher<K, V> fetcher, ConsumerInterceptors<K, V> interceptors, Time time, ConsumerNetworkClient client, Metrics metrics, SubscriptionState subscriptions, Metadata metadata, long retryBackoffMs, long requestTimeoutMs) { this.clientId = clientId; this.coordinator = coordinator; this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; this.fetcher = fetcher; this.interceptors = interceptors; this.time = time; this.client = client; this.metrics = metrics; this.subscriptions = subscriptions; this.metadata = metadata; this.retryBackoffMs = retryBackoffMs; this.requestTimeoutMs = requestTimeoutMs; }
private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?>... candidateLists) { ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); for (List<?> candidateList: candidateLists) clusterResourceListeners.maybeAddAll(candidateList); clusterResourceListeners.maybeAdd(keyDeserializer); clusterResourceListeners.maybeAdd(valueDeserializer); return clusterResourceListeners; }