Java 类org.apache.kafka.clients.consumer.CommitFailedException 实例源码

项目:ja-micro    文件:OffsetCommitter.java   
/**
 * Re-commits the stored offset of every tracked partition that has been idle.
 *
 * <p>Nothing happens unless the current time (read from {@code clock}) is later than
 * {@code lastUpdateTime + IDLE_DURATION}. When it is, each partition in {@code offsetData}
 * whose own timestamp is older than {@code IDLE_DURATION} gets its stored offset committed
 * synchronously, and its timestamp is set to the current time whether or not the commit
 * succeeded. Finally {@code lastUpdateTime} is set to the current time.
 *
 * <p>A {@link CommitFailedException} is logged at info level and otherwise ignored; any
 * other exception propagates to the caller.
 */
public void recommitOffsets() {
    final LocalDateTime now = LocalDateTime.now(clock);
    if (!now.isAfter(lastUpdateTime.plus(IDLE_DURATION))) {
        // The last sweep was recent enough; nothing to do yet.
        return;
    }

    for (TopicPartition partition : offsetData.keySet()) {
        final OffsetAndTime tracked = offsetData.get(partition);
        if (!now.isAfter(tracked.time.plus(IDLE_DURATION))) {
            // This partition was updated recently; leave it alone.
            continue;
        }

        final OffsetAndMetadata toCommit = new OffsetAndMetadata(tracked.offset);
        try {
            consumer.commitSync(Collections.singletonMap(partition, toCommit));
        } catch (CommitFailedException ignored) {
            // Best effort: a failed re-commit is only reported, never rethrown.
            logger.info("Caught CommitFailedException attempting to commit {} {}",
                    partition, tracked.offset);
        }
        // The timestamp is refreshed even after a failed commit.
        tracked.time = now;
    }

    lastUpdateTime = now;
}
项目:ja-micro    文件:KafkaSubscriberTest.java   
/**
 * Feeds four messages (two per partition) to a subscriber, then swaps in a mocked consumer
 * whose {@code commitSync} throws {@link CommitFailedException} for a commit whose first
 * entry has offset 3, and finally runs {@code consumeMessages()}.
 * The test has no explicit assertions; it passes when no exception escapes.
 */
@Test
public void subscriberLosesPartitionAssignment() {
    // Matches a commit map whose first value carries offset 3.
    class OffsetThreeMatcher implements ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>> {
        @Override
        public boolean matches(Map<TopicPartition, OffsetAndMetadata> offsets) {
            return offsets.values().iterator().next().offset() == 3;
        }
    }

    KafkaSubscriber<String> subscriber = new KafkaSubscriber<>(
            new MessageCallback(),
            "topic", "groupId", false,
            KafkaSubscriber.OffsetReset.Earliest, 1, 1, 1,
            5000, 5000);

    // All four messages are created first and then consumed in declaration order.
    KafkaTopicInfo[] messages = {
        new KafkaTopicInfo("topic", 0, 1, null),
        new KafkaTopicInfo("topic", 0, 2, null),
        new KafkaTopicInfo("topic", 1, 1, null),
        new KafkaTopicInfo("topic", 1, 2, null)
    };
    for (KafkaTopicInfo message : messages) {
        subscriber.consume(message);
    }

    KafkaConsumer realConsumer = mock(KafkaConsumer.class);
    doThrow(new CommitFailedException())
            .when(realConsumer)
            .commitSync(argThat(new OffsetThreeMatcher()));

    subscriber.realConsumer = realConsumer;
    subscriber.offsetCommitter = new OffsetCommitter(realConsumer, Clock.systemUTC());
    subscriber.consumeMessages();
}
项目:kafka-0.11.0.0-src-with-comment    文件:StreamTask.java   
/**
 * Commits the consumed offsets, either through the producer transaction (when
 * {@code eosEnabled}) or through a synchronous consumer commit.
 *
 * <p>When no offset commit is needed, the only work done is committing a transaction that
 * is still in flight while no new transaction is requested. {@code commitRequested} is
 * cleared on every normal return.
 *
 * @param startNewTransaction whether a new transaction should be begun after the current
 *                            one has been committed (only relevant when {@code eosEnabled})
 * @throws CommitFailedException rethrown after logging when the synchronous consumer commit fails
 */
