Java 类com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest 实例源码

项目:kafka-connect-dynamodb    文件:DynamoDbSourceTask.java   
private GetShardIteratorRequest getShardIteratorRequest(
        String shardId,
        String streamArn,
        String seqNum
) {
    final GetShardIteratorRequest req = new GetShardIteratorRequest();
    req.setShardId(shardId);
    req.setStreamArn(streamArn);
    if (seqNum == null) {
        req.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);
    } else {
        req.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
        req.setSequenceNumber(seqNum);
    }
    return req;
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void trimHorizonWalksAllShards() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);

    String[] shardIterators = new String[4];

    for (int i = 0; i < shardIterators.length; ++i) {
        shardIterators[i] = undertest.getShardIterator(null);
        undertest.updateShardIterator(null);
    }

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture());
    String[] shards = new String[]{"a", "b", "c", "d"};
    for (int i = 0; i < shards.length; ++i) {
        assertThat(getIteratorCaptor.getAllValues().get(i).getShardId(), is(shards[i]));
    }
    assertThat(shardIterators, is(new String[]{"shard_iterator_a_000", "shard_iterator_b_000", "shard_iterator_c_000", "shard_iterator_d_000"}));

}
项目:kafka-connect-dynamodb    文件:DynamoDbSourceTask.java   
private String shardIterator(String shardId) {
    String iterator = shardIterators.get(shardId);
    if (iterator == null) {
        final GetShardIteratorRequest req = getShardIteratorRequest(
                shardId,
                config.streamArnForShard(shardId),
                storedSequenceNumber(sourcePartition(shardId))
        );
        iterator = streamsClient.getShardIterator(req).getShardIterator();
        shardIterators.put(shardId, iterator);
    }
    return iterator;
}
项目:Camel    文件:ShardIteratorHandler.java   
private GetShardIteratorRequest buildGetShardIteratorRequest(final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) {
    GetShardIteratorRequest req = new GetShardIteratorRequest()
            .withStreamArn(streamArn)
            .withShardId(currentShard.getShardId())
            .withShardIteratorType(iteratorType);
    switch (iteratorType) {
    case AFTER_SEQUENCE_NUMBER:
    case AT_SEQUENCE_NUMBER:
        // if you request with a sequence number that is LESS than the
        // start of the shard, you get a HTTP 400 from AWS.
        // So only add the sequence number if the endpoints
        // sequence number is less than or equal to the starting
        // sequence for the shard.
        // Otherwise change the shart iterator type to trim_horizon
        // because we get a 400 when we use one of the
        // {at,after}_sequence_number iterator types and don't supply
        // a sequence number.
        if (BigIntComparisons.Conditions.LTEQ.matches(
                new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
                new BigInteger(sequenceNumber)
        )) {
            req = req.withSequenceNumber(sequenceNumber);
        } else {
            req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
        }
        break;
    default:
    }
    return req;
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Before
public void setup() throws Exception {
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new ShardIteratorHandler(endpoint);

    when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(
        new ListStreamsResult()
            .withStreams(new Stream()
                    .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp")
            )
    );

    when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(
        new DescribeStreamResult()
            .withStreamDescription(
                    new StreamDescription()
                    .withTableName("table_name")
                    .withShards(
                            ShardListTest.createShardsWithSequenceNumbers(null,
                                    "a", "1", "5",
                                    "b", "8", "15",
                                    "c", "16", "16",
                                    "d", "20", null
                            )
                    )
            )
    );

    when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() {
        @Override
        public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
            return new GetShardIteratorResult()
                    .withShardIterator("shard_iterator_"
                            + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId()
                            + "_000");
        }
    });
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void latestOnlyUsesTheLastShard() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);

    String shardIterator = undertest.getShardIterator(null);

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
    assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
    assertThat(shardIterator, is("shard_iterator_d_000"));
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void cachesRecentShardId() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);

    undertest.updateShardIterator("bar");
    String shardIterator = undertest.getShardIterator(null);

    verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class));
    assertThat(shardIterator, is("bar"));
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void trimHorizonStartsWithTheFirstShard() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);

    String shardIterator = undertest.getShardIterator(null);

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
    assertThat(getIteratorCaptor.getValue().getShardId(), is("a"));
    assertThat(shardIterator, is("shard_iterator_a_000"));
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void atSeqNumber12StartsWithShardB() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));

    String shardIterator = undertest.getShardIterator(null);

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
    assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
    assertThat(shardIterator, is("shard_iterator_b_000"));
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void afterSeqNumber16StartsWithShardD() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));

    String shardIterator = undertest.getShardIterator(null);

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
    assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
    assertThat(shardIterator, is("shard_iterator_d_000"));
}
项目:Camel    文件:ShardIteratorHandlerTest.java   
@Test
public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);

    String shardIterator = undertest.getShardIterator("12");

    ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
    assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
    assertThat(shardIterator, is("shard_iterator_b_000"));
    assertThat(getIteratorCaptor.getValue().getShardIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name()));
    assertThat(getIteratorCaptor.getValue().getSequenceNumber(), is("12"));
}