Java Class org.apache.kafka.common.serialization.StringDeserializer Code Examples

Project: amigo-chatbot    File: ConsumerLoop.java
public ConsumerLoop(int id,
                    String groupId,
                    List<String> topics,
                    DBClient dbClient) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();

    String kafkaHostName = System.getenv("KAFKA_HOST_NAME");
    log.info("Kafka host: " + kafkaHostName);
    props.put("bootstrap.servers", kafkaHostName + ":9092");
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
    this.dbClient = dbClient;
}
Project: ja-micro    File: TopicVerification.java
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics,
                                 boolean checkPartitionCounts) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokers);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();

        Set<Integer> partitionCount = new HashSet<>();
        for (String requiredTopic : requiredTopics) {
            List<PartitionInfo> partitions = topics.get(requiredTopic);
            if (partitions == null) {
                logger.info("Required kafka topic {} not present", requiredTopic);
                return false;
            }
            partitionCount.add(partitions.size());
        }
        if (checkPartitionCounts && partitionCount.size() > 1) {
            logger.warn("Partition count mismatch in topics {}",
                    Arrays.toString(requiredTopics.toArray()));
            return false;
        }
        return true;
    } finally {
        consumer.close();
    }
}
Project: ja-micro    File: Consumer.java
Consumer(Topic topic, String consumerGroupId, Properties props, PartitionProcessorFactory processorFactory) {
    this.topic = topic;
    this.consumerGroupId = consumerGroupId;

    // Mandatory settings, not changeable
    props.put("group.id", consumerGroupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    kafka = new KafkaConsumer<>(props);
    partitions = new AssignedPartitions(processorFactory);

    long now = System.currentTimeMillis();

    // start it
    consumerLoopExecutor.execute(new ConsumerLoop());
}
Project: spring-tutorial    File: StudentConsumerConfig.java
/**
 * Reads the consumer config from the yml file.
 *
 * @return the consumer configuration map
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put("bootstrap.servers",
            env.getProperty("kafka.broker"));
    propsMap.put("enable.auto.commit",
            env.getProperty("enable.auto.commit"));
    propsMap.put("auto.commit.interval.ms",
            env.getProperty("auto.commit.interval.ms"));
    propsMap.put("key.deserializer",
            StringDeserializer.class);
    propsMap.put("value.deserializer",
            JsonDeserializer.class);
    propsMap.put("group.id",
            env.getProperty("group.id"));
    propsMap.put("auto.offset.reset",
            env.getProperty("kafka.auto.offset.reset"));
    return propsMap;

}
Project: kafka-webview    File: DataLoaderConfig.java
/**
 * Creates default message formats.
 */
private void createDefaultMessageFormats() {
    final Map<String, String> defaultFormats = new HashMap<>();
    defaultFormats.put("Short", ShortDeserializer.class.getName());
    defaultFormats.put("ByteArray", ByteArrayDeserializer.class.getName());
    defaultFormats.put("Bytes", BytesDeserializer.class.getName());
    defaultFormats.put("Double", DoubleDeserializer.class.getName());
    defaultFormats.put("Float", FloatDeserializer.class.getName());
    defaultFormats.put("Integer", IntegerDeserializer.class.getName());
    defaultFormats.put("Long", LongDeserializer.class.getName());
    defaultFormats.put("String", StringDeserializer.class.getName());

    // Create if needed.
    for (final Map.Entry<String, String> entry : defaultFormats.entrySet()) {
        MessageFormat messageFormat = messageFormatRepository.findByName(entry.getKey());
        if (messageFormat == null) {
            messageFormat = new MessageFormat();
        }
        messageFormat.setName(entry.getKey());
        messageFormat.setClasspath(entry.getValue());
        messageFormat.setJar("n/a");
        messageFormat.setDefaultFormat(true);
        messageFormatRepository.save(messageFormat);
    }
}
Project: azeroth    File: KafkaConsumerCommand.java
private KafkaConsumer<String, Serializable> getConsumer(String groupId) {
    KafkaConsumer<String, Serializable> kafkaConsumer = null;
    if ((kafkaConsumer = kafkaConsumers.get(groupId)) != null)
        return kafkaConsumer;

    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());

    kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumers.put(groupId, kafkaConsumer);
    return kafkaConsumer;
}
Project: springboot_cwenao    File: KafkaConsumerConfig.java
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> properties = new HashMap<>();

    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    return new DefaultKafkaConsumerFactory<>(properties);
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<String, byte[]> records = consumer.poll(10000);

    for (ConsumerRecord<String, byte[]> record : records)
        out.printf(
                "key = %s value = %s%n",
                record.key(),
                UserAvroSerdes.deserialize(record.value()).getName().toString());

    consumer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeIntegerStringRecord.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "string-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<Integer, String> records = consumer.poll(10000);

    for (ConsumerRecord<Integer, String> record : records)
        out.printf(
                "key = %s value = %s%n",
                record.key(),
                record.value());

    consumer.close();
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeRecordMetadata.java
private static void consumeRecords(String bootstrapServers) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList(TOPIC));

    ConsumerRecords<Integer, String> records = consumer.poll(10000);

    for (ConsumerRecord<Integer, String> record : records) {
        System.out.printf("key = %s value = %s\t", record.key(), record.value());
        System.out.printf("ProducerRecord: topic=>%s partition=>%s offset=>%s timestamp=>%s checksum=>%s",
                record.topic(),
                record.partition(),
                record.offset(),
                FORMATTER.format(Instant.ofEpochMilli(record.timestamp())),
                record.checksum());
        System.out.println();
    }
    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KStreamsFineGrainedAutoResetIntegrationTest.java
