Java 类 com.amazonaws.services.kinesis.model.LimitExceededException 实例源码

项目:flink-stream-processing-refarch    文件:WatermarkTracker.java   
/**
 * Sends the current watermark to every known shard of the Kinesis stream.
 *
 * <p>When at least {@code SHARD_REFRESH_MILLIES} milliseconds have elapsed since the last refresh,
 * the shard list is refreshed through {@code refreshShards()} before sending. One
 * {@code PutRecordRequest} is then issued per shard, with the explicit hash key set to the
 * starting hash key of that shard's hash key range.
 *
 * <p>A {@code LimitExceededException} or {@code ProvisionedThroughputExceededException} raised
 * while refreshing or sending is not propagated: the watermark is skipped, and the exception is
 * reported in the warning so that the log shows which limit was hit.
 */
private void sentWatermark() {
  try {
    //refresh the list of available shards, if current state is too old
    if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) {
      refreshShards();

      lastShardRefreshTime = System.currentTimeMillis();
    }

    //send a watermark to every shard of the Kinesis stream
    shards.parallelStream()
        .map(shard -> new PutRecordRequest()
            .withStreamName(streamName)
            .withData(new WatermarkEvent(currentWatermark).payload)
            .withPartitionKey("23")
            .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey()))
        .map(kinesisClient::putRecord)
        .forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId()));

    LOG.debug("send watermark {}", new DateTime(currentWatermark));
  } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
    //if any request is throttled, just wait for the next iteration to submit another watermark;
    //the exception is rendered as a string so a routine throttle does not log a full stack trace
    LOG.warn("skipping watermark due to limit exceeded exception: {}", e.toString());
  }
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Exercises the get-shard-iterator error path with a {@code LimitExceededException}.
 *
 * <p>Hands an exception with an empty message, together with
 * {@code TransientKinesisException.class}, to {@code shouldHandleGetShardIteratorError}
 * (presumably the class argument is the exception type the client is expected to raise;
 * confirm against the helper).
 */
