@Override
public void seekToCommitted(Collection<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        OffsetAndMetadata offsetAndMetadata = _kafkaConsumer.committed(tp);
        if (offsetAndMetadata == null) {
            throw new NoOffsetForPartitionException(tp);
        }
        _kafkaConsumer.seek(tp, offsetAndMetadata.offset());
        _consumerRecordsProcessor.clear(tp);
        Long hw = LiKafkaClientsUtils.offsetFromWrappedMetadata(offsetAndMetadata.metadata());
        if (hw == null) {
            hw = offsetAndMetadata.offset();
        }
        _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
    }
}
/**
 * Reset offsets for the given partition using the offset reset strategy.
 *
 * @param partition The partition whose offset needs to be reset
 * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
 */
private void resetOffset(TopicPartition partition) {
    OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
    final long timestamp;
    if (strategy == OffsetResetStrategy.EARLIEST)
        timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
    else if (strategy == OffsetResetStrategy.LATEST)
        timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
    else
        throw new NoOffsetForPartitionException(partition);

    log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase(Locale.ROOT));
    long offset = listOffset(partition, timestamp);

    // we might lose the assignment while fetching the offset, so check it is still active
    if (subscriptions.isAssigned(partition))
        this.subscriptions.seek(partition, offset);
}
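For context, the reset strategy consulted above is chosen by the application through the consumer's auto.offset.reset setting. A minimal sketch of that configuration follows; the broker address, group id, and class name are placeholders, not taken from the snippets in this section.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class ResetStrategyConfigExample {
    static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "earliest" and "latest" select a reset strategy; "none" makes the consumer
        // throw NoOffsetForPartitionException when no committed offset exists.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        return new KafkaConsumer<>(props);
    }
}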
@Override
public long position(TopicPartition partition) {
    // Not handling large messages here. The position will be the actual position.
    try {
        return _kafkaConsumer.position(partition);
    } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
        switch (_offsetResetStrategy) {
            case EARLIEST:
                oe.partitions().forEach(_consumerRecordsProcessor::clear);
                _kafkaConsumer.seekToBeginning(oe.partitions());
                return position(partition);
            case LATEST:
                oe.partitions().forEach(_consumerRecordsProcessor::clear);
                _kafkaConsumer.seekToEnd(oe.partitions());
                return position(partition);
            default:
                throw oe;
        }
    }
}
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, AbstractKafkaConsumer consumer) {
    // If initialOffset is set to EARLIEST or LATEST and the application is run for
    // the first time, there is no existing committed offset and this error will be
    // caught. We need to seek to either the beginning or the end of the partition,
    // based on the initial offset setting.
    AbstractKafkaInputOperator.InitialOffset io =
        AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
    if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
        || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
        consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
    } else {
        consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
    }
}
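A similar fallback can be written directly against a plain consumer. The sketch below assumes a recent kafka-clients API (poll(Duration), seekToBeginning/seekToEnd taking a Collection); the class name, method name, and startFromEarliest flag are illustrative and not part of the operator above.

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;

public final class OffsetResetFallback {
    // Polls once and, if no committed offset exists (auto.offset.reset=none),
    // seeks to the beginning or end of the affected partitions and retries.
    static <K, V> ConsumerRecords<K, V> pollWithFallback(Consumer<K, V> consumer,
                                                         boolean startFromEarliest) {
        try {
            return consumer.poll(Duration.ofMillis(100));
        } catch (NoOffsetForPartitionException e) {
            if (startFromEarliest) {
                consumer.seekToBeginning(e.partitions());
            } else {
                consumer.seekToEnd(e.partitions());
            }
            return consumer.poll(Duration.ofMillis(100));
        }
    }
}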
@Test
public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
    Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2));
    subscriptionsNoAutoReset.assignFromUser(tps);
    try {
        fetcherNoAutoReset.updateFetchPositions(tps);
        fail("Should have thrown NoOffsetForPartitionException");
    } catch (NoOffsetForPartitionException e) {
        // we expect the exception to be thrown for both TPs at the same time
        Set<TopicPartition> partitions = e.partitions();
        assertEquals(tps, partitions);
    }
}
@Test
public void testPosition() {
    String topic = "testSeek";
    TopicPartition tp = new TopicPartition(topic, 0);
    TopicPartition tp1 = new TopicPartition(topic, 1);
    produceSyntheticMessages(topic);

    // Reset to earliest
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition1");
    try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
        consumer.assign(Arrays.asList(tp, tp1));
        assertEquals(0, consumer.position(tp));
    }

    // Reset to latest
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition2");
    try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
        consumer.assign(Arrays.asList(tp, tp1));
        assertEquals(consumer.position(tp), 10);
    }

    // Reset strategy "none" should throw when no offset has been committed
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition3");
    try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
        consumer.assign(Arrays.asList(tp, tp1));
        consumer.position(tp);
        fail("Should have thrown NoOffsetForPartitionException");
    } catch (NoOffsetForPartitionException nofpe) {
        // let it go.
    }
}
@Test
@SuppressWarnings("unchecked")
public void whenReadEventsThenNakadiException() {
    // ARRANGE //
    final ImmutableList<RuntimeException> exceptions = ImmutableList.of(
        new NoOffsetForPartitionException(new TopicPartition("", 0)),
        new KafkaException());

    int numberOfNakadiExceptions = 0;
    for (final Exception exception : exceptions) {
        final KafkaConsumer<byte[], byte[]> kafkaConsumerMock = mock(KafkaConsumer.class);
        when(kafkaConsumerMock.poll(POLL_TIMEOUT)).thenThrow(exception);
        try {
            // ACT //
            final NakadiKafkaConsumer consumer = new NakadiKafkaConsumer(
                kafkaConsumerMock, ImmutableList.of(), createTpTimelineMap(), POLL_TIMEOUT);
            consumer.readEvents();

            // ASSERT //
            fail("An exception was expected to be thrown");
        } catch (final Exception e) {
            numberOfNakadiExceptions++;
        }
    }

    assertThat("We should get a NakadiException for every call", numberOfNakadiExceptions,
        equalTo(exceptions.size()));
}
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    long startMs = System.currentTimeMillis();
    ConsumerRecords<K, V> processedRecords;
    // We will keep polling until timeout.
    long now = startMs;
    long expireMs = startMs + timeout;
    do {
        if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
            commitAsync();
            _lastAutoCommitMs = now;
        }
        ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
        try {
            rawRecords = _kafkaConsumer.poll(expireMs - now);
        } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
            switch (_offsetResetStrategy) {
                case EARLIEST:
                    _kafkaConsumer.seekToBeginning(oe.partitions());
                    oe.partitions().forEach(_consumerRecordsProcessor::clear);
                    break;
                case LATEST:
                    _kafkaConsumer.seekToEnd(oe.partitions());
                    oe.partitions().forEach(_consumerRecordsProcessor::clear);
                    break;
                default:
                    throw oe;
            }
        }
        // Check if we have enough high watermarks for the partitions. The high watermark is cleared during rebalance.
        // We make this check so that after rebalance we do not deliver duplicate messages to the user.
        if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) {
            for (TopicPartition tp : rawRecords.partitions()) {
                if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) {
                    OffsetAndMetadata offsetAndMetadata = committed(tp);
                    if (offsetAndMetadata != null) {
                        long hw = offsetAndMetadata.offset();
                        _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
                    }
                }
            }
        }
        processedRecords = _consumerRecordsProcessor.process(rawRecords);
        now = System.currentTimeMillis();
    } while (processedRecords.isEmpty() && now < startMs + timeout);
    return processedRecords;
}
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    if (_lastProcessedResult != null && _lastProcessedResult.exception() != null) {
        ConsumerRecordsProcessingException e = _lastProcessedResult.exception();
        _lastProcessedResult = null;
        throw e;
    }
    long startMs = System.currentTimeMillis();
    ConsumerRecords<K, V> processedRecords;
    // We will keep polling until timeout.
    long now = startMs;
    long expireMs = startMs + timeout;
    do {
        if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
            commitAsync();
            _lastAutoCommitMs = now;
        }
        ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
        try {
            rawRecords = _kafkaConsumer.poll(expireMs - now);
        } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
            switch (_offsetResetStrategy) {
                case EARLIEST:
                    oe.partitions().forEach(_consumerRecordsProcessor::clear);
                    _kafkaConsumer.seekToBeginning(oe.partitions());
                    break;
                case LATEST:
                    oe.partitions().forEach(_consumerRecordsProcessor::clear);
                    _kafkaConsumer.seekToEnd(oe.partitions());
                    break;
                default:
                    throw oe;
            }
        }
        // Check if we have enough high watermarks for the partitions. The high watermark is cleared during rebalance.
        // We make this check so that after rebalance we do not deliver duplicate messages to the user.
        if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) {
            for (TopicPartition tp : rawRecords.partitions()) {
                if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) {
                    OffsetAndMetadata offsetAndMetadata = committed(tp);
                    if (offsetAndMetadata != null) {
                        long hw = offsetAndMetadata.offset();
                        _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
                    }
                }
            }
        }
        _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
        processedRecords = _lastProcessedResult.consumerRecords();
        // Clear the internal reference.
        _lastProcessedResult.clearRecords();
        // Rewind offsets if there are processing exceptions.
        if (_lastProcessedResult.exception() != null) {
            for (Map.Entry<TopicPartition, Long> entry : _lastProcessedResult.resumeOffsets().entrySet()) {
                _kafkaConsumer.seek(entry.getKey(), entry.getValue());
            }
        }
        now = System.currentTimeMillis();
    } while (processedRecords.isEmpty() && now < startMs + timeout);
    return processedRecords;
}