Java 类com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException 实例源码

项目:flink-stream-processing-refarch    文件:WatermarkTracker.java   
private void sentWatermark() {
  try {
    //refresh the list of available shards, if current state is too old
    if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) {
      refreshShards();

      lastShardRefreshTime = System.currentTimeMillis();
    }

    //send a watermark to every shard of the Kinesis stream
    shards.parallelStream()
        .map(shard -> new PutRecordRequest()
            .withStreamName(streamName)
            .withData(new WatermarkEvent(currentWatermark).payload)
            .withPartitionKey("23")
            .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey()))
        .map(kinesisClient::putRecord)
        .forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId()));

    LOG.debug("send watermark {}", new DateTime(currentWatermark));
  } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
    //if any request is throttled, just wait for the next iteration to submit another watermark
    LOG.warn("skipping watermark due to limit exceeded exception");
  }
}
项目:kafka-connect-kinesis    文件:KinesisSourceTaskTest.java   
@Test
public void throughputExceeded() throws InterruptedException {
  final String SEQUENCE_NUMBER = "asdfasdfddsa";
  Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER);
  when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset);
  when(this.kinesisClient.getShardIterator(any())).thenReturn(
      new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf")
  );
  this.task.start(settings);
  when(this.kinesisClient.getRecords(any())).thenThrow(new ProvisionedThroughputExceededException(""));

  List<SourceRecord> records = this.task.poll();
  assertNotNull(records, "records should not be null");
  assertTrue(records.isEmpty(), "records should be empty.");

  verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisThroughputExceededBackoffMs);
}
项目:flink    文件:KinesisProxy.java   
/**
 * Determines whether the exception is recoverable using exponential-backoff.
 *
 * @param ex Exception to inspect
 * @return <code>true</code> if the exception can be recovered from, else
 *         <code>false</code>
 */
protected static boolean isRecoverableException(AmazonServiceException ex) {
    if (ex.getErrorType() == null) {
        return false;
    }

    switch (ex.getErrorType()) {
        case Client:
            return ex instanceof ProvisionedThroughputExceededException;
        case Service:
        case Unknown:
            return true;
        default:
            return false;
    }
}
项目:flink    文件:KinesisProxy.java   
/**
 * {@inheritDoc}
 */
@Override
public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    GetShardIteratorResult getShardIteratorResult = null;

    int attempt = 0;
    while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
        try {
            getShardIteratorResult =
                kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
        } catch (ProvisionedThroughputExceededException ex) {
            long backoffMillis = fullJitterBackoff(
                getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
            LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
                + backoffMillis + " millis.");
            Thread.sleep(backoffMillis);
        }
    }

    if (getShardIteratorResult == null) {
        throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
            " retry attempts returned ProvisionedThroughputExceededException.");
    }
    return getShardIteratorResult.getShardIterator();
}
项目:amazon-kinesis-aggregators    文件:SensorReadingProducer.java   
private void run(final int events, final OutputFormat format,
        final String streamName, final String region) throws Exception {
    AmazonKinesis kinesisClient = new AmazonKinesisClient(
            new DefaultAWSCredentialsProviderChain());
    kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));
    int count = 0;
    SensorReading r = null;
    do {
        r = nextSensorReading(format);

        try {
            PutRecordRequest req = new PutRecordRequest()
                    .withPartitionKey("" + rand.nextLong())
                    .withStreamName(streamName)
                    .withData(ByteBuffer.wrap(r.toString().getBytes()));
            kinesisClient.putRecord(req);
        } catch (ProvisionedThroughputExceededException e) {
            Thread.sleep(BACKOFF);
        }

        System.out.println(r);
        count++;
    } while (count < events);
}
项目:kafka-connect-kinesis    文件:KinesisSourceTask.java   
@Override
public List<SourceRecord> poll() throws InterruptedException {
  List<SourceRecord> records;

  try {
    GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
    records = new ArrayList<>(recordsResult.getRecords().size());
    log.trace("poll() - {} record(s) returned from shard {}.", this.config.kinesisShardId);

    for (Record record : recordsResult.getRecords()) {
      SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record);
      records.add(sourceRecord);
    }

    log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
    this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
  } catch (ProvisionedThroughputExceededException ex) {
    log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex);
    this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
    return new ArrayList<>();
  }

  if (records.isEmpty()) {
    log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
    this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
  }

  return records;
}
项目:flink    文件:KinesisProxy.java   
/**
 * {@inheritDoc}
 */
@Override
public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException {
    final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxRecordsToGet);

    GetRecordsResult getRecordsResult = null;

    int attempt = 0;
    while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
        try {
            getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
        } catch (ProvisionedThroughputExceededException ex) {
            long backoffMillis = fullJitterBackoff(
                getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
            LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
                + backoffMillis + " millis.");
            Thread.sleep(backoffMillis);
        }
    }

    if (getRecordsResult == null) {
        throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
            " retry attempts returned ProvisionedThroughputExceededException.");
    }

    return getRecordsResult;
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
  shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
      TransientKinesisException.class);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
  shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
      TransientKinesisException.class);
}
项目:beam    文件:SimplifiedKinesisClientTest.java   
@Test
public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
  shouldHandleGetBacklogBytesError(new ProvisionedThroughputExceededException(""),
      TransientKinesisException.class);
}
项目:flink    文件:KinesisProxyTest.java   
@Test
public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
    final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf");
    ex.setErrorType(ErrorType.Client);
    assertTrue(KinesisProxy.isRecoverableException(ex));
}