Java 类com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest 实例源码

项目:Camel    文件:ShardIteratorHandler.java   
/**
 * Returns the shard iterator to read from, reusing the cached iterator when
 * one exists and otherwise requesting a fresh one from the streams client.
 *
 * @param resumeFromSequenceNumber when non-null, the cached shard and iterator
 *        are discarded and a new iterator is requested with
 *        {@code AFTER_SEQUENCE_NUMBER} at this sequence number; when null, the
 *        endpoint's configured iterator type and sequence number are used
 * @return the cached or newly obtained shard iterator
 */
String getShardIterator(String resumeFromSequenceNumber) {
    ShardIteratorType type = getEndpoint().getIteratorType();
    String startAt = getEndpoint().getSequenceNumber();
    if (resumeFromSequenceNumber != null) {
        // A resume point signals an error condition: forget all cached state
        // and restart just after the given sequence number.
        currentShardIterator = null;
        currentShard = null;
        type = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
        startAt = resumeFromSequenceNumber;
    }

    // Fast path: an iterator is already cached, so no request is needed.
    if (currentShardIterator != null) {
        LOG.trace("Shard Iterator is: {}", currentShardIterator);
        return currentShardIterator;
    }

    // Only the first stream listed for the table is considered.
    final String streamArn = getClient()
            .listStreams(new ListStreamsRequest().withTableName(getEndpoint().getTableName()))
            .getStreams().get(0).getStreamArn();

    // Merge the stream's currently described shards into the known shard list.
    shardList.addAll(
            getClient()
                    .describeStream(new DescribeStreamRequest().withStreamArn(streamArn))
                    .getStreamDescription()
                    .getShards());

    LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
    // Without a current shard, resolve a starting one; otherwise advance to its successor.
    currentShard = currentShard == null
            ? resolveNewShard(type, resumeFromSequenceNumber)
            : shardList.nextAfter(currentShard);
    shardList.removeOlderThan(currentShard);
    LOG.trace("Next shard is: {} (in {})", currentShard, shardList);

    currentShardIterator = getClient()
            .getShardIterator(buildGetShardIteratorRequest(streamArn, type, startAt))
            .getShardIterator();
    LOG.trace("Shard Iterator is: {}", currentShardIterator);
    return currentShardIterator;
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
/**
 * Builds the handler under test on top of a mocked streams client whose
 * listStreams, describeStream and getShardIterator calls return canned data.
 */
@Before
public void setup() throws Exception {
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new ShardIteratorHandler(endpoint);

    // listStreams: a single stream for the table.
    final ListStreamsResult singleStream = new ListStreamsResult().withStreams(
            new Stream().withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp"));
    when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(singleStream);

    // describeStream: shards "a" to "d".
    // NOTE(review): the triples look like (shard id, first sequence number,
    // last sequence number), with null meaning still open - confirm in ShardListTest.
    final StreamDescription description = new StreamDescription()
            .withTableName("table_name")
            .withShards(ShardListTest.createShardsWithSequenceNumbers(null,
                    "a", "1", "5",
                    "b", "8", "15",
                    "c", "16", "16",
                    "d", "20", null));
    final DescribeStreamResult described = new DescribeStreamResult().withStreamDescription(description);
    when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(described);

    // getShardIterator: the returned iterator string embeds the requested shard id.
    final Answer<GetShardIteratorResult> iteratorForRequestedShard = new Answer<GetShardIteratorResult>() {
        @Override
        public GetShardIteratorResult answer(InvocationOnMock call) throws Throwable {
            GetShardIteratorRequest request = (GetShardIteratorRequest) call.getArguments()[0];
            return new GetShardIteratorResult()
                    .withShardIterator("shard_iterator_" + request.getShardId() + "_000");
        }
    };
    when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class)))
            .thenAnswer(iteratorForRequestedShard);
}