public void recommitOffsets() {
    LocalDateTime now = LocalDateTime.now(clock);
    if (now.isAfter(lastUpdateTime.plus(IDLE_DURATION))) {
        for (TopicPartition tp : offsetData.keySet()) {
            OffsetAndTime offsetAndTime = offsetData.get(tp);
            if (now.isAfter(offsetAndTime.time.plus(IDLE_DURATION))) {
                try {
                    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offsetAndTime.offset)));
                } catch (CommitFailedException e) {
                    logger.info("Caught CommitFailedException attempting to commit {} {}", tp, offsetAndTime.offset);
                }
                offsetAndTime.time = now;
            }
        }
        lastUpdateTime = now;
    }
}
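The snippet above relies on fields (offsetData, lastUpdateTime, IDLE_DURATION) and an OffsetAndTime holder that are not shown. A minimal sketch of that surrounding state, inferred from the names used here and in the test that follows, might look like this; the types, the idle window value, and the constructor body are assumptions, not the original source:

    // Hypothetical sketch of the committer state that recommitOffsets() touches.
    import java.time.Clock;
    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class OffsetCommitter {
        private static final Logger logger = LoggerFactory.getLogger(OffsetCommitter.class);
        private static final Duration IDLE_DURATION = Duration.ofMinutes(10); // assumed idle window

        private final Consumer<?, ?> consumer;
        private final Clock clock;
        private final Map<TopicPartition, OffsetAndTime> offsetData = new HashMap<>();
        private LocalDateTime lastUpdateTime;

        public OffsetCommitter(Consumer<?, ?> consumer, Clock clock) {
            this.consumer = consumer;
            this.clock = clock;
            this.lastUpdateTime = LocalDateTime.now(clock);
        }

        // simple holder pairing a pending offset with the time it was last touched
        static class OffsetAndTime {
            long offset;
            LocalDateTime time;

            OffsetAndTime(long offset, LocalDateTime time) {
                this.offset = offset;
                this.time = time;
            }
        }
    }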
@Test
public void subscriberLosesPartitionAssignment() {
    KafkaSubscriber<String> subscriber = new KafkaSubscriber<>(new MessageCallback(), "topic", "groupId", false,
            KafkaSubscriber.OffsetReset.Earliest, 1, 1, 1, 5000, 5000);
    KafkaTopicInfo message1 = new KafkaTopicInfo("topic", 0, 1, null);
    KafkaTopicInfo message2 = new KafkaTopicInfo("topic", 0, 2, null);
    KafkaTopicInfo message3 = new KafkaTopicInfo("topic", 1, 1, null);
    KafkaTopicInfo message4 = new KafkaTopicInfo("topic", 1, 2, null);
    subscriber.consume(message1);
    subscriber.consume(message2);
    subscriber.consume(message3);
    subscriber.consume(message4);
    KafkaConsumer realConsumer = mock(KafkaConsumer.class);
    class ArgMatcher implements ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>> {
        @Override
        public boolean matches(Map<TopicPartition, OffsetAndMetadata> arg) {
            OffsetAndMetadata oam = arg.values().iterator().next();
            return oam.offset() == 3;
        }
    }
    doThrow(new CommitFailedException()).when(realConsumer).commitSync(argThat(new ArgMatcher()));
    subscriber.realConsumer = realConsumer;
    subscriber.offsetCommitter = new OffsetCommitter(realConsumer, Clock.systemUTC());
    subscriber.consumeMessages();
}
private void commitOffsets(final boolean startNewTransaction) {
    if (commitOffsetNeeded) {
        log.debug("{} Committing offsets", logPrefix);
        final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            final long offset = entry.getValue() + 1;
            consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset));
            stateMgr.putOffsetLimit(partition, offset);
        }
        if (eosEnabled) {
            producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
            producer.commitTransaction();
            transactionInFlight = false;
            if (startNewTransaction) {
                transactionInFlight = true;
                producer.beginTransaction();
            }
        } else {
            try {
                consumer.commitSync(consumedOffsetsAndMetadata);
            } catch (final CommitFailedException e) {
                log.warn("{} Failed offset commits {}: ", logPrefix, consumedOffsetsAndMetadata, e);
                throw e;
            }
        }
        commitOffsetNeeded = false;
    } else if (eosEnabled && !startNewTransaction && transactionInFlight) {
        // need to make sure to commit txn for suspend case
        producer.commitTransaction();
        transactionInFlight = false;
    }
    commitRequested = false;
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetIllegalGeneration() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetUnknownMemberId() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetRebalanceInProgress() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS)));
    coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetIllegalGeneration() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetUnknownMemberId() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Test(expected = CommitFailedException.class)
public void testCommitOffsetRebalanceInProgress() {
    // we cannot retry if a rebalance occurs before the commit completed
    client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
    coordinator.ensureCoordinatorReady();
    client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
    coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
}
@Override
public void commit() {
    synchronized (pollCommitMutex) {
        // While rebalancing there is no point in committing offsets since it's not an allowed operation
        if (rebalanceInProgress.get()) {
            LOG.debug("Kafka is rebalancing, not committing offsets");
            return;
        }
        if (needToCallPoll.get()) {
            LOG.debug("Waiting on poll to be properly called before continuing.");
            return;
        }
        try {
            if (topicPartitionToOffsetMetadataMap.isEmpty()) {
                LOG.debug("Skipping committing offsets since we haven't consumed anything.");
                return;
            }
            LOG.debug("Committing offsets: {}", topicPartitionToOffsetMetadataMap.toString());
            kafkaConsumer.commitSync(topicPartitionToOffsetMetadataMap);
        } catch (CommitFailedException ex) {
            LOG.warn("Can't commit offset to Kafka: {}", ex.toString(), ex);
            // After CommitFailedException we MUST call the consumer's poll() method first
            needToCallPoll.set(true);
            // The consumer thread might be stuck writing to the queue, so clean it up to unblock that thread
            recordQueue.clear();
        } finally {
            // Either we've committed the offsets (so we drop them here to avoid re-committing anything),
            // or CommitFailedException was thrown, in which case poll needs to be called again and they are invalid
            topicPartitionToOffsetMetadataMap.clear();
        }
    }
}
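For context, the needToCallPoll flag above encodes the standard recovery rule for CommitFailedException: the exception means the group has rebalanced away from this consumer, so poll() must run again (rejoining the group) before any further commit is attempted. A minimal stand-alone sketch of that pattern, with assumed broker address, group id, and topic name:

    // Sketch of a poll/commit loop that recovers from CommitFailedException by polling again
    // instead of retrying the commit. Configuration values are illustrative assumptions.
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollThenCommitLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("group.id", "example-group");           // assumed group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("enable.auto.commit", "false");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        // process record ...
                    }
                    try {
                        consumer.commitSync();
                    } catch (CommitFailedException e) {
                        // The group rebalanced while we were processing; these offsets are now owned
                        // elsewhere. Don't retry the commit here -- let the next poll() rejoin the
                        // group and re-deliver the uncommitted records.
                    }
                }
            }
        }
    }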
/**
 * Commit offsets for the specified list of topics and partitions. This is a non-blocking call
 * which returns a request future that can be polled in the case of a synchronous commit or ignored in the
 * asynchronous case.
 *
 * @param offsets The list of offsets per partition that should be committed.
 * @return A request future whose value indicates whether the commit was successful or not
 */
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (offsets.isEmpty())
        return RequestFuture.voidSuccess();

    Node coordinator = coordinator();
    if (coordinator == null)
        return RequestFuture.coordinatorNotAvailable();

    // create the offset commit request
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        OffsetAndMetadata offsetAndMetadata = entry.getValue();
        if (offsetAndMetadata.offset() < 0) {
            return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset()));
        }
        offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
    }

    final Generation generation;
    if (subscriptions.partitionsAutoAssigned())
        generation = generation();
    else
        generation = Generation.NO_GENERATION;

    // if the generation is null, we are not part of an active group (and we expect to be).
    // the only thing we can do is fail the commit and let the user rejoin the group in poll()
    if (generation == null)
        return RequestFuture.failure(new CommitFailedException());

    OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
            setGenerationId(generation.generationId).
            setMemberId(generation.memberId).
            setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);

    log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

    // compose() invokes the RequestFutureAdapter's onSuccess.
    // OffsetCommitResponseHandler extends CoordinatorResponseHandler,
    // whose onSuccess dispatches to the handler's handle() method.
    // (For joins, JoinGroupResponseHandler's handle() calls onJoinLeader,
    // which calls performAssignment; performAssignment is implemented in this class.)
    return client.send(coordinator, builder)
            .compose(new OffsetCommitResponseHandler(offsets));
}
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    sensors.commitLatency.record(response.requestLatencyMs());
    Set<String> unauthorizedTopics = new HashSet<>();

    for (Map.Entry<TopicPartition, Errors> entry : commitResponse.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
        long offset = offsetAndMetadata.offset();
        Errors error = entry.getValue();

        if (error == Errors.NONE) {
            log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
            if (subscriptions.isAssigned(tp))
                // update the local cache only if the partition is still assigned
                subscriptions.committed(tp, offsetAndMetadata);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            log.error("Not authorized to commit offsets for group {}", groupId);
            future.raise(new GroupAuthorizationException(groupId));
            return;
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            unauthorizedTopics.add(tp.topic());
        } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
            // raise the error to the user
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            // just retry
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR
                || error == Errors.REQUEST_TIMED_OUT) {
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            coordinatorDead();
            future.raise(error);
            return;
        } else if (error == Errors.UNKNOWN_MEMBER_ID
                || error == Errors.ILLEGAL_GENERATION
                || error == Errors.REBALANCE_IN_PROGRESS) {
            // need to re-join group
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            resetGeneration();
            future.raise(new CommitFailedException());
            return;
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
            return;
        } else {
            log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
            future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
            return;
        }
    }

    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        future.complete(null);
    }
}
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    sensors.commitLatency.record(response.requestLatencyMs());
    Set<String> unauthorizedTopics = new HashSet<>();

    for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
        TopicPartition tp = entry.getKey();
        OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
        long offset = offsetAndMetadata.offset();
        Errors error = Errors.forCode(entry.getValue());

        if (error == Errors.NONE) {
            log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
            if (subscriptions.isAssigned(tp))
                // update the local cache only if the partition is still assigned
                subscriptions.committed(tp, offsetAndMetadata);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            log.error("Not authorized to commit offsets for group {}", groupId);
            future.raise(new GroupAuthorizationException(groupId));
            return;
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
            unauthorizedTopics.add(tp.topic());
        } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
            // raise the error to the user
            log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
            // just retry
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            future.raise(error);
            return;
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP
                || error == Errors.REQUEST_TIMED_OUT) {
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            coordinatorDead();
            future.raise(error);
            return;
        } else if (error == Errors.UNKNOWN_MEMBER_ID
                || error == Errors.ILLEGAL_GENERATION
                || error == Errors.REBALANCE_IN_PROGRESS) {
            // need to re-join group
            log.debug("Offset commit for group {} failed: {}", groupId, error.message());
            subscriptions.needReassignment();
            future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
                    "rebalanced and assigned the partitions to another member. This means that the time " +
                    "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
                    "which typically implies that the poll loop is spending too much time message processing. " +
                    "You can address this either by increasing the session timeout or by reducing the maximum " +
                    "size of batches returned in poll() with max.poll.records."));
            return;
        } else {
            log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
            future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
            return;
        }
    }

    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        future.complete(null);
    }
}
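The long CommitFailedException message raised above points at two tuning knobs. A hedged configuration sketch with illustrative (assumed) values follows; note that on clients from 0.10.1 onward the bound on time between polls is max.poll.interval.ms rather than session.timeout.ms:

    // Illustrative consumer settings for avoiding commit failures caused by slow poll loops.
    // Values are assumptions for the sketch, not recommendations.
    import java.util.Properties;

    public class CommitTimeoutTuning {
        public static Properties consumerProps() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
            props.put("group.id", "example-group");             // assumed group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("max.poll.records", "100");     // fewer records per poll -> less processing per iteration
            props.put("session.timeout.ms", "30000"); // the timeout the 0.10-era message refers to
            // On newer clients the relevant bound is max.poll.interval.ms rather than session.timeout.ms.
            props.put("max.poll.interval.ms", "300000");
            return props;
        }
    }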