Java 类org.apache.kafka.common.serialization.Serde 实例源码

项目:stroom-stats    文件:StatisticsAggregationService.java   
private KafkaProducer<StatEventKey, StatAggregate> buildProducer() {

        //Configure the producers
        Map<String, Object> producerProps = getProducerProps();

        Serde<StatEventKey> statKeySerde = StatEventKeySerde.instance();
        Serde<StatAggregate> statAggregateSerde = StatAggregateSerde.instance();

        try {
            return new KafkaProducer<>(
                    producerProps,
                    statKeySerde.serializer(),
                    statAggregateSerde.serializer());
        } catch (Exception e) {
            try {
                String props = producerProps.entrySet().stream()
                        .map(entry -> "  " + entry.getKey() + "=" + entry.getValue())
                        .collect(Collectors.joining("\n"));
                LOGGER.error("Error initialising kafka producer with props:\n{}",props, e);
            } catch (Exception e1) {
            }
            LOGGER.error("Error initialising kafka producer, unable to dump property values ", e);
            throw e;
        }
    }
项目:kafka-0.11.0.0-src-with-comment    文件:StateSerdes.java   
/**
 * Create a context for serialization using the specified serializers and deserializers which
 * <em>must</em> match the key and value types used as parameters for this object; the state changelog topic
 * is provided to bind this serde factory to, so that future calls for serialize / deserialize do not
 * need to provide the topic name any more.
 *
 * @param topic         the topic name
 * @param keySerde      the serde for keys; cannot be null
 * @param valueSerde    the serde for values; cannot be null
 * @throws IllegalArgumentException if key or value serde is null
 */
@SuppressWarnings("unchecked")
public StateSerdes(final String topic,
                   final Serde<K> keySerde,
                   final Serde<V> valueSerde) {
    if (topic == null) {
        throw new IllegalArgumentException("topic cannot be null");
    }
    if (keySerde == null) {
        throw new IllegalArgumentException("key serde cannot be null");
    }
    if (valueSerde == null) {
        throw new IllegalArgumentException("value serde cannot be null");
    }

    this.topic = topic;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
}
项目:kiqr    文件:GenericBlockingRestKiqrClientImpl.java   
@Override
public <K, V> Map<K, V> getRangeKeyValues(String store, Class<K> keyClass, Class<V> valueClass, Serde<K> keySerde, Serde<V> valueSerde, K from, K to) {


    return execute(() -> getUriBuilder()
            .setPath(String.format("/api/v1/kv/%s", store))
            .addParameter("keySerde", keySerde.getClass().getName())
            .addParameter("valueSerde", valueSerde.getClass().getName())
            .addParameter("from", Base64.getEncoder().encodeToString(keySerde.serializer().serialize("", from)))
            .addParameter("to", Base64.getEncoder().encodeToString(keySerde.serializer().serialize("", to)))
            .build(), bytes -> {
        MultiValuedKeyValueQueryResponse resp = mapper.readValue(bytes, MultiValuedKeyValueQueryResponse.class);


        return resp.getResults().entrySet().stream()
                .map(entry -> {
                    return new Pair<K, V>(deserialize(keyClass, keySerde, entry.getKey()),
                            deserialize(valueClass, valueSerde, entry.getValue()));
                }).collect(Collectors.toMap(Pair::getKey, pair -> pair.getValue()));
    }, () -> Collections.emptyMap());

}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamBuilder.java   
private <K, V> KTable<K, V> doTable(final AutoOffsetReset offsetReset,
                                    final Serde<K> keySerde,
                                    final Serde<V> valSerde,
                                    final TimestampExtractor timestampExtractor,
                                    final String topic,
                                    final StateStoreSupplier<KeyValueStore> storeSupplier,
                                    final boolean isQueryable) {
    final String source = newName(KStreamImpl.SOURCE_NAME);
    final String name = newName(KTableImpl.SOURCE_NAME);
    final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());

    addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
            valSerde == null ? null : valSerde.deserializer(),
            topic);
    addProcessor(name, processorSupplier, source);

    final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);

    addStateStore(storeSupplier, name);
    connectSourceStoreAndTopic(storeSupplier.name(), topic);

    return kTable;
}
项目:kafka-0.11.0.0-src-with-comment    文件:InMemoryKeyValueLoggedStore.java   
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context, StateStore root) {
    inner.init(context, root);

    // construct the serde
    StateSerdes<K, V>  serdes = new StateSerdes<>(
        ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()),
        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

    this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes);

    // if the inner store is an LRU cache, add the eviction listener to log removed record
    if (inner instanceof MemoryLRUCache) {
        ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
            @Override
            public void apply(K key, V value) {
                removed(key);
            }
        });
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamImpl.java   
@SuppressWarnings("unchecked")
@Override
public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
    Objects.requireNonNull(topic, "topic can't be null");
    final String name = topology.newName(SINK_NAME);

    final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
    final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();

    if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
        final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
        partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
    }

    topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamTestDriver.java   
