private KafkaProducer<StatEventKey, StatAggregate> buildProducer() { //Configure the producers Map<String, Object> producerProps = getProducerProps(); Serde<StatEventKey> statKeySerde = StatEventKeySerde.instance(); Serde<StatAggregate> statAggregateSerde = StatAggregateSerde.instance(); try { return new KafkaProducer<>( producerProps, statKeySerde.serializer(), statAggregateSerde.serializer()); } catch (Exception e) { try { String props = producerProps.entrySet().stream() .map(entry -> " " + entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining("\n")); LOGGER.error("Error initialising kafka producer with props:\n{}",props, e); } catch (Exception e1) { } LOGGER.error("Error initialising kafka producer, unable to dump property values ", e); throw e; } }
/**
 * Create a context for serialization using the specified serializers and deserializers which
 * <em>must</em> match the key and value types used as parameters for this object; the state changelog topic
 * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
 * need to provide the topic name any more.
 *
 * @param topic         the topic name; cannot be null
 * @param keySerde      the serde for keys; cannot be null
 * @param valueSerde    the serde for values; cannot be null
 * @throws IllegalArgumentException if the topic, the key serde or the value serde is null
 */
public StateSerdes(final String topic,
                   final Serde<K> keySerde,
                   final Serde<V> valueSerde) {
    if (topic == null) {
        throw new IllegalArgumentException("topic cannot be null");
    }
    if (keySerde == null) {
        throw new IllegalArgumentException("key serde cannot be null");
    }
    if (valueSerde == null) {
        throw new IllegalArgumentException("value serde cannot be null");
    }

    this.topic = topic;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
}
/**
 * Fetches the entries of {@code store} whose keys lie in the range {@code from}..{@code to}.
 *
 * <p>The request URI is {@code /api/v1/kv/<store>} with the serde class names and the
 * Base64-encoded serialized range bounds as query parameters. The response is parsed as a
 * {@link MultiValuedKeyValueQueryResponse} and every entry is deserialized back into a
 * typed key and value.
 *
 * @param store      name of the store to query
 * @param keyClass   class of the keys
 * @param valueClass class of the values
 * @param keySerde   serde used to serialize the bounds and deserialize the returned keys
 * @param valueSerde serde used to deserialize the returned values
 * @param from       lower range bound
 * @param to         upper range bound
 * @return the deserialized entries; the last argument to {@code execute} supplies an empty map
 *         (presumably the fallback when there is no result - confirm against {@code execute})
 */
@Override
public <K, V> Map<K, V> getRangeKeyValues(String store, Class<K> keyClass, Class<V> valueClass,
                                          Serde<K> keySerde, Serde<V> valueSerde, K from, K to) {
    return execute(
            () -> getUriBuilder()
                    .setPath(String.format("/api/v1/kv/%s", store))
                    .addParameter("keySerde", keySerde.getClass().getName())
                    .addParameter("valueSerde", valueSerde.getClass().getName())
                    .addParameter("from", Base64.getEncoder().encodeToString(keySerde.serializer().serialize("", from)))
                    .addParameter("to", Base64.getEncoder().encodeToString(keySerde.serializer().serialize("", to)))
                    .build(),
            payload -> {
                final MultiValuedKeyValueQueryResponse parsed =
                        mapper.readValue(payload, MultiValuedKeyValueQueryResponse.class);
                // Decode each raw entry into a typed pair before collecting into the result map.
                return parsed.getResults().entrySet().stream()
                        .map(raw -> new Pair<K, V>(
                                deserialize(keyClass, keySerde, raw.getKey()),
                                deserialize(valueClass, valueSerde, raw.getValue())))
                        .collect(Collectors.toMap(Pair::getKey, decoded -> decoded.getValue()));
            },
            () -> Collections.emptyMap());
}
/**
 * Wires a table into the topology: a source node reading {@code topic}, a {@code KTableSource}
 * processor fed by that source, and the supplied state store attached to the processor.
 *
 * @param offsetReset        offset reset policy passed to the source node
 * @param keySerde           key serde; when null a null key deserializer is passed to the source
 * @param valSerde           value serde; when null a null value deserializer is passed to the source
 * @param timestampExtractor timestamp extractor passed to the source node
 * @param topic              topic to read
 * @param storeSupplier      supplier of the store backing the table
 * @param isQueryable        forwarded unchanged to the {@code KTableImpl} constructor
 * @return the table built on top of the new processor node
 */
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
                                    final Serde<K> keySerde,
                                    final Serde<V> valSerde,
                                    final TimestampExtractor timestampExtractor,
                                    final String topic,
                                    final StateStoreSupplier<KeyValueStore> storeSupplier,
                                    final boolean isQueryable) {
    final String sourceNode = newName(KStreamImpl.SOURCE_NAME);
    final String tableNode = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> tableSource = new KTableSource<>(storeSupplier.name());

    addSource(
            offsetReset,
            sourceNode,
            timestampExtractor,
            keySerde != null ? keySerde.deserializer() : null,
            valSerde != null ? valSerde.deserializer() : null,
            topic);
    addProcessor(tableNode, tableSource, sourceNode);

    final KTableImpl<K, ?, V> table = new KTableImpl<>(
            this,
            tableNode,
            tableSource,
            keySerde,
            valSerde,
            Collections.singleton(sourceNode),
            storeSupplier.name(),
            isQueryable);

    // Register the store with the processor, then link source, store and topic.
    addStateStore(storeSupplier, tableNode);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);

    return table;
}
@Override @SuppressWarnings("unchecked") public void init(ProcessorContext context, StateStore root) { inner.init(context, root); // construct the serde StateSerdes<K, V> serdes = new StateSerdes<>( ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes); // if the inner store is an LRU cache, add the eviction listener to log removed record if (inner instanceof MemoryLRUCache) { ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() { @Override public void apply(K key, V value) { removed(key); } }); } }
/**
 * Adds a sink node that writes this stream to {@code topic}.
 *
 * <p>When no partitioner is given and the key serializer is a {@code WindowedSerializer},
 * a {@code WindowedStreamPartitioner} built from that serializer is used instead.
 *
 * @param keySerde    key serde; when null a null key serializer is passed to the sink
 * @param valSerde    value serde; when null a null value serializer is passed to the sink
 * @param partitioner partitioner to use, or null
 * @param topic       target topic; must not be null
 * @throws NullPointerException if {@code topic} is null
 */
@SuppressWarnings("unchecked")
@Override
public void to(final Serde<K> keySerde,
               final Serde<V> valSerde,
               StreamPartitioner<? super K, ? super V> partitioner,
               final String topic) {
    Objects.requireNonNull(topic, "topic can't be null");
    final String sinkName = topology.newName(SINK_NAME);

    final Serializer<K> keySerializer = keySerde != null ? keySerde.serializer() : null;
    final Serializer<V> valSerializer = valSerde != null ? valSerde.serializer() : null;

    // instanceof is already false for a null serializer, so no separate null check is needed
    if (partitioner == null && keySerializer instanceof WindowedSerializer) {
        partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(
                topic, (WindowedSerializer<Object>) keySerializer);
    }

    topology.addSink(sinkName, topic, keySerializer, valSerializer, partitioner, this.name);
}
public KStreamTestDriver(final KStreamBuilder builder, final File stateDir, final Serde<?> keySerde, final Serde<?> valSerde, final long cacheSize) { builder.setApplicationId("TestDriver"); topology = builder.build(null); globalTopology = builder.buildGlobalStateTopology(); final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache); context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic")); // init global topology first as it will add stores to the // store map that are required for joins etc. if (globalTopology != null) { initTopology(globalTopology, globalTopology.globalStateStores()); } initTopology(topology, topology.stateStores()); }
/**
 * Adds a map-values processor below this table and returns the resulting table.
 *
 * <p>With a store supplier the store is attached to the new processor and the result is
 * created with that store's name and the flag {@code true}; without one, the result is
 * created with this table's {@code queryableStoreName} and the flag {@code false}.
 *
 * @param mapper        value mapper; must not be null
 * @param valueSerde    serde for the mapped values (only used when a store supplier is given)
 * @param storeSupplier supplier of the backing store, or null
 * @return the table of mapped values
 * @throws NullPointerException if {@code mapper} is null
 */
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                       final Serde<V1> valueSerde,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(mapper);

    final String nodeName = topology.newName(MAPVALUES_NAME);
    final boolean hasStore = storeSupplier != null;
    final String internalStoreName = hasStore ? storeSupplier.name() : null;

    final KTableProcessorSupplier<K, V, V1> mapValuesSupplier =
            new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(nodeName, mapValuesSupplier, this.name);

    if (!hasStore) {
        return new KTableImpl<>(topology, nodeName, mapValuesSupplier, sourceNodes, this.queryableStoreName, false);
    }

    topology.addStateStore(storeSupplier, nodeName);
    return new KTableImpl<>(topology, nodeName, mapValuesSupplier, this.keySerde, valueSerde,
            sourceNodes, internalStoreName, true);
}
/** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead. * * @return an configured instance of value Serde class */ @Deprecated public Serde valueSerde() { try { Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); // the default value of deprecated value serde is null if (serde == null) { serde = defaultValueSerde(); } else { serde.configure(originals(), false); } return serde; } catch (final Exception e) { throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e); } }
/**
 * Session-windowed aggregation that builds its own session store and delegates to the
 * store-supplier overload.
 *
 * <p>The store name comes from {@code getOrCreateName(queryableStoreName, AGGREGATE_NAME)},
 * and the store is configured with {@code sessionWindows.maintainMs()}.
 *
 * @param initializer        aggregate initializer
 * @param aggregator         aggregator
 * @param sessionMerger      merger for session aggregates
 * @param sessionWindows     session window specification
 * @param aggValueSerde      serde for the aggregate values
 * @param queryableStoreName store name passed to {@code determineIsQueryable} and {@code getOrCreateName}
 * @return the aggregated, session-windowed table
 */
@SuppressWarnings("unchecked")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                            final Aggregator<? super K, ? super V, T> aggregator,
                                            final Merger<? super K, T> sessionMerger,
                                            final SessionWindows sessionWindows,
                                            final Serde<T> aggValueSerde,
                                            final String queryableStoreName) {
    determineIsQueryable(queryableStoreName);
    final String storeName = getOrCreateName(queryableStoreName, AGGREGATE_NAME);
    return aggregate(
            initializer,
            aggregator,
            sessionMerger,
            sessionWindows,
            aggValueSerde,
            storeFactory(keySerde, aggValueSerde, storeName)
                    .sessionWindowed(sessionWindows.maintainMs())
                    .build());
}
/** * Creates a typical word count topology */ private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) { final KStreamBuilder builder = new KStreamBuilder(); final Serde<String> stringSerde = Serdes.String(); final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic); final KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(final String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()); // Create a State Store for the all time word count groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic); // Create a Windowed State Store that contains the word count for every 1 minute groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic); return new KafkaStreams(builder, streamsConfiguration); }
/**
 * Binds this store to the given context: initializes the key schema with the topic, resolves
 * the serdes (falling back to the context's serdes where none was supplied), and registers a
 * flush listener on the thread cache that passes every dirty entry to {@code putAndMaybeForward}.
 *
 * @param context the internal processor context
 */
