Java Class org.apache.kafka.clients.consumer.OffsetAndTimestamp Example Source Code
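
The following Java examples show how to use org.apache.kafka.clients.consumer.OffsetAndTimestamp, the offset/timestamp pair that Consumer#offsetsForTimes() returns for each queried partition. The examples are extracted from open source projects. As a self-contained starting point, here is a minimal sketch of the typical rewind-by-timestamp pattern; the broker address, topic name, and one-hour lookback window are placeholders:

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "offsets-for-times-demo");  // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("demo-topic", 0); // placeholder topic
            consumer.assign(Collections.singleton(partition));

            // Ask for the earliest offset whose record timestamp is >= one hour ago
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(partition, Instant.now().minus(Duration.ofHours(1)).toEpochMilli());

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            OffsetAndTimestamp found = result.get(partition);
            if (found != null) {
                consumer.seek(partition, found.offset()); // rewind to that offset
            }
            // A null value means the partition has no record at or after the timestamp
        }
    }
}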

Project: kafka-webview    File: WebKafkaConsumer.java
/**
 * Seek the consumer to a specific timestamp.
 * @param timestamp Unix timestamp in milliseconds to seek to.
 */
public ConsumerState seek(final long timestamp) {
    // Find offsets for timestamp
    final Map<TopicPartition, Long> timestampMap = new HashMap<>();
    for (final TopicPartition topicPartition: getAllPartitions()) {
        timestampMap.put(topicPartition, timestamp);
    }
    final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);

    // Build map of partition id => offset, skipping partitions for which
    // offsetsForTimes() found no offset (it returns null values for those)
    final Map<Integer, Long> partitionOffsetMap = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry: offsetMap.entrySet()) {
        if (entry.getValue() != null) {
            partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
        }
    }

    // Now let's seek to those offsets
    return seek(partitionOffsetMap);
}
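
Note that offsetsForTimes() maps every partition for which no message has a timestamp at or after the searched one to a null value, which is why each entry is null-checked before calling offset().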
Project: kafka-webview    File: SocketKafkaConsumer.java
/**
 * Seek the consumer to a specific timestamp.
 * @param timestamp Unix timestamp in milliseconds to seek to.
 */
private void seekToTimestamp(final long timestamp) {
    // Find offsets for timestamp
    final Map<TopicPartition, Long> timestampMap = new HashMap<>();
    for (final TopicPartition topicPartition: getAllPartitions()) {
        timestampMap.put(topicPartition, timestamp);
    }
    final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);

    // Build map of TopicPartition => offset, skipping partitions for which
    // offsetsForTimes() found no offset (it returns null values for those)
    final Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry: offsetMap.entrySet()) {
        if (entry.getValue() != null) {
            partitionOffsetMap.put(entry.getKey(), entry.getValue().offset());
        }
    }

    // Now let's seek to those offsets
    seek(partitionOffsetMap);
}
Project: Lagerta    File: ReconcilerImpl.java
private void seekToMissingTransactions(Map<TopicPartition, List<Long>> txByPartition) {
    Map<TopicPartition, Long> timestamps = txByPartition.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> Collections.min(entry.getValue())
            ));
    Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(timestamps);
    Map<TopicPartition, OffsetAndMetadata> toCommit = foundOffsets.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    entry -> {
                        // Fall back to offset 0 when no offset at or after the timestamp was found
                        long offset = entry.getValue() != null ? entry.getValue().offset() : 0;
                        return new OffsetAndMetadata(offset);
                    }
            ));
    consumer.commitSync(toCommit);
}
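
Here the found offsets are committed rather than sought, so the consumer group's next poll resumes from the earliest missing transaction on each partition; partitions with no match fall back to offset 0.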
Project: Lagerta    File: PublisherKafkaService.java
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
    String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());

        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());
        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
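
The same commit-based pattern as above, except that the transaction id doubles as the timestamp to search for, and partitions with a null lookup result are simply skipped.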
Project: post-kafka-rewind-consumer-offset    File: KafkaConsumerFromTime.java
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));

    boolean flag = true; // seek only once, after the first poll() has joined the group

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        if (flag) {
            Set<TopicPartition> assignments = consumer.assignment();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition topicPartition : assignments) {
                query.put(
                        topicPartition,
                        Instant.now().minus(10, MINUTES).toEpochMilli());
            }

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

            result.forEach((topicPartition, offsetAndTimestamp) ->
                    consumer.seek(
                            topicPartition,
                            Optional.ofNullable(offsetAndTimestamp)
                                    .map(OffsetAndTimestamp::offset)
                                    .orElse(0L)));

            flag = false;
        }

        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
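
The one-shot flag matters here: with subscribe(), partitions are assigned during poll(), so consumer.assignment() is empty until the first poll() has joined the group. The seek to "ten minutes ago" is therefore performed inside the loop, exactly once, after that initial poll.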
Project: Lagerta    File: ConsumerProxyRetry.java
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        final Map<TopicPartition, Long> timestampsToSearch) {
    return Retries.tryMe(new Callable<Map<TopicPartition, OffsetAndTimestamp>>() {
        @Override
        public Map<TopicPartition, OffsetAndTimestamp> call() throws Exception {
            return inner.offsetsForTimes(timestampsToSearch);
        }
    }, strategy());
}
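
This proxy simply wraps the delegate lookup in Retries.tryMe() with the configured retry strategy, so transient failures during the offset lookup are retried instead of propagating to the caller.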
Project: kafka-0.11.0.0-src-with-comment    File: Fetcher.java
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                 long timeout) {
    Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
    HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size());
    for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
        OffsetData data = entry.getValue();
        if (data == null)
            offsetsByTimes.put(entry.getKey(), null);
        else
            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp));
    }
    return offsetsByTimes;
}
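
This client-internal method is the source of the null values seen in the snippets above: when no offset exists for the searched timestamp, the Fetcher maps the partition to null rather than omitting it from the result.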
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
    Map<TopicPartition, Long> timestampsToSearch) {
  return consumer.offsetsForTimes(timestampsToSearch);
}
Project: Lagerta    File: ConsumerForTests.java
@Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
    Map<TopicPartition, Long> timestampsToSearch) {
    return activeConsumer().offsetsForTimes(timestampsToSearch);
}
Project: Lagerta    File: ConsumerAdapter.java
@Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
    Map<TopicPartition, Long> timestampsToSearch) {
    return Collections.emptyMap();
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
  return _kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
Project: vertx-kafka-client    File: ConsumerTestBase.java
@Test
public void testOffsetsForTimes(TestContext ctx) throws Exception {
  String topicName = "testOffsetsForTimes";
  String consumerId = topicName;
  Async batch = ctx.async();
  AtomicInteger index = new AtomicInteger();
  int numMessages = 1000;
  long beforeProduce = System.currentTimeMillis();
  kafkaCluster.useTo().produceStrings(numMessages, batch::complete, () ->
    new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
  batch.awaitSuccess(20000);
  long produceDuration = System.currentTimeMillis() - beforeProduce;
  Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

  Context context = vertx.getOrCreateContext();
  consumer = createConsumer(context, config);

  consumer.exceptionHandler(ctx::fail);

  TopicPartition topicPartition = new TopicPartition(topicName, 0);

  // Test contains two sub-tests
  Async done = ctx.async(2);
  consumer.handler(handler -> {
    // nothing to do in this test
  });

  consumer.subscribe(Collections.singleton(topicName), ctx.asyncAssertSuccess(subscribeRes -> {
    // search by timestamp
    // take timestamp BEFORE start of ingestion and add half of the ingestion duration to it
    long searchTimestamp = beforeProduce + (produceDuration / 2);
    consumer.offsetsForTimes(Collections.singletonMap(topicPartition, searchTimestamp), ctx.asyncAssertSuccess(offsetAndTimestamps -> {
      OffsetAndTimestamp offsetAndTimestamp = offsetAndTimestamps.get(topicPartition);
      ctx.assertEquals(1, offsetAndTimestamps.size());
      // Offset must be somewhere between beginningOffset and endOffset
      ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
        "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
      // Timestamp of the returned offset must be >= searchTimestamp
      ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
      done.countDown();
    }));

    consumer.offsetsForTimes(topicPartition, searchTimestamp, ctx.asyncAssertSuccess(offsetAndTimestamp -> {
      // Offset must be somewhere between beginningOffset and endOffset
      ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
        "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
      // Timestamp of the returned offset must be >= searchTimestamp
      ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
      done.countDown();
    }));
  }));
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
  return _kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
Project: vertx-kafka-client    File: KafkaReadStream.java
/**
 * Look up the offsets for the given partitions by timestamp.
 * @param topicPartitionTimestamps A map with pairs of (TopicPartition, Timestamp).
 * @param handler handler called on operation completed
 */
void offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps, Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler);
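
A minimal usage sketch of this map-based variant, assuming an already-created KafkaReadStream named stream; the topic name and one-hour search timestamp are placeholders:

long searchTimestamp = System.currentTimeMillis() - 3_600_000L; // placeholder: one hour ago

Map<TopicPartition, Long> query = new HashMap<>();
query.put(new TopicPartition("demo-topic", 0), searchTimestamp); // placeholder topic
query.put(new TopicPartition("demo-topic", 1), searchTimestamp);

stream.offsetsForTimes(query, result -> {
    if (result.succeeded()) {
        result.result().forEach((partition, offsetAndTimestamp) ->
            System.out.println(partition + " -> "
                + (offsetAndTimestamp == null ? "no offset found" : offsetAndTimestamp.offset())));
    } else {
        result.cause().printStackTrace();
    }
});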
Project: vertx-kafka-client    File: KafkaReadStream.java
/**
 * Look up the offset for the given partition by timestamp.
 * @param topicPartition Partition to query.
 * @param timestamp Timestamp used to determine the offset.
 * @param handler handler called on operation completed
 */
void offsetsForTimes(TopicPartition topicPartition, long timestamp, Handler<AsyncResult<OffsetAndTimestamp>> handler);
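
And a matching sketch for the single-partition variant, again assuming an existing KafkaReadStream named stream; the topic and the one-hour rewind window are placeholders:

TopicPartition partition = new TopicPartition("demo-topic", 0); // placeholder topic
long oneHourAgo = System.currentTimeMillis() - 3_600_000L;

stream.offsetsForTimes(partition, oneHourAgo, result -> {
    if (result.succeeded()) {
        OffsetAndTimestamp found = result.result();
        System.out.println("First offset at/after the timestamp: " + found.offset()
            + " (record timestamp " + found.timestamp() + ")");
    } else {
        result.cause().printStackTrace();
    }
});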