/** * Gets records from Kinesis and deaggregates them if needed. * * @return list of deaggregated records * @throws TransientKinesisException - in case of recoverable situation */ public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName, final String shardId, final Integer limit) throws TransientKinesisException { return wrapExceptions(new Callable<GetKinesisRecordsResult>() { @Override public GetKinesisRecordsResult call() throws Exception { GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest() .withShardIterator(shardIterator) .withLimit(limit)); return new GetKinesisRecordsResult( UserRecord.deaggregate(response.getRecords()), response.getNextShardIterator(), response.getMillisBehindLatest(), streamName, shardId); } }); }
@Override protected int poll() throws Exception { GetRecordsRequest req = new GetRecordsRequest() .withShardIterator(getShardItertor()) .withLimit(getEndpoint().getMaxResultsPerRequest()); GetRecordsResult result = getClient().getRecords(req); Queue<Exchange> exchanges = createExchanges(result.getRecords()); int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); // May cache the last successful sequence number, and pass it to the // getRecords request. That way, on the next poll, we start from where // we left off, however, I don't know what happens to subsequent // exchanges when an earlier echangee fails. currentShardIterator = result.getNextShardIterator(); return processedExchangeCount; }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
@Test public void recordsAreSentToTheProcessor() throws Exception { when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2")) ); int messageCount = undertest.poll(); assertThat(messageCount, is(2)); final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2")); }
@Test public void exchangePropertiesAreSet() throws Exception { String partitionKey = "partitionKey"; String sequenceNumber = "1"; when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") .withRecords(new Record() .withSequenceNumber(sequenceNumber) .withApproximateArrivalTimestamp(new Date(42)) .withPartitionKey(partitionKey) ) ); undertest.poll(); final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey)); assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber)); }
public String processNextIterator(String iterator) { GetRecordsRequest getRequest = new GetRecordsRequest(); getRequest.setLimit(1000); getRequest.setShardIterator(iterator); // call "get" operation and get everything in this shard range GetRecordsResult getResponse = client.getRecords(getRequest); iterator = getResponse.getNextShardIterator(); List<Record> records = getResponse.getRecords(); processResponseRecords(records); return iterator; }
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords( ClientConfiguration awsClientConfig, KinesisConfigBean conf, int maxBatchSize, GetShardIteratorRequest getShardIteratorRequest ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); String shardIterator = getShardIteratorResult.getShardIterator(); GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxBatchSize); GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); return getRecordsResult.getRecords(); }
@Override public void start(Map<String, String> settings) { this.config = new KinesisSourceConnectorConfig(settings); this.kinesisClient = this.kinesisClientFactory.create(this.config); this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId); Map<String, Object> lastOffset = this.context.offsetStorageReader().offset(this.sourcePartition); GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest() .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName); if (null != lastOffset && !lastOffset.isEmpty()) { String startingSequenceNumber = (String) lastOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER); log.info("start() - Starting iterator after last processed sequence number of '{}'", startingSequenceNumber); shardIteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); shardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber); } else { log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId); shardIteratorRequest.setShardIteratorType(this.config.kinesisPosition); } GetShardIteratorResult shardIteratorResult = this.kinesisClient.getShardIterator(shardIteratorRequest); log.info("start() - Using Shard Iterator {}", shardIteratorResult.getShardIterator()); this.recordsRequest = new GetRecordsRequest() .withLimit(this.config.kinesisRecordLimit) .withShardIterator(shardIteratorResult.getShardIterator()); this.recordConverter = new RecordConverter(this.config); }
@Override public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) { String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":"); int shardId = parseInt(shardIteratorParts[0]); int startingRecord = parseInt(shardIteratorParts[1]); List<Record> shardData = shardedData.get(shardId); int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size()); int fromIndex = min(startingRecord, toIndex); return new GetRecordsResult() .withRecords(shardData.subList(fromIndex, toIndex)) .withNextShardIterator(String.format("%s:%s", shardId, toIndex)) .withMillisBehindLatest(0L); }
/** * {@inheritDoc} */ @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; int attempt = 0; while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (AmazonServiceException ex) { if (isRecoverableException(ex)) { long backoffMillis = fullJitterBackoff( getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got recoverable AmazonServiceException. Backing off for " + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); Thread.sleep(backoffMillis); } else { throw ex; } } } if (getRecordsResult == null) { throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; }
/** * {@inheritDoc} */ @Override public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) throws InterruptedException { final GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxRecordsToGet); GetRecordsResult getRecordsResult = null; int attempt = 0; while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (ProvisionedThroughputExceededException ex) { long backoffMillis = fullJitterBackoff( getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } } if (getRecordsResult == null) { throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getRecordsResult; }
@Test public void itUsesTheShardIteratorOnPolls() throws Exception { undertest.poll(); final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); verify(kinesisClient).getRecords(getRecordsReqCap.capture()); assertThat(getRecordsReqCap.getValue().getShardIterator(), is("shardIterator")); }
@Test public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception { undertest.poll(); undertest.poll(); final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class)); verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class)); verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture()); assertThat(getRecordsReqCap.getAllValues().get(0).getShardIterator(), is("shardIterator")); assertThat(getRecordsReqCap.getAllValues().get(1).getShardIterator(), is("nextShardIterator")); }
/** * Retrieves the next batch of records from Kinesis using the shard iterator. * * Most of the time this results in one getRecords call. However we allow for * a call to return an empty list, and we'll try again if we are far enough * away from the latest record. */ private void getKinesisRecords() throws ResourceNotFoundException { // Normally this loop will execute once, but we have to allow for the odd Kinesis // behavior, per the docs: // A single call to getRecords might return an empty record list, even when the shard contains // more records at later sequence numbers boolean fetchedRecords = false; int attempts = 0; while (!fetchedRecords && attempts < fetchAttempts) { long now = System.currentTimeMillis(); if (now - lastReadTime <= sleepTime) { try { Thread.sleep(now - lastReadTime); } catch (InterruptedException e) { log.error("Sleep interrupted.", e); } } getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(batchSize); getRecordsResult = clientManager.getClient().getRecords(getRecordsRequest); lastReadTime = System.currentTimeMillis(); shardIterator = getRecordsResult.getNextShardIterator(); kinesisRecords = getRecordsResult.getRecords(); if (kinesisConnectorConfig.isLogBatches()) { log.info("Fetched %d records from Kinesis. MillisBehindLatest=%d", kinesisRecords.size(), getRecordsResult.getMillisBehindLatest()); } fetchedRecords = (kinesisRecords.size() > 0 || getMillisBehindLatest() <= MILLIS_BEHIND_LIMIT); attempts++; } listIterator = kinesisRecords.iterator(); batchesRead++; messagesRead += kinesisRecords.size(); }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }