Java 类org.apache.kafka.clients.consumer.OffsetAndMetadata 实例源码

项目:wechat-mall    文件:OrderProducer.java   
/**
 * Sends one record inside a Kafka transaction and attaches the produced record's
 * partition/offset to that transaction for the {@code MallConstants.ORDER_GROUP} group.
 *
 * <p>The transaction is committed once the send has been acknowledged. If waiting for the
 * acknowledgement fails or is interrupted, the transaction is aborted; on interruption the
 * thread's interrupt flag is restored so callers can still observe it.
 *
 * @param k record key
 * @param v record payload
 */
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    // NOTE(review): initTransactions() runs on every call; Kafka documents it as a one-time
    // setup step per producer instance — confirm getWorker() hands out a fresh producer.
    p.initTransactions();
    p.beginTransaction();
    // NOTE(review): the record goes through the `worker` field while the transaction is driven
    // through `p`; presumably both refer to the same producer — verify.
    Future<RecordMetadata> res = worker.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    try {
        RecordMetadata record = res.get();
        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException e) {
        try {
            p.abortTransaction();
        } finally {
            // Restore the flag only after the abort so the abort itself is not interrupted.
            Thread.currentThread().interrupt();
        }
    } catch (ExecutionException e) {
        p.abortTransaction();
    }
}
项目:bireme    文件:KafkaPipeLine.java   
/**
 * Synchronously commits every tracked partition offset to Kafka, then resets the tracking
 * map and updates the commit timer and statistics.
 */
@Override
public void commit() {
  HashMap<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();

  // Keys are split on '+' into topic and partition number; the committed position is the
  // tracked offset plus one.
  partitionOffset.forEach((topicAndPartition, lastOffset) -> {
    String[] parts = topicAndPartition.split("\\+");
    TopicPartition tp = new TopicPartition(parts[0], Integer.parseInt(parts[1]));
    toCommit.put(tp, new OffsetAndMetadata(lastOffset + 1));
  });

  consumer.commitSync(toCommit);
  committed.set(true);
  partitionOffset.clear();

  // stop the timer that measures how long the commit took
  timerCTX.stop();

  stat.newestCompleted = newestRecord;
  stat.delay = System.currentTimeMillis() - start.getTime();
}
项目:ja-micro    文件:OffsetCommitter.java   
/**
 * Re-commits the stored offset of every partition whose entry has been idle for longer than
 * {@code IDLE_DURATION}. Does nothing unless more than {@code IDLE_DURATION} has passed
 * since the last run.
 */
public void recommitOffsets() {
    LocalDateTime now = LocalDateTime.now(clock);
    if (!now.isAfter(lastUpdateTime.plus(IDLE_DURATION))) {
        return;
    }

    for (TopicPartition partition : offsetData.keySet()) {
        OffsetAndTime tracked = offsetData.get(partition);
        if (!now.isAfter(tracked.time.plus(IDLE_DURATION))) {
            continue;
        }
        try {
            OffsetAndMetadata position = new OffsetAndMetadata(tracked.offset);
            consumer.commitSync(Collections.singletonMap(partition, position));
        } catch (CommitFailedException e) {
            // A failed re-commit is only logged; the entry's timestamp is still refreshed below.
            logger.info("Caught CommitFailedException attempting to commit {} {}",
                    partition, tracked.offset);
        }
        tracked.time = now;
    }
    lastUpdateTime = now;
}
项目:flume-release-1.7.0    文件:KafkaSource.java   
/**
 * Reads the offsets stored in ZooKeeper under the consumer offset directory of
 * {@code groupId} / {@code topicStr}.
 *
 * @param client   ZooKeeper utility used for the lookups
 * @param topicStr topic whose offsets are read
 * @return one entry per partition node that holds data; partitions without data are omitted
 */
private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils client,
                                                                   String topicStr) {
  ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topicStr);
  List<String> partitionNodes = asJavaListConverter(
      client.getChildrenParentMayNotExist(topicDirs.consumerOffsetDir())).asJava();

  Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
  for (String partitionNode : partitionNodes) {
    TopicPartition tp = new TopicPartition(topicStr, Integer.parseInt(partitionNode));
    String nodePath = topicDirs.consumerOffsetDir() + "/" + partitionNode;
    Option<String> stored = client.readDataMaybeNull(nodePath)._1();
    if (!stored.isDefined()) {
      continue;
    }
    result.put(tp, new OffsetAndMetadata(Long.parseLong(stored.get())));
  }
  return result;
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
 * returned future can be polled to get the actual offsets returned from the broker.
 *
 * @param partitions The set of partitions to get offsets for.
 * @return A request future containing the committed offsets.
 */
