/**
 * Builds a {@link KafkaProducer} pointed at {@code bootstrapServers}, using the given
 * serializer classes on top of a fixed set of baseline settings.
 *
 * <p>Entries of {@code extraConfig} are applied last, so they override any baseline
 * setting stored under the same key.
 *
 * @param keySerializer   serializer class whose name is stored under {@code key.serializer}
 * @param valueSerializer serializer class whose name is stored under {@code value.serializer}
 * @param extraConfig     additional producer settings; must not be null
 * @param <K>             key type
 * @param <V>             value type
 * @return a newly constructed producer
 */
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer,
                                                 Class<? extends Serializer<V>> valueSerializer,
                                                 Map<String, Object> extraConfig) {
    final Properties producerProps = new Properties();

    // Connection and (de)serialization.
    producerProps.put("bootstrap.servers", bootstrapServers);
    producerProps.put("key.serializer", keySerializer.getName());
    producerProps.put("value.serializer", valueSerializer.getName());

    // Delivery and batching baseline.
    producerProps.put("acks", "all");
    producerProps.put("retries", 0);
    producerProps.put("batch.size", 16384);
    producerProps.put("linger.ms", 1);
    producerProps.put("buffer.memory", 33554432);

    // Caller-supplied settings go last so that they win over the baseline.
    producerProps.putAll(extraConfig);

    return new KafkaProducer<>(producerProps);
}
/**
 * Wraps a stateless serialization function in a {@link Serializer} whose
 * {@code configure} and {@code close} methods do nothing.
 *
 * @param serializeFunc function that turns a topic name and a {@code T} into a byte[]
 * @param <T>           type of object to serialize
 * @return a serializer that delegates every {@code serialize} call to {@code serializeFunc}
 */
