/**
 * Seek consumer to a specific timestamp.
 * @param timestamp Unix timestamp in milliseconds to seek to.
 * @return The consumer's state after seeking.
 */
public ConsumerState seek(final long timestamp) {
    // Find offsets for timestamp
    final Map<TopicPartition, Long> timestampMap = new HashMap<>();
    for (final TopicPartition topicPartition : getAllPartitions()) {
        timestampMap.put(topicPartition, timestamp);
    }
    final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);

    // Build map of partition => offset
    final Map<Integer, Long> partitionOffsetMap = new HashMap<>();
    for (final Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
        // offsetsForTimes() maps a partition to null when it has no record at or after the timestamp
        if (entry.getValue() != null) {
            partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
        }
    }

    // Now let's seek to those offsets
    return seek(partitionOffsetMap);
}
/**
 * Seek consumer to a specific timestamp.
 * @param timestamp Unix timestamp in milliseconds to seek to.
 */
private void seekToTimestamp(final long timestamp) {
    // Find offsets for timestamp
    final Map<TopicPartition, Long> timestampMap = new HashMap<>();
    for (final TopicPartition topicPartition : getAllPartitions()) {
        timestampMap.put(topicPartition, timestamp);
    }
    final Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);

    // Build map of partition => offset
    final Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>();
    for (final Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsetMap.entrySet()) {
        // offsetsForTimes() maps a partition to null when it has no record at or after the timestamp
        if (entry.getValue() != null) {
            partitionOffsetMap.put(entry.getKey(), entry.getValue().offset());
        }
    }

    // Now let's seek to those offsets
    seek(partitionOffsetMap);
}
private void seekToMissingTransactions(Map<TopicPartition, List<Long>> txByPartition) {
    // Use the earliest missing transaction id per partition as the timestamp to search for.
    Map<TopicPartition, Long> timestamps = txByPartition.entrySet().stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            entry -> Collections.min(entry.getValue())
        ));

    Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(timestamps);

    // Commit the resolved offsets, falling back to offset 0 for partitions with no match.
    Map<TopicPartition, OffsetAndMetadata> toCommit = foundOffsets.entrySet().stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            entry -> {
                long offset = entry.getValue() != null ? entry.getValue().offset() : 0;
                return new OffsetAndMetadata(offset);
            }
        ));

    consumer.commitSync(toCommit);
}
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
                              String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

        Map<TopicPartition, Long> seekMap = new HashMap<>(partitionInfos.size());
        for (PartitionInfo partitionInfo : partitionInfos) {
            seekMap.put(new TopicPartition(topic, partitionInfo.partition()), transactionId);
        }
        consumer.assign(seekMap.keySet());

        Map<TopicPartition, OffsetAndTimestamp> foundOffsets = consumer.offsetsForTimes(seekMap);

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : foundOffsets.entrySet()) {
            if (entry.getValue() != null) {
                offsetsToCommit.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));
            }
        }
        consumer.commitSync(offsetsToCommit);
    }
}
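Note that seekToTransaction above never seeks the open consumer; it only commits the timestamp-resolved offsets for groupId, so the effect becomes visible when a consumer later joins that group and resumes from the committed positions. The following is a minimal sketch of such a follow-up consumer, not part of the original project: the bootstrap servers, deserializers, and use of poll(Duration) are assumptions, while topic and groupId stand for the same values passed to seekToTransaction.

// Sketch only: connection properties are assumed, not taken from the project's PropertiesUtil.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // same group the offsets were committed for
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteBufferDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteBufferDeserializer.class);

try (KafkaConsumer<ByteBuffer, ByteBuffer> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    // Consumption starts from the offsets committed by seekToTransaction for this group.
    ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record ->
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset()));
}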
public static void main(String[] args) {
    KafkaConsumer<String, String> consumer = createConsumer();
    consumer.subscribe(Arrays.asList(TOPIC));
    boolean flag = true;

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        if (flag) {
            Set<TopicPartition> assignments = consumer.assignment();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition topicPartition : assignments) {
                query.put(topicPartition, Instant.now().minus(10, MINUTES).toEpochMilli());
            }

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
            result.entrySet()
                .stream()
                .forEach(entry -> consumer.seek(
                    entry.getKey(),
                    Optional.ofNullable(entry.getValue())
                        .map(OffsetAndTimestamp::offset)
                        .orElse(0L)));
            flag = false;
        }

        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
        }
    }
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        final Map<TopicPartition, Long> timestampsToSearch) {
    return Retries.tryMe(new Callable<Map<TopicPartition, OffsetAndTimestamp>>() {
        @Override
        public Map<TopicPartition, OffsetAndTimestamp> call() throws Exception {
            return inner.offsetsForTimes(timestampsToSearch);
        }
    }, strategy());
}
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                 long timeout) {
    Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true);

    HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size());
    for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) {
        OffsetData data = entry.getValue();
        if (data == null) {
            offsetsByTimes.put(entry.getKey(), null);
        } else {
            offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp));
        }
    }
    return offsetsByTimes;
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        Map<TopicPartition, Long> timestampsToSearch) {
    return consumer.offsetsForTimes(timestampsToSearch);
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        Map<TopicPartition, Long> timestampsToSearch) {
    return activeConsumer().offsetsForTimes(timestampsToSearch);
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
        Map<TopicPartition, Long> timestampsToSearch) {
    return Collections.emptyMap();
}
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    return _kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
@Test
public void testOffsetsForTimes(TestContext ctx) throws Exception {
    String topicName = "testOffsetsForTimes";
    String consumerId = topicName;
    Async batch = ctx.async();
    AtomicInteger index = new AtomicInteger();
    int numMessages = 1000;

    long beforeProduce = System.currentTimeMillis();
    kafkaCluster.useTo().produceStrings(numMessages, batch::complete,
        () -> new ProducerRecord<>(topicName, 0, "key-" + index.get(), "value-" + index.getAndIncrement()));
    batch.awaitSuccess(20000);
    long produceDuration = System.currentTimeMillis() - beforeProduce;

    Properties config = kafkaCluster.useTo().getConsumerProperties(consumerId, consumerId, OffsetResetStrategy.EARLIEST);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    Context context = vertx.getOrCreateContext();
    consumer = createConsumer(context, config);
    consumer.exceptionHandler(ctx::fail);

    TopicPartition topicPartition = new TopicPartition(topicName, 0);

    // Test contains two sub-tests
    Async done = ctx.async(2);

    consumer.handler(handler -> {
        // nothing to do in this test
    });

    consumer.subscribe(Collections.singleton(topicName), ctx.asyncAssertSuccess(subscribeRes -> {
        // search by timestamp:
        // take the timestamp BEFORE the start of ingestion and add half of the ingestion duration to it
        long searchTimestamp = beforeProduce + (produceDuration / 2);

        consumer.offsetsForTimes(Collections.singletonMap(topicPartition, searchTimestamp),
            ctx.asyncAssertSuccess(offsetAndTimestamps -> {
                OffsetAndTimestamp offsetAndTimestamp = offsetAndTimestamps.get(topicPartition);
                ctx.assertEquals(1, offsetAndTimestamps.size());
                // Offset must be somewhere between beginningOffset and endOffset
                ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
                    "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
                // Timestamp of the returned offset must be >= searchTimestamp
                ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
                done.countDown();
            }));

        consumer.offsetsForTimes(topicPartition, searchTimestamp, ctx.asyncAssertSuccess(offsetAndTimestamp -> {
            // Offset must be somewhere between beginningOffset and endOffset
            ctx.assertTrue(offsetAndTimestamp.offset() >= 0L && offsetAndTimestamp.offset() <= (long) numMessages,
                "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + numMessages);
            // Timestamp of the returned offset must be >= searchTimestamp
            ctx.assertTrue(offsetAndTimestamp.timestamp() >= searchTimestamp);
            done.countDown();
        }));
    }));
}
/**
 * Look up the offsets for the given partitions by timestamp.
 * @param topicPartitionTimestamps A map with pairs of (TopicPartition, Timestamp).
 * @param handler handler called when the operation completes
 */
void offsetsForTimes(Map<TopicPartition, Long> topicPartitionTimestamps,
                     Handler<AsyncResult<Map<TopicPartition, OffsetAndTimestamp>>> handler);
/**
 * Look up the offset for the given partition by timestamp.
 * @param topicPartition Partition to query.
 * @param timestamp Timestamp used to determine the offset.
 * @param handler handler called when the operation completes
 */
void offsetsForTimes(TopicPartition topicPartition, long timestamp,
                     Handler<AsyncResult<OffsetAndTimestamp>> handler);
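For completeness, a minimal usage sketch of the handler-based variants declared above. It is not part of the original interface: the consumer instance, the "demo-topic" name, and the one-hour look-back are assumptions made for illustration.

// Sketch only: assumes an already-started consumer implementing the interface above.
TopicPartition partition = new TopicPartition("demo-topic", 0);
long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;

// Single-partition variant
consumer.offsetsForTimes(partition, oneHourAgo, result -> {
    if (result.succeeded()) {
        OffsetAndTimestamp offsetAndTimestamp = result.result();
        if (offsetAndTimestamp != null) {
            System.out.println("first offset at/after timestamp: " + offsetAndTimestamp.offset());
        }
    } else {
        result.cause().printStackTrace();
    }
});

// Map variant: query several partitions in one call
consumer.offsetsForTimes(Collections.singletonMap(partition, oneHourAgo), result -> {
    if (result.succeeded()) {
        result.result().forEach((tp, offsetAndTimestamp) ->
            System.out.println(tp + " -> "
                + (offsetAndTimestamp == null ? "no match" : offsetAndTimestamp.offset())));
    }
});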