// Builds the OffsetFetchRequest and hands it to the network client.
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
    final Node target = coordinator();
    if (target == null) {
        return RequestFuture.coordinatorNotAvailable();
    }

    log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);

    final OffsetFetchRequest.Builder fetchRequest =
            new OffsetFetchRequest.Builder(this.groupId, new ArrayList<>(partitions));

    // The response is processed by an OffsetFetchResponseHandler.
    return client.send(target, fetchRequest).compose(new OffsetFetchResponseHandler());
}
项目:kafka-0.11.0.0-src-with-comment    文件:VerifiableConsumer.java   
/**
 * Main consume loop: subscribes to the topic, then polls forever, passing each batch to
 * {@code onRecordsReceived} and committing the returned offsets unless auto-commit is on.
 * A {@code WakeupException} ends the loop; the consumer is always closed on exit.
 */
public void run() {
    try {
        printJson(new StartupComplete());
        consumer.subscribe(Collections.singletonList(topic), this);

        for (;;) {
            ConsumerRecords<String, String> batch = consumer.poll(Long.MAX_VALUE);
            Map<TopicPartition, OffsetAndMetadata> toCommit = onRecordsReceived(batch);

            if (useAutoCommit) {
                continue;
            }
            if (useAsyncCommit) {
                consumer.commitAsync(toCommit, this);
            } else {
                commitSync(toCommit);
            }
        }
    } catch (WakeupException e) {
        // ignore, we are closing
    } finally {
        consumer.close();
        printJson(new ShutdownComplete());
        shutdownLatch.countDown();
    }
}
项目:wngn-jms-kafka    文件:FileStreamSinkTaskTest.java   
/**
 * Puts records in two batches, flushing after each, and checks that the output stream holds
 * one line per record value in the order the records were put.
 */
@Test
public void testPutFlush() {
    final String eol = System.getProperty("line.separator");
    final TopicPartition topic1Partition0 = new TopicPartition("topic1", 0);
    final TopicPartition topic2Partition0 = new TopicPartition("topic2", 0);
    final HashMap<TopicPartition, OffsetAndMetadata> flushedOffsets = new HashMap<>();

    // task.start() is deliberately not called: it would override the output stream

    // first batch: a single record on topic1
    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
    ));
    flushedOffsets.put(topic1Partition0, new OffsetAndMetadata(1L));
    task.flush(flushedOffsets);
    assertEquals("line1" + eol, os.toString());

    // second batch: one more record on topic1 plus the first record on topic2
    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
            new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
    ));
    flushedOffsets.put(topic1Partition0, new OffsetAndMetadata(2L));
    flushedOffsets.put(topic2Partition0, new OffsetAndMetadata(1L));
    task.flush(flushedOffsets);

    final String expectedOutput = "line1" + eol + "line2" + eol + "line3" + eol;
    assertEquals(expectedOutput, os.toString());
}
项目:kinesis-kafka-connector    文件:AmazonKinesisSinkTask.java   
/**
 * Flushes the Kinesis producer(s): every producer in {@code producerMap} when one producer
 * per partition is configured, otherwise the single shared producer. The synchronous flush
 * variant is used when {@code flushSync} is set.
 *
 * @param arg0 offsets supplied by the framework; not used here
 */
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> arg0) {
    if (!singleKinesisProducerPerPartition) {
        if (flushSync) {
            kinesisProducer.flushSync();
        } else {
            kinesisProducer.flush();
        }
        return;
    }

    producerMap.values().forEach(perPartitionProducer -> {
        if (flushSync) {
            perPartitionProducer.flushSync();
        } else {
            perPartitionProducer.flush();
        }
    });
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerInterceptorsTest.java   
/**
 * Verifies that {@code onCommit} reaches every interceptor in the chain, including when one
 * of them has been told to fail.
 */
@Test
public void testOnCommitChain() {
    // Two differently configured instances of the same interceptor class stand in for two
    // distinct interceptors. KafkaConsumer would not be set up this way, but it is enough to
    // exercise the interceptor callbacks.
    FilterConsumerInterceptor<Integer, Integer> first = new FilterConsumerInterceptor<>(filterPartition1);
    FilterConsumerInterceptor<Integer, Integer> second = new FilterConsumerInterceptor<>(filterPartition2);
    List<ConsumerInterceptor<Integer, Integer>> chain = new ArrayList<>();
    chain.add(first);
    chain.add(second);
    ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(chain);

    Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
    committed.put(tp, new OffsetAndMetadata(0));

    // every interceptor in the chain receives onCommit
    interceptors.onCommit(committed);
    assertEquals(2, onCommitCount);

    // a failing interceptor must not prevent the remaining ones from being called
    first.injectOnCommitError(true);
    interceptors.onCommit(committed);
    assertEquals(4, onCommitCount);

    interceptors.close();
}
项目:Lagerta    文件:ReconcilerImpl.java   
/**
 * For each partition, takes the smallest value of its list, looks up offsets for those
 * values via {@code offsetsForTimes}, and synchronously commits the offsets found.
 * Partitions for which the lookup yields {@code null} are committed at offset 0.
 *
 * @param txByPartition values per partition; the minimum of each list drives the lookup
 */
private void seekToMissingTransactions(Map<TopicPartition, List<Long>> txByPartition) {
    Map<TopicPartition, Long> smallestByPartition = txByPartition.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.min(e.getValue())));

    Map<TopicPartition, OffsetAndTimestamp> located = consumer.offsetsForTimes(smallestByPartition);

    Map<TopicPartition, OffsetAndMetadata> toCommit = located.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> new OffsetAndMetadata(e.getValue() == null ? 0 : e.getValue().offset())
            ));
    consumer.commitSync(toCommit);
}
项目:Lagerta    文件:PublisherKafkaService.java   
/**
 * Opens a short-lived consumer for {@code groupId}, looks up an offset for
 * {@code transactionId} on every partition of the local topic via {@code offsetsForTimes},
 * and synchronously commits the offsets that were found.
 *
 * @param config        supplies the local topic name and the base consumer configuration
 * @param transactionId value passed to {@code offsetsForTimes} for every partition
 * @param kafkaFactory  factory used to create the consumer
 * @param groupId       consumer group the offsets are committed for
 */