public static <T> Serializer<T> buildBasicSerializer(final SerializeFunc<T> serializeFunc) {
    final Serializer<T> delegating = new Serializer<T>() {
        @Override
        public byte[] serialize(final String topicName, final T payload) {
            return serializeFunc.serialize(topicName, payload);
        }

        @Override
        public void configure(final Map<String, ?> ignoredConfigs, final boolean ignoredIsKey) {
            // Intentionally empty: nothing is configured here.
        }

        @Override
        public void close() {
            // Intentionally empty: nothing is released here.
        }
    };
    return delegating;
}
/** * Creates a kafka producer that is connected to our test server. * @param <K> Type of message key * @param <V> Type of message value * @param keySerializer Class of serializer to be used for keys. * @param valueSerializer Class of serializer to be used for values. * @return KafkaProducer configured to produce into Test server. */ public <K, V> KafkaProducer<K, V> getKafkaProducer( final Class<? extends Serializer<K>> keySerializer, final Class<? extends Serializer<V>> valueSerializer) { // Build config final Map<String, Object> kafkaProducerConfig = Maps.newHashMap(); kafkaProducerConfig.put("bootstrap.servers", getKafkaConnectString()); kafkaProducerConfig.put("key.serializer", keySerializer); kafkaProducerConfig.put("value.serializer", valueSerializer); kafkaProducerConfig.put("max.in.flight.requests.per.connection", 1); kafkaProducerConfig.put("retries", 5); kafkaProducerConfig.put("client.id", getClass().getSimpleName() + " Producer"); kafkaProducerConfig.put("batch.size", 0); // Create and return Producer. return new KafkaProducer<>(kafkaProducerConfig); }
/**
 * Adds a sink node that writes this stream to {@code topic}.
 *
 * <p>A null serde results in a null serializer being handed to the topology. When no
 * partitioner is given and the key serializer is a {@link WindowedSerializer}, a
 * {@link WindowedStreamPartitioner} built from that serializer is used instead.
 *
 * @param keySerde    serde providing the key serializer; may be null
 * @param valSerde    serde providing the value serializer; may be null
 * @param partitioner partitioner for the sink; may be null
 * @param topic       target topic; must not be null
 */
@SuppressWarnings("unchecked")
@Override
public void to(final Serde<K> keySerde,
               final Serde<V> valSerde,
               StreamPartitioner<? super K, ? super V> partitioner,
               final String topic) {
    Objects.requireNonNull(topic, "topic can't be null");
    final String sinkName = topology.newName(SINK_NAME);

    Serializer<K> keySerializer = null;
    if (keySerde != null) {
        keySerializer = keySerde.serializer();
    }
    Serializer<V> valSerializer = null;
    if (valSerde != null) {
        valSerializer = valSerde.serializer();
    }

    StreamPartitioner<? super K, ? super V> sinkPartitioner = partitioner;
    // instanceof is false for null, so no separate null check on the serializer is needed.
    if (sinkPartitioner == null && keySerializer instanceof WindowedSerializer) {
        final WindowedSerializer<Object> windowedKeySerializer = (WindowedSerializer<Object>) keySerializer;
        sinkPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedKeySerializer);
    }

    topology.addSink(sinkName, topic, keySerializer, valSerializer, sinkPartitioner, this.name);
}
/**
 * Instantiates and configures the inner serializer from {@code configs}, unless one is
 * already set.
 *
 * <p>The class name is read from {@code key.serializer.inner.class} or
 * {@code value.serializer.inner.class}, depending on {@code isKey}. If that entry is
 * absent, {@code serializer.inner.class} is used.
 *
 * @param configs configuration map holding the inner serializer class name
 * @param isKey   whether this serializer is used for keys
 * @throws ConfigException if the named class cannot be found
 */
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    if (inner != null) {
        // An inner serializer is already present; leave it untouched.
        return;
    }

    String propertyName;
    if (isKey) {
        propertyName = "key.serializer.inner.class";
    } else {
        propertyName = "value.serializer.inner.class";
    }
    if (configs.get(propertyName) == null) {
        // No key/value specific entry: fall back to the shared one.
        propertyName = "serializer.inner.class";
    }

    String value = null;
    try {
        value = (String) configs.get(propertyName);
        inner = Serializer.class.cast(Utils.newInstance(value, Serializer.class));
        inner.configure(configs, isKey);
    } catch (ClassNotFoundException e) {
        throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
    }
}
/**
 * Resolves the target partition with {@code partitioner} (if one is given) and delegates
 * to the overload that takes an explicit partition.
 *
 * <p>Without a partitioner the partition passed on is {@code null}.
 *
 * @throws StreamsException if a partitioner is given but the producer reports no
 *                          partitions for {@code topic}
 */
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer,
                        final StreamPartitioner<? super K, ? super V> partitioner) {
    final Integer partition;
    if (partitioner == null) {
        partition = null;
    } else {
        final List<PartitionInfo> topicPartitions = producer.partitionsFor(topic);
        if (topicPartitions.isEmpty()) {
            throw new StreamsException("Could not get partition information for topic '" + topic + "'."
                + " This can happen if the topic does not exist.");
        }
        partition = partitioner.partition(key, value, topicPartitions.size());
    }

    send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
}
@SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { super.init(context); this.context = context; // if serializers are null, get the default ones from the context if (keySerializer == null) { keySerializer = (Serializer<K>) context.keySerde().serializer(); } if (valSerializer == null) { valSerializer = (Serializer<V>) context.valueSerde().serializer(); } // if value serializers are for {@code Change} values, set the inner serializer when necessary if (valSerializer instanceof ChangedSerializer && ((ChangedSerializer) valSerializer).inner() == null) { ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer()); } }
/**
 * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
 * The sink will use the specified key and value serializers, and the supplied partitioner.
 *
 * @param name the unique name of the sink
 * @param topic the name of the Kafka topic to which this sink should write its records
 * @param keySerializer the {@link Serializer key serializer} used when writing records; may be null if the sink
 * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
 * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
 * @param valSerializer the {@link Serializer value serializer} used when writing records; may be null if the sink
 * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
 * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
 * @param partitioner the function that should be used to determine the partition for each record processed by the sink
 * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
 * and write to its topic
 * @return this builder instance so methods can be chained together; never null
 * @see #addSink(String, String, String...)
 * @see #addSink(String, String, StreamPartitioner, String...)
 * @see #addSink(String, String, Serializer, Serializer, String...)
 * @throws TopologyBuilderException if a node with this name was already added, if a parent processor is not added
 * yet, or if this processor's name is equal to a parent's name
 */
public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) {
    Objects.requireNonNull(name, "name must not be null");
    Objects.requireNonNull(topic, "topic must not be null");

    final boolean alreadyRegistered = nodeFactories.containsKey(name);
    if (alreadyRegistered) {
        throw new TopologyBuilderException("Processor " + name + " is already added.");
    }

    // Every parent must be a different, already registered node.
    for (final String parentName : parentNames) {
        if (parentName.equals(name)) {
            throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
        }
        final boolean parentKnown = nodeFactories.containsKey(parentName);
        if (!parentKnown) {
            throw new TopologyBuilderException("Parent processor " + parentName + " is not added yet.");
        }
    }

    nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, topic, keySerializer, valSerializer, partitioner));
    nodeToSinkTopic.put(name, topic);

    // Register the sink with the grouper and join it to its parents.
    nodeGrouper.add(name);
    nodeGrouper.unite(name, parentNames);

    return this;
}
@Test public void testWindowedSerializerNoArgConstructors() { Map<String, String> props = new HashMap<>(); // test key[value].serializer.inner.class takes precedence over serializer.inner.class WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer"); windowedSerializer.configure(props, true); Serializer<?> inner = windowedSerializer.innerSerializer(); assertNotNull("Inner serializer should be not null", inner); assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer); // test serializer.inner.class props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.remove("key.serializer.inner.class"); props.remove("value.serializer.inner.class"); WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>(); windowedSerializer1.configure(props, false); Serializer<?> inner1 = windowedSerializer1.innerSerializer(); assertNotNull("Inner serializer should be not null", inner1); assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer); }
/**
 * A store created with {@code createStore(true, false)} must hand at least one record
 * to the record collector when a value is put.
 */
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
    store = createStore(true, false);

    // Collects everything the store sends to the collector.
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector recordingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> record = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            logged.add(record);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.String(),
        recordingCollector,
        cache);
    context.setTime(1);

    store.init(context, store);
    store.put("a", "b");

    assertFalse(logged.isEmpty());
}
/**
 * A store created with {@code createStore(false, false)} must not hand any record to
 * the record collector when a value is put.
 */
