@Test public void noSourceOffsets() throws InterruptedException { when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(TestData.record()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null."); assertFalse(records.isEmpty(), "records should not be empty."); verify(this.kinesisClient, atLeastOnce()).getShardIterator(any()); verify(this.kinesisClient, atLeastOnce()).getRecords(any()); }
@Test public void noRecords() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(Arrays.asList()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null"); assertTrue(records.isEmpty(), "records should be empty."); verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisEmptyRecordsBackoffMs); }
/** * Gets records from Kinesis and deaggregates them if needed. * * @param shardIterator iterator pointing at the position in the shard to read from * @param streamName name of the Kinesis stream the shard belongs to * @param shardId id of the shard being read * @param limit maximum number of records to fetch in this call * @return list of deaggregated records * @throws TransientKinesisException if a recoverable (transient) error occurs */ public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName, final String shardId, final Integer limit) throws TransientKinesisException { return wrapExceptions(new Callable<GetKinesisRecordsResult>() { @Override public GetKinesisRecordsResult call() throws Exception { GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest() .withShardIterator(shardIterator) .withLimit(limit)); return new GetKinesisRecordsResult( UserRecord.deaggregate(response.getRecords()), response.getNextShardIterator(), response.getMillisBehindLatest(), streamName, shardId); } }); }
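// Hedged sketch (not from the source): the snippet above relies on the KCL's
// UserRecord.deaggregate(...) to unpack records that the Kinesis Producer Library
// aggregated together. The helper name and surrounding plumbing below are assumptions
// for illustration only.
import java.util.List;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;

static List<UserRecord> fetchDeaggregated(AmazonKinesis kinesis, String shardIterator, int limit) {
    GetRecordsResult response = kinesis.getRecords(
        new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit));
    // UserRecord extends Record; aggregated payloads are split into sub-records,
    // plain records pass through unchanged.
    return UserRecord.deaggregate(response.getRecords());
}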
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * <p>Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException if interrupted while sleeping before retrying with a refreshed iterator */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); // Update millis behind latest so it gets reported by the millisBehindLatest gauge shardMetricsReporter.setMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() { return new KinesisProxyInterface() { @Override public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) { return new GetShardListResult(); // not setting any retrieved shards for result } @Override public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) { return null; } @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { return null; } }; }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { if ((Integer.valueOf(shardIterator) == orderOfCallToExpire - 1) && !expiredOnceAlready) { // fake the expired iterator exception only once, at the specified getRecords call order expiredOnceAlready = true; throw new ExpiredIteratorException("Artificial expired shard iterator"); } else if (expiredOnceAlready && !expiredIteratorRefreshed) { // if the expired iterator exception has already been thrown but the iterator was not refreshed, // fail the test exercising this Kinesis behaviour with a hard exception throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call"); } else { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withMillisBehindLatest(millisBehindLatest) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null } }
/** * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should * be used for the next call to this method. * * Note: it is important that this method is not called again before all the records from the last result have been * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to * incorrect shard iteration if the iterator had to be refreshed. * * @param shardItr shard iterator to use * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt * @return get records result * @throws InterruptedException if interrupted while sleeping before retrying with a refreshed iterator */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; while (getRecordsResult == null) { try { getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); } catch (ExpiredIteratorException eiEx) { LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + " refreshing the iterator ...", shardItr, subscribedShard); shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator if (fetchIntervalMillis != 0) { Thread.sleep(fetchIntervalMillis); } } } return getRecordsResult; }
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() { return new KinesisProxyInterface() { @Override public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) { return new GetShardListResult(); // not setting any retrieved shards for result } @Override public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) { return null; } @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { return null; } }; }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) { // fake the expired iterator exception only once, at the specified getRecords call order expiredOnceAlready = true; throw new ExpiredIteratorException("Artificial expired shard iterator"); } else if (expiredOnceAlready && !expiredIteratorRefreshed) { // if the expired iterator exception has already been thrown but the iterator was not refreshed, // fail the test exercising this Kinesis behaviour with a hard exception throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call"); } else { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null } }
@Override protected int poll() throws Exception { GetRecordsRequest req = new GetRecordsRequest() .withShardIterator(getShardItertor()) .withLimit(getEndpoint().getMaxResultsPerRequest()); GetRecordsResult result = getClient().getRecords(req); Queue<Exchange> exchanges = createExchanges(result.getRecords()); int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); // We may want to cache the last successful sequence number and pass it to the // next getRecords request so that the next poll resumes where we left off; // however, it is unclear what happens to subsequent exchanges when an earlier // exchange fails. currentShardIterator = result.getNextShardIterator(); return processedExchangeCount; }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
@Test public void recordsAreSentToTheProcessor() throws Exception { when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2")) ); int messageCount = undertest.poll(); assertThat(messageCount, is(2)); final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2")); }
@Test public void exchangePropertiesAreSet() throws Exception { String partitionKey = "partitionKey"; String sequenceNumber = "1"; when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") .withRecords(new Record() .withSequenceNumber(sequenceNumber) .withApproximateArrivalTimestamp(new Date(42)) .withPartitionKey(partitionKey) ) ); undertest.poll(); final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber)); }
public String processNextIterator(String iterator) { GetRecordsRequest getRequest = new GetRecordsRequest(); getRequest.setLimit(1000); getRequest.setShardIterator(iterator); // call "get" operation and get everything in this shard range GetRecordsResult getResponse = client.getRecords(getRequest); iterator = getResponse.getNextShardIterator(); List<Record> records = getResponse.getRecords(); processResponseRecords(records); return iterator; }
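// Hedged sketch (not from the source): one way processNextIterator(...) above could be
// driven. The initial-iterator helper and the stop/sleep policy are assumptions.
String iterator = getInitialShardIterator(); // hypothetical helper, e.g. a TRIM_HORIZON iterator for the shard
while (iterator != null) { // Kinesis returns a null next iterator once a closed shard is fully read
    iterator = processNextIterator(iterator);
    try {
        Thread.sleep(1000L); // stay under the per-shard GetRecords rate limit
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
    }
}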
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords( ClientConfiguration awsClientConfig, KinesisConfigBean conf, int maxBatchSize, GetShardIteratorRequest getShardIteratorRequest ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); String shardIterator = getShardIteratorResult.getShardIterator(); GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxBatchSize); GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); return getRecordsResult.getRecords(); }
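// Hedged sketch (not from the source): a plausible call site for getPreviewRecords(...)
// above. The streamName variable, the 'shard' obtained from a prior DescribeStream call,
// and the TRIM_HORIZON starting position are assumptions.
GetShardIteratorRequest previewIteratorRequest = new GetShardIteratorRequest()
    .withStreamName(streamName)
    .withShardId(shard.getShardId())
    .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
List<com.amazonaws.services.kinesis.model.Record> preview =
    getPreviewRecords(awsClientConfig, conf, 100, previewIteratorRequest);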
@Override public List<SourceRecord> poll() throws InterruptedException { List<SourceRecord> records; try { GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest); records = new ArrayList<>(recordsResult.getRecords().size()); log.trace("poll() - {} record(s) returned from shard {}.", recordsResult.getRecords().size(), this.config.kinesisShardId); for (Record record : recordsResult.getRecords()) { SourceRecord sourceRecord = this.recordConverter.sourceRecord(this.config.kinesisStreamName, this.config.kinesisShardId, record); records.add(sourceRecord); } log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator()); this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator()); } catch (ProvisionedThroughputExceededException ex) { log.warn("poll() - Throughput exceeded, sleeping {} ms", this.config.kinesisThroughputExceededBackoffMs, ex); this.time.sleep(this.config.kinesisThroughputExceededBackoffMs); return new ArrayList<>(); } if (records.isEmpty()) { log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs); this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs); } return records; }
@Test public void sourceOffsets() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(TestData.record()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null."); assertFalse(records.isEmpty(), "records should not be empty."); verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap()); GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest() .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName) .withStartingSequenceNumber(SEQUENCE_NUMBER); verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest); }
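// Hedged reconstruction (not the connector's actual code): from the tests above one can
// infer roughly how start(...) picks the shard iterator that poll() later uses via
// this.recordsRequest. The TRIM_HORIZON fallback and the source-partition map are assumptions.
Map<String, Object> sourceOffset = this.offsetStorageReader.offset(sourcePartition);
GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
    .withStreamName(this.config.kinesisStreamName)
    .withShardId(this.config.kinesisShardId);
if (sourceOffset != null) {
    // Resume just after the last committed sequence number stored in the source offset.
    iteratorRequest.withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
        .withStartingSequenceNumber((String) sourceOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER));
} else {
    iteratorRequest.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); // assumed default
}
this.recordsRequest = new GetRecordsRequest()
    .withShardIterator(this.kinesisClient.getShardIterator(iteratorRequest).getShardIterator());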
@Override public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":"); int shardId = parseInt(shardIteratorParts[0]); int startingRecord = parseInt(shardIteratorParts[1]); List<Record> shardData = shardedData.get(shardId); int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); int fromIndex = min(startingRecord, toIndex); return new GetRecordsResult() .withRecords(shardData.subList(fromIndex, toIndex)) .withNextShardIterator(String.format("%s:%s", shardId, toIndex)) .withMillisBehindLatest(0L); }
/** * {@inheritDoc} */ @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; int attempt = 0; while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (AmazonServiceException ex) { if (isRecoverableException(ex)) { long backoffMillis = fullJitterBackoff( getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got recoverable AmazonServiceException. Backing off for " + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); Thread.sleep(backoffMillis); } else { throw ex; } } } if (getRecordsResult == null) { throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; }
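// Hedged sketch (not from the source): the retry loop above calls a fullJitterBackoff(...)
// helper that is not shown in these snippets. A typical "full jitter" implementation picks a
// uniformly random delay in [0, min(max, base * power^attempt)]; the exact formula used by
// the real code may differ.
import java.util.Random;

private static final Random BACKOFF_SEED = new Random();

private static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
    long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
    return (long) (BACKOFF_SEED.nextDouble() * exponentialBackoff); // anywhere between 0 and the cap
}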
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withMillisBehindLatest(millisBehindLatest) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1) ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null }
/** * {@inheritDoc} */ @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; int attempt = 0; while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (ProvisionedThroughputExceededException ex) { long backoffMillis = fullJitterBackoff( getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } } if (getRecordsResult == null) { throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { // assuming that the maxRecordsToGet is always large enough return new GetRecordsResult() .withRecords(shardItrToRecordBatch.get(shardIterator)) .withNextShardIterator( (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls-1) ? null : String.valueOf(Integer.valueOf(shardIterator)+1)); // last next shard iterator is null }
@Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) { return null; }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }
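// Hedged refinement (not in the original example): getNextShardIterator() returns null once a
// closed shard (e.g. after a reshard) has been fully read, and the loop above would then fail
// on its next getRecords call. A small guard at the end of the loop body avoids that:
shardIterator = result.getNextShardIterator();
if (shardIterator == null) {
    break; // the shard is closed and fully consumed; stop polling it
}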
/** * Get the next batch of data records using a specific shard iterator. * * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading * @param maxRecordsToGet the maximum amount of records to retrieve for this batch * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the * operation has exceeded the rate limit; this exception will be thrown * if the backoff is interrupted. */ GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
/** * Get the next batch of data records using a specific shard iterator * * @param shardIterator a shard iterator that encodes info about which shard to read and where to start reading * @param maxRecordsToGet the maximum amount of records to retrieve for this batch * @return the batch of retrieved records, also with a shard iterator that can be used to get the next batch * @throws InterruptedException this method will retry with backoff if AWS Kinesis complains that the * operation has exceeded the rate limit; this exception will be thrown * if the backoff is interrupted. */ GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException;
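// Hedged usage sketch (not the consumer's actual code): how a caller might drive
// KinesisProxyInterface#getRecords(String, int). The 'proxy' and 'shard' variables and the
// TRIM_HORIZON starting position are assumptions.
String itr = proxy.getShardIterator(shard, ShardIteratorType.TRIM_HORIZON.toString(), null);
while (itr != null) { // a null next iterator means the shard has been closed and fully read
    GetRecordsResult batch = proxy.getRecords(itr, 1000);
    for (Record record : batch.getRecords()) {
        // hand each record to downstream deserialization / processing here
    }
    itr = batch.getNextShardIterator();
}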