/**
 * Requests a shard iterator from Kinesis for the given stream and shard.
 *
 * <p>The request is built and sent inside a {@link Callable} that is handed to
 * {@code wrapExceptions}, so everything that happens while building or sending it
 * goes through that wrapper.
 *
 * @param streamName name of the stream to read from
 * @param shardId id of the shard within the stream
 * @param shardIteratorType iterator type placed on the request
 * @param startingSequenceNumber sequence number placed on the request; may be {@code null}
 * @param timestamp starting point converted to a {@link Date} for the request; when
 *     {@code null} the request carries a {@code null} timestamp
 * @return the shard iterator string taken from the Kinesis response
 * @throws TransientKinesisException declared by this method; presumably raised by
 *     {@code wrapExceptions} (not visible here)
 */
public String getShardIterator(final String streamName, final String shardId,
    final ShardIteratorType shardIteratorType, final String startingSequenceNumber,
    final Instant timestamp) throws TransientKinesisException {
  final Date date;
  if (timestamp == null) {
    date = null;
  } else {
    date = timestamp.toDate();
  }

  final Callable<String> fetchIterator = new Callable<String>() {
    @Override
    public String call() throws Exception {
      GetShardIteratorRequest request = new GetShardIteratorRequest()
          .withStreamName(streamName)
          .withShardId(shardId)
          .withShardIteratorType(shardIteratorType)
          .withStartingSequenceNumber(startingSequenceNumber)
          .withTimestamp(date);
      GetShardIteratorResult response = kinesis.getShardIterator(request);
      return response.getShardIterator();
    }
  };
  return wrapExceptions(fetchIterator);
}
/**
 * Verifies that a timestamp passed to {@code getShardIterator} is converted to a
 * {@code Date} and placed on the request sent to the Kinesis client.
 *
 * <p>The iterator type is {@code AT_TIMESTAMP}, the type that goes with a timestamp
 * starting point, so the stubbed request mirrors a timestamp-based call.
 */
@Test
public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
  Instant timestamp = Instant.now();
  given(kinesis.getShardIterator(new GetShardIteratorRequest()
      .withStreamName(STREAM)
      .withShardId(SHARD_1)
      .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
      .withTimestamp(timestamp.toDate())
  )).willReturn(new GetShardIteratorResult()
      .withShardIterator(SHARD_ITERATOR));

  // No sequence number is supplied: the timestamp is the only starting marker.
  String stream = underTest.getShardIterator(STREAM, SHARD_1,
      ShardIteratorType.AT_TIMESTAMP, null, timestamp);

  assertThat(stream).isEqualTo(SHARD_ITERATOR);
}
/**
 * Shared helper for error-handling tests: makes the Kinesis mock throw
 * {@code thrownException} for a LATEST shard-iterator request and checks that
 * {@code underTest.getShardIterator} surfaces exactly {@code expectedExceptionClass}.
 * The mock is reset afterwards regardless of the outcome.
 *
 * @param thrownException exception the mocked client throws
 * @param expectedExceptionClass exact exception class expected from the code under test
 */