public void seekToTransaction(DataRecoveryConfig config, long transactionId, KafkaFactory kafkaFactory,
    String groupId) {
    String topic = config.getLocalTopic();
    Properties consumerProperties = PropertiesUtil.propertiesForGroup(config.getConsumerConfig(), groupId);

    try (Consumer<ByteBuffer, ByteBuffer> consumer = kafkaFactory.consumer(consumerProperties)) {
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(topic);

        // the same lookup value is used for every partition of the topic
        Map<TopicPartition, Long> lookup = new HashMap<>(topicPartitions.size());
        for (PartitionInfo info : topicPartitions) {
            lookup.put(new TopicPartition(topic, info.partition()), transactionId);
        }
        consumer.assign(lookup.keySet());

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> found : consumer.offsetsForTimes(lookup).entrySet()) {
            OffsetAndTimestamp hit = found.getValue();
            if (hit == null) {
                // no offset found for this partition; leave it uncommitted
                continue;
            }
            offsetsToCommit.put(found.getKey(), new OffsetAndMetadata(hit.offset()));
        }
        consumer.commitSync(offsetsToCommit);
    }
}
项目:Lagerta    文件:OffsetCalculator.java   
/**
 * Merges the offsets carried by the given transactions into the per-partition offset holders
 * and reports which partitions got a new dense offset.
 *
 * @param txToCommit transactions to take offsets from
 * @return the partitions whose last dense offset changed, mapped to the new offset; empty
 *         when {@code txToCommit} is empty
 */
public Map<TopicPartition, OffsetAndMetadata> calculateChangedOffsets(List<List<TransactionWrapper>> txToCommit) {
    if (txToCommit.isEmpty()) {
        return Collections.emptyMap();
    }

    Lazy<TopicPartition, MutableLongList> fromTransactions = calculateOffsetsFromTransactions(txToCommit);

    // union of the partitions already tracked and those seen in the transactions
    Collection<TopicPartition> partitions = new HashSet<>(offsets.keySet());
    partitions.addAll(fromTransactions.keySet());

    Map<TopicPartition, OffsetAndMetadata> changed = new HashMap<>();
    for (TopicPartition partition : partitions) {
        OffsetHolder holder = offsets.get(partition);
        long previous = holder.getLastDenseOffset();
        long merged = MergeHelper.mergeWithDenseCompaction(fromTransactions.get(partition),
            holder.getSparseCommittedOffsets(), previous);
        if (merged == INITIAL_SYNC_POINT || merged == previous) {
            continue;
        }
        holder.setLastDenseOffset(merged);
        changed.put(partition, new OffsetAndMetadata(merged));
    }
    return changed;
}
项目:Lagerta    文件:ConsumerProxyRetry.java   
/**
 * Runs the wrapped consumer's asynchronous commit through {@code Retries.tryMe}. The
 * caller's callback is always notified first; when the commit reported an exception a retry
 * is requested afterwards.
 *
 * @param callback callback notified of every commit completion
 */
@Override
public void commitAsync(final OffsetCommitCallback callback) {
    IgniteInClosure<RetryRunnableAsyncOnCallback> attempt = new IgniteInClosure<RetryRunnableAsyncOnCallback>() {
        @Override
        public void apply(final RetryRunnableAsyncOnCallback retryHandle) {
            inner.commitAsync((offsets, exception) -> {
                callback.onComplete(offsets, exception);
                if (exception != null) {
                    retryHandle.retry(exception);
                }
            });
        }
    };
    Retries.tryMe(attempt, strategy());
}
项目:kafka-0.11.0.0-src-with-comment    文件:SubscriptionStateTest.java   
/**
 * Walks the subscription state through a topic subscription followed by two successive
 * partition assignments and checks the reported state after each step.
 */
