private GetShardIteratorRequest getShardIteratorRequest( String shardId, String streamArn, String seqNum ) { final GetShardIteratorRequest req = new GetShardIteratorRequest(); req.setShardId(shardId); req.setStreamArn(streamArn); if (seqNum == null) { req.setShardIteratorType(ShardIteratorType.TRIM_HORIZON); } else { req.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); req.setSequenceNumber(seqNum); } return req; }
@Test public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001"); Mockito.reset(amazonDynamoDBStreams); when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))) .thenAnswer(recordsAnswer) .thenThrow(new ExpiredIteratorException("expired shard")) .thenAnswer(recordsAnswer); undertest.poll(); undertest.poll(); ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(3)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); verify(shardIteratorHandler, times(2)).getShardIterator(null); // first poll. Second poll, getRecords fails with an expired shard. verify(shardIteratorHandler).getShardIterator("9"); // second poll, with a resumeFrom. assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("9")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("11")); assertThat(exchangeCaptor.getAllValues().get(2).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("13")); }
@Test public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); } ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("35")); assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); }
@Test public void trimHorizonWalksAllShards() throws Exception { endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); String[] shardIterators = new String[4]; for (int i = 0; i < shardIterators.length; ++i) { shardIterators[i] = undertest.getShardIterator(null); undertest.updateShardIterator(null); } ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture()); String[] shards = new String[]{"a", "b", "c", "d"}; for (int i = 0; i < shards.length; ++i) { assertThat(getIteratorCaptor.getAllValues().get(i).getShardId(), is(shards[i])); } assertThat(shardIterators, is(new String[]{"shard_iterator_a_000", "shard_iterator_b_000", "shard_iterator_c_000", "shard_iterator_d_000"})); }
String getShardIterator(String resumeFromSequenceNumber) { ShardIteratorType iteratorType = getEndpoint().getIteratorType(); String sequenceNumber = getEndpoint().getSequenceNumber(); if (resumeFromSequenceNumber != null) { // Reset things as we're in an error condition. currentShard = null; currentShardIterator = null; iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER; sequenceNumber = resumeFromSequenceNumber; } // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { ListStreamsResult streamsListResult = getClient().listStreams( new ListStreamsRequest().withTableName(getEndpoint().getTableName()) ); final String streamArn = streamsListResult.getStreams().get(0).getStreamArn(); // XXX assumes there is only one stream DescribeStreamResult streamDescriptionResult = getClient().describeStream( new DescribeStreamRequest().withStreamArn(streamArn) ); shardList.addAll(streamDescriptionResult.getStreamDescription().getShards()); LOG.trace("Current shard is: {} (in {})", currentShard, shardList); if (currentShard == null) { currentShard = resolveNewShard(iteratorType, resumeFromSequenceNumber); } else { currentShard = shardList.nextAfter(currentShard); } shardList.removeOlderThan(currentShard); LOG.trace("Next shard is: {} (in {})", currentShard, shardList); GetShardIteratorResult result = getClient().getShardIterator( buildGetShardIteratorRequest(streamArn, iteratorType, sequenceNumber) ); currentShardIterator = result.getShardIterator(); } LOG.trace("Shard Iterator is: {}", currentShardIterator); return currentShardIterator; }
private GetShardIteratorRequest buildGetShardIteratorRequest(final String streamArn, ShardIteratorType iteratorType, String sequenceNumber) { GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamArn(streamArn) .withShardId(currentShard.getShardId()) .withShardIteratorType(iteratorType); switch (iteratorType) { case AFTER_SEQUENCE_NUMBER: case AT_SEQUENCE_NUMBER: // if you request with a sequence number that is LESS than the // start of the shard, you get a HTTP 400 from AWS. // So only add the sequence number if the endpoints // sequence number is less than or equal to the starting // sequence for the shard. // Otherwise change the shart iterator type to trim_horizon // because we get a 400 when we use one of the // {at,after}_sequence_number iterator types and don't supply // a sequence number. if (BigIntComparisons.Conditions.LTEQ.matches( new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()), new BigInteger(sequenceNumber) )) { req = req.withSequenceNumber(sequenceNumber); } else { req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON); } break; default: } return req; }
private Shard resolveNewShard(ShardIteratorType type, String resumeFrom) { switch(type) { case AFTER_SEQUENCE_NUMBER: return shardList.afterSeq(resumeFrom != null ? resumeFrom : getEndpoint().getSequenceNumber()); case AT_SEQUENCE_NUMBER: return shardList.atSeq(getEndpoint().getSequenceNumber()); case TRIM_HORIZON: return shardList.first(); case LATEST: default: return shardList.last(); } }
@Test public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception { endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35")); when(shardIteratorHandler.getShardIterator(anyString())).thenReturn("shard_iterator_d_001", "shard_iterator_d_002"); for (int i = 0; i < 10; ++i) { // poll lots. undertest.poll(); } ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); verify(processor, times(1)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40")); }
@Test public void latestOnlyUsesTheLastShard() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); assertThat(getIteratorCaptor.getValue().getShardId(), is("d")); assertThat(shardIterator, is("shard_iterator_d_000")); }
@Test public void cachesRecentShardId() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); undertest.updateShardIterator("bar"); String shardIterator = undertest.getShardIterator(null); verify(amazonDynamoDBStreams, times(0)).getShardIterator(any(GetShardIteratorRequest.class)); assertThat(shardIterator, is("bar")); }
@Test public void trimHorizonStartsWithTheFirstShard() throws Exception { endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON); String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); assertThat(getIteratorCaptor.getValue().getShardId(), is("a")); assertThat(shardIterator, is("shard_iterator_a_000")); }
@Test public void atSeqNumber12StartsWithShardB() throws Exception { endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12")); String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); assertThat(getIteratorCaptor.getValue().getShardId(), is("b")); assertThat(shardIterator, is("shard_iterator_b_000")); }
@Test public void afterSeqNumber16StartsWithShardD() throws Exception { endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16")); String shardIterator = undertest.getShardIterator(null); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); assertThat(getIteratorCaptor.getValue().getShardId(), is("d")); assertThat(shardIterator, is("shard_iterator_d_000")); }
@Test public void resumingFromSomewhereActuallyUsesTheAfterSequenceNumber() throws Exception { endpoint.setIteratorType(ShardIteratorType.LATEST); String shardIterator = undertest.getShardIterator("12"); ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class); verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture()); assertThat(getIteratorCaptor.getValue().getShardId(), is("b")); assertThat(shardIterator, is("shard_iterator_b_000")); assertThat(getIteratorCaptor.getValue().getShardIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER.name())); assertThat(getIteratorCaptor.getValue().getSequenceNumber(), is("12")); }
public ShardIteratorType getIteratorType() { return iteratorType; }
public void setIteratorType(ShardIteratorType iteratorType) { this.iteratorType = iteratorType; }