Java code examples for the class org.apache.kafka.common.serialization.LongDeserializer
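
For orientation before the project examples below, here is a minimal, self-contained sketch (not taken from any of the listed projects) of what LongDeserializer does on its own: it converts an 8-byte, big-endian payload back into a Long, which is why it is typically paired with LongSerializer or Serdes.Long() on the producing side. The topic name "orders" is purely illustrative.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class LongDeserializerSketch {
    public static void main(String[] args) {
        try (LongSerializer serializer = new LongSerializer();
             LongDeserializer deserializer = new LongDeserializer()) {
            // Serialize a key into its 8-byte big-endian representation...
            byte[] payload = serializer.serialize("orders", 42L);
            // ...and read it back. A null payload yields null; a payload that is
            // not exactly 8 bytes makes deserialize() throw a SerializationException.
            Long key = deserializer.deserialize("orders", payload);
            System.out.println(key); // prints 42
        }
    }
}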

Project: wechat-mall    File: OrderConsumer.java
@Override
public KafkaConsumer<Long, byte[]> worker() {

    Properties props = AbstractKafkaClient.configBuilder()//
            .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
            .put(GROUP_ID_CONFIG, MallConstants.ORDER_GROUP)//
            .put(ENABLE_AUTO_COMMIT_CONFIG, true)//
            .put(MAX_POLL_RECORDS_CONFIG, "100")//
            .put(SESSION_TIMEOUT_MS_CONFIG, "30000")//
            .put(FETCH_MIN_BYTES_CONFIG, 1)//
            .put(AUTO_COMMIT_INTERVAL_MS_CONFIG, AUTO_COMMIT_INTERVAL_MS)//
            .put(KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName())//
            .put(VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName())//
            .build();

    this.worker = new KafkaConsumer<>(props);
    this.worker.subscribe(topics);
    System.out.println("started");
    return this.worker;
}
Project: kafka-webview    File: DataLoaderConfig.java
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-array-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Consumer<Long, byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<Long, byte[]> records = consumer.poll(10000);

    for (ConsumerRecord<Long, byte[]> record : records)
        out.printf(
                "key = %s value = %s%n",
                record.key(),
                new String(record.value()));

    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
                                              final String groupId) throws Exception {
    if (groupId != null) {
        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                groupId,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            numberOfRecords
        );
    }

    // read uncommitted
    return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
        TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
        SINGLE_PARTITION_OUTPUT_TOPIC,
        numberOfRecords
    );
}
Project: big-data-benchmark    File: TradeTestConsumer.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", args[0]);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", "earliest");
    KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(props);
    List<String> topics = Arrays.asList(args[1]);
    consumer.subscribe(topics);
    System.out.println("Subscribed to topics " + topics);
    long count = 0;
    long start = System.nanoTime();
    while (true) {
        ConsumerRecords<Long, Trade> poll = consumer.poll(5000);
        System.out.println("Partitions in batch: " + poll.partitions());
        LongSummaryStatistics stats = StreamSupport.stream(poll.spliterator(), false)
                .mapToLong(r -> r.value().getTime()).summaryStatistics();
        System.out.println("Oldest record time: " + stats.getMin() + ", newest record: " + stats.getMax());
        count += poll.count();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        long rate = (long) ((double) count / elapsed * 1000);
        System.out.printf("Total count: %,d in %,dms. Average rate: %,d records/s %n", count, elapsed, rate);

    }
}
Project: beam    File: KafkaIOTest.java
/**
 * Creates a consumer with two topics, with 10 partitions each.
 * numElements records are assigned (round-robin) across all 20 partitions.
 */
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
    int numElements,
    int maxNumRecords,
    @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {

  List<String> topics = ImmutableList.of("topic_a", "topic_b");

  KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
      .withBootstrapServers("myServer1:9092,myServer2:9092")
      .withTopics(topics)
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(LongDeserializer.class)
      .withMaxNumRecords(maxNumRecords);

  if (timestampFn != null) {
    return reader.withTimestampFn(timestampFn);
  } else {
    return reader;
  }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnboundedSourceWithSingleTopic() {
  // same as testUnboundedSource, but with single topic

  int numElements = 1000;
  String topic = "my_topic";

  KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
      .withBootstrapServers("none")
      .withTopic("my_topic")
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
      .withMaxNumRecords(numElements)
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(LongDeserializer.class);

  PCollection<Long> input = p
      .apply(reader.withoutMetadata())
      .apply(Values.<Long>create());

  addCountingAsserts(input, numElements);
  p.run();
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSourceWithExplicitPartitionsDisplayData() {
  KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
      .withBootstrapServers("myServer1:9092,myServer2:9092")
      .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5),
          new TopicPartition("test", 6)))
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(LongDeserializer.class);

  DisplayData displayData = DisplayData.from(read);

  assertThat(displayData, hasDisplayItem("topicPartitions", "test-5,test-6"));
  assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
  assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
  assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
  assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
}
Project: beam    File: KafkaIOTest.java
@Test
public void testInferKeyCoder() {
  CoderRegistry registry = CoderRegistry.createDefault();

  assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
          instanceof VarLongCoder);

  assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
          instanceof StringUtf8Coder);

  assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
          instanceof InstantCoder);

  assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
          instanceof VarLongCoder);
}
Project: wechat-mall    File: OrderProducer.java
@Override
public KafkaProducer<Long, byte[]> worker() {

    Properties props = AbstractKafkaClient.configBuilder()//
            .put(BOOTSTRAP_SERVERS_CONFIG, bootstrap)//
            .put(ACKS_CONFIG, "all").put(RETRIES_CONFIG, 3)//
            .put(BATCH_SIZE_CONFIG, 16384)//
            .put(LINGER_MS_CONFIG, 1)//
            .put(BUFFER_MEMORY_CONFIG, 33554432)//
            // A producer requires serializers; the original snippet mistakenly configured deserializers.
            .put(KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName())//
            .put(VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName())//
            .build();

    return this.worker = new KafkaProducer<>(props);
}
Project: Lagerta    File: EmbeddedKafkaRule.java
private Collection<String> listTopics() {
    Properties consumerConfig = new Properties() {{
        setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s:%s", LOCALHOST, kafkaPort));
        setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    }};
    try (Consumer<Long, Long> consumer = new KafkaConsumer<>(consumerConfig)) {
        return consumer.listTopics().keySet();
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamAggregationIntegrationTest.java
private void shouldCountHelper() throws Exception {
    startStreams();

    produceMessages(mockTime.milliseconds());

    final List<KeyValue<String, Long>> results = receiveMessages(
        new StringDeserializer(),
        new LongDeserializer(),
        10);
    Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
        @Override
        public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
            return KStreamAggregationIntegrationTest.compare(o1, o2);
        }
    });

    assertThat(results, is(Arrays.asList(
        KeyValue.pair("A", 1L),
        KeyValue.pair("A", 2L),
        KeyValue.pair("B", 1L),
        KeyValue.pair("B", 2L),
        KeyValue.pair("C", 1L),
        KeyValue.pair("C", 2L),
        KeyValue.pair("D", 1L),
        KeyValue.pair("D", 2L),
        KeyValue.pair("E", 1L),
        KeyValue.pair("E", 2L)
    )));
}
Project: kafka-0.11.0.0-src-with-comment    File: QueryableStateIntegrationTest.java
private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws InterruptedException {
    final Properties config = new Properties();
    config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
    config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        LongDeserializer.class.getName());
    IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
        config,
        topic,
        numRecs,
        120 * 1000);
}
Project: talk-observing-distributed-systems    File: IndexerServiceApplication.java
public void run(Configuration configuration, Environment environment) throws Exception {
  // INSTRUMENTATION
  // Metrics Instrumentation
  final CollectorRegistry collectorRegistry = new CollectorRegistry();
  collectorRegistry.register(new DropwizardExports(environment.metrics()));
  environment.admin()
      .addServlet("metrics", new MetricsServlet(collectorRegistry))
      .addMapping("/metrics");

  final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
      .withCollectorRegistry(collectorRegistry)
      .withConstLabel("service", getName())
      .build();

  // Tracing Instrumentation
  final Tracer tracer = getTracer();
  final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
  GlobalTracer.register(metricsTracer);

  final HttpHost httpHost = new HttpHost("tweets-elasticsearch", 9200);
  final RestClientBuilder restClientBuilder =
      RestClient.builder(httpHost).setHttpClientConfigCallback(new TracingHttpClientConfigCallback(metricsTracer));
  final RestClient restClient = restClientBuilder.build();
  final ElasticsearchTweetRepository elasticsearchRepository = new ElasticsearchTweetRepository(restClient);

  final Properties consumerConfigs = new Properties();
  consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
  consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, getName());
  consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  final KafkaConsumer<Long, String> kafkaConsumer = new KafkaConsumer<>(consumerConfigs, new LongDeserializer(), new StringDeserializer());
  final TracingKafkaConsumer<Long, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(kafkaConsumer, metricsTracer);
  final Runnable kafkaTweetEventConsumer = new KafkaTweetEventConsumer(tracingKafkaConsumer, elasticsearchRepository);
  final ExecutorService executorService = environment.lifecycle().executorService("kafka-consumer").build();
  executorService.submit(kafkaTweetEventConsumer);
}
Project: big-data-benchmark    File: JetTradeMonitor.java
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokerUrl);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", offsetReset);
    props.setProperty("max.poll.records", "32768");
    //props.setProperty("metadata.max.age.ms", "5000");
    return props;
}
Project: big-data-benchmark    File: FlinkTradeMonitor.java
private static Properties getKafkaProperties(String brokerUrl, String offsetReset) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", brokerUrl);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", offsetReset);
    props.setProperty("max.poll.records", "32768");
    return props;
}
Project: big-data-benchmark    File: SparkTradeMonitor.java
private static Map<String, Object> getKafkaProperties(String brokerUrl) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", brokerUrl);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", LongDeserializer.class);
    props.put("value.deserializer", TradeDeserializer.class);
    props.put("auto.offset.reset", "latest");
    return props;
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderTests.java
@Test
@SuppressWarnings("unchecked")
public void testConsumerCustomDeserializer() throws Exception {
    Binding<?> binding = null;
    try {
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        Map<String, String> propertiesToOverride = configurationProperties.getConfiguration();
        propertiesToOverride.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propertiesToOverride.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        configurationProperties.setConfiguration(propertiesToOverride);
        String testTopicName = "existing" + System.currentTimeMillis();
        configurationProperties.setAutoCreateTopics(false);
        Binder binder = getBinder(configurationProperties);

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
        DirectChannel input = createBindableChannel("input", createConsumerBindingProperties(consumerProperties));

        binding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
        DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(getKafkaConsumer(binding));
        assertTrue("Expected StringDeserializer as a custom key deserializer",
                consumerAccessor.getPropertyValue("keyDeserializer") instanceof StringDeserializer);
        assertTrue("Expected LongDeserializer as a custom value deserializer",
                consumerAccessor.getPropertyValue("valueDeserializer") instanceof LongDeserializer);
    }
    finally {
        if (binding != null) {
            binding.unbind();
        }
    }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnreachableKafkaBrokers() {
  // Expect an exception when the Kafka brokers are not reachable on the workers.
  // We specify partitions explicitly so that splitting does not involve server interaction.
  // Set request timeout to 10ms so that test does not take long.

  thrown.expect(Exception.class);
  thrown.expectMessage("Reader-0: Timeout while initializing partition 'test-0'");

  int numElements = 1000;
  PCollection<Long> input = p
      .apply(KafkaIO.<Integer, Long>read()
          .withBootstrapServers("8.8.8.8:9092") // Google public DNS ip.
          .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 0)))
          .withKeyDeserializer(IntegerDeserializer.class)
          .withValueDeserializer(LongDeserializer.class)
          .updateConsumerProperties(ImmutableMap.<String, Object>of(
              ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10,
              ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5,
              ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 8,
              ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 8))
          .withMaxNumRecords(10)
          .withoutMetadata())
      .apply(Values.<Long>create());

  addCountingAsserts(input, numElements);
  p.run();
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnboundedSourceWithExplicitPartitions() {
  int numElements = 1000;

  List<String> topics = ImmutableList.of("test");

  KafkaIO.Read<byte[], Long> reader = KafkaIO.<byte[], Long>read()
      .withBootstrapServers("none")
      .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(LongDeserializer.class)
      .withMaxNumRecords(numElements / 10);

  PCollection<Long> input = p
      .apply(reader.withoutMetadata())
      .apply(Values.<Long>create());

  // assert that every element is a multiple of 5.
  PAssert
    .that(input)
    .satisfies(new AssertMultipleOf(5));

  PAssert
    .thatSingleton(input.apply(Count.<Long>globally()))
    .isEqualTo(numElements / 10L);

  p.run();
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnboundedSourceSplits() throws Exception {

  int numElements = 1000;
  int numSplits = 10;

  // Coders must be specified explicitly here due to the way the transform
  // is used in the test.
  UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
      mkKafkaReadTransform(numElements, null)
          .withKeyDeserializerAndCoder(IntegerDeserializer.class, BigEndianIntegerCoder.of())
          .withValueDeserializerAndCoder(LongDeserializer.class, BigEndianLongCoder.of())
          .makeSource();

  List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
      initial.split(numSplits, p.getOptions());
  assertEquals("Expected exact splitting", numSplits, splits.size());

  long elementsPerSplit = numElements / numSplits;
  assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
  PCollectionList<Long> pcollections = PCollectionList.empty(p);
  for (int i = 0; i < splits.size(); ++i) {
    pcollections = pcollections.and(
        p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
         .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<Integer, Long>()))
         .apply("collection " + i, Values.<Long>create()));
  }
  PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());

  addCountingAsserts(input, numElements);
  p.run();
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
private void runSimpleCopyTest(final int numberOfRestarts,
                               final String inputTopic,
                               final String throughTopic,
                               final String outputTopic) throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<Long, Long> input = builder.stream(inputTopic);
    KStream<Long, Long> output = input;
    if (throughTopic != null) {
        output = input.through(throughTopic);
    }
    output.to(outputTopic);

    for (int i = 0; i < numberOfRestarts; ++i) {
        final long factor = i;
        final KafkaStreams streams = new KafkaStreams(
            builder,
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                new Properties() {
                    {
                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                    }
                }));

        try {
            streams.start();

            final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                inputTopic,
                inputData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time
            );

            final List<KeyValue<Long, Long>> committedRecords
                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                inputTopic,
                inputData.size()
            );

            checkResultPerKey(committedRecords, inputData);
        } finally {
            streams.close();
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

    final KafkaStreams streams = new KafkaStreams(
        builder,
        StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            new Properties() {
                {
                    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                }
            }));

    try {
        streams.start();

        final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
        final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            firstBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> firstCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            firstBurstOfData.size()
        );

        assertThat(firstCommittedRecords, equalTo(firstBurstOfData));

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            secondBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> secondCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            secondBurstOfData.size()
        );

        assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
    } finally {
        streams.close();
    }
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderAutoConfigurationPropertiesTest.java
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
    assertNotNull(this.kafkaMessageChannelBinder);
    ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
            new KafkaProducerProperties());
    Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
            String.class, ExtendedProducerProperties.class);
    getProducerFactoryMethod.setAccessible(true);
    DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
    Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(producerFactoryConfigField);
    Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
            producerFactory);
    assertTrue(producerConfigs.get("batch.size").equals(10));
    assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("key.deserializer") == null);
    assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("value.deserializer") == null);
    assertTrue(producerConfigs.get("compression.type").equals("snappy"));
    List<String> bootstrapServers = new ArrayList<>();
    bootstrapServers.add("10.98.09.199:9092");
    bootstrapServers.add("10.98.09.196:9092");
    assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
    Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
            "createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
    createKafkaConsumerFactoryMethod.setAccessible(true);
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
            new KafkaConsumerProperties());
    DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
    Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(consumerFactoryConfigField);
    Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
            consumerFactory);
    assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("key.serializer") == null);
    assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("value.serialized") == null);
    assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
    assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
    assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
Project: beam    File: KafkaIOTest.java
@Test
public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
  // Similar to testUnboundedSourceCheckpointMark(), but verifies that source resumes
  // properly from empty partitions, without missing messages added since checkpoint.

  // Initialize consumer with fewer elements than number of partitions so that some are empty.
  int initialNumElements = 5;
  UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
      mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
          .makeSource()
          .split(1, PipelineOptionsFactory.create())
          .get(0);

  UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);

  for (int l = 0; l < initialNumElements; ++l) {
    advanceOnce(reader, l > 0);
  }

  // Checkpoint and restart, and confirm that the source continues correctly.
  KafkaCheckpointMark mark = CoderUtils.clone(
      source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());

  // Create another source with MockConsumer with OffsetResetStrategy.LATEST. This ensures that
  // the reader has to explicitly seek to the first offset for partitions that were empty.

  int numElements = 100; // all the 20 partitions will have elements
  List<String> topics = ImmutableList.of("topic_a", "topic_b");

  source = KafkaIO.<Integer, Long>read()
      .withBootstrapServers("none")
      .withTopics(topics)
      .withConsumerFactoryFn(new ConsumerFactoryFn(
          topics, 10, numElements, OffsetResetStrategy.LATEST))
      .withKeyDeserializer(IntegerDeserializer.class)
      .withValueDeserializer(LongDeserializer.class)
      .withMaxNumRecords(numElements)
      .withTimestampFn(new ValueAsTimestampFn())
      .makeSource()
      .split(1, PipelineOptionsFactory.create())
      .get(0);

  reader = source.createReader(null, mark);

  // Verify in any order. As the partitions are unevenly read, the returned records are not in a
  // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order.

  List<Long> expected = new ArrayList<>();
  List<Long> actual = new ArrayList<>();
  for (long i = initialNumElements; i < numElements; i++) {
    advanceOnce(reader, i > initialNumElements);
    expected.add(i);
    actual.add(reader.getCurrent().getKV().getValue());
  }
  assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
}