private void shouldHandleGetShardIteratorError(
    Exception thrownException,
    Class<? extends Exception> expectedExceptionClass) {
  GetShardIteratorRequest latestRequest = new GetShardIteratorRequest();
  latestRequest.setStreamName(STREAM);
  latestRequest.setShardId(SHARD_1);
  latestRequest.setShardIteratorType(ShardIteratorType.LATEST);

  given(kinesis.getShardIterator(latestRequest)).willThrow(thrownException);

  try {
    underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
    // Reached only when the call above did not throw.
    failBecauseExceptionWasNotThrown(expectedExceptionClass);
  } catch (Exception raised) {
    assertThat(raised).isExactlyInstanceOf(expectedExceptionClass);
  } finally {
    reset(kinesis);
  }
}
/**
 * Sends the given request to Kinesis, backing off and trying again on recoverable
 * service errors.
 *
 * <p>The retry counter only advances when a recoverable
 * {@code AmazonServiceException} is caught; the loop ends once a result is obtained
 * or the counter exceeds {@code getShardIteratorMaxAttempts}.
 *
 * @param getShardIteratorRequest the request to send
 * @return the shard iterator string from the response
 * @throws InterruptedException if the thread is interrupted while sleeping between tries
 * @throws RuntimeException if no result was obtained before the retry budget ran out
 */
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
  GetShardIteratorResult result = null;
  int retryCount = 0;

  while (result == null && retryCount <= getShardIteratorMaxAttempts) {
    try {
      result = kinesisClient.getShardIterator(getShardIteratorRequest);
    } catch (AmazonServiceException ex) {
      // Non-recoverable failures are propagated untouched.
      if (!isRecoverableException(ex)) {
        throw ex;
      }
      long backoffMillis = fullJitterBackoff(
          getShardIteratorBaseBackoffMillis,
          getShardIteratorMaxBackoffMillis,
          getShardIteratorExpConstant,
          retryCount);
      retryCount++;
      LOG.warn("Got recoverable AmazonServiceException. Backing off for "
          + backoffMillis + " millis (" + ex.getErrorMessage() + ")");
      Thread.sleep(backoffMillis);
    }
  }

  if (result != null) {
    return result.getShardIterator();
  }
  throw new RuntimeException("Rate Exceeded for getShardIterator operation - all "
      + getShardIteratorMaxAttempts
      + " retry attempts returned ProvisionedThroughputExceededException.");
}
/**
 * Creates the consumer under test on an endpoint for stream "streamName" with
 * iterator type LATEST, then stubs the Kinesis client so that describeStream
 * reports a single shard "shardId", getShardIterator returns "shardIterator" and
 * getRecords returns a result whose next iterator is "nextShardIterator".
 */
@Before
public void setup() throws Exception {
  // Canned responses returned by the stubbed client.
  GetRecordsResult recordsResult = new GetRecordsResult()
      .withNextShardIterator("nextShardIterator");
  StreamDescription streamDescription = new StreamDescription()
      .withShards(new Shard().withShardId("shardId"));
  DescribeStreamResult describeResult = new DescribeStreamResult()
      .withStreamDescription(streamDescription);
  GetShardIteratorResult iteratorResult = new GetShardIteratorResult()
      .withShardIterator("shardIterator");

  KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
  endpoint.setAmazonKinesisClient(kinesisClient);
  endpoint.setIteratorType(ShardIteratorType.LATEST);
  undertest = new KinesisConsumer(endpoint, processor);

  when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(recordsResult);
  when(kinesisClient.describeStream(any(DescribeStreamRequest.class))).thenReturn(describeResult);
  when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(iteratorResult);
}
/**
 * With a sequence number and AFTER_SEQUENCE_NUMBER configured on the endpoint, the
 * first poll must describe the stream once and request one shard iterator that
 * carries the stream name, shard id, iterator type and starting sequence number.
 */
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
  undertest.getEndpoint().setSequenceNumber("12345");
  undertest.getEndpoint().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);

  undertest.poll();

  final ArgumentCaptor<DescribeStreamRequest> describeCaptor =
      ArgumentCaptor.forClass(DescribeStreamRequest.class);
  final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
      ArgumentCaptor.forClass(GetShardIteratorRequest.class);

  verify(kinesisClient).describeStream(describeCaptor.capture());
  final DescribeStreamRequest describeSent = describeCaptor.getValue();
  assertThat(describeSent.getStreamName(), is("streamName"));

  verify(kinesisClient).getShardIterator(iteratorCaptor.capture());
  final GetShardIteratorRequest iteratorSent = iteratorCaptor.getValue();
  assertThat(iteratorSent.getStreamName(), is("streamName"));
  assertThat(iteratorSent.getShardId(), is("shardId"));
  assertThat(iteratorSent.getShardIteratorType(), is("AFTER_SEQUENCE_NUMBER"));
  assertThat(iteratorSent.getStartingSequenceNumber(), is("12345"));
}
/**
 * Fetches a single batch of records for preview purposes.
 *
 * <p>A Kinesis client is obtained via {@code getKinesisClient}, used to resolve the
 * shard iterator for the given request and to issue one {@code GetRecords} call
 * limited to {@code maxBatchSize}.
 *
 * <p>The client is shut down before returning, including on failure, so its
 * resources are not leaked by repeated preview runs. NOTE(review): this assumes
 * {@code getKinesisClient} returns a client dedicated to this call rather than a
 * shared instance — confirm.
 *
 * @param awsClientConfig AWS client configuration passed to {@code getKinesisClient}
 * @param conf Kinesis configuration passed to {@code getKinesisClient}
 * @param maxBatchSize limit set on the {@code GetRecords} request
 * @param getShardIteratorRequest request used to obtain the shard iterator
 * @return the records returned by the single {@code GetRecords} call
 * @throws StageException declared by this method; presumably raised while creating
 *     the client (not visible here)
 */
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    int maxBatchSize,
    GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
  AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);
  try {
    GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
    String shardIterator = getShardIteratorResult.getShardIterator();

    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxBatchSize);

    GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
    return getRecordsResult.getRecords();
  } finally {
    // Release the client's connections and threads even when a call above fails.
    kinesisClient.shutdown();
  }
}
/**
 * Initialises the task: builds the connector config and Kinesis client, looks up the
 * stored offset for the configured shard, obtains a shard iterator and prepares the
 * {@code GetRecordsRequest} and record converter used afterwards.
 *
 * <p>When a non-empty stored offset exists, the iterator starts after the stored
 * sequence number; otherwise the configured position is used as the iterator type.
 *
 * @param settings raw connector settings used to build the config
 */
