Java 类com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient 实例源码

项目:dynamodb-streams-kafka    文件:KafkaDynamoStreamAdapter.java   
public KafkaDynamoStreamAdapter(String regionName, String srcTable, IRecordProcessorFactory processorFactory) {
    sourceTable = srcTable;
    credentialsProvider = new DefaultAWSCredentialsProviderChain();
    recordProcessorFactory = processorFactory;

    adapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, new ClientConfiguration());
    dynamoDBClient = new AmazonDynamoDBClient(credentialsProvider, new ClientConfiguration());
    cloudWatchClient = new AmazonCloudWatchClient(credentialsProvider, new ClientConfiguration());

    if ("local".equalsIgnoreCase(regionName)) {
        setClientEndpoints(localddbEndpoint);
    } else if (regionName != null) {
        Region region = Region.getRegion(Regions.fromName(regionName));
        adapterClient.setRegion(region);
        dynamoDBClient.setRegion(region);
        cloudWatchClient.setRegion(region);
    }
}
项目:aws-big-data-blog    文件:AmazonDynamoDBStreamstoIgnite.java   
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"), streamArn,
            streamsCredentials, "ddbstreamsworker")
                    .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
                    .withInitialPositionInStream(
                            InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.");
        t.printStackTrace();
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:aws-dynamodb-examples    文件:StreamsAdapterDemo.java   
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);


    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
            streamArn, streamsCredentials, "streams-demo-worker")
        .withMaxRecords(1)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems().equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}