@SuppressWarnings("unchecked")
private void initInternal(final InternalProcessorContext context) {
    this.context = context;
    keySchema.init(topic);

    final Serde<K> effectiveKeySerde = keySerde != null ? keySerde : (Serde<K>) context.keySerde();
    final Serde<AGG> effectiveAggSerde = aggSerde != null ? aggSerde : (Serde<AGG>) context.valueSerde();
    serdes = new StateSerdes<>(topic, effectiveKeySerde, effectiveAggSerde);

    // cache entries are namespaced by task id and underlying store name
    cacheName = context.taskId() + "-" + bytesStore.name();
    cache = context.getCache();
    cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
        @Override
        public void apply(final List<ThreadCache.DirtyEntry> entries) {
            for (final ThreadCache.DirtyEntry dirty : entries) {
                putAndMaybeForward(dirty, context);
            }
        }
    });
}
/**
 * Helper method for testing that a serde can serialize its object and deserialize it
 * back again. Throws a {@link RuntimeException} if the objects don't match.
 *
 * <p>The comparison is null-safe: a {@code null} object matches only a {@code null}
 * deserialized value.
 *
 * @param serde the serde whose serializer and deserializer are exercised
 * @param obj   the object to round-trip through the serde; may be null
 * @param <T>   the type handled by the serde
 * @throws RuntimeException if the deserialized value is not equal to {@code obj}
 */
