Example source code for the Java class org.apache.kafka.common.serialization.Deserializer

Project: kmq    File: KmqClient.java
public KmqClient(KmqConfig config, KafkaClients clients,
                 Class<? extends Deserializer<K>> keyDeserializer,
                 Class<? extends Deserializer<V>> valueDeserializer,
                 long msgPollTimeout) {

    this.config = config;
    this.msgPollTimeout = msgPollTimeout;

    this.msgConsumer = clients.createConsumer(config.getMsgConsumerGroupId(), keyDeserializer, valueDeserializer);
    // Using the custom partitioner, each offset-partition will contain markers only from a single queue-partition.
    this.markerProducer = clients.createProducer(
            MarkerKey.MarkerKeySerializer.class, MarkerValue.MarkerValueSerializer.class,
            Collections.singletonMap(ProducerConfig.PARTITIONER_CLASS_CONFIG, ParititionFromMarkerKey.class));

    LOG.info(String.format("Subscribing to topic: %s, using group id: %s", config.getMsgTopic(), config.getMsgConsumerGroupId()));
    msgConsumer.subscribe(Collections.singletonList(config.getMsgTopic()));
}
Project: nighthawk    File: ListenableConsumerFactoryBean.java
@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
    if (topics == null && topicPatternString == null) {
        throw new IllegalArgumentException("topic info must not be null");
    }
    Assert.notEmpty(configs, "configs must not be empty");
    Assert.notNull(payloadListener, "payloadListener must not be null");
    String valueDeserializerKlass = (String) configs.get("value.deserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(configs);

    Deserializer valueDeserializer = createDeserializer(valueDeserializerKlass);
    valueDeserializer.configure(configs, false);

    if (topics != null) {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Arrays.asList(topics), valueDeserializer);
    } else {
        listenableConsumer =
                new ListenableTracingConsumer<>(consumer, Pattern.compile(topicPatternString), valueDeserializer);
    }
    if (payloadListener != null) {
        listenableConsumer.addListener(payloadListener);
    }
    listenableConsumer.start();
}
Project: stroom-stats    File: SerdeUtils.java
/**
 * Builds a Deserializer of T from the passed stateless function, with no-op configure and close implementations
 */
public static <T> Deserializer<T> buildBasicDeserializer(final DeserializeFunc<T> deserializeFunc) {
    return new Deserializer<T>() {
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
        }

        @Override
        public T deserialize(final String topic, final byte[] bData) {
            return deserializeFunc.deserialize(topic, bData);
        }

        @Override
        public void close() {
        }
    };
}
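A minimal usage sketch for the helper above, assuming DeserializeFunc&lt;T&gt; is a functional interface whose single method matches the deserialize(topic, bytes) call seen in the body (the topic name and lambda below are illustrative):

// Hypothetical usage: build a throwaway String deserializer from a lambda.
Deserializer<String> stringDeserializer = SerdeUtils.buildBasicDeserializer(
        (topic, data) -> data == null ? null : new String(data, java.nio.charset.StandardCharsets.UTF_8));
String decoded = stringDeserializer.deserialize(
        "some-topic", "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));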
Project: azeroth    File: OldApiTopicConsumer.java
/**
 * Creates a consumer based on the old high-level consumer API.
 *
 * @param context consumer context holding the Kafka properties, message handlers and processing-thread limits
 */
@SuppressWarnings("unchecked")
public OldApiTopicConsumer(ConsumerContext context) {

    this.consumerContext = context;
    try {
        Class<?> deserializerClass = Class
            .forName(context.getProperties().getProperty("value.deserializer"));
        deserializer = (Deserializer<Object>) deserializerClass.newInstance();
    } catch (Exception e) {
        // fail fast instead of silently leaving 'deserializer' null
        throw new IllegalStateException("Failed to instantiate the configured value.deserializer", e);
    }
    this.connector = kafka.consumer.Consumer
        .createJavaConsumerConnector(new ConsumerConfig(context.getProperties()));

    int poolSize = consumerContext.getMessageHandlers().size();
    this.fetchExecutor = new StandardThreadExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS,
        poolSize, new StandardThreadFactory("KafkaFetcher"));

    this.defaultProcessExecutor = new StandardThreadExecutor(1, context.getMaxProcessThreads(),
        30, TimeUnit.SECONDS, context.getMaxProcessThreads(),
        new StandardThreadFactory("KafkaProcessor"), new PoolFullRunsPolicy());

    logger.info(
        "Kafka Consumer ThreadPool initialized, fetchPool size: {}, defaultProcessPool size: {}",
        poolSize, context.getMaxProcessThreads());
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedDeserializer.java
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    if (inner == null) {
        String propertyName = isKey ? "key.deserializer.inner.class" : "value.deserializer.inner.class";
        Object innerDeserializerClass = configs.get(propertyName);
        propertyName = (innerDeserializerClass == null) ? "deserializer.inner.class" : propertyName;
        String value = null;
        try {
            value = (String) configs.get(propertyName);
            inner = Deserializer.class.cast(Utils.newInstance(value, Deserializer.class));
            inner.configure(configs, isKey);
        } catch (ClassNotFoundException e) {
            throw new ConfigException(propertyName, value, "Class " + value + " could not be found.");
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: SourceNode.java
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
    super.init(context);
    this.context = context;

    // if deserializers are null, get the default ones from the context
    if (this.keyDeserializer == null)
        this.keyDeserializer = ensureExtended((Deserializer<K>) context.keySerde().deserializer());
    if (this.valDeserializer == null)
        this.valDeserializer = ensureExtended((Deserializer<V>) context.valueSerde().deserializer());

    // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
    if (this.valDeserializer instanceof ChangedDeserializer &&
            ((ChangedDeserializer) this.valDeserializer).inner() == null)
        ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                        keyDeserializer,
                                                    final Deserializer<V>
                                                        valueDeserializer,
                                                    final int numMessages)
    throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties
        .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        consumerProperties,
        outputTopic,
        numMessages,
        60 * 1000);

}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
                                     final int numMessages, final String topic) throws InterruptedException {

    final Properties config = new Properties();

    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        IntegerDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        valueDeserializer.getClass().getName());
    final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
        config,
        topic,
        numMessages,
        60 * 1000);
    Collections.sort(received);

    return received;
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationDedupIntegrationTest.java
private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
                                                        keyDeserializer,
                                                    final Deserializer<V>
                                                        valueDeserializer,
                                                    final int numMessages)
    throws InterruptedException {
    final Properties consumerProperties = new Properties();
    consumerProperties
        .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
        testNo);
    consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        keyDeserializer.getClass().getName());
    consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        valueDeserializer.getClass().getName());
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
        outputTopic,
        numMessages,
        60 * 1000);

}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedDeserializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test that key[value].deserializer.inner.class takes precedence over deserializer.inner.class
    WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    windowedDeserializer.configure(props, true);
    Deserializer<?> inner = windowedDeserializer.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner);
    assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
    // test deserializer.inner.class
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.remove("key.deserializer.inner.class");
    props.remove("value.deserializer.inner.class");
    WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
    windowedDeserializer1.configure(props, false);
    Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner1);
    assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions,
                                           Metrics metrics,
                                           Deserializer<K> keyDeserializer,
                                           Deserializer<V> valueDeserializer,
                                           int maxPollRecords,
                                           IsolationLevel isolationLevel) {
    return new Fetcher<>(consumerClient,
            minBytes,
            maxBytes,
            maxWaitMs,
            fetchSize,
            maxPollRecords,
            true, // check crc
            keyDeserializer,
            valueDeserializer,
            metadata,
            subscriptions,
            metrics,
            metricsRegistry,
            time,
            retryBackoffMs,
            isolationLevel);
}
Project: oryx2    File: ModelManagerListener.java
@SuppressWarnings("unchecked")
void init(ServletContext context) {
  String serializedConfig = context.getInitParameter(ConfigUtils.class.getName() + ".serialized");
  Objects.requireNonNull(serializedConfig);
  this.config = ConfigUtils.deserialize(serializedConfig);
  this.updateTopic = config.getString("oryx.update-topic.message.topic");
  this.maxMessageSize = config.getInt("oryx.update-topic.message.max-size");
  this.updateTopicLockMaster = config.getString("oryx.update-topic.lock.master");
  this.updateTopicBroker = config.getString("oryx.update-topic.broker");
  this.readOnly = config.getBoolean("oryx.serving.api.read-only");
  if (!readOnly) {
    this.inputTopic = config.getString("oryx.input-topic.message.topic");
    this.inputTopicLockMaster = config.getString("oryx.input-topic.lock.master");
    this.inputTopicBroker = config.getString("oryx.input-topic.broker");
  }
  this.modelManagerClassName = config.getString("oryx.serving.model-manager-class");
  this.updateDecoderClass = (Class<? extends Deserializer<U>>) ClassUtils.loadClass(
      config.getString("oryx.update-topic.message.decoder-class"), Deserializer.class);
  Preconditions.checkArgument(maxMessageSize > 0);
}
Project: likafka-clients    File: MessageAssemblerTest.java
@Test
public void testSingleMessageSegment() {
  // Create serializer/deserializers.
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes());

  MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer);
  MessageAssembler.AssembleResult assembleResult =
      messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes);

  assertNotNull(assembleResult.messageBytes());
  assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0");
  assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0");
}
Project: li-apache-kafka-clients    File: SerializerDeserializerTest.java
@Test
public void testSerde() {
  Serializer<String> stringSerializer = new StringSerializer();
  Deserializer<String> stringDeserializer = new StringDeserializer();
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  String s = LiKafkaClientsTestUtils.getRandomString(100);
  assertEquals(s.length(), 100);
  byte[] stringBytes = stringSerializer.serialize("topic", s);
  assertEquals(stringBytes.length, 100);
  LargeMessageSegment segment =
      new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes));
  // String bytes + segment header
  byte[] serializedSegment = segmentSerializer.serialize("topic", segment);
  assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);

  LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment);
  assertEquals(deserializedSegment.messageId, segment.messageId);
  assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes);
  assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments);
  assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber);
  assertEquals(deserializedSegment.payload.limit(), 100);
  String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray());
  assertEquals(deserializedString.length(), s.length());
}
Project: li-apache-kafka-clients    File: MessageAssemblerTest.java
@Test
public void testSingleMessageSegment() {
  // Create serializer/deserializers.
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  byte[] messageWrappedBytes = wrapMessageBytes(segmentSerializer, "message".getBytes());

  MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer);
  MessageAssembler.AssembleResult assembleResult =
      messageAssembler.assemble(new TopicPartition("topic", 0), 0, messageWrappedBytes);

  assertNotNull(assembleResult.messageBytes());
  assertEquals(assembleResult.messageStartingOffset(), 0, "The message starting offset should be 0");
  assertEquals(assembleResult.messageEndingOffset(), 0, "The message ending offset should be 0");
}
Project: hello-kafka-streams    File: JsonPOJOSerde.java
@Override
public Deserializer<T> deserializer() {
    return new Deserializer<T>() {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {

        }

        @Override
        public T deserialize(String topic, byte[] data) {
            T result;
            try {
                result = mapper.readValue(data, cls);
            } catch (Exception e) {
                throw new SerializationException(e);
            }

            return result;
        }

        @Override
        public void close() {

        }
    };
}
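A hedged usage sketch for the serde above. The single-argument constructor taking the target class is an assumption based on the cls field used inside deserialize, and MyEvent is an illustrative POJO:

// Hypothetical round-trip of a POJO through the serde.
JsonPOJOSerde<MyEvent> serde = new JsonPOJOSerde<>(MyEvent.class);
byte[] bytes = serde.serializer().serialize("events", new MyEvent());
MyEvent restored = serde.deserializer().deserialize("events", bytes);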
Project: kmq    File: KafkaClients.java
public <K, V> KafkaConsumer<K, V> createConsumer(String groupId,
                                                 Class<? extends Deserializer<K>> keyDeserializer,
                                                 Class<? extends Deserializer<V>> valueDeserializer) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", keyDeserializer.getName());
    props.put("value.deserializer", valueDeserializer.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    if (groupId != null) {
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }

    return new KafkaConsumer<>(props);
}
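A hypothetical call to the factory above, using the standard String deserializers shipped with the Kafka clients (the group id, topic and clients variable are illustrative):

KafkaConsumer<String, String> consumer = clients.createConsumer(
        "my-group", StringDeserializer.class, StringDeserializer.class);
consumer.subscribe(Collections.singletonList("my-topic"));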
Project: nighthawk    File: PayloadCodec.java
public static <K, V> Payload<K, V> decodePayload(Deserializer<V> valueDeserializer, ConsumerRecord<K, byte[]> originConsumerRecord) {
    TracingHeader tracingHeader = null;
    ConsumerRecord<K, V> dataRecord = null;
    boolean sampled = false;
    byte[] data = originConsumerRecord.value();
    byte[] vData = null;
    if (data.length <= HEADER_LENGTH) {
        vData = data;
    } else {
        ByteBuffer byteBuf = ByteBuffer.wrap(data);
        short magic = byteBuf.getShort(0);
        short tpLen = byteBuf.getShort(2);
        if (magic == MAGIC && tpLen == TracingHeader.LENGTH) {
            byte[] tpBytes = new byte[tpLen];
            System.arraycopy(byteBuf.array(), HEADER_LENGTH, tpBytes, 0, tpLen);
            tracingHeader = TracingHeader.fromBytes(tpBytes);
            sampled = true;
            int dataOffset = tpLen + HEADER_LENGTH;
            vData = new byte[byteBuf.array().length - dataOffset];
            System.arraycopy(byteBuf.array(), dataOffset, vData, 0, vData.length);
        } else {
            vData = data;
        }
    }
    dataRecord = new ConsumerRecord<>(originConsumerRecord.topic(),
            originConsumerRecord.partition(), originConsumerRecord.offset(),
            originConsumerRecord.key(), valueDeserializer.deserialize(originConsumerRecord.topic(), vData));
    return new Payload<>(tracingHeader, dataRecord, sampled);
}
Project: nighthawk    File: ListenableTracingConsumer.java
public ListenableTracingConsumer(Consumer<K, byte[]> delegate, Collection<String> topics, Deserializer<V> valueDeserializer) {
    super(delegate);
    this.delegate = delegate;
    this.topics = topics;
    this.topicPattern = null;
    this.valueDeserializer = valueDeserializer;
}
Project: nighthawk    File: ListenableTracingConsumer.java
public ListenableTracingConsumer(Consumer<K, byte[]> delegate, Pattern topicPattern, Deserializer<V> valueDeserializer) {
    super(delegate);
    this.delegate = delegate;
    this.topicPattern = topicPattern;
    this.topics = null;
    this.valueDeserializer = valueDeserializer;
}
Project: kafka-junit    File: KafkaTestServer.java
/**
 * Return Kafka Consumer configured to consume from internal Kafka Server.
 * @param <K> Type of message key
 * @param <V> Type of message value
 * @param keyDeserializer Class of deserializer to be used for keys.
 * @param valueDeserializer Class of deserializer to be used for values.
 * @return KafkaConsumer configured to consume from the test server.
 */
public <K, V> KafkaConsumer<K, V> getKafkaConsumer(
    final Class<? extends Deserializer<K>> keyDeserializer,
    final Class<? extends Deserializer<V>> valueDeserializer) {

    // Build config
    Map<String, Object> kafkaConsumerConfig = buildDefaultClientConfig();
    kafkaConsumerConfig.put("key.deserializer", keyDeserializer);
    kafkaConsumerConfig.put("value.deserializer", valueDeserializer);
    kafkaConsumerConfig.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

    // Create and return Consumer.
    return new KafkaConsumer<>(kafkaConsumerConfig);
}
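A sketch of how the method above is typically called from a test, assuming a running server instance named kafkaTestServer; the topic name is illustrative:

// Hypothetical test usage with the standard String deserializers.
try (KafkaConsumer<String, String> consumer =
         kafkaTestServer.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class)) {
    consumer.subscribe(Collections.singletonList("test-topic"));
    ConsumerRecords<String, String> records = consumer.poll(1000L);   // poll(long) in pre-2.0 clients
}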
Project: kafka-webview    File: DeserializerConfig.java
/**
 * Constructor.
 * @param keyDeserializerClass Class of deserializer for keys.
 * @param keyDeserializerOptions Configuration options passed to the key deserializer.
 * @param valueDeserializerClass Class of deserializer for values.
 * @param valueDeserializerOptions Configuration options passed to the value deserializer.
 */
private DeserializerConfig(
    final Class<? extends Deserializer> keyDeserializerClass,
    final Map<String, String> keyDeserializerOptions,
    final Class<? extends Deserializer> valueDeserializerClass,
    final Map<String, String> valueDeserializerOptions
) {
    this.keyDeserializerClass = keyDeserializerClass;
    this.keyDeserializerOptions = new HashMap<>();
    this.keyDeserializerOptions.putAll(keyDeserializerOptions);

    this.valueDeserializerClass = valueDeserializerClass;
    this.valueDeserializerOptions = new HashMap<>();
    this.valueDeserializerOptions.putAll(valueDeserializerOptions);
}
Project: kafka-webview    File: WebKafkaConsumerFactory.java
/**
 * Constructor.
 */
public WebKafkaConsumerFactory(
    final PluginFactory<Deserializer> deserializerPluginFactory,
    final PluginFactory<RecordFilter> recordFilterPluginFactory,
    final SecretManager secretManager,
    final KafkaConsumerFactory kafkaConsumerFactory) {
    this.deserializerPluginFactory = deserializerPluginFactory;
    this.recordFilterPluginFactory = recordFilterPluginFactory;
    this.secretManager = secretManager;
    this.kafkaConsumerFactory = kafkaConsumerFactory;
}
Project: kafka-webview    File: WebKafkaConsumerFactory.java
private Class<? extends Deserializer> getDeserializerClass(final MessageFormat messageFormat) {
    try {
        if (messageFormat.isDefaultFormat()) {
            return deserializerPluginFactory.getPluginClass(messageFormat.getClasspath());
        } else {
            return deserializerPluginFactory.getPluginClass(messageFormat.getJar(), messageFormat.getClasspath());
        }
    } catch (final LoaderException exception) {
        throw new RuntimeException(exception.getMessage(), exception);
    }
}
Project: kafka-webview    File: PluginFactoryTest.java
/**
 * Test creating a Deserializer.
 */
@Test
public void testWithDeserializer() throws LoaderException {
    final String jarFilename = "testPlugins.jar";
    final String classPath = "examples.deserializer.ExampleDeserializer";

    // Find jar on filesystem.
    final URL jar = getClass().getClassLoader().getResource("testDeserializer/" + jarFilename);
    final String jarPath = new File(jar.getFile()).getParent();

    // Create factory
    final PluginFactory<Deserializer> factory = new PluginFactory<>(jarPath, Deserializer.class);
    final Path pathForJar = factory.getPathForJar(jarFilename);

    // Validate path is correct
    assertEquals("Has expected Path", jar.getPath(), pathForJar.toString());

    // Get class instance
    final Class<? extends Deserializer> pluginFilterClass = factory.getPluginClass(jarFilename, classPath);

    // Validate
    assertNotNull(pluginFilterClass);
    assertEquals("Has expected name", classPath, pluginFilterClass.getName());
    assertTrue("Validate came from correct class loader", pluginFilterClass.getClassLoader() instanceof PluginClassLoader);

    // Create deserializer instance
    final Deserializer deserializer = factory.getPlugin(jarFilename, classPath);
    assertNotNull(deserializer);
    assertEquals("Has correct name", classPath, deserializer.getClass().getName());

    // Call method on interface
    final String value = "MyValue";
    final String result = (String) deserializer.deserialize("MyTopic", value.getBytes(StandardCharsets.UTF_8));
}
Project: kafka-webview    File: PluginFactoryTest.java
/**
 * Tests loading a deserializer not from an external jar.
 */
@Test
public void testLoadingDefaultDeserializer() throws LoaderException {
    final String classPath = StringDeserializer.class.getName();

    // Create factory
    final PluginFactory<Deserializer> factory = new PluginFactory<>("/tmp", Deserializer.class);

    // Get class instance
    final Class<? extends Deserializer> pluginFilterClass = factory.getPluginClass(classPath);

    // Validate
    assertNotNull(pluginFilterClass);
    assertEquals("Has expected name", classPath, pluginFilterClass.getName());
}
Project: kafka-webview    File: WebKafkaConsumerFactoryTest.java
private WebKafkaConsumerFactory createDefaultFactory() {
    final PluginFactory<Deserializer> deserializerPluginFactory = new PluginFactory<>("not/used", Deserializer.class);
    final PluginFactory<RecordFilter> filterPluginFactoryPluginFactory = new PluginFactory<>("not/used", RecordFilter.class);
    final SecretManager secretManager = new SecretManager("Passphrase");
    final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory("not/used");

    return new WebKafkaConsumerFactory(
        deserializerPluginFactory,
        filterPluginFactoryPluginFactory,
        secretManager,
        kafkaConsumerFactory
    );
}
Project: apache-kafka-demos    File: FilterStream.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.serializer", JsonPOJOSerializer.class.getName());
        props.put("value.deserializer", JsonPOJODeserializer.class.getName());

        Map<String, Object> serdeProps = new HashMap<>();
        serdeProps.put("JsonPOJOClass", Messung.class);

        final Serializer<Messung> serializer = new JsonPOJOSerializer<>();
        serializer.configure(serdeProps, false);

        final Deserializer<Messung> deserializer = new JsonPOJODeserializer<>();
        deserializer.configure(serdeProps, false);

        final Serde<Messung> serde = Serdes.serdeFrom(serializer, deserializer);

        StreamsConfig config = new StreamsConfig(props);

        KStreamBuilder builder = new KStreamBuilder();

        builder.stream(Serdes.String(), serde, "produktion")
                .filter( (k,v) -> v.type.equals("Biogas"))
                .to(Serdes.String(), serde,"produktion2");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
Project: kafka-0.11.0.0-src-with-comment    File: KStreamImpl.java
static <K1, V1> String createReparitionedSource(AbstractStream<K1> stream,
                                                Serde<K1> keySerde,
                                                Serde<V1> valSerde,
                                                final String topicNamePrefix) {
    Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
    Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
    Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
    Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
    String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;

    String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
    String sinkName = stream.topology.newName(SINK_NAME);
    String filterName = stream.topology.newName(FILTER_NAME);
    String sourceName = stream.topology.newName(SOURCE_NAME);

    stream.topology.addInternalTopic(repartitionTopic);
    stream.topology.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
        @Override
        public boolean test(final K1 key, final V1 value) {
            return key != null;
        }
    }, false), stream.name);

    stream.topology.addSink(sinkName, repartitionTopic, keySerializer,
                     valSerializer, filterName);
    stream.topology.addSource(sourceName, keyDeserializer, valDeserializer,
                       repartitionTopic);

    return sourceName;
}
Project: kafka-0.11.0.0-src-with-comment    File: KGroupedTableImpl.java
private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
                                     final String functionName,
                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
    String sinkName = topology.newName(KStreamImpl.SINK_NAME);
    String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
    String funcName = topology.newName(functionName);

    String topic = storeSupplier.name() + KStreamImpl.REPARTITION_TOPIC_SUFFIX;

    Serializer<? extends K> keySerializer = keySerde == null ? null : keySerde.serializer();
    Deserializer<? extends K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
    Serializer<? extends V> valueSerializer = valSerde == null ? null : valSerde.serializer();
    Deserializer<? extends V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();

    ChangedSerializer<? extends V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
    ChangedDeserializer<? extends V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);

    // send the aggregate key-value pairs to the intermediate topic for partitioning
    topology.addInternalTopic(topic);
    topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);

    // read the intermediate topic
    topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);

    // aggregate the values with the aggregator and local store
    topology.addProcessor(funcName, aggregateSupplier, sourceName);
    topology.addStateStore(storeSupplier, funcName);

    // return the KTable representation with the intermediate topic as the sources
    return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName), storeSupplier.name(), isQueryable);
}
Project: kafka-0.11.0.0-src-with-comment    File: SourceNode.java
public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
    super(name);
    this.topics = topics;
    this.timestampExtractor = timestampExtractor;
    this.keyDeserializer = ensureExtended(keyDeserializer);
    this.valDeserializer = ensureExtended(valDeserializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: TopologyBuilder.java
private SourceNodeFactory(final String name,
                          final String[] topics,
                          final Pattern pattern,
                          final TimestampExtractor timestampExtractor,
                          final Deserializer<?> keyDeserializer,
                          final Deserializer<?> valDeserializer) {
    super(name);
    this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
    this.pattern = pattern;
    this.keyDeserializer = keyDeserializer;
    this.valDeserializer = valDeserializer;
    this.timestampExtractor = timestampExtractor;
}
Project: kafka-0.11.0.0-src-with-comment    File: TopologyBuilder.java
/**
 * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
 * The source will use the specified key and value deserializers.
 *
 * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
 *                           acceptable values are earliest or latest.
 * @param name               the unique name of the source used to reference this node when
 *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
 * @param timestampExtractor the stateless timestamp extractor used for this source,
 *                           if not specified the default extractor defined in the configs will be used
 * @param keyDeserializer    key deserializer used to read this source, if not specified the default
 *                           key deserializer defined in the configs will be used
 * @param valDeserializer    value deserializer used to read this source,
 *                           if not specified the default value deserializer defined in the configs will be used
 * @param topics             the name of one or more Kafka topics that this source is to consume
 * @return this builder instance so methods can be chained together; never null
 * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source
 */

public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                    final String name,
                                                    final TimestampExtractor timestampExtractor,
                                                    final Deserializer keyDeserializer,
                                                    final Deserializer valDeserializer,
                                                    final String... topics) {
    if (topics.length == 0) {
        throw new TopologyBuilderException("You must provide at least one topic");
    }
    Objects.requireNonNull(name, "name must not be null");
    if (nodeFactories.containsKey(name))
        throw new TopologyBuilderException("Processor " + name + " is already added.");

    for (String topic : topics) {
        Objects.requireNonNull(topic, "topic names cannot be null");
        validateTopicNotAlreadyRegistered(topic);
        maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
        sourceTopicNames.add(topic);
    }

    nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
    nodeToSourceTopics.put(name, Arrays.asList(topics));
    nodeGrouper.add(name);

    return this;
}
Project: kafka-0.11.0.0-src-with-comment    File: TopologyBuilder.java
/**
 * Add a new source that consumes from topics matching the given pattern
 * and forwards the records to child processor and/or sink nodes.
 * The source will use the specified key and value deserializers. The provided
 * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for
 * topics that share the same key-value data format.
 *
 * @param offsetReset        the auto offset reset policy to use for this stream if no committed offsets found;
 *                           acceptable values are earliest or latest
 * @param name               the unique name of the source used to reference this node when
 *                           {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}.
 * @param timestampExtractor the stateless timestamp extractor used for this source,
 *                           if not specified the default extractor defined in the configs will be used
 * @param keyDeserializer    key deserializer used to read this source, if not specified the default
 *                           key deserializer defined in the configs will be used
 * @param valDeserializer    value deserializer used to read this source,
 *                           if not specified the default value deserializer defined in the configs will be used
 * @param topicPattern       regular expression pattern to match Kafka topics that this source is to consume
 * @return this builder instance so methods can be chained together; never null
 * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name
 */

public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                    final String name,
                                                    final TimestampExtractor timestampExtractor,
                                                    final Deserializer keyDeserializer,
                                                    final Deserializer valDeserializer,
                                                    final Pattern topicPattern) {
    Objects.requireNonNull(topicPattern, "topicPattern can't be null");
    Objects.requireNonNull(name, "name can't be null");

    if (nodeFactories.containsKey(name)) {
        throw new TopologyBuilderException("Processor " + name + " is already added.");
    }

    for (String sourceTopicName : sourceTopicNames) {
        if (topicPattern.matcher(sourceTopicName).matches()) {
            throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
        }
    }

    maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern);

    nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
    nodeToSourcePatterns.put(name, topicPattern);
    nodeGrouper.add(name);

    return this;
}
Project: kafka-0.11.0.0-src-with-comment    File: ProcessorTopologyTestDriver.java
/**
 * Read the next record from the given topic. These records were output by the topology during the previous calls to
 * {@link #process(String, byte[], byte[])}.
 *
 * @param topic the name of the topic
 * @param keyDeserializer the deserializer for the key type
 * @param valueDeserializer the deserializer for the value type
 * @return the next record on that topic, or null if there is no record available
 */
public <K, V> ProducerRecord<K, V> readOutput(final String topic,
                                              final Deserializer<K> keyDeserializer,
                                              final Deserializer<V> valueDeserializer) {
    final ProducerRecord<byte[], byte[]> record = readOutput(topic);
    if (record == null) {
        return null;
    }
    final K key = keyDeserializer.deserialize(record.topic(), record.key());
    final V value = valueDeserializer.deserialize(record.topic(), record.value());
    return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
}
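A hedged usage sketch for readOutput, assuming a ProcessorTopologyTestDriver instance named driver and a topology that emits String keys and Long values; the topic and types below are illustrative:

ProducerRecord<String, Long> output = driver.readOutput(
        "output-topic", new StringDeserializer(), new LongDeserializer());
if (output != null) {
    System.out.println(output.key() + " -> " + output.value());
}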
Project: kafka-0.11.0.0-src-with-comment    File: Fetcher.java
public Fetcher(ConsumerNetworkClient client,
               int minBytes,
               int maxBytes,
               int maxWaitMs,
               int fetchSize,
               int maxPollRecords,
               boolean checkCrcs,
               Deserializer<K> keyDeserializer,
               Deserializer<V> valueDeserializer,
               Metadata metadata,
               SubscriptionState subscriptions,
               Metrics metrics,
               FetcherMetricsRegistry metricsRegistry,
               Time time,
               long retryBackoffMs,
               IsolationLevel isolationLevel) {
    this.time = time;
    this.client = client;
    this.metadata = metadata;
    this.subscriptions = subscriptions;
    this.minBytes = minBytes;
    this.maxBytes = maxBytes;
    this.maxWaitMs = maxWaitMs;
    this.fetchSize = fetchSize;
    this.maxPollRecords = maxPollRecords;
    this.checkCrcs = checkCrcs;
    this.keyDeserializer = ensureExtended(keyDeserializer);
    this.valueDeserializer = ensureExtended(valueDeserializer);
    this.completedFetches = new ConcurrentLinkedQueue<>();
    this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
    this.retryBackoffMs = retryBackoffMs;
    this.isolationLevel = isolationLevel;

    subscriptions.addListener(this);
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerConfig.java
public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
                                                          Deserializer<?> keyDeserializer,
                                                          Deserializer<?> valueDeserializer) {
    Map<String, Object> newConfigs = new HashMap<String, Object>();
    newConfigs.putAll(configs);
    if (keyDeserializer != null)
        newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
    if (valueDeserializer != null)
        newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
    return newConfigs;
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerConfig.java
public static Properties addDeserializerToConfig(Properties properties,
                                                 Deserializer<?> keyDeserializer,
                                                 Deserializer<?> valueDeserializer) {
    Properties newProperties = new Properties();
    newProperties.putAll(properties);
    if (keyDeserializer != null)
        newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
    if (valueDeserializer != null)
        newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
    return newProperties;
}
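A short sketch of what these helpers do in practice: when a consumer is built from explicit deserializer instances, their class names are copied into the config so downstream components can see them (bootstrap server and deserializer choices below are illustrative):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Properties augmented = ConsumerConfig.addDeserializerToConfig(
        props, new StringDeserializer(), new ByteArrayDeserializer());
// 'augmented' now also carries key.deserializer / value.deserializer class names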
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumer.java
KafkaConsumer(String clientId,
              ConsumerCoordinator coordinator,
              Deserializer<K> keyDeserializer,
              Deserializer<V> valueDeserializer,
              Fetcher<K, V> fetcher,
              ConsumerInterceptors<K, V> interceptors,
              Time time,
              ConsumerNetworkClient client,
              Metrics metrics,
              SubscriptionState subscriptions,
              Metadata metadata,
              long retryBackoffMs,
              long requestTimeoutMs) {
    this.clientId = clientId;
    this.coordinator = coordinator;
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;
    this.fetcher = fetcher;
    this.interceptors = interceptors;
    this.time = time;
    this.client = client;
    this.metrics = metrics;
    this.subscriptions = subscriptions;
    this.metadata = metadata;
    this.retryBackoffMs = retryBackoffMs;
    this.requestTimeoutMs = requestTimeoutMs;
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumer.java
private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?>... candidateLists) {
    ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
    for (List<?> candidateList: candidateLists)
        clusterResourceListeners.maybeAddAll(candidateList);

    clusterResourceListeners.maybeAdd(keyDeserializer);
    clusterResourceListeners.maybeAdd(valueDeserializer);
    return clusterResourceListeners;
}