public void commitOffset(long offset) { OffsetAndMetadata offsetMeta = new OffsetAndMetadata(offset + 1, ""); Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>(); offsetMap.put(statTopicPartition, offsetMeta); OffsetCommitCallback callback = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if (e != null) { LOG.warn(String.format("CommitAsync failed!!!! offset %d, Topic %s", offset, statTopic)); } else { ; //do nothing when OK; //logger.info(String.format("OK. offset %d, Topic %s", record.offset(), record.topic())); } } }; consumer.commitAsync(offsetMap, callback); }
/**
 * Commits asynchronously through {@code inner}, re-submitting the commit via the
 * retry strategy whenever a commit attempt completes with an exception.
 * <p>
 * The user callback (when present) is notified of every attempt, successful or not.
 *
 * @param callback callback to notify after each attempt; may be {@code null}, in which
 *                 case only the retry handling runs
 */
@Override
public void commitAsync(final OffsetCommitCallback callback) {
    Retries.tryMe(new IgniteInClosure<RetryRunnableAsyncOnCallback>() {
        @Override
        public void apply(final RetryRunnableAsyncOnCallback retryRunnableAsyncOnCallback) {
            inner.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    // A null callback is legal for commitAsync; guard so it cannot
                    // throw here and prevent the retry below from being scheduled.
                    if (callback != null) {
                        callback.onComplete(offsets, exception);
                    }
                    if (exception != null) {
                        retryRunnableAsyncOnCallback.retry(exception);
                    }
                }
            });
        }
    }, strategy());
}
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { this.subscriptions.needRefreshCommits(); //rebalance RequestFuture<Void> future = sendOffsetCommitRequest(offsets); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { if (interceptors != null) interceptors.onCommit(offsets); completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null)); } @Override public void onFailure(RuntimeException e) { Exception commitException = e; if (e instanceof RetriableException) commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage()); completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException)); } }); }
private void doAutoCommitOffsetsAsync() { Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId); //rebalance commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId, exception.getMessage()); if (exception instanceof RetriableException) nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); } else { log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId); } } }); }
/**
 * Commits the given offsets through {@code consumer}, either asynchronously or synchronously.
 * <p>
 * Does nothing when {@code offsetsToCommit} is {@code null} or empty. In asynchronous mode the
 * outcome is only logged; in synchronous mode {@code commitSync} is called directly.
 *
 * @param consumer        the consumer used to commit
 * @param offsetsToCommit the offsets to commit; may be {@code null} or empty
 * @param async           {@code true} to use {@code commitAsync}, {@code false} to use {@code commitSync}
 */
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, boolean async) {
    boolean nothingToCommit = offsetsToCommit == null || offsetsToCommit.isEmpty();
    if (nothingToCommit) {
        return;
    }

    if (!async) {
        consumer.commitSync(offsetsToCommit);
        return;
    }

    consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
        if (exception == null) {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}", offsets.size(), offsetsAsString(offsets));
        } else {
            LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}", offsets.size(), offsetsAsString(offsets), exception.getMessage(), exception);
        }
    });
}
public void run(final long now) { if (coordinatorUnknown()) { log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId); reschedule(now + retryBackoffMs); return; } if (needRejoin()) { // skip the commit when we're rejoining since we'll commit offsets synchronously // before the revocation callback is invoked reschedule(now + interval); return; } commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { reschedule(now + interval); } else { log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); reschedule(now + interval); } } }); }
/**
 * Submits a task that commits offsets asynchronously and reports the outcome through the task's future.
 *
 * @param offsets           the offsets to commit; when {@code null} the no-offsets
 *                          {@code commitAsync(callback)} overload is used
 * @param completionHandler handler passed to {@code submitTask} together with the task
 */
@Override
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
    this.submitTask((consumer, future) -> {
        OffsetCommitCallback onCommitted = (result, exception) -> {
            if (future == null) {
                return;
            }
            if (exception == null) {
                future.complete(result);
            } else {
                future.fail(exception);
            }
        };

        if (offsets != null) {
            consumer.commitAsync(offsets, onCommitted);
        } else {
            consumer.commitAsync(onCommitted);
        }
    }, completionHandler);
}
private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages, final RuntimeException error, final Exception consumerCommitError, final long consumerCommitDelayMs, final boolean invokeCallback) throws Exception { final long finalOffset = FIRST_OFFSET + expectedMessages; // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(); offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)); offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET)); sinkTask.preCommit(offsetsToCommit); IExpectationSetters<Object> expectation = PowerMock.expectLastCall(); if (error != null) { expectation.andThrow(error).once(); return null; } else { expectation.andReturn(offsetsToCommit); } final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture(); consumer.commitAsync(EasyMock.eq(offsetsToCommit), EasyMock.capture(capturedCallback)); PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { time.sleep(consumerCommitDelayMs); if (invokeCallback) capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError); return null; } }); return capturedCallback; }
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { invokeCompletedOffsetCommitCallbacks(); // 没有找到可用的Node if (!coordinatorUnknown()) { //rebalance doCommitOffsetsAsync(offsets, callback); } else { // we don't know the current coordinator, so try to find it and then send the commit // or fail (we don't want recursive retries which can cause offset commits to arrive // out of order). Note that there may be multiple offset commits chained to the same // coordinator lookup request. This is fine because the listeners will be invoked in // the same order that they were added. Note also that AbstractCoordinator prevents // multiple concurrent coordinator lookup requests. pendingAsyncCommits.incrementAndGet(); lookupCoordinator().addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { pendingAsyncCommits.decrementAndGet(); doCommitOffsetsAsync(offsets, callback); } @Override public void onFailure(RuntimeException e) { pendingAsyncCommits.decrementAndGet(); completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, RetriableCommitFailedException.withUnderlyingMessage(e.getMessage()))); } }); } // ensure the commit has a chance to be transmitted (without blocking on its completion). // Note that commits are treated as heartbeats by the coordinator, so there is no need to // explicitly allow heartbeats through delayed task execution. client.pollNoWakeup(); }
/**
 * Creates a commit callback that sets {@code success} to {@code true} when a commit
 * completes without an exception; the flag is left untouched on failure.
 *
 * @param success the flag to set on a successful commit
 * @return the callback
 */
private OffsetCommitCallback callback(final AtomicBoolean success) {
    return new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                return;
            }
            success.set(true);
        }
    };
}
/**
 * Starts a new thread that consumes messages from the cluster and hands each one to the supplied function.
 * <p>
 * The thread subscribes to {@code topics} and keeps polling while {@code continuation} returns
 * {@code true}. After each record has been passed to {@code consumerFunction}, an asynchronous
 * commit is issued if {@code offsetCommitCallback} is non-null. When the loop ends (normally or
 * through an exception) the consumer is closed and {@code completion}, if any, is run.
 *
 * @param groupId the name of the group; may not be null
 * @param clientId the name of the client; may not be null
 * @param autoOffsetReset how to pick a starting offset when there is no initial offset in ZooKeeper or if an offset is
 *            out of range; may be null for the default to be used
 * @param keyDeserializer the deserializer for the keys; may not be null
 * @param valueDeserializer the deserializer for the values; may not be null
 * @param continuation the function that determines if the consumer should continue; may not be null
 * @param offsetCommitCallback the callback that should be used after committing offsets; may be null if offsets are
 *            not to be committed
 * @param completion the function to call when the consumer terminates; may be null
 * @param topics the set of topics to consume; may not be null or empty
 * @param consumerFunction the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion, Collection<String> topics, java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);

    Runnable pollLoop = () -> {
        LOGGER.info("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            while (continuation.getAsBoolean()) {
                for (ConsumerRecord<K, V> record : consumer.poll(10)) {
                    LOGGER.info("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    if (offsetCommitCallback != null) {
                        consumer.commitAsync(offsetCommitCallback);
                    }
                }
            }
        } finally {
            if (completion != null) {
                completion.run();
            }
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    };

    Thread t = new Thread(pollLoop);
    t.setName(clientId + "-thread");
    t.start();
}
/**
 * Consumes messages with string keys and integer values, starting from the earliest offset,
 * using a random UUID as both group id and client id and without committing offsets.
 *
 * @param continuation     the function that determines if the consumer should continue
 * @param completion       the function to call when the consumer terminates
 * @param topics           the topics to consume
 * @param consumerFunction the function that receives each record
 */
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
    final String id = UUID.randomUUID().toString();
    final Deserializer<String> keys = new StringDeserializer();
    final Deserializer<Integer> values = new IntegerDeserializer();
    // A null callback tells consume(...) not to commit offsets.
    final OffsetCommitCallback noCommit = null;
    this.consume(id, id, OffsetResetStrategy.EARLIEST, keys, values, continuation, noCommit, completion, topics, consumerFunction);
}
/**
 * Commits the offsets produced by {@code getOffsetsToCommit(offsets, ignoreConsumerHighWatermark)}
 * on the underlying consumer.
 *
 * @param offsets                     the user-supplied offsets
 * @param ignoreConsumerHighWatermark forwarded to {@code getOffsetsToCommit}
 * @param callback                    user callback, installed on {@code _offsetCommitCallback};
 *                                    only used for asynchronous commits
 * @param sync                        {@code true} for {@code commitSync}, {@code false} for {@code commitAsync}
 */