@Test
public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
    store = createStore(false, false);

    // Collects everything the store sends to the collector.
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector recordingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> record = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            logged.add(record);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.String(),
        recordingCollector,
        cache);
    context.setTime(1);

    store.init(context, store);
    store.put("a", "b");

    assertTrue(logged.isEmpty());
}
/**
 * Sets up a mock processor context whose record collector copies every sent key/value
 * pair into {@code sent}, then initializes the store with that context.
 */
@Before
public void before() {
    final NoOpRecordCollector capturingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            sent.put(key, value);
        }
    };

    context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.Long(),
        capturingCollector,
        new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
    context.setTime(0);

    store.init(context, store);
}
/**
 * A store created with {@code createStore(true, false)} must hand at least one record
 * to the record collector when a value is put.
 */
@Test
public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception {
    store = createStore(true, false);

    // Collects everything the store sends to the collector.
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector recordingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> record = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            logged.add(record);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.String(),
        recordingCollector,
        cache);
    context.setTime(1);

    store.init(context, store);
    store.put("a", "b");

    assertFalse(logged.isEmpty());
}
/**
 * A store created with {@code createStore(false, false)} must not hand any record to
 * the record collector when a value is put.
 */
@Test
public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception {
    store = createStore(false, false);

    // Collects everything the store sends to the collector.
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector recordingCollector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            final ProducerRecord<K, V> record = new ProducerRecord<K, V>(topic, partition, timestamp, key, value);
            logged.add(record);
        }
    };

    final MockProcessorContext context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.String(),
        recordingCollector,
        cache);
    context.setTime(1);

    store.init(context, store);
    store.put("a", "b");

    assertTrue(logged.isEmpty());
}
/**
 * Round-trips a string through a String serializer configured with the encodings below
 * and the default key/value deserializers of {@code streamsConfig}, expecting the
 * original string back in both cases.
 */