@Test
public void topicSubscription() {
    state.subscribe(singleton(topic), rebalanceListener);
    // after subscribing: one subscribed topic, no partitions assigned yet, auto-assignment mode
    assertEquals(1, state.subscription().size());
    assertTrue(state.assignedPartitions().isEmpty());
    assertTrue(state.partitionsAutoAssigned());
    // assign tp0 and set both its position and its committed offset to 1
    state.assignFromSubscribed(singleton(tp0));
    state.seek(tp0, 1);
    state.committed(tp0, new OffsetAndMetadata(1));
    assertAllPositions(tp0, 1L);
    // a new assignment containing only tp1 replaces tp0; tp1 is assigned but not yet fetchable
    state.assignFromSubscribed(singleton(tp1));
    assertTrue(state.isAssigned(tp1));
    assertFalse(state.isAssigned(tp0));
    assertFalse(state.isFetchable(tp1));
    assertEquals(singleton(tp1), state.assignedPartitions());
}
项目:kafka-0.11.0.0-src-with-comment    文件:WorkerSinkTask.java   
/**
 * Applies the offsets requested through the task context: seeks the consumer to each
 * non-null offset, records that offset as both the last committed and the current offset,
 * and finally clears the requests from the context. Null offsets are skipped with a warning.
 */
private void rewind() {
    final Map<TopicPartition, Long> requested = context.offsets();
    if (requested.isEmpty()) {
        return;
    }

    for (Map.Entry<TopicPartition, Long> request : requested.entrySet()) {
        final TopicPartition tp = request.getKey();
        final Long offset = request.getValue();
        if (offset == null) {
            log.warn("Cannot rewind {} to null offset.", tp);
            continue;
        }
        log.trace("Rewind {} to offset {}.", tp, offset);
        consumer.seek(tp, offset);
        lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
        currentOffsets.put(tp, new OffsetAndMetadata(offset));
    }
    context.clearOffsets();
}
项目:kafka-0.11.0.0-src-with-comment    文件:WorkerSinkTaskTest.java   
/**
 * Records the mock expectations for a rebalance in which re-opening the task fails.
 *
 * @param e the exception {@code sinkTask.open} is set up to throw
 */
private void expectRebalanceAssignmentError(RuntimeException e) {
    final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);

    // the task is expected to be closed for both partitions
    sinkTask.close(new HashSet<>(partitions));
    EasyMock.expectLastCall();

    // preCommit is expected with any offsets map and returns an empty map
    sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
    EasyMock.expectLastCall().andReturn(Collections.emptyMap());

    // both partitions report FIRST_OFFSET as their position
    EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
    EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);

    // opening the task for the partitions throws the supplied exception
    sinkTask.open(partitions);
    EasyMock.expectLastCall().andThrow(e);

    // the poll call drives the rebalance: it invokes revoke, then assign, on the captured
    // rebalance listener and returns no records
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    rebalanceListener.getValue().onPartitionsRevoked(partitions);
                    rebalanceListener.getValue().onPartitionsAssigned(partitions);
                    return ConsumerRecords.empty();
                }
            });
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinatorTest.java   
/**
 * Commits an offset asynchronously for a user-assigned partition and checks that the
 * callback reports success and the subscription state holds the committed offset.
 */
@Test
public void testCommitOffsetOnly() {
    subscriptions.assignFromUser(singleton(t1p));

    // queue the coordinator lookup response so the coordinator becomes ready
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    // queue a successful offset commit response for t1p
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));

    AtomicBoolean success = new AtomicBoolean(false);
    coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
    // run the completed commit callbacks before checking the success flag
    coordinator.invokeCompletedOffsetCommitCallbacks();
    assertTrue(success.get());

    assertEquals(100L, subscriptions.committed(t1p).offset());
}
项目:kafka-0.11.0.0-src-with-comment    文件:KStreamsFineGrainedAutoResetIntegrationTest.java   
/**
 * Commits offset 5 (with null metadata) for partition 0 of each test topic, using a
 * consumer whose group is the streams application id.
 *
 * <p>The consumer is closed even when the commit throws.
 */
private void commitInvalidOffsets() {
    final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
    final String[] topics = {TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2, TOPIC_Z_2};
    for (final String topic : topics) {
        invalidOffsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(5, null));
    }

    // try-with-resources guarantees the consumer is released if commitSync fails
    try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(TestUtils.consumerConfig(
        CLUSTER.bootstrapServers(),
        streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
        StringDeserializer.class,
        StringDeserializer.class))) {
        consumer.commitSync(invalidOffsets);
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:StandbyTaskTest.java   
/**
 * Builds a {@code StandbyTask} for a changelog partition that already has a committed
 * offset; the test passes when construction completes without throwing.
 */
@Test
public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
    final String changelogName = "test-application-my-store-changelog";
    final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
    consumer.assign(partitions);

    // commit offset 0 for the changelog partition before the task is created
    final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
    committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
    consumer.commitSync(committedOffsets);

    final PartitionInfo changelogInfo =
            new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0]);
    restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(changelogInfo));

    final KStreamBuilder builder = new KStreamBuilder();
    builder.stream("topic").groupByKey().count("my-store");
    final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0);
    StreamsConfig config = createConfig(baseDir);

    new StandbyTask(taskId, applicationId, partitions, topology, consumer, changelogReader, config,
        new MockStreamsMetrics(new Metrics()), stateDirectory);
}
项目:kafka-0.11.0.0-src-with-comment    文件:FetcherTest.java   
/**
 * A paused partition that needs an offset reset gets its position from the list-offsets
 * response (10 here) and remains unfetchable because it is paused.
 */
