/**
 * Checks that iteration survives an expired shard iterator: the client rejects
 * {@code SECOND_ITERATOR}, the checkpoint hands out {@code SECOND_REFRESHED_ITERATOR}, and the
 * records behind that refreshed iterator are returned by the following read.
 */
@Test
public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
  // Client behaviour: the second iterator is expired, the refreshed one yields the second result.
  when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
      .thenThrow(ExpiredIteratorException.class);
  when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
      .thenReturn(secondResult);

  // The checkpoint is the source of the replacement iterator.
  when(aCheckpoint.getShardIterator(kinesisClient)).thenReturn(SECOND_REFRESHED_ITERATOR);

  // Payloads carried by the two results.
  when(firstResult.getRecords()).thenReturn(Collections.singletonList(a));
  when(secondResult.getRecords()).thenReturn(Collections.singletonList(b));

  // First batch is read and acknowledged before the next read is attempted.
  assertThat(iterator.readNextBatch()).isEqualTo(Collections.singletonList(a));
  iterator.ackRecord(a);

  // Second batch comes from the refreshed iterator; after that nothing is left.
  assertThat(iterator.readNextBatch()).isEqualTo(Collections.singletonList(b));
  assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
}
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * <p>Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); // Update millis behind latest so it gets reported by the millisBehindLatest gauge shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) { // we fake only once the expired iterator exception at the specified get records attempt order expiredOnceAlready = true; throw new ExpiredIteratorException("Artificial expired shard iterator"); } else if (expiredOnceAlready && !expiredIteratorRefreshed) { // if we've thrown the expired iterator exception already, but the iterator was not refreshed, // throw a hard exception to the test that is testing this Kinesis behaviour throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call"); } else { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withMillisBehindLatest(millisBehindLatest) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null } }
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) { // we fake only once the expired iterator exception at the specified get records attempt order expiredOnceAlready = true; throw new ExpiredIteratorException("Artificial expired shard iterator"); } else if (expiredOnceAlready && !expiredIteratorRefreshed) { // if we've thrown the expired iterator exception already, but the iterator was not refreshed, // throw a hard exception to the test that is testing this Kinesis behaviour throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call"); } else { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null } }
/**
 * Fetches the next batch of records with the current shard iterator and advances the iterator to
 * the one returned in the response.
 *
 * <p>When the client reports the iterator as expired, a replacement is requested from the current
 * checkpoint and the fetch is attempted again. The retry is a loop rather than a recursive call,
 * so a long run of consecutive expirations cannot grow the call stack.
 *
 * @return the response of the first getRecords call that succeeds
 * @throws TransientKinesisException if raised by the underlying Kinesis calls
 */
private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
  while (true) {
    try {
      GetKinesisRecordsResult response = kinesis.getRecords(shardIterator, streamName, shardId);
      // Remember where the following fetch has to continue.
      shardIterator = response.getNextShardIterator();
      return response;
    } catch (ExpiredIteratorException e) {
      LOG.info("Refreshing expired iterator", e);
      // Swap in an iterator derived from the checkpoint, then loop to retry the fetch.
      shardIterator = checkpoint.get().getShardIterator(kinesis);
    }
  }
}
/**
 * Runs the shared get-shard-iterator error scenario with an {@link ExpiredIteratorException}
 * (empty message) as the injected failure and the same class as the second argument.
 */
@Test
public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
  final ExpiredIteratorException injectedFailure = new ExpiredIteratorException("");
  final Class<ExpiredIteratorException> failureType = ExpiredIteratorException.class;

  shouldHandleGetShardIteratorError(injectedFailure, failureType);
}
/**
 * Runs the shared shard-listing error scenario with an {@link ExpiredIteratorException}
 * (empty message) as the injected failure and the same class as the second argument.
 */
@Test
public void shouldHandleExpiredIterationExceptionForShardListing() {
  final ExpiredIteratorException injectedFailure = new ExpiredIteratorException("");
  final Class<ExpiredIteratorException> failureType = ExpiredIteratorException.class;

  shouldHandleShardListingError(injectedFailure, failureType);
}
/**
 * An {@link ExpiredIteratorException} whose error type is {@code ErrorType.Client} must not be
 * classified as recoverable by {@code KinesisProxy.isRecoverableException}.
 */
@Test
public void testIsRecoverableExceptionWithExpiredIteratorException() {
    final ExpiredIteratorException expiredIterator = new ExpiredIteratorException("asdf");
    expiredIterator.setErrorType(ErrorType.Client);

    final boolean recoverable = KinesisProxy.isRecoverableException(expiredIterator);

    assertFalse(recoverable);
}