Java 类 com.amazonaws.services.dynamodbv2.model.StreamRecord 实例源码

项目:kafka-connect-dynamodb    文件:DynamoDbSourceTask.java   
/**
 * Wraps a single DynamoDB stream record in a Kafka Connect {@code SourceRecord}.
 *
 * <p>The record key is built from the stream record's keys and the record value from
 * its new image; both are converted by {@code RecordMapper.toConnect} and described by
 * {@code RecordMapper.attributesSchema()}. The source offset is a single-entry map
 * holding the stream record's sequence number under {@code Keys.SEQNUM}.
 *
 * @param sourcePartition source partition map passed through to the {@code SourceRecord}
 * @param topic           destination topic name
 * @param dynamoRecord    stream record to convert
 * @return the resulting {@code SourceRecord}; no Kafka partition is specified (null)
 */
private SourceRecord toSourceRecord(Map<String, String> sourcePartition, String topic, StreamRecord dynamoRecord) {
    final String sequenceNumber = dynamoRecord.getSequenceNumber();
    final Map<String, String> sourceOffset = Collections.singletonMap(Keys.SEQNUM, sequenceNumber);

    return new SourceRecord(
            sourcePartition,
            sourceOffset,
            topic,
            null, // Kafka partition is left unspecified
            // key schema + key
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getKeys()),
            // value schema + value
            RecordMapper.attributesSchema(),
            RecordMapper.toConnect(dynamoRecord.getNewImage()),
            // record timestamp: approximate creation time, in epoch milliseconds
            dynamoRecord.getApproximateCreationDateTime().getTime()
    );
}
项目:Camel    文件:DdbStreamConsumerTest.java   
/**
 * Builds one {@code Record} per given sequence number, in the order supplied.
 *
 * <p>Each {@code Record} carries a fresh {@code StreamRecord} whose only populated
 * field is the sequence number.
 *
 * @param sequenceNumbers sequence numbers to wrap; may be empty
 * @return a mutable list containing one record per sequence number
 */
private static Collection<Record> createRecords(String... sequenceNumbers) {
    final List<Record> records = new ArrayList<>(sequenceNumbers.length);

    for (int i = 0; i < sequenceNumbers.length; i++) {
        final StreamRecord streamRecord = new StreamRecord().withSequenceNumber(sequenceNumbers[i]);
        records.add(new Record().withDynamodb(streamRecord));
    }

    return records;
}
项目:wildfly-camel    文件:DynamoDBStreamsIntegrationTest.java   
/**
 * Integration test: writes and then updates an item through the {@code aws-ddb}
 * endpoint and verifies that the {@code aws-ddbstream} consumer delivers a stream
 * record for the update containing both the old and the new image.
 *
 * <p>The table is created at the start and deleted in a {@code finally} block; stale
 * tables are asserted absent both before and after the test body.
 *
 * @throws Exception if table setup, Camel context handling or any DynamoDB call fails
 */
@Test
public void testKeyValueOperations() throws Exception {

    AmazonDynamoDBClient ddbClient = ddbProvider.getClient();
    // NOTE: assumeNotNull takes a varargs list of objects to check, so the string
    // literal here is just another (always non-null) value, not a failure message.
    Assume.assumeNotNull("AWS client not null", ddbClient);

    DynamoDBUtils.assertNoStaleTables(ddbClient, "before");

    try {
        try {
            TableDescription description = DynamoDBUtils.createTable(ddbClient, tableName);
            Assert.assertEquals("ACTIVE", description.getTableStatus());

            WildFlyCamelContext camelctx = new WildFlyCamelContext();
            camelctx.getNamingContext().bind("ddbClientB", ddbClient);
            camelctx.getNamingContext().bind("dbsClientB", dbsProvider.getClient());

            camelctx.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    // Producer route: direct:start -> DynamoDB table
                    from("direct:start").to("aws-ddb://" + tableName + "?amazonDDBClient=#ddbClientB");
                    // Consumer route: DynamoDB stream of the same table -> seda:end
                    from("aws-ddbstream://" + tableName + "?amazonDynamoDbStreamsClient=#dbsClientB")
                            .to("seda:end");
                }
            });

            PollingConsumer pollingConsumer = camelctx.getEndpoint("seda:end").createPollingConsumer();
            pollingConsumer.start();

            camelctx.start();
            try {
                DynamoDBUtils.putItem(camelctx, "Book 103 Title");

                String result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Title", result);

                // The test expects no stream record to arrive for the initial put.
                // NOTE(review): presumably the stream consumer only sees changes made
                // after it starts reading — confirm against the endpoint's iterator type.
                Exchange exchange = pollingConsumer.receive(3000);
                Assert.assertNull(exchange);

                DynamoDBUtils.updItem(camelctx, "Book 103 Update");

                result = ((AttributeValue) DynamoDBUtils.getItem(camelctx).get("Title")).getS();
                Assert.assertEquals("Book 103 Update", result);

                exchange = pollingConsumer.receive(3000);
                // receive(timeout) yields null when nothing arrives in time; fail with a
                // clear assertion instead of a NullPointerException on the next line.
                Assert.assertNotNull("No stream record received for the item update", exchange);

                StreamRecord record = exchange.getIn().getBody(Record.class).getDynamodb();
                Map<String, AttributeValue> oldImage = record.getOldImage();
                Map<String, AttributeValue> newImage = record.getNewImage();
                Assert.assertEquals("Book 103 Title", oldImage.get("Title").getS());
                Assert.assertEquals("Book 103 Update", newImage.get("Title").getS());

            } finally {
                camelctx.stop();
            }
        } finally {
            DynamoDBUtils.deleteTable(ddbClient, tableName);
        }
    } finally {
        DynamoDBUtils.assertNoStaleTables(ddbClient, "after");
    }
}