Java Class org.apache.kafka.clients.consumer.ConsumerRecords Code Examples

Project: WiFiProbeAnalysis    File: KafkaConsumerForHive.java
public String receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
            System.err.println(buffer.size() + "----->" + record);

        }
        if (buffer.size() >= minBatchSize) {
            writeFileToHadoop(buffer); // write the buffered records out to a file first
            consumer.commitSync();
            buffer.clear();
        }
    }
}
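For reference, a minimal sketch of the properties object this snippet assumes; the broker list, group id, and topic value are placeholders, and "topic" is a custom key read by receive(), not a Kafka setting:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092"); // placeholder broker list
properties.put("group.id", "hive-writer");             // placeholder group id
properties.put("enable.auto.commit", "false");         // receive() commits manually via commitSync()
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("topic", "wifi-probe");                 // custom key read by receive(); placeholder name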
Project: mapr-music    File: CdcStatisticService.java
@Override
public void run() {
    while (true) {

        ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT);
        for (ConsumerRecord<byte[], ChangeDataRecord> consumerRecord : changeRecords) {

            // The ChangeDataRecord contains all the changes made to a document
            ChangeDataRecord changeDataRecord = consumerRecord.value();
            String documentId = changeDataRecord.getId().getString();

            // Handle 'RECORD_INSERT'
            if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_INSERT && this.onInsert != null) {
                this.onInsert.handle(documentId);
            }

            // Handle 'RECORD_DELETE'
            if (changeDataRecord.getType() == ChangeDataRecordType.RECORD_DELETE && this.onDelete != null) {
                this.onDelete.handle(documentId);
            }

        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: VerifiableConsumer.java
public void run() {
    try {
        printJson(new StartupComplete());
        consumer.subscribe(Collections.singletonList(topic), this);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);

            if (!useAutoCommit) {
                if (useAsyncCommit)
                    consumer.commitAsync(offsets, this);
                else
                    commitSync(offsets);
            }
        }
    } catch (WakeupException e) {
        // ignore, we are closing
    } finally {
        consumer.close();
        printJson(new ShutdownComplete());
        shutdownLatch.countDown();
    }
}
Project: WiFiProbeAnalysis    File: KafkaConsumers.java
public List<String> receive() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList(properties.getProperty("topic")));
    List<String> buffer = new ArrayList<String>();
    System.err.println("consumer receive------------------");
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
    }
    consumer.close();
    return buffer;
}
Project: newblog    File: KafkaConsumerRunner.java
@Override
public void run() {
    try {
        consumer.subscribe(Collections.singletonList(Config.getProperty("input_topic")));
        while (!closed.get()) {
            ConsumerRecords<String, String> records = consumer.poll(3000);
            if (records.count() > 0) {
                handler.consume(records);
            }
            try {
                consumer.commitSync(); // commit offsets only after the batch has been handed off
            } catch (Exception ignored) {
            }
        }
    } catch (WakeupException e) {
        if (!closed.get()) throw e;
    } finally {
        consumer.close();
    }
}
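The closed flag and the swallowed WakeupException above follow the standard KafkaConsumer shutdown idiom; a companion shutdown method for this runner could look like the following sketch (field names mirror the snippet):

// Called from another thread: wakeup() makes a blocked poll() throw
// WakeupException, which run() ignores once 'closed' is set, and the
// finally block then closes the consumer.
public void shutdown() {
    closed.set(true);
    consumer.wakeup();
}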
Project: bireme    File: KafkaPipeLine.java
@SuppressWarnings("unchecked")
@Override
public void fillRowSet(RowSet rowSet) throws BiremeException {
  CommitCallback callback = changeSet.callback;
  HashMap<String, Long> offsets = ((KafkaCommitCallback) callback).partitionOffset;
  Row row = null;

  for (ConsumerRecord<String, String> change :
      (ConsumerRecords<String, String>) changeSet.changes) {
    row = new Row();

    if (!transform(change, row)) {
      continue;
    }

    addToRowSet(row, rowSet);
    offsets.put(change.topic() + "+" + change.partition(), change.offset());
    callback.setNewestRecord(row.produceTime);
  }

  callback.setNumOfTables(rowSet.rowBucket.size());
  rowSet.callback = callback;
}
Project: DBus    File: AbstractMessageHandler.java
/**
 * Process a batch of messages fetched by the Kafka consumer.
 *
 * @param records the batch of messages to process
 */
public void handleMessages(ConsumerRecords<String, byte[]> records) {

    // process the batch record by record
    for (ConsumerRecord<String, byte[]> record : records) {
        // filter out messages from topics that are paused
        if (consumerListener.filterPausedTopic(record.topic())) {
            listener.increaseFlowSize(record.serializedValueSize());
            try {
                this.chooseProcessor(record.key(), record.topic()).process(record);
            } catch (Exception e) {
                logger.error("sport process error", e);
                consumerListener.seek(record);
                break;
            }
        } else {
            listener.reduceFlowSize(record.serializedValueSize());
            consumerListener.syncOffset(record);
            logger.info("The record of topic {} was skipped whose offset is {}", record.topic(), record.offset());
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: StreamThread.java
/**
 * Get the next batch of records by polling.
 * @return Next batch of records or null if no records available.
 */
private ConsumerRecords<byte[], byte[]> pollRequests() {
    ConsumerRecords<byte[], byte[]> records = null;

    try {
        records = consumer.poll(pollTimeMs);
    } catch (final InvalidOffsetException e) {
        resetInvalidOffsets(e);
    }

    if (rebalanceException != null) {
        if (!(rebalanceException instanceof ProducerFencedException)) {
            throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
        }
    }

    return records;
}
Project: kafka-streams-machine-learning-examples    File: IntegrationTestUtils.java
/**
 * Returns up to `maxMessages` messages from the given topic, using a consumer created
 * from the provided configuration and subscribed to that topic.
 *
 * @param topic          Kafka topic to read messages from
 * @param consumerConfig Kafka consumer configuration
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) {
  KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig);
  consumer.subscribe(Collections.singletonList(topic));
  int pollIntervalMs = 100;
  int maxTotalPollTimeMs = 2000;
  int totalPollTimeMs = 0;
  List<KeyValue<K, V>> consumedValues = new ArrayList<>();
  while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) {
    totalPollTimeMs += pollIntervalMs;
    ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord<K, V> record : records) {
      consumedValues.add(new KeyValue<>(record.key(), record.value()));
    }
  }
  consumer.close();
  return consumedValues;
}
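A typical call site for this helper might look like the following sketch; broker address, group id, and topic name are placeholders:

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "integration-test");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read from the start of the topic
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
List<KeyValue<String, String>> kvs = IntegrationTestUtils.readKeyValues("output-topic", config, 10);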
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskThreadedTest.java
@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectOnePoll() {
    // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
    // returning empty data, we return one record. The expectation is that the data will be ignored by the
    // response behavior specified using the return value of this method.
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    // "Sleep" so time will progress
                    time.sleep(1L);
                    ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                            Collections.singletonMap(
                                    new TopicPartition(TOPIC, PARTITION),
                                    Arrays.asList(
                                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                    )));
                    recordsReturned++;
                    return records;
                }
            });
    EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
    EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
    sinkTask.put(EasyMock.anyObject(Collection.class));
    return EasyMock.expectLastCall();
}
Project: kafka-docker-demo    File: ConsumerDemo.java
public static void main(String[] args) throws Exception {
    Properties props = new Properties();

    props.put("bootstrap.servers", "192.168.77.7:9092,192.168.77.7:9093,192.168.77.7:9094");
    props.put("group.id", "test-group-id");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Collections.singletonList("test"));

    System.out.println("Subscribed to topic test");

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)

            System.out.println(String.format("offset = %s, key = %s, value = %s", record.offset(), record.key(), record.value()));
    }
}
Project: ipo    File: Kafka.java
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "123.207.61.225:9092");
    props.put("group.id", "test-consumer-group");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("log.level", "info");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("test"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
}
Project: doctorkafka    File: ReplicaStatsManager.java
private static ConsumerRecord<byte[], byte[]> retrieveOneMessage(KafkaConsumer kafkaConsumer,
                                                                 TopicPartition topicPartition,
                                                                 long offset) {
  kafkaConsumer.seek(topicPartition, offset);
  ConsumerRecords<byte[], byte[]> records;
  ConsumerRecord<byte[], byte[]> record = null;
  while (record == null) {
    records = kafkaConsumer.poll(100);
    if (!records.isEmpty()) {
      LOG.debug("records.count() = {}", records.count());
      List<ConsumerRecord<byte[], byte[]>> reclist = records.records(topicPartition);
      if (reclist != null && !reclist.isEmpty()) {
        record = reclist.get(0);
        break;
      } else {
        LOG.info("recList is null or empty");
      }
    }
  }
  return record;
}
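Because seek() only works on partitions the consumer currently owns, callers must assign() the partition before invoking this helper; a usage sketch with placeholder topic and offset:

TopicPartition topicPartition = new TopicPartition("some-topic", 0);      // placeholder
kafkaConsumer.assign(Collections.singletonList(topicPartition));          // seek() requires assignment
ConsumerRecord<byte[], byte[]> message =
    retrieveOneMessage(kafkaConsumer, topicPartition, 42L);               // placeholder offset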
Project: spring-tutorial    File: SampleConsumer.java
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList("demo-topic1"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.toString());
            }
        }
    } catch (Exception e) {
        LOGGER.error("Exception occurred while consuming messages", e);
    } finally {
        consumer.close();
    }
}
Project: Lagerta    File: ConsumerOutOfOrderIntegrationTest.java
private void failOnKafkaPoll() {
    drGrid().compute().broadcast(new ActiveStoreIgniteRunnable() {
        @Inject
        private transient KafkaFactory kafkaFactory;

        @Override protected void runInjected() {
            ((KafkaFactoryForTests)kafkaFactory).substituteConsumers(
                new ConsumerAdapter() {
                    @Override public ConsumerRecords poll(long timeout) {
                        throw new KafkaException("Poll failed");
                    }
                });
        }
    });
    kafkaConsumerFailing = true;
}
Project: kafka-junit    File: KafkaTestUtils.java
/**
 * This will consume all records from only the partitions given.
 * @param topic Topic to consume from.
 * @param partitionIds Collection of PartitionIds to consume.
 * @return List of ConsumerRecords consumed.
 */
public List<ConsumerRecord<byte[], byte[]>> consumeAllRecordsFromTopic(final String topic, Collection<Integer> partitionIds) {
    // Create topic Partitions
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (Integer partitionId: partitionIds) {
        topicPartitions.add(new TopicPartition(topic, partitionId));
    }

    // Connect Consumer
    KafkaConsumer<byte[], byte[]> kafkaConsumer =
        kafkaTestServer.getKafkaConsumer(ByteArrayDeserializer.class, ByteArrayDeserializer.class);

    // Assign topic partitions & seek to head of them
    kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);

    }
    while (!records.isEmpty());

    // close consumer
    kafkaConsumer.close();

    // return all records
    return allRecords;
}
Project: kafka-webview    File: RecordFilterInterceptorTest.java
private ConsumerRecords createConsumerRecords(final int count) {
    final String topic = "MyTopic";
    final int partition = 0;

    final Map<TopicPartition, List<ConsumerRecord>> recordsMap = new HashMap<>();
    final TopicPartition topicPartition = new TopicPartition(topic, partition);
    final List<ConsumerRecord> consumerRecords = new ArrayList<>();

    for (int x = 0; x < count; x++) {
        consumerRecords.add(
            new ConsumerRecord<Object, Object>(topic, partition, x, "Key" + x, "Value" + x)
        );
    }
    recordsMap.put(topicPartition, consumerRecords);

    return new ConsumerRecords(recordsMap);
}
Project: flume-release-1.7.0    File: KafkaPartitionTestUtil.java
/**
 * Return a map containing one List of records per partition.
 * This internally creates a Kafka Consumer using the provided consumer properties.
 *
 * @param topic              Topic to consume from
 * @param numPtns            Number of partitions to consume
 * @param consumerProperties Properties used to build the consumer
 * @return A Map of Partitions(Integer) and the resulting List of messages (byte[]) retrieved
 */
public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns,
                                                                 Properties consumerProperties) {

  Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
  for (int i = 0; i < numPtns; i++) {
    List<byte[]> partitionResults = new ArrayList<byte[]>();
    resultsMap.put(i, partitionResults);
    KafkaConsumer<String, byte[]> consumer =
        new KafkaConsumer<String, byte[]>(consumerProperties);

    TopicPartition partition = new TopicPartition(topic, i);

    consumer.assign(Arrays.asList(partition));

    ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<String, byte[]> record : records) {
      partitionResults.add(record.value());
    }
    consumer.close();
  }
  return resultsMap;
}
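With manual assign() and no committed offsets, where the poll starts depends on the consumer's auto.offset.reset setting; to read each partition from the beginning regardless, the caller could seek explicitly, as in this sketch:

consumer.assign(Arrays.asList(partition));
consumer.seekToBeginning(Arrays.asList(partition)); // start at the earliest offset regardless of auto.offset.reset
ConsumerRecords<String, byte[]> records = consumer.poll(1000);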
Project: Lagerta    File: Reader.java
private void pollAndCommitTransactionsBatch() {
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(POLL_TIMEOUT);
    List<TransactionScope> scopes = new ArrayList<>(records.count());
    for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
        TransactionScope transactionScope = serializer.deserialize(record.key());
        if (transactionScope.getScope().isEmpty()) {
            LOGGER.warn("[R] {} polled empty transaction {}", readerId, transactionScope.getTransactionId());
        }
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        buffer.put(transactionScope.getTransactionId(),
                new TransactionData(transactionScope, record.value(), topicPartition, record.offset()));
        scopes.add(transactionScope);
        committedOffsetMap.computeIfAbsent(topicPartition, COMMITTED_OFFSET).notifyRead(record.offset());
    }
    if (!scopes.isEmpty()) {
        scopes.sort(SCOPE_COMPARATOR);
        LOGGER.trace("[R] {} polled {}", readerId, scopes);
    }
    approveAndCommitTransactionsBatch(scopes);
}
Project: scalable-task-scheduler    File: Consumer.java
private void keepPolling() throws InterruptedException {
    // keep on polling until shutdown for this thread is called.
    while (!shutdown) {
        ConsumerRecords<K, V> records = consumer.poll(pollingTime);

        // if polling gave no tasks, then sleep this thread for n seconds.
        if (records.isEmpty()) {
            log.debug("NO RECORDS fetched from queue. Putting current THREAD to SLEEP.");
            Thread.sleep(sleepTime);
            continue;
        }

        log.info("Processing a batch of records.");
        if (!processor.process(records)) {
            log.error("ERROR occurred while PROCESSING RECORDS.");
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTask.java
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
    for (ConsumerRecord<byte[], byte[]> msg : msgs) {
        log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
        SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
        SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
        SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
                keyAndSchema.schema(), keyAndSchema.value(),
                valueAndSchema.schema(), valueAndSchema.value(),
                msg.offset(),
                ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
                msg.timestampType());
        record = transformationChain.apply(record);
        if (record != null) {
            messageBatch.add(record);
        }
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: IntegrationTestUtils.java
/**
 * Returns up to `maxMessages` messages from the given topic via the provided consumer
 * (this method subscribes the consumer to the topic).
 *
 * @param topic          Kafka topic to read messages from
 * @param consumer       Kafka consumer
 * @param waitTime       Maximum wait time in milliseconds
 * @param maxMessages    Maximum number of messages to read via the consumer
 * @return The KeyValue elements retrieved via the consumer
 */
private static <K, V> List<KeyValue<K, V>> readKeyValues(final String topic,
    final Consumer<K, V> consumer, final long waitTime, final int maxMessages) {
    final List<KeyValue<K, V>> consumedValues;
    consumer.subscribe(Collections.singletonList(topic));
    final int pollIntervalMs = 100;
    consumedValues = new ArrayList<>();
    int totalPollTimeMs = 0;
    while (totalPollTimeMs < waitTime &&
        continueConsuming(consumedValues.size(), maxMessages)) {
        totalPollTimeMs += pollIntervalMs;
        final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
        for (final ConsumerRecord<K, V> record : records) {
            consumedValues.add(new KeyValue<>(record.key(), record.value()));
        }
    }
    return consumedValues;
}
Project: post-kafka-rewind-consumer-offset    File: KafkaConsumerFromOffset.java
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));

    boolean flag = true;


    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (flag) {
            Set<TopicPartition> assignments = consumer.assignment();
            assignments.forEach(topicPartition ->
                    consumer.seek(
                            topicPartition,
                            90));
            flag = false;
        }


        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }


}
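The flag dance is needed because assignment() returns an empty set until the first poll() completes. An alternative sketch seeks inside the standard rebalance callback instead, as soon as partitions are handed to this consumer:

consumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do before partitions are taken away in this example
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // seek every newly assigned partition to offset 90, as the loop above does
        partitions.forEach(tp -> consumer.seek(tp, 90));
    }
});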
Project: iotplatform    File: KafkaMsgReceiver4MQTT.java
public boolean consume(ConsumerRecords<String, String> records) {
  for (ConsumerRecord<String, String> consumerRecord : records) {
    log.info("topic:{}, key:{}, value:{}", consumerRecord.topic(), consumerRecord.key(), consumerRecord.value());
    String deviceId = consumerRecord.key();
    String payload = consumerRecord.value();
    Optional<Device> deviceOpt = authService.findDeviceById(DeviceId.fromString(deviceId));

    if (deviceOpt.isPresent()) {
      Device device = deviceOpt.get();
      JsonObject root = (JsonObject) new JsonParser().parse(payload);
      int messageId = root.getAsJsonPrimitive("messageId").getAsInt();
      FromDeviceMsg msg = JsonConverter.convertToTelemetry(root.get("d"), messageId);
      BasicToDeviceActorSessionMsg basicToDeviceActorSessionMsg = new BasicToDeviceActorSessionMsg(device,
          new BasicAdaptorToSessionActorMsg(deviceSessionCtx, msg));
      processor.process(basicToDeviceActorSessionMsg);
    }

  }
  return true;
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskTest.java
private void expectPollInitialAssignment() {
    final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);

    sinkTask.open(partitions);
    EasyMock.expectLastCall();

    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
        @Override
        public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
            rebalanceListener.getValue().onPartitionsAssigned(partitions);
            return ConsumerRecords.empty();
        }
    });
    EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
    EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);

    sinkTask.put(Collections.<SinkRecord>emptyList());
    EasyMock.expectLastCall();
}
Project: testcontainers-java-module-confluent-platform    File: HelloConsumer.java
public void consume() {
  consumer.subscribe(singletonList("hello_world_topic"));
  ConsumerRecords<String, String> records = consumer.poll(10000);
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(),
                      record.value());
    this.receivedRecords.add(record);
  }
}
Project: EasyTransaction    File: KafkaEasyTransMsgConsumerImpl.java
private synchronized void pollAndDispatchMessage() throws InterruptedException {
    // consumer settings must not be modified while records are being processed
    // fetch the records that need processing
    ConsumerRecords<String, byte[]> allRecords = consumer.poll(10000);

    // wrap each message as a callable job, then execute them
    Iterator<ConsumerRecord<String, byte[]>> iterator = allRecords.iterator();
    List<MessageHandler> listJob = new LinkedList<>();
    while (iterator.hasNext()) {
        listJob.add(new MessageHandler(iterator.next()));
    }
    executeJobs(listJob);
    // all jobs succeeded; advance the consumed offsets
    consumer.commitAsync();
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public ConsumerRecords<K, V> poll(long timeout) {
  ConsumerRecords<K, V> records = consumer.poll(timeout);

  for (ConsumerRecord<K, V> record : records) {
    buildAndFinishChildSpan(record);
  }

  return records;
}
Project: mapr-music    File: ChangelogListenerImpl.java
@Override
public void listen() {

    if (this.onInsert == null && this.onUpdate == null && this.onDelete == null) {
        log.warn("There is no callbacks set. Listening change data records without callbacks has no effect.");
    }

    this.consumer.subscribe(Collections.singletonList(this.changelog));
    log.info("Start listening changelog '{}'", this.changelog);

    new Thread(() -> {
        while (true) {

            ConsumerRecords<byte[], ChangeDataRecord> changeRecords = consumer.poll(KAFKA_CONSUMER_POLL_TIMEOUT);
            for (ConsumerRecord<byte[], ChangeDataRecord> consumerRecord : changeRecords) {

                // The ChangeDataRecord contains all the changes made to a document
                ChangeDataRecord changeDataRecord = consumerRecord.value();
                ChangeDataRecordType recordType = changeDataRecord.getType();
                switch (recordType) {
                    case RECORD_INSERT:
                        handleInsert(changeDataRecord);
                        break;
                    case RECORD_UPDATE:
                        handleUpdate(changeDataRecord);
                        break;
                    case RECORD_DELETE:
                        handleDelete(changeDataRecord);
                        break;
                    default:
                        log.warn("Get record of unknown type '{}'. Ignoring ...", recordType);
                }
            }

        }
    }).start();
}
Project: wechat-mall    File: OrderConsumer.java
@Override
public void work() {
    System.out.println("pull");
    ConsumerRecords<Long, byte[]> records = getWorker().poll(100);
    System.out.printf("pulled count = %d %n", records.count());
    log.info("pulled count = %d %n", records.count());
    for (ConsumerRecord<Long, byte[]> record : records) {
        orderHandle(record);
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: Consumer.java
@Override
public void doWork() {
    consumer.subscribe(Collections.singletonList(this.topic));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);
    for (ConsumerRecord<Integer, String> record : records) {
        System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
    }
}
Project: newblog    File: KafkaConsumerHandlerImpl.java
@Override
public void consume(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        String msg = record.value();
        logger.info(msg);
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: GlobalStreamThread.java
void pollAndUpdate() {
    final ConsumerRecords<byte[], byte[]> received = consumer.poll(pollMs);
    for (ConsumerRecord<byte[], byte[]> record : received) {
        stateMaintainer.update(record);
    }
    final long now = time.milliseconds();
    if (flushInterval >= 0 && now >= lastFlush + flushInterval) {
        stateMaintainer.flushState();
        lastFlush = now;
    }
}
Project: iotplatform    File: BaseKafkaMsgReceiver.java
public void run() {
  while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(60000);
    if (records != null && !records.isEmpty()) {
      log.info("records size:{}", records.count());

      boolean success = consume(records);
      if (success) {
        log.info("now commit offset");
        kafkaConsumer.commitSync();
      }
    }
  }
}
Project: open-kilda    File: KafkaMessageCollector.java
@Override
public void run() {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
    consumer.subscribe(topics);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            logger.trace("received message: {} - {}", record.offset(), record.value());
            parseRecordExecutor.execute(new ParseRecord(record));
        }
    }
}
Project: open-kilda    File: Consumer.java
@Override
public void run() {
    Properties kprops = new Properties();

    kprops.put("bootstrap.servers", kafka.url());
    kprops.put("group.id", groupId);
    // NB: only "true" is valid; no code to handle "false" yet.
    kprops.put("enable.auto.commit", "true");
    kprops.put("auto.commit.interval.ms", commitInterval);
    kprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kprops);

    consumer.subscribe(Arrays.asList(topicName));

    Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("heartbeat.consumer").build())
            .execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            logger.trace("==> Poll for records");
                            ConsumerRecords<String, String> records = consumer.poll(1000);
                            logger.trace("==> Number of records: " + records.count());
                            // NB: if you want the beginning ..
                            // consumer.seekToBeginning(records.partitions());
                            for (ConsumerRecord<String, String> record : records)
                                logger.debug("==> ==> offset = {}, key = {}, value = {}", record.offset(),
                                        record.key(), record.value());
                            Thread.sleep(sleepTime);
                        }
                    } catch (InterruptedException e) {
                        logger.info("Heartbeat Consumer Interrupted");
                    } finally {
                        consumer.close();
                    }
                }
            });
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerInterceptorsTest.java
@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
    onConsumeCount++;
    if (throwExceptionOnConsume)
        throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume.");

    // filters out topic/partitions with partition == FILTER_PARTITION
    Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
        if (tp.partition() != filterPartition)
            recordMap.put(tp, records.records(tp));
    }
    return new ConsumerRecords<K, V>(recordMap);
}
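In production code an interceptor like this is not called directly; it is registered through consumer configuration, as in this sketch (FilterConsumerInterceptor is the test class above):

Properties props = new Properties();
// ... connection and deserializer settings omitted ...
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        FilterConsumerInterceptor.class.getName()); // onConsume() then runs on every poll()
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);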
Project: Lagerta    File: ConsumerProxyRetry.java
@SuppressWarnings("unchecked")
@Override
public ConsumerRecords<K, V> poll(final long timeout) {
    return Retries.tryMe(new Callable<ConsumerRecords<K, V>>() {
        @Override
        public ConsumerRecords<K, V> call() throws Exception {
            return inner.poll(timeout);
        }
    }, strategy(), EMPTY_POLL);
}
Project: kafka-0.11.0.0-src-with-comment    File: EosTestDriver.java
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
                                                                                                       final Map<TopicPartition, Long> committedOffsets) {
    final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();

    long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
    boolean allRecordsReceived = false;
    while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
        final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);

        for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
            maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
            addRecord(record, recordPerTopicPerPartition);
        }

        if (receivedRecords.count() > 0) {
            allRecordsReceived =
                receivedAllRecords(
                    recordPerTopicPerPartition.get("data"),
                    recordPerTopicPerPartition.get("echo"),
                    committedOffsets);
        }
    }

    if (!allRecordsReceived) {
        throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
    }

    return recordPerTopicPerPartition;
}
Project: DBus    File: DataTableResource.java
@GET
@Path("/readKafkaTopic")
public Response readKafkaTopic(Map<String, Object> map) {
    try {
        Properties properties = PropertiesUtils.getProps("consumer.properties");
        properties.setProperty("client.id","readKafkaTopic");
        properties.setProperty("group.id","readKafkaTopic");
        //properties.setProperty("bootstrap.servers", "localhost:9092");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        String topic = map.get("topic").toString();
        //System.out.println("topic="+topic);
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        List<TopicPartition> topics = Arrays.asList(topicPartition);
        consumer.assign(topics);
        consumer.seekToEnd(topics);
        long current = consumer.position(topicPartition);
        long end = current;
        current -= 1000;
        if (current < 0) current = 0;
        consumer.seek(topicPartition, current);
        List<String> result = new ArrayList<>();
        while (current < end) {
            //System.out.println("topic position = "+current);
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                result.add(record.value());
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            current = consumer.position(topicPartition);
        }
        consumer.close();
        return Response.ok().entity(result).build();
    } catch (Exception e) {
        logger.error("Error encountered while readKafkaTopic with parameter:{}", JSON.toJSONString(map), e);
        return Response.status(204).entity(new Result(-1, e.getMessage())).build();
    }
}