private void commitInvalidOffsets() {
    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
        CLUSTER.bootstrapServers(),
        streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
        StringDeserializer.class,
        StringDeserializer.class));

    final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
    invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
    invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));

    consumer.commitSync(invalidOffsets);

    consumer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: JoinIntegrationTest.java
@BeforeClass
public static void setupConfigsAndUtils() throws Exception {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
    STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedDeserializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test key[value].deserializer.inner.class takes precedence over serializer.inner.class
    WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
    windowedDeserializer.configure(props, true);
    Deserializer<?> inner = windowedDeserializer.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner);
    assertTrue("Inner deserializer type should be StringDeserializer", inner instanceof StringDeserializer);
    // test deserializer.inner.class
    props.put("deserializer.inner.class", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.remove("key.deserializer.inner.class");
    props.remove("value.deserializer.inner.class");
    WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
    windowedDeserializer1.configure(props, false);
    Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
    assertNotNull("Inner deserializer should be not null", inner1);
    assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof ByteArrayDeserializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: BrokerCompatibilityTest.java
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (eosEnabled) {
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    }

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Collections.singletonList(SINK_TOPIC));

    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {
            if (record.key().equals("key") && record.value().equals("value")) {
                consumer.close();
                return;
            }
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConsumerTest.java
@Test
public void testInterceptorConstructorClose() throws Exception {
    try {
        Properties props = new Properties();
        // test with client ID assigned by KafkaConsumer
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
        props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
                props, new StringDeserializer(), new StringDeserializer());
        assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
        assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());

        consumer.close();
        assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
        assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
        // Cluster metadata will only be updated on calling poll.
        Assert.assertNull(MockConsumerInterceptor.CLUSTER_META.get());

    } finally {
        // cleanup since we are using mutable static variables in MockConsumerInterceptor
        MockConsumerInterceptor.resetCounters();
    }
}
Project: fluid    File: KafkaSinkTest.java
@Test
public void testSinkWithString() throws InterruptedException {
    KafkaUsage usage = new KafkaUsage();
    String topic = UUID.randomUUID().toString();
    CountDownLatch latch = new CountDownLatch(1);
    List<String> values = new ArrayList<>();
    usage.consumeStrings(topic, 10, 10, TimeUnit.SECONDS,
        latch::countDown,
        (k, v) -> values.contains(v));

    KafkaSink<String> sink = new KafkaSink<>(vertx,
        getKafkaConfig()
            .put("topic", topic)
            .put("value.serializer", StringSerializer.class.getName())
            .put("value.deserializer", StringDeserializer.class.getName())
    );

    Stream<String> stream = new Random().longs(10).mapToObj(Long::toString);
    Source.fromPayloads(stream)
        .onPayload(values::add)
        .to(sink);

    assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
}
Project: movie-recommender    File: SparkModule.java
@Provides
JavaInputDStream<ConsumerRecord<String, RawRating>> providesKafkaInputStream(JavaStreamingContext streamingContext) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", JsonDeserializer.class);
    kafkaParams.put("serializedClass", RawRating.class);
    kafkaParams.put("group.id", "rating_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    Collection<String> topics = Arrays.asList("topicA", "topicB");

    return KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, RawRating>Subscribe(topics, kafkaParams)
    );
}
Project: rmap    File: SimpleKafkaIT.java
@Test
public void testAutoCommit() throws Exception {
    LOG.info("Start testAutoCommit");
    ContainerProperties containerProps = new ContainerProperties("topic3", "topic4");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        LOG.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps,
            IntegerDeserializer.class, StringDeserializer.class);
    container.setBeanName("testAutoCommit");
    container.start();
    Thread.sleep(5000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate(IntegerSerializer.class, StringSerializer.class);
    template.setDefaultTopic("topic3");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertTrue(latch.await(60, TimeUnit.SECONDS));
    container.stop();
    LOG.info("Stop testAutoCommit");
}
Project: ksql    File: BrokerCompatibilityCheck.java
public static BrokerCompatibilityCheck create(final Map<String, Object> streamsConfig,
                                              final KafkaTopicClient topicClient) {
  Set<String> topicNames = topicClient.listTopicNames();
  // the offsetsForTime call needs a partition that exists, otherwise it can block forever
  if (topicNames.isEmpty()) {
    topicClient.createTopic(KSQL_COMPATIBILITY_CHECK, 1, (short)1);
    topicNames = Utils.mkSet(KSQL_COMPATIBILITY_CHECK);
  }
  final Map<String, Object> consumerConfigs = new StreamsConfig(streamsConfig)
      .getConsumerConfigs(KSQL_COMPATIBILITY_CHECK, "ksql_server");

  // remove this otherwise it will try and instantiate the StreamsPartitionAssignor
  consumerConfigs.remove("partition.assignment.strategy");
  final KafkaConsumer<String, String> consumer
      = new KafkaConsumer<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());
  return new BrokerCompatibilityCheck(consumer,  new TopicPartition(topicNames.iterator().next(), 0));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testConsume(TestContext ctx) throws Exception {
  final String topicName = "testConsume";
  String consumerId = topicName;
  Async batch = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 1000;
  kafkaCluster.useTo().produceStrings(numMessages, batch::complete,  () ->
      new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch.awaitSuccess(20000);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  consumer = createConsumer(vertx, config);
  Async done = ctx.async();
  AtomicInteger count = new AtomicInteger(numMessages);
  consumer.exceptionHandler(ctx::fail);
  consumer.handler(rec -> {
    if (count.decrementAndGet() == 0) {
      done.complete();
    }
  });
  consumer.subscribe(Collections.singleton(topicName));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testPartitionsFor(TestContext ctx) throws Exception {
  String topicName = "testPartitionsFor";
  String consumerId = topicName;
  kafkaCluster.createTopic(topicName, 2, 1);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  Context context = vertx.getOrCreateContext();
  consumer = createConsumer(context, config);

  Async done = ctx.async();

  consumer.partitionsFor(topicName, ar -> {
    if (ar.succeeded()) {
      List<PartitionInfo> partitionInfo = ar.result();
      ctx.assertEquals(2, partitionInfo.size());
    } else {
      ctx.fail();
    }
    done.complete();
  });
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testBatchHandler(TestContext ctx) throws Exception {
  String topicName = "testBatchHandler";
  String consumerId = topicName;
  Async batch1 = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 500;
  kafkaCluster.useTo().produceStrings(numMessages, batch1::complete,  () ->
      new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch1.awaitSuccess(10000);
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  Context context = vertx.getOrCreateContext();
  consumer = createConsumer(context, config);
  Async batchHandler = ctx.async();
  consumer.batchHandler(records -> {
    ctx.assertEquals(numMessages, records.count());
    batchHandler.complete();
  });
  consumer.exceptionHandler(ctx::fail);
  consumer.handler(rec -> {});
  consumer.subscribe(Collections.singleton(topicName));
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testPollTimeout(TestContext ctx) throws Exception {
  Async async = ctx.async();
  String topicName = "testPollTimeout";
  Properties config = kafkaCluster.useTo().getConsumerProperties(topicName, topicName, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(topicName, 0);
  KafkaConsumer<Object, Object> consumerWithCustomTimeout = KafkaConsumer.create(vertx, config);

  int pollingTimeout = 1500;
  // Set the polling timeout to 1500 ms (default is 1000)
  consumerWithCustomTimeout.pollTimeout(pollingTimeout);
  // Subscribe to the empty topic (we want the poll() call to timeout!)
  consumerWithCustomTimeout.subscribe(topicName, subscribeRes -> {
    consumerWithCustomTimeout.handler(rec -> {}); // Consumer will now immediately poll once
    long beforeSeek = System.currentTimeMillis();
    consumerWithCustomTimeout.seekToBeginning(topicPartition, seekRes -> {
      long durationWShortTimeout = System.currentTimeMillis() - beforeSeek;
      ctx.assertTrue(durationWShortTimeout >= pollingTimeout, "Operation must take at least as long as the polling timeout");
      consumerWithCustomTimeout.close();
      async.countDown();
    });
  });
}
Project: vertx-kafka-client    File: CleanupTest.java
@Test
// Regression test for ISS-73: undeployment of a verticle with unassigned consumer fails
public void testUndeployUnassignedConsumer(TestContext ctx) throws Exception {
  Properties config = kafkaCluster.useTo().getConsumerProperties("testUndeployUnassignedConsumer_consumer",
    "testUndeployUnassignedConsumer_consumer", OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  Async async = ctx.async(1);
  vertx.deployVerticle(new AbstractVerticle() {
    @Override
    public void start() throws Exception {
      KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
      vertx.setTimer(20, record -> {
        // Very rarely, this throws an AlreadyUndeployed error
        vertx.undeploy(context.deploymentID(), ctx.asyncAssertSuccess(ar -> {
          async.complete();
        }));
      });
    }
  }, ctx.asyncAssertSuccess());
}
Project: li-apache-kafka-clients    File: AbstractKafkaClientsIntegrationTestHarness.java
protected Properties getConsumerProperties(Properties overrides) {
  Properties result = new Properties();

  //populate defaults
  result.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
  result.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testingConsumer");
  result.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
  result.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());

  setSecurityConfigs(result, "consumer");

  result.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_BUFFER_CAPACITY_CONFIG, "300000");
  result.setProperty(LiKafkaConsumerConfig.MESSAGE_ASSEMBLER_EXPIRATION_OFFSET_GAP_CONFIG, "10000");
  result.setProperty(LiKafkaConsumerConfig.EXCEPTION_ON_MESSAGE_DROPPED_CONFIG, "true");
  result.setProperty(LiKafkaConsumerConfig.MAX_TRACKED_MESSAGES_PER_PARTITION_CONFIG, "10000");
  result.setProperty(LiKafkaConsumerConfig.SEGMENT_DESERIALIZER_CLASS_CONFIG, DefaultSegmentDeserializer.class.getCanonicalName());

  //apply overrides
  if (overrides != null) {
    result.putAll(overrides);
  }

  return result;
}
Project: li-apache-kafka-clients    File: SerializerDeserializerTest.java
@Test
public void testSerde() {
  Serializer<String> stringSerializer = new StringSerializer();
  Deserializer<String> stringDeserializer = new StringDeserializer();
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();

  String s = LiKafkaClientsTestUtils.getRandomString(100);
  assertEquals(s.length(), 100);
  byte[] stringBytes = stringSerializer.serialize("topic", s);
  assertEquals(stringBytes.length, 100);
  LargeMessageSegment segment =
      new LargeMessageSegment(LiKafkaClientsUtils.randomUUID(), 0, 2, stringBytes.length, ByteBuffer.wrap(stringBytes));
  // String bytes + segment header
  byte[] serializedSegment = segmentSerializer.serialize("topic", segment);
  assertEquals(serializedSegment.length, 1 + stringBytes.length + LargeMessageSegment.SEGMENT_INFO_OVERHEAD + 4);

  LargeMessageSegment deserializedSegment = segmentDeserializer.deserialize("topic", serializedSegment);
  assertEquals(deserializedSegment.messageId, segment.messageId);
  assertEquals(deserializedSegment.messageSizeInBytes, segment.messageSizeInBytes);
  assertEquals(deserializedSegment.numberOfSegments, segment.numberOfSegments);
  assertEquals(deserializedSegment.sequenceNumber, segment.sequenceNumber);
  assertEquals(deserializedSegment.payload.limit(), 100);
  String deserializedString = stringDeserializer.deserialize("topic", deserializedSegment.payloadArray());
  assertEquals(deserializedString.length(), s.length());
}
Project: cqrs-eventsourcing-kafka    File: KafkaCommandListener.java
public KafkaCommandListener(String zookeeper, String groupId, ObjectMapper objectMapper,
                            EventPublisher applicationEventPublisher,
                            String aggregateRootName) {
    resolver = ActionHandlerResolver.getCurrent();

    Properties props = new Properties();
    props.put("bootstrap.servers", zookeeper);
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");

    this.consumer = new KafkaConsumer<>(props);
    this.objectMapper = objectMapper;
    this.applicationEventPublisher = applicationEventPublisher;
    this.aggregateRootName = aggregateRootName;

    this.applicationEventsStream = aggregateRootName + APPLICATION_EVENTS;
}
Project: Microservices-Deployment-Cookbook    File: GeoLocationConsumer.java
public void run() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.99.100:9092");
    props.put("group.id", "geolocationConsumer");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList("geolocations"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", 
                        record.offset(), 
                        record.key(), 
                        record.value());

                REPO.addGeoLocation(GSON.fromJson(record.value(), GeoLocation.class));
            }
        }
    } catch (Exception e) {
        System.err.println("Error while consuming geolocations. Details: " + e.getMessage());
    }
}
Project: jeesuite-libs    File: KafkaConsumerCommand.java
private KafkaConsumer<String, Serializable> getConsumer(String groupId) {

    KafkaConsumer<String, Serializable> kafkaConsumer = null; 
    if ((kafkaConsumer = kafkaConsumers.get(groupId))!= null)
        return kafkaConsumer;

    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumers.put(groupId, kafkaConsumer);
    return kafkaConsumer;
}
Project: incubator-pulsar    File: ConsumerExample.java
public static void main(String[] args) {
    String topic = "persistent://sample/standalone/ns/my-topic";

    Properties props = new Properties();
    props.put("bootstrap.servers", "pulsar://localhost:6650");
    props.put("group.id", "my-subscription-name");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        records.forEach(record -> {
            log.info("Received record: {}", record);
        });

        // Commit last offset
        consumer.commitSync();
    }
}
Project: KafkaExample    File: WordCountTopology.java
public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka0:19092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "zookeeper0:12181/kafka");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "words")
            .addProcessor("WordCountProcessor", WordCountProcessor::new, "SOURCE")
            .addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "WordCountProcessor")