private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets, boolean ignoreConsumerHighWatermark, OffsetCommitCallback callback, boolean sync) {
    final Map<TopicPartition, OffsetAndMetadata> toCommit = getOffsetsToCommit(offsets, ignoreConsumerHighWatermark);

    if (!sync) {
        LOG.trace("Committing offsets asynchronously: {}", toCommit);
        _offsetCommitCallback.setUserCallback(callback);
        _kafkaConsumer.commitAsync(toCommit, _offsetCommitCallback);
        return;
    }

    LOG.trace("Committing offsets synchronously: {}", toCommit);
    _kafkaConsumer.commitSync(toCommit);
}
@Test public void testOffsetCommitCallback() { String topic = "testOffsetCommitCallback"; produceSyntheticMessages(topic); Properties props = new Properties(); // All the consumers should have the same group id. props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testOffsetCommitCallback"); // Make sure we start to consume from the beginning. props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Only fetch one record at a time. props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) { TopicPartition tp = new TopicPartition(topic, 0); consumer.assign(Collections.singleton(tp)); consumer.poll(5000); // M2 final AtomicBoolean offsetCommitted = new AtomicBoolean(false); consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception e) { assertEquals(topicPartitionOffsetAndMetadataMap.get(tp), new OffsetAndMetadata(3, ""), "The committed user offset should be 3"); offsetCommitted.set(true); } }); while (!offsetCommitted.get()) { consumer.poll(10); } assertEquals(consumer.committed(tp), new OffsetAndMetadata(3, ""), "The committed user offset should be 3"); assertEquals(consumer.committedSafeOffset(tp).longValue(), 0, "The committed actual offset should be 0"); } }
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { this.subscriptions.needRefreshCommits(); RequestFuture<Void> future = sendOffsetCommitRequest(offsets); final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback; future.addListener(new RequestFutureListener<Void>() { @Override public void onSuccess(Void value) { if (interceptors != null) interceptors.onCommit(offsets); cb.onComplete(offsets, null); } @Override public void onFailure(RuntimeException e) { if (e instanceof RetriableException) { cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e)); } else { cb.onComplete(offsets, e); } } }); // ensure the commit has a chance to be transmitted (without blocking on its completion). // Note that commits are treated as heartbeats by the coordinator, so there is no need to // explicitly allow heartbeats through delayed task execution. client.pollNoWakeup(); }
private OffsetAndMetadata commitAndRetrieveOffsets( LiKafkaConsumer<String, String> consumer, TopicPartition tp, Map<TopicPartition, OffsetAndMetadata> offsetMap) throws Exception { final AtomicBoolean callbackFired = new AtomicBoolean(false); final AtomicReference<Exception> offsetCommitIssue = new AtomicReference<>(null); OffsetAndMetadata committed = null; long now = System.currentTimeMillis(); long deadline = now + TimeUnit.MINUTES.toMillis(1); while (System.currentTimeMillis() < deadline) { //call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183) OffsetCommitCallback commitCallback = new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception e) { if (e != null) { offsetCommitIssue.set(e); } callbackFired.set(true); } }; if (offsetMap != null) { consumer.commitAsync(offsetMap, commitCallback); } else { consumer.commitAsync(commitCallback); } while (!callbackFired.get()) { consumer.poll(20); } Assert.assertNull(offsetCommitIssue.get(), "offset commit failed"); committed = consumer.committed(tp); if (committed != null) { break; } Thread.sleep(100); } assertNotNull(committed, "unable to retrieve committed offsets within timeout"); return committed; }
private void commitOffsets(ConsumerWorker worker) { KafkaConsumer<String, Serializable> consumer = worker.consumer; if(worker.isCommiting())return; worker.setCommiting(true); try { if(worker.uncommittedOffsetMap.isEmpty())return ; logger.debug("committing the offsets : {}", worker.uncommittedOffsetMap); consumer.commitAsync(worker.uncommittedOffsetMap, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { // worker.setCommiting(false); if(exception == null){ worker.resetUncommittedOffsetMap(); logger.debug("committed the offsets : {}",offsets); }else{ logger.error("committ the offsets error",exception); } } }); } finally { } }
/**
 * Delegates the asynchronous commit to the wrapped {@code consumer}.
 *
 * @param callback passed through unchanged to {@code consumer.commitAsync}
 */
@Override
public void commitAsync(OffsetCommitCallback callback) {
    consumer.commitAsync(callback);
}
/**
 * Delegates the asynchronous commit of the given offsets to the wrapped {@code consumer}.
 *
 * @param offsets  passed through unchanged to {@code consumer.commitAsync}
 * @param callback passed through unchanged to {@code consumer.commitAsync}
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    consumer.commitAsync(offsets, callback);
}
/**
 * Delegates the asynchronous commit to whatever {@code activeConsumer()} returns at call time.
 *
 * @param callback passed through unchanged to the active consumer
 */
@Override
public void commitAsync(OffsetCommitCallback callback) {
    activeConsumer().commitAsync(callback);
}
/**
 * Delegates the asynchronous commit of the given offsets to whatever {@code activeConsumer()}
 * returns at call time.
 *
 * @param offsets  passed through unchanged to the active consumer
 * @param callback passed through unchanged to the active consumer
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    activeConsumer().commitAsync(offsets, callback);
}
/**
 * No-op implementation: nothing is committed and the supplied callback is not invoked.
 *
 * @param callback ignored
 */
@Override
public void commitAsync(OffsetCommitCallback callback) {
}
/**
 * No-op implementation: nothing is committed and the supplied callback is not invoked.
 *
 * @param offsets  ignored
 * @param callback ignored
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
}
@Test public void testRequestCommit() throws Exception { expectInitializeTask(); expectPollInitialAssignment(); expectConsumerPoll(1); expectConversionAndTransformation(1); sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); EasyMock.expectLastCall(); final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); sinkTask.preCommit(offsets); EasyMock.expectLastCall().andReturn(offsets); final Capture<OffsetCommitCallback> callback = EasyMock.newCapture(); consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback)); EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { @Override public Void answer() throws Throwable { callback.getValue().onComplete(offsets, null); return null; } }); expectConsumerPoll(0); sinkTask.put(Collections.<SinkRecord>emptyList()); EasyMock.expectLastCall(); PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); workerTask.iteration(); // initial assignment workerTask.iteration(); // first record delivered sinkTaskContext.getValue().requestCommit(); assertTrue(sinkTaskContext.getValue().isCommitRequested()); assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); workerTask.iteration(); // triggers the commit assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); assertEquals(0, workerTask.commitFailures()); PowerMock.verifyAll(); }
@Test public void testPreCommit() throws Exception { expectInitializeTask(); // iter 1 expectPollInitialAssignment(); // iter 2 expectConsumerPoll(2); expectConversionAndTransformation(2); sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); EasyMock.expectLastCall(); final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>(); workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET)); workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>(); workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2)); workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>(); taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(); committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET)); sinkTask.preCommit(workerCurrentOffsets); EasyMock.expectLastCall().andReturn(taskOffsets); final Capture<OffsetCommitCallback> callback = EasyMock.newCapture(); consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback)); EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() { @Override public Void answer() throws Throwable { callback.getValue().onComplete(committableOffsets, null); return null; } }); expectConsumerPoll(0); 
sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject()); EasyMock.expectLastCall(); PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); workerTask.initializeAndStart(); workerTask.iteration(); // iter 1 -- initial assignment assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets")); workerTask.iteration(); // iter 2 -- deliver 2 records assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets")); assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); sinkTaskContext.getValue().requestCommit(); workerTask.iteration(); // iter 3 -- commit assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets")); PowerMock.verifyAll(); }
/**
 * Stores the callback, the offsets and the outcome of one offset commit attempt.
 *
 * @param callback  the callback associated with the commit
 * @param offsets   the offsets the commit was issued for
 * @param exception the failure to record (NOTE(review): presumably {@code null} means the
 *                  commit succeeded; confirm against the callers)
 */
private OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    this.callback = callback;
    this.offsets = offsets;
    this.exception = exception;
}
/**
 * Consumes messages with string keys and string values, starting from the earliest offset,
 * using a random UUID as both group id and client id and without committing offsets.
 *
 * @param continuation     the function that determines if the consumer should continue
 * @param completion       the function to call when the consumer terminates
 * @param topics           the topics to consume
 * @param consumerFunction the function that receives each record
 */
public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, String>> consumerFunction) {
    final String id = UUID.randomUUID().toString();
    // One deserializer instance serves both keys and values.
    final Deserializer<String> strings = new StringDeserializer();
    // A null callback tells consume(...) not to commit offsets.
    final OffsetCommitCallback noCommit = null;
    this.consume(id, id, OffsetResetStrategy.EARLIEST, strings, strings, continuation, noCommit, completion, topics, consumerFunction);
}
/**
 * Replaces the stored user callback with {@code userCallback}.
 *
 * @param userCallback the new user callback; stored as-is, with no null check
 */
public void setUserCallback(OffsetCommitCallback userCallback) {
    _userCallback = userCallback;
}
@Override public void commitAsync(OffsetCommitCallback callback) { // preserve the high watermark. commitOffsets(currentOffsetAndMetadataMap(), false, callback, false); }
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { // Ignore the high watermark. commitOffsets(offsets, false, null, true); }
/**
 * Initialize the coordination manager.
 * <p>
 * Besides storing its arguments, the constructor requests a metadata update, takes a metadata
 * snapshot, registers the metadata listener and, when auto-commit is enabled, creates and
 * schedules the auto-commit task.
 *
 * @param defaultOffsetCommitCallback stored as the coordinator's default commit callback
 * @param autoCommitEnabled           whether an {@code AutoCommitTask} is created and scheduled
 * @param autoCommitIntervalMs        passed to the {@code AutoCommitTask} constructor; only used
 *                                    when {@code autoCommitEnabled} is {@code true}
 */
public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs, List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
    super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
    this.metadata = metadata;
    // Request a metadata update, then snapshot what metadata.fetch() currently returns.
    this.metadata.requestUpdate();
    this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
    this.subscriptions = subscriptions;
    this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
    this.autoCommitEnabled = autoCommitEnabled;
    this.assignors = assignors;
    // NOTE(review): runs before autoCommitTask, sensors, interceptors and excludeInternalTopics
    // are assigned; confirm the listener does not rely on them during registration.
    addMetadataListener();
    if (autoCommitEnabled) {
        this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
        this.autoCommitTask.reschedule();
    } else {
        this.autoCommitTask = null;
    }
    this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
    this.interceptors = interceptors;
    this.excludeInternalTopics = excludeInternalTopics;
}
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) { // Ignore the high watermark. commitOffsets(offsets, true, callback, false); }
/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 * <p>
 * This implementation simply delegates to the wrapped {@code consumer}.
 *
 * @param offsets given offsets
 * @param callback Callback to invoke when the commit completes
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    consumer.commitAsync(offsets, callback);
}
/**
 * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
 * <p>
 * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
 * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used.
 * <p>
 * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
 * (if provided) or discarded.
 *
 * @param callback Callback to invoke when the commit completes; if no callback is provided, errors are discarded
 */
@InterfaceOrigin.ApacheKafka
void commitAsync(OffsetCommitCallback callback);
/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 * <p>
 * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
 * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used. The committed offset should be the next message your application will consume,
 * i.e. lastProcessedMessageOffset + 1.
 * <p>
 * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
 * (if provided) or discarded.
 * <p>
 * This method is large message transparent.
 * <p>
 * With large message support, the consumer will internally translate the user-provided offsets to the safe
 * offsets for the corresponding partitions (see {@link #safeOffset(TopicPartition, long)}). The actual offset
 * committed to Kafka can be retrieved through the method {@link #committedSafeOffset(TopicPartition)}. The
 * arguments passed to the callback are large message transparent.
 *
 * @param offsets A map of offsets by partition with associated metadata. This map will be copied internally, so it
 *                is safe to mutate the map after returning.
 * @param callback Callback to invoke when the commit completes; if no callback is provided, errors are discarded
 */
@InterfaceOrigin.ApacheKafka
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);