@Test
public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.pause(tp1); // paused partition does not have a valid position
    subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

    // queue the response to the LATEST list-offsets request
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
                           listOffsetResponse(Errors.NONE, 1L, 10L));
    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Refresh the committed offsets for provided partitions.
 */
// When the subscription state asks for a refresh, fetches the committed offsets of the
// assigned partitions and stores them in the subscription state.
public void refreshCommittedOffsetsIfNeeded() {
    if (!subscriptions.refreshCommitsNeeded()) {
        return;
    }

    // blocks until the committed offsets have been fetched
    Map<TopicPartition, OffsetAndMetadata> fetched = fetchCommittedOffsets(subscriptions.assignedPartitions());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : fetched.entrySet()) {
        TopicPartition tp = committedEntry.getKey();
        // verify assignment is still active
        if (!subscriptions.isAssigned(tp)) {
            continue;
        }
        // record the fetched committed offset for this partition
        this.subscriptions.committed(tp, committedEntry.getValue());
    }
    this.subscriptions.commitsRefreshed();
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Fetch the current committed offsets from the coordinator for a set of partitions.
 * Retriable failures are retried after a backoff; any other failure is rethrown.
 *
 * @param partitions The partitions to fetch offsets for
 * @return A map from partition to the committed offset
 */
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
    for (;;) {
        // make sure the group coordinator is known and ready
        ensureCoordinatorReady();

        // contact coordinator to fetch committed offsets
        final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> pending = sendOffsetFetchRequest(partitions);
        // block until the request has completed
        client.poll(pending);

        if (pending.succeeded()) {
            return pending.value();
        }
        if (pending.isRetriable()) {
            // back off before the next attempt
            time.sleep(retryBackoffMs);
            continue;
        }
        throw pending.exception();
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Sends an offset commit request and queues a completion entry for the callback once the
 * request finishes. Falls back to the default callback when {@code callback} is null.
 *
 * @param offsets  offsets to commit
 * @param callback callback to complete, or {@code null} to use the default one
 */
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    this.subscriptions.needRefreshCommits();
    RequestFuture<Void> commitFuture = sendOffsetCommitRequest(offsets);

    final OffsetCommitCallback cb;
    if (callback == null) {
        cb = defaultOffsetCommitCallback;
    } else {
        cb = callback;
    }

    commitFuture.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            if (interceptors != null) {
                interceptors.onCommit(offsets);
            }
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            // retriable failures are reported as RetriableCommitFailedException
            final Exception reported = e instanceof RetriableException
                    ? RetriableCommitFailedException.withUnderlyingMessage(e.getMessage())
                    : e;
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, reported));
        }
    });
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Asynchronously commits all consumed offsets. A failure is logged; if it is retriable, the
 * next auto-commit deadline is moved up to at most one retry backoff from now.
 */
private void doAutoCommitOffsetsAsync() {
    final Map<TopicPartition, OffsetAndMetadata> consumed = subscriptions.allConsumed();
    log.debug("Sending asynchronous auto-commit of offsets {} for group {}", consumed, groupId);

    commitOffsetsAsync(consumed, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception == null) {
                log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId);
                return;
            }

            log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId,
                    exception.getMessage());
            if (exception instanceof RetriableException) {
                long retryAt = time.milliseconds() + retryBackoffMs;
                nextAutoCommitDeadline = Math.min(retryAt, nextAutoCommitDeadline);
            }
        }
    });
}
项目:kafka-0.11.0.0-src-with-comment    文件:FetcherTest.java   
/**
 * A partition that is paused first and then sought to 10 keeps position 10 after
 * updateFetchPositions, needs no offset reset, and remains unfetchable because it is paused.
 */
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.pause(tp1); // paused partition does not have a valid position
    subscriptions.seek(tp1, 10);

    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
项目:SkyEye    文件:KafkaOffsetCommitCallback.java   
/**
 * Logs an error when an offset commit completes with an exception; successful commits are
 * ignored.
 *
 * @param offsets   the offsets the commit was for
 * @param exception the failure, or {@code null} on success
 */
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception == null) {
        return;
    }
    // the asynchronous commit reported a failure
    LOGGER.error("commit failed for offsets {}, and exception is {}", offsets, exception);
}
项目:kafka-connect-mq-sink    文件:MQSinkTask.java   
/**
 * Logs the offset of every supplied topic-partition at trace level, then commits the writer.
 *
 * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
 *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
 *                       passed to {@link #put}.
 */
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    for (Map.Entry<TopicPartition, OffsetAndMetadata> current : currentOffsets.entrySet()) {
        log.trace("Flushing up to topic {}, partition {} and offset {}",
                current.getKey().topic(), current.getKey().partition(), current.getValue().offset());
    }

    writer.commit();
}
项目:kafka-0.11.0.0-src-with-comment    文件:FetcherTest.java   
/**
 * A partition that is sought to 10 and then paused keeps position 10 after
 * updateFetchPositions, needs no offset reset, and remains unfetchable because it is paused.
 */
