/**
 * Wires the consumer under test to the mocked streams client and stubs
 * {@code getRecords} with a {@link GetRecordsAnswer} built from a fixed table
 * of shard-iterator transitions and canned records per iterator.
 */
@Before
public void setup() throws Exception {
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new DdbStreamConsumer(endpoint, processor, shardIteratorHandler);

    // Each row is {current shard iterator, next shard iterator}.
    final String[][] transitions = {
        {"shard_iterator_a_000", "shard_iterator_a_001"},
        {"shard_iterator_b_000", "shard_iterator_b_001"},
        {"shard_iterator_b_001", "shard_iterator_b_002"},
        {"shard_iterator_c_000", "shard_iterator_c_001"},
        {"shard_iterator_d_000", "shard_iterator_d_001"},
    };
    final Map<String, String> nextIteratorByIterator = new HashMap<>();
    for (String[] transition : transitions) {
        nextIteratorByIterator.put(transition[0], transition[1]);
    }

    // Records handed back when getRecords is called with the given iterator.
    final Map<String, Collection<Record>> recordsByIterator = new HashMap<>();
    recordsByIterator.put("shard_iterator_a_001", createRecords("2"));
    recordsByIterator.put("shard_iterator_b_000", createRecords("9"));
    recordsByIterator.put("shard_iterator_b_001", createRecords("11", "13"));
    recordsByIterator.put("shard_iterator_b_002", createRecords("14"));
    recordsByIterator.put("shard_iterator_d_000", createRecords("21", "25"));
    recordsByIterator.put("shard_iterator_d_001", createRecords("30", "35", "40"));

    recordsAnswer = new GetRecordsAnswer(nextIteratorByIterator, recordsByIterator);
    when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(recordsAnswer);
}
@Test public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001"); Mockito.reset(amazonDynamoDBStreams); when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))) .thenAnswer(recordsAnswer) .thenThrow(new ExpiredIteratorException("expired shard")) .thenAnswer(recordsAnswer); undertest.poll(); undertest.poll(); ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(3)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); verify(shardIteratorHandler, times(2)).getShardIterator(null); // first poll. Second poll, getRecords fails with an expired shard. verify(shardIteratorHandler).getShardIterator("9"); // second poll, with a resumeFrom. assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("9")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("11")); assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("13")); }
@Test public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); } ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("35")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); }
@Override public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable { final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator(); // note that HashMap returns null when there is no entry in the map. // A null 'nextShardIterator' indicates that the shard has finished // and we should move onto the next shard. String nextShardIterator = shardIterators.get(shardIterator); Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator); Collection<Record> ans = answers.get(shardIterator); if (nextShardIterator == null && m.matches()) { // last shard iterates forever. Integer num = Integer.parseInt(m.group(1)); nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3); } if (null == ans) { // default to an empty list of records. ans = createRecords(); } return new GetRecordsResult() .withRecords(ans) .withNextShardIterator(nextShardIterator); }
/**
 * Converts the given records into exchanges, optionally dropping records
 * based on a reference sequence number.
 *
 * <p>The reference and comparison are chosen as follows: a non-null
 * {@code lastSeenSequenceNumber} selects LT against that value; the endpoint's
 * iterator type AFTER_SEQUENCE_NUMBER (LT) or AT_SEQUENCE_NUMBER (LTEQ) then
 * replaces both with the endpoint's sequence-number provider value. When no
 * comparison is selected, every record is kept.
 *
 * <p>NOTE(review): for the two sequence-number iterator types the provider
 * value overrides {@code lastSeenSequenceNumber} — confirm this precedence is
 * intended when resuming after an expired iterator.
 *
 * @param records                records to convert, in delivery order
 * @param lastSeenSequenceNumber last processed sequence number, or null
 * @return queue of exchanges for the records that passed the filter
 */
private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) {
    BigIntComparisons filter = null;
    BigInteger reference = null;

    if (lastSeenSequenceNumber != null) {
        reference = new BigInteger(lastSeenSequenceNumber);
        filter = BigIntComparisons.Conditions.LT;
    }

    switch (getEndpoint().getIteratorType()) {
    case AFTER_SEQUENCE_NUMBER:
        reference = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
        filter = BigIntComparisons.Conditions.LT;
        break;
    case AT_SEQUENCE_NUMBER:
        reference = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
        filter = BigIntComparisons.Conditions.LTEQ;
        break;
    default:
        // Other iterator types keep whatever was selected above.
    }

    Queue<Exchange> accepted = new ArrayDeque<>();
    for (Record candidate : records) {
        BigInteger candidateSeqNum = new BigInteger(candidate.getDynamodb().getSequenceNumber());
        // Presumably matches(reference, candidate) holds when reference is
        // below (LT) or at-or-below (LTEQ) the candidate — verify against BigIntComparisons.
        if (filter != null && !filter.matches(reference, candidateSeqNum)) {
            continue;
        }
        accepted.add(getEndpoint().createExchange(candidate));
    }
    return accepted;
}
@Test public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception { endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); } ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(1)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); }
/**
 * Builds one {@link Record} per sequence number, each wrapping a
 * {@link StreamRecord} that carries that sequence number.
 *
 * @param sequenceNumbers sequence numbers, in the order the records should appear
 * @return the records in argument order; empty when no arguments are given
 */
