public KafkaDynamoStreamAdapter(String regionName, String srcTable, IRecordProcessorFactory processorFactory) { sourceTable = srcTable; credentialsProvider = new DefaultAWSCredentialsProviderChain(); recordProcessorFactory = processorFactory; adapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, new ClientConfiguration()); dynamoDBClient = new AmazonDynamoDBClient(credentialsProvider, new ClientConfiguration()); cloudWatchClient = new AmazonCloudWatchClient(credentialsProvider, new ClientConfiguration()); if ("local".equalsIgnoreCase(regionName)) { setClientEndpoints(localddbEndpoint); } else if (regionName != null) { Region region = Region.getRegion(Regions.fromName(regionName)); adapterClient.setRegion(region); dynamoDBClient.setRegion(region); cloudWatchClient.setRegion(region); } }
public void run() throws Exception { adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration()); adapterClient.setEndpoint(streamsEndpoint); dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration()); dynamoDBClient.setEndpoint(dynamodbEndpoint); cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration()); TcpDiscoverySpi spi = new TcpDiscoverySpi(); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); List<String> hostList = Arrays.asList(Properties.getString("hostList").split(",")); ipFinder.setAddresses(hostList); spi.setIpFinder(ipFinder); IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDiscoverySpi(spi); cfg.setClientMode(true); cfg.setPeerClassLoadingEnabled(true); @SuppressWarnings("unused") Ignite ignite = Ignition.start(cfg); cache = Ignition.ignite().cache(Properties.getString("cacheName")); LOG.info(">>> cache acquired"); recordProcessorFactory = new StreamsRecordProcessorFactory(cache); workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"), streamArn, streamsCredentials, "ddbstreamsworker") .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords"))) .withInitialPositionInStream( InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream"))); LOG.info("Creating worker for stream: " + streamArn); worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient); LOG.info("Starting worker..."); int exitCode = 0; try { worker.run(); } catch (Throwable t) { LOG.error("Caught throwable while processing data."); t.printStackTrace(); exitCode = 1; } System.exit(exitCode); }
/** * @param args */ public static void main(String[] args) throws Exception { System.out.println("Starting demo..."); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamsCredentials = new ProfileCredentialsProvider(); dynamoDBCredentials = new ProfileCredentialsProvider(); recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable); /* ===== REQUIRED ===== * Users will have to explicitly instantiate and configure the adapter, then pass it to * the KCL worker. */ adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration()); adapterClient.setEndpoint(streamsEndpoint); dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration()); dynamoDBClient.setEndpoint(dynamodbEndpoint); cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration()); setUpTables(); workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo", streamArn, streamsCredentials, "streams-demo-worker") .withMaxRecords(1) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); System.out.println("Creating worker for stream: " + streamArn); worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient); System.out.println("Starting worker..."); Thread t = new Thread(worker); t.start(); Thread.sleep(25000); worker.shutdown(); t.join(); if(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems().equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); }