@Test
public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
    subscriptions.assignFromUser(singleton(tp1));
    subscriptions.committed(tp1, new OffsetAndMetadata(0));
    subscriptions.seek(tp1, 10);
    subscriptions.pause(tp1); // paused partition already has a valid position

    fetcher.updateFetchPositions(singleton(tp1));

    assertFalse(subscriptions.isOffsetResetNeeded(tp1));
    assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused
    assertTrue(subscriptions.hasValidPosition(tp1));
    assertEquals(10, subscriptions.position(tp1).longValue());
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinatorTest.java   
/**
 * After a successful asynchronous commit, the mock commit callback has been invoked exactly
 * once more than before and holds no exception.
 */
@Test
public void testCommitOffsetAsyncWithDefaultCallback() {
    // remember the invocation count so the assertion is relative to this test only
    int invokedBeforeTest = mockOffsetCommitCallback.invoked;
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
    coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
    // run the completed commit callbacks before inspecting the mock
    coordinator.invokeCompletedOffsetCommitCallbacks();
    assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
    assertNull(mockOffsetCommitCallback.exception);
}
项目:ja-micro    文件:Consumer.java   
/**
 * Re-commits the last committed offset of every assigned partition once the refresh
 * interval has elapsed.
 *
 * <p>Why: the retention of __consumer_offsets is shorter than that of most topics, so the
 * last committed offset per consumer group has to be re-committed regularly to keep it.
 * This matters most where traffic is bursty or sparse.
 */
private void checkIfRefreshCommitRequired() {
    final long now = System.currentTimeMillis();
    if (nextCommitRefreshRequiredTimestamp >= now) {
        return;
    }
    nextCommitRefreshRequiredTimestamp = now + COMMIT_REFRESH_INTERVAL_MILLIS;

    final Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
    for (PartitionProcessor processor : partitions.allProcessors()) {
        final TopicPartition assignedPartition = processor.getAssignedPartition();
        long offsetToRefresh = processor.getLastCommittedOffset();

        if (offsetToRefresh < 0) {
            // nothing committed through this processor yet; ask Kafka for the group's offset
            final OffsetAndMetadata stored = kafka.committed(assignedPartition);
            if (stored == null) {
                // there was no commit on this partition at all
                continue;
            }
            offsetToRefresh = stored.offset();
            processor.forceSetLastCommittedOffset(offsetToRefresh);
        }

        commitOffsets.put(assignedPartition, new OffsetAndMetadata(offsetToRefresh));
    }

    kafka.commitSync(commitOffsets);

    logger.info("Refreshing last committed offset {}", commitOffsets);
}
项目:kafka-0.11.0.0-src-with-comment    文件:TransactionManagerTest.java   
/**
 * A GROUP_AUTHORIZATION_FAILED response to the coordinator lookup fails the pending
 * sendOffsetsToTransaction result with a {@code GroupAuthorizationException} that carries
 * the consumer group id.
 */
@Test
public void testGroupAuthorizationFailureInFindCoordinator() {
    final String consumerGroupId = "consumer";
    final long pid = 13131L;
    final short epoch = 1;

    doInitTransactions(pid, epoch);

    transactionManager.beginTransaction();
    TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
            singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), consumerGroupId);

    prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
    sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
    sender.run(time.milliseconds());  // FindCoordinator Enqueued

    prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
    sender.run(time.milliseconds());  // FindCoordinator Failed
    sender.run(time.milliseconds());  // TxnOffsetCommit Aborted
    // the manager and the pending result must both report the authorization failure
    assertTrue(transactionManager.hasError());
    assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
    assertTrue(sendOffsetsResult.isCompleted());
    assertFalse(sendOffsetsResult.isSuccessful());
    assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);

    // the exception must name the group that was denied
    GroupAuthorizationException exception = (GroupAuthorizationException) sendOffsetsResult.error();
    assertEquals(consumerGroupId, exception.groupId());

    assertAbortableError(GroupAuthorizationException.class);
}
项目:ja-micro    文件:KafkaSubscriberTest.java   
/**
 * Runs {@code consumeMessages()} against a mocked consumer whose {@code commitSync} throws
 * {@code CommitFailedException} for a commit at offset 3. The test has no assertions; it
 * passes when {@code consumeMessages()} returns without throwing.
 */
