@Test public void noSourceOffsets() throws InterruptedException { when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(TestData.record()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null."); assertFalse(records.isEmpty(), "records should not be empty."); verify(this.kinesisClient, atLeastOnce()).getShardIterator(any()); verify(this.kinesisClient, atLeastOnce()).getRecords(any()); }
@Test public void throughputExceeded() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); when(this.kinesisClient.getRecords(any())).thenThrow(new ProvisionedThroughputExceededException("")); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null"); assertTrue(records.isEmpty(), "records should be empty."); verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisThroughputExceededBackoffMs); }
@Test public void noRecords() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(Arrays.asList()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null"); assertTrue(records.isEmpty(), "records should be empty."); verify(this.task.time, atLeastOnce()).sleep(this.config.kinesisEmptyRecordsBackoffMs); }
@Test public void shouldReturnIteratorStartingWithTimestamp() throws Exception { Instant timestamp = Instant.now(); given(kinesis.getShardIterator(new GetShardIteratorRequest() .withStreamName(STREAM) .withShardId(SHARD_1) .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .withTimestamp(timestamp.toDate()) )).willReturn(new GetShardIteratorResult() .withShardIterator(SHARD_ITERATOR)); String stream = underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp); assertThat(stream).isEqualTo(SHARD_ITERATOR); }
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; int attempt = 0; while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { try { getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); } catch (AmazonServiceException ex) { if (isRecoverableException(ex)) { long backoffMillis = fullJitterBackoff( getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); LOG.warn("Got recoverable AmazonServiceException. Backing off for " + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); Thread.sleep(backoffMillis); } else { throw ex; } } } if (getShardIteratorResult == null) { throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getShardIteratorResult.getShardIterator(); }
/** * {@inheritDoc} */ @Override public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; int attempt = 0; while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); } catch (ProvisionedThroughputExceededException ex) { long backoffMillis = fullJitterBackoff( getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); } } if (getShardIteratorResult == null) { throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + " retry attempts returned ProvisionedThroughputExceededException."); } return getShardIteratorResult.getShardIterator(); }
@Before public void setup() throws Exception { KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); endpoint.setAmazonKinesisClient(kinesisClient); endpoint.setIteratorType(ShardIteratorType.LATEST); undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) .thenReturn(new GetRecordsResult() .withNextShardIterator("nextShardIterator") ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) .thenReturn(new DescribeStreamResult() .withStreamDescription(new StreamDescription() .withShards(new Shard().withShardId("shardId")) ) ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(new GetShardIteratorResult() .withShardIterator("shardIterator") ); }
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords( ClientConfiguration awsClientConfig, KinesisConfigBean conf, int maxBatchSize, GetShardIteratorRequest getShardIteratorRequest ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); String shardIterator = getShardIteratorResult.getShardIterator(); GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxBatchSize); GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); return getRecordsResult.getRecords(); }
@Override public void start(Map<String, String> settings) { this.config = new KinesisSourceConnectorConfig(settings); this.kinesisClient = this.kinesisClientFactory.create(this.config); this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId); Map<String, Object> lastOffset = this.context.offsetStorageReader().offset(this.sourcePartition); GetShardIteratorRequest shardIteratorRequest = new GetShardIteratorRequest() .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName); if (null != lastOffset && !lastOffset.isEmpty()) { String startingSequenceNumber = (String) lastOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER); log.info("start() - Starting iterator after last processed sequence number of '{}'", startingSequenceNumber); shardIteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); shardIteratorRequest.setStartingSequenceNumber(startingSequenceNumber); } else { log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId); shardIteratorRequest.setShardIteratorType(this.config.kinesisPosition); } GetShardIteratorResult shardIteratorResult = this.kinesisClient.getShardIterator(shardIteratorRequest); log.info("start() - Using Shard Iterator {}", shardIteratorResult.getShardIterator()); this.recordsRequest = new GetRecordsRequest() .withLimit(this.config.kinesisRecordLimit) .withShardIterator(shardIteratorResult.getShardIterator()); this.recordConverter = new RecordConverter(this.config); }
@Test public void sourceOffsets() throws InterruptedException { final String SEQUENCE_NUMBER = "asdfasdfddsa"; Map<String, Object> sourceOffset = ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, SEQUENCE_NUMBER); when(this.offsetStorageReader.offset(anyMap())).thenReturn(sourceOffset); when(this.kinesisClient.getShardIterator(any())).thenReturn( new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf") ); this.task.start(settings); GetRecordsResult recordsResult = new GetRecordsResult() .withNextShardIterator("dsfargadsfasdfasda") .withRecords(TestData.record()) .withMillisBehindLatest(0L); when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult); List<SourceRecord> records = this.task.poll(); assertNotNull(records, "records should not be null."); assertFalse(records.isEmpty(), "records should not be empty."); verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap()); GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest() .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER) .withShardId(this.config.kinesisShardId) .withStreamName(this.config.kinesisStreamName) .withStartingSequenceNumber(SEQUENCE_NUMBER); verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest); }
@Test public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception { given(kinesis.getShardIterator(new GetShardIteratorRequest() .withStreamName(STREAM) .withShardId(SHARD_1) .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .withStartingSequenceNumber(SEQUENCE_NUMBER) )).willReturn(new GetShardIteratorResult() .withShardIterator(SHARD_ITERATOR)); String stream = underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null); assertThat(stream).isEqualTo(SHARD_ITERATOR); }
@Override public GetShardIteratorResult getShardIterator( GetShardIteratorRequest getShardIteratorRequest) { ShardIteratorType shardIteratorType = ShardIteratorType.fromValue( getShardIteratorRequest.getShardIteratorType()); String shardIterator; if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) { shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0); } else { throw new RuntimeException("Not implemented"); } return new GetShardIteratorResult().withShardIterator(shardIterator); }
@Override public GetShardIteratorResult getShardIterator(String streamName, String shardId, String shardIteratorType, String startingSequenceNumber) { throw new RuntimeException("Not implemented"); }
private String getShardItertor() { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { String shardId; //If ShardId supplied use it, else choose first one if (!getEndpoint().getShardId().isEmpty()) { shardId = getEndpoint().getShardId(); } else { DescribeStreamRequest req1 = new DescribeStreamRequest() .withStreamName(getEndpoint().getStreamName()); DescribeStreamResult res1 = getClient().describeStream(req1); shardId = res1.getStreamDescription().getShards().get(0).getShardId(); } LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) .withShardId(shardId) .withShardIteratorType(getEndpoint().getIteratorType()); if (hasSequenceNumber()) { req.withStartingSequenceNumber(getEndpoint().getSequenceNumber()); } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } LOG.debug("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
private void getIterator() throws ResourceNotFoundException { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(split.getStreamName()); getShardIteratorRequest.setShardId(split.getShardId()); // Explanation: when we have a sequence number from a prior read or checkpoint, always use it. // Otherwise, decide if starting at a timestamp or the trim horizon based on configuration. // If starting at a timestamp, sue the session variable ITER_START_TIMESTAMP when given, otherwise // fallback on starting at ITER_OFFSET_SECONDS from timestamp. if (lastReadSeqNo == null) { // Important: shard iterator type AT_TIMESTAMP requires 1.11.x or above of the AWS SDK. if (SessionVariables.getIterFromTimestamp(session)) { getShardIteratorRequest.setShardIteratorType("AT_TIMESTAMP"); long iterStartTs = SessionVariables.getIterStartTimestamp(session); if (iterStartTs == 0) { long startTs = System.currentTimeMillis() - (SessionVariables.getIterOffsetSeconds(session) * 1000); getShardIteratorRequest.setTimestamp(new Date(startTs)); } else { getShardIteratorRequest.setTimestamp(new Date(iterStartTs)); } } else { getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); } } else { getShardIteratorRequest.setShardIteratorType("AFTER_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(lastReadSeqNo); } GetShardIteratorResult getShardIteratorResult = clientManager.getClient().getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); }
@Override public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws AmazonServiceException, AmazonClientException { ShardIterator iter = ShardIterator.fromStreamAndShard(getShardIteratorRequest.getStreamName(), getShardIteratorRequest.getShardId()); if (iter != null) { InternalStream theStream = this.getStream(iter.streamId); if (theStream != null) { String seqAsString = getShardIteratorRequest.getStartingSequenceNumber(); if (seqAsString != null && !seqAsString.isEmpty() && getShardIteratorRequest.getShardIteratorType().equals("AFTER_SEQUENCE_NUMBER")) { int sequence = Integer.parseInt(seqAsString); iter.recordIndex = sequence + 1; } else { iter.recordIndex = 100; } GetShardIteratorResult result = new GetShardIteratorResult(); return result.withShardIterator(iter.makeString()); } else { throw new AmazonClientException("Unknown stream or bad shard iterator!"); } } else { throw new AmazonClientException("Bad stream or shard iterator!"); } }
@Override public GetShardIteratorResult getShardIterator(String streamName, String shardId, String shardIteratorType) { throw new RuntimeException("Not implemented"); }
@Override public GetShardIteratorResult getShardIterator(String s, String s1, String s2) throws AmazonServiceException, AmazonClientException { return null; }
@Override public GetShardIteratorResult getShardIterator(String s, String s1, String s2, String s3) throws AmazonServiceException, AmazonClientException { return null; }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }