Java 类com.amazonaws.services.dynamodbv2.model.Record 实例源码

项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Builds the consumer under test on top of the mocked streams client and
 * stubs every {@code getRecords} call with a scripted {@code GetRecordsAnswer}.
 */
@Before
public void setup() throws Exception {
    endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
    undertest = new DdbStreamConsumer(endpoint, processor, shardIteratorHandler);

    // Records handed back for a given shard iterator.
    final Map<String, Collection<Record>> recordsByIterator = new HashMap<>();
    recordsByIterator.put("shard_iterator_a_001", createRecords("2"));
    recordsByIterator.put("shard_iterator_b_000", createRecords("9"));
    recordsByIterator.put("shard_iterator_b_001", createRecords("11", "13"));
    recordsByIterator.put("shard_iterator_b_002", createRecords("14"));
    recordsByIterator.put("shard_iterator_d_000", createRecords("21", "25"));
    recordsByIterator.put("shard_iterator_d_001", createRecords("30", "35", "40"));

    // Current shard iterator -> the iterator returned as "next" alongside its records.
    final Map<String, String> nextIteratorByIterator = new HashMap<>();
    nextIteratorByIterator.put("shard_iterator_a_000", "shard_iterator_a_001");
    nextIteratorByIterator.put("shard_iterator_b_000", "shard_iterator_b_001");
    nextIteratorByIterator.put("shard_iterator_b_001", "shard_iterator_b_002");
    nextIteratorByIterator.put("shard_iterator_c_000", "shard_iterator_c_001");
    nextIteratorByIterator.put("shard_iterator_d_000", "shard_iterator_d_001");

    recordsAnswer = new GetRecordsAnswer(nextIteratorByIterator, recordsByIterator);
    when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(recordsAnswer);
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Polls twice while the second {@code getRecords} call fails with an
 * {@link ExpiredIteratorException}, then checks that a new shard iterator is
 * requested using the last seen sequence number and that records 9, 11 and 13
 * reach the processor in that order.
 */
@Test
public void itResumesFromAfterTheLastSeenSequenceNumberWhenAShardIteratorHasExpired() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.LATEST);
    when(shardIteratorHandler.getShardIterator(anyString()))
            .thenReturn("shard_iterator_b_000", "shard_iterator_b_001", "shard_iterator_b_001");

    // Drop the stubbing installed in setup() so the call sequence can be scripted:
    // records, then an expired-iterator failure, then records again.
    Mockito.reset(amazonDynamoDBStreams);
    when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class)))
            .thenAnswer(recordsAnswer)
            .thenThrow(new ExpiredIteratorException("expired shard"))
            .thenAnswer(recordsAnswer);

    for (int pollCount = 0; pollCount < 2; pollCount++) {
        undertest.poll();
    }

    ArgumentCaptor<Exchange> captured = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(3)).process(captured.capture(), any(AsyncCallback.class));
    // Once for the first poll, once more for the second poll whose getRecords call hits the expired shard.
    verify(shardIteratorHandler, times(2)).getShardIterator(null);
    // The retry within the second poll carries the resume-from sequence number.
    verify(shardIteratorHandler).getShardIterator("9");

    final String[] expectedSequenceNumbers = {"9", "11", "13"};
    for (int i = 0; i < expectedSequenceNumbers.length; i++) {
        assertThat(captured.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
                is(expectedSequenceNumbers[i]));
    }
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * With {@code AT_SEQUENCE_NUMBER} and a provided sequence number of 35, only
 * the records with sequence numbers 35 and 40 are expected to reach the
 * processor, no matter how many times the consumer polls.
 */
