/**
 * With auto-reset disabled, an OFFSET_OUT_OF_RANGE fetch error must surface as an
 * OffsetOutOfRangeException naming exactly the affected partition, on repeated calls.
 */
@Test
public void testFetchOffsetOutOfRangeException() {
    subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
    subscriptionsNoAutoReset.seek(tp1, 0);

    fetcherNoAutoReset.sendFetches();
    client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
    consumerClient.poll(0);

    // The no-auto-reset subscription state must not mark the partition for reset.
    assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));

    // fetchedRecords() is expected to throw on each of two consecutive calls.
    for (int i = 0; i < 2; i++) {
        try {
            fetcherNoAutoReset.fetchedRecords();
            fail("Should have thrown OffsetOutOfRangeException");
        } catch (OffsetOutOfRangeException e) {
            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
            // JUnit convention: expected value first, actual value second.
            assertEquals(1, e.offsetOutOfRangePartitions().size());
        }
    }
}
/** * If any partition from previous fetchResponse contains OffsetOutOfRange error and * the defaultResetPolicy is NONE, throw OffsetOutOfRangeException * * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse */ private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException { Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>(); // filter offsetOutOfRangePartitions to retain only the fetchable partitions for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) { if (!subscriptions.isFetchable(entry.getKey())) { log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey()); continue; } Long position = subscriptions.position(entry.getKey()); // ignore partition if the current position != the offset in fetchResponse, e.g. after seek() if (position != null && entry.getValue().equals(position)) currentOutOfRangePartitions.put(entry.getKey(), entry.getValue()); } this.offsetOutOfRangePartitions.clear(); if (!currentOutOfRangePartitions.isEmpty()) throw new OffsetOutOfRangeException(currentOutOfRangePartitions); }
/**
 * With auto-reset disabled, an OFFSET_OUT_OF_RANGE fetch error must surface once as an
 * OffsetOutOfRangeException naming the partition; the following call yields no records.
 */
@Test
public void testFetchOffsetOutOfRangeException() {
    subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
    subscriptionsNoAutoReset.seek(tp, 0);

    fetcherNoAutoReset.sendFetches();
    client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
    consumerClient.poll(0);

    // The no-auto-reset subscription state must not mark the partition for reset.
    assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));

    try {
        fetcherNoAutoReset.fetchedRecords();
        fail("Should have thrown OffsetOutOfRangeException");
    } catch (OffsetOutOfRangeException e) {
        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
        // JUnit convention: expected value first, actual value second.
        assertEquals(1, e.offsetOutOfRangePartitions().size());
    }

    // The error is reported only once; the next call returns no records instead of throwing.
    assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}