@Override
public void start(Map<String, String> settings) {
  this.config = new KinesisSourceConnectorConfig(settings);
  this.kinesisClient = this.kinesisClientFactory.create(this.config);
  this.sourcePartition = ImmutableMap.of(RecordConverter.FIELD_SHARD_ID, this.config.kinesisShardId);

  final Map<String, Object> storedOffset = this.context.offsetStorageReader().offset(this.sourcePartition);
  final boolean resumeFromOffset = storedOffset != null && !storedOffset.isEmpty();

  final GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest();
  iteratorRequest.setShardId(this.config.kinesisShardId);
  iteratorRequest.setStreamName(this.config.kinesisStreamName);

  if (!resumeFromOffset) {
    log.info("start() - Setting Shard Iterator Type to {} for {}", this.config.kinesisPosition, this.config.kinesisShardId);
    iteratorRequest.setShardIteratorType(this.config.kinesisPosition);
  } else {
    final String lastSequenceNumber = (String) storedOffset.get(RecordConverter.FIELD_SEQUENCE_NUMBER);
    log.info("start() - Starting iterator after last processed sequence number of '{}'", lastSequenceNumber);
    iteratorRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    iteratorRequest.setStartingSequenceNumber(lastSequenceNumber);
  }

  final GetShardIteratorResult iteratorResult = this.kinesisClient.getShardIterator(iteratorRequest);
  final String shardIterator = iteratorResult.getShardIterator();
  log.info("start() - Using Shard Iterator {}", shardIterator);

  this.recordsRequest = new GetRecordsRequest()
      .withLimit(this.config.kinesisRecordLimit)
      .withShardIterator(shardIterator);
  this.recordConverter = new RecordConverter(this.config);
}
/**
 * When the offset storage reader returns a stored sequence number, starting the task
 * must request a shard iterator of type AFTER_SEQUENCE_NUMBER positioned at that
 * sequence number, and a subsequent poll must return records.
 */
@Test
public void sourceOffsets() throws InterruptedException {
  final String storedSequenceNumber = "asdfasdfddsa";
  final Map<String, Object> storedOffset =
      ImmutableMap.of(RecordConverter.FIELD_SEQUENCE_NUMBER, storedSequenceNumber);
  final GetShardIteratorResult iteratorResult =
      new GetShardIteratorResult().withShardIterator("dfasdfsadfasdf");

  when(this.offsetStorageReader.offset(anyMap())).thenReturn(storedOffset);
  when(this.kinesisClient.getShardIterator(any())).thenReturn(iteratorResult);

  this.task.start(settings);

  final GetRecordsResult recordsResult = new GetRecordsResult()
      .withNextShardIterator("dsfargadsfasdfasda")
      .withRecords(TestData.record())
      .withMillisBehindLatest(0L);
  when(this.kinesisClient.getRecords(any())).thenReturn(recordsResult);

  final List<SourceRecord> polled = this.task.poll();

  assertNotNull(polled, "records should not be null.");
  assertFalse(polled.isEmpty(), "records should not be empty.");
  verify(this.offsetStorageReader, atLeastOnce()).offset(anyMap());

  // The iterator request must resume right after the stored sequence number.
  final GetShardIteratorRequest expectedIteratorRequest = new GetShardIteratorRequest()
      .withShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
      .withShardId(this.config.kinesisShardId)
      .withStreamName(this.config.kinesisStreamName)
      .withStartingSequenceNumber(storedSequenceNumber);
  verify(this.kinesisClient, atLeastOnce()).getShardIterator(expectedIteratorRequest);
}
/**
 * Verifies that a starting sequence number passed to {@code getShardIterator} ends up
 * on the request sent to the Kinesis client and that the iterator from the response
 * is returned.
 */
