/**
 * Builds the consumer under test around the mocked streams client and stubs
 * {@code getRecords} with a {@link GetRecordsAnswer} backed by fixed iterator/record tables.
 */
@Before
public void setup() throws Exception {
    // The endpoint must carry the mocked client before the consumer is constructed from it.
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new DdbStreamConsumer(endpoint, processor, shardIteratorHandler);

    // Each row is {iterator, next iterator}; iterators not listed have no entry in this table.
    final String[][] iteratorLinks = {
        {"shard_iterator_a_000", "shard_iterator_a_001"},
        {"shard_iterator_b_000", "shard_iterator_b_001"},
        {"shard_iterator_b_001", "shard_iterator_b_002"},
        {"shard_iterator_c_000", "shard_iterator_c_001"},
        {"shard_iterator_d_000", "shard_iterator_d_001"},
    };
    final Map<String, String> nextIteratorFor = new HashMap<>();
    for (final String[] link : iteratorLinks) {
        nextIteratorFor.put(link[0], link[1]);
    }

    // Records (by sequence number) served when getRecords is called with a given iterator.
    final Map<String, Collection<Record>> recordsFor = new HashMap<>();
    recordsFor.put("shard_iterator_a_001", createRecords("2"));
    recordsFor.put("shard_iterator_b_000", createRecords("9"));
    recordsFor.put("shard_iterator_b_001", createRecords("11", "13"));
    recordsFor.put("shard_iterator_b_002", createRecords("14"));
    recordsFor.put("shard_iterator_d_000", createRecords("21", "25"));
    recordsFor.put("shard_iterator_d_001", createRecords("30", "35", "40"));

    recordsAnswer = new GetRecordsAnswer(nextIteratorFor, recordsFor);
    when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(recordsAnswer);
}
@Test public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001"); Mockito.reset(amazonDynamoDBStreams); when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))) .thenAnswer(recordsAnswer) .thenThrow(new ExpiredIteratorException("expired shard")) .thenAnswer(recordsAnswer); undertest.poll(); undertest.poll(); ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(3)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); verify(shardIteratorHandler, times(2)).getShardIterator(null); // first poll. Second poll, getRecords fails with an expired shard. verify(shardIteratorHandler).getShardIterator("9"); // second poll, with a resumeFrom. assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("9")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("11")); assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("13")); }
@Override public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable { final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator(); // note that HashMap returns null when there is no entry in the map. // A null 'nextShardIterator' indicates that the shard has finished // and we should move onto the next shard. String nextShardIterator = shardIterators.get(shardIterator); Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator); Collection<Record> ans = answers.get(shardIterator); if (nextShardIterator == null && m.matches()) { // last shard iterates forever. Integer num = Integer.parseInt(m.group(1)); nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3); } if (null == ans) { // default to an empty list of records. ans = createRecords(); } return new GetRecordsResult() .withRecords(ans) .withNextShardIterator(nextShardIterator); }
@Override public List<SourceRecord> poll() throws InterruptedException { // TODO rate limiting? if (assignedShards.isEmpty()) { throw new ConnectException("No remaining source shards"); } final String shardId = assignedShards.get(currentShardIdx); final GetRecordsRequest req = new GetRecordsRequest(); req.setShardIterator(shardIterator(shardId)); req.setLimit(100); // TODO configurable final GetRecordsResult rsp = streamsClient.getRecords(req); if (rsp.getNextShardIterator() == null) { log.info("Shard ID `{}` for table `{}` has been closed, it will no longer be polled", shardId, config.tableForShard(shardId)); shardIterators.remove(shardId); assignedShards.remove(shardId); } else { log.debug("Retrieved {} records from shard ID `{}`", rsp.getRecords().size(), shardId); shardIterators.put(shardId, rsp.getNextShardIterator()); } currentShardIdx = (currentShardIdx + 1) % assignedShards.size(); final String tableName = config.tableForShard(shardId); final String topic = config.topicFormat.replace("${table}", tableName); final Map<String, String> sourcePartition = sourcePartition(shardId); return rsp.getRecords().stream() .map(dynamoRecord -> toSourceRecord(sourcePartition, topic, dynamoRecord.getDynamodb())) .collect(Collectors.toList()); }