public static <T> void verify(final Serde<T> serde, final T obj) {
    final String dummyTopic = "xxx";
    final byte[] bytes = serde.serializer().serialize(dummyTopic, obj);

    LOGGER.trace(() -> String.format("object form: %s", obj));
    LOGGER.trace(() -> String.format("byte form: %s", ByteArrayUtils.byteArrayToHex(bytes)));

    final T deserializedObj = serde.deserializer().deserialize(dummyTopic, bytes);

    // Guard against obj == null so the helper reports a mismatch instead of throwing an NPE.
    final boolean matches = (obj == null) ? deserializedObj == null : obj.equals(deserializedObj);
    if (!matches) {
        throw new RuntimeException(String.format("Original [%s] and de-serialized [%s] values don't match",
                obj, deserializedObj));
    }
}
/**
 * Loads the class named {@code serde} and instantiates it through its no-argument constructor.
 *
 * <p>The class is checked to be a {@code Serde} <em>before</em> it is instantiated, so the
 * constructor of an unrelated class is never run. Any reflective failure, including an
 * exception thrown by the constructor itself, is reported as a {@code SerdeNotFoundException}.
 *
 * @param serde fully qualified class name of the serde
 * @return a new instance of the named serde
 * @throws SerdeNotFoundException if the class cannot be found, is not a {@code Serde},
 *                                or cannot be instantiated
 */