@Test
public void defaultSerdeShouldBeConfigured() {
    final String failureMessage =
        "Should get the original string after serialization and deserialization with the configured encoding";
    final String topic = "my topic";
    final String str = "my string for testing";

    final Map<String, Object> serializerConfigs = new HashMap<>();
    serializerConfigs.put("key.serializer.encoding", "UTF8");
    serializerConfigs.put("value.serializer.encoding", "UTF-16");

    final Serializer<String> serializer = Serdes.String().serializer();

    // Key side.
    serializer.configure(serializerConfigs, true);
    final Object keyRoundTrip =
        streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str));
    assertEquals(failureMessage, str, keyRoundTrip);

    // Value side.
    serializer.configure(serializerConfigs, false);
    final Object valueRoundTrip =
        streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str));
    assertEquals(failureMessage, str, valueRoundTrip);
}
/**
 * Create a mock producer
 *
 * @param cluster The cluster holding metadata for this producer
 * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
 *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
 *        {@link #send(ProducerRecord) send()} to complete the call and unblock the
 *        {@link java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
 * @param partitioner The partition strategy
 * @param keySerializer The serializer for key that implements {@link Serializer}.
 * @param valueSerializer The serializer for value that implements {@link Serializer}.
 */
public MockProducer(final Cluster cluster,
                    final boolean autoComplete,
                    final Partitioner partitioner,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    // Collaborators and behaviour flags supplied by the caller.
    this.cluster = cluster;
    this.autoComplete = autoComplete;
    this.partitioner = partitioner;
    this.keySerializer = ensureExtended(keySerializer);
    this.valueSerializer = ensureExtended(valueSerializer);

    // Bookkeeping structures, all starting out empty.
    this.sent = new ArrayList<>();
    this.uncommittedSends = new ArrayList<>();
    this.consumerGroupOffsets = new ArrayList<>();
    this.offsets = new HashMap<>();
    this.uncommittedConsumerGroupOffsets = new HashMap<>();
    this.completions = new ArrayDeque<>();
}
/**
 * With a mocked {@code Environment}, the factory's configuration properties are expected
 * to equal the supplied map, including the entry whose value is null.
 */
@Test
@SuppressWarnings("unchecked")
public void testPropertyResolutionMissingFromEnvironment() throws Exception {
    HashMap<String, Object> expected = new HashMap<String, Object>();
    expected.put("supplied-foo", "bar");
    expected.put("supplied-null", null);

    JustInTimeConfiguredProducerFactory underTest =
        new JustInTimeConfiguredProducerFactory(expected, mock(Serializer.class), mock(Serializer.class));
    underTest.setEnvironment(mock(Environment.class));

    Map<String, Object> actual = underTest.getConfigurationProperties();

    assertEquals(expected, actual);
}
@Test public void testSingleMessageSegment() { // Create serializer/deserializers. Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer(); Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer(); byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes()); MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer); MessageAssembler.AssembleResult assembleResult = messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes); assertNotNull(assembleResult.messageBytes()); assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0"); assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0"); }
@Test public void testSerde() { Serializer<String> stringSerializer = new StringSerializer(); Deserializer<String> stringDeserializer = new StringDeserializer(); Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer(); Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer(); String s = LiKafkaClientsTestUtils.getRandomString(100); assertEquals(s.length(), 100); byte[] stringBytes = stringSerializer.serialize("topic", s); assertEquals(stringBytes.length, 100); LargeMessageSegment segment = new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes)); // String bytes + segment header byte[] serializedSegment = segmentSerializer.serialize("topic", segment); assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4); LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment); assertEquals(deserializedSegment.messageId, segment.messageId); assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes); assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments); assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber); assertEquals(deserializedSegment.payload.limit(), 100); String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray()); assertEquals(deserializedString.length(), s.length()); }
/**
 * Returns a serializer that writes values as bytes through {@code mapper}.
 *
 * <p>Any exception raised while writing is rethrown as a {@link SerializationException}
 * carrying the original exception as its cause. {@code configure} and {@code close} do
 * nothing.
 *
 * @return a new serializer instance
 */
