Java 类com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker 实例源码

项目:lumber-mill    文件:KinesisConsumerBootstrap.java   
public void start()  {
    int mb = 1024 * 1024;

    LOG.info("Max memory:           {} mb", Runtime.getRuntime().maxMemory() / mb);
    LOG.info("Starting up Kinesis Consumer... (may take a few seconds)");
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(kinesisCfg.getKinesisCredentialsProvider(),
            kinesisCfg.getKinesisClientConfiguration());
    AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(kinesisCfg.getDynamoDBCredentialsProvider(),
            kinesisCfg.getDynamoDBClientConfiguration());
    AmazonCloudWatch cloudWatchClient = new AmazonCloudWatchClient(kinesisCfg.getCloudWatchCredentialsProvider(),
            kinesisCfg.getCloudWatchClientConfiguration());

    Worker worker = new Worker.Builder()
            .recordProcessorFactory(() -> new RecordProcessor(unitOfWorkListener, exceptionStrategy, metricsCallback, dry))
            .config(kinesisCfg)
            .kinesisClient(kinesisClient)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .build();

    worker.run();

}
项目:zipkin-aws    文件:KinesisCollector.java   
@Override
public KinesisCollector start() {
  String workerId = null;
  try {
    workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
  } catch (UnknownHostException e) {
    workerId = UUID.randomUUID().toString();
  }
  KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId);
  processor = new KinesisRecordProcessorFactory(collector);
  worker = new Worker.Builder()
      .recordProcessorFactory(processor)
      .config(config)
      .build();

  executor.execute(worker);
  return this;
}
项目:ingestion-service    文件:KafkaKinesisIntegrationTest.java   
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentialsProvider = new
            DefaultAWSCredentialsProviderChain();

    String region = "eu-west-1";
    logger.info("Starting in Region " + region);

    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

    KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
            this.getClass().getName(), TestConstants.stream, credentialsProvider, workerId)
            .withInitialPositionInStream(InitialPositionInStream.LATEST).withRegionName(region);

    IRecordProcessorFactory recordProcessorFactory = new
            RecordFactory();
    worker = new Worker(recordProcessorFactory,
            kinesisClientLibConfiguration);

    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