private static Collection<Record> createRecords(String... sequenceNumbers) {
    List<Record> records = new ArrayList<>();
    for (int i = 0; i < sequenceNumbers.length; i++) {
        StreamRecord payload = new StreamRecord().withSequenceNumber(sequenceNumbers[i]);
        records.add(new Record().withDynamodb(payload));
    }
    return records;
}
/**
 * Creates an exchange through the superclass and sets the given record as the
 * body of its In message, typed as {@link Record}.
 *
 * @param record the stream record to carry
 * @return the new exchange
 */
Exchange createExchange(Record record) {
    Exchange exchange = super.createExchange();
    exchange.getIn().setBody(record, Record.class);
    return exchange;
}
/**
 * Stores the lookup tables used when answering stubbed calls.
 *
 * @param shardIterators maps a shard iterator to the next shard iterator
 * @param answers        maps a shard iterator to the records returned for it
 */
GetRecordsAnswer(Map<String, String> shardIterators, Map<String, Collection<Record>> answers) {
    // The maps are kept by reference, not copied.
    this.answers = answers;
    this.shardIterators = shardIterators;
}
/**
 * End-to-end check of the aws-ddb producer and the aws-ddbstream consumer
 * against a freshly created table.
 *
 * <p>Flow: create the table, route {@code direct:start} to the table and the
 * table's stream to {@code seda:end}, put an item (asserting that no stream
 * exchange arrives within 3 s), update the item, then assert that the
 * received stream record carries the old and new titles. The table is deleted
 * and stale tables are checked even when the test body fails.
 */
@Test
public void testKeyValueOperations() throws Exception {

    AmazonDynamoDBClient ddbClient = ddbProvider.getClient();
    Assume.assumeNotNull("AWS client not null", ddbClient);

    DynamoDBUtils.assertNoStaleTables(ddbClient, "before");

    try {
        try {
            TableDescription description = DynamoDBUtils.createTable(ddbClient, tableName);
            Assert.assertEquals("ACTIVE", description.getTableStatus());

            WildFlyCamelContext camelctx = new WildFlyCamelContext();
            camelctx.getNamingContext().bind("ddbClientB", ddbClient);
            camelctx.getNamingContext().bind("dbsClientB", dbsProvider.getClient());

            camelctx.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:start").to("aws-ddb://" + tableName + "?amazonDDBClient=#ddbClientB");
                    from("aws-ddbstream://" + tableName + "?amazonDynamoDbStreamsClient=#dbsClientB")
                        .to("seda:end");
                }
            });

            PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            pollingConsumer.start();

            camelctx.start();
            try {
                DynamoDBUtils.putItem(camelctx, "Book 103 Title");

                String result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Title", result);

                // The initial put is expected to produce no stream exchange.
                Exchange exchange = pollingConsumer.receive(3000);
                Assert.assertNull(exchange);

                DynamoDBUtils.updItem(camelctx, "Book 103 Update");

                result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Update", result);

                exchange = pollingConsumer.receive(3000);
                // Fail with a clear assertion instead of a NullPointerException
                // when no stream record arrives within the timeout.
                Assert.assertNotNull("No stream exchange received for the update", exchange);

                StreamRecord record = exchange.getIn().getBody(Record.class).getDynamodb();
                Map<String, AttributeValue> oldImage = record.getOldImage();
                Map<String, AttributeValue> newImage = record.getNewImage();
                Assert.assertEquals("Book 103 Title", oldImage.get("Title").getS());
                Assert.assertEquals("Book 103 Update", newImage.get("Title").getS());
            } finally {
                camelctx.stop();
            }
        } finally {
            DynamoDBUtils.deleteTable(ddbClient, tableName);
        }
    } finally {
        DynamoDBUtils.assertNoStaleTables(ddbClient, "after");
    }
}