@Override
public Serializer<T> serializer() {
    final Serializer<T> jsonSerializer = new Serializer<T>() {
        @Override
        public byte[] serialize(String topic, T data) {
            final byte[] encoded;
            try {
                encoded = mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
            return encoded;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Intentionally empty.
        }

        @Override
        public void close() {
            // Intentionally empty.
        }
    };
    return jsonSerializer;
}
/**
 * Returns a serializer that renders a {@link CounterMetric} as a UTF-8 encoded JSON
 * object with the members {@code type}, {@code name}, {@code count} and
 * {@code timestamp}.
 *
 * <p>{@code configure} and {@code close} do nothing.
 *
 * @return a new serializer instance
 */
@Override
public Serializer<CounterMetric> serializer() {
    return new Serializer<CounterMetric>() {
        private final Gson gson = new Gson();

        @Override
        public byte[] serialize(String topic, CounterMetric metric) {
            final JsonObject document = new JsonObject();
            // NOTE(review): the type written for a CounterMetric is "METER" - confirm this is intended.
            document.add("type", new JsonPrimitive("METER"));
            document.add("name", new JsonPrimitive(metric.getName()));
            document.add("count", new JsonPrimitive(metric.getValue()));
            document.add("timestamp", new JsonPrimitive(metric.getTimestampInMillis()));

            return gson.toJson(document).getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Intentionally empty.
        }

        @Override
        public void close() {
            // Intentionally empty.
        }
    };
}
/**
 * Publishes the given records to {@code topicName} and waits for each send to be
 * acknowledged.
 *
 * <p>{@code createTopic(topicName)} is invoked first. Records are sent one at a time,
 * each send being awaited for up to {@code TEST_RECORD_FUTURE_TIMEOUT_MS} milliseconds.
 * The producer is always closed before this method returns, including when a send
 * fails or times out.
 *
 * @param topicName        topic to publish to
 * @param recordsToPublish records to publish, keyed by record key
 * @param serializer       serializer used for the record values
 * @param timestamp        timestamp passed to {@code buildRecord} for every record
 * @return the metadata returned for each record, keyed by record key
 * @throws InterruptedException if the thread is interrupted while waiting for a send
 * @throws TimeoutException     if a send is not acknowledged within the timeout
 * @throws ExecutionException   if a send fails
 */
public Map<String, RecordMetadata> produceData(String topicName,
                                               Map<String, GenericRow> recordsToPublish,
                                               Serializer<GenericRow> serializer,
                                               Long timestamp)
    throws InterruptedException, TimeoutException, ExecutionException {

    createTopic(topicName);

    Properties producerConfig = properties();
    KafkaProducer<String, GenericRow> producer =
        new KafkaProducer<>(producerConfig, new StringSerializer(), serializer);

    // try/finally so the producer is released even when a send fails or times out.
    try {
        Map<String, RecordMetadata> result = new HashMap<>();
        for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
            String key = recordEntry.getKey();
            Future<RecordMetadata> recordMetadataFuture =
                producer.send(buildRecord(topicName, timestamp, recordEntry, key));
            result.put(key, recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        }
        return result;
    } finally {
        producer.close();
    }
}
@SuppressWarnings("unchecked") @Override public Producer<Integer, Long> apply(Map<String, Object> config) { // Make sure the config is correctly set up for serializers. Utils.newInstance( ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) .asSubclass(Serializer.class) ).configure(config, true); Utils.newInstance( ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) .asSubclass(Serializer.class) ).configure(config, false); // Returning same producer in each instance in a pipeline seems to work fine currently. // If DirectRunner creates multiple DoFn instances for sinks, we might need to handle // it appropriately. I.e. allow multiple producers for each producerKey and concatenate // all the messages written to each producer for verification after the pipeline finishes. return MOCK_PRODUCER_MAP.get(producerKey); }
/**
 * Sends every entry of {@code messages} to {@code TOPIC} on the embedded Kafka cluster,
 * using the map key as record key and the {@link Instant} as record value.
 *
 * <p>The producer is closed by the try-with-resources statement once all records have
 * been handed to it.
 *
 * @param messages records to send, keyed by record key
 */
private static void produce(Map<String, Instant> messages) {
    Properties producerProps = new Properties();
    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
    // NOTE(review): "request.required.acks" looks like the legacy producer's setting name;
    // confirm whether "acks" was intended for this client.
    producerProps.put("request.required.acks", 1);
    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());

    Serializer<String> stringSerializer = new StringSerializer();
    Serializer<Instant> instantSerializer = new InstantSerializer();

    // The diamond replaces the previous raw-type construction, so no warning suppression is
    // needed. try-with-resources closes the producer, so there is no explicit close() call.
    try (KafkaProducer<String, Instant> kafkaProducer =
             new KafkaProducer<>(producerProps, stringSerializer, instantSerializer)) {
        for (Map.Entry<String, Instant> en : messages.entrySet()) {
            kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
        }
    }
}
/**
 * Starts a Kafka Streams application that reads {@code Messung} records from the topic
 * "produktion", keeps those whose {@code type} equals "Biogas", and writes them to the
 * topic "produktion2".
 *
 * @param args command line arguments; not used
 * @throws InterruptedException declared by the signature
 */
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.serializer", JsonPOJOSerializer.class.getName());
    props.put("value.deserializer", JsonPOJODeserializer.class.getName());

    // Serde for Messung values: both halves are configured with the POJO class.
    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("JsonPOJOClass", Messung.class);

    final Serializer<Messung> serializer = new JsonPOJOSerializer<>();
    serializer.configure(serdeProps, false);

    final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>();
    deserializer.configure(serdeProps, false);

    final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer);

    StreamsConfig config = new StreamsConfig(props);
    KStreamBuilder builder = new KStreamBuilder();

    builder.stream(Serdes.String(), serde, "produktion")
        // Null-safe: a record with a null value or a null type is dropped
        // instead of failing with a NullPointerException.
        .filter((k, v) -> v != null && "Biogas".equals(v.type))
        .to(Serdes.String(), serde, "produktion2");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
}
/**
 * Routes {@code stream} through an internal repartition topic and returns the name of
 * the source node that reads it back.
 *
 * <p>The wiring is: a filter that drops records with a null key, a sink writing to the
 * repartition topic, and a source reading from that topic. The topic name is
 * {@code topicNamePrefix} (or the stream's name if the prefix is null) followed by
 * {@code REPARTITION_TOPIC_SUFFIX}.
 *
 * @param stream          stream to repartition
 * @param keySerde        key serde; may be null, in which case null (de)serializers are passed on
 * @param valSerde        value serde; may be null, in which case null (de)serializers are passed on
 * @param topicNamePrefix prefix for the repartition topic name; may be null
 * @return name of the source node reading the repartition topic
 */
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                Serde<K1> keySerde,
                                                Serde<V1> valSerde,
                                                final String topicNamePrefix) {
    Serializer<K1> keySerializer = keySerde == null ? null : keySerde.serializer();
    Serializer<V1> valSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<K1> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Deserializer<V1> valDeserializer = valSerde == null ? null : valSerde.deserializer();

    String baseName = topicNamePrefix == null ? stream.name : topicNamePrefix;
    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;

    // Node names are requested in the order sink, filter, source.
    String sinkName = stream.topology.newName(SINK_NAME);
    String filterName = stream.topology.newName(FILTER_NAME);
    String sourceName = stream.topology.newName(SOURCE_NAME);

    stream.topology.addInternalTopic(repartitionTopic);

    // Only records that carry a key are forwarded to the repartition topic.
    final Predicate<K1, V1> hasKey = new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    };
    stream.topology.addProcessor(filterName, new KStreamFilter<>(hasKey, false), stream.name);
    stream.topology.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, filterName);
    stream.topology.addSource(sourceName, keyDeserializer, valDeserializer, repartitionTopic);

    return sourceName;
}
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final StateStoreSupplier<KeyValueStore> storeSupplier) { String sinkName = topology.newName(KStreamImpl.SINK_NAME); String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); String funcName = topology.newName(functionName); String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX; Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer(); Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer(); Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer(); Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer(); ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); // read the intermediate topic topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic); // aggregate the values with the aggregator and local store topology.addProcessor(funcName, aggregateSupplier, sourceName); topology.addStateStore(storeSupplier, funcName); // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable); }
/**
 * Encodes a windowed key as: serialized key bytes, followed by the window end, followed
 * by the window start (each written as a long).
 *
 * @param sessionKey windowed key to encode
 * @param serializer serializer for the inner key
 * @param topic      topic name passed to the serializer
 * @param <K>        inner key type
 * @return the encoded key wrapped in {@link Bytes}
 */