项目:flume-kinesis    文件:KinesisSource.java   
@Override
public void start() {

    IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(getChannelProcessor(),
                                                                                serializer,
                                                                                backOffTimeInMillis,
                                                                                numberRetries,
                                                                                checkpointIntervalMillis);

    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            System.out.println("Shutting down Kinesis client thread...");
            worker.shutdown();
        }
    });

    try{
        worker.run();
    }catch (AmazonClientException e) {
        logger.error("Can't connect to amazon kinesis", e);
        Throwables.propagate(e);
    }

}
项目:Surf    文件:DumpStream.java   
public static void main(String []args) throws Exception{
    // args[0] is expected to be the same sort of property file as needed
    // by Surf for Kinesis: it should contain 
    // aws-access-key-id: <your-access-id>
    // aws-secret-key: <your-secret-key>
    // aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1){
        errorExit();
    }
    File f = new File(args[0]);
    if(!f.isFile() || !f.canRead()){
        errorExit();
    }

    // Set up disruptor
    final EventHandler<KinesisEvent> handler = new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long l, boolean b) throws Exception {
            System.out.println(String.format("Received : %s", kinesisEvent.getData()));
        }
    };
    Worker worker = Util.createWorker(f, handler, "DumpStream");
    worker.run();
}
项目:Surf    文件:Util.java   
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);

    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    Properties props = new Properties();
    props.load(new FileReader(conf));
    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId);

    Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    return worker;
}
项目:Surf    文件:PageCount.java   
public static void main( String[] args ) throws Exception
{
    // args[0] is expected to be the same sort of property file as needed
    // by Surf for Kinesis: it should contain 
    // aws-access-key-id: <your-access-id>
    // aws-secret-key: <your-secret-key>
    // aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1){
        errorExit();
    }
    File f = new File(args[0]);
    if(!f.isFile() || !f.canRead()){
        errorExit();
    }

    // Set up disruptor
    final EventHandler<KinesisEvent> handler = new PageCountHandler();
    Worker worker = Util.createWorker(f, handler, "PageCount");
    worker.run();
}
项目:aws-kinesis-beanstalk-workers    文件:ManagedConsumer.java   
public int run() throws Exception {
    configure();

    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName, streamName));

    IRecordProcessorFactory recordProcessorFactory = new ManagedClientProcessorFactory(
            this.templateProcessor);
    Worker worker = new Worker(recordProcessorFactory, this.config);

    int exitCode = 0;
    int failures = 0;

    // run the worker, tolerating as many failures as is configured
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);

            failures++;

            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            }
            exitCode = 1;
        }
    }

    return exitCode;
}
项目:aws-kinesis-zombies    文件:Consumer.java   
private void initKinesis() {
    String pid = ManagementFactory.getRuntimeMXBean().getName();
    pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@'));
    log.info("Creating kinesis consumer with pid {}.", pid);
    KinesisClientLibConfiguration config
            = new KinesisClientLibConfiguration(
                    "Zombies" /* aplication name */,
                    streamName,
                    new DefaultAWSCredentialsProviderChain(),
                    "ZombieConsumer_" + pid /* worker id*/)
            .withRegionName(region)
            .withFailoverTimeMillis(1000 * 30) // after 30 seconds this worker is considered ko                        
            .withMaxLeasesForWorker(2) // forced to read only 1 shard for demo reasons.
            .withMaxRecords(500) // max records per GetRecords
            .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing
            .withInitialLeaseTableWriteCapacity(10) // Dynamodb lease table capacity
            .withInitialLeaseTableReadCapacity(10)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    final Worker worker
            = new Worker.Builder()
            .recordProcessorFactory(zombieRecordFactory)
            .config(config)
            .build();

    new Thread() {
        @Override
        public void run() {
            worker.run();
        }
    }.start();
}
项目:spark-cstar-canaries    文件:JKinesisReceiver.java   
@Override
public void onStart() {
    this.recordProcessorFactory = new EventRecordProcessorFactory(this,
                                                                  workerId,
                                                                  checkpointInterval);

    this.worker = new Worker(this.recordProcessorFactory, this.kclConfig);
    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:dynamodb-streams-kafka    文件:KafkaDynamoStreamAdapter.java   
public void run() {
    streamId = enableStreamForTable(dynamoDBClient, StreamViewType.NEW_IMAGE, sourceTable);
    workerConfig = new KinesisClientLibConfiguration(KCL_APP_NAME, streamId, credentialsProvider, KCL_WORKER_NAME)
        .withMaxRecords(maxRecordsPerRead)
        .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReads)
        .withInitialPositionInStream(initialStreamPosition );
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    workerThread = new Thread(worker);
    workerThread.start();
}
项目:samza    文件:KinesisSystemConsumer.java   
@Override
public void start() {
  LOG.info("Start samza consumer for system {}.", system);

  metrics.initializeMetrics(streams);

  ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
      .setNameFormat("kinesis-worker-thread-" + system + "-%d")
      .build();
  // launch kinesis workers in separate threads, one per stream
  executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory);

  for (String stream : streams) {
    // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the
    // application name. Dynamodb table name must be unique for a given account and region (even across different
    // streams). So, let's create the default one with the combination of job name, job id and stream name. The table
    // name could be changed by providing a different TableName via KCL specific config.
    String kinesisApplicationName =
        kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream;

    Worker worker = new Worker.Builder()
        .recordProcessorFactory(createRecordProcessorFactory(stream))
        .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName))
        .build();

    workers.add(worker);

    // launch kinesis workers in separate thread-pools, one per stream
    executorService.execute(worker);
    LOG.info("Started worker for system {} stream {}.", system, stream);
  }
}
项目:samza    文件:KinesisSystemConsumer.java   
@Override
public void stop() {
  LOG.info("Stop samza consumer for system {}.", system);
  workers.forEach(Worker::shutdown);
  workers.clear();
  executorService.shutdownNow();
  LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
}
项目:koupler    文件:KinesisEventConsumer.java   
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);

    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName,
            new DefaultAWSCredentialsProviderChain(), appName)
                    .withRegionName(config.getRegion())
                    .withInitialPositionInStream(position);

    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
项目:datacollector    文件:KinesisSource.java   
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig =
      new KinesisClientLibConfiguration(
          conf.applicationName,
          conf.streamName,
          credentials,
          getWorkerId()
      );

  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withInitialPositionInStream(conf.initialPositionInStream)
      .withKinesisClientConfig(clientConfiguration);

  if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
    kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
  }

  if (conf.region == AWSRegions.OTHER) {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  } else {
    kclConfig.withRegionName(conf.region.getLabel());
  }

  return new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig)
      .build();
}
项目:awsbigdata    文件:KinesisServer.java   
public static void main(String[] args) throws Exception {
       java.security.Security.setProperty("networkaddress.cache.ttl" , "60");

       Server server = new Server();
       ServerConnector connector = new ServerConnector(server);
       connector.setPort(80);
       server.addConnector(connector);

       ResourceHandler resource_handler = new ResourceHandler();
       resource_handler.setResourceBase(".");
       ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
       context.setContextPath("/");
       ServletHolder holderEvents = new ServletHolder("ws-events", MessageProxyServlet.class);
       context.addServlet(holderEvents, "/kinesisapp/*");

       HandlerList handlers = new HandlerList();
       handlers.setHandlers(new Handler[] { resource_handler, context, new DefaultHandler() });
       server.setHandler(handlers);        
       server.start();

       AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider();
       KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
            System.getProperty("kinesisapp.name"), 
            System.getProperty("kinesisapp.stream"), 
            credentialsProvider, Long.toString(System.currentTimeMillis()))
            .withKinesisEndpoint(System.getProperty("kinesisapp.endpoint"));

       IRecordProcessorFactory factory = new Factory();
       Worker worker = new Worker(factory, kinesisConfig);
       worker.run();

       /*
    Processor p = new Processor();
    p.test();
    */
}
项目:amazon-kinesis-aggregators    文件:AggregatorConsumer.java   
public int run() throws Exception {
    configure();

    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName,
            streamName));

    IRecordProcessorFactory recordProcessorFactory = new AggregatorProcessorFactory(
            aggGroup);
    worker = new Worker(recordProcessorFactory, this.config);

    int exitCode = 0;
    int failures = 0;

    // run the worker, tolerating as many failures as is configured
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);

            failures++;

            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            } else {
                shutdown();
            }
            exitCode = 1;
        }
    }

    return exitCode;
}
项目:aws-big-data-blog    文件:ManagedConsumer.java   
public int run() throws Exception {
    configure();

    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName, streamName));

    IRecordProcessorFactory recordProcessorFactory = new ManagedClientProcessorFactory(
            this.templateProcessor);
    Worker worker = new Worker(recordProcessorFactory, this.config);

    int exitCode = 0;
    int failures = 0;

    // run the worker, tolerating as many failures as is configured
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);

            failures++;

            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            }
            exitCode = 1;
        }
    }

    return exitCode;
}
项目:aws-big-data-blog    文件:KinesisApplication.java   
/**
 * @param args Property file with config overrides (e.g. application name, stream name)
 * @throws IOException Thrown if we can't read properties from the specified properties file
 */
