Java code examples for class org.apache.kafka.common.serialization.Serdes

Project: kafka-0.11.0.0-src-with-comment    File: MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
private MergedSortedCacheWindowStoreKeyValueIterator<String, String> createIterator(
    final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
    final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
) {
    final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
        = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));

    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
        = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
    return new MergedSortedCacheWindowStoreKeyValueIterator<>(
        cacheIterator,
        storeIterator,
        new StateSerdes<>("name", Serdes.String(), Serdes.String()),
        WINDOW_SIZE,
        SINGLE_SEGMENT_CACHE_FUNCTION
    );
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfigTest.java
@Test
public void defaultSerdeShouldBeConfigured() {
    final Map<String, Object> serializerConfigs = new HashMap<>();
    serializerConfigs.put("key.serializer.encoding", "UTF8");
    serializerConfigs.put("value.serializer.encoding", "UTF-16");
    final Serializer<String> serializer = Serdes.String().serializer();

    final String str = "my string for testing";
    final String topic = "my topic";

    serializer.configure(serializerConfigs, true);
    assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
            str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));

    serializer.configure(serializerConfigs, false);
    assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
            str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
}
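
The "key.serializer.encoding" / "value.serializer.encoding" entries above are read by StringSerializer and StringDeserializer themselves, not by Streams. A standalone round-trip sketch, added here as an illustration (not part of the original test; assumes only kafka-clients on the classpath, the topic name is arbitrary):

import java.util.Collections;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class StringSerdeEncodingRoundTrip {
    public static void main(String[] args) {
        final Serializer<String> serializer = Serdes.String().serializer();
        final Deserializer<String> deserializer = Serdes.String().deserializer();

        // with isKey = false, the value-scoped encoding entries apply
        serializer.configure(Collections.singletonMap("value.serializer.encoding", "UTF-16"), false);
        deserializer.configure(Collections.singletonMap("value.deserializer.encoding", "UTF-16"), false);

        final byte[] bytes = serializer.serialize("my-topic", "my string for testing");
        System.out.println(deserializer.deserialize("my-topic", bytes)); // prints the original string
    }
}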
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: KafkaStreamWordCount.java
public static void main(String[] args) throws Exception {
    Properties kafkaStreamProperties = new Properties();
    kafkaStreamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount");
    kafkaStreamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    kafkaStreamProperties.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
    kafkaStreamProperties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    kafkaStreamProperties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    Serde<String> stringSerde = Serdes.String();
    Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder streamTopology = new KStreamBuilder();
    KStream<String, String> topicRecords = streamTopology.stream(stringSerde, stringSerde, "input");
    KStream<String, Long> wordCounts = topicRecords
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .map((key, word) -> new KeyValue<>(word, word))
            .countByKey("Count")
            .toStream();
    wordCounts.to(stringSerde, longSerde, "wordCount");

    KafkaStreams streamManager = new KafkaStreams(streamTopology, kafkaStreamProperties);
    streamManager.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streamManager::close));
}
Project: kafka-0.11.0.0-src-with-comment    File: KTableFilterTest.java
private void doTestKTable(final KStreamBuilder builder, final KTable<String, Integer> table2,
                          final KTable<String, Integer> table3, final String topic1) {
    MockProcessorSupplier<String, Integer> proc2 = new MockProcessorSupplier<>();
    MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
    table2.toStream().process(proc2);
    table3.toStream().process(proc3);

    driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.Integer());

    driver.process(topic1, "A", 1);
    driver.process(topic1, "B", 2);
    driver.process(topic1, "C", 3);
    driver.process(topic1, "D", 4);
    driver.flushState();
    driver.process(topic1, "A", null);
    driver.process(topic1, "B", null);
    driver.flushState();

    proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
    proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
}
Project: mockafka    File: MockafkaBuilderTest.java
@SuppressWarnings("unchecked")
@Test
public void doNotChangeOutputOrder() throws EmptyOutputSizeException, NoTopologyException, EmptyInputException {
    List<Message<Integer, Integer>> input = of(1, 2, 3, 4, 5, 6, 7)
        .map(i -> new Message<>(i, i))
        .collect(toList());

    Serde<Integer> integerSerde = Serdes.Integer();

    List<Message<Integer, Integer>> output = Mockafka
        .builder()
        .topology(builder ->
            builder.stream(integerSerde, integerSerde, "numbersTopic")
                .filter((key, value) -> value % 2 == 1)
                .to(integerSerde, integerSerde, "oddNumbersTopic")
        )
        .input("numbersTopic", integerSerde, integerSerde, input.toArray(new Message[]{}))
        .output("oddNumbersTopic", integerSerde, integerSerde, 4);

    List<Message<Integer, Integer>> expected = Arrays.asList(new Message<>(1, 1), new Message<>(3, 3), new Message<>(5, 5), new Message<>(7, 7));
    assertEquals(4, output.size());
    assertEquals(expected, output);
}
Project: kafka-0.11.0.0-src-with-comment    File: PipeDemo.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream("streams-file-input").to("streams-pipe-output");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the streams application would run forever; in this example we let it
    // run for some time and then stop, since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsMetadataStateTest.java
@Test
public void shouldGetInstanceWithKey() throws Exception {
    final TopicPartition tp4 = new TopicPartition("topic-three", 1);
    hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));

    discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));

    final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
            Collections.singleton(topic3P0));

    final StreamsMetadata actual = discovery.getMetadataWithKey("table-three",
                                                                "the-key",
                                                                Serdes.String().serializer());

    assertEquals(expected, actual);
}
Project: kafka-0.11.0.0-src-with-comment    File: CachingWindowStore.java
@Override
public synchronized WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
    // since this method may be served from the cache without accessing the underlying
    // inner store, we need to validate that the store is open here as well.
    validateStoreOpen();

    final Bytes keyBytes = Bytes.wrap(serdes.rawKey(key));
    final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(keyBytes, timeFrom, timeTo);

    final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(keyBytes, timeFrom));
    final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(keyBytes, timeTo));
    final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom, cacheKeyTo);

    final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyBytes,
                                                                         keyBytes,
                                                                         timeFrom,
                                                                         timeTo);
    final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(
        cacheIterator, hasNextCondition, cacheFunction
    );

    return new MergedSortedCacheWindowStoreIterator<>(filteredCacheIterator,
                                                      underlyingIterator,
                                                      new StateSerdes<>(serdes.topic(), Serdes.Long(), serdes.valueSerde()));
}
Project: kiqr    File: AllKeyValuesQueryVerticleTest.java
@Test
public void notFoundWithNoResult(TestContext context) {
    KafkaStreams streamMock = mock(KafkaStreams.class);
    ReadOnlyKeyValueStore<Object, Object> storeMock = mock(ReadOnlyKeyValueStore.class);
    KeyValueIterator<Object, Object> iteratorMock = mock(KeyValueIterator.class);
    when(streamMock.store(eq("store"), any(QueryableStoreType.class))).thenReturn(storeMock);
    SimpleKeyValueIterator iterator = new SimpleKeyValueIterator();
    when(storeMock.all()).thenReturn(iterator);

    rule.vertx().deployVerticle(new AllKeyValuesQueryVerticle("host", streamMock), context.asyncAssertSuccess(deployment -> {

        StoreWideQuery query = new StoreWideQuery("store", Serdes.String().getClass().getName(), Serdes.String().getClass().getName());

        rule.vertx().eventBus().send(Config.ALL_KEY_VALUE_QUERY_ADDRESS_PREFIX + "host", query, context.asyncAssertSuccess(reply -> {
            context.assertTrue(reply.body() instanceof MultiValuedKeyValueQueryResponse);
            MultiValuedKeyValueQueryResponse response = (MultiValuedKeyValueQueryResponse) reply.body();
            context.assertEquals(0, response.getResults().size());
            context.assertTrue(iterator.closed);
        }));
    }));
}
Project: kafka-streams-on-heroku    File: AnomalyDetectorConfig.java
private Properties buildKafkaStreamsDefaults() {
  Properties properties = new Properties();
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG,
      String.format("%sanomaly-detector-app", HEROKU_KAFKA_PREFIX));
  properties.put(StreamsConfig.CLIENT_ID_CONFIG,
      String.format("%sanomaly-detector-client", HEROKU_KAFKA_PREFIX));
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  properties.put(
      StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
  properties.put(
      StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
      Serdes.String().getClass().getName());
  properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
  properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
      WallclockTimestampExtractor.class);

  return properties;
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
@Before
public void before() throws InterruptedException {
    testNo++;
    String applicationId = "kstream-repartition-join-test-" + testNo;
    builder = new KStreamBuilder();
    createTopics();
    streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
    streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);

    streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
    streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
    streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);

    keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
    final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);

    final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());

    final String outputTopic = "left-join-" + testNo;
    CLUSTER.createTopic(outputTopic);
    map1.leftJoin(map2,
        TOSTRING_JOINER,
        getJoinWindow(),
        Serdes.Integer(),
        Serdes.Integer(),
        Serdes.String())
        .filterNot(new Predicate<Integer, String>() {
            @Override
            public boolean test(Integer key, String value) {
                // drop left-only join results, i.e. pairs whose right-hand value is null
                return value.substring(2).equals("null");
            }
        })
        .to(Serdes.Integer(), Serdes.String(), outputTopic);

    return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
Project: kafka-0.11.0.0-src-with-comment    File: QueryableStateIntegrationTest.java
/**
 * Creates a typical word count topology
 */
private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
    final KStreamBuilder builder = new KStreamBuilder();
    final Serde<String> stringSerde = Serdes.String();
    final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);

    final KGroupedStream<String, String> groupedByWord = textLines
        .flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(final String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
        .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());

    // Create a state store for the all-time word count
    groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);

    // Create a windowed state store that contains the word count for every 1-minute window
    groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);

    return new KafkaStreams(builder, streamsConfiguration);
}
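
Because the count store above is created with an explicit name, it can be queried interactively once the returned instance is running. A minimal sketch (not from the original test), assuming inputTopic was "words-in" (so the store is named "word-count-store-words-in") and the 0.11 interactive-queries API:

KafkaStreams streams = createCountStream("words-in", "words-out", streamsConfiguration);
streams.start();
// once the instance has finished rebalancing, the store can be fetched;
// KafkaStreams#store throws InvalidStateStoreException if queried too early
ReadOnlyKeyValueStore<String, Long> store =
        streams.store("word-count-store-words-in", QueryableStoreTypes.keyValueStore());
Long count = store.get("kafka"); // null if the word has not been counted yet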
Project: kafka-0.11.0.0-src-with-comment    File: KTableKTableJoinTest.java
@Test
public void testQueryableJoin() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();

    final int[] expectedKeys = new int[]{0, 1, 2, 3};

    final KTable<Integer, String> table1;
    final KTable<Integer, String> table2;
    final KTable<Integer, String> joined;
    final MockProcessorSupplier<Integer, String> processor;

    processor = new MockProcessorSupplier<>();
    table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
    table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
    joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
    joined.toStream().process(processor);

    doTestJoin(builder, expectedKeys, processor, joined);
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBWindowStoreSupplierTest.java
@SuppressWarnings("unchecked")
private WindowStore<String, String> createStore(final boolean logged, final boolean cached) {
    return new RocksDBWindowStoreSupplier<>(STORE_NAME,
                                            10,
                                            3,
                                            false,
                                            Serdes.String(),
                                            Serdes.String(),
                                            10,
                                            logged,
                                            Collections.<String, String>emptyMap(),
                                            cached).get();
}
Project: stroom-stats    File: StatisticsFlatMappingServiceIT.java
/**
 * Start a consumer consuming from all bad-events topics, logging each message and adding it
 * to the supplied list of bad messages.
 * A {@link CountDownLatch} is returned to allow the caller to wait for the expected number of messages.
 */
private CountDownLatch startBadEventsConsumer(final Map<String, Object> consumerProps, final int expectedMsgCount,
                                              List<BadStatMessage> badMessages) {

    final CountDownLatch latch = new CountDownLatch(expectedMsgCount);
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        Thread.currentThread().setName("bad-events-consumer-thread");
        activeConsumerThreads.incrementAndGet();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                Serdes.String().deserializer(),
                Serdes.String().deserializer());

        // Subscribe to all bad event topics
        kafkaConsumer.subscribe(BAD_TOPICS_MAP.values());

        try {
            while (areConsumersEnabled.get()) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        LOGGER.warn("Bad events Consumer - topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        badMessages.add(new BadStatMessage(record.topic(), record.key(), record.value()));
                        latch.countDown();
                    }
                    if (latch.getCount() == 0) {
                        break;
                    }
                    kafkaConsumer.commitSync();
                }
            }
        } finally {
            kafkaConsumer.close();
            activeConsumerThreads.decrementAndGet();
        }
    });
    return latch;
}
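
A hypothetical call site for this helper, showing how the returned latch gates the assertion phase (the expected count and the 30-second timeout are illustrative, not taken from the original test):

List<BadStatMessage> badMessages = new ArrayList<>();
CountDownLatch latch = startBadEventsConsumer(consumerProps, 2, badMessages);
// ... send the statistics that are expected to be rejected ...
boolean allArrived = latch.await(30, TimeUnit.SECONDS);
assertTrue("Timed out waiting for bad-event messages", allArrived);
assertEquals(2, badMessages.size());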
Project: kafka-0.11.0.0-src-with-comment    File: KTableAggregateTest.java
@Test
public void testCount() throws IOException {
    final KStreamBuilder builder = new KStreamBuilder();
    final String input = "count-test-input";
    final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();

    builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
            .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerde, stringSerde)
            .count("count")
            .toStream()
            .process(proc);

    testCountHelper(builder, input, proc);
}
Project: apache-kafka-demos    File: FilterStream.java
public static void main(String[] args) throws InterruptedException {

    Properties props = new Properties();
    props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.serializer", JsonPOJOSerializer.class.getName());
    props.put("value.deserializer", JsonPOJODeserializer.class.getName());

    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("JsonPOJOClass", Messung.class);

    final Serializer<Messung> serializer = new JsonPOJOSerializer<>();
    serializer.configure(serdeProps, false);

    final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>();
    deserializer.configure(serdeProps, false);

    final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer);

    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream(Serdes.String(), serde, "produktion")
            .filter((k, v) -> v.type.equals("Biogas"))
            .to(Serdes.String(), serde, "produktion2");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
}
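
JsonPOJOSerializer and JsonPOJODeserializer are project-local classes, not part of the Serdes API; the point of Serdes.serdeFrom(serializer, deserializer) is that any Serializer/Deserializer pair can be wrapped into a Serde. A minimal Jackson-based sketch of the serializer half (the deserializer mirrors it, using the "JsonPOJOClass" config entry to pick the target type):

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonPOJOSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // nothing to configure on the serializer side
    }

    @Override
    public byte[] serialize(final String topic, final T data) {
        if (data == null) {
            return null;
        }
        try {
            // Jackson turns the POJO into its UTF-8 JSON representation
            return objectMapper.writeValueAsBytes(data);
        } catch (final Exception e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}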
Project: streaming-engines-benchmark    File: Consumer.java
public static Properties defaultConsumingProperties() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    properties.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
    properties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConsumerTimestampExtractor.class);

    return properties;
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBSegmentedBytesStoreTest.java
private List<KeyValue<Windowed<String>, Long>> toList(final KeyValueIterator<Bytes, byte[]> iterator) {
    final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
    while (iterator.hasNext()) {
        final KeyValue<Bytes, byte[]> next = iterator.next();
        final KeyValue<Windowed<String>, Long> deserialized = KeyValue.pair(
                SessionKeySerde.from(next.key.get(), Serdes.String().deserializer(), "dummy"),
                Serdes.Long().deserializer().deserialize("", next.value));
        results.add(deserialized);
    }
    return results;
}
Project: kiqr    File: GenericClientDistributedIntegrationITCase.java
@Test
public void successfulRangeQuery() throws Exception {

    GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port);

    Map<String, Long> result = client.getRangeKeyValues("kv", String.class, Long.class, Serdes.String(), Serdes.Long(), "key1", "key2");
    assertThat(result.entrySet(), hasSize(2));
    assertThat(result, hasEntry("key1", 3L));
    assertThat(result, hasEntry("key2", 6L));

}
Project: datastreaming-presentation    File: WordCount.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    final StreamsBuilder builder = new StreamsBuilder();

    builder.<String, String>stream("streams-plaintext-input")
           .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
           .toStream()
           .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
Project: kiqr    File: GenericClientDistributedIntegrationITCase.java
@Test
public void successfulWindowQuery() throws Exception {

    GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port);

    Map<Long, Long> result = client.getWindow("window", String.class, "key1", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L);
    assertThat(result.entrySet(), hasSize(2));
    assertThat(result, hasEntry(0L, 2L));
    assertThat(result, hasEntry(100000L, 1L));

    Map<Long, Long> resultKey2 = client.getWindow("window", String.class, "key2", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L);
    assertThat(resultKey2.entrySet(), hasSize(2));
    assertThat(resultKey2, hasEntry(0L, 1L));
    assertThat(resultKey2, hasEntry(100000L, 2L));

    Map<Long, Long> resultKey3 = client.getWindow("window", String.class, "key3", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L);
    assertThat(resultKey3.entrySet(), hasSize(3));
    assertThat(resultKey3, hasEntry(0L, 1L));
    assertThat(resultKey3, hasEntry(50000L, 1L));
    assertThat(resultKey3, hasEntry(100000L, 1L));

    Map<Long, Long> resultKey4 = client.getWindow("window", String.class, "key4", Long.class, Serdes.String(), Serdes.Long(), 0L, 100001L);
    assertThat(resultKey4.entrySet(), hasSize(1));
    assertThat(resultKey4, hasEntry(0L, 3L));
}
Project: kiqr    File: GenericClientDistributedIntegrationITCase.java
@Test
public void successfulAllQuery() throws Exception {

    GenericBlockingKiqrClient client = new GenericBlockingRestKiqrClientImpl("localhost", port);

    Map<String, Long> result = client.getAllKeyValues("kv", String.class, Long.class, Serdes.String(), Serdes.Long());
    assertThat(result.entrySet(), hasSize(4));
    assertThat(result, hasEntry("key1", 3L));
    assertThat(result, hasEntry("key2", 6L));
    assertThat(result, hasEntry("key3", 9L));
    assertThat(result, hasEntry("key4", 12L));

}
Project: kafka-0.11.0.0-src-with-comment    File: WordCountProcessorDemo.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("Source", "streams-file-input");

    builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
    builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process");

    builder.addSink("Sink", "streams-wordcount-processor-output", "Process");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    // usually the streams application would run forever; in this example we let it
    // run for some time and then stop, since the input data is finite.
    Thread.sleep(5000L);

    streams.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KGroupedStreamImplTest.java
@Test(expected = InvalidTopicException.class)
public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception {
    groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
        @Override
        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
            return null;
        }
    }, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamSessionWindowAggregateProcessorTest.java
@SuppressWarnings("unchecked")
@Before
public void initializeStore() {
    final File stateDir = TestUtils.tempDirectory();
    context = new MockProcessorContext(stateDir,
        Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()))) {
        @Override
        public <K, V> void forward(final K key, final V value) {
            results.add(KeyValue.pair(key, value));
        }
    };

    initStore(true);
    processor.init(context);
}
Project: kafka-0.11.0.0-src-with-comment    File: KGroupedStreamImpl.java
@SuppressWarnings("unchecked")
public KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String queryableStoreName) {
    determineIsQueryable(queryableStoreName);
    return count(sessionWindows,
                 storeFactory(keySerde, Serdes.Long(), getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                         .sessionWindowed(sessionWindows.maintainMs()).build());
}
Project: kafka-0.11.0.0-src-with-comment    File: KTableMapValuesTest.java
private void doTestKTable(final KStreamBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> proc2) {
    driver = new KStreamTestDriver(builder, stateDir, Serdes.String(), Serdes.String());

    driver.process(topic1, "A", "1");
    driver.process(topic1, "B", "2");
    driver.process(topic1, "C", "3");
    driver.process(topic1, "D", "4");
    driver.flushState();
    assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed);
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBKeyValueStoreSupplier.java
public KeyValueStore get() {
    if (!cached && !logged) {
        return new MeteredKeyValueStore<>(
                new RocksDBStore<>(name, keySerde, valueSerde), METRICS_SCOPE, time);
    }

    // when cached, logged, or both, we use a bytes store as the innermost store
    final RocksDBStore<Bytes, byte[]> rocks = new RocksDBStore<>(name,
                                                                 Serdes.Bytes(),
                                                                 Serdes.ByteArray());

    if (cached && logged) {
        return new CachingKeyValueStore<>(
                new MeteredKeyValueStore<>(
                        new ChangeLoggingKeyValueBytesStore(rocks),
                        METRICS_SCOPE,
                        time),
                keySerde,
                valueSerde);
    }

    if (cached) {
        return new CachingKeyValueStore<>(
                new MeteredKeyValueStore<>(rocks, METRICS_SCOPE, time),
                keySerde,
                valueSerde);

    } else {
        // logged
        return new MeteredKeyValueStore<>(
                new ChangeLoggingKeyValueStore<>(rocks, keySerde, valueSerde),
                METRICS_SCOPE,
                time);
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamSessionWindowAggregateProcessorTest.java
private void initStore(final boolean enableCaching) {
    final RocksDBSessionStoreSupplier<String, Long> supplier =
            new RocksDBSessionStoreSupplier<>(STORE_NAME,
                                              GAP_MS * 3,
                                              Serdes.String(),
                                              Serdes.Long(),
                                              false,
                                              Collections.<String, String>emptyMap(),
                                              enableCaching);
    sessionStore = (SessionStore<String, Long>) supplier.get();
    sessionStore.init(context, sessionStore);
}
Project: kafka-0.11.0.0-src-with-comment    File: CachingKeyValueStoreTest.java
@Before
public void setUp() throws Exception {
    final String storeName = "store";
    underlyingStore = new InMemoryKeyValueStore<>(storeName, Serdes.Bytes(), Serdes.ByteArray());
    cacheFlushListener = new CacheFlushListenerStub<>();
    store = new CachingKeyValueStore<>(underlyingStore, Serdes.String(), Serdes.String());
    store.setFlushListener(cacheFlushListener);
    cache = new ThreadCache("testCache", maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
    context = new MockProcessorContext(null, null, null, (RecordCollector) null, cache);
    topic = "topic";
    context.setRecordContext(new ProcessorRecordContext(10, 0, 0, topic));
    store.init(context, null);
}
Project: kafka-0.11.0.0-src-with-comment    File: KGroupedStreamImplTest.java
@Test(expected = NullPointerException.class)
public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
    groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
        @Override
        public String apply(final String aggKey, final String aggOne, final String aggTwo) {
            return null;
        }
    }, SessionWindows.with(10), Serdes.String(), "storeName");
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
@Test
public void shouldReduce() throws Exception {
    produceMessages(mockTime.milliseconds());
    groupedStream
        .reduce(reducer, "reduce-by-key")
        .to(Serdes.String(), Serdes.String(), outputTopic);

    startStreams();

    produceMessages(mockTime.milliseconds());

    final List<KeyValue<String, String>> results = receiveMessages(
        new StringDeserializer(),
        new StringDeserializer(),
        10);

    Collections.sort(results, new Comparator<KeyValue<String, String>>() {
        @Override
        public int compare(final KeyValue<String, String> o1, final KeyValue<String, String> o2) {
            return KStreamAggregationIntegrationTest.compare(o1, o2);
        }
    });

    assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
        KeyValue.pair("A", "A:A"),
        KeyValue.pair("B", "B"),
        KeyValue.pair("B", "B:B"),
        KeyValue.pair("C", "C"),
        KeyValue.pair("C", "C:C"),
        KeyValue.pair("D", "D"),
        KeyValue.pair("D", "D:D"),
        KeyValue.pair("E", "E"),
        KeyValue.pair("E", "E:E"))));
}
Project: kafka-0.11.0.0-src-with-comment    File: StoresTest.java
@Test
public void shouldCreateInMemoryStoreSupplierWithLoggedConfig() throws Exception {
    final StateStoreSupplier supplier = Stores.create("store")
            .withKeys(Serdes.String())
            .withValues(Serdes.String())
            .inMemory()
            .enableLogging(Collections.singletonMap("retention.ms", "1000"))
            .build();

    final Map<String, String> config = supplier.logConfig();
    assertTrue(supplier.loggingEnabled());
    assertEquals("1000", config.get("retention.ms"));
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
@Test
public void shouldCountWithInternalStore() throws Exception {
    produceMessages(mockTime.milliseconds());

    groupedStream.count()
        .to(Serdes.String(), Serdes.Long(), outputTopic);

    shouldCountHelper();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
    final String output = "join-rhs-stream-mapped-" + testNo;
    CLUSTER.createTopic(output);
    streamTwo
        .join(streamOne.map(keyMapper),
            TOSTRING_JOINER,
            getJoinWindow(),
            Serdes.Integer(),
            Serdes.String(),
            Serdes.Integer())
        .to(Serdes.Integer(), Serdes.String(), output);

    return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
    final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);

    final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
        kvMapper = MockKeyValueMapper.NoOpKeyValueMapper();

    final KStream<Integer, String> map2 = streamTwo.map(kvMapper);

    final KStream<Integer, String> join = map1.join(map2,
        TOSTRING_JOINER,
        getJoinWindow(),
        Serdes.Integer(),
        Serdes.Integer(),
        Serdes.String());

    final String topic = "map-join-join-" + testNo;
    CLUSTER.createTopic(topic);
    join.map(kvMapper)
        .join(streamFour.map(kvMapper),
            TOSTRING_JOINER,
            getJoinWindow(),
            Serdes.Integer(),
            Serdes.String(),
            Serdes.String())
        .to(Serdes.Integer(), Serdes.String(), topic);

    return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private void doJoin(final KStream<Integer, Integer> lhs,
                    final KStream<Integer, String> rhs,
                    final String outputTopic) throws InterruptedException {
    CLUSTER.createTopic(outputTopic);
    lhs.join(rhs,
        TOSTRING_JOINER,
        getJoinWindow(),
        Serdes.Integer(),
        Serdes.Integer(),
        Serdes.String())
        .to(Serdes.Integer(), Serdes.String(), outputTopic);
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordQueueTest.java
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
    final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
    final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));

    queue.addRawRecords(records);
}
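
The key bytes above are written with Serdes.Long() (8 bytes, big-endian), while the queue's source node deserializes them with an incompatible deserializer, which is what surfaces as a StreamsException. The underlying mismatch can be reproduced with plain serdes, no Streams machinery involved (a sketch; any fixed-width deserializer of a different width behaves the same):

byte[] key = Serdes.Long().serializer().serialize("foo", 1L); // 8 bytes
// IntegerDeserializer requires exactly 4 bytes, so this line throws SerializationException:
Integer broken = Serdes.Integer().deserializer().deserialize("foo", key);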