@SuppressWarnings("unchecked")
protected Serde<Object> getSerde(String serde) {
    try {
        // NOTE(review): if the class name can come from an external request, consider an allow-list.
        // asSubclass throws ClassCastException for non-Serde classes before any constructor runs.
        final Class<? extends Serde> serdeClass = Class.forName(serde).asSubclass(Serde.class);
        // Class.newInstance() is deprecated and lets constructor exceptions escape unwrapped.
        return (Serde<Object>) serdeClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
        throw new SerdeNotFoundException(serde, e);
    }
}
@Override @SuppressWarnings("unchecked") public void init(final ProcessorContext context, final StateStore root) { this.context = context; // construct the serde serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()), keySerde == null ? (Serde<K>) context.keySerde() : keySerde, valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde); bytesStore.init(context, root); }
@Test public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() { final TopologyBuilder builder = new TopologyBuilder(); final Serde<String> stringSerde = Serdes.String(); builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1"); try { builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2"); fail("Should throw TopologyBuilderException for duplicate source name"); } catch (TopologyBuilderException tpe) { //no-op } }
/**
 * Creates a window store on top of the given segmented bytes store.
 *
 * @param bytesStore       underlying bytes store; also passed to the superclass
 * @param keySerde         serde for keys
 * @param valueSerde       serde for values
 * @param retainDuplicates stored as-is in {@code this.retainDuplicates}
 * @param windowSize       stored as-is in {@code this.windowSize}
 */
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
                   final Serde<K> keySerde,
                   final Serde<V> valueSerde,
                   final boolean retainDuplicates,
                   final long windowSize) {
    super(bytesStore);

    // storage
    this.bytesStore = bytesStore;

    // serialization
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;

    // windowing settings
    this.retainDuplicates = retainDuplicates;
    this.windowSize = windowSize;
}
/**
 * Creates a store on top of the given change-logging bytes store.
 *
 * @param bytesStore underlying bytes store; also passed to the superclass
 * @param keySerde   serde for keys (raw-typed)
 * @param valueSerde serde for values (raw-typed)
 */