@Test
public void shouldHandleLimitExceededExceptionForGetShardIterator() {
  LimitExceededException throttled = new LimitExceededException("");
  shouldHandleGetShardIteratorError(throttled, TransientKinesisException.class);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Exercises the shard-listing error path with a {@code LimitExceededException}.
 *
 * <p>Hands an exception with an empty message, together with
 * {@code TransientKinesisException.class}, to {@code shouldHandleShardListingError}
 * (presumably the class argument is the exception type the client is expected to raise;
 * confirm against the helper).
 */
@Test
public void shouldHandleLimitExceededExceptionForShardListing() {
  LimitExceededException throttled = new LimitExceededException("");
  shouldHandleShardListingError(throttled, TransientKinesisException.class);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
/**
 * Exercises the get-backlog-bytes error path with a {@code LimitExceededException}.
 *
 * <p>Hands an exception with an empty message, together with
 * {@code TransientKinesisException.class}, to {@code shouldHandleGetBacklogBytesError}
 * (presumably the class argument is the exception type the client is expected to raise;
 * confirm against the helper).
 */
@Test
public void shouldHandleLimitExceededExceptionForGetBacklogBytes() {
  LimitExceededException throttled = new LimitExceededException("");
  shouldHandleGetBacklogBytesError(throttled, TransientKinesisException.class);
}
项目:flink    文件:KinesisProxy.java   
/**
 * Describes a Kinesis stream, returning metadata that includes the stream's shards.
 *
 * <p>Calls rejected with a {@code LimitExceededException} are retried, with no cap on the number of
 * attempts, after sleeping for a duration obtained from {@code fullJitterBackoff}. This is the
 * "full jitter" approach described in AWS's article,
 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
 * The jitter is needed because the fetchers of all parallel subtasks issue this call concurrently;
 * it helps spread those calls out over time.
 *
 * <p>A {@code ResourceNotFoundException} is not retried; it is rethrown wrapped in a
 * {@code RuntimeException}.
 *
 * @param streamName the stream to describe
 * @param startShardId exclusive start shard id for this describe operation; shards whose id
 *                     compares less than or equal to it are dropped from the result. May be null.
 * @return the result of the describe stream operation
 * @throws InterruptedException if the thread is interrupted while sleeping between retries
 */
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    final DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    request.setExclusiveStartShardId(startShardId);

    // Keep calling DescribeStream until a result comes back; only throttling triggers a backoff.
    DescribeStreamResult result = null;
    int throttledAttempts = 0;
    do {
        try {
            result = kinesisClient.describeStream(request);
        } catch (LimitExceededException throttled) {
            final long sleepMillis = fullJitterBackoff(
                describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, throttledAttempts);
            throttledAttempts++;
            LOG.warn("Got LimitExceededException when describing stream " + streamName
                + ". Backing off for " + sleepMillis + " millis.");
            Thread.sleep(sleepMillis);
        } catch (ResourceNotFoundException notFound) {
            throw new RuntimeException("Error while getting stream details", notFound);
        }
    } while (result == null);

    // Warn when the stream is in neither the ACTIVE nor the UPDATING state.
    final String status = result.getStreamDescription().getStreamStatus();
    final boolean activeOrUpdating =
        status.equals(StreamStatus.ACTIVE.toString()) || status.equals(StreamStatus.UPDATING.toString());
    if (!activeOrUpdating && LOG.isWarnEnabled()) {
        LOG.warn("The status of stream " + streamName + " is " + status
            + "; result of the current describeStream operation will not contain any shard information.");
    }

    if (startShardId == null) {
        return result;
    }

    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the
    // exclusive start shard id, so any such shards are removed from the returned list here.
    for (Iterator<Shard> it = result.getStreamDescription().getShards().iterator(); it.hasNext();) {
        if (StreamShardHandle.compareShardIds(it.next().getShardId(), startShardId) <= 0) {
            it.remove();
        }
    }

    return result;
}
项目:flink    文件:KinesisProxy.java   
/**
 * Fetches the description of a Kinesis stream, which carries the list of shards of that stream.
 *
 * <p>When the call is rejected with a {@code LimitExceededException}, it is repeated (without a
 * limit on the number of attempts) after a pause computed by {@code fullJitterBackoff}. This is
 * the "full jitter" approach described in AWS's article,
 * <a href="https://www.awsarchitectureblog.com/2015/03/backoff.html">"Exponential Backoff and Jitter"</a>.
 * It is necessary because the fetchers of all parallel subtasks make this call concurrently, and
 * the jitter helps distribute the calls over time.
 *
 * <p>A {@code ResourceNotFoundException} ends the retries and is rethrown wrapped in a
 * {@code RuntimeException}.
 *
 * @param streamName the stream to describe
 * @param startShardId exclusive start shard id for this describe operation; shards whose id
 *                     compares less than or equal to it are removed from the result. May be null.
 * @return the result of the describe stream operation
 * @throws InterruptedException if the thread is interrupted during a backoff pause
 */
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
    final DescribeStreamRequest request = new DescribeStreamRequest();
    request.setStreamName(streamName);
    request.setExclusiveStartShardId(startShardId);

    // Repeat DescribeStream until it yields a result, backing off whenever the call is throttled.
    DescribeStreamResult description = null;
    for (int retries = 0; description == null;) {
        try {
            description = kinesisClient.describeStream(request);
        } catch (LimitExceededException limitExceeded) {
            final long pauseMillis = fullJitterBackoff(
                describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, retries);
            retries++;
            LOG.warn("Got LimitExceededException when describing stream " + streamName
                + ". Backing off for " + pauseMillis + " millis.");
            Thread.sleep(pauseMillis);
        } catch (ResourceNotFoundException missingStream) {
            throw new RuntimeException("Error while getting stream details", missingStream);
        }
    }

    // A stream that is neither ACTIVE nor UPDATING gets a warning.
    final String currentStatus = description.getStreamDescription().getStreamStatus();
    final boolean activeOrUpdating = currentStatus.equals(StreamStatus.ACTIVE.toString())
        || currentStatus.equals(StreamStatus.UPDATING.toString());
    if (!activeOrUpdating && LOG.isWarnEnabled()) {
        LOG.warn("The status of stream " + streamName + " is " + currentStatus
            + "; result of the current describeStream operation will not contain any shard information.");
    }

    if (startShardId == null) {
        return description;
    }

    // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the
    // exclusive start shard id; drop those erroneously returned shards from the list.
    final Iterator<Shard> remaining = description.getStreamDescription().getShards().iterator();
    while (remaining.hasNext()) {
        final Shard candidate = remaining.next();
        if (KinesisStreamShard.compareShardIds(candidate.getShardId(), startShardId) <= 0) {
            remaining.remove();
        }
    }

    return description;
}