private void commitOffsets(final boolean startNewTransaction) {
    if (!commitOffsetNeeded) {
        // need to make sure to commit txn for suspend case
        if (eosEnabled && !startNewTransaction && transactionInFlight) {
            producer.commitTransaction();
            transactionInFlight = false;
        }
        commitRequested = false;
        return;
    }

    log.debug("{} Committing offsets", logPrefix);

    final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>(consumedOffsets.size());
    for (final Map.Entry<TopicPartition, Long> consumed : consumedOffsets.entrySet()) {
        final TopicPartition tp = consumed.getKey();
        // The committed position is the stored consumed offset plus one.
        final long nextOffset = consumed.getValue() + 1;
        offsetsToCommit.put(tp, new OffsetAndMetadata(nextOffset));
        stateMgr.putOffsetLimit(tp, nextOffset);
    }

    if (!eosEnabled) {
        try {
            consumer.commitSync(offsetsToCommit);
        } catch (final CommitFailedException e) {
            log.warn("{} Failed offset commits {}: ", logPrefix, offsetsToCommit, e);
            throw e;
        }
    } else {
        producer.sendOffsetsToTransaction(offsetsToCommit, applicationId);
        producer.commitTransaction();
        transactionInFlight = false;
        if (startNewTransaction) {
            // The flag is raised before beginTransaction() is invoked.
            transactionInFlight = true;
            producer.beginTransaction();
        }
    }

    commitOffsetNeeded = false;
    commitRequested = false;
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with {@code ILLEGAL_GENERATION} must surface as a
 * {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetIllegalGeneration() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");
    final long timeoutMs = Long.MAX_VALUE;

    // Queue an error-free group coordinator response, then make the coordinator ready.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, offsetToCommit), timeoutMs);
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with {@code UNKNOWN_MEMBER_ID} must surface as a
 * {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetUnknownMemberId() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");
    final long timeoutMs = Long.MAX_VALUE;

    // Queue an error-free group coordinator response, then make the coordinator ready.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, offsetToCommit), timeoutMs);
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with {@code REBALANCE_IN_PROGRESS} must surface as a
 * {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetRebalanceInProgress() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");
    final long timeoutMs = Long.MAX_VALUE;

    // Queue an error-free group coordinator response, then make the coordinator ready.
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, offsetToCommit), timeoutMs);
}
项目:kafka    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with the {@code ILLEGAL_GENERATION} error code must
 * surface as a {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetIllegalGeneration() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");

    // Queue an error-free consumer metadata response, then make the coordinator ready.
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, offsetToCommit));
}
项目:kafka    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with the {@code UNKNOWN_MEMBER_ID} error code must
 * surface as a {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetUnknownMemberId() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");

    // Queue an error-free consumer metadata response, then make the coordinator ready.
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, offsetToCommit));
}
项目:kafka    文件:ConsumerCoordinatorTest.java   
/**
 * A synchronous commit answered with the {@code REBALANCE_IN_PROGRESS} error code must
 * surface as a {@link CommitFailedException}.
 */
@Test(expected = CommitFailedException.class)
public void testCommitOffsetRebalanceInProgress() {
    final OffsetAndMetadata offsetToCommit = new OffsetAndMetadata(100L, "metadata");

    // Queue an error-free consumer metadata response, then make the coordinator ready.
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();

    // The commit cannot be retried if a rebalance occurs before it completed.
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, offsetToCommit));
}
项目:datacollector    文件:BaseKafkaConsumer09.java   
/**
 * Synchronously commits the offsets accumulated in {@code topicPartitionToOffsetMetadataMap}.
 *
 * <p>The whole operation runs while holding {@code pollCommitMutex}. It returns without
 * touching the offsets map when a rebalance is in progress or when a {@code poll()} is still
 * pending after an earlier failure. Otherwise the map is always cleared before returning,
 * whether the commit succeeded, failed with {@link CommitFailedException}, or was skipped
 * because the map was empty.
 *
 * <p>On {@link CommitFailedException} the failure is logged, {@code needToCallPoll} is set
 * and {@code recordQueue} is cleared; the exception is not rethrown.
 */
