Example source code for the Java class org.apache.kafka.common.serialization.Serializer

Project: kmq    File: KafkaClients.java
public <K, V> KafkaProducer<K, V> createProducer(Class<? extends Serializer<K>> keySerializer, Class<? extends Serializer<V>> valueSerializer,
                                                        Map<String, Object> extraConfig) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", keySerializer.getName());
    props.put("value.serializer", valueSerializer.getName());
    for (Map.Entry<String, Object> extraCfgEntry : extraConfig.entrySet()) {
        props.put(extraCfgEntry.getKey(), extraCfgEntry.getValue());
    }

    return new KafkaProducer<>(props);
}
Project: stroom-stats    File: SerdeUtils.java
/**
 * Creates a basic serializer from the passed stateless function, with no-op configure and close implementations
 *
 * @param serializeFunc The function that serializes a T to a byte[]
 * @param <T>           The type of object to serialize
 * @return A {@link Serializer} that delegates serialize calls to serializeFunc
 */
public static <T> Serializer<T> buildBasicSerializer(final SerializeFunc<T> serializeFunc) {
    return new Serializer<T>() {

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
        }

        @Override
        public byte[] serialize(final String topic, final T data) {
            return serializeFunc.serialize(topic, data);
        }

        @Override
        public void close() {
        }
    };
}
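
A hedged usage sketch: assuming SerializeFunc<T> is a functional interface whose single method matches the serializeFunc.serialize(topic, data) call above, a lambda yields a one-line serializer:

// Assumes SerializeFunc<T> is a @FunctionalInterface: byte[] serialize(String topic, T data).
Serializer<String> utf8Serializer = SerdeUtils.buildBasicSerializer(
        (topic, data) -> data.getBytes(StandardCharsets.UTF_8));
byte[] bytes = utf8Serializer.serialize("any-topic", "hello");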
Project: kafka-junit    File: KafkaTestServer.java
/**
 * Creates a Kafka producer that is connected to our test server.
 * @param <K> Type of message key
 * @param <V> Type of message value
 * @param keySerializer Class of serializer to be used for keys.
 * @param valueSerializer Class of serializer to be used for values.
 * @return KafkaProducer configured to produce into the test server.
 */
public <K, V> KafkaProducer<K, V> getKafkaProducer(
    final Class<? extends Serializer<K>> keySerializer,
    final Class<? extends Serializer<V>> valueSerializer) {

    // Build config
    final Map<String, Object> kafkaProducerConfig = Maps.newHashMap();
    kafkaProducerConfig.put("bootstrap.servers", getKafkaConnectString());
    kafkaProducerConfig.put("key.serializer", keySerializer);
    kafkaProducerConfig.put("value.serializer", valueSerializer);
    kafkaProducerConfig.put("max.in.flight.requests.per.connection", 1);
    kafkaProducerConfig.put("retries", 5);
    kafkaProducerConfig.put("client.id", getClass().getSimpleName() + " Producer");
    kafkaProducerConfig.put("batch.size", 0);

    // Create and return Producer.
    return new KafkaProducer<>(kafkaProducerConfig);
}
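
A hedged sketch of producing through the returned instance; kafkaTestServer is an assumed running KafkaTestServer, and the blocking get() surfaces send failures immediately:

// Hypothetical usage; the enclosing test method declares the checked exceptions from get().
try (KafkaProducer<String, String> producer = kafkaTestServer.getKafkaProducer(
        StringSerializer.class, StringSerializer.class)) {
    producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
}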
Project: kafka-0.11.0.0-src-with-comment    File: KStreamImpl.java
@SuppressWarnings("unchecked")
@Override
public void to(final Serde<K> keySerde, final Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, final String topic) {
    Objects.requireNonNull(topic, "topic can't be null");
    final String name = topology.newName(SINK_NAME);

    final Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
    final Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer();

    if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) {
        final WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
        partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, windowedSerializer);
    }

    topology.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name);
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedSerializer.java
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    if (inner == null) {
        String propertyName = isKey ? "key.serializer.inner.class" : "value.serializer.inner.class";
        Object innerSerializerClass = configs.get(propertyName);
        propertyName = (innerSerializerClass == null) ? "serializer.inner.class" : propertyName;
        String value = null;
        try {
            value = (String) configs.get(propertyName);
            inner = Serializer.class.cast(Utils.newInstance(value, Serializer.class));
            inner.configure(configs, isKey);
        } catch (ClassNotFoundException e) {
            throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollectorImpl.java
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer,
                        final StreamPartitioner<? super K, ? super V> partitioner) {
    Integer partition = null;

    if (partitioner != null) {
        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
        if (partitions.size() > 0) {
            partition = partitioner.partition(key, value, partitions.size());
        } else {
            throw new StreamsException("Could not get partition information for topic '" + topic + "'." +
                " This can happen if the topic does not exist.");
        }
    }

    send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: SinkNode.java
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
    super.init(context);
    this.context = context;

    // if serializers are null, get the default ones from the context
    if (keySerializer == null) {
        keySerializer = (Serializer<K>) context.keySerde().serializer();
    }
    if (valSerializer == null) {
        valSerializer = (Serializer<V>) context.valueSerde().serializer();
    }

    // if value serializers are for {@code Change} values, set the inner serializer when necessary
    if (valSerializer instanceof ChangedSerializer &&
            ((ChangedSerializer) valSerializer).inner() == null) {
        ((ChangedSerializer) valSerializer).setInner(context.valueSerde().serializer());
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: TopologyBuilder.java
/**
 * Add a new sink that forwards records from upstream parent processor and/or source nodes to the named Kafka topic.
 * The sink will use the specified key and value serializers, and the supplied partitioner.
 *
 * @param name the unique name of the sink
 * @param topic the name of the Kafka topic to which this sink should write its records
 * @param keySerializer the {@link Serializer key serializer} used when writing records to the topic; may be null if the sink
 * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the
 * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
 * @param valSerializer the {@link Serializer value serializer} used when writing records to the topic; may be null if the sink
 * should use the {@link org.apache.kafka.streams.StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the
 * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
 * @param partitioner the function that should be used to determine the partition for each record processed by the sink
 * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume
 * and write to its topic
 * @return this builder instance so methods can be chained together; never null
 * @see #addSink(String, String, String...)
 * @see #addSink(String, String, StreamPartitioner, String...)
 * @see #addSink(String, String, Serializer, Serializer, String...)
 * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name
 */
public synchronized final <K, V> TopologyBuilder addSink(final String name,
                                                         final String topic,
                                                         final Serializer<K> keySerializer,
                                                         final Serializer<V> valSerializer,
                                                         final StreamPartitioner<? super K, ? super V> partitioner,
                                                         final String... parentNames) {
    Objects.requireNonNull(name, "name must not be null");
    Objects.requireNonNull(topic, "topic must not be null");
    if (nodeFactories.containsKey(name))
        throw new TopologyBuilderException("Processor " + name + " is already added.");

    for (final String parent : parentNames) {
        if (parent.equals(name)) {
            throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
        }
        if (!nodeFactories.containsKey(parent)) {
            throw new TopologyBuilderException("Parent processor " + parent + " is not added yet.");
        }
    }

    nodeFactories.put(name, new SinkNodeFactory<>(name, parentNames, topic, keySerializer, valSerializer, partitioner));
    nodeToSinkTopic.put(name, topic);
    nodeGrouper.add(name);
    nodeGrouper.unite(name, parentNames);
    return this;
}
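
A hedged sketch of wiring a sink through this builder; MyProcessor and the topic names are hypothetical, and the lambda satisfies the single-method StreamPartitioner interface:

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "input-topic")
       .addProcessor("transform", MyProcessor::new, "source")  // MyProcessor is hypothetical
       .addSink("sink", "output-topic",
                new StringSerializer(), new LongSerializer(),
                (key, value, numPartitions) -> 0,               // trivial partitioner for the sketch
                "transform");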
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedSerializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test key[value].serializer.inner.class takes precedence over serializer.inner.class
    WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    windowedSerializer.configure(props, true);
    Serializer<?> inner = windowedSerializer.innerSerializer();
    assertNotNull("Inner serializer should be not null", inner);
    assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
    // test serializer.inner.class
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.remove("key.serializer.inner.class");
    props.remove("value.serializer.inner.class");
    WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>();
    windowedSerializer1.configure(props, false);
    Serializer<?> inner1 = windowedSerializer1.innerSerializer();
    assertNotNull("Inner serializer should be not null", inner1);
    assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBWindowStoreSupplierTest.java
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
    store = createStore(true, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertFalse(logged.isEmpty());
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBWindowStoreSupplierTest.java
@Test
public void shouldNotBeLoggingEnabledStoreWhenLogginNotEnabled() throws Exception {
    store = createStore(false, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertTrue(logged.isEmpty());
}
Project: kafka-0.11.0.0-src-with-comment    File: ChangeLoggingKeyValueBytesStoreTest.java
@Before
public void before() {
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            sent.put(key, value);
        }
    };
    context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.Long(),
        collector,
        new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
    context.setTime(0);
    store.init(context, store);
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBKeyValueStoreSupplierTest.java
@Test
public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception {
    store = createStore(true, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertFalse(logged.isEmpty());
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBKeyValueStoreSupplierTest.java
@Test
public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception {
    store = createStore(false, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertTrue(logged.isEmpty());
}
Project: kafka-0.11.0.0-src-with-comment    File: ChangeLoggingKeyValueStoreTest.java
@Before
public void before() {
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            sent.put(key, value);
        }
    };
    context = new MockProcessorContext(
        TestUtils.tempDirectory(),
        Serdes.String(),
        Serdes.Long(),
        collector,
        new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())));
    context.setTime(0);
    store.init(context, store);
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsConfigTest.java
@Test
public void defaultSerdeShouldBeConfigured() {
    final Map<String, Object> serializerConfigs = new HashMap<>();
    serializerConfigs.put("key.serializer.encoding", "UTF8");
    serializerConfigs.put("value.serializer.encoding", "UTF-16");
    final Serializer<String> serializer = Serdes.String().serializer();

    final String str = "my string for testing";
    final String topic = "my topic";

    serializer.configure(serializerConfigs, true);
    assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
            str, streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));

    serializer.configure(serializerConfigs, false);
    assertEquals("Should get the original string after serialization and deserialization with the configured encoding",
            str, streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, str)));
}
Project: kafka-0.11.0.0-src-with-comment    File: MockProducer.java
/**
 * Create a mock producer
 *
 * @param cluster The cluster holding metadata for this producer
 * @param autoComplete If true automatically complete all requests successfully and execute the callback. Otherwise
 *        the user must call {@link #completeNext()} or {@link #errorNext(RuntimeException)} after
 *        {@link #send(ProducerRecord) send()} to complete the call and unblock the {@link
 *        java.util.concurrent.Future Future&lt;RecordMetadata&gt;} that is returned.
 * @param partitioner The partition strategy
 * @param keySerializer The serializer for key that implements {@link Serializer}.
 * @param valueSerializer The serializer for value that implements {@link Serializer}.
 */
public MockProducer(final Cluster cluster,
                    final boolean autoComplete,
                    final Partitioner partitioner,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    this.cluster = cluster;
    this.autoComplete = autoComplete;
    this.partitioner = partitioner;
    this.keySerializer = ensureExtended(keySerializer);
    this.valueSerializer = ensureExtended(valueSerializer);
    this.offsets = new HashMap<>();
    this.sent = new ArrayList<>();
    this.uncommittedSends = new ArrayList<>();
    this.consumerGroupOffsets = new ArrayList<>();
    this.uncommittedConsumerGroupOffsets = new HashMap<>();
    this.completions = new ArrayDeque<>();
}
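
A hedged sketch of the manual-completion mode this constructor documents; the three-argument convenience constructor used here delegates to it with a default cluster and partitioner:

// autoComplete = false: each send() stays pending until completeNext() or errorNext().
MockProducer<String, String> producer =
        new MockProducer<>(false, new StringSerializer(), new StringSerializer());
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("topic", "k", "v"));
producer.completeNext();                 // completes the oldest pending send successfully
RecordMetadata metadata = future.get();  // now unblocked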
Project: rmap    File: JustInTimeConfiguredProducerFactoryTest.java
@Test
@SuppressWarnings("unchecked")
public void testPropertyResolutionMissingFromEnvironment() throws Exception {
    HashMap<String, Object> expected = new HashMap<String, Object>() {
        {
            put("supplied-foo", "bar");
            put("supplied-null", null);
        }
    };

    JustInTimeConfiguredProducerFactory underTest = new JustInTimeConfiguredProducerFactory(
            expected, mock(Serializer.class), mock(Serializer.class));
    underTest.setEnvironment(mock(Environment.class));

    Map<String, Object> actual = underTest.getConfigurationProperties();

    assertEquals(expected, actual);
}
Project: likafka-clients    File: MessageAssemblerTest.java
@Test
public void testSingleMessageSegment() {
  // Create serializer/deserializers.
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes());

  MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer);
  MessageAssembler.AssembleResult assembleResult =
      messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes);

  assertNotNull(assembleResult.messageBytes());
  assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0");
  assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0");
}
Project: li-apache-kafka-clients    File: SerializerDeserializerTest.java
@Test
public void testSerde() {
  Serializer<String> stringSerializer = new StringSerializer();
  Deserializer<String> stringDeserializer = new StringDeserializer();
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  String s = LiKafkaClientsTestUtils.getRandomString(100);
  assertEquals(s.length(), 100);
  byte[] stringBytes = stringSerializer.serialize("topic", s);
  assertEquals(stringBytes.length, 100);
  LargeMessageSegment segment =
      new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes));
  // String bytes + segment header
  byte[] serializedSegment = segmentSerializer.serialize("topic", segment);
  assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);

  LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment);
  assertEquals(deserializedSegment.messageId, segment.messageId);
  assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes);
  assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments);
  assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber);
  assertEquals(deserializedSegment.payload.limit(), 100);
  String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray());
  assertEquals(deserializedString.length(), s.length());
}
Project: li-apache-kafka-clients    File: MessageAssemblerTest.java
@Test
public void testSingleMessageSegment() {
  // Create serializer/deserializers.
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes());

  MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer);
  MessageAssembler.AssembleResult assembleResult =
      messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes);

  assertNotNull(assembleResult.messageBytes());
  assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0");
  assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0");
}
Project: hello-kafka-streams    File: JsonPOJOSerde.java
@Override
public Serializer<T> serializer() {
    return new Serializer<T>() {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {

        }

        @Override
        public byte[] serialize(String topic, T data) {
            try {
                return mapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON message", e);
            }
        }

        @Override
        public void close() {

        }
    };

}
Project: kafka-sandbox    File: CounterMetricSerde.java
@Override
public Serializer<CounterMetric> serializer() {
    return new Serializer<CounterMetric>() {

        private Gson gson = new Gson();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }

        @Override
        public byte[] serialize(String topic, CounterMetric metric) {
            JsonObject jsonObject = new JsonObject();
            jsonObject.add("type", new JsonPrimitive("METER"));
            jsonObject.add("name", new JsonPrimitive(metric.getName()));
            jsonObject.add("count", new JsonPrimitive(metric.getValue()));
            jsonObject.add("timestamp", new JsonPrimitive(metric.getTimestampInMillis()));
            String json = gson.toJson(jsonObject);
            return json.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
        }
    };
}
Project: ksql    File: IntegrationTestHarness.java
/**
 * Publishes the given records to topicName, creating the topic automatically if it doesn't exist.
 * @param topicName        the topic to produce to
 * @param recordsToPublish the records to publish, keyed by record key
 * @param serializer       the serializer for the record values
 * @param timestamp        the timestamp to assign to each record
 * @return a map from record key to the {@link RecordMetadata} returned by the broker
 * @throws InterruptedException
 * @throws TimeoutException
 * @throws ExecutionException
 */
public Map<String, RecordMetadata> produceData(String topicName, Map<String, GenericRow> recordsToPublish, Serializer<GenericRow> serializer, Long timestamp)
        throws InterruptedException, TimeoutException, ExecutionException {

  createTopic(topicName);

  Properties producerConfig = properties();
  KafkaProducer<String, GenericRow> producer =
          new KafkaProducer<>(producerConfig, new StringSerializer(), serializer);

  Map<String, RecordMetadata> result = new HashMap<>();
  for (Map.Entry<String, GenericRow> recordEntry : recordsToPublish.entrySet()) {
    String key = recordEntry.getKey();
    Future<RecordMetadata> recordMetadataFuture = producer.send(buildRecord(topicName, timestamp, recordEntry, key));
    result.put(key, recordMetadataFuture.get(TEST_RECORD_FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS));
  }
  producer.close();

  return result;
}
Project: beam    File: KafkaIOTest.java
@SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {

  // Make sure the config is correctly set up for serializers.
  Utils.newInstance(
          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
                  .asSubclass(Serializer.class)
  ).configure(config, true);

  Utils.newInstance(
      ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
          .asSubclass(Serializer.class)
  ).configure(config, false);

  // Returning same producer in each instance in a pipeline seems to work fine currently.
  // If DirectRunner creates multiple DoFn instances for sinks, we might need to handle
  // it appropriately. I.e. allow multiple producers for each producerKey and concatenate
  // all the messages written to each producer for verification after the pipeline finishes.

  return MOCK_PRODUCER_MAP.get(producerKey);
}
Project: beam    File: ResumeFromCheckpointStreamingTest.java
private static void produce(Map<String, Instant> messages) {
  Properties producerProps = new Properties();
  producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
  producerProps.put("request.required.acks", 1);
  producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
  Serializer<String> stringSerializer = new StringSerializer();
  Serializer<Instant> instantSerializer = new InstantSerializer();

  try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> kafkaProducer =
      new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
        for (Map.Entry<String, Instant> en : messages.entrySet()) {
          kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
        }
        kafkaProducer.close();
      }
}
Project: apache-kafka-demos    File: FilterStream.java
public static void main(String[] args) throws InterruptedException {

    Properties props = new Properties();
    props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.serializer", JsonPOJOSerializer.class.getName());
    props.put("value.deserializer", JsonPOJODeserializer.class.getName());

    Map<String, Object> serdeProps = new HashMap<>();
    serdeProps.put("JsonPOJOClass", Messung.class);

    final Serializer<Messung> serializer = new JsonPOJOSerializer<>();
    serializer.configure(serdeProps, false);

    final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>();
    deserializer.configure(serdeProps, false);

    final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer);

    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();

    builder.stream(Serdes.String(), serde, "produktion")
            .filter((k, v) -> v.type.equals("Biogas"))
            .to(Serdes.String(), serde, "produktion2");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamImpl.java
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                Serde<K1> keySerde,
                                                Serde<V1> valSerde,
                                                final String topicNamePrefix) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = stream.topology.newName(SINK_NAME);
    String filterName = stream.topology.newName(FILTER_NAME);
    String sourceName = stream.topology.newName(SOURCE_NAME);

    stream.topology.addInternalTopic(repartitionTopic);
    stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), stream.name);

    stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
                     valSerializer, filterName);
    stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
                       repartitionTopic);

    return sourceName;
}
Project: kafka-0.11.0.0-src-with-comment    File: KGroupedTableImpl.java
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                     final String functionName,
                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
    String sinkName = topology.newName(KStreamImpl.SINK_NAME);
    String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
    String funcName = topology.newName(functionName);

    String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;

    Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
    Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

    ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
    ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);

    // send the aggregate key-value pairs to the intermediate topic for partitioning
    topology.addInternalTopic(topic);
    topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);

    // read the intermediate topic
    topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);

    // aggregate the values with the aggregator and local store
    topology.addProcessor(funcName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, funcName);

    // return the KTable representation with the intermediate topic as the sources
    return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
Project: kafka-0.11.0.0-src-with-comment    File: SessionKeySerde.java
public static <K> Bytes toBinary(final Windowed<K> sessionKey, final Serializer<K> serializer, final String topic) {
    final byte[] bytes = serializer.serialize(topic, sessionKey.key());
    ByteBuffer buf = ByteBuffer.allocate(bytes.length + 2 * TIMESTAMP_SIZE);
    buf.put(bytes);
    buf.putLong(sessionKey.window().end());
    buf.putLong(sessionKey.window().start());
    return new Bytes(buf.array());
}
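
Because the serialized key comes first and the two window timestamps occupy a fixed-size tail, the window bounds can be read back without knowing the key length. A hedged sketch, assuming TIMESTAMP_SIZE is 8 to match the two putLong calls above:

byte[] binary = toBinary(sessionKey, serializer, topic).get();
ByteBuffer buf = ByteBuffer.wrap(binary);
long end = buf.getLong(binary.length - 2 * TIMESTAMP_SIZE);  // written first, after the key bytes
long start = buf.getLong(binary.length - TIMESTAMP_SIZE);    // written last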
Project: kafka-0.11.0.0-src-with-comment    File: StoreChangeLogger.java
void logChange(final K key, final V value) {
    if (collector != null) {
        final Serializer<K> keySerializer = serialization.keySerializer();
        final Serializer<V> valueSerializer = serialization.valueSerializer();
        collector.send(this.topic, key, value, this.partition, context.timestamp(), keySerializer, valueSerializer);
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StandbyContextImpl.java
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
}
Project: kafka-0.11.0.0-src-with-comment    File: StandbyContextImpl.java
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer,
                        final StreamPartitioner<? super K, ? super V> partitioner) {}
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollector.java
<K, V> void send(final String topic,
                 final K key,
                 final V value,
                 final Integer partition,
                 final Long timestamp,
                 final Serializer<K> keySerializer,
                 final Serializer<V> valueSerializer);
Project: kafka-0.11.0.0-src-with-comment    File: RecordCollector.java
<K, V> void send(final String topic,
                 final K key,
                 final V value,
                 final Long timestamp,
                 final Serializer<K> keySerializer,
                 final Serializer<V> valueSerializer,
                 final StreamPartitioner<? super K, ? super V> partitioner);
Project: kafka-0.11.0.0-src-with-comment    File: SinkNode.java
public SinkNode(final String name,
                final String topic,
                final Serializer<K> keySerializer,
                final Serializer<V> valSerializer,
                final StreamPartitioner<? super K, ? super V> partitioner) {
    super(name);

    this.topic = topic;
    this.keySerializer = keySerializer;
    this.valSerializer = valSerializer;
    this.partitioner = partitioner;
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamsMetadataState.java
/**
 * Find the {@link StreamsMetadata}s for a given storeName and key. This method will use the
 * {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
 * please use {@link StreamsMetadataState#getMetadataWithKey(String, Object, StreamPartitioner)}
 *
 * Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore};
 * this method provides a way of finding which {@link StreamsMetadata} it would exist on.
 *
 * @param storeName     Name of the store
 * @param key           Key to use
 * @param keySerializer Serializer for the key
 * @param <K>           key type
 * @return The {@link StreamsMetadata} for the storeName and key or {@link StreamsMetadata#NOT_AVAILABLE}
 * if streams is (re-)initializing
 */
public synchronized <K> StreamsMetadata getMetadataWithKey(final String storeName,
                                                           final K key,
                                                           final Serializer<K> keySerializer) {
    Objects.requireNonNull(keySerializer, "keySerializer can't be null");
    Objects.requireNonNull(storeName, "storeName can't be null");
    Objects.requireNonNull(key, "key can't be null");

    if (!isInitialized()) {
        return StreamsMetadata.NOT_AVAILABLE;
    }

    if (globalStores.contains(storeName)) {
        // global stores are on every node. if we don't have the host info
        // for this host then just pick the first metadata
        if (thisHost == UNKNOWN_HOST) {
            return allMetadata.get(0);
        }
        return myMetadata;
    }

    final SourceTopicsInfo sourceTopicsInfo = getSourceTopicsInfo(storeName);
    if (sourceTopicsInfo == null) {
        return null;
    }

    return getStreamsMetadataForKey(storeName,
                                    key,
                                    new DefaultStreamPartitioner<>(keySerializer,
                                                                   clusterMetadata,
                                                                   sourceTopicsInfo.topicWithMostPartitions),
                                    sourceTopicsInfo);
}
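
In application code this lookup is usually reached through KafkaStreams rather than StreamsMetadataState directly; a hedged sketch with illustrative store and key names:

StreamsMetadata metadata = kafkaStreams.metadataForKey(
        "my-store", "some-key", new StringSerializer());
if (metadata != null && !StreamsMetadata.NOT_AVAILABLE.equals(metadata)) {
    HostInfo host = metadata.hostInfo();  // the instance that would host this key
}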
Project: kafka-0.11.0.0-src-with-comment    File: TopologyBuilder.java
private SinkNodeFactory(String name,
                        String[] parents,
                        String topic,
                        Serializer<K> keySerializer,
                        Serializer<V> valSerializer,
                        StreamPartitioner<? super K, ? super V> partitioner) {
    super(name);
    this.parents = parents.clone();
    this.topic = topic;
    this.keySerializer = keySerializer;
    this.valSerializer = valSerializer;
    this.partitioner = partitioner;
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBSessionStoreSupplierTest.java
@Override
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
    logged.add(new ProducerRecord<>(topic, partition, timestamp, key, value));
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBWindowStoreTest.java
@Override
public <K1, V1> void send(final String topic,
                          K1 key,
                          V1 value,
                          Integer partition,
                          Long timestamp,
                          Serializer<K1> keySerializer,
                          Serializer<V1> valueSerializer) {
    changeLog.add(new KeyValue<>(
            keySerializer.serialize(topic, key),
            valueSerializer.serialize(topic, value))
    );
}