@Override public long position(TopicPartition partition) { // Not handling large message here. The position will be actual position. try { return _kafkaConsumer.position(partition); } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) { switch (_offsetResetStrategy) { case EARLIEST: oe.partitions().forEach(_consumerRecordsProcessor::clear); _kafkaConsumer.seekToBeginning(oe.partitions()); return position(partition); case LATEST: oe.partitions().forEach(_consumerRecordsProcessor::clear); _kafkaConsumer.seekToEnd(oe.partitions()); return position(partition); default: throw oe; } } }
/**
 * Seeks {@code consumer} to {@code offset} on the configured topic's partition and performs one poll.
 *
 * The poll timeout is half of {@code config._fillFreqMs}.
 *
 * @throws OutOfRangeException if Kafka reports the offset as out of range
 * @throws FailedFetchException for the network-type failures checked below (logged first)
 * @throws RuntimeException wrapping any other failure
 */
public static ConsumerRecords<byte[], byte[]> fetchMessages(
        KafkaConfig config, KafkaConsumer<byte[], byte[]> consumer, Partition partition, long offset) {
    final String topicName = (String) config._stateConf.get(Config.KAFKA_TOPIC);
    final TopicPartition target = new TopicPartition(topicName, partition.partition);
    consumer.seek(target, offset);

    try {
        return consumer.poll(config._fillFreqMs / 2);
    } catch (OffsetOutOfRangeException outOfRange) {
        throw new OutOfRangeException(outOfRange.getMessage());
    } catch (Exception failure) {
        final boolean networkProblem = failure instanceof ConnectException
                || failure instanceof SocketTimeoutException
                || failure instanceof IOException
                || failure instanceof UnresolvedAddressException;
        if (!networkProblem) {
            throw new RuntimeException(failure);
        }
        LOG.warn("Network error when fetching messages:", failure);
        throw new FailedFetchException(failure);
    }
}
/** * Calling nextRecord calls fillBuffer() which has special handling for OutOfRangeExceptions. There is some * recursion in this method, but it should not go on forever and yield a StackOverflow exception. This test verifies * that when the KafkaConsumer is relentlessly throwing OutOfRangeExceptions that we stop trying to fill the buffer * after five attempts and do not have any other errors. */ @Test public void testNextRecordWithRecursiveOutOfRangeException() { final KafkaConsumer<byte[], byte[]> kafkaConsumer = Mockito.mock(KafkaConsumer.class); Mockito.when(kafkaConsumer.assignment()).thenReturn(Sets.newHashSet(new TopicPartition("Foobar", 0))); Mockito.when(kafkaConsumer.partitionsFor(topicName)).thenReturn(Arrays.asList( new PartitionInfo(topicName, 0, null, null, null) )); Mockito.when(kafkaConsumer.poll(300)).thenThrow( new OffsetOutOfRangeException(new HashMap<>()) ); final PersistenceAdapter persistenceAdapter = new InMemoryPersistenceAdapter(); persistenceAdapter.open(Maps.newHashMap()); // Create our consumer final Consumer consumer = new Consumer(kafkaConsumer); consumer.open( getDefaultConfig(topicName), getDefaultVSpoutId(), getDefaultConsumerCohortDefinition(), persistenceAdapter, new LogRecorder(), null ); final Record record = consumer.nextRecord(); assertNull(record); Mockito.verify(kafkaConsumer, Mockito.times(5)).poll(300); consumer.close(); }
/** * Internal method used to fill internal message buffer from kafka. * * Limited by the number of trips made. This should only be called from {@link #fillBuffer()}. */ private void fillBuffer(final int trips) { // If our buffer is null, or our iterator is at the end if (buffer == null || !bufferIterator.hasNext()) { // If we have no assigned partitions to consume from, then don't call poll() // The underlying consumer call here does NOT make an API call, so this is safe to call within this loop. if (getKafkaConsumer().assignment().isEmpty()) { // No assigned partitions, nothing to consume :) return; } // Time to refill the buffer try { buffer = getKafkaConsumer().poll(300); } catch (OffsetOutOfRangeException outOfRangeException) { // Handle it handleOffsetOutOfRange(outOfRangeException); // Clear out so we can attempt next time. buffer = null; bufferIterator = null; // Why 5? Because it's less than 6. if (trips >= 5) { logger.error( "Attempted to fill the buffer after an OffsetOutOfRangeException, but this was my fifth attempt so I'm bailing." ); // nextRecord() will get called by the VirtualSpout instance soon so we're not giving up, just avoiding a StackOverflow // exception on this current run of checks. return; } fillBuffer(trips + 1); return; } // Create new iterator bufferIterator = buffer.iterator(); } }
/** * This method handles when a partition seek/retrieve request was out of bounds. * This happens in two scenarios: * 1 - The offset is too old and was cleaned up / removed by the broker. * 2 - The offset just plain does not exist. * * This is particularly nasty in that if the poll() was able to pull SOME messages from * SOME partitions before the exception was thrown, those messages are considered "consumed" * by KafkaClient, and there's no way to get them w/o seeking back to them for those partitions. * * This means when we roll back, we may replay some messages :/ * * @param outOfRangeException The exception that was raised by the consumer. */ private void handleOffsetOutOfRange(final OffsetOutOfRangeException outOfRangeException) { final Set<TopicPartition> resetPartitions = Sets.newHashSet(); // Loop over all the partitions in this exception for (final TopicPartition topicPartition : outOfRangeException.offsetOutOfRangePartitions().keySet()) { // The offset that was in the error final long exceptionOffset = outOfRangeException.offsetOutOfRangePartitions().get(topicPartition); // What kafka says the last offset is final long endingOffset = getKafkaConsumer().endOffsets(Collections.singletonList(topicPartition)) .get(topicPartition); logger.warn("Offset Out of Range for partition {} at offset {}, kafka says last offset in partition is {}", topicPartition.partition(), exceptionOffset, endingOffset); // We have a hypothesis that the consumer can actually seek past the last message of the topic, // this yields this error and we want to catch it and try to back it up just a bit to a place that // we can work from. if (exceptionOffset >= endingOffset) { logger.warn( "OutOfRangeException yielded offset {}, which is past our ending offset of {} for {}", exceptionOffset, endingOffset, topicPartition ); // Seek to the end we found above. 
The end may have moved since we last asked, which is why we are not doing seekToEnd() getKafkaConsumer().seek( topicPartition, endingOffset ); partitionOffsetsManager.replaceEntry( new ConsumerPartition(topicPartition.topic(), topicPartition.partition()), endingOffset ); } else { resetPartitions.add(topicPartition); } } // All of the error'd partitions we need to seek to earliest available position. resetPartitionsToEarliest(resetPartitions); }
@Override public ConsumerRecords<K, V> poll(long timeout) { long startMs = System.currentTimeMillis(); ConsumerRecords<K, V> processedRecords; // We will keep polling until timeout. long now = startMs; long expireMs = startMs + timeout; do { if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) { commitAsync(); _lastAutoCommitMs = now; } ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty(); try { rawRecords = _kafkaConsumer.poll(expireMs - now); } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) { switch (_offsetResetStrategy) { case EARLIEST: _kafkaConsumer.seekToBeginning(oe.partitions()); oe.partitions().forEach(_consumerRecordsProcessor::clear); break; case LATEST: _kafkaConsumer.seekToEnd(oe.partitions()); oe.partitions().forEach(_consumerRecordsProcessor::clear); break; default: throw oe; } } // Check if we have enough high watermark for a partition. The high watermark is cleared during rebalance. // We make this check so that after rebalance we do not deliver duplicate messages to the user. if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) { for (TopicPartition tp : rawRecords.partitions()) { if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) { OffsetAndMetadata offsetAndMetadata = committed(tp); if (offsetAndMetadata != null) { long hw = offsetAndMetadata.offset(); _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw); } } } } processedRecords = _consumerRecordsProcessor.process(rawRecords); now = System.currentTimeMillis(); } while (processedRecords.isEmpty() && now < startMs + timeout); return processedRecords; }
@Override public ConsumerRecords<K, V> poll(long timeout) { if (_lastProcessedResult != null && _lastProcessedResult.exception() != null) { ConsumerRecordsProcessingException e = _lastProcessedResult.exception(); _lastProcessedResult = null; throw e; } long startMs = System.currentTimeMillis(); ConsumerRecords<K, V> processedRecords; // We will keep polling until timeout. long now = startMs; long expireMs = startMs + timeout; do { if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) { commitAsync(); _lastAutoCommitMs = now; } ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty(); try { rawRecords = _kafkaConsumer.poll(expireMs - now); } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) { switch (_offsetResetStrategy) { case EARLIEST: oe.partitions().forEach(_consumerRecordsProcessor::clear); _kafkaConsumer.seekToBeginning(oe.partitions()); break; case LATEST: oe.partitions().forEach(_consumerRecordsProcessor::clear); _kafkaConsumer.seekToEnd(oe.partitions()); break; default: throw oe; } } // Check if we have enough high watermark for a partition. The high watermark is cleared during rebalance. // We make this check so that after rebalance we do not deliver duplicate messages to the user. if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) { for (TopicPartition tp : rawRecords.partitions()) { if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) { OffsetAndMetadata offsetAndMetadata = committed(tp); if (offsetAndMetadata != null) { long hw = offsetAndMetadata.offset(); _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw); } } } } _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords); processedRecords = _lastProcessedResult.consumerRecords(); // Clear the internal reference. _lastProcessedResult.clearRecords(); // Rewind offset if there are processing exceptions. 
if (_lastProcessedResult.exception() != null) { for (Map.Entry<TopicPartition, Long> entry : _lastProcessedResult.resumeOffsets().entrySet()) { _kafkaConsumer.seek(entry.getKey(), entry.getValue()); } } now = System.currentTimeMillis(); } while (processedRecords.isEmpty() && now < startMs + timeout); return processedRecords; }