@Override
public void commit() {
  synchronized (pollCommitMutex) {
    // While rebalancing there is no point for us to commit offset since it's not allowed operation
    if(rebalanceInProgress.get()) {
      LOG.debug("Kafka is rebalancing, not commiting offsets");
      return;
    }

    if(needToCallPoll.get()) {
      LOG.debug("Waiting on poll to be properly called before continuing.");
      return;
    }

    try {
      if(topicPartitionToOffsetMetadataMap.isEmpty()) {
        LOG.debug("Skipping committing offsets since we haven't consume anything.");
        return;
      }

      // Pass the map itself so it is only stringified when DEBUG logging is enabled.
      LOG.debug("Committing offsets: {}", topicPartitionToOffsetMetadataMap);
      kafkaConsumer.commitSync(topicPartitionToOffsetMetadataMap);
    } catch(CommitFailedException ex) {
      LOG.warn("Can't commit offset to Kafka: {}", ex.toString(), ex);
      // After CommitFailedException we MUST call consumer's poll() method first
      needToCallPoll.set(true);
      // The consumer thread might be stuck on writing to the queue, so we need to clean it up to unblock that thread
      recordQueue.clear();
    } finally {
      // either we've committed the offsets (so now we drop them so that we don't re-commit anything)
      // or CommitFailedException was thrown, in which case poll needs to be called again and they are invalid
      topicPartitionToOffsetMetadataMap.clear();
    }
  }
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Sends an offset commit request for the given partitions without blocking. The returned
 * future can be polled for a synchronous commit or ignored for an asynchronous one.
 *
 * <p>The call short-circuits with an already-completed future when {@code offsets} is empty,
 * when no coordinator is known, when any offset is negative, or when no group generation is
 * available.
 *
 * @param offsets The offsets per partition that should be committed.
 * @return A request future whose value indicates whether the commit was successful or not
 */
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty()) {
        return RequestFuture.voidSuccess();
    }

    final Node coordinatorNode = coordinator();
    if (coordinatorNode == null) {
        return RequestFuture.coordinatorNotAvailable();
    }

    // Translate the caller's offsets into per-partition request data, rejecting negative offsets.
    final Map<TopicPartition, OffsetCommitRequest.PartitionData> partitionData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> requested : offsets.entrySet()) {
        final OffsetAndMetadata committed = requested.getValue();
        final long offset = committed.offset();
        if (offset < 0) {
            return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offset));
        }
        partitionData.put(requested.getKey(),
                new OffsetCommitRequest.PartitionData(offset, committed.metadata()));
    }

    // Only automatically assigned subscriptions carry a real group generation.
    final Generation generation = subscriptions.partitionsAutoAssigned()
            ? generation()
            : Generation.NO_GENERATION;

    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null) {
        return RequestFuture.failure(new CommitFailedException());
    }

    final OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, partitionData)
            .setGenerationId(generation.generationId)
            .setMemberId(generation.memberId)
            .setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);

    log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinatorNode, groupId);

    // Annotator's notes (translated from the original Chinese comments):
    // - compose will invoke RequestFutureAdapter's onSuccess
    // - OffsetCommitResponseHandler extends CoordinatorResponseHandle,
    //   so CoordinatorResponseHandle's onSuccess method is the one called
    // - JoinGroupResponseHandler calls its handle method, which calls onJoinLeader
    // - onJoinLeader calls performAssignment
    // - performAssignment is implemented in this class
    return client.send(coordinatorNode, builder)
            .compose(new OffsetCommitResponseHandler(offsets));
}
项目:kafka-0.11.0.0-src-with-comment    文件:ConsumerCoordinator.java   
/**
 * Processes an offset commit response and completes or fails {@code future} accordingly.
 *
 * <p>The per-partition results are examined in iteration order. The first partition whose
 * error is anything other than {@code NONE} or {@code TOPIC_AUTHORIZATION_FAILED} fails the
 * future and ends processing immediately, so later partitions are not inspected. Topic
 * authorization failures are collected and reported together after the loop.
 *
 * @param commitResponse the response carrying one error value per partition
 * @param future         the future to complete with {@code null} on success or to fail
 */
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    // NOTE(review): `response` is not a parameter of this method; presumably a field of the
    // enclosing handler — confirm.
    sensors.commitLatency.record(response.requestLatencyMs());
    Set<String> unauthorizedTopics = new HashSet<>();

    for (Map.Entry<TopicPartition, Errors> entry : commitResponse.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        // NOTE(review): if this.offsets has no entry for tp, the next line throws a
        // NullPointerException — presumably the response only contains requested partitions.
        OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
        long offset = offsetAndMetadata.offset();

        Errors error = entry.getValue();
        if (error == Errors.NONE) {
            log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
            if (subscriptions.isAssigned(tp))
                // update the local cache only if the partition is still assigned
                subscriptions.committed(tp, offsetAndMetadata);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            log.error("Not authorized to commit offsets for group {}", groupId);
            future.raise(new GroupAuthorizationException(groupId));
            return;
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            // collected here and reported once, after all partitions have been examined
            unauthorizedTopics.add(tp.topic());
        } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
            // raise the error to the user
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            // just retry
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR
                || error == Errors.REQUEST_TIMED_OUT) {
            // the coordinator is marked dead before the error is raised
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            coordinatorDead();
            future.raise(error);
            return;
        } else if (error == Errors.UNKNOWN_MEMBER_ID
                || error == Errors.ILLEGAL_GENERATION
                || error == Errors.REBALANCE_IN_PROGRESS) {
            // need to re-join group
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            resetGeneration();
            future.raise(new CommitFailedException());
            return;
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
            return;
        } else {
            // any error not handled above is wrapped in a generic KafkaException
            log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
            future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
            return;
        }
    }

    // Reached only when every partition returned NONE or TOPIC_AUTHORIZATION_FAILED.
    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        future.complete(null);
    }
}
项目:kafka    文件:ConsumerCoordinator.java   
/**
 * Processes an offset commit response and completes or fails {@code future} accordingly.
 *
 * <p>Each partition's numeric error code is mapped with {@code Errors.forCode}. The first
 * partition whose error is anything other than {@code NONE} or
 * {@code TOPIC_AUTHORIZATION_FAILED} fails the future and ends processing immediately, so
 * later partitions are not inspected. Topic authorization failures are collected and
 * reported together after the loop.
 *
 * @param commitResponse the response carrying one error code per partition
 * @param future         the future to complete with {@code null} on success or to fail
 */
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    // NOTE(review): `response` is not a parameter of this method; presumably a field of the
    // enclosing handler — confirm.
    sensors.commitLatency.record(response.requestLatencyMs());
    Set<String> unauthorizedTopics = new HashSet<>();

    for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        // NOTE(review): if this.offsets has no entry for tp, the next line throws a
        // NullPointerException — presumably the response only contains requested partitions.
        OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
        long offset = offsetAndMetadata.offset();

        Errors error = Errors.forCode(entry.getValue());
        if (error == Errors.NONE) {
            log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
            if (subscriptions.isAssigned(tp))
                // update the local cache only if the partition is still assigned
                subscriptions.committed(tp, offsetAndMetadata);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            log.error("Not authorized to commit offsets for group {}", groupId);
            future.raise(new GroupAuthorizationException(groupId));
            return;
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            // collected here and reported once, after all partitions have been examined
            unauthorizedTopics.add(tp.topic());
        } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
            // raise the error to the user
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            // just retry
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP
                || error == Errors.REQUEST_TIMED_OUT) {
            // the coordinator is marked dead before the error is raised
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            coordinatorDead();
            future.raise(error);
            return;
        } else if (error == Errors.UNKNOWN_MEMBER_ID
                || error == Errors.ILLEGAL_GENERATION
                || error == Errors.REBALANCE_IN_PROGRESS) {
            // need to re-join group
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            subscriptions.needReassignment();
            future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
                    "rebalanced and assigned the partitions to another member. This means that the time " +
                    "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
                    "which typically implies that the poll loop is spending too much time message processing. " +
                    "You can address this either by increasing the session timeout or by reducing the maximum " +
                    "size of batches returned in poll() with max.poll.records."));
            return;
        } else {
            // any error not handled above is wrapped in a generic KafkaException
            log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
            future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
            return;
        }
    }

    // Reached only when every partition returned NONE or TOPIC_AUTHORIZATION_FAILED.
    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        future.complete(null);
    }
}