/**
 * Builds the request used to obtain a shard iterator for one shard of a stream.
 *
 * @param shardId   id of the shard to read
 * @param streamArn ARN of the stream the shard belongs to
 * @param seqNum    sequence number to resume after, or {@code null} to start at the trim horizon
 * @return a request whose iterator type is {@code AFTER_SEQUENCE_NUMBER} (with {@code seqNum} set)
 *         when a sequence number is given, and {@code TRIM_HORIZON} otherwise
 */
private GetShardIteratorRequest getShardIteratorRequest(String shardId, String streamArn, String seqNum) {
    final GetShardIteratorRequest request = new GetShardIteratorRequest();
    request.setStreamArn(streamArn);
    request.setShardId(shardId);

    // Resume case: a known position exists, so continue right after it.
    if (seqNum != null) {
        request.setSequenceNumber(seqNum);
        request.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
        return request;
    }

    // No known position: start from the oldest record still available.
    request.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);
    return request;
}
/**
 * With a TRIM_HORIZON endpoint, obtaining an iterator and then resetting it four times in a row
 * must issue one GetShardIterator request per shard, for shards a, b, c and d in that order.
 */
@Test
public void trimHorizonWalksAllShards() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);

    final String[] expectedShardIds = {"a", "b", "c", "d"};
    final String[] obtained = new String[expectedShardIds.length];
    int round = 0;
    while (round < obtained.length) {
        obtained[round] = undertest.getShardIterator(null);
        // Presumably clearing the iterator is what forces the handler on to the next shard.
        undertest.updateShardIterator(null);
        round++;
    }

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams, times(expectedShardIds.length)).getShardIterator(requests.capture());

    for (int idx = 0; idx < expectedShardIds.length; idx++) {
        assertThat(requests.getAllValues().get(idx).getShardId(), is(expectedShardIds[idx]));
    }
    assertThat(obtained, is(new String[]{
        "shard_iterator_a_000",
        "shard_iterator_b_000",
        "shard_iterator_c_000",
        "shard_iterator_d_000"
    }));
}
/**
 * Returns the shard iterator for the given shard, asking the streams client for one only when
 * none is cached yet.
 *
 * @param shardId id of the shard whose iterator is wanted
 * @return the cached iterator, or the one just obtained from the streams client (which is then
 *         stored in the cache under {@code shardId})
 */
private String shardIterator(String shardId) {
    final String cached = shardIterators.get(shardId);
    if (cached != null) {
        return cached;
    }

    // Cache miss: build a request that resumes from whatever sequence number is stored for this shard.
    final String streamArn = config.streamArnForShard(shardId);
    final String resumeFrom = storedSequenceNumber(sourcePartition(shardId));
    final GetShardIteratorRequest request = getShardIteratorRequest(shardId, streamArn, resumeFrom);

    final String fetched = streamsClient.getShardIterator(request).getShardIterator();
    shardIterators.put(shardId, fetched);
    return fetched;
}
private GetShardIteratorRequest buildGetShardIteratorRequest(final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) { GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamArn(streamArn) .withShardId(currentShard.getShardId()) .withShardIteratorType(iteratorType); switch (iteratorType) { case AFTER_SEQUENCE_NUMBER: case AT_SEQUENCE_NUMBER: // if you request with a sequence number that is LESS than the // start of the shard, you get a HTTP 400 from AWS. // So only add the sequence number if the endpoints // sequence number is less than or equal to the starting // sequence for the shard. // Otherwise change the shart iterator type to trim_horizon // because we get a 400 when we use one of the // {at,after}_sequence_number iterator types and don't supply // a sequence number. if (BigIntComparisons.Conditions.LTEQ.matches( new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), new BigInteger(sequenceNumber) )) { req = req.withSequenceNumber(sequenceNumber); } else { req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); } break; default: } return req; }
/**
 * Wires the mocked streams client into the endpoint, creates the handler under test and stubs
 * the client so that:
 * <ul>
 *   <li>listing streams yields a single stream ARN,</li>
 *   <li>describing the stream yields table "table_name" with the shards built by
 *       {@code ShardListTest.createShardsWithSequenceNumbers} from the a/b/c/d arguments below,</li>
 *   <li>every GetShardIterator call answers {@code "shard_iterator_" + shardId + "_000"}.</li>
 * </ul>
 */
@Before
public void setup() throws Exception {
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new ShardIteratorHandler(endpoint);

    final Stream onlyStream = new Stream()
            .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp");
    final ListStreamsResult streamListing = new ListStreamsResult().withStreams(onlyStream);
    when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(streamListing);

    final StreamDescription description = new StreamDescription()
            .withTableName("table_name")
            .withShards(ShardListTest.createShardsWithSequenceNumbers(null,
                    "a", "1", "5",
                    "b", "8", "15",
                    "c", "16", "16",
                    "d", "20", null));
    final DescribeStreamResult described = new DescribeStreamResult().withStreamDescription(description);
    when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(described);

    // Echo the requested shard id back inside the iterator so tests can tell which shard was asked for.
    final Answer<GetShardIteratorResult> echoShardId = new Answer<GetShardIteratorResult>() {
        @Override
        public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
            final GetShardIteratorRequest asked = (GetShardIteratorRequest) invocation.getArguments()[0];
            return new GetShardIteratorResult()
                    .withShardIterator("shard_iterator_" + asked.getShardId() + "_000");
        }
    };
    when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(echoShardId);
}
/**
 * With a LATEST endpoint, a single GetShardIterator request is made and it targets shard "d".
 */
@Test
public void latestOnlyUsesTheLastShard() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);

    final String obtained = undertest.getShardIterator(null);

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(requests.capture());

    final GetShardIteratorRequest sent = requests.getValue();
    assertThat(sent.getShardId(), is("d"));
    assertThat(obtained, is("shard_iterator_d_000"));
}
/**
 * Once an iterator has been handed to the handler via {@code updateShardIterator}, asking for
 * the iterator returns that value and never calls the streams client.
 */