public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
    final byte[] keyBytes = serializer.serialize(topic, sessionKey.key());

    final int encodedLength = keyBytes.length + 2 * TIMESTAMP_SIZE;
    final ByteBuffer encoded = ByteBuffer.allocate(encodedLength);

    // Layout: [key bytes][window end][window start]
    encoded.put(keyBytes);
    encoded.putLong(sessionKey.window().end());
    encoded.putLong(sessionKey.window().start());

    return new Bytes(encoded.array());
}
/**
 * Sends the key/value pair to {@code topic} and {@code partition} through the
 * collector, stamped with the context's current timestamp. Does nothing when no
 * collector is set.
 *
 * @param key   key of the changed entry
 * @param value new value of the entry
 */
void logChange(final K key, final V value) {
    if (collector == null) {
        return;
    }

    final Serializer<K> keySer = serialization.keySerializer();
    final Serializer<V> valueSer = serialization.valueSerializer();
    collector.send(this.topic, key, value, this.partition, context.timestamp(), keySer, valueSer);
}
/**
 * No-op implementation of the explicit-partition {@code send} overload: the body is
 * empty, so every argument is ignored.
 */
@Override public <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { }
/**
 * No-op implementation of the partitioner-based {@code send} overload: the body is
 * empty, so every argument (including the partitioner) is ignored.
 */