@Test
public void subscriberLosesPartitionAssignment() {
    KafkaSubscriber<String> subscriber = new KafkaSubscriber<>(new MessageCallback(),
            "topic", "groupId", false,
            KafkaSubscriber.OffsetReset.Earliest, 1, 1, 1,
            5000, 5000);
    // four messages on topic "topic"; NOTE(review): the second and third constructor arguments
    // look like partition and offset — confirm against KafkaTopicInfo
    KafkaTopicInfo message1 = new KafkaTopicInfo("topic", 0, 1, null);
    KafkaTopicInfo message2 = new KafkaTopicInfo("topic", 0, 2, null);
    KafkaTopicInfo message3 = new KafkaTopicInfo("topic", 1, 1, null);
    KafkaTopicInfo message4 = new KafkaTopicInfo("topic", 1, 2, null);
    subscriber.consume(message1);
    subscriber.consume(message2);
    subscriber.consume(message3);
    subscriber.consume(message4);
    KafkaConsumer realConsumer = mock(KafkaConsumer.class);
    // matches a commit map whose first value carries offset 3
    class ArgMatcher implements ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>> {
        @Override
        public boolean matches(Map<TopicPartition, OffsetAndMetadata> arg) {
            OffsetAndMetadata oam = arg.values().iterator().next();
            return oam.offset() == 3;
        }
    }
    // make exactly that commit fail
    doThrow(new CommitFailedException()).when(realConsumer).commitSync(argThat(new ArgMatcher()));
    subscriber.realConsumer = realConsumer;
    subscriber.offsetCommitter = new OffsetCommitter(realConsumer, Clock.systemUTC());
    subscriber.consumeMessages();
}
项目:ja-micro    文件:OffsetCommitterTest.java   
/**
 * A recorded offset is not re-committed right away, but is committed exactly once after the
 * mocked clock has moved past {@code OffsetCommitter.IDLE_DURATION}.
 */
@Test
public void offsetGetsCommitted() throws Exception {
    committer.offsetCommitted(Collections.singletonMap(new TopicPartition("a", 1),
            new OffsetAndMetadata(333L)));
    // first attempt: the consumer must not be touched
    committer.recommitOffsets();
    verifyNoMoreInteractions(consumer);
    // move the mocked clock one second past the idle duration
    when(clock.instant()).thenReturn(Instant.now().plus(OffsetCommitter.IDLE_DURATION).plusMillis(1000));
    committer.recommitOffsets();
    //now it's time to work...
    verify(consumer, times(1)).commitSync(any(Map.class));
}
项目:flume-release-1.7.0    文件:KafkaSource.java   
/**
 * Copies the consumer offsets of {@code topicStr} from ZooKeeper to Kafka, unless Kafka
 * already holds offsets for the topic or ZooKeeper has none.
 *
 * <p>After committing, the offsets are read back from Kafka to verify that every migrated
 * partition is present. The ZooKeeper client and the Kafka consumer are each closed
 * independently, so a failure creating or closing one cannot leak the other.
 *
 * @param topicStr topic whose offsets are migrated
 * @throws FlumeException if the committed offsets cannot be read back from Kafka
 */
private void migrateOffsets(String topicStr) {
  ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
      JaasUtils.isZkSecurityEnabled());
  try {
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaProps);
    try {
      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets =
          getKafkaOffsets(consumer, topicStr);
      if (!kafkaOffsets.isEmpty()) {
        log.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", topicStr);
        log.debug("Offsets found: {}", kafkaOffsets);
        return;
      }

      log.info("No Kafka offsets found. Migrating zookeeper offsets");
      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets =
          getZookeeperOffsets(zkUtils, topicStr);
      if (zookeeperOffsets.isEmpty()) {
        log.warn("No offsets to migrate found in Zookeeper");
        return;
      }

      log.info("Committing Zookeeper offsets to Kafka");
      log.debug("Offsets to commit: {}", zookeeperOffsets);
      consumer.commitSync(zookeeperOffsets);
      // Read the offsets to verify they were committed
      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets =
          getKafkaOffsets(consumer, topicStr);
      log.debug("Offsets committed: {}", newKafkaOffsets);
      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
        throw new FlumeException("Offsets could not be committed");
      }
    } finally {
      consumer.close();
    }
  } finally {
    zkUtils.close();
  }
}
项目:flume-release-1.7.0    文件:KafkaSource.java   
/**
 * Collects the committed offset of every partition of a topic.
 *
 * @param client consumer used to list the partitions and look up committed offsets
 * @param topicStr the topic to inspect
 * @return a map from partition to committed offset; partitions for which
 *     {@code client.committed} returns {@code null} are omitted, and the map is
 *     empty when no partition list is available for the topic
 */
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
    KafkaConsumer<String, byte[]> client, String topicStr) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  List<PartitionInfo> partitions = client.partitionsFor(topicStr);
  if (partitions == null) {
    // Older kafka-clients releases return null instead of an empty list when
    // the topic has no metadata; treat that as "no offsets" rather than NPE.
    return offsets;
  }
  for (PartitionInfo partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, partition.partition());
    OffsetAndMetadata offsetAndMetadata = client.committed(key);
    if (offsetAndMetadata != null) {
      offsets.put(key, offsetAndMetadata);
    }
  }
  return offsets;
}
项目:flume-release-1.7.0    文件:KafkaChannel.java   
/**
 * Builds a new Kafka consumer, wraps it in a {@code ConsumerAndRecords},
 * subscribes it to the current topic with a {@code ChannelRebalanceListener},
 * gives it an empty offsets map and adds it to {@code consumers}.
 *
 * @return the newly created and registered holder
 * @throws FlumeException wrapping any exception raised during these steps
 */