//          .connectProcessorAndStateStores("WordCountProcessor", "Counts")
            .addSink("SINK", "count", new StringSerializer(), new IntegerSerializer(), "WordCountProcessor");

    KafkaStreams stream = new KafkaStreams(builder, props);
    stream.start();
    System.in.read();
    stream.close();
    stream.cleanUp();
}
Project: beam    File: KafkaIOTest.java
@Test
public void testInferKeyCoder() {
  CoderRegistry registry = CoderRegistry.createDefault();

  assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
          instanceof VarLongCoder);

  assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
          instanceof StringUtf8Coder);

  assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
          instanceof InstantCoder);

  assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
          instanceof VarLongCoder);
}
Project: ksql    File: UdfAvroIntTest.java
@Test
public void testApplyUdfsToColumns() throws Exception {
  final String testStreamName = "SelectUDFsStream".toUpperCase();

  final String queryString = String.format(
          "CREATE STREAM %s AS SELECT %s FROM %s WHERE %s;",
          testStreamName,
          "ITEMID, ORDERUNITS*10, PRICEARRAY[0]+10, KEYVALUEMAP['key1']*KEYVALUEMAP['key2']+10, PRICEARRAY[1]>1000",
          "ORDERS",
          "ORDERUNITS > 20 AND ITEMID LIKE '%_8'"
  );

  ksqlContext.sql(queryString);

  Schema resultSchema = ksqlContext.getMetaStore().getSource(testStreamName).getSchema();

  Map<String, GenericRow> expectedResults = Collections.singletonMap("8", new GenericRow(Arrays.asList(null, null, "ITEM_8", 800.0, 1110.0, 12.0, true)));

  Map<String, GenericRow> results = testHarness.consumeData(testStreamName, resultSchema, 4, new StringDeserializer(), IntegrationTestHarness.RESULTS_POLL_MAX_TIME_MS);
  assertThat(results, equalTo(expectedResults));
}
Project: testcontainers-java-module-confluent-platform    File: HelloConsumer.java
@NotNull
private static Properties createConsumerProperties(String bootstrapServer) {
  Properties props = new Properties();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup" +new Random().nextInt());
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  return props;
}
Project: Practical-Real-time-Processing-and-Analytics    File: VehicleDataGeneration.java
private static Map<String, Location> getVehicleStartPoints() {
    Map<String, Location> vehicleStartPoint = new HashMap<String, Location>();
    Properties props = new Properties();
    props.put("zookeeper.connect", ZOOKEEPER_CONNECTION_STRING);
    props.put("group.id", "DataLoader" + r.nextInt(100));
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "smallest");

    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KAFKA_TOPIC_STATIC_DATA, new Integer(1)); 

    KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(KAFKA_TOPIC_STATIC_DATA)
            .get(0);

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    while (it.hasNext()) {
        String message = new String(it.next().message());
        try {
            vehicleStartPoint = objectMapper.readValue(message, new TypeReference<Map<String, Location>>() {
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        break;
    }
    consumer.shutdown();
    return vehicleStartPoint;
}
Project: cruise-control    File: CruiseControlMetricsReporterTest.java
@Test
public void testReportingMetrics() {
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
  props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MetricSerde.class.getName());
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testReportingMetrics");
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  setSecurityConfigs(props, "consumer");
  Consumer<String, CruiseControlMetric> consumer = new KafkaConsumer<>(props);

  ConsumerRecords<String, CruiseControlMetric> records = ConsumerRecords.empty();
  consumer.subscribe(Collections.singletonList(TOPIC));
  long startMs = System.currentTimeMillis();
  Set<Integer> metricTypes = new HashSet<>();
  while (metricTypes.size() < 16 && System.currentTimeMillis() < startMs + 15000) {
    records = consumer.poll(10);
    for (ConsumerRecord<String, CruiseControlMetric> record : records) {
      metricTypes.add((int) record.value().metricType().id());
    }
  }
  HashSet<Integer> expectedMetricTypes = new HashSet<>(Arrays.asList((int) ALL_TOPIC_BYTES_IN.id(),
                                                                     (int) ALL_TOPIC_BYTES_OUT.id(),
                                                                     (int) TOPIC_BYTES_IN.id(),
                                                                     (int) TOPIC_BYTES_OUT.id(),
                                                                     (int) PARTITION_SIZE.id(),
                                                                     (int) BROKER_CPU_UTIL.id(),
                                                                     (int) ALL_TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) ALL_TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) TOPIC_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) TOPIC_FETCH_REQUEST_RATE.id(),
                                                                     (int) TOPIC_MESSAGES_IN_PER_SEC.id(),
                                                                     (int) BROKER_PRODUCE_REQUEST_RATE.id(),
                                                                     (int) BROKER_CONSUMER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_FOLLOWER_FETCH_REQUEST_RATE.id(),
                                                                     (int) BROKER_REQUEST_HANDLER_AVG_IDLE_PERCENT.id()));
  assertEquals("Expected to see " + expectedMetricTypes + ", but only see " + metricTypes, metricTypes, expectedMetricTypes);
}
Project: nighthawk    File: ListenableTracingConsumerTest.java
@Test
public void test() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
    props.put("retries", 3);// 请求失败重试的次数
    props.put("batch.size", 16384);// batch的大小
    props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
    props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer.tracing.codec", Codec.JSON);
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ListenableTracingConsumer<String, String> listenableTracingConsumer =
            new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer());
    BraveFactoryBean factoryBean = new BraveFactoryBean();
    factoryBean.setServiceName("kafka-test");
    factoryBean.setTransport("http");
    factoryBean.setTransportAddress("127.0.0.1:9411");
    factoryBean.afterPropertiesSet();
    Brave brave = factoryBean.getObject();
    listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) {
        @Override
        public void onPayload(Payload<String, String> payload) {
            try {
                System.out.println("key: " + payload.key());
                System.out.println("value: " + payload.value());
                Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    listenableTracingConsumer.start();
    System.in.read();
}
Project: mtgo-best-bot    File: BotConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "bot");

    return props;
}
Project: open-kilda    File: MessageConsumerConfig.java
/**
 * Kafka consumer configuration bean. This {@link Map} is used by {@link MessageConsumerConfig#consumerFactory}.
 *
 * @return kafka properties
 */
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}