Java 类com.amazonaws.services.kinesis.model.ExpiredIteratorException 实例源码

项目:beam    文件:ShardRecordsIteratorTest.java   
@Test
public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
  when(firstResult.getRecords()).thenReturn(singletonList(a));
  when(secondResult.getRecords()).thenReturn(singletonList(b));

  when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
      .thenThrow(ExpiredIteratorException.class);
  when(aCheckpoint.getShardIterator(kinesisClient))
      .thenReturn(SECOND_REFRESHED_ITERATOR);
  when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
      .thenReturn(secondResult);

  assertThat(iterator.readNextBatch()).isEqualTo(singletonList(a));
  iterator.ackRecord(a);
  assertThat(iterator.readNextBatch()).isEqualTo(singletonList(b));
  assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
}
项目:flink    文件:ShardConsumer.java   
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * <p>Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult getRecordsResult = null;
    while (getRecordsResult == null) {
        try {
            getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);

            // Update millis behind latest so it gets reported by the millisBehindLatest gauge
            shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);
            shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());

            // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    }
    return getRecordsResult;
}
项目:flink    文件:FakeKinesisBehavioursFactory.java   
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
    if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) {
        // we fake only once the expired iterator exception at the specified get records attempt order
        expiredOnceAlready = true;
        throw new ExpiredIteratorException("Artificial expired shard iterator");
    } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
        // if we've thrown the expired iterator exception already, but the iterator was not refreshed,
        // throw a hard exception to the test that is testing this Kinesis behaviour
        throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
    } else {
        // assuming that the maxRecordsToGet is always large enough
        return new GetRecordsResult()
            .withRecords(shardItrToRecordBatch.get(shardIterator))
            .withMillisBehindLatest(millisBehindLatest)
            .withNextShardIterator(
                (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
                    ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
    }
}
项目:flink    文件:ShardConsumer.java   
/**
 * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
 * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
 * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
 * be used for the next call to this method.
 *
 * Note: it is important that this method is not called again before all the records from the last result have been
 * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
 * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
 * incorrect shard iteration if the iterator had to be refreshed.
 *
 * @param shardItr shard iterator to use
 * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
 * @return get records result
 * @throws InterruptedException
 */
private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    GetRecordsResult getRecordsResult = null;
    while (getRecordsResult == null) {
        try {
            getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
        } catch (ExpiredIteratorException eiEx) {
            LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
                " refreshing the iterator ...", shardItr, subscribedShard);
            shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());

            // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
            if (fetchIntervalMillis != 0) {
                Thread.sleep(fetchIntervalMillis);
            }
        }
    }
    return getRecordsResult;
}
项目:flink    文件:FakeKinesisBehavioursFactory.java   
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
    if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
        // we fake only once the expired iterator exception at the specified get records attempt order
        expiredOnceAlready = true;
        throw new ExpiredIteratorException("Artificial expired shard iterator");
    } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
        // if we've thrown the expired iterator exception already, but the iterator was not refreshed,
        // throw a hard exception to the test that is testing this Kinesis behaviour
        throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
    } else {
        // assuming that the maxRecordsToGet is always large enough
        return new GetRecordsResult()
            .withRecords(shardItrToRecordBatch.get(shardIterator))
            .withNextShardIterator(
                (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
                    ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
    }
}
项目:beam    文件:ShardRecordsIterator.java   
private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
  try {
    GetKinesisRecordsResult response = kinesis.getRecords(shardIterator, streamName, shardId);
    shardIterator = response.getNextShardIterator();
    return response;
  } catch (ExpiredIteratorException e) {
    LOG.info("Refreshing expired iterator", e);
    shardIterator = checkpoint.get().getShardIterator(kinesis);
    return fetchRecords();
  }
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
@Test
public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
  shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
      ExpiredIteratorException.class);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
@Test
public void shouldHandleExpiredIterationExceptionForShardListing() {
  shouldHandleShardListingError(new ExpiredIteratorException(""),
      ExpiredIteratorException.class);
}
项目:flink    文件:KinesisProxyTest.java   
@Test
public void testIsRecoverableExceptionWithExpiredIteratorException() {
    final ExpiredIteratorException ex = new ExpiredIteratorException("asdf");
    ex.setErrorType(ErrorType.Client);
    assertFalse(KinesisProxy.isRecoverableException(ex));
}