private synchronized ConsumerAndRecords createConsumerAndRecords() {
  try {
    final KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
    final ConsumerAndRecords holder = new ConsumerAndRecords(kafkaConsumer, channelUUID);
    logger.info("Created new consumer to connect to Kafka");

    holder.consumer.subscribe(
        Arrays.asList(topic.get()),
        new ChannelRebalanceListener(rebalanceFlag));
    holder.offsets = new HashMap<>();

    consumers.add(holder);
    return holder;
  } catch (Exception cause) {
    throw new FlumeException("Unable to connect to Kafka", cause);
  }
}
项目:flume-release-1.7.0    文件:KafkaChannel.java   
/**
 * Commits the offsets returned by {@code getZookeeperOffsets} to Kafka, unless
 * {@code getKafkaOffsets} already reports offsets for the channel's topic.
 *
 * <p>After committing, the Kafka offsets are read back to check that every
 * partition that was committed is now present.
 *
 * @throws FlumeException if the offsets read back from Kafka after the commit
 *     do not cover every partition that was committed
 */
private void migrateOffsets() {
  ZkUtils zkUtils = ZkUtils.apply(zookeeperConnect, ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT,
      JaasUtils.isZkSecurityEnabled());
  try {
    // Created inside the outer try so the ZooKeeper client is still closed
    // when the consumer cannot be constructed.
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
    try {
      Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(consumer);
      if (!kafkaOffsets.isEmpty()) {
        logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper",
            topicStr);
        logger.debug("Offsets found: {}", kafkaOffsets);
        return;
      }

      logger.info("No Kafka offsets found. Migrating zookeeper offsets");
      Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(zkUtils);
      if (zookeeperOffsets.isEmpty()) {
        logger.warn("No offsets to migrate found in Zookeeper");
        return;
      }

      logger.info("Committing Zookeeper offsets to Kafka");
      logger.debug("Offsets to commit: {}", zookeeperOffsets);
      consumer.commitSync(zookeeperOffsets);
      // Read the offsets to verify they were committed
      Map<TopicPartition, OffsetAndMetadata> newKafkaOffsets = getKafkaOffsets(consumer);
      logger.debug("Offsets committed: {}", newKafkaOffsets);
      if (!newKafkaOffsets.keySet().containsAll(zookeeperOffsets.keySet())) {
        throw new FlumeException("Offsets could not be committed");
      }
    } finally {
      consumer.close();
    }
  } finally {
    // Separate finally: runs even if closing the consumer throws.
    zkUtils.close();
  }
}
项目:flume-release-1.7.0    文件:KafkaChannel.java   
/**
 * Collects the committed offset of every partition of {@code topicStr}.
 *
 * @param client consumer used to list the partitions and look up committed offsets
 * @return a map from partition to committed offset; partitions for which
 *     {@code client.committed} returns {@code null} are omitted, and the map is
 *     empty when no partition list is available for the topic
 */
private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(
    KafkaConsumer<String, byte[]> client) {
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  List<PartitionInfo> partitions = client.partitionsFor(topicStr);
  if (partitions == null) {
    // Older kafka-clients releases return null instead of an empty list when
    // the topic has no metadata; treat that as "no offsets" rather than NPE.
    return offsets;
  }
  for (PartitionInfo partition : partitions) {
    TopicPartition key = new TopicPartition(topicStr, partition.partition());
    OffsetAndMetadata offsetAndMetadata = client.committed(key);
    if (offsetAndMetadata != null) {
      offsets.put(key, offsetAndMetadata);
    }
  }
  return offsets;
}
项目:wngn-jms-kafka    文件:ComsumerDemo3.java   
/**
 * Demo consumer that polls the "kafka-test" topic forever, prints each record
 * as "offset: value", and synchronously commits the offset per partition after
 * that partition's records from the current poll have been printed.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<String, Object>();
    // bootstrap.servers names one or more brokers; it does not have to list all of
    // them, the remaining brokers in the cluster are discovered automatically.
    configs.put("bootstrap.servers", "192.168.0.107:9092,192.168.0.108:9092,192.168.0.109:9092");
    configs.put("group.id", "kafka-test");
    // Whether offsets are committed automatically (disabled: committed manually below).
    configs.put("enable.auto.commit", "false");
    // Interval between automatic offset commits; only relevant when auto-commit is enabled.
    configs.put("auto.commit.interval.ms", "1000");
    configs.put("session.timeout.ms", "30000");

    configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    try {
        // Topics this consumer subscribes to; several may be given at once.
        consumer.subscribe(Arrays.asList("kafka-test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // Synchronously commit a specific offset for this partition: the
                // committed value is the offset after the last record printed.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        // The loop only ends via an exception; close the consumer on the way out.
        consumer.close();
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:MockProducerTest.java   
/**
 * Sending offsets to a transaction with a {@code null} consumer group id must
 * raise a {@link NullPointerException}.
 */
@Test
public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
    producer.initTransactions();
    producer.beginTransaction();

    boolean rejected = false;
    try {
        producer.sendOffsetsToTransaction(
                Collections.<TopicPartition, OffsetAndMetadata>emptyMap(), null);
    } catch (NullPointerException expected) {
        // This is the outcome the test is looking for.
        rejected = true;
    }

    if (!rejected) {
        fail("Should have thrown NullPointerException");
    }
}