private MergedSortedCacheWindowStoreKeyValueIterator<String, String> createIterator( final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs, final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs ) { final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs)); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs)); return new MergedSortedCacheWindowStoreKeyValueIterator<>( cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.String(), Serdes.String()), WINDOW_SIZE, SINGLE_SEGMENT_CACHE_FUNCTION ); }
@Test public void defaultSerdeShouldBeConfigured() { final Map<String, Object> serializerConfigs = new HashMap<>(); serializerConfigs.put("key.serializer.encoding", "UTF8"); serializerConfigs.put("value.serializer.encoding", "UTF-16"); final Serializer<String> serializer = Serdes.String().serializer(); final String str = "my string for testing"; final String topic = "my topic"; serializer.configure(serializerConfigs, true); assertEquals("Should get the original string after serialization and deserialization with the configured encoding", str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); serializer.configure(serializerConfigs, false); assertEquals("Should get the original string after serialization and deserialization with the configured encoding", str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str))); }
public static void main(String[] args) throws Exception { Properties kafkaStreamProperties = new Properties(); kafkaStreamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount"); kafkaStreamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); kafkaStreamProperties.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); kafkaStreamProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); kafkaStreamProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); Serde<String> stringSerde = Serdes.String(); Serde<Long> longSerde = Serdes.Long(); KStreamBuilder streamTopology = new KStreamBuilder(); KStream<String, String> topicRecords = streamTopology.stream(stringSerde, stringSerde, "input"); KStream<String, Long> wordCounts = topicRecords .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, word) -> new KeyValue<>(word, word)) .countByKey("Count") .toStream(); wordCounts.to(stringSerde, longSerde, "wordCount"); KafkaStreams streamManager = new KafkaStreams(streamTopology, kafkaStreamProperties); streamManager.start(); Runtime.getRuntime().addShutdownHook(new Thread(streamManager::close)); }
private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2, final KTable<String, Integer> table3, final String topic1) { MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>(); MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); table3.toStream().process(proc3); driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer()); driver.process(topic1, "A", 1); driver.process(topic1, "B", 2); driver.process(topic1, "C", 3); driver.process(topic1, "D", 4); driver.flushState(); driver.process(topic1, "A", null); driver.process(topic1, "B", null); driver.flushState(); proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); }
@SuppressWarnings("unchecked") @Test public void doNotChangeOutputOrder() throws EmptyOutputSizeException, NoTopologyException, EmptyInputException { List<Message<Integer, Integer>> input = of(1, 2, 3, 4, 5, 6, 7) .map(i -> new Message<>(i, i)) .collect(toList()); Serde<Integer> integerSerde = Serdes.Integer(); List<Message<Integer, Integer>> output = Mockafka .builder() .topology(builder -> builder.stream(integerSerde, integerSerde, "numbersTopic") .filter((key, value) -> value % 2 == 1) .to(integerSerde, integerSerde, "oddNumbersTopic") ) .input("numbersTopic", integerSerde, integerSerde, input.toArray(new Message[]{})) .output("oddNumbersTopic", integerSerde, integerSerde, 4); List<Message<Integer, Integer>> expected = Arrays.asList(new Message<>(1, 1), new Message<>(3, 3), new Message<>(5, 5), new Message<>(7, 7)); assertEquals(4, output.size()); assertEquals(expected, output); }
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); builder.stream("streams-file-input").to("streams-pipe-output"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // usually the stream application would be running forever, // in this example we just let it run for some time and stop since the input data is finite. Thread.sleep(5000L); streams.close(); }
@Test public void shouldGetInstanceWithKey() throws Exception { final TopicPartition tp4 = new TopicPartition("topic-three", 1); hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4)); discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null)))); final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"), Collections.singleton(topic3P0)); final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", Serdes.String().serializer()); assertEquals(expected, actual); }
@Override public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) { // since this function may not access the underlying inner store, we need to validate // if store is open outside as well. validateStoreOpen(); final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key)); final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo); final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom)); final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo)); final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo); final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes, keyBytes, timeFrom, timeTo); final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator( cacheIterator, hasNextCondition, cacheFunction ); return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator, underlyingIterator, new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde())); }
@Test public void notFoundWithNoResult(TestContext context){ KafkaStreams streamMock = mock(KafkaStreams.class); ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class); KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class); when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock); SimpleKeyValueIterator iterator = new SimpleKeyValueIterator(); when(storeMock.all()).thenReturn(iterator); rule.vertx().deployVerticle(new AllKeyValuesQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment->{ StoreWideQuery query = new StoreWideQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName()); rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply ->{ context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse); MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body(); context.assertEquals(0, response.getResults().size()); context.assertTrue(iterator.closed); })); })); }
private Properties buildKafkaStreamsDefaults() { Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, String.format("%sanomaly-detector-app", HEROKU_KAFKA_PREFIX)); properties.put(StreamsConfig.CLIENT_ID_CONFIG, String.format("%sanomaly-detector-client", HEROKU_KAFKA_PREFIX)); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); return properties; }
@Before public void before() throws InterruptedException { testNo++; String applicationId = "kstream-repartition-join-test-" + testNo; builder = new KStreamBuilder(); createTopics(); streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput); keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper(); }
private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception { final KStream<Integer, Integer> map1 = streamOne.map(keyMapper); final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper()); final String outputTopic = "left-join-" + testNo; CLUSTER.createTopic(outputTopic); map1.leftJoin(map2, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String()) .filterNot(new Predicate<Integer, String>() { @Override public boolean test(Integer key, String value) { // filter not left-only join results return value.substring(2).equals("null"); } }) .to(Serdes.Integer(), Serdes.String(), outputTopic); return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic); }
/** * Creates a typical word count topology */ private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) { final KStreamBuilder builder = new KStreamBuilder(); final Serde<String> stringSerde = Serdes.String(); final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic); final KGroupedStream<String, String> groupedByWord = textLines .flatMapValues(new ValueMapper<String, Iterable<String>>() { @Override public Iterable<String> apply(final String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()); // Create a State Store for the all time word count groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic); // Create a Windowed State Store that contains the word count for every 1 minute groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic); return new KafkaStreams(builder, streamsConfiguration); }
@Test public void testQueryableJoin() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); final int[] expectedKeys = new int[]{0, 1, 2, 3}; final KTable<Integer, String> table1; final KTable<Integer, String> table2; final KTable<Integer, String> joined; final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName"); joined.toStream().process(processor); doTestJoin(builder, expectedKeys, processor, joined); }
@SuppressWarnings("unchecked") private WindowStore<String, String> createStore(final boolean logged, final boolean cached) { return new RocksDBWindowStoreSupplier<>(STORE_NAME, 10, 3, false, Serdes.String(), Serdes.String(), 10, logged, Collections.<String, String>emptyMap(), cached).get(); }
/** * Start a consume consuming from both bad events topics, log each message and add each message * into a map keyed by topic name * A {@link CountDownLatch} is returned to allow the caller to wait for the expected number of messages */ private CountDownLatch startBadEventsConsumer(final Map<String, Object> consumerProps, final int expectedMsgCount, List<BadStatMessage> badMessages) { final CountDownLatch latch = new CountDownLatch(expectedMsgCount); ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.execute(() -> { Thread.currentThread().setName("bad-events-consumer-thread"); activeConsumerThreads.incrementAndGet(); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps, Serdes.String().deserializer(), Serdes.String().deserializer()); //Subscribe to all bad event topics kafkaConsumer.subscribe(BAD_TOPICS_MAP.values()); try { while (areConsumersEnabled.get()) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); if (records.count() > 0) { for (ConsumerRecord<String, String> record : records) { LOGGER.warn("Bad events Consumer - topic = {}, partition = {}, offset = {}, key = {}, value = {}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); badMessages.add(new BadStatMessage(record.topic(), record.key(), record.value())); latch.countDown(); } if (latch.getCount() == 0) { break; } kafkaConsumer.commitSync(); } } } finally { kafkaConsumer.close(); activeConsumerThreads.decrementAndGet(); } }); return latch; }
@Test public void testCount() throws IOException { final KStreamBuilder builder = new KStreamBuilder(); final String input = "count-test-input"; final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName") .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde) .count("count") .toStream() .process(proc); testCountHelper(builder, input, proc); }
public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.serializer", JsonPOJOSerializer.class.getName()); props.put("value.deserializer", JsonPOJODeserializer.class.getName()); Map<String, Object> serdeProps = new HashMap<>(); serdeProps.put("JsonPOJOClass", Messung.class); final Serializer<Messung> serializer = new JsonPOJOSerializer<>(); serializer.configure(serdeProps, false); final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>(); deserializer.configure(serdeProps, false); final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream(Serdes.String(), serde, "produktion") .filter( (k,v) -> v.type.equals("Biogas")) .to(Serdes.String(), serde,"produktion2"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); }
public static Properties defaultConsumingProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); properties.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID); properties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConsumerTimestampExtractor.class); return properties; }
private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) { final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>(); while (iterator.hasNext()) { final KeyValue<Bytes, byte[]> next = iterator.next(); final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"), Serdes.Long().deserializer().deserialize("", next.value)); results.add(deserialized); } return results; }
@Test public void successfulRangeQuery() throws Exception{ GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port); Map<String, Long> result = client.getRangeKeyValues("kv", String.class, Long.class, Serdes.String(), Serdes.Long(), "key1", "key2"); assertThat(result.entrySet(),hasSize(2)); assertThat(result, hasEntry("key1", 3L)); assertThat(result, hasEntry("key2", 6L)); }
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.<String, String>stream("streams-plaintext-input") .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); }
@Test public void successfulWindowQuery() throws Exception{ GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port); Map<Long, Long> result = client.getWindow("window", String.class, "key1", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L); assertThat(result.entrySet(),hasSize(2)); assertThat(result, hasEntry(0L, 2L)); assertThat(result, hasEntry(100000L, 1L)); Map<Long, Long> resultKey2 = client.getWindow("window", String.class, "key2", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L); assertThat(resultKey2.entrySet(),hasSize(2)); assertThat(resultKey2, hasEntry(0L, 1L)); assertThat(resultKey2, hasEntry(100000L, 2L)); Map<Long, Long> resultKey3 = client.getWindow("window", String.class, "key3", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L); assertThat(resultKey3.entrySet(),hasSize(3)); assertThat(resultKey3, hasEntry(0L, 1L)); assertThat(resultKey3, hasEntry(50000L, 1L)); assertThat(resultKey3, hasEntry(100000L, 1L)); Map<Long, Long> resultKey4 = client.getWindow("window", String.class, "key4", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L); assertThat(resultKey4.entrySet(),hasSize(1)); assertThat(resultKey4, hasEntry(0L, 3L)); }
@Test public void successfulAllQuery() throws Exception{ GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port); Map<String, Long> result = client.getAllKeyValues("kv", String.class, Long.class, Serdes.String(), Serdes.Long()); assertThat(result.entrySet(),hasSize(4)); assertThat(result, hasEntry("key1", 3L)); assertThat(result, hasEntry("key2", 6L)); assertThat(result, hasEntry("key3", 9L)); assertThat(result, hasEntry("key4", 12L)); }
public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); TopologyBuilder builder = new TopologyBuilder(); builder.addSource("Source", "streams-file-input"); builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // usually the stream application would be running forever, // in this example we just let it run for some time and stop since the input data is finite. Thread.sleep(5000L); streams.close(); }
@Test(expected = InvalidTopicException.class) public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception { groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() { @Override public String apply(final String aggKey, final String aggOne, final String aggTwo) { return null; } }, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME); }
@SuppressWarnings("unchecked") @Before public void initializeStore() { final File stateDir = TestUtils.tempDirectory(); context = new MockProcessorContext(stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) { @Override public <K, V> void forward(final K key, final V value) { results.add(KeyValue.pair(key, value)); } }; initStore(true); processor.init(context); }
@SuppressWarnings("unchecked") public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) { determineIsQueryable(queryableStoreName); return count(sessionWindows, storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME)) .sessionWindowed(sessionWindows.maintainMs()).build()); }
private void doTestKTable(final KStreamBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) { driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String()); driver.process(topic1, "A", "1"); driver.process(topic1, "B", "2"); driver.process(topic1, "C", "3"); driver.process(topic1, "D", "4"); driver.flushState(); assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); }
public KeyValueStore get() { if (!cached && !logged) { return new MeteredKeyValueStore<>( new RocksDBStore<>(name, keySerde, valueSerde), METRICS_SCOPE, time); } // when cached, logged, or both we use a bytes store as the inner most store final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name, Serdes.Bytes(), Serdes.ByteArray()); if (cached && logged) { return new CachingKeyValueStore<>( new MeteredKeyValueStore<>( new ChangeLoggingKeyValueBytesStore(rocks), METRICS_SCOPE, time), keySerde, valueSerde); } if (cached) { return new CachingKeyValueStore<>( new MeteredKeyValueStore<>(rocks, METRICS_SCOPE, time), keySerde, valueSerde); } else { // logged return new MeteredKeyValueStore<>( new ChangeLoggingKeyValueStore<>(rocks, keySerde, valueSerde), METRICS_SCOPE, time); } }
private void initStore(final boolean enableCaching) { final RocksDBSessionStoreSupplier<String, Long> supplier = new RocksDBSessionStoreSupplier<>(STORE_NAME, GAP_MS * 3, Serdes.String(), Serdes.Long(), false, Collections.<String, String>emptyMap(), enableCaching); sessionStore = (SessionStore<String, Long>) supplier.get(); sessionStore.init(context, sessionStore); }
@Before public void setUp() throws Exception { final String storeName = "store"; underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray()); cacheFlushListener = new CacheFlushListenerStub<>(); store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String()); store.setFlushListener(cacheFlushListener); cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics())); context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache); topic = "topic"; context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic)); store.init(context, null); }
@Test(expected = NullPointerException.class) public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception { groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() { @Override public String apply(final String aggKey, final String aggOne, final String aggTwo) { return null; } }, SessionWindows.with(10), Serdes.String(), "storeName"); }
@Test public void shouldReduce() throws Exception { produceMessages(mockTime.milliseconds()); groupedStream .reduce(reducer, "reduce-by-key") .to(Serdes.String(), Serdes.String(), outputTopic); startStreams(); produceMessages(mockTime.milliseconds()); final List<KeyValue<String, String>> results = receiveMessages( new StringDeserializer(), new StringDeserializer(), 10); Collections.sort(results, new Comparator<KeyValue<String, String>>() { @Override public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) { return KStreamAggregationIntegrationTest.compare(o1, o2); } }); assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), KeyValue.pair("A", "A:A"), KeyValue.pair("B", "B"), KeyValue.pair("B", "B:B"), KeyValue.pair("C", "C"), KeyValue.pair("C", "C:C"), KeyValue.pair("D", "D"), KeyValue.pair("D", "D:D"), KeyValue.pair("E", "E"), KeyValue.pair("E", "E:E")))); }
@Test public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception { final StateStoreSupplier supplier = Stores.create("store") .withKeys(Serdes.String()) .withValues(Serdes.String()) .inMemory() .enableLogging(Collections.singletonMap("retention.ms", "1000")) .build(); final Map<String, String> config = supplier.logConfig(); assertTrue(supplier.loggingEnabled()); assertEquals("1000", config.get("retention.ms")); }
@Test public void shouldCountWithInternalStore() throws Exception { produceMessages(mockTime.milliseconds()); groupedStream.count() .to(Serdes.String(), Serdes.Long(), outputTopic); shouldCountHelper(); }
private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception { final String output = "join-rhs-stream-mapped-" + testNo; CLUSTER.createTopic(output); streamTwo .join(streamOne.map(keyMapper), TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.String(), Serdes.Integer()) .to(Serdes.Integer(), Serdes.String(), output); return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output); }
private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception { final KStream<Integer, Integer> map1 = streamOne.map(keyMapper); final KeyValueMapper<Integer, String, KeyValue<Integer, String>> kvMapper = MockKeyValueMapper.NoOpKeyValueMapper(); final KStream<Integer, String> map2 = streamTwo.map(kvMapper); final KStream<Integer, String> join = map1.join(map2, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String()); final String topic = "map-join-join-" + testNo; CLUSTER.createTopic(topic); join.map(kvMapper) .join(streamFour.map(kvMapper), TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.String(), Serdes.String()) .to(Serdes.Integer(), Serdes.String(), topic); return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic); }
private void doJoin(final KStream<Integer, Integer> lhs, final KStream<Integer, String> rhs, final String outputTopic) throws InterruptedException { CLUSTER.createTopic(outputTopic); lhs.join(rhs, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String()) .to(Serdes.Integer(), Serdes.String(), outputTopic); }
@Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception { final byte[] key = Serdes.Long().serializer().serialize("foo", 1L); final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList( new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue)); queue.addRawRecords(records); }