private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
                                   final Serde keySerde,
                                   final Serde valueSerde) {
    super(bytesStore);

    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
    this.innerBytes = bytesStore;
}
/**
 * Creates a client for a single store with fixed key and value types.
 *
 * @param host       host passed to {@code initGenericService}
 * @param port       port passed to {@code initGenericService}
 * @param store      name of the store this client is bound to
 * @param keyClass   class of the keys
 * @param valueClass class of the values
 * @param keySerde   serde for keys
 * @param valueSerde serde for values
 */
public SpecificBlockingRestKiqrClientImpl(String host,
                                          int port,
                                          String store,
                                          Class<K> keyClass,
                                          Class<V> valueClass,
                                          Serde<K> keySerde,
                                          Serde<V> valueSerde) {
    // key side
    this.keyClass = keyClass;
    this.keySerde = keySerde;

    // value side
    this.valueClass = valueClass;
    this.valueSerde = valueSerde;

    // the generic client is created once the type information above is in place
    this.genericClient = initGenericService(host, port);
    this.store = store;
}
/**
 * Creates an empty in-memory store backed by a {@code TreeMap}.
 *
 * @param name       name of the store
 * @param keySerde   serde for keys
 * @param valueSerde serde for values
 */
public InMemoryKeyValueStore(final String name,
                             final Serde<K> keySerde,
                             final Serde<V> valueSerde) {
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
    this.name = name;

    // TODO: when we have serde associated with class types, we can
    // improve this situation by passing the comparator here.
    this.map = new TreeMap<>();
}
/**
 * Returns a {@link Serde#configure(Map, boolean) configured} instance of the
 * {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value Serde class}.
 *
 * @return a configured instance of the value Serde class
 * @throws StreamsException if the serde cannot be created or configured
 */
public Serde defaultValueSerde() {
    try {
        final Serde<?> configured = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
        // "false" marks the serde as being configured for values rather than keys
        configured.configure(originals(), false);
        return configured;
    } catch (final Exception e) {
        throw new StreamsException(
                String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
    }
}
/**
 * Routes {@code stream} through an internal repartition topic and returns the name of the
 * source node that reads it back.
 *
 * <p>Adds, in order: the internal topic, a filter that keeps only records with a non-null key,
 * a sink writing to the topic, and a source reading from it.
 *
 * @param stream          stream to repartition
 * @param keySerde        key serde; when null, null key (de)serializers are passed on
 * @param valSerde        value serde; when null, null value (de)serializers are passed on
 * @param topicNamePrefix prefix of the repartition topic name; when null the stream's name is used
 * @return name of the new source node
 */
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                Serde<K1> keySerde,
                                                Serde<V1> valSerde,
                                                final String topicNamePrefix) {
    Serializer<K1> keySerializer = keySerde == null ? null : keySerde.serializer();
    Serializer<V1> valSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<K1> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Deserializer<V1> valDeserializer = valSerde == null ? null : valSerde.deserializer();

    String topicBase = topicNamePrefix == null ? stream.name : topicNamePrefix;
    String repartitionTopic = topicBase + REPARTITION_TOPIC_SUFFIX;

    // Node names are requested in the order sink, filter, source.
    String sinkNode = stream.topology.newName(SINK_NAME);
    String filterNode = stream.topology.newName(FILTER_NAME);
    String sourceNode = stream.topology.newName(SOURCE_NAME);

    stream.topology.addInternalTopic(repartitionTopic);

    // Only records that have a key are forwarded to the repartition topic.
    Predicate<K1, V1> hasKey = new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    };
    stream.topology.addProcessor(filterNode, new KStreamFilter<>(hasKey, false), stream.name);

    stream.topology.addSink(sinkNode, repartitionTopic, keySerializer, valSerializer, filterNode);
    stream.topology.addSource(sourceNode, keyDeserializer, valDeserializer, repartitionTopic);

    return sourceNode;
}
/**
 * Creates a caching layer over the given session bytes store.
 *
 * @param bytesStore      underlying session store; also passed to the superclass
 * @param keySerde        serde for keys
 * @param aggSerde        serde for aggregate values
 * @param segmentInterval passed to the {@code SegmentedCacheFunction} constructor
 */
CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                    final Serde<K> keySerde,
                    final Serde<AGG> aggSerde,
                    final long segmentInterval) {
    super(bytesStore);

    this.keySerde = keySerde;
    this.aggSerde = aggSerde;
    this.bytesStore = bytesStore;

    // the cache function is built from the key schema, so the schema comes first
    this.keySchema = new SessionKeySchema();
    this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
}
/**
 * Groups the stream by a new key produced by {@code selector}.
 *
 * @param selector mapper producing the new key; must not be null
 * @param keySerde serde for the new key
 * @param valSerde serde for the values
 * @return the grouped stream, created with the flag {@code true} as its last constructor argument
 * @throws NullPointerException if {@code selector} is null
 */
@Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector,
                                          Serde<K1> keySerde,
                                          Serde<V> valSerde) {
    Objects.requireNonNull(selector, "selector can't be null");

    final String selectKeyNode = internalSelectKey(selector);
    final KGroupedStream<K1, V> grouped =
            new KGroupedStreamImpl<>(topology, selectKeyNode, sourceNodes, keySerde, valSerde, true);
    return grouped;
}
/**
 * Groups the stream by its existing key.
 *
 * @param keySerde serde for the keys
 * @param valSerde serde for the values
 * @return the grouped stream, carrying over this stream's {@code repartitionRequired} flag
 */
@Override
public KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
                                       Serde<V> valSerde) {
    final KGroupedStream<K, V> grouped = new KGroupedStreamImpl<>(
            topology,
            this.name,
            sourceNodes,
            keySerde,
            valSerde,
            this.repartitionRequired);
    return grouped;
}
/**
 * Creates a grouped table with a single source node.
 *
 * @param topology   the builder this table belongs to
 * @param name       name of this node
 * @param sourceName name of the only source node
 * @param keySerde   serde for keys
 * @param valSerde   serde for values
 */
public KGroupedTableImpl(final KStreamBuilder topology,
                         final String name,
                         final String sourceName,
                         final Serde<? extends K> keySerde,
                         final Serde<? extends V> valSerde) {
    super(topology, name, Collections.singleton(sourceName));

    // a new grouped table starts out queryable
    this.isQueryable = true;
    this.keySerde = keySerde;
    this.valSerde = valSerde;
}
/**
 * Creates a session store on top of the given segmented bytes store.
 *
 * @param bytesStore underlying bytes store; also passed to the superclass
 * @param keySerde   serde for keys
 * @param aggSerde   serde for aggregate values
 */
RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                    final Serde<K> keySerde,
                    final Serde<AGG> aggSerde) {
    super(bytesStore);

    this.bytesStore = bytesStore;
    this.keySerde = keySerde;
    this.aggSerde = aggSerde;
}
/**
 * Maps the values of this table and keeps the result in the store from {@code storeSupplier}.
 *
 * @param mapper        value mapper
 * @param valueSerde    serde for the mapped values
 * @param storeSupplier supplier of the backing store; must not be null
 * @return the table of mapped values
 * @throws NullPointerException if {@code storeSupplier} is null
 */
@Override
public <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                    final Serde<V1> valueSerde,
                                    final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");

    final KTable<K, V1> mapped = doMapValues(mapper, valueSerde, storeSupplier);
    return mapped;
}
/**
 * Stores the settings shared by store suppliers.
 *
 * @param name       name of the store
 * @param keySerde   serde for keys
 * @param valueSerde serde for values
 * @param time       time source
 * @param logged     stored as-is in {@code this.logged}
 * @param logConfig  stored as-is in {@code this.logConfig}
 */
AbstractStoreSupplier(final String name,
                      final Serde<K> keySerde,
                      final Serde<V> valueSerde,
                      final Time time,
                      final boolean logged,
                      final Map<String, String> logConfig) {
    // identity
    this.name = name;
    this.time = time;

    // serialization
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;

    // logging settings
    this.logged = logged;
    this.logConfig = logConfig;
}
/**
 * Convenience constructor for callers that already hold a {@code RecordCollector}.
 *
 * <p>Wraps {@code collector} in a {@code RecordCollector.Supplier} that always returns that same
 * instance, and delegates to the constructor taking a supplier.
 *
 * @param stateDir  forwarded to the delegated constructor
 * @param keySerde  forwarded to the delegated constructor
 * @param valSerde  forwarded to the delegated constructor
 * @param collector the collector the generated supplier returns
 * @param cache     forwarded to the delegated constructor
 */
public MockProcessorContext(final File stateDir, final Serde<?> keySerde, final Serde<?> valSerde, final RecordCollector collector, final ThreadCache cache) { this(stateDir, keySerde, valSerde, new RecordCollector.Supplier() { @Override public RecordCollector recordCollector() { return collector; } }, cache); }
/**
 * Writes this table to {@code topic} and reads the topic back as a new table backed by the
 * store from {@code storeSupplier}.
 *
 * @param keySerde      serde for keys
 * @param valSerde      serde for values
 * @param partitioner   partitioner used when writing to the topic
 * @param topic         the intermediate topic
 * @param storeSupplier supplier of the store backing the returned table; must not be null
 * @return the table read back from {@code topic}
 * @throws NullPointerException if {@code storeSupplier} is null
 */
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final StreamPartitioner<? super K, ? super V> partitioner,
                            final String topic,
                            final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");

    // write first, then read the same topic back as a table
    to(keySerde, valSerde, partitioner, topic);
    final KTable<K, V> readBack = topology.table(keySerde, valSerde, topic, storeSupplier);
    return readBack;
}
/**
 * Variant of {@code through} without a queryable store name: delegates to the overload that
 * takes a store name, passing {@code null} for it.
 *
 * @param keySerde    serde for keys
 * @param valSerde    serde for values
 * @param partitioner partitioner used when writing to the topic
 * @param topic       the intermediate topic
 * @return the table returned by the delegated overload
 */
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final StreamPartitioner<? super K, ? super V> partitioner,
                            final String topic) {
    // A String-typed null selects the store-name overload rather than the store-supplier one.
    final String noQueryableStoreName = null;
    return through(keySerde, valSerde, partitioner, topic, noQueryableStoreName);
}
/**
 * Variant of {@code through} without a partitioner: delegates to the overload that takes a
 * partitioner, passing {@code null} for it.
 *
 * @param keySerde           serde for keys
 * @param valSerde           serde for values
 * @param topic              the intermediate topic
 * @param queryableStoreName store name forwarded to the delegated overload
 * @return the table returned by the delegated overload
 */
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final String topic,
                            final String queryableStoreName) {
    final StreamPartitioner<? super K, ? super V> noPartitioner = null;
    return through(keySerde, valSerde, noPartitioner, topic, queryableStoreName);
}
/**
 * Joins this table with {@code other}; delegates to {@code doJoin} with both outer flags
 * set to {@code false}.
 *
 * @param other              the table to join with
 * @param joiner             computes the join result from the two values
 * @param joinSerde          serde for the join result
 * @param queryableStoreName store name forwarded to {@code doJoin}
 * @return the joined table
 */
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                 final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                 final Serde<R> joinSerde,
                                 final String queryableStoreName) {
    final boolean leftOuter = false;
    final boolean rightOuter = false;
    return doJoin(other, joiner, leftOuter, rightOuter, joinSerde, queryableStoreName);
}
/**
 * Outer-joins this table with {@code other}; delegates to {@code doJoin} with both outer flags
 * set to {@code true}.
 *
 * @param other              the table to join with
 * @param joiner             computes the join result from the two values
 * @param joinSerde          serde for the join result
 * @param queryableStoreName store name forwarded to {@code doJoin}
 * @return the joined table
 */
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                      final Serde<R> joinSerde,
                                      final String queryableStoreName) {
    final boolean leftOuter = true;
    final boolean rightOuter = true;
    return doJoin(other, joiner, leftOuter, rightOuter, joinSerde, queryableStoreName);
}
/**
 * Left-joins this table with {@code other}; delegates to {@code doJoin} with the left outer
 * flag set to {@code true} and the right outer flag set to {@code false}.
 *
 * @param other              the table to join with
 * @param joiner             computes the join result from the two values
 * @param joinSerde          serde for the join result
 * @param queryableStoreName store name forwarded to {@code doJoin}
 * @return the joined table
 */
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                     final Serde<R> joinSerde,
                                     final String queryableStoreName) {
    final boolean leftOuter = true;
    final boolean rightOuter = false;
    return doJoin(other, joiner, leftOuter, rightOuter, joinSerde, queryableStoreName);
}
/**
 * Validates the join arguments, builds a key-value store supplier when a store name is given,
 * and delegates to the store-supplier variant of {@code doJoin}.
 *
 * @param other              the table to join with; must not be null
 * @param joiner             computes the join result; must not be null
 * @param leftOuter          left outer flag, forwarded unchanged
 * @param rightOuter         right outer flag, forwarded unchanged
 * @param joinSerde          serde for the join result, used for the store's values
 * @param queryableStoreName name of the store to create, or null for no store supplier
 * @return the joined table
 * @throws NullPointerException if {@code other} or {@code joiner} is null
 */