public KStreamTestDriver(final KStreamBuilder builder,
                         final File stateDir,
                         final Serde<?> keySerde,
                         final Serde<?> valSerde,
                         final long cacheSize) {
    builder.setApplicationId("TestDriver");
    topology = builder.build(null);
    globalTopology = builder.buildGlobalStateTopology();
    final ThreadCache cache = new ThreadCache("testCache", cacheSize, new MockStreamsMetrics(new Metrics()));
    context = new MockProcessorContext(stateDir, keySerde, valSerde, new MockRecordCollector(), cache);
    context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "topic"));
    // init global topology first as it will add stores to the
    // store map that are required for joins etc.
    if (globalTopology != null) {
        initTopology(globalTopology, globalTopology.globalStateStores());
    }
    initTopology(topology, topology.stateStores());
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
private <V1> KTable<K, V1> doMapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                       final Serde<V1> valueSerde,
                                       final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(mapper);
    String name = topology.newName(MAPVALUES_NAME);
    String internalStoreName = null;
    if (storeSupplier != null) {
        internalStoreName = storeSupplier.name();
    }
    KTableProcessorSupplier<K, V, V1> processorSupplier = new KTableMapValues<>(this, mapper, internalStoreName);
    topology.addProcessor(name, processorSupplier, this.name);
    if (storeSupplier != null) {
        topology.addStateStore(storeSupplier, name);
        return new KTableImpl<>(topology, name, processorSupplier, this.keySerde, valueSerde, sourceNodes, internalStoreName, true);
    } else {
        return new KTableImpl<>(topology, name, processorSupplier, sourceNodes, this.queryableStoreName, false);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:StreamsConfig.java   
/**
 * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value
 * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead.
 *
 * @return an configured instance of value Serde class
 */
@Deprecated
public Serde valueSerde() {
    try {
        Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
        // the default value of deprecated value serde is null
        if (serde == null) {
            serde = defaultValueSerde();
        } else {
            serde.configure(originals(), false);
        }
        return serde;
    } catch (final Exception e) {
        throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:KGroupedStreamImpl.java   
@SuppressWarnings("unchecked")
@Override
public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                            final Aggregator<? super K, ? super V, T> aggregator,
                                            final Merger<? super K, T> sessionMerger,
                                            final SessionWindows sessionWindows,
                                            final Serde<T> aggValueSerde,
                                            final String queryableStoreName) {
    determineIsQueryable(queryableStoreName);
    return aggregate(initializer,
                     aggregator,
                     sessionMerger,
                     sessionWindows,
                     aggValueSerde,
                     storeFactory(keySerde, aggValueSerde, getOrCreateName(queryableStoreName, AGGREGATE_NAME))
                      .sessionWindowed(sessionWindows.maintainMs()).build());


}
项目:kafka-0.11.0.0-src-with-comment    文件:QueryableStateIntegrationTest.java   
/**
 * Creates a typical word count topology
 */
private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
    final KStreamBuilder builder = new KStreamBuilder();
    final Serde<String> stringSerde = Serdes.String();
    final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);

    final KGroupedStream<String, String> groupedByWord = textLines
        .flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(final String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
        .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());

    // Create a State Store for the all time word count
    groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);

    // Create a Windowed State Store that contains the word count for every 1 minute
    groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);

    return new KafkaStreams(builder, streamsConfiguration);
}
项目:kafka-0.11.0.0-src-with-comment    文件:CachingSessionStore.java   
@SuppressWarnings("unchecked")
private void initInternal(final InternalProcessorContext context) {
    this.context = context;

    keySchema.init(topic);
    serdes = new StateSerdes<>(
        topic,
        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
        aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);


    cacheName = context.taskId() + "-" + bytesStore.name();
    cache = context.getCache();
    cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
        @Override
        public void apply(final List<ThreadCache.DirtyEntry> entries) {
            for (ThreadCache.DirtyEntry entry : entries) {
                putAndMaybeForward(entry, context);
            }
        }
    });
}
项目:stroom-stats    文件:SerdeUtils.java   
/**
 * Helper method for testing that a serde can serialize its object and deserialize it
 * back again. Throws a {@link RuntimeException } if the objects don't match.
 * @param serde
 * @param obj
 * @param <T>
 */