@Test
public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
  GetShardIteratorRequest expectedRequest = new GetShardIteratorRequest()
      .withStreamName(STREAM)
      .withShardId(SHARD_1)
      .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
      .withStartingSequenceNumber(SEQUENCE_NUMBER);
  GetShardIteratorResult cannedResult = new GetShardIteratorResult()
      .withShardIterator(SHARD_ITERATOR);
  given(kinesis.getShardIterator(expectedRequest)).willReturn(cannedResult);

  // No timestamp is supplied: the sequence number is the only starting marker.
  String iterator = underTest.getShardIterator(STREAM, SHARD_1,
      ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);

  assertThat(iterator).isEqualTo(SHARD_ITERATOR);
}
/**
 * Test-double implementation that only supports TRIM_HORIZON iterators.
 *
 * <p>For TRIM_HORIZON the iterator string is the shard id followed by {@code ":0"};
 * any other iterator type is rejected.
 *
 * @param getShardIteratorRequest request carrying the shard id and iterator type
 * @return a result holding the generated iterator string
 * @throws RuntimeException with message "Not implemented" for any type other than
 *     TRIM_HORIZON
 */
@Override
public GetShardIteratorResult getShardIterator(
    GetShardIteratorRequest getShardIteratorRequest) {
  ShardIteratorType requestedType =
      ShardIteratorType.fromValue(getShardIteratorRequest.getShardIteratorType());
  if (requestedType != ShardIteratorType.TRIM_HORIZON) {
    throw new RuntimeException("Not implemented");
  }

  String iterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
  GetShardIteratorResult result = new GetShardIteratorResult();
  result.setShardIterator(iterator);
  return result;
}
/**
 * {@inheritDoc}
 *
 * <p>Builds the request from the shard handle and iterator type, attaches the
 * starting marker that the iterator type calls for (a {@code Date} for AT_TIMESTAMP,
 * a {@code String} for AT_SEQUENCE_NUMBER / AFTER_SEQUENCE_NUMBER, nothing for
 * TRIM_HORIZON / LATEST) and delegates to the request-based overload.
 *
 * @throws IllegalArgumentException if the marker's type does not match the iterator type
 */
