Java class org.apache.kafka.common.serialization.LongSerializer: example source code

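The examples below are collected from open-source projects and show how LongSerializer is wired into producers, Kafka Streams integration tests, and mock-producer test harnesses. As a point of reference, here is a minimal, self-contained sketch (not taken from any of the projects below; the class name and topic string are illustrative only): LongSerializer writes a Long as its 8-byte big-endian representation, and LongDeserializer reverses it.

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class LongSerializerRoundTrip {
    public static void main(String[] args) {
        LongSerializer serializer = new LongSerializer();
        LongDeserializer deserializer = new LongDeserializer();

        // LongSerializer produces the 8-byte big-endian encoding of the value.
        byte[] bytes = serializer.serialize("any-topic", 42L);

        // LongDeserializer turns those 8 bytes back into a Long.
        Long roundTripped = deserializer.deserialize("any-topic", bytes);

        System.out.println(bytes.length + " bytes -> " + roundTripped); // prints "8 bytes -> 42"
    }
}
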
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamRepartitionJoinTest.java
private void produceToStreamOne()
    throws ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        streamOneInput,
        Arrays.asList(
            new KeyValue<>(10L, 1),
            new KeyValue<>(5L, 2),
            new KeyValue<>(12L, 3),
            new KeyValue<>(15L, 4),
            new KeyValue<>(20L, 5),
            new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
        TestUtils.producerConfig(
            CLUSTER.bootstrapServers(),
            LongSerializer.class,
            IntegerSerializer.class,
            new Properties()),
        mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalKTableIntegrationTest.java
private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                    new KeyValue<>("a", 1L),
                    new KeyValue<>("b", 2L),
                    new KeyValue<>("c", 3L),
                    new KeyValue<>("d", 4L),
                    new KeyValue<>("e", 5L)),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    StringSerializer.class,
                    LongSerializer.class,
                    new Properties()),
            mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalKTableIntegrationTest.java
private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "F"),
                    new KeyValue<>(2L, "G"),
                    new KeyValue<>(3L, "H"),
                    new KeyValue<>(4L, "I"),
                    new KeyValue<>(5L, "J")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: ResetIntegrationTest.java
private void prepareInputData() throws Exception {
    CLUSTER.deleteAndRecreateTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN);

    final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, StringSerializer.class);

    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "aaa")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "bbb")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ccc")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "ddd")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "eee")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(10);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "fff")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "ggg")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "hhh")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(0L, "iii")), producerConfig, mockTime.milliseconds());
    mockTime.sleep(1);
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(INPUT_TOPIC, Collections.singleton(new KeyValue<>(1L, "jjj")), producerConfig, mockTime.milliseconds());
}
Project: big-data-benchmark    File: RealTimeTradeProducer.java
private RealTimeTradeProducer(int index, String broker, String topic, int tradesPerSecond, int keysFrom, int keysTo) throws IOException,
        URISyntaxException {
    if (tradesPerSecond <= 0) {
        throw new RuntimeException("tradesPerSecond=" + tradesPerSecond);
    }
    this.index = index;
    this.topic = topic;
    this.tradesPerSecond = tradesPerSecond;
    tickers = new String[keysTo - keysFrom];
    Arrays.setAll(tickers, i -> "T-" + Integer.toString(i + keysFrom));
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("key.serializer", LongSerializer.class.getName());
    props.setProperty("value.serializer", TradeSerializer.class.getName());
    this.producer = new KafkaProducer<>(props);
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSinkDisplayData() {
  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {
    KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
      .withBootstrapServers("myServerA:9092,myServerB:9092")
      .withTopic("myTopic")
      .withValueSerializer(LongSerializer.class)
      .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey));

    DisplayData displayData = DisplayData.from(write);

    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
    assertThat(displayData, hasDisplayItem("retries", 3));
  }
}
Project: beam    File: KafkaIOTest.java
MockProducerWrapper() {
  producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
  mockProducer = new MockProducer<Integer, Long>(
    false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
    new IntegerSerializer(),
    new LongSerializer()) {

    // override flush() so that it does not complete all the waiting sends, giving the
    // ProducerSendCompletionThread a chance to inject errors.

    @Override
    public void flush() {
      while (completeNext()) {
        // there are some uncompleted records. let the completion thread handle them.
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          // ok to retry.
        }
      }
    }
  };

  // Add the producer to the global map so that producer factory function can access it.
  assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalKTableIntegrationTest.java
private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
    IntegrationTestUtils.produceKeyValuesSynchronously(
            globalOne,
            Arrays.asList(
                    new KeyValue<>(1L, "A"),
                    new KeyValue<>(2L, "B"),
                    new KeyValue<>(3L, "C"),
                    new KeyValue<>(4L, "D")),
            TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    LongSerializer.class,
                    StringSerializer.class,
                    new Properties()),
            mockTime);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
    IntegrationTestUtils.produceKeyValuesSynchronously(
        MULTI_PARTITION_INPUT_TOPIC,
        records,
        TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
        CLUSTER.time
    );
}
Project: talk-observing-distributed-systems    File: WorkerServiceApplication.java
public void run(Configuration configuration, Environment environment) throws Exception {
  final CollectorRegistry collectorRegistry = new CollectorRegistry();
  collectorRegistry.register(new DropwizardExports(environment.metrics()));
  environment.admin()
      .addServlet("metrics", new MetricsServlet(collectorRegistry))
      .addMapping("/metrics");

  final PrometheusMetricsReporter reporter = PrometheusMetricsReporter.newMetricsReporter()
      .withCollectorRegistry(collectorRegistry)
      .withConstLabel("service", getName())
      .build();

  final Tracer tracer = getTracer();
  final Tracer metricsTracer = io.opentracing.contrib.metrics.Metrics.decorate(tracer, reporter);
  GlobalTracer.register(metricsTracer);

  final DynamicFeature tracing = new ServerTracingDynamicFeature.Builder(metricsTracer).build();
  environment.jersey().register(tracing);

  final Properties producerConfigs = new Properties();
  producerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "tweets-kafka:9092");
  producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
  producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  final KafkaProducer<Long, String> kafkaProducer =
      new KafkaProducer<>(producerConfigs, new LongSerializer(), new StringSerializer());
  final Producer<Long, String> tracingKafkaProducer =
      new TracingKafkaProducer<>(kafkaProducer, metricsTracer);
  final ObjectMapper objectMapper = environment.getObjectMapper();
  final TweetEventRepository tweetRepository = new KafkaTweetEventRepository(tracingKafkaProducer, objectMapper);
  final TweetsService tweetsService = new TweetsService(tweetRepository);
  final TweetsResource tweetsResource = new TweetsResource(tweetsService);
  environment.jersey().register(tweetsResource);
}
Project: big-data-benchmark    File: TradeProducer.java
private TradeProducer(String broker) {
    loadTickers();
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", broker);
    props.setProperty("key.serializer", LongSerializer.class.getName());
    props.setProperty("value.serializer", TradeSerializer.class.getName());
    producer = new KafkaProducer<>(props);
}
Project: mdw    File: KafkaAdapter.java
private static Producer<Object, Object> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092,localhost:9093,localhost:9094");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaMDWProducer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSink() throws Exception {
  // Simply read from kafka source and write to kafka sink. Then verify the records
  // are correctly published to mock kafka producer.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withKeySerializer(IntegerSerializer.class)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
  }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testValuesSink() throws Exception {
  // similar to testSink(), but uses the values() interface.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(Values.<Long>create()) // there are no keys
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))
          .values());

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
  }
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
private void runSimpleCopyTest(final int numberOfRestarts,
                               final String inputTopic,
                               final String throughTopic,
                               final String outputTopic) throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<Long, Long> input = builder.stream(inputTopic);
    KStream<Long, Long> output = input;
    if (throughTopic != null) {
        output = input.through(throughTopic);
    }
    output.to(outputTopic);

    for (int i = 0; i < numberOfRestarts; ++i) {
        final long factor = i;
        final KafkaStreams streams = new KafkaStreams(
            builder,
            StreamsTestUtils.getStreamsConfig(
                applicationId,
                CLUSTER.bootstrapServers(),
                Serdes.LongSerde.class.getName(),
                Serdes.LongSerde.class.getName(),
                new Properties() {
                    {
                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                    }
                }));

        try {
            streams.start();

            final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);

            IntegrationTestUtils.produceKeyValuesSynchronously(
                inputTopic,
                inputData,
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
                CLUSTER.time
            );

            final List<KeyValue<Long, Long>> committedRecords
                = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                TestUtils.consumerConfig(
                    CLUSTER.bootstrapServers(),
                    CONSUMER_GROUP_ID,
                    LongDeserializer.class,
                    LongDeserializer.class,
                    new Properties() {
                        {
                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                        }
                    }),
                inputTopic,
                inputData.size()
            );

            checkResultPerKey(committedRecords, inputData);
        } finally {
            streams.close();
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: EosIntegrationTest.java
@Test
public void shouldBeAbleToPerformMultipleTransactions() throws Exception {
    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);

    final KafkaStreams streams = new KafkaStreams(
        builder,
        StreamsTestUtils.getStreamsConfig(
            applicationId,
            CLUSTER.bootstrapServers(),
            Serdes.LongSerde.class.getName(),
            Serdes.LongSerde.class.getName(),
            new Properties() {
                {
                    put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
                }
            }));

    try {
        streams.start();

        final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
        final List<KeyValue<Long, Long>> secondBurstOfData = prepareData(5L, 8L, 0L);

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            firstBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> firstCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            firstBurstOfData.size()
        );

        assertThat(firstCommittedRecords, equalTo(firstBurstOfData));

        IntegrationTestUtils.produceKeyValuesSynchronously(
            SINGLE_PARTITION_INPUT_TOPIC,
            secondBurstOfData,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), LongSerializer.class, LongSerializer.class),
            CLUSTER.time
        );

        final List<KeyValue<Long, Long>> secondCommittedRecords
            = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            TestUtils.consumerConfig(
                CLUSTER.bootstrapServers(),
                CONSUMER_GROUP_ID,
                LongDeserializer.class,
                LongDeserializer.class,
                new Properties() {
                    {
                        put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
                    }
                }),
            SINGLE_PARTITION_OUTPUT_TOPIC,
            secondBurstOfData.size()
        );

        assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
    } finally {
        streams.close();
    }
}
Project: spring-cloud-stream-binder-kafka    File: KafkaBinderAutoConfigurationPropertiesTest.java
@Test
public void testKafkaBinderConfigurationWithKafkaProperties() throws Exception {
    assertNotNull(this.kafkaMessageChannelBinder);
    ExtendedProducerProperties<KafkaProducerProperties> producerProperties = new ExtendedProducerProperties<>(
            new KafkaProducerProperties());
    Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory",
            String.class, ExtendedProducerProperties.class);
    getProducerFactoryMethod.setAccessible(true);
    DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, "foo", producerProperties);
    Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(producerFactoryConfigField);
    Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
            producerFactory);
    assertTrue(producerConfigs.get("batch.size").equals(10));
    assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("key.deserializer") == null);
    assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
    assertTrue(producerConfigs.get("value.deserializer") == null);
    assertTrue(producerConfigs.get("compression.type").equals("snappy"));
    List<String> bootstrapServers = new ArrayList<>();
    bootstrapServers.add("10.98.09.199:9092");
    bootstrapServers.add("10.98.09.196:9092");
    assertTrue((((List<String>) producerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
    Method createKafkaConsumerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod(
            "createKafkaConsumerFactory", boolean.class, String.class, ExtendedConsumerProperties.class);
    createKafkaConsumerFactoryMethod.setAccessible(true);
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<>(
            new KafkaConsumerProperties());
    DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) createKafkaConsumerFactoryMethod
            .invoke(this.kafkaMessageChannelBinder, true, "test", consumerProperties);
    Field consumerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class, "configs",
            Map.class);
    ReflectionUtils.makeAccessible(consumerFactoryConfigField);
    Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
            consumerFactory);
    assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("key.serializer") == null);
    assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
    assertTrue(consumerConfigs.get("value.serialized") == null);
    assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
    assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
    assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}
Project: kafka-reactive-streams    File: KafkaSubscriberBlackboxTest.java
public KafkaSubscriberBlackboxTest() {
    super(new TestEnvironment());
    mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
Project: kafka-reactive-streams    File: KafkaSubscriberWhiteboxTest.java
public KafkaSubscriberWhiteboxTest() {
    super(new TestEnvironment());
    mockProducer = new MockProducer<Long, Long>(true, new LongSerializer(), new LongSerializer());
}
Project: beam    File: KafkaIOTest.java
@Test
public void testEOSink() {
  // testSink() with EOS enabled.
  // This does not actually inject retries in a stage to test exactly-once-semantics.
  // It mainly exercises the code in normal flow without retries.
  // Ideally we should test EOS Sink by triggering replays of a messages between stages.
  // It is not feasible to test such retries with direct runner. When DoFnTester supports
  // state, we can test KafkaEOWriter DoFn directly to ensure it handles retries correctly.

  if (!ProducerSpEL.supportsTransactions()) {
    LOG.warn("testEOSink() is disabled as Kafka client version does not support transactions.");
    return;
  }

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
               .withoutMetadata())
      .apply(KafkaIO.<Integer, Long>write()
               .withBootstrapServers("none")
               .withTopic(topic)
               .withKeySerializer(IntegerSerializer.class)
               .withValueSerializer(LongSerializer.class)
               .withEOS(1, "test")
               .withConsumerFactoryFn(new ConsumerFactoryFn(
                 Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
               .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    p.run();

    completionThread.shutdown();

    verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
  }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSinkWithSendErrors() throws Throwable {
  // similar to testSink(), except that up to 10 of the send calls to producer will fail
  // asynchronously.

  // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail.
  // We limit the number of errors injected to 10 below. This would reflect a real streaming
  // pipeline. But I am not sure how to achieve that. For now, expect an exception:

  thrown.expect(InjectedErrorException.class);
  thrown.expectMessage("Injected Error #1");

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThreadWithErrors =
      new ProducerSendCompletionThread(producerWrapper.mockProducer, 10, 100).start();

    String topic = "test";

    p
      .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
          .withoutMetadata())
      .apply(KafkaIO.<Integer, Long>write()
          .withBootstrapServers("none")
          .withTopic(topic)
          .withKeySerializer(IntegerSerializer.class)
          .withValueSerializer(LongSerializer.class)
          .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    try {
      p.run();
    } catch (PipelineExecutionException e) {
      // throwing inner exception helps assert that first exception is thrown from the Sink
      throw e.getCause().getCause();
    } finally {
      completionThreadWithErrors.shutdown();
    }
  }
}
Project: beam    File: KafkaIOTest.java
@Test
public void testSinkMetrics() throws Exception {
  // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.

  int numElements = 1000;

  try (MockProducerWrapper producerWrapper = new MockProducerWrapper()) {

    ProducerSendCompletionThread completionThread =
      new ProducerSendCompletionThread(producerWrapper.mockProducer).start();

    String topic = "test";

    p
        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
            .withoutMetadata())
        .apply("writeToKafka", KafkaIO.<Integer, Long>write()
            .withBootstrapServers("none")
            .withTopic(topic)
            .withKeySerializer(IntegerSerializer.class)
            .withValueSerializer(LongSerializer.class)
            .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));

    PipelineResult result = p.run();

    MetricName elementsWritten = SinkMetrics.elementsWritten().getName();

    MetricQueryResults metrics = result.metrics().queryMetrics(
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.inNamespace(elementsWritten.namespace()))
            .build());


    assertThat(metrics.counters(), hasItem(
        attemptedMetricsResult(
            elementsWritten.namespace(),
            elementsWritten.name(),
            "writeToKafka",
            1000L)));

    completionThread.shutdown();
  }
}