Java Class org.apache.kafka.clients.consumer.KafkaConsumer Code Examples

Project: WiFiProbeAnalysis    File: KafkaConsumerForHive.java
public String receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            System.err.println(buffer.size() + "----->" + record);

        }
        if (buffer.size() >= minBatchSize) {
            writeFileToHadoop(buffer); // first write the buffer out to a file in Hadoop
            consumer.commitSync();
            buffer.clear();
        }
    }
}
Project: echo    File: KafkaFlowFilesConsumer.java
@OnScheduled
public void onScheduled(final ProcessContext context) {
    try {
        topic = context.getProperty(TOPIC).getValue();
        groupName = context.getProperty(CONSUMER_GROUP_NAME).getValue();
        brokerIP = context.getProperty(BROKERIP).getValue();
        props = new Properties();
        props.put("bootstrap.servers", brokerIP);
        props.put("group.id", groupName);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Project: WiFiProbeAnalysis    File: KafkaConsumers.java
public List<String> receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    List<String> buffer = new ArrayList<String>();
    // A single poll is enough: collect the record values, close the consumer, and return.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
    }
    consumer.close();
    return buffer;
}
Project: post-kafka-rewind-consumer-offset    File: KafkaConsumerFromOffset.java
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));

    // Poll once so the group coordinator assigns partitions, then rewind
    // each assigned partition to offset 90 before further processing.
    boolean needsSeek = true;

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (needsSeek) {
            Set<TopicPartition> assignments = consumer.assignment();
            assignments.forEach(topicPartition -> consumer.seek(topicPartition, 90));
            needsSeek = false;
        }

        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
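A common alternative (a sketch, not from this project) performs the seek inside a ConsumerRebalanceListener, so that every assignment, including reassignments after a rebalance, starts from the desired offset. TOPIC and the offset 90 mirror the example above:

consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to clean up in this sketch
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // rewind every newly assigned partition to offset 90
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, 90);
        }
    }
});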
Project: kafka-0.11.0.0-src-with-comment    File: StreamsResetter.java
private void maybeSeekToEnd(final KafkaConsumer<byte[], byte[]> client, final Set<TopicPartition> intermediateTopicPartitions) {

    final String groupId = options.valueOf(applicationIdOption);
    final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);

    if (!intermediateTopicPartitions.isEmpty()) {
        if (!dryRun) {
            client.seekToEnd(intermediateTopicPartitions);
        } else {
            System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
            for (final String topic : intermediateTopics) {
                if (allTopics.contains(topic)) {
                    System.out.println("Topic: " + topic);
                }
            }
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: BrokerCompatibilityTest.java
private static void loopUntilRecordReceived(final String kafka, final boolean eosEnabled) {
    final Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "broker-compatibility-consumer");
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (eosEnabled) {
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
    }

    final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
    consumer.subscribe(Collections.singletonList(SINK_TOPIC));

    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(100);
        for (final ConsumerRecord<String, String> record : records) {
            if (record.key().equals("key") && record.value().equals("value")) {
                consumer.close();
                return;
            }
        }
    }
}
Project: big-data-benchmark    File: TradeTestConsumer.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", args[0]);
    props.setProperty("group.id", UUID.randomUUID().toString());
    props.setProperty("key.deserializer", LongDeserializer.class.getName());
    props.setProperty("value.deserializer", TradeDeserializer.class.getName());
    props.setProperty("auto.offset.reset", "earliest");
    KafkaConsumer<Long, Trade> consumer = new KafkaConsumer<>(props);
    List<String> topics = Arrays.asList(args[1]);
    consumer.subscribe(topics);
    System.out.println("Subscribed to topics " + topics);
    long count = 0;
    long start = System.nanoTime();
    while (true) {
        ConsumerRecords<Long, Trade> poll = consumer.poll(5000);
        System.out.println("Partitions in batch: " + poll.partitions());
        LongSummaryStatistics stats = StreamSupport.stream(poll.spliterator(), false)
                .mapToLong(r -> r.value().getTime()).summaryStatistics();
        System.out.println("Oldest record time: " + stats.getMin() + ", newest record: " + stats.getMax());
        count += poll.count();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        long rate = (long) ((double) count / elapsed * 1000);
        System.out.printf("Total count: %,d in %,dms. Average rate: %,d records/s %n", count, elapsed, rate);

    }
}
Project: DBus    File: KafkaConsumerEvent.java
public KafkaConsumerEvent(String topic) {
    super(0L);
    this.topic = topic;
    Properties props = HeartBeatConfigContainer.getInstance().getKafkaConsumerConfig();
    Properties producerProps = HeartBeatConfigContainer.getInstance().getKafkaProducerConfig();
    try {
        dataConsumer = new KafkaConsumer<>(props);
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));
        KafkaConsumerContainer.getInstances().putConsumer(this.topic, dataConsumer);

        statProducer = new KafkaProducer<>(producerProps);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    startTime = System.currentTimeMillis();
}
Project: DBus    File: KafkaReader.java
/**
 * createConsumer - create a new consumer assigned to partition 0 of the topic
 * @return the consumer, positioned at the configured offset (or at the end of the log)
 * @throws Exception if the consumer properties cannot be loaded
 */
private Consumer<String, String> createConsumer() throws Exception {

    // Seek to end automatically
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    if (offset == -1) {
        consumer.seekToEnd(topics);
        logger.info("Consumer seek to end");
    } else {
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
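When the desired starting point is a time rather than an absolute offset, offsetsForTimes (available since Kafka 0.10.1) translates a timestamp into an offset. A minimal sketch, not part of DBus:

private static void seekToTimestamp(Consumer<String, String> consumer,
                                    TopicPartition partition, long timestampMs) {
    // look up the earliest offset whose record timestamp is >= timestampMs
    Map<TopicPartition, OffsetAndTimestamp> offsets =
            consumer.offsetsForTimes(Collections.singletonMap(partition, timestampMs));
    OffsetAndTimestamp match = offsets.get(partition);
    if (match != null) {
        consumer.seek(partition, match.offset());
    }
}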
Project: stroom-stats    File: StatisticsAggregationProcessor.java
private void cleanUp(KafkaConsumer<StatEventKey, StatAggregate> kafkaConsumer, int unCommittedRecCount) {

    // force a flush of anything in the aggregator
    if (statAggregator != null) {
        LOGGER.debug("Forcing a flush of aggregator {} on processor {}", statAggregator, this);
        flushAggregator();
    }
    if (kafkaConsumer != null) {
        if (unCommittedRecCount > 0) {
            LOGGER.debug("Committing kafka offset on processor {}", this);
            kafkaConsumer.commitSync();
        }
        LOGGER.debug("Closing kafka consumer on processor {}", this);
        kafkaConsumer.close();
    }
}
Project: kalinka    File: KafkaMessageConsumer2.java
@SuppressWarnings("unchecked")
public KafkaMessageConsumer2(final Map<String, Object> consumerConfig, final String topic, final ISenderProvider<T> senderProvider,
        final MessagePublisherProvider<T, K, V> publisherProvider) {
    this.consumer = new KafkaConsumer<>(consumerConfig);
    this.pollTimeout = (Long) consumerConfig.get(Constants.KAFKA_POLL_TIMEOUT);
    this.senderProvider = senderProvider;
    this.publisherProvider = publisherProvider;

    final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

    final List<Integer> subscribedPartitions = (List<Integer>) consumerConfig.get(Constants.KAFKA_SUBSCRIBED_PARTITIONS);
    final Collection<TopicPartition> partitions = partitionInfos.stream()
            .filter(p -> subscribedPartitions.contains(Integer.valueOf(p.partition())))
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    LOG.info("Assigning to topic={}, partitions={}", topic, partitions);
    this.consumer.assign(partitions);
}
Project: kafka-docker-demo    File: ConsumerDemo.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.77.7:9092,192.168.77.7:9093,192.168.77.7:9094");
    props.put("group.id", "test-group-id");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Collections.singletonList("test"));

    System.out.println("Subscribed to topic test");

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.println(String.format("offset = %s, key = %s, value = %s", record.offset(), record.key(), record.value()));
    }
}
Project: ja-micro    File: TopicVerification.java
public boolean verifyTopicsExist(String kafkaBrokers, Set<String> requiredTopics,
                                 boolean checkPartitionCounts) {
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokers);
    props.put("group.id", UUID.randomUUID().toString());
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();

        Set<Integer> partitionCount = new HashSet<>();
        for (String requiredTopic : requiredTopics) {
            List<PartitionInfo> partitions = topics.get(requiredTopic);
            if (partitions == null) {
                logger.info("Required kafka topic {} not present", requiredTopic);
                return false;
            }
            partitionCount.add(partitions.size());
        }
        if (checkPartitionCounts && partitionCount.size() > 1) {
            logger.warn("Partition count mismatch in topics {}",
                    Arrays.toString(requiredTopics.toArray()));
            return false;
        }
        return true;
    } finally {
        consumer.close();
    }
}
Project: kafka-junit    File: KafkaTestUtils.java
/**
 * This will consume all records from all partitions on the given topic.
 * @param topic Topic to consume from.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic) {
    // Connect to broker to determine what partitions are available.
    KafkaConsumer<byte[], byte[]> kafkaConsumer = kafkaTestServer.getKafkaConsumer(
        ByteArrayDeserializer.class,
        ByteArrayDeserializer.class
    );

    final List<Integer> partitionIds = new ArrayList<>();
    for (PartitionInfo partitionInfo: kafkaConsumer.partitionsFor(topic)) {
        partitionIds.add(partitionInfo.partition());
    }
    kafkaConsumer.close();

    return consumeAllRecordsFromTopic(topic, partitionIds);
}
Project: kafka-junit    File: KafkaTestUtils.java
/**
 * This will consume all records from only the partitions given.
 * @param topic Topic to consume from.
 * @param partitionIds Collection of PartitionIds to consume.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) {
    // Create topic Partitions
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Integer partitionId: partitionIds) {
        topicPartitions.add(new TopicPartition(topic, partitionId));
    }

    // Connect Consumer
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class);

    // Assign topic partitions & seek to head of them
    kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);
    } while (!records.isEmpty());

    // close consumer
    kafkaConsumer.close();

    // return all records
    return allRecords;
}
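Hypothetical usage of the two helpers above (the kafkaTestUtils instance and topic name are placeholders):

// consume everything from every partition of the topic
List<ConsumerRecord<byte[], byte[]>> all = kafkaTestUtils.consumeAllRecordsFromTopic("test-topic");

// or restrict consumption to partitions 0 and 1
List<ConsumerRecord<byte[], byte[]>> some =
        kafkaTestUtils.consumeAllRecordsFromTopic("test-topic", Arrays.asList(0, 1));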
Project: j1st-mqtt    File: KafkaBrokerCommunicator.java
@Override
public void init(AbstractConfiguration config, String brokerId, BrokerListenerFactory factory) {
    init(config);

    BROKER_TOPIC = BROKER_TOPIC_PREFIX + "." + brokerId;

    logger.trace("Initializing Kafka consumer ...");

    // consumer config (a KafkaConsumer needs deserializers; InternalMessageSerializer
    // is assumed to also implement the corresponding Deserializer)
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getString("bootstrap.servers"));
    props.put("group.id", UUIDs.shortUuid());
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", InternalMessageSerializer.class.getName());

    // consumer
    this.consumer = new KafkaConsumer<>(props);

    // consumer worker
    this.worker = new KafkaBrokerWorker(this.consumer, BROKER_TOPIC, factory.newListener());
    this.executor.submit(this.worker);
}
Project: kafka-webview    File: WebKafkaConsumerFactory.java
/**
 * Create a WebSocket Consumer Client.  These instances are intended to be long lived
 * and run in the background, streaming consumed records to a Web Socket.
 * @param view What view to consume from.
 * @param filterDefinitions Any additional filters to apply.
 * @param startingPosition Defines where the Socket consumer should resume from.
 * @param sessionIdentifier An identifier for the consumer.
 * @return a SocketKafkaConsumer streaming the view's records to the web socket.
 */
public SocketKafkaConsumer createWebSocketClient(
    final View view,
    final Collection<FilterDefinition> filterDefinitions,
    final StartingPosition startingPosition,
    final SessionIdentifier sessionIdentifier) {
    // Create client config builder
    final ClientConfig clientConfig = createClientConfig(view, filterDefinitions, sessionIdentifier)
        .withStartingPosition(startingPosition)
        .build();

    // Create kafka consumer
    final KafkaConsumer kafkaConsumer = createKafkaConsumer(clientConfig);

    // Create consumer
    return new SocketKafkaConsumer(kafkaConsumer, clientConfig);
}
Project: kafka-0.11.0.0-src-with-comment    File: TransactionalMessageCopier.java
private static KafkaConsumer<String, String> createConsumer(Namespace parsedArgs) {
    String consumerGroup = parsedArgs.getString("consumerGroup");
    String brokerList = parsedArgs.getString("brokerList");
    Integer numMessagesPerTransaction = parsedArgs.getInt("messagesPerTransaction");

    Properties props = new Properties();

    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, numMessagesPerTransaction.toString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

    return new KafkaConsumer<>(props);
}
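The read_committed consumer above is one half of an exactly-once copy loop. A minimal sketch of the other half, assuming a transactional KafkaProducer named producer and placeholder topic/group names:

producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // commit the consumed offsets as part of the same transaction
    producer.sendOffsetsToTransaction(offsets, "consumer-group");
    producer.commitTransaction();
}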
Project: microservices-transactions-tcc    File: CompositeTransactionManagerKafkaImpl.java
@SuppressWarnings("unchecked")
@Override
public List<EntityCommand<?>> fetch(String txId) {
    List<EntityCommand<?>> transactionOperations = new ArrayList<EntityCommand<?>>();

    Map<String, Object> consumerConfigs = (Map<String, Object>)configuration.get("kafkaConsumerConfiguration");
    consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(consumerConfigs);
    kafkaConsumer.subscribe(Arrays.asList(txId));

    ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerPollTimeout);
    for (ConsumerRecord<String, String> record : records){
        LOG.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
        try {
            transactionOperations.add(serializer.readFromString(record.value()));
        } catch (SerializationFailedException e) {
            LOG.error("Unable to deserialize [{}] because of: {}", record.value(), e.getMessage());
        }
    }

    kafkaConsumer.close();

    return transactionOperations;
}
Project: flume-release-1.7.0    File: KafkaPartitionTestUtil.java
/**
 * Return a map containing one List of records per partition.
 * This internally creates a Kafka Consumer using the provided consumer properties.
 *
 * @param topic the topic to consume from
 * @param numPtns the number of partitions to read
 * @param consumerProperties the properties used to construct the consumer
 * @return A Map of Partitions(Integer) and the resulting List of messages (byte[]) retrieved
 */
public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns,
                                                                 Properties consumerProperties) {

  Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
  for (int i = 0; i < numPtns; i++) {
    List<byte[]> partitionResults = new ArrayList<byte[]>();
    resultsMap.put(i, partitionResults);
    KafkaConsumer<String, byte[]> consumer =
        new KafkaConsumer<String, byte[]>(consumerProperties);

    TopicPartition partition = new TopicPartition(topic, i);

    consumer.assign(Arrays.asList(partition));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<String, byte[]> record : records) {
      partitionResults.add(record.value());
    }
    consumer.close();
  }
  return resultsMap;
}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
public static void verify(final String kafka) {
    ensureStreamsApplicationDown(kafka);

    final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);

    final Properties props = new Properties();
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

    try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
            = getOutputRecords(consumer, committedOffsets);

        truncate("data", recordPerTopicPerPartition, committedOffsets);

        verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
        verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));

        verifyAllTransactionFinished(consumer, kafka);

        // do not modify: required test output
        System.out.println("ALL-RECORDS-DELIVERED");
    } catch (final Exception e) {
        e.printStackTrace(System.err);
        System.out.println("FAILED");
    }
}
Project: talk-kafka-messaging-logs    File: ConsumerLoop.java
public ConsumerLoop(int id,
                    String groupId,
                    List<String> topics) {
    this.id = id;
    this.topics = topics;
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<>(props);
}
Project: wngn-jms-kafka    File: ComsumerDemo2.java
public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<String, Object>();
    // bootstrap.servers lists one or more brokers; the client discovers the remaining brokers in the cluster automatically.
    configs.put("bootstrap.servers", "192.168.0.107:9092,192.168.0.108:9092,192.168.0.109:9092");
    configs.put("group.id", "kafka-test");
    // whether offsets are committed automatically
    configs.put("enable.auto.commit", "false");
    // interval between automatic offset commits (moot here, since auto-commit is disabled)
    configs.put("auto.commit.interval.ms", "1000");
    configs.put("session.timeout.ms", "30000");

    configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    // topics the consumer subscribes to; several may be subscribed at once
    consumer.subscribe(Arrays.asList("kafka-test"));

    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        // once the batch is large enough, write it to the DB and commit the offsets synchronously
        if (buffer.size() >= minBatchSize) {
            // insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }
    }
}
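A finer-grained variant (a sketch following the pattern in the KafkaConsumer javadoc; names are illustrative) commits the exact offset after the last record of each partition instead of everything returned by the last poll:

static void commitPerPartition(KafkaConsumer<String, String> consumer,
                               ConsumerRecords<String, String> records) {
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        // the committed offset is the offset of the next message to read
        consumer.commitSync(Collections.singletonMap(
                partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}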
Project: bullet-kafka    File: KafkaSubscriberTest.java
@Test
public void testMessageFail() throws PubSubException {
    String randomMessage = UUID.randomUUID().toString();
    String randomID = UUID.randomUUID().toString();

    KafkaConsumer<String, byte[]> consumer = makeMockConsumer(randomID, randomMessage);
    KafkaSubscriber subscriber = new KafkaSubscriber(consumer, 50);
    PubSubMessage message = subscriber.receive();
    Assert.assertNotNull(message);
    subscriber.fail(message.getId());
    Assert.assertTrue(getAndCheck(randomMessage, randomID, subscriber));
}
Project: mapr-music    File: CdcStatisticService.java
@PostConstruct
public void init() {

    recomputeStatistics();

    Properties consumerProperties = new Properties();
    consumerProperties.setProperty("group.id", "mapr.music.statistics");
    consumerProperties.setProperty("enable.auto.commit", "true");
    consumerProperties.setProperty("auto.offset.reset", "latest");
    consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProperties.setProperty("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");

    loginTestUser(MAPR_USER_NAME, MAPR_USER_GROUP);

    // Create and configure the consumer used to consume MapR-DB CDC events for the Albums table.
    KafkaConsumer<byte[], ChangeDataRecord> albumsChangelogConsumer = new KafkaConsumer<>(consumerProperties);
    albumsChangelogConsumer.subscribe(Collections.singletonList(ALBUMS_CHANGE_LOG));
    ChangeDataRecordHandler albumsHandler = new ChangeDataRecordHandler(albumsChangelogConsumer);
    albumsHandler.setOnDelete((id) -> decrementAlbums());
    albumsHandler.setOnInsert((id) -> incrementAlbums());

    // Create and configure the consumer used to consume MapR-DB CDC events for the Artists table.
    KafkaConsumer<byte[], ChangeDataRecord> artistsChangelogConsumer = new KafkaConsumer<>(consumerProperties);
    artistsChangelogConsumer.subscribe(Collections.singletonList(ARTISTS_CHANGE_LOG));
    ChangeDataRecordHandler artistsHandler = new ChangeDataRecordHandler(artistsChangelogConsumer);
    artistsHandler.setOnDelete((id) -> decrementArtists());
    artistsHandler.setOnInsert((id) -> incrementArtists());

    threadFactory.newThread(albumsHandler).start();
    threadFactory.newThread(artistsHandler).start();
}
Project: storm-dynamic-spout    File: Consumer.java
/**
 * Get the kafka consumer; if it has not been created yet, set it up.
 * @return Kafka consumer
 */
KafkaConsumer<byte[], byte[]> getKafkaConsumer() {
    // If kafkaConsumer is null, we'll create one.
    // If it is NOT null, we'll re-use the instance we got passed in from the constructor.
    // Typically you'd pass in an instance for testing.
    if (kafkaConsumer == null) {
        // Construct new consumer
        kafkaConsumer = new KafkaConsumer<>(getConsumerConfig().getKafkaConsumerProperties());
    }

    return kafkaConsumer;
}
Project: nighthawk    File: ListenableTracingConsumerTest.java
@Test
public void test() throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "127.0.0.1:9092");//该地址是集群的子集,用来探测集群。
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
    props.put("retries", 3);// 请求失败重试的次数
    props.put("batch.size", 16384);// batch的大小
    props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
    props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer.tracing.codec", Codec.JSON);
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    ListenableTracingConsumer<String, String> listenableTracingConsumer =
            new ListenableTracingConsumer<>(consumer, Pattern.compile("test"), new StringDeserializer());
    BraveFactoryBean factoryBean = new BraveFactoryBean();
    factoryBean.setServiceName("kafka-test");
    factoryBean.setTransport("http");
    factoryBean.setTransportAddress("127.0.0.1:9411");
    factoryBean.afterPropertiesSet();
    Brave brave = factoryBean.getObject();
    listenableTracingConsumer.addListener(new AbstractTracingListener<String, String>(brave) {
        @Override
        public void onPayload(Payload<String, String> payload) {
            try {
                System.out.println("key: " + payload.key());
                System.out.println("value: " + payload.value());
                Sleeper.JUST.sleepFor(2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    });
    listenableTracingConsumer.start();
    System.in.read();
}
Project: AthenaX    File: ITestUtil.java
static KafkaConsumer<byte[], byte[]> getConsumer(String groupName, String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupName);
  prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
  prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getCanonicalName());
  return new KafkaConsumer<>(prop);
}
Project: bireme    File: KafkaPipeLine.java
/**
 * Create a new KafkaConsumer, specify the server's ip and port, and groupID.
 *
 * @param server ip and port for Kafka server
 * @param groupID consumer's group id
 * @return the consumer
 */
public static KafkaConsumer<String, String> createConsumer(String server, String groupID) {
  Properties props = new Properties();
  props.put("bootstrap.servers", server);
  props.put("group.id", groupID);
  props.put("enable.auto.commit", false);
  props.put("session.timeout.ms", 60000);
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("auto.offset.reset", "earliest");
  return new KafkaConsumer<String, String>(props);
}
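Hypothetical usage of the factory above (broker address, group id, and topic are placeholders). Because enable.auto.commit is false, offsets must be committed explicitly:

KafkaConsumer<String, String> consumer = createConsumer("localhost:9092", "bireme-group");
consumer.subscribe(Arrays.asList("my-topic"));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.value());
}
consumer.commitSync(); // required: auto-commit is disabled
consumer.close();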
Project: apache-kafka-demos    File: Consumer_A.java
public static void main(String[] args) throws InterruptedException {

    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(GROUP_ID_CONFIG, "a");
    props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Arrays.asList("produktion"));

    System.out.println("Consumer A started!");

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        if (records.count() == 0)
            continue;

        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset= %d, key= %s, value= %s%n", record.offset(), record.key(), record.value());
    }
}
Project: bullet-kafka    File: KafkaSubscriberTest.java
@Test
public void testMaxUnackedMessages() throws PubSubException {
    String randomMessage = UUID.randomUUID().toString();
    String randomID = UUID.randomUUID().toString();

    KafkaConsumer<String, byte[]> consumer = makeMockConsumer(randomID, randomMessage);
    KafkaSubscriber subscriber = new KafkaSubscriber(consumer, 1);
    // Multiple receives without a commit.
    Assert.assertNotNull(subscriber.receive());
    Assert.assertNull(subscriber.receive());
}
Project: DBus    File: DataTableResource.java
@GET
@Path("/readKafkaTopic")
public Response readKafkaTopic(Map<String, Object> map) {
    try {
        Properties properties = PropertiesUtils.getProps("consumer.properties");
        properties.setProperty("client.id","readKafkaTopic");
        properties.setProperty("group.id","readKafkaTopic");
        //properties.setProperty("bootstrap.servers", "localhost:9092");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        String topic = map.get("topic").toString();
        //System.out.println("topic="+topic);
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        List<TopicPartition> topics = Arrays.asList(topicPartition);
        consumer.assign(topics);
        consumer.seekToEnd(topics);
        long current = consumer.position(topicPartition);
        long end = current;
        current -= 1000;
        if (current < 0) current = 0;
        consumer.seek(topicPartition, current);
        List<String> result = new ArrayList<>();
        while (current < end) {
            //System.out.println("topic position = "+current);
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                result.add(record.value());
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            current = consumer.position(topicPartition);
        }
        consumer.close();
        return Response.ok().entity(result).build();
    } catch (Exception e) {
        logger.error("Error encountered while readKafkaTopic with parameter:{}", JSON.toJSONString(map), e);
        return Response.status(204).entity(new Result(-1, e.getMessage())).build();
    }
}
Project: storm-dynamic-spout    File: ConsumerTest.java
/**
 * Calling nextRecord calls fillBuffer() which has special handling for OutOfRangeExceptions.  There is some
 * recursion in this method, but it should not go on forever and yield a StackOverflow exception.  This test verifies
 * that when the KafkaConsumer is relentlessly throwing OutOfRangeExceptions that we stop trying to fill the buffer
 * after five attempts and do not have any other errors.
 */
@Test
public void testNextRecordWithRecursiveOutOfRangeException() {
    final KafkaConsumer<byte[], byte[]> kafkaConsumer = Mockito.mock(KafkaConsumer.class);

    Mockito.when(kafkaConsumer.assignment()).thenReturn(Sets.newHashSet(new TopicPartition("Foobar", 0)));
    Mockito.when(kafkaConsumer.partitionsFor(topicName)).thenReturn(Arrays.asList(
        new PartitionInfo(topicName, 0, null, null, null)
    ));
    Mockito.when(kafkaConsumer.poll(300)).thenThrow(
        new OffsetOutOfRangeException(new HashMap<>())
    );

    final PersistenceAdapter persistenceAdapter = new InMemoryPersistenceAdapter();
    persistenceAdapter.open(Maps.newHashMap());

    // Create our consumer
    final Consumer consumer = new Consumer(kafkaConsumer);
    consumer.open(
        getDefaultConfig(topicName),
        getDefaultVSpoutId(),
        getDefaultConsumerCohortDefinition(),
        persistenceAdapter,
        new LogRecorder(),
        null
    );

    final Record record = consumer.nextRecord();

    assertNull(record);

    Mockito.verify(kafkaConsumer, Mockito.times(5)).poll(300);

    consumer.close();
}
Project: stroom-stats    File: StatisticsAggregationProcessor.java
private KafkaConsumer<StatEventKey, StatAggregate> buildConsumer() {

    try {
        Map<String, Object> props = getConsumerProps();
        LOGGER.debug(() ->
                "Starting aggregation consumer [" + instanceId + "] with properties:\n" + props.entrySet().stream()
                        .map(entry -> "    " + entry.getKey() + ": " + entry.getValue().toString())
                        .collect(Collectors.joining("\n"))
        );
        KafkaConsumer<StatEventKey, StatAggregate> kafkaConsumer = new KafkaConsumer<>(
                props,
                statKeySerde.deserializer(),
                statAggregateSerde.deserializer());

        StatisticsAggregationRebalanceListener rebalanceListener = new StatisticsAggregationRebalanceListener(
                this,
                kafkaConsumer);

        kafkaConsumer.subscribe(Collections.singletonList(inputTopic), rebalanceListener);

        // Update our collection of partitions for later health check use
//        assignedPartitions = kafkaConsumer.partitionsFor(inputTopic).stream()
//                .map(PartitionInfo::partition)
//                .collect(Collectors.toList());
        setAssignedPartitions(kafkaConsumer.assignment());

        return kafkaConsumer;
    } catch (Exception e) {
        LOGGER.error(String.format("Error building consumer for topic %s on processor %s", inputTopic, this), e);
        throw e;
    }
}
Project: stroom-stats    File: EmbeddedKafkaIT.java
/**
 * Start a consumer that subscribes to all embeddedKafka topics to help with debugging.
 * Dumps out the key/msg as byte arrays given that the object types may vary.
 */
private void startAllTopicsConsumer(Map<String, Object> consumerProps) throws InterruptedException {

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                Serdes.ByteArray().deserializer(),
                Serdes.ByteArray().deserializer());
        try {
            kafkaEmbedded.consumeFromAllEmbeddedTopics(kafkaConsumer);
        } catch (Exception e) {
            throw new RuntimeException("Error subscribing to all embedded topics", e);
        }

        try {
            while (true) {
                ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
//                ConsumerRecords<StatEventKey, StatAggregate> records = kafkaConsumer.poll(100);
//                for (ConsumerRecord<StatEventKey, StatAggregate> record : records) {
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    LOGGER.info("consuming from topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    });
}
Project: stroom-stats    File: StatisticsFlatMappingServiceIT.java
/**
 * Start a consumer consuming from both bad-events topics, logging each message and adding it
 * to the supplied list of bad messages.
 * A {@link CountDownLatch} is returned to allow the caller to wait for the expected number of messages.
 */
private CountDownLatch startBadEventsConsumer(final Map<String, Object> consumerProps, final int expectedMsgCount,
                                              List<BadStatMessage> badMessages) {

    final CountDownLatch latch = new CountDownLatch(expectedMsgCount);
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    executorService.execute(() -> {
        Thread.currentThread().setName("bad-events-consumer-thread");
        activeConsumerThreads.incrementAndGet();
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProps,
                Serdes.String().deserializer(),
                Serdes.String().deserializer());

        //Subscribe to all bad event topics
        kafkaConsumer.subscribe(BAD_TOPICS_MAP.values());

        try {
            while (areConsumersEnabled.get()) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        LOGGER.warn("Bad events Consumer - topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        badMessages.add(new BadStatMessage(record.topic(), record.key(), record.value()));
                        latch.countDown();
                    }
                    if (latch.getCount() == 0) {
                        break;
                    }
                    kafkaConsumer.commitSync();
                }
            }
        } finally {
            kafkaConsumer.close();
            activeConsumerThreads.decrementAndGet();
        }
    });
    return latch;
}
Project: azeroth    File: KafkaConsumerCommand.java
protected long getLogSize(KafkaConsumer<String, Serializable> kafkaConsumer, String topic,
                          int partition) {
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    List<TopicPartition> asList = Arrays.asList(topicPartition);
    kafkaConsumer.assign(asList);
    kafkaConsumer.seekToEnd(asList);
    long logEndOffset = kafkaConsumer.position(topicPartition);
    return logEndOffset;
}
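An equivalent sketch (not from azeroth) that reads the log-end offset via endOffsets, available since Kafka 0.10.1, without changing the consumer's assignment:

protected long getLogSizeViaEndOffsets(KafkaConsumer<String, Serializable> kafkaConsumer,
                                       String topic, int partition) {
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    // endOffsets queries the broker directly; no assign/seek needed
    return kafkaConsumer.endOffsets(Collections.singletonList(topicPartition))
            .get(topicPartition);
}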
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
                                                                                                       final Map<TopicPartition, Long> committedOffsets) {
    final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();

    long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
    boolean allRecordsReceived = false;
    while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
        final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);

        for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
            maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
            addRecord(record, recordPerTopicPerPartition);
        }

        if (receivedRecords.count() > 0) {
            allRecordsReceived =
                receivedAllRecords(
                    recordPerTopicPerPartition.get("data"),
                    recordPerTopicPerPartition.get("echo"),
                    committedOffsets);
        }
    }

    if (!allRecordsReceived) {
        throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
    }

    return recordPerTopicPerPartition;
}
Project: doctorkafka    File: BrokerStatsFilter.java
public static void main(String[] args) throws Exception {
  CommandLine commandLine = parseCommandLine(args);
  String brokerStatsZk = commandLine.getOptionValue(BROKERSTATS_ZOOKEEPER);
  String brokerStatsTopic = commandLine.getOptionValue(BROKERSTATS_TOPIC);
  String brokerName = commandLine.getOptionValue(BROKERNAME);
  Set<String> brokerNames = new HashSet<>();
  brokerNames.add(brokerName);

  KafkaConsumer kafkaConsumer = KafkaUtils.getKafkaConsumer(brokerStatsZk,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer", 1);

  long startTimestampInMillis = System.currentTimeMillis() - 86400 * 1000L;
  Map<TopicPartition, Long> offsets = ReplicaStatsManager.getProcessingStartOffsets(
      kafkaConsumer, brokerStatsTopic, startTimestampInMillis);
  kafkaConsumer.unsubscribe();
  kafkaConsumer.assign(offsets.keySet());
  Map<TopicPartition, Long> latestOffsets = kafkaConsumer.endOffsets(offsets.keySet());
  kafkaConsumer.close();

  Map<Long, BrokerStats> brokerStatsMap = new TreeMap<>();
  for (TopicPartition topicPartition : offsets.keySet()) {
    LOG.info("Start processing {}", topicPartition);
    long startOffset = offsets.get(topicPartition);
    long endOffset = latestOffsets.get(topicPartition);

    List<BrokerStats> statsList = processOnePartition(brokerStatsZk, topicPartition,
        startOffset, endOffset, brokerNames);
    for (BrokerStats brokerStats : statsList) {
      brokerStatsMap.put(brokerStats.getTimestamp(), brokerStats);
    }
    LOG.info("Finished processing {}, retrieved {} records", topicPartition, statsList.size());
  }

  for (Map.Entry<Long, BrokerStats> entry: brokerStatsMap.entrySet()) {
    System.out.println(entry.getKey() + " : " + entry.getValue());
  }
}
Project: ipo    File: KafkaConsumerThread.java
public KafkaConsumerThread(String groupId, List<String> topics) {
    this.topics = topics;
    Properties props = new Properties();
    props.put("bootstrap.servers", "123.207.61.225:9092");
    props.put("group.id", groupId);
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    this.consumer = new KafkaConsumer<String, String>(props);
}