public static <T> void verify(final Serde<T> serde, final T obj) {
    final String dummyTopic = "xxx";
    byte[] bytes = serde.serializer().serialize(dummyTopic, obj);

    LOGGER.trace(() -> String.format("object form: %s", obj));
    LOGGER.trace(() -> String.format("byte form: %s", ByteArrayUtils.byteArrayToHex(bytes)));

    T deserializedObj = serde.deserializer().deserialize(dummyTopic, bytes);
    if (!obj.equals(deserializedObj)) {
        throw new RuntimeException(String.format("Original [%s] and de-serialized [%s] values don't match", obj, deserializedObj));
    }
}
项目:kiqr    文件:AbstractKiqrVerticle.java   
protected Serde<Object> getSerde(String serde){
    try {
        return (Serde<Object>) Class.forName(serde).newInstance();

    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | ClassCastException e) {
        throw new SerdeNotFoundException(serde, e);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RocksDBWindowStore.java   
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
    this.context = context;
    // construct the serde
    serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), bytesStore.name()),
                               keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                               valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

    bytesStore.init(context, root);
}
项目:kafka-0.11.0.0-src-with-comment    文件:TopologyBuilderTest.java   
@Test
public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() {
    final TopologyBuilder builder = new TopologyBuilder();
    final Serde<String> stringSerde = Serdes.String();

    builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1");
    try {
        builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2");
        fail("Should throw TopologyBuilderException for duplicate source name");
    } catch (TopologyBuilderException tpe) {
        //no-op
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:RocksDBWindowStore.java   
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
                   final Serde<K> keySerde,
                   final Serde<V> valueSerde,
                   final boolean retainDuplicates,
                   final long windowSize) {
    super(bytesStore);
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
    this.bytesStore = bytesStore;
    this.retainDuplicates = retainDuplicates;
    this.windowSize = windowSize;
}
项目:kafka-0.11.0.0-src-with-comment    文件:ChangeLoggingKeyValueStore.java   
private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore bytesStore,
                                   final Serde keySerde,
                                   final Serde valueSerde) {
    super(bytesStore);
    this.innerBytes = bytesStore;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
}
项目:kiqr    文件:SpecificBlockingRestKiqrClientImpl.java   
public SpecificBlockingRestKiqrClientImpl(String host, int port, String store, Class<K> keyClass, Class<V> valueClass, Serde<K> keySerde, Serde<V> valueSerde) {
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;
    this.genericClient = initGenericService(host, port);
    this.store = store;
}
项目:kafka-0.11.0.0-src-with-comment    文件:InMemoryKeyValueStore.java   
public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
    this.name = name;
    this.keySerde = keySerde;
    this.valueSerde = valueSerde;

    // TODO: when we have serde associated with class types, we can
    // improve this situation by passing the comparator here.
    this.map = new TreeMap<>();
}
项目:kafka-0.11.0.0-src-with-comment    文件:StreamsConfig.java   
/**
 * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value
 * Serde class}.
 *
 * @return an configured instance of value Serde class
 */