@Override
public String getShardIterator(StreamShardHandle shard, String shardIteratorType, @Nullable Object startingMarker) throws InterruptedException {
  GetShardIteratorRequest request = new GetShardIteratorRequest()
      .withStreamName(shard.getStreamName())
      .withShardId(shard.getShard().getShardId())
      .withShardIteratorType(shardIteratorType);

  final ShardIteratorType iteratorType = ShardIteratorType.fromValue(shardIteratorType);

  if (iteratorType == ShardIteratorType.AT_TIMESTAMP) {
    if (!(startingMarker instanceof Date)) {
      throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_TIMESTAMP. Must be a Date object.");
    }
    request.setTimestamp((Date) startingMarker);
  } else if (iteratorType == ShardIteratorType.AT_SEQUENCE_NUMBER
      || iteratorType == ShardIteratorType.AFTER_SEQUENCE_NUMBER) {
    if (!(startingMarker instanceof String)) {
      throw new IllegalArgumentException("Invalid object given for GetShardIteratorRequest() when ShardIteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER. Must be a String.");
    }
    request.setStartingSequenceNumber((String) startingMarker);
  }
  // TRIM_HORIZON and LATEST take no starting marker, so nothing is added for them.

  return getShardIterator(request);
}
private String getShardItertor() { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { String shardId; //If ShardId supplied use it, else choose first one if (!getEndpoint().getShardId().isEmpty()) { shardId = getEndpoint().getShardId(); } else { DescribeStreamRequest req1 = new DescribeStreamRequest() .withStreamName(getEndpoint().getStreamName()); DescribeStreamResult res1 = getClient().describeStream(req1); shardId = res1.getStreamDescription().getShards().get(0).getShardId(); } LOG.debug("ShardId is: {}", shardId); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) .withShardId(shardId) .withShardIteratorType(getEndpoint().getIteratorType()); if (hasSequenceNumber()) { req.withStartingSequenceNumber(getEndpoint().getSequenceNumber()); } GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } LOG.debug("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
/**
 * The first poll must describe the stream once and request one shard iterator for
 * the stream "streamName", shard "shardId" and iterator type LATEST.
 */
@Test
public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
  undertest.poll();

  final ArgumentCaptor<DescribeStreamRequest> describeCaptor =
      ArgumentCaptor.forClass(DescribeStreamRequest.class);
  final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
      ArgumentCaptor.forClass(GetShardIteratorRequest.class);

  verify(kinesisClient).describeStream(describeCaptor.capture());
  final DescribeStreamRequest describeSent = describeCaptor.getValue();
  assertThat(describeSent.getStreamName(), is("streamName"));

  verify(kinesisClient).getShardIterator(iteratorCaptor.capture());
  final GetShardIteratorRequest iteratorSent = iteratorCaptor.getValue();
  assertThat(iteratorSent.getStreamName(), is("streamName"));
  assertThat(iteratorSent.getShardId(), is("shardId"));
  assertThat(iteratorSent.getShardIteratorType(), is("LATEST"));
}
/**
 * When a shard id is set on the endpoint, polling must not call DescribeStream and
 * must request the shard iterator for exactly that shard id.
 */
@Test
public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
  undertest.getEndpoint().setShardId("shardIdPassedAsUrlParam");

  undertest.poll();

  verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class));

  final ArgumentCaptor<GetShardIteratorRequest> iteratorCaptor =
      ArgumentCaptor.forClass(GetShardIteratorRequest.class);
  verify(kinesisClient).getShardIterator(iteratorCaptor.capture());

  final GetShardIteratorRequest iteratorSent = iteratorCaptor.getValue();
  assertThat(iteratorSent.getStreamName(), is("streamName"));
  assertThat(iteratorSent.getShardId(), is("shardIdPassedAsUrlParam"));
  assertThat(iteratorSent.getShardIteratorType(), is("LATEST"));
}
/**
 * Across two polls the stream is described once and the shard iterator requested
 * once; the first GetRecords call uses the obtained iterator and the second uses the
 * next iterator returned by the first call.
 */
@Test
public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
  undertest.poll();
  undertest.poll();

  verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class));
  verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));

  final ArgumentCaptor<GetRecordsRequest> recordsCaptor =
      ArgumentCaptor.forClass(GetRecordsRequest.class);
  verify(kinesisClient, times(2)).getRecords(recordsCaptor.capture());

  final GetRecordsRequest firstCall = recordsCaptor.getAllValues().get(0);
  final GetRecordsRequest secondCall = recordsCaptor.getAllValues().get(1);
  assertThat(firstCall.getShardIterator(), is("shardIterator"));
  assertThat(secondCall.getShardIterator(), is("nextShardIterator"));
}
/**
 * Produces a preview batch: looks up the last shard of the configured stream, fetches
 * one batch of records from it starting at the configured initial position, and adds
 * the parsed records to the batch maker.
 *
 * <p>At most {@code maxBatchSize} of the fetched Kinesis records are processed.
 *
 * @param maxBatchSize upper bound on the number of Kinesis records to process
 * @param batchMaker receives every record produced by parsing
 * @throws IOException declared by this method
 * @throws StageException declared by this method
 */