@Test
public void cachesRecentShardId() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    undertest.updateShardIterator("bar");

    final String obtained = undertest.getShardIterator(null);

    // Zero interactions: the value must have come from the handler itself.
    verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class));
    assertThat(obtained, is("bar"));
}
/**
 * With a TRIM_HORIZON endpoint, the first GetShardIterator request targets shard "a".
 */
@Test
public void trimHorizonStartsWithTheFirstShard() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);

    final String obtained = undertest.getShardIterator(null);

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(requests.capture());

    final GetShardIteratorRequest sent = requests.getValue();
    assertThat(sent.getShardId(), is("a"));
    assertThat(obtained, is("shard_iterator_a_000"));
}
/**
 * With an AT_SEQUENCE_NUMBER endpoint whose provider supplies "12", the GetShardIterator
 * request targets shard "b".
 */
@Test
public void atSeqNumber12StartsWithShardB() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));

    final String obtained = undertest.getShardIterator(null);

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(requests.capture());

    final GetShardIteratorRequest sent = requests.getValue();
    assertThat(sent.getShardId(), is("b"));
    assertThat(obtained, is("shard_iterator_b_000"));
}
/**
 * With an AFTER_SEQUENCE_NUMBER endpoint whose provider supplies "16", the GetShardIterator
 * request targets shard "d".
 */
@Test
public void afterSeqNumber16StartsWithShardD() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));

    final String obtained = undertest.getShardIterator(null);

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(requests.capture());

    final GetShardIteratorRequest sent = requests.getValue();
    assertThat(sent.getShardId(), is("d"));
    assertThat(obtained, is("shard_iterator_d_000"));
}
/**
 * Even when the endpoint is configured for LATEST, passing a sequence number to
 * {@code getShardIterator} produces an AFTER_SEQUENCE_NUMBER request carrying that sequence
 * number, here "12" against shard "b".
 */
@Test
public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);

    final String obtained = undertest.getShardIterator("12");

    final ArgumentCaptor<GetShardIteratorRequest> requests =
            ArgumentCaptor.forClass(GetShardIteratorRequest.class);
    verify(amazonDynamoDBStreams).getShardIterator(requests.capture());

    final GetShardIteratorRequest sent = requests.getValue();
    assertThat(sent.getShardId(), is("b"));
    assertThat(obtained, is("shard_iterator_b_000"));
    assertThat(sent.getShardIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name()));
    assertThat(sent.getSequenceNumber(), is("12"));
}