Java Class org.apache.kafka.clients.consumer.OffsetCommitCallback Code Examples

Project: DBus    File: KafkaSource.java
public void commitOffset(long offset) {
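    // Kafka commit semantics: the committed value is the next offset to consume, hence offset + 1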
    OffsetAndMetadata offsetMeta = new OffsetAndMetadata(offset + 1, "");

    Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
    offsetMap.put(statTopicPartition, offsetMeta);

    OffsetCommitCallback callback = new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if (e != null) {
                LOG.warn(String.format("CommitAsync failed: offset %d, Topic %s", offset, statTopic), e);
            }
            // nothing to do on success
        }
    };
    consumer.commitAsync(offsetMap, callback);
}
Project: Lagerta    File: ConsumerProxyRetry.java
@Override
public void commitAsync(final OffsetCommitCallback callback) {
    Retries.tryMe(new IgniteInClosure<RetryRunnableAsyncOnCallback>() {
        @Override
        public void apply(final RetryRunnableAsyncOnCallback retryRunnableAsyncOnCallback) {
            inner.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    callback.onComplete(offsets, exception);
                    if (exception != null) {
                        retryRunnableAsyncOnCallback.retry(exception);
                    }
                }
            });
        }
    }, strategy());
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinator.java
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    this.subscriptions.needRefreshCommits();
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            Exception commitException = e;

            if (e instanceof RetriableException)
                commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage());

            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
        }
    });
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinator.java
private void doAutoCommitOffsetsAsync() {
    Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
    log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId);
    commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId,
                        exception.getMessage());
                if (exception instanceof RetriableException)
                    nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
            } else {
                log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId);
            }
        }
    });
}
Project: rmap    File: KafkaUtils.java
static void commitOffsets(Consumer<?, ?> consumer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                          boolean async) {

    if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
        return;
    }

    OffsetCommitCallback callback = (offsets, exception) -> {
        if (exception != null) {
            LOG.warn("Unable to commit offsets for {} TopicPartition(s) {}: {}",
                    offsets.size(),
                    offsetsAsString(offsets),
                    exception.getMessage(),
                    exception);
        } else {
            LOG.debug("Successfully committed offset(s) for {} TopicPartition(s): {}",
                    offsets.size(), offsetsAsString(offsets));
        }
    };

    if (async) {
        consumer.commitAsync(offsetsToCommit, callback);
    } else {
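        // the sync path takes no callback: a failed commitSync surfaces as an exception to the caller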
        consumer.commitSync(offsetsToCommit);
    }
}
Project: kafka    File: ConsumerCoordinator.java
public void run(final long now) {
    if (coordinatorUnknown()) {
        log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
        reschedule(now + retryBackoffMs);
        return;
    }

    if (needRejoin()) {
        // skip the commit when we're rejoining since we'll commit offsets synchronously
        // before the revocation callback is invoked
        reschedule(now + interval);
        return;
    }

    commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception == null) {
                reschedule(now + interval);
            } else {
                log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                reschedule(now + interval);
            }
        }
    });
}
Project: vertx-kafka-client    File: KafkaReadStreamImpl.java
@Override
public void commit(Map<TopicPartition, OffsetAndMetadata> offsets, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
  this.submitTask((consumer, future) -> {
    OffsetCommitCallback callback = (result, exception) -> {
      if (future != null) {
        if (exception != null) {
          future.fail(exception);
        } else {
          future.complete(result);
        }
      }
    };
    if (offsets == null) {
      consumer.commitAsync(callback);
    } else {
      consumer.commitAsync(offsets, callback);
    }
  }, completionHandler);
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskThreadedTest.java
private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages,
                                                         final RuntimeException error,
                                                         final Exception consumerCommitError,
                                                         final long consumerCommitDelayMs,
                                                         final boolean invokeCallback)
        throws Exception {
    final long finalOffset = FIRST_OFFSET + expectedMessages;

    // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one
    final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
    offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
    offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
    sinkTask.preCommit(offsetsToCommit);
    IExpectationSetters<Object> expectation = PowerMock.expectLastCall();
    if (error != null) {
        expectation.andThrow(error).once();
        return null;
    } else {
        expectation.andReturn(offsetsToCommit);
    }

    final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
    consumer.commitAsync(EasyMock.eq(offsetsToCommit),
            EasyMock.capture(capturedCallback));
    PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
        @Override
        public Object answer() throws Throwable {
            time.sleep(consumerCommitDelayMs);
            if (invokeCallback)
                capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError);
            return null;
        }
    });
    return capturedCallback;
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinator.java
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();
    // the coordinator is known, so send the commit request directly
    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // we don't know the current coordinator, so try to find it and then send the commit
        // or fail (we don't want recursive retries which can cause offset commits to arrive
        // out of order). Note that there may be multiple offset commits chained to the same
        // coordinator lookup request. This is fine because the listeners will be invoked in
        // the same order that they were added. Note also that AbstractCoordinator prevents
        // multiple concurrent coordinator lookup requests.
        pendingAsyncCommits.incrementAndGet();
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                pendingAsyncCommits.decrementAndGet();
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
            public void onFailure(RuntimeException e) {
                pendingAsyncCommits.decrementAndGet();
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
                        RetriableCommitFailedException.withUnderlyingMessage(e.getMessage())));
            }
        });
    }

    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinatorTest.java
private OffsetCommitCallback callback(final AtomicBoolean success) {
    return new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception == null)
                success.set(true);
        }
    };
}
Project: fluid    File: KafkaUsage.java
/**
 * Use the supplied function to asynchronously consume messages from the cluster.
 *
 * @param groupId              the name of the group; may not be null
 * @param clientId             the name of the client; may not be null
 * @param autoOffsetReset      how to pick a starting offset when there is no initial committed offset or if an offset is
 *                             out of range; may be null for the default to be used
 * @param keyDeserializer      the deserializer for the keys; may not be null
 * @param valueDeserializer    the deserializer for the values; may not be null
 * @param continuation         the function that determines if the consumer should continue; may not be null
 * @param offsetCommitCallback the callback that should be used after committing offsets; may be null if offsets are
 *                             not to be committed
 * @param completion           the function to call when the consumer terminates; may be null
 * @param topics               the set of topics to consume; may not be null or empty
 * @param consumerFunction     the function to consume the messages; may not be null
 */
public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset,
                           Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer,
                           BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion,
                           Collection<String> topics,
                           java.util.function.Consumer<ConsumerRecord<K, V>> consumerFunction) {
    Properties props = getConsumerProperties(groupId, clientId, autoOffsetReset);
    Thread t = new Thread(() -> {
        LOGGER.info("Starting consumer {} to read messages", clientId);
        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(props, keyDeserializer, valueDeserializer)) {
            consumer.subscribe(new ArrayList<>(topics));
            while (continuation.getAsBoolean()) {
                consumer.poll(10).forEach(record -> {
                    LOGGER.info("Consumer {}: consuming message {}", clientId, record);
                    consumerFunction.accept(record);
                    if (offsetCommitCallback != null) {
                        consumer.commitAsync(offsetCommitCallback);
                    }
                });
            }
        } finally {
            if (completion != null) completion.run();
            LOGGER.debug("Stopping consumer {}", clientId);
        }
    });
    t.setName(clientId + "-thread");
    t.start();
}
Project: fluid    File: KafkaUsage.java
public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
    Deserializer<String> keyDes = new StringDeserializer();
    Deserializer<Integer> valDes = new IntegerDeserializer();
    String randomId = UUID.randomUUID().toString();
    OffsetCommitCallback offsetCommitCallback = null;
    this.consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, valDes, continuation, offsetCommitCallback, completion, topics, consumerFunction);
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
                           boolean ignoreConsumerHighWatermark,
                           OffsetCommitCallback callback,
                           boolean sync) {
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(offsets, ignoreConsumerHighWatermark);
  if (sync) {
    LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
    _kafkaConsumer.commitSync(offsetsToCommit);
  } else {
    LOG.trace("Committing offsets asynchronously: {}", offsetsToCommit);
    _offsetCommitCallback.setUserCallback(callback);
    _kafkaConsumer.commitAsync(offsetsToCommit, _offsetCommitCallback);
  }
}
Project: likafka-clients    File: LiKafkaConsumerIntegrationTest.java
@Test
public void testOffsetCommitCallback() {
  String topic = "testOffsetCommitCallback";
  produceSyntheticMessages(topic);
  Properties props = new Properties();
  // All the consumers should have the same group id.
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testOffsetCommitCallback");
  // Make sure we start to consume from the beginning.
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  // Only fetch one record at a time.
  props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
  try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
    TopicPartition tp = new TopicPartition(topic, 0);
    consumer.assign(Collections.singleton(tp));
    consumer.poll(5000); // M2
    final AtomicBoolean offsetCommitted = new AtomicBoolean(false);
    consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception e) {
        assertEquals(topicPartitionOffsetAndMetadataMap.get(tp), new OffsetAndMetadata(3, ""), "The committed user offset should be 3");
        offsetCommitted.set(true);
      }
    });
    while (!offsetCommitted.get()) {
      consumer.poll(10);
    }
    assertEquals(consumer.committed(tp), new OffsetAndMetadata(3, ""), "The committed user offset should be 3");
    assertEquals(consumer.committedSafeOffset(tp).longValue(), 0, "The committed actual offset should be 0");
  }
}
Project: kafka    File: ConsumerCoordinator.java
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    this.subscriptions.needRefreshCommits();
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            cb.onComplete(offsets, null);
        }

        @Override
        public void onFailure(RuntimeException e) {
            if (e instanceof RetriableException) {
                cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
            } else {
                cb.onComplete(offsets, e);
            }
        }
    });

    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
}
Project: kafka    File: ConsumerCoordinatorTest.java
private OffsetCommitCallback callback(final AtomicBoolean success) {
    return new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception == null)
                success.set(true);
        }
    };
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerIntegrationTest.java
private OffsetAndMetadata commitAndRetrieveOffsets(
    LiKafkaConsumer<String, String> consumer,
    TopicPartition tp, Map<TopicPartition,
    OffsetAndMetadata> offsetMap) throws Exception {
  final AtomicBoolean callbackFired = new AtomicBoolean(false);
  final AtomicReference<Exception> offsetCommitIssue = new AtomicReference<>(null);
  OffsetAndMetadata committed = null;
  long now = System.currentTimeMillis();
  long deadline = now + TimeUnit.MINUTES.toMillis(1);
  while (System.currentTimeMillis() < deadline) {
    //call commitAsync, wait for a NON-NULL return value (see https://issues.apache.org/jira/browse/KAFKA-6183)
    OffsetCommitCallback commitCallback = new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap, Exception e) {
        if (e != null) {
          offsetCommitIssue.set(e);
        }
        callbackFired.set(true);
      }
    };
    if (offsetMap != null) {
      consumer.commitAsync(offsetMap, commitCallback);
    } else {
      consumer.commitAsync(commitCallback);
    }
    while (!callbackFired.get()) {
      consumer.poll(20);
    }
    Assert.assertNull(offsetCommitIssue.get(), "offset commit failed");
    committed = consumer.committed(tp);
    if (committed != null) {
      break;
    }
    Thread.sleep(100);
  }
  assertNotNull(committed, "unable to retrieve committed offsets within timeout");
  return committed;
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
                           boolean ignoreConsumerHighWatermark,
                           OffsetCommitCallback callback,
                           boolean sync) {
  Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(offsets, ignoreConsumerHighWatermark);
  if (sync) {
    LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
    _kafkaConsumer.commitSync(offsetsToCommit);
  } else {
    LOG.trace("Committing offsets asynchronously: {}", offsetsToCommit);
    _offsetCommitCallback.setUserCallback(callback);
    _kafkaConsumer.commitAsync(offsetsToCommit, _offsetCommitCallback);
  }
}
Project: jeesuite-libs    File: NewApiTopicConsumer.java
private void commitOffsets(ConsumerWorker worker) {

    KafkaConsumer<String, Serializable> consumer = worker.consumer;
    if (worker.isCommiting()) return;
    if (worker.uncommittedOffsetMap.isEmpty()) return;
    // mark the worker as committing only after the empty check, so an early
    // return cannot leave the flag set and block all later commits
    worker.setCommiting(true);

    logger.debug("committing the offsets : {}", worker.uncommittedOffsetMap);
    consumer.commitAsync(worker.uncommittedOffsetMap, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            // always clear the in-flight flag, success or failure
            worker.setCommiting(false);
            if (exception == null) {
                worker.resetUncommittedOffsetMap();
                logger.debug("committed the offsets : {}", offsets);
            } else {
                logger.error("commit the offsets error", exception);
            }
        }
    });
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public void commitAsync(OffsetCommitCallback callback) {
  consumer.commitAsync(callback);
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets,
    OffsetCommitCallback callback) {
  consumer.commitAsync(offsets, callback);
}
Project: Lagerta    File: ConsumerForTests.java
@Override public void commitAsync(OffsetCommitCallback callback) {
    activeConsumer().commitAsync(callback);
}
Project: Lagerta    File: ConsumerForTests.java
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
    activeConsumer().commitAsync(offsets, callback);
}
Project: Lagerta    File: ConsumerAdapter.java
@Override public void commitAsync(OffsetCommitCallback callback) {
}
Project: Lagerta    File: ConsumerAdapter.java
@Override public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskTest.java
@Test
public void testRequestCommit() throws Exception {
    expectInitializeTask();

    expectPollInitialAssignment();

    expectConsumerPoll(1);
    expectConversionAndTransformation(1);
    sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
    EasyMock.expectLastCall();

    final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
    offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
    sinkTask.preCommit(offsets);
    EasyMock.expectLastCall().andReturn(offsets);

    final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
    consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
    EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
        @Override
        public Void answer() throws Throwable {
            callback.getValue().onComplete(offsets, null);
            return null;
        }
    });

    expectConsumerPoll(0);
    sinkTask.put(Collections.<SinkRecord>emptyList());
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    workerTask.initializeAndStart();

    workerTask.iteration(); // initial assignment

    workerTask.iteration(); // first record delivered

    sinkTaskContext.getValue().requestCommit();
    assertTrue(sinkTaskContext.getValue().isCommitRequested());
    assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
    workerTask.iteration(); // triggers the commit
    assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
    assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
    assertEquals(0, workerTask.commitFailures());

    PowerMock.verifyAll();
}
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSinkTaskTest.java
@Test
public void testPreCommit() throws Exception {
    expectInitializeTask();

    // iter 1
    expectPollInitialAssignment();

    // iter 2
    expectConsumerPoll(2);
    expectConversionAndTransformation(2);
    sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
    EasyMock.expectLastCall();

    final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
    workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
    workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));

    final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
    workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
    workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));

    final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>();
    taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task
    taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset
    taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned

    final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();
    committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
    committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));

    sinkTask.preCommit(workerCurrentOffsets);
    EasyMock.expectLastCall().andReturn(taskOffsets);
    final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
    consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback));
    EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
        @Override
        public Void answer() throws Throwable {
            callback.getValue().onComplete(committableOffsets, null);
            return null;
        }
    });
    expectConsumerPoll(0);
    sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
    EasyMock.expectLastCall();

    PowerMock.replayAll();

    workerTask.initialize(TASK_CONFIG);
    workerTask.initializeAndStart();
    workerTask.iteration(); // iter 1 -- initial assignment

    assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
    workerTask.iteration(); // iter 2 -- deliver 2 records

    assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
    assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
    sinkTaskContext.getValue().requestCommit();
    workerTask.iteration(); // iter 3 -- commit
    assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));

    PowerMock.verifyAll();
}
Project: kafka-0.11.0.0-src-with-comment    File: ConsumerCoordinator.java
private OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    this.callback = callback;
    this.offsets = offsets;
    this.exception = exception;
}
Project: fluid    File: KafkaUsage.java
public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, String>> consumerFunction) {
    Deserializer<String> keyDes = new StringDeserializer();
    String randomId = UUID.randomUUID().toString();
    OffsetCommitCallback offsetCommitCallback = null;
    this.consume(randomId, randomId, OffsetResetStrategy.EARLIEST, keyDes, keyDes, continuation, offsetCommitCallback, completion, topics, consumerFunction);
}
Project: likafka-clients    File: LiKafkaOffsetCommitCallback.java
public void setUserCallback(OffsetCommitCallback userCallback) {
  _userCallback = userCallback;
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void commitAsync(OffsetCommitCallback callback) {
  // preserve the high watermark.
  commitOffsets(currentOffsetAndMetadataMap(), false, callback, false);
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
  // Ignore the high watermark.
  commitOffsets(offsets, true, callback, false);
}
Project: kafka    File: ConsumerCoordinator.java
/**
 * Initialize the coordination manager.
 */
public ConsumerCoordinator(ConsumerNetworkClient client,
                           String groupId,
                           int sessionTimeoutMs,
                           int heartbeatIntervalMs,
                           List<PartitionAssignor> assignors,
                           Metadata metadata,
                           SubscriptionState subscriptions,
                           Metrics metrics,
                           String metricGrpPrefix,
                           Time time,
                           long retryBackoffMs,
                           OffsetCommitCallback defaultOffsetCommitCallback,
                           boolean autoCommitEnabled,
                           long autoCommitIntervalMs,
                           ConsumerInterceptors<?, ?> interceptors,
                           boolean excludeInternalTopics) {
    super(client,
            groupId,
            sessionTimeoutMs,
            heartbeatIntervalMs,
            metrics,
            metricGrpPrefix,
            time,
            retryBackoffMs);
    this.metadata = metadata;

    this.metadata.requestUpdate();
    this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
    this.subscriptions = subscriptions;
    this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
    this.autoCommitEnabled = autoCommitEnabled;
    this.assignors = assignors;

    addMetadataListener();

    if (autoCommitEnabled) {
        this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
        this.autoCommitTask.reschedule();
    } else {
        this.autoCommitTask = null;
    }

    this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
    this.interceptors = interceptors;
    this.excludeInternalTopics = excludeInternalTopics;
}
Project: li-apache-kafka-clients    File: LiKafkaOffsetCommitCallback.java
public void setUserCallback(OffsetCommitCallback userCallback) {
  _userCallback = userCallback;
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void commitAsync(OffsetCommitCallback callback) {
  // preserve the high watermark.
  commitOffsets(currentOffsetAndMetadataMap(), false, callback, false);
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
  // Ignore the high watermark.
  commitOffsets(offsets, true, callback, false);
}
Project: apex-malhar    File: KafkaConsumer010.java
/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 * @param offsets given offsets
 * @param callback Callback to invoke when the commit completes
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
{
  consumer.commitAsync(offsets, callback);
}
Project: apex-malhar    File: KafkaConsumer09.java
/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 * @param offsets given offsets
 * @param callback Callback to invoke when the commit completes
 */
@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
{
  consumer.commitAsync(offsets, callback);
}
Project: likafka-clients    File: LiKafkaConsumer.java
/**
 * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
 * <p>
 * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
 * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used.
 * <p>
 * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
 * (if provided) or discarded.
 *
 * @param callback Callback to invoke when the commit completes
 */
@InterfaceOrigin.ApacheKafka
void commitAsync(OffsetCommitCallback callback);
Project: likafka-clients    File: LiKafkaConsumer.java
/**
 * Commit the specified offsets for the specified list of topics and partitions to Kafka.
 * <p>
 * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
 * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used. The committed offset should be the next message your application will consume,
 * i.e. lastProcessedMessageOffset + 1.
 * <p>
 * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
 * (if provided) or discarded.
 * <p>
 * This method is large message transparent.
 * <p>
 * With large message support, the consumer will internally translate the user provided offsets to the safe
 * offsets for the corresponding partitions (see {@link #safeOffset(TopicPartition, long)}. The actual offset
 * committed to Kafka can be retrieved through method {@link #committedSafeOffset(TopicPartition)}. The arguments
 * passed to the callback is large message transparent.
 *
 * @param offsets  A map of offsets by partition with associated metadata. This map will be copied internally, so it
 *                 is safe to mutate the map after returning.
 * @param callback Callback to invoke when the commit completes
 */
@InterfaceOrigin.ApacheKafka
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
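Putting that contract into practice, a minimal sketch that commits lastProcessedMessageOffset + 1 per partition and inspects the outcome in the callback; the records loop and the process call are placeholders for application logic, not part of the interface:
// Sketch of the documented contract: commit the NEXT offset to consume.
// records came from a prior consumer.poll(...); process(...) is hypothetical.
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    process(record);
    toCommit.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)); // lastProcessedMessageOffset + 1
}
consumer.commitAsync(toCommit, (offsets, exception) -> {
    if (exception != null) {
        // errors arrive here rather than being thrown; decide whether to retry
        System.err.println("offset commit failed: " + exception.getMessage());
    }
});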