private void sentWatermark() { try { //refresh the list of available shards, if current state is too old if (System.currentTimeMillis() - lastShardRefreshTime >= SHARD_REFRESH_MILLIES) { refreshShards(); lastShardRefreshTime = System.currentTimeMillis(); } //send a watermark to every shard of the Kinesis stream shards.parallelStream() .map(shard -> new PutRecordRequest() .withStreamName(streamName) .withData(new WatermarkEvent(currentWatermark).payload) .withPartitionKey("23") .withExplicitHashKey(shard.getHashKeyRange().getStartingHashKey())) .map(kinesisClient::putRecord) .forEach(putRecordResult -> LOG.trace("send watermark {} to shard {}", new DateTime(currentWatermark), putRecordResult.getShardId())); LOG.debug("send watermark {}", new DateTime(currentWatermark)); } catch (LimitExceededException | ProvisionedThroughputExceededException e) { //if any request is throttled, just wait for the next iteration to submit another watermark LOG.warn("skipping watermark due to limit exceeded exception"); } }
@Test public void throughputExceeded() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); when(this.kinesisClient.getRecords(any())).thenThrow(new ProvisionedThroughputExceededException("")); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null"); assertTrue(records.isEmpty(), "records should be empty."); verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisThroughputExceededBackoffMs); }
/** * Determines whether the exception is recoverable using exponential-backoff. * * @param ex Exception to inspect * @return <code>true</code> if the exception can be recovered from, else * <code>false</code> */ protected static boolean isRecoverableException(AmazonServiceException ex) { if (ex.getErrorType() == null) { return false; } switch (ex.getErrorType()) { case Client: return ex instanceof ProvisionedThroughputExceededException; case Service: case Unknown: return true; default: return false; } }
/** * {@inheritDoc} */ @Override public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; int attempt = 0; while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); } catch (ProvisionedThroughputExceededException ex) { long backoffMillis = fullJitterBackoff( getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } } if (getShardIteratorResult == null) { throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getShardIteratorResult.getShardIterator(); }
private void run(final int events, final OutputFormat format, final String streamName, final String region) throws Exception { AmazonKinesis kinesisClient = new AmazonKinesisClient( new DefaultAWSCredentialsProviderChain()); kinesisClient.setRegion(Region.getRegion(Regions.fromName(region))); int count = 0; SensorReading r = null; do { r = nextSensorReading(format); try { PutRecordRequest req = new PutRecordRequest() .withPartitionKey("" + rand.nextLong()) .withStreamName(streamName) .withData(ByteBuffer.wrap(r.toString().getBytes())); kinesisClient.putRecord(req); } catch (ProvisionedThroughputExceededException e) { Thread.sleep(BACKOFF); } System.out.println(r); count++; } while (count < events); }
@Override public List<SourceRecord> poll() throws InterruptedException { List<SourceRecord> records; try { GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest); records = new ArrayList<>(recordsResult.getRecords().size()); log.trace("poll() - {} record(s) returned from shard {}.", this.config.kinesisShardId); for (Record record : recordsResult.getRecords()) { SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record); records.add(sourceRecord); } log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator()); this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator()); } catch (ProvisionedThroughputExceededException ex) { log.warn("poll() - Throughput exceeded sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex); this.time.sleep(this.config.kinesisThroughputExceededBackoffMs); return new ArrayList<>(); } if (records.isEmpty()) { log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs); this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs); } return records; }
/** * {@inheritDoc} */ @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; int attempt = 0; while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (ProvisionedThroughputExceededException ex) { long backoffMillis = fullJitterBackoff( getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } } if (getRecordsResult == null) { throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; }
@Test public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() { shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""), TransientKinesisException.class); }
@Test public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() { shouldHandleShardListingError(new ProvisionedThroughputExceededException(""), TransientKinesisException.class); }
@Test public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() { shouldHandleGetBacklogBytesError(new ProvisionedThroughputExceededException(""), TransientKinesisException.class); }
@Test public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() { final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf"); ex.setErrorType(ErrorType.Client); assertTrue(KinesisProxy.isRecoverableException(ex)); }