@SuppressWarnings("unchecked")
private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                    ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                    final boolean leftOuter,
                                    final boolean rightOuter,
                                    final Serde<R> joinSerde,
                                    final String queryableStoreName) {
    Objects.requireNonNull(other, "other can't be null");
    Objects.requireNonNull(joiner, "joiner can't be null");

    final StateStoreSupplier storeSupplier;
    if (queryableStoreName != null) {
        storeSupplier = keyValueStore(this.keySerde, joinSerde, queryableStoreName);
    } else {
        storeSupplier = null;
    }

    return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
}
/**
 * Derives the changelog topic from the application id and the bytes store's name, resolves the
 * serdes (falling back to the context's serdes where none was supplied), and then initializes
 * the underlying bytes store.
 *
 * @param context the processor context
 * @param root    the root store, passed through to the bytes store
 */
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
    final String underlyingStoreName = bytesStore.name();
    topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), underlyingStoreName);

    final Serde<K> effectiveKeySerde = keySerde != null ? keySerde : (Serde<K>) context.keySerde();
    final Serde<AGG> effectiveAggSerde = aggSerde != null ? aggSerde : (Serde<AGG>) context.valueSerde();
    serdes = new StateSerdes<>(topic, effectiveKeySerde, effectiveAggSerde);

    bytesStore.init(context, root);
}
/**
 * Windowed aggregation without an extra trailing argument: delegates to the five-argument
 * overload, passing {@code null} as the last argument.
 *
 * @param initializer   aggregate initializer
 * @param aggregator    aggregator
 * @param windows       window specification
 * @param aggValueSerde serde for the aggregate values
 * @return the aggregated, windowed table
 */
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final Serde<T> aggValueSerde) {
    final KTable<Windowed<K>, T> aggregated = aggregate(
            initializer,
            aggregator,
            windows,
            aggValueSerde,
            null);
    return aggregated;
}
/**
 * Builds a key-value store supplier for the given serdes and store name.
 *
 * @param keySerde      serde for keys
 * @param aggValueSerde serde for values
 * @param storeName     name of the store; must not be null and must pass {@code Topic.validate}
 * @return the store supplier produced by {@code storeFactory(...).build()}
 * @throws NullPointerException if {@code storeName} is null
 */
@SuppressWarnings("unchecked")
static <T, K> StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
                                                              final Serde<T> aggValueSerde,
                                                              final String storeName) {
    Objects.requireNonNull(storeName, "storeName can't be null");
    // the store name is validated with the same rules as a topic name
    Topic.validate(storeName);

    final StateStoreSupplier<KeyValueStore> supplier = storeFactory(keySerde, aggValueSerde, storeName).build();
    return supplier;
}