@Override public <K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner) {}
/**
 * Send overload taking an explicit target partition.
 *
 * <p>NOTE(review): the contract is not visible from this declaration. The parameter
 * descriptions below are inferred from names and types only; in particular, confirm
 * whether {@code partition} and {@code timestamp} may be null.
 *
 * @param topic           presumably the destination topic
 * @param key             record key
 * @param value           record value
 * @param partition       presumably the destination partition
 * @param timestamp       presumably the record timestamp
 * @param keySerializer   serializer for {@code key}
 * @param valueSerializer serializer for {@code value}
 * @param <K>             key type
 * @param <V>             value type
 */
<K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer);
/**
 * Send overload taking a {@link StreamPartitioner} instead of an explicit partition.
 *
 * <p>NOTE(review): the contract is not visible from this declaration. The parameter
 * descriptions below are inferred from names and types only; in particular, confirm
 * whether {@code partitioner} and {@code timestamp} may be null.
 *
 * @param topic           presumably the destination topic
 * @param key             record key
 * @param value           record value
 * @param timestamp       presumably the record timestamp
 * @param keySerializer   serializer for {@code key}
 * @param valueSerializer serializer for {@code value}
 * @param partitioner     presumably used to choose the destination partition
 * @param <K>             key type
 * @param <V>             value type
 */