public Serde defaultValueSerde() {
    try {
        Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
        serde.configure(originals(), false);
        return serde;
    } catch (final Exception e) {
        throw new StreamsException(String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamImpl.java   
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                Serde<K1> keySerde,
                                                Serde<V1> valSerde,
                                                final String topicNamePrefix) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = stream.topology.newName(SINK_NAME);
    String filterName = stream.topology.newName(FILTER_NAME);
    String sourceName = stream.topology.newName(SOURCE_NAME);

    stream.topology.addInternalTopic(repartitionTopic);
    stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), stream.name);

    stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
                     valSerializer, filterName);
    stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
                       repartitionTopic);

    return sourceName;
}
项目:kafka-0.11.0.0-src-with-comment    文件:CachingSessionStore.java   
CachingSessionStore(final SessionStore<Bytes, byte[]> bytesStore,
                    final Serde<K> keySerde,
                    final Serde<AGG> aggSerde,
                    final long segmentInterval) {
    super(bytesStore);
    this.bytesStore = bytesStore;
    this.keySerde = keySerde;
    this.aggSerde = aggSerde;
    this.keySchema = new SessionKeySchema();
    this.cacheFunction = new SegmentedCacheFunction(keySchema, segmentInterval);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamImpl.java   
@Override
public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<? super K, ? super V, K1> selector,
                                          Serde<K1> keySerde,
                                          Serde<V> valSerde) {

    Objects.requireNonNull(selector, "selector can't be null");
    String selectName = internalSelectKey(selector);
    return new KGroupedStreamImpl<>(topology,
                                    selectName,
                                    sourceNodes,
                                    keySerde,
                                    valSerde, true);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamImpl.java   
@Override
public KGroupedStream<K, V> groupByKey(Serde<K> keySerde,
                                       Serde<V> valSerde) {
    return new KGroupedStreamImpl<>(topology,
                                    this.name,
                                    sourceNodes,
                                    keySerde,
                                    valSerde,
                                    this.repartitionRequired);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KGroupedTableImpl.java   
public KGroupedTableImpl(final KStreamBuilder topology,
                         final String name,
                         final String sourceName,
                         final Serde<? extends K> keySerde,
                         final Serde<? extends V> valSerde) {
    super(topology, name, Collections.singleton(sourceName));
    this.keySerde = keySerde;
    this.valSerde = valSerde;
    this.isQueryable = true;
}
项目:kafka-0.11.0.0-src-with-comment    文件:RocksDBSessionStore.java   
RocksDBSessionStore(final SegmentedBytesStore bytesStore,
                    final Serde<K> keySerde,
                    final Serde<AGG> aggSerde) {
    super(bytesStore);
    this.keySerde = keySerde;
    this.bytesStore = bytesStore;
    this.aggSerde = aggSerde;
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public  <V1> KTable<K, V1> mapValues(final ValueMapper<? super V, ? extends V1> mapper,
                                     final Serde<V1> valueSerde,
                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    return doMapValues(mapper, valueSerde, storeSupplier);
}
项目:kafka-0.11.0.0-src-with-comment    文件:AbstractStoreSupplier.java   
AbstractStoreSupplier(final String name,
                      final Serde<K> keySerde,
                      final Serde<V> valueSerde,
                      final Time time,
                      final boolean logged,
                      final Map<String, String> logConfig) {
    this.time = time;
    this.name = name;
    this.valueSerde = valueSerde;
    this.keySerde = keySerde;
    this.logged = logged;
    this.logConfig = logConfig;
}
项目:kafka-0.11.0.0-src-with-comment    文件:MockProcessorContext.java   
public MockProcessorContext(final File stateDir,
                            final Serde<?> keySerde,
                            final Serde<?> valSerde,
                            final RecordCollector collector,
                            final ThreadCache cache) {
    this(stateDir, keySerde, valSerde,
            new RecordCollector.Supplier() {
                @Override
                public RecordCollector recordCollector() {
                    return collector;
                }
            },
            cache);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final StreamPartitioner<? super K, ? super V> partitioner,
                            final String topic,
                            final StateStoreSupplier<KeyValueStore> storeSupplier) {
    Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    to(keySerde, valSerde, partitioner, topic);

    return topology.table(keySerde, valSerde, topic, storeSupplier);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final StreamPartitioner<? super K, ? super V> partitioner,
                            final String topic) {
    return through(keySerde, valSerde, partitioner, topic, (String) null);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public KTable<K, V> through(final Serde<K> keySerde,
                            final Serde<V> valSerde,
                            final String topic,
                            final String queryableStoreName) {
    return through(keySerde, valSerde, null, topic, queryableStoreName);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                 final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                 final Serde<R> joinSerde,
                                 final String queryableStoreName) {
    return doJoin(other, joiner, false, false, joinSerde, queryableStoreName);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                      final Serde<R> joinSerde,
                                      final String queryableStoreName) {
    return doJoin(other, joiner, true, true, joinSerde, queryableStoreName);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@Override
public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other,
                                     final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                     final Serde<R> joinSerde,
                                     final String queryableStoreName) {
    return doJoin(other, joiner, true, false, joinSerde, queryableStoreName);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KTableImpl.java   
@SuppressWarnings("unchecked")
private <V1, R> KTable<K, R> doJoin(final KTable<K, V1> other,
                                    ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                    final boolean leftOuter,
                                    final boolean rightOuter,
                                    final Serde<R> joinSerde,
                                    final String queryableStoreName) {
    Objects.requireNonNull(other, "other can't be null");
    Objects.requireNonNull(joiner, "joiner can't be null");

    final StateStoreSupplier storeSupplier = queryableStoreName == null ? null : keyValueStore(this.keySerde, joinSerde, queryableStoreName);

    return doJoin(other, joiner, leftOuter, rightOuter, storeSupplier);
}
项目:kafka-0.11.0.0-src-with-comment    文件:RocksDBSessionStore.java   
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
    final String storeName = bytesStore.name();
    topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);

    serdes = new StateSerdes<>(
        topic,
        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
        aggSerde == null ? (Serde<AGG>) context.valueSerde() : aggSerde);

    bytesStore.init(context, root);
}
项目:kafka-0.11.0.0-src-with-comment    文件:KGroupedStreamImpl.java   
@SuppressWarnings("unchecked")
@Override
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final Serde<T> aggValueSerde) {
    return aggregate(initializer, aggregator, windows, aggValueSerde, null);
}
项目:kafka-0.11.0.0-src-with-comment    文件:AbstractStream.java   
@SuppressWarnings("unchecked")
static <T, K>  StateStoreSupplier<KeyValueStore> keyValueStore(final Serde<K> keySerde,
                                                               final Serde<T> aggValueSerde,
                                                               final String storeName) {
    Objects.requireNonNull(storeName, "storeName can't be null");
    Topic.validate(storeName);
    return storeFactory(keySerde, aggValueSerde, storeName).build();
}