@Test
public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
    when(shardIteratorHandler.getShardIterator(anyString()))
            .thenReturn("shard_iterator_d_001", "shard_iterator_d_002");

    // Poll repeatedly; later polls must not produce any additional exchanges.
    int remainingPolls = 10;
    while (remainingPolls-- > 0) {
        undertest.poll();
    }

    ArgumentCaptor<Exchange> captured = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(2)).process(captured.capture(), any(AsyncCallback.class));

    final String[] expectedSequenceNumbers = {"35", "40"};
    for (int i = 0; i < expectedSequenceNumbers.length; i++) {
        assertThat(captured.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(),
                is(expectedSequenceNumbers[i]));
    }
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Produces the scripted {@code GetRecordsResult} for the shard iterator
 * carried by the intercepted {@code GetRecordsRequest}.
 *
 * @param invocation the mocked call; its first argument must be a {@code GetRecordsRequest}
 * @return the configured records (an empty collection when none are configured)
 *         together with the next shard iterator, which may be {@code null}
 */
@Override
public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
    final GetRecordsRequest request = (GetRecordsRequest) invocation.getArguments()[0];
    final String currentIterator = request.getShardIterator();

    // A missing map entry yields null. A null next iterator signals that the
    // shard has finished and the consumer should move on to the next shard.
    String followingIterator = shardIterators.get(currentIterator);
    if (followingIterator == null) {
        final Matcher lastShard = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(currentIterator);
        if (lastShard.matches()) {
            // The last shard never finishes: synthesize the iterator with the next index.
            final int index = Integer.parseInt(lastShard.group(1));
            followingIterator = "shard_iterator_d_" + pad(Integer.toString(index + 1), 3);
        }
    }

    Collection<Record> records = answers.get(currentIterator);
    if (records == null) {
        // Nothing configured for this iterator: reply with no records.
        records = createRecords();
    }

    return new GetRecordsResult()
            .withRecords(records)
            .withNextShardIterator(followingIterator);
}
项目:Camel    文件:DdbStreamConsumer.java   
/**
 * Wraps the given stream records in exchanges, dropping records whose
 * sequence number does not satisfy the comparison selected below.
 *
 * @param records                records fetched from the stream, in delivery order
 * @param lastSeenSequenceNumber sequence number to filter against, or {@code null} for none
 * @return a queue with one exchange per record that passed the filter, in input order
 */
private Queue<Exchange> createExchanges(List<Record> records, String lastSeenSequenceNumber) {
    BigInteger threshold = null;
    BigIntComparisons filter = null;

    if (lastSeenSequenceNumber != null) {
        threshold = new BigInteger(lastSeenSequenceNumber);
        filter = BigIntComparisons.Conditions.LT;
    }

    // NOTE(review): for the two sequence-number iterator types the endpoint's
    // configured sequence number replaces any threshold derived from
    // lastSeenSequenceNumber above — confirm this is intended.
    switch (getEndpoint().getIteratorType()) {
    case AFTER_SEQUENCE_NUMBER:
        filter = BigIntComparisons.Conditions.LT;
        threshold = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
        break;
    case AT_SEQUENCE_NUMBER:
        filter = BigIntComparisons.Conditions.LTEQ;
        threshold = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
        break;
    default:
        break;
    }

    final Queue<Exchange> accepted = new ArrayDeque<>();
    for (Record candidate : records) {
        final BigInteger candidateSeqNum = new BigInteger(candidate.getDynamodb().getSequenceNumber());
        if (filter != null && !filter.matches(threshold, candidateSeqNum)) {
            continue; // filtered out by the sequence-number comparison
        }
        accepted.add(getEndpoint().createExchange(candidate));
    }
    return accepted;
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * With {@code AFTER_SEQUENCE_NUMBER} and a provided sequence number of 35,
 * only the record with sequence number 40 is expected to reach the processor,
 * no matter how many times the consumer polls.
 */
@Test
public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception {
    endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
    endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
    when(shardIteratorHandler.getShardIterator(anyString()))
            .thenReturn("shard_iterator_d_001", "shard_iterator_d_002");

    // Poll repeatedly; later polls must not produce any additional exchanges.
    int remainingPolls = 10;
    while (remainingPolls-- > 0) {
        undertest.poll();
    }

    ArgumentCaptor<Exchange> captured = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(1)).process(captured.capture(), any(AsyncCallback.class));

    final Exchange onlyExchange = captured.getAllValues().get(0);
    assertThat(onlyExchange.getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40"));
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Builds one {@code Record} per given sequence number, in argument order.
 *
 * @param sequenceNumbers sequence numbers to assign; may be empty
 * @return a new mutable collection holding the created records
 */
private static Collection<Record> createRecords(String... sequenceNumbers) {
    final List<Record> records = new ArrayList<>(sequenceNumbers.length);

    for (int i = 0; i < sequenceNumbers.length; i++) {
        final StreamRecord payload = new StreamRecord().withSequenceNumber(sequenceNumbers[i]);
        records.add(new Record().withDynamodb(payload));
    }

    return records;
}
项目:Camel    文件:DdbStreamEndpoint.java   
/**
 * Creates a new exchange whose IN message body is the given stream record.
 *
 * @param record the record to place in the message body
 * @return the exchange obtained from {@code super.createExchange()} with its body set
 */
Exchange createExchange(Record record) {
    final Exchange exchange = super.createExchange();
    exchange.getIn().setBody(record, Record.class);
    return exchange;
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Stores the two lookup tables given by the caller; no copies are made.
 *
 * @param shardIterators map assigned to the {@code shardIterators} field
 * @param answers        map assigned to the {@code answers} field
 */
GetRecordsAnswer(Map<String, String> shardIterators, Map<String, Collection<Record>> answers) {
    this.answers = answers;
    this.shardIterators = shardIterators;
}
项目:wildfly-camel    文件:DynamoDBStreamsIntegrationTest.java   
/**
 * Creates a table, routes writes through {@code aws-ddb} and stream events
 * through {@code aws-ddbstream} into {@code seda:end}, then checks that an
 * item update is delivered as a stream record carrying both the old and the
 * new image. The table is deleted afterwards, and stale tables are asserted
 * absent before and after the run.
 */
@Test
public void testKeyValueOperations() throws Exception {

    AmazonDynamoDBClient ddbClient = ddbProvider.getClient();
    // Skips the test when no client is available. assumeNotNull takes varargs,
    // so the message string is simply one more (always non-null) element.
    Assume.assumeNotNull("AWS client not null", ddbClient);

    DynamoDBUtils.assertNoStaleTables(ddbClient, "before");

    try {
        try {
            TableDescription description = DynamoDBUtils.createTable(ddbClient, tableName);
            Assert.assertEquals("ACTIVE", description.getTableStatus());

            WildFlyCamelContext camelctx = new WildFlyCamelContext();
            camelctx.getNamingContext().bind("ddbClientB", ddbClient);
            camelctx.getNamingContext().bind("dbsClientB", dbsProvider.getClient());

            camelctx.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:start").to("aws-ddb://" + tableName + "?amazonDDBClient=#ddbClientB");
                    from("aws-ddbstream://" + tableName + "?amazonDynamoDbStreamsClient=#dbsClientB")
                            .to("seda:end");
                }
            });

            PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            pollingConsumer.start();

            camelctx.start();
            try {
                DynamoDBUtils.putItem(camelctx, "Book 103 Title");

                String result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Title", result);

                // The test expects nothing on the stream route after the initial put.
                Exchange exchange = pollingConsumer.receive(3000);
                Assert.assertNull(exchange);

                DynamoDBUtils.updItem(camelctx, "Book 103 Update");

                result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Update", result);

                exchange = pollingConsumer.receive(3000);
                // Fail with a clear assertion message instead of a NullPointerException
                // below if no exchange arrives within the timeout.
                Assert.assertNotNull("Expected a stream record exchange after the update", exchange);
                StreamRecord record = exchange.getIn().getBody(Record.class).getDynamodb();
                Map<String, AttributeValue> oldImage = record.getOldImage();
                Map<String, AttributeValue> newImage = record.getNewImage();
                Assert.assertEquals("Book 103 Title", oldImage.get("Title").getS());
                Assert.assertEquals("Book 103 Update", newImage.get("Title").getS());

            } finally {
                camelctx.stop();
            }
        } finally {
            // Always remove the table, even when assertions above fail.
            DynamoDBUtils.deleteTable(ddbClient, tableName);
        }
    } finally {
        DynamoDBUtils.assertNoStaleTables(ddbClient, "after");
    }
}