Java class org.apache.kafka.clients.consumer.OffsetOutOfRangeException: example source code

Project: kafka-0.11.0.0-src-with-comment    File: FetcherTest.java
@Test
public void testFetchOffsetOutOfRangeException() {
    subscriptionsNoAutoReset.assignFromUser(singleton(tp1));
    subscriptionsNoAutoReset.seek(tp1, 0);

    fetcherNoAutoReset.sendFetches();
    client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
    consumerClient.poll(0);

    assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));
    for (int i = 0; i < 2; i++) {
        try {
            fetcherNoAutoReset.fetchedRecords();
            fail("Should have thrown OffsetOutOfRangeException");
        } catch (OffsetOutOfRangeException e) {
            assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
            assertEquals(e.offsetOutOfRangePartitions().size(), 1);
        }
    }
}
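
The test above relies on the consumer surfacing OffsetOutOfRangeException instead of resetting silently. Below is a minimal sketch (not from any project on this page) of how application code might handle that case with a plain KafkaConsumer configured with auto.offset.reset=none; the broker address, group id, and topic name are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ManualOffsetResetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-reset-example");      // placeholder group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");             // disable automatic reset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);              // placeholder topic
            consumer.assign(Collections.singleton(tp));
            consumer.seek(tp, 0L);
            try {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                System.out.println("Fetched " + records.count() + " records");
            } catch (OffsetOutOfRangeException e) {
                // With auto.offset.reset=none the caller decides where to resume for the affected partitions.
                consumer.seekToBeginning(e.partitions());
            }
        }
    }
}
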
Project: kafka    File: Fetcher.java
/**
 * If any partition from the previous fetchResponse contains an OffsetOutOfRange error and
 * the defaultResetPolicy is NONE, throw an OffsetOutOfRangeException.
 *
 * @throws OffsetOutOfRangeException if there is an OffsetOutOfRange error in the fetchResponse
 */
private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
    Map<TopicPartition, Long> currentOutOfRangePartitions = new HashMap<>();

    // filter offsetOutOfRangePartitions to retain only the fetchable partitions
    for (Map.Entry<TopicPartition, Long> entry: this.offsetOutOfRangePartitions.entrySet()) {
        if (!subscriptions.isFetchable(entry.getKey())) {
            log.debug("Ignoring fetched records for {} since it is no longer fetchable", entry.getKey());
            continue;
        }
        Long position = subscriptions.position(entry.getKey());
        // ignore partition if the current position != the offset in fetchResponse, e.g. after seek()
        if (position != null && entry.getValue().equals(position))
            currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());
    }
    this.offsetOutOfRangePartitions.clear();
    if (!currentOutOfRangePartitions.isEmpty())
        throw new OffsetOutOfRangeException(currentOutOfRangePartitions);
}
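
For reference, a small sketch of the exception payload that throwIfOffsetOutOfRange() builds above: a map from TopicPartition to the out-of-range fetch offset, which callers read back via offsetOutOfRangePartitions(). The topic name and offset below are illustrative.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class OffsetOutOfRangePayloadExample {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("my-topic", 0);                  // placeholder topic
        Map<TopicPartition, Long> outOfRange = Collections.singletonMap(tp, 100L);

        // The same map-based constructor used by throwIfOffsetOutOfRange() above.
        OffsetOutOfRangeException e = new OffsetOutOfRangeException(outOfRange);

        System.out.println(e.offsetOutOfRangePartitions());   // {my-topic-0=100}
        System.out.println(e.partitions());                   // [my-topic-0]
    }
}
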
Project: kafka    File: FetcherTest.java
@Test
public void testFetchOffsetOutOfRangeException() {
    subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
    subscriptionsNoAutoReset.seek(tp, 0);

    fetcherNoAutoReset.sendFetches();
    client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
    consumerClient.poll(0);
    assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
    try {
        fetcherNoAutoReset.fetchedRecords();
        fail("Should have thrown OffsetOutOfRangeException");
    } catch (OffsetOutOfRangeException e) {
        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp));
        assertEquals(e.offsetOutOfRangePartitions().size(), 1);
    }
    assertEquals(0, fetcherNoAutoReset.fetchedRecords().size());
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public long position(TopicPartition partition) {
  // Large messages are not handled here; the position returned is the consumer's actual position.
  try {
    return _kafkaConsumer.position(partition);
  } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
    switch (_offsetResetStrategy) {
      case EARLIEST:
        oe.partitions().forEach(_consumerRecordsProcessor::clear);
        _kafkaConsumer.seekToBeginning(oe.partitions());
        return position(partition);
      case LATEST:
        oe.partitions().forEach(_consumerRecordsProcessor::clear);
        _kafkaConsumer.seekToEnd(oe.partitions());
        return position(partition);
      default:
        throw oe;
    }
  }
}
Project: kafka-spark-consumer    File: KafkaUtils.java
public static ConsumerRecords<byte[], byte[]> fetchMessages(
  KafkaConfig config, KafkaConsumer<byte[], byte[]> consumer, Partition partition,
  long offset) {
  String topic = (String) config._stateConf.get(Config.KAFKA_TOPIC);
  int partitionId = partition.partition;
  TopicPartition topicAndPartition = new TopicPartition(topic, partitionId);
  consumer.seek(topicAndPartition, offset);
  ConsumerRecords<byte[], byte[]> records;
  try {
    records = consumer.poll(config._fillFreqMs / 2);
  } catch(OffsetOutOfRangeException oore) {
    throw new OutOfRangeException(oore.getMessage());
  } catch (Exception e) {
    if (e instanceof ConnectException
      || e instanceof SocketTimeoutException || e instanceof IOException
      || e instanceof UnresolvedAddressException) {

      LOG.warn("Network error when fetching messages:", e);
      throw new FailedFetchException(e);
    } else {
      throw new RuntimeException(e);
    }
  }
  return records;
}
Project: storm-dynamic-spout    File: ConsumerTest.java
/**
 * Calling nextRecord() calls fillBuffer(), which has special handling for OffsetOutOfRangeExceptions. There is some
 * recursion in that method, but it should not go on forever and trigger a StackOverflowError. This test verifies
 * that when the KafkaConsumer relentlessly throws OffsetOutOfRangeExceptions, we stop trying to fill the buffer
 * after five attempts and do not hit any other errors.
 */
@Test
public void testNextRecordWithRecursiveOutOfRangeException() {
    final KafkaConsumer<byte[], byte[]> kafkaConsumer = Mockito.mock(KafkaConsumer.class);

    Mockito.when(kafkaConsumer.assignment()).thenReturn(Sets.newHashSet(new TopicPartition("Foobar", 0)));
    Mockito.when(kafkaConsumer.partitionsFor(topicName)).thenReturn(Arrays.asList(
        new PartitionInfo(topicName, 0, null, null, null)
    ));
    Mockito.when(kafkaConsumer.poll(300)).thenThrow(
        new OffsetOutOfRangeException(new HashMap<>())
    );

    final PersistenceAdapter persistenceAdapter = new InMemoryPersistenceAdapter();
    persistenceAdapter.open(Maps.newHashMap());

    // Create our consumer
    final Consumer consumer = new Consumer(kafkaConsumer);
    consumer.open(
        getDefaultConfig(topicName),
        getDefaultVSpoutId(),
        getDefaultConsumerCohortDefinition(),
        persistenceAdapter,
        new LogRecorder(),
        null
    );

    final Record record = consumer.nextRecord();

    assertNull(record);

    Mockito.verify(kafkaConsumer, Mockito.times(5)).poll(300);

    consumer.close();
}
Project: storm-dynamic-spout    File: Consumer.java
/**
 * Internal method used to fill internal message buffer from kafka.
 *
 * Limited by the number of trips made. This should only be called from {@link #fillBuffer()}.
 */
private void fillBuffer(final int trips) {
    // If our buffer is null, or our iterator is at the end
    if (buffer == null || !bufferIterator.hasNext()) {

        // If we have no assigned partitions to consume from, then don't call poll()
        // The underlying consumer call here does NOT make an API call, so this is safe to call within this loop.
        if (getKafkaConsumer().assignment().isEmpty()) {
            // No assigned partitions, nothing to consume :)
            return;
        }

        // Time to refill the buffer
        try {
            buffer = getKafkaConsumer().poll(300);
        } catch (OffsetOutOfRangeException outOfRangeException) {
            // Handle it
            handleOffsetOutOfRange(outOfRangeException);

            // Clear out so we can attempt next time.
            buffer = null;
            bufferIterator = null;

            // Why 5? Because it's less than 6.
            if (trips >= 5) {
                logger.error(
                    "Attempted to fill the buffer after an OffsetOutOfRangeException, but this was my fifth attempt so I'm bailing."
                );
                // nextRecord() will get called by the VirtualSpout instance soon, so we're not giving up, just avoiding a
                // StackOverflowError on this current run of checks.
                return;
            }

            fillBuffer(trips + 1);
            return;
        }

        // Create new iterator
        bufferIterator = buffer.iterator();
    }
}
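
Since the capped recursion above is effectively a bounded retry, the same behaviour can also be written as a loop. A hedged sketch follows, assuming the same class members shown above (buffer, bufferIterator, getKafkaConsumer(), handleOffsetOutOfRange(), logger); it is not the project's actual code.

// Loop-based retry sketch: poll up to five times, resetting the offending partitions on each failure.
private void fillBufferWithBoundedRetries() {
    for (int attempt = 1; attempt <= 5; attempt++) {
        try {
            // Same poll as above; if it succeeds we are done.
            buffer = getKafkaConsumer().poll(300);
            bufferIterator = buffer.iterator();
            return;
        } catch (OffsetOutOfRangeException outOfRangeException) {
            // Reset the offending partitions and try again, up to five attempts total.
            handleOffsetOutOfRange(outOfRangeException);
            buffer = null;
            bufferIterator = null;
        }
    }
    logger.error("Gave up refilling the buffer after five OffsetOutOfRangeExceptions; nextRecord() will retry later.");
}
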
Project: storm-dynamic-spout    File: Consumer.java
/**
 * This method handles when a partition seek/retrieve request was out of bounds.
 * This happens in two scenarios:
 *  1 - The offset is too old and was cleaned up / removed by the broker.
 *  2 - The offset just plain does not exist.
 *
 * This is particularly nasty in that if the poll() was able to pull SOME messages from
 * SOME partitions before the exception was thrown, those messages are considered "consumed"
 * by the Kafka client, and there's no way to get them without seeking back to them for those partitions.
 *
 * This means when we roll back, we may replay some messages :/
 *
 * @param outOfRangeException The exception that was raised by the consumer.
 */
private void handleOffsetOutOfRange(final OffsetOutOfRangeException outOfRangeException) {
    final Set<TopicPartition> resetPartitions = Sets.newHashSet();

    // Loop over all the partitions in this exception
    for (final TopicPartition topicPartition : outOfRangeException.offsetOutOfRangePartitions().keySet()) {
        // The offset that was in the error
        final long exceptionOffset = outOfRangeException.offsetOutOfRangePartitions().get(topicPartition);
        // What kafka says the last offset is
        final long endingOffset = getKafkaConsumer().endOffsets(Collections.singletonList(topicPartition))
            .get(topicPartition);

        logger.warn("Offset Out of Range for partition {} at offset {}, kafka says last offset in partition is {}",
            topicPartition.partition(), exceptionOffset, endingOffset);

        // We have a hypothesis that the consumer can actually seek past the last message of the topic;
        // this yields this error, and we want to catch it and back the position up just a bit to a place
        // we can work from.
        if (exceptionOffset >= endingOffset) {
            logger.warn(
                "OutOfRangeException yielded offset {}, which is past our ending offset of {} for {}",
                exceptionOffset,
                endingOffset,
                topicPartition
            );

            // Seek to the end we found above.  The end may have moved since we last asked, which is why we are not doing seekToEnd()
            getKafkaConsumer().seek(
                topicPartition,
                endingOffset
            );

            partitionOffsetsManager.replaceEntry(
                new ConsumerPartition(topicPartition.topic(), topicPartition.partition()),
                endingOffset
            );
        } else {
            resetPartitions.add(topicPartition);
        }
    }

    // Seek all of the errored partitions to the earliest available position.
    resetPartitionsToEarliest(resetPartitions);
}
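
The resetPartitionsToEarliest() call above is not shown on this page. Below is a hedged sketch of one plausible implementation with the plain consumer API, assuming the same class members as above (getKafkaConsumer(), logger); the project's real implementation also updates its internal partitionOffsetsManager, which is omitted here, and the method name is illustrative.

// Hedged sketch only: seek each errored partition back to the earliest offset the broker reports.
private void resetPartitionsToEarliestSketch(final Set<TopicPartition> resetPartitions) {
    if (resetPartitions.isEmpty()) {
        return;
    }
    // Ask the broker for the earliest available offset of each errored partition...
    final Map<TopicPartition, Long> earliestOffsets = getKafkaConsumer().beginningOffsets(resetPartitions);

    for (final Map.Entry<TopicPartition, Long> entry : earliestOffsets.entrySet()) {
        // ...and move the consumer's position there.
        getKafkaConsumer().seek(entry.getKey(), entry.getValue());
        logger.info("Reset {} to earliest available offset {}", entry.getKey(), entry.getValue());
    }
}
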
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public ConsumerRecords<K, V> poll(long timeout) {
  long startMs = System.currentTimeMillis();
  ConsumerRecords<K, V> processedRecords;
  // We will keep polling until timeout.
  long now = startMs;
  long expireMs = startMs + timeout;
  do {
    if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
      commitAsync();
      _lastAutoCommitMs = now;
    }
    ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
    try {
       rawRecords = _kafkaConsumer.poll(expireMs - now);
    } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
      switch (_offsetResetStrategy) {
        case EARLIEST:
          _kafkaConsumer.seekToBeginning(oe.partitions());
          oe.partitions().forEach(_consumerRecordsProcessor::clear);
          break;
        case LATEST:
          _kafkaConsumer.seekToEnd(oe.partitions());
          oe.partitions().forEach(_consumerRecordsProcessor::clear);
          break;
        default:
          throw oe;
      }
    }
    // Check whether we have a high watermark for each assigned partition. The high watermarks are cleared during rebalance.
    // We make this check so that after a rebalance we do not deliver duplicate messages to the user.
    if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) {
      for (TopicPartition tp : rawRecords.partitions()) {
        if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) {
          OffsetAndMetadata offsetAndMetadata = committed(tp);
          if (offsetAndMetadata != null) {
            long hw = offsetAndMetadata.offset();
            _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
          }
        }
      }
    }
    processedRecords = _consumerRecordsProcessor.process(rawRecords);
    now = System.currentTimeMillis();
  } while (processedRecords.isEmpty() && now < startMs + timeout);
  return processedRecords;
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public ConsumerRecords<K, V> poll(long timeout) {
  if (_lastProcessedResult != null && _lastProcessedResult.exception() != null) {
    ConsumerRecordsProcessingException e = _lastProcessedResult.exception();
    _lastProcessedResult = null;
    throw e;
  }
  long startMs = System.currentTimeMillis();
  ConsumerRecords<K, V> processedRecords;
  // We will keep polling until timeout.
  long now = startMs;
  long expireMs = startMs + timeout;
  do {
    if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
      commitAsync();
      _lastAutoCommitMs = now;
    }
    ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
    try {
       rawRecords = _kafkaConsumer.poll(expireMs - now);
    } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
      switch (_offsetResetStrategy) {
        case EARLIEST:
          oe.partitions().forEach(_consumerRecordsProcessor::clear);
          _kafkaConsumer.seekToBeginning(oe.partitions());
          break;
        case LATEST:
          oe.partitions().forEach(_consumerRecordsProcessor::clear);
          _kafkaConsumer.seekToEnd(oe.partitions());
          break;
        default:
          throw oe;
      }
    }
    // Check whether we have a high watermark for each assigned partition. The high watermarks are cleared during rebalance.
    // We make this check so that after a rebalance we do not deliver duplicate messages to the user.
    if (!rawRecords.isEmpty() && _consumerRecordsProcessor.numConsumerHighWaterMarks() < assignment().size()) {
      for (TopicPartition tp : rawRecords.partitions()) {
        if (_consumerRecordsProcessor.consumerHighWaterMarkForPartition(tp) == null) {
          OffsetAndMetadata offsetAndMetadata = committed(tp);
          if (offsetAndMetadata != null) {
            long hw = offsetAndMetadata.offset();
            _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
          }
        }
      }
    }
    _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
    processedRecords = _lastProcessedResult.consumerRecords();
    // Clear the internal reference.
    _lastProcessedResult.clearRecords();
    // Rewind offset if there are processing exceptions.
    if (_lastProcessedResult.exception() != null) {
      for (Map.Entry<TopicPartition, Long> entry : _lastProcessedResult.resumeOffsets().entrySet()) {
        _kafkaConsumer.seek(entry.getKey(), entry.getValue());
      }
    }
    now = System.currentTimeMillis();
  } while (processedRecords.isEmpty() && now < startMs + timeout);
  return processedRecords;
}
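
As the Fetcher and LiKafkaConsumerImpl snippets above show, whether OffsetOutOfRangeException ever reaches user code depends on the offset reset policy: "earliest" and "latest" reset silently, while "none" propagates the exception. A minimal configuration sketch for the plain Kafka consumer follows; the broker address and group id are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class OffsetResetConfigExample {
    public static Properties consumerProps(String resetPolicy) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-example");      // placeholder group
        // Valid values: "earliest", "latest", "none". Only "none" lets
        // OffsetOutOfRangeException propagate out of poll()/position().
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetPolicy);
        return props;
    }
}
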