private void previewProcess(
    int maxBatchSize,
    BatchMaker batchMaker
) throws IOException, StageException {
  ClientConfiguration awsClientConfig = AWSUtil.getClientConfiguration(conf.proxyConfig);
  String shardId = KinesisUtil.getLastShardId(awsClientConfig, conf, conf.streamName);

  GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
      .withStreamName(conf.streamName)
      .withShardId(shardId)
      .withShardIteratorType(conf.initialPositionInStream.name());

  int fetchLimit = Math.min(conf.maxBatchSize, maxBatchSize);
  List<com.amazonaws.services.kinesis.model.Record> fetched =
      KinesisUtil.getPreviewRecords(awsClientConfig, conf, fetchLimit, iteratorRequest);

  // Never process more than the caller asked for, even if more were returned.
  int toProcess = Math.min(fetched.size(), maxBatchSize);
  for (int i = 0; i < toProcess; i++) {
    UserRecord userRecord = new UserRecord(fetched.get(i));
    KinesisUtil.processKinesisRecord(
        iteratorRequest.getShardId(),
        userRecord,
        parserFactory
    ).forEach(batchMaker::addRecord);
  }
}
private void getIterator() throws ResourceNotFoundException { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(split.getStreamName()); getShardIteratorRequest.setShardId(split.getShardId()); // Explanation: when we have a sequence number from a prior read or checkpoint, always use it. // Otherwise, decide if starting at a timestamp or the trim horizon based on configuration. // If starting at a timestamp, sue the session variable ITER_START_TIMESTAMP when given, otherwise // fallback on starting at ITER_OFFSET_SECONDS from timestamp. if (lastReadSeqNo == null) { // Important: shard iterator type AT_TIMESTAMP requires 1.11.x or above of the AWS SDK. if (SessionVariables.getIterFromTimestamp(session)) { getShardIteratorRequest.setShardIteratorType("AT_TIMESTAMP"); long iterStartTs = SessionVariables.getIterStartTimestamp(session); if (iterStartTs == 0) { long startTs = System.currentTimeMillis() - (SessionVariables.getIterOffsetSeconds(session) * 1000); getShardIteratorRequest.setTimestamp(new Date(startTs)); } else { getShardIteratorRequest.setTimestamp(new Date(iterStartTs)); } } else { getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); } } else { getShardIteratorRequest.setShardIteratorType("AFTER_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(lastReadSeqNo); } GetShardIteratorResult getShardIteratorResult = clientManager.getClient().getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); }
/**
 * In-memory stand-in for the Kinesis GetShardIterator call.
 *
 * <p>An iterator is created from the request's stream name and shard id. When the
 * request carries a non-empty starting sequence number and the iterator type is
 * AFTER_SEQUENCE_NUMBER, the iterator's record index is set to that sequence number
 * plus one; in every other case it is set to 100.
 *
 * <p>The iterator type is compared null-safely, so a request that has a sequence
 * number but no iterator type takes the default branch instead of failing with a
 * {@code NullPointerException}.
 *
 * @param getShardIteratorRequest request naming the stream, shard and starting point
 * @return a result holding the string form of the iterator
 * @throws AmazonClientException if no iterator can be built from the stream/shard or
 *     the stream is unknown
 * @throws NumberFormatException if the starting sequence number is not an integer
 */
@Override
public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws AmazonServiceException, AmazonClientException {
  ShardIterator iter = ShardIterator.fromStreamAndShard(getShardIteratorRequest.getStreamName(), getShardIteratorRequest.getShardId());
  if (iter == null) {
    throw new AmazonClientException("Bad stream or shard iterator!");
  }

  InternalStream theStream = this.getStream(iter.streamId);
  if (theStream == null) {
    throw new AmazonClientException("Unknown stream or bad shard iterator!");
  }

  String seqAsString = getShardIteratorRequest.getStartingSequenceNumber();
  boolean resumeAfterSequence = seqAsString != null
      && !seqAsString.isEmpty()
      // Constant-first equals: the iterator type may be null on the request.
      && "AFTER_SEQUENCE_NUMBER".equals(getShardIteratorRequest.getShardIteratorType());

  if (resumeAfterSequence) {
    int sequence = Integer.parseInt(seqAsString);
    iter.recordIndex = sequence + 1;
  } else {
    iter.recordIndex = 100;
  }

  GetShardIteratorResult result = new GetShardIteratorResult();
  return result.withShardIterator(iter.makeString());
}
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); // Retrieve the Shards from a Stream DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); DescribeStreamResult describeStreamResult; List<Shard> shards = new ArrayList<>(); String lastShardId = null; do { describeStreamRequest.setExclusiveStartShardId(lastShardId); describeStreamResult = kinesisClient.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (shards.size() > 0) { lastShardId = shards.get(shards.size() - 1).getShardId(); } } while (describeStreamResult.getStreamDescription().getHasMoreShards()); // Get Data from the Shards in a Stream // Hard-coded to use only 1 shard String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); getShardIteratorRequest.setShardId(shards.get(0).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator(); // Continuously read data records from shard. List<Record> records; while (true) { // Create new GetRecordsRequest with existing shardIterator. // Set maximum records to return to 1000. GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(1000); GetRecordsResult result = kinesisClient.getRecords(getRecordsRequest); // Put result into record list. Result may be empty. 
records = result.getRecords(); // Print records for (Record record : records) { ByteBuffer byteBuffer = record.getData(); System.out.println(String.format("Seq No: %s - %s", record.getSequenceNumber(), new String(byteBuffer.array()))); } try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); } }