public static void main(String[] args) throws IOException {
    String propertiesFile = null;

    if (args.length > 1) {
        System.err.println("Usage: java " + KinesisApplication.class.getName() + " <propertiesFile>");
        System.exit(1);
    } else if (args.length == 1) {
        propertiesFile = args[0];
    }

    configure(propertiesFile);

    System.out.println("Starting " + applicationName);
    LOG.info("Running " + applicationName + " to process stream " + streamName);


    IRecordProcessorFactory recordProcessorFactory = new KinesisRecordProcessorFactory(redisEndpoint, redisPort);
    Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);



    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:micro-genie    文件:KinesisConsumer.java   
/***
 * Subscribe to a kinesis topic, all consumed events are submitted to the provided handler for processing
 */
@Override
public synchronized void subscribe(final EventHandler handler) {
    Preconditions.checkNotNull(handler, "An eventHandler is required to handle consumed events");
    if(this.worker==null){
        this.worker = new Worker(new KinesisRecordProcessorFactory(this.topic, handler, this.mapper), this.config, this.client, this.dynamoClient, this.cloudwatchClient);
    }
    this.executor.execute(this.worker);
}
项目:Surf    文件:KinesisSource.java   
@Override
public void open(VDSConfiguration ctx) throws Exception {
    _logger.info("Initializing Kinesis Source");
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long l, boolean b) throws Exception {
            _logger.debug("Received a Kinesis Event");
            _eventList.put(kinesisEvent);
            _logger.debug("eventlist size is now {}", _eventList.size());
        }
    });
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    String accessid = ctx.getString("aws-access-key-id");
    String secretkey = ctx.getString("aws-secret-key");
    String streamname = ctx.getString("aws-kinesis-stream-name");
    String appName = ctx.optString("application-name", System.getProperty("nodename"));
    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname,  credprovider, workerId)
            .withMaxRecords(MAX_EVENTS);

    _worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    new Thread(_worker).start();
}
项目:tweetamo    文件:TweetamoServer.java   
/**
 * @param args Property file with config overrides (e.g. application name, stream name)
 * @throws IOException Thrown if we can't read properties from the specified properties file
 */
public static void main(String[] args) throws IOException {
    String propertiesFile = null;

    if (args.length > 1) {
        System.err.println("Usage: java " + TweetamoServer.class.getName() + " <propertiesFile>");
        System.exit(1);
    } else if (args.length == 1) {
        propertiesFile = args[0];
    }

    configure(propertiesFile);

    System.out.println("Starting " + applicationName);
    LOG.info("Running " + applicationName + " to process stream " + streamName);


    IRecordProcessorFactory recordProcessorFactory = new TweetamoRecordProcessorFactory();
     Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:datacollector    文件:KinesisSource.java   
@Override
public void destroy() {
  Optional.ofNullable(worker).ifPresent(Worker::shutdown);
  Optional.ofNullable(executor).ifPresent(ExecutorService::shutdownNow);
  super.destroy();
}
项目:aws-big-data-blog    文件:AmazonDynamoDBStreamstoIgnite.java   
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"), streamArn,
            streamsCredentials, "ddbstreamsworker")
                    .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
                    .withInitialPositionInStream(
                            InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.");
        t.printStackTrace();
        exitCode = 1;
    }
    System.exit(exitCode);
}
项目:aws-dynamodb-examples    文件:StreamsAdapterDemo.java   
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);


    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
            streamArn, streamsCredentials, "streams-demo-worker")
        .withMaxRecords(1)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems().equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}