<K, V> void send(final String topic, final K key, final V value, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<? super K, ? super V> partitioner);
/**
 * Creates a sink node and stores the topic, serializers and partitioner it is given.
 *
 * @param name          node name, passed to the superclass constructor
 * @param topic         topic stored on this node
 * @param keySerializer key serializer stored on this node
 * @param valSerializer value serializer stored on this node
 * @param partitioner   partitioner stored on this node
 */
public SinkNode(final String name,
                final String topic,
                final Serializer<K> keySerializer,
                final Serializer<V> valSerializer,
                final StreamPartitioner<? super K, ? super V> partitioner) {
    super(name);

    // Destination.
    this.topic = topic;
    this.partitioner = partitioner;

    // Serialization.
    this.keySerializer = keySerializer;
    this.valSerializer = valSerializer;
}
/** * Find the {@link StreamsMetadata}s for a given storeName and key. This method will use the * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)} * * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore}, * this method provides a way of finding which {@link StreamsMetadata} it would exist on. * * @param storeName Name of the store * @param key Key to use * @param keySerializer Serializer for the key * @param <K> key type * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE} * if streams is (re-)initializing */ public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName, final K key, final Serializer<K> keySerializer) { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); Objects.requireNonNull(storeName, "storeName can't be null"); Objects.requireNonNull(key, "key can't be null"); if (!isInitialized()) { return StreamsMetadata.NOT_AVAILABLE; } if (globalStores.contains(storeName)) { // global stores are on every node. if we dont' have the host info // for this host then just pick the first metadata if (thisHost == UNKNOWN_HOST) { return allMetadata.get(0); } return myMetadata; } final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName); if (sourceTopicsInfo == null) { return null; } return getStreamsMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer, clusterMetadata, sourceTopicsInfo.topicWithMostPartitions), sourceTopicsInfo); }
/**
 * Creates a factory for a sink node.
 *
 * @param name          node name, passed to the superclass constructor
 * @param parents       names of the parent nodes; the array is cloned, so later changes
 *                      to the caller's array do not affect this factory
 * @param topic         topic stored on this factory
 * @param keySerializer key serializer stored on this factory
 * @param valSerializer value serializer stored on this factory
 * @param partitioner   partitioner stored on this factory
 */
private SinkNodeFactory(String name,
                        String[] parents,
                        String topic,
                        Serializer<K> keySerializer,
                        Serializer<V> valSerializer,
                        StreamPartitioner<? super K, ? super V> partitioner) {
    super(name);

    // Defensive copy of the parent names.
    this.parents = parents.clone();

    // Destination.
    this.topic = topic;
    this.partitioner = partitioner;

    // Serialization.
    this.keySerializer = keySerializer;
    this.valSerializer = valSerializer;
}
/**
 * Records the call instead of sending anything: a {@link ProducerRecord} built from the
 * topic, partition, timestamp, key and value is appended to {@code logged}. The two
 * serializers are not used.
 */
@Override public <K, V> void send(final String topic, final K key, final V value, final Integer partition, final Long timestamp, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) { logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value)); }
/**
 * Serializes the key and the value with the supplied serializers and appends the
 * resulting byte arrays to {@code changeLog} as a {@link KeyValue}. The partition and
 * timestamp are not used.
 */
@Override
public <K1, V1> void send(final String topic,
                          K1 key,
                          V1 value,
                          Integer partition,
                          Long timestamp,
                          Serializer<K1> keySerializer,
                          Serializer<V1> valueSerializer) {
    final byte[] rawKey = keySerializer.serialize(topic, key);
    final byte[] rawValue = valueSerializer.serialize(topic, value);
    changeLog.add(new KeyValue<>(rawKey, rawValue));
}