String getShardIterator(String resumeFromSequenceNumber) { ShardIteratorType iteratorType = getEndpoint().getIteratorType(); String sequenceNumber = getEndpoint().getSequenceNumber(); if (resumeFromSequenceNumber != null) { // Reset things as we're in an error condition. currentShard = null; currentShardIterator = null; iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; sequenceNumber = resumeFromSequenceNumber; } // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { ListStreamsResult streamsListResult = getClient().listStreams( new ListStreamsRequest().withTableName(getEndpoint().getTableName()) ); final String streamArn = streamsListResult.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream DescribeStreamResult streamDescriptionResult = getClient().describeStream( new DescribeStreamRequest().withStreamArn(streamArn) ); shardList.addAll(streamDescriptionResult.getStreamDescription().getShards()); LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { currentShard = resolveNewShard(iteratorType, resumeFromSequenceNumber); } else { currentShard = shardList.nextAfter(currentShard); } shardList.removeOlderThan(currentShard); LOG.trace("Next shard is: {} (in {})", currentShard, shardList); GetShardIteratorResult result = getClient().getShardIterator( buildGetShardIteratorRequest(streamArn, iteratorType, sequenceNumber) ); currentShardIterator = result.getShardIterator(); } LOG.trace("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
@Before public void setup() throws Exception { endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams); undertest = new ShardIteratorHandler(endpoint); when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn( new ListStreamsResult() .withStreams(new Stream() .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp") ) ); when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn( new DescribeStreamResult() .withStreamDescription( new StreamDescription() .withTableName("table_name") .withShards( ShardListTest.createShardsWithSequenceNumbers(null, "a", "1", "5", "b", "8", "15", "c", "16", "16", "d", "20", null ) ) ) ); when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() { @Override public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable { return new GetShardIteratorResult() .withShardIterator("shard_iterator_" + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId() + "_000"); } }); }