public void start() {
    int mb = 1024 * 1024;
    LOG.info("Max memory: {} mb", Runtime.getRuntime().maxMemory() / mb);
    LOG.info("Starting up Kinesis Consumer... (may take a few seconds)");

    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(
            kinesisCfg.getKinesisCredentialsProvider(),
            kinesisCfg.getKinesisClientConfiguration());
    AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(
            kinesisCfg.getDynamoDBCredentialsProvider(),
            kinesisCfg.getDynamoDBClientConfiguration());
    AmazonCloudWatch cloudWatchClient = new AmazonCloudWatchClient(
            kinesisCfg.getCloudWatchCredentialsProvider(),
            kinesisCfg.getCloudWatchClientConfiguration());

    Worker worker = new Worker.Builder()
            .recordProcessorFactory(() -> new RecordProcessor(unitOfWorkListener, exceptionStrategy, metricsCallback, dry))
            .config(kinesisCfg)
            .kinesisClient(kinesisClient)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .build();

    worker.run();
}
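/*
 * The RecordProcessor created by the factory lambda above is not shown. Below is a
 * minimal sketch (not the original class) of the contract it has to satisfy: the
 * KCL 1.x interfaces.v2.IRecordProcessor interface that Worker.Builder expects.
 * The constructor arguments from the snippet above (unitOfWorkListener,
 * exceptionStrategy, metricsCallback, dry) are omitted; the record handling is a
 * placeholder.
 */
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class SketchRecordProcessor implements IRecordProcessor {
    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Called once, before any records are delivered for this shard.
        shardId = initializationInput.getShardId();
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            // Application-specific handling of record.getData() goes here.
        }
        try {
            // Checkpoint after each batch; production code usually checkpoints on an interval.
            processRecordsInput.getCheckpointer().checkpoint();
        } catch (KinesisClientLibException e) {
            // The lease may have moved to another worker; log and continue.
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                // Best-effort final checkpoint when the shard is fully consumed.
                shutdownInput.getCheckpointer().checkpoint();
            } catch (KinesisClientLibException e) {
                // Nothing more to do at end of shard.
            }
        }
    }
}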
@Override
public KinesisCollector start() {
    String workerId;
    try {
        workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    } catch (UnknownHostException e) {
        workerId = UUID.randomUUID().toString();
    }
    KinesisClientLibConfiguration config =
            new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId);

    processor = new KinesisRecordProcessorFactory(collector);
    worker = new Worker.Builder()
            .recordProcessorFactory(processor)
            .config(config)
            .build();

    executor.execute(worker);
    return this;
}
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    String region = "eu-west-1";
    logger.info("Starting in region " + region);

    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    KinesisClientLibConfiguration kinesisClientLibConfiguration =
            new KinesisClientLibConfiguration(this.getClass().getName(), TestConstants.stream, credentialsProvider, workerId)
                    .withInitialPositionInStream(InitialPositionInStream.LATEST)
                    .withRegionName(region);

    IRecordProcessorFactory recordProcessorFactory = new RecordFactory();
    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
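/*
 * RecordFactory above is not shown. With the classic (pre-Builder) Worker
 * constructor used in this snippet, it only has to implement the KCL 1.x
 * interfaces.IRecordProcessorFactory interface. A sketch, with the processor
 * class name assumed:
 */
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

public class RecordFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        // The KCL calls this once per shard lease; each shard gets its own processor.
        return new RecordProcessor();
    }
}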
@Override
public void start() {
    IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(
            getChannelProcessor(), serializer, backOffTimeInMillis, numberRetries, checkpointIntervalMillis);
    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            System.out.println("Shutting down Kinesis client thread...");
            worker.shutdown();
        }
    });

    try {
        worker.run();
    } catch (AmazonClientException e) {
        logger.error("Can't connect to Amazon Kinesis", e);
        Throwables.propagate(e);
    }
}
public static void main(String[] args) throws Exception {
    // args[0] is expected to be the same sort of property file as needed
    // by Surf for Kinesis: it should contain
    //   aws-access-key-id: <your-access-id>
    //   aws-secret-key: <your-secret-key>
    //   aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1) {
        errorExit();
    }
    File f = new File(args[0]);
    if (!f.isFile() || !f.canRead()) {
        errorExit();
    }

    // Set up the disruptor event handler that prints each consumed record
    final EventHandler<KinesisEvent> handler = new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long sequence, boolean endOfBatch) throws Exception {
            System.out.println(String.format("Received : %s", kinesisEvent.getData()));
        }
    };

    Worker worker = Util.createWorker(f, handler, "DumpStream");
    worker.run();
}
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName) throws IOException {
    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(handler);
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    Properties props = new Properties();
    try (FileReader reader = new FileReader(conf)) {
        props.load(reader);
    }

    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

    String accessid = props.getProperty("aws-access-key-id");
    String secretkey = props.getProperty("aws-secret-key");
    String streamname = props.getProperty("aws-kinesis-stream-name");

    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config =
            new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId);

    return new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
}
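/*
 * CredProvider is referenced above but not shown. A minimal implementation
 * (an assumption, not the original class) just wraps the fixed
 * BasicAWSCredentials behind the AWSCredentialsProvider interface. Newer AWS
 * SDK 1.x versions ship AWSStaticCredentialsProvider, which does the same thing.
 */
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;

public class CredProvider implements AWSCredentialsProvider {
    private final AWSCredentials credentials;

    public CredProvider(AWSCredentials credentials) {
        this.credentials = credentials;
    }

    @Override
    public AWSCredentials getCredentials() {
        return credentials;
    }

    @Override
    public void refresh() {
        // Static credentials: nothing to refresh.
    }
}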
public static void main(String[] args) throws Exception {
    // args[0] is expected to be the same sort of property file as needed
    // by Surf for Kinesis: it should contain
    //   aws-access-key-id: <your-access-id>
    //   aws-secret-key: <your-secret-key>
    //   aws-kinesis-stream-name: <your-stream-name>
    if (args.length != 1) {
        errorExit();
    }
    File f = new File(args[0]);
    if (!f.isFile() || !f.canRead()) {
        errorExit();
    }

    // Set up the disruptor event handler
    final EventHandler<KinesisEvent> handler = new PageCountHandler();
    Worker worker = Util.createWorker(f, handler, "PageCount");
    worker.run();
}
public int run() throws Exception {
    configure();

    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName, streamName));

    IRecordProcessorFactory recordProcessorFactory =
            new ManagedClientProcessorFactory(this.templateProcessor);
    Worker worker = new Worker(recordProcessorFactory, this.config);

    int exitCode = 0;
    int failures = 0;

    // Run the worker, tolerating as many failures as is configured
    // (failuresToTolerate == -1 means retry forever).
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            failures++;
            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            }
            exitCode = 1;
        }
    }
    return exitCode;
}
private void initKinesis() {
    String pid = ManagementFactory.getRuntimeMXBean().getName();
    pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@'));
    log.info("Creating kinesis consumer with pid {}.", pid);

    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
            "Zombies" /* application name */,
            streamName,
            new DefaultAWSCredentialsProviderChain(),
            "ZombieConsumer_" + pid /* worker id */)
            .withRegionName(region)
            .withFailoverTimeMillis(1000 * 30)      // after 30 seconds this worker is considered dead
            .withMaxLeasesForWorker(2)              // limited to at most 2 shards for demo reasons
            .withMaxRecords(500)                    // max records per GetRecords call
            .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing
            .withInitialLeaseTableWriteCapacity(10) // DynamoDB lease table capacity
            .withInitialLeaseTableReadCapacity(10)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    final Worker worker = new Worker.Builder()
            .recordProcessorFactory(zombieRecordFactory)
            .config(config)
            .build();

    new Thread() {
        @Override
        public void run() {
            worker.run();
        }
    }.start();
}
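/*
 * The worker above runs on an unmanaged thread and is never shut down. A hedged
 * addition (not in the original) that could go at the end of initKinesis() so the
 * worker releases its DynamoDB leases cleanly when the JVM exits:
 */
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Shutting down kinesis consumer.");
    worker.shutdown();
}));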
@Override
public void onStart() {
    this.recordProcessorFactory = new EventRecordProcessorFactory(this, workerId, checkpointInterval);
    this.worker = new Worker(this.recordProcessorFactory, this.kclConfig);

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        exitCode = 1;
    }
    System.exit(exitCode);
}
public void run() {
    streamId = enableStreamForTable(dynamoDBClient, StreamViewType.NEW_IMAGE, sourceTable);

    workerConfig = new KinesisClientLibConfiguration(KCL_APP_NAME, streamId, credentialsProvider, KCL_WORKER_NAME)
            .withMaxRecords(maxRecordsPerRead)
            .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReads)
            .withInitialPositionInStream(initialStreamPosition);

    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    workerThread = new Thread(worker);
    workerThread.start();
}
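/*
 * enableStreamForTable(...) above is not shown. A sketch of what it likely does,
 * assuming the AWS SDK 1.x DynamoDB API: enable a stream with the given view type
 * on the table and return the stream ARN for the KCL adapter to consume. Note
 * that UpdateTable fails if a stream is already enabled on the table.
 */
private static String enableStreamForTable(AmazonDynamoDB client, StreamViewType viewType, String tableName) {
    StreamSpecification spec = new StreamSpecification()
            .withStreamEnabled(true)
            .withStreamViewType(viewType);
    UpdateTableRequest request = new UpdateTableRequest()
            .withTableName(tableName)
            .withStreamSpecification(spec);
    return client.updateTable(request).getTableDescription().getLatestStreamArn();
}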
@Override public void start() { LOG.info("Start samza consumer for system {}.", system); metrics.initializeMetrics(streams); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("kinesis-worker-thread-" + system + "-%d") .build(); // launch kinesis workers in separate threads, one per stream executorService = Executors.newFixedThreadPool(streams.size(), namedThreadFactory); for (String stream : streams) { // KCL Dynamodb table is used for storing the state of processing. By default, the table name is the same as the // application name. Dynamodb table name must be unique for a given account and region (even across different // streams). So, let's create the default one with the combination of job name, job id and stream name. The table // name could be changed by providing a different TableName via KCL specific config. String kinesisApplicationName = kConfig.get(JobConfig.JOB_NAME()) + "-" + kConfig.get(JobConfig.JOB_ID()) + "-" + stream; Worker worker = new Worker.Builder() .recordProcessorFactory(createRecordProcessorFactory(stream)) .config(kConfig.getKinesisClientLibConfig(system, stream, kinesisApplicationName)) .build(); workers.add(worker); // launch kinesis workers in separate thread-pools, one per stream executorService.execute(worker); LOG.info("Started worker for system {} stream {}.", system, stream); } }
@Override
public void stop() {
    LOG.info("Stop samza consumer for system {}.", system);
    workers.forEach(Worker::shutdown);
    workers.clear();
    executorService.shutdownNow();
    LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
}
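/*
 * Worker.shutdown() only signals the run loops to stop, and shutdownNow() then
 * interrupts the worker threads immediately. A hedged variant of stop() (grace
 * period of 30 seconds assumed; requires java.util.concurrent.TimeUnit) that
 * lets in-flight record processing drain first:
 */
@Override
public void stop() {
    LOG.info("Stop samza consumer for system {}.", system);
    workers.forEach(Worker::shutdown);
    workers.clear();
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
            executorService.shutdownNow();
        }
    } catch (InterruptedException e) {
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }
    LOG.info("Kinesis system consumer executor service for system {} is shutdown.", system);
}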
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration clientConfig =
            new KinesisClientLibConfiguration(appName, streamName, new DefaultAWSCredentialsProviderChain(), appName)
                    .withRegionName(config.getRegion())
                    .withInitialPositionInStream(position);

    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
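/*
 * The constructor above only assembles the builder; nothing consumes the stream
 * yet. A sketch (method names assumed, not from the original class) of the
 * start/stop pair that would build and run the worker:
 */
private Worker worker;

public void start() {
    this.worker = this.builder.build();
    // Worker implements Runnable; run it on its own thread so start() returns.
    new Thread(this.worker, "kinesis-event-consumer").start();
}

public void stop() {
    if (this.worker != null) {
        this.worker.shutdown();
    }
}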
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(conf.applicationName, conf.streamName, credentials, getWorkerId());

    kclConfig
            .withMaxRecords(maxBatchSize)
            .withCallProcessRecordsEvenForEmptyRecordList(false)
            .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
            .withInitialPositionInStream(conf.initialPositionInStream)
            .withKinesisClientConfig(clientConfiguration);

    if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
        kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
    }

    if (conf.region == AWSRegions.OTHER) {
        kclConfig.withKinesisEndpoint(conf.endpoint);
    } else {
        kclConfig.withRegionName(conf.region.getLabel());
    }

    return new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .metricsFactory(metricsFactory)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .execService(executor)
            .config(kclConfig)
            .build();
}
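/*
 * getWorkerId() above is not shown. The other snippets in this collection derive
 * the ID from the hostname plus a random UUID, so a likely implementation (an
 * assumption, not the original method) is:
 */
private static String getWorkerId() {
    try {
        // Unique per process so concurrent workers can share the KCL lease table.
        return InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    } catch (UnknownHostException e) {
        return UUID.randomUUID().toString();
    }
}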
public static void main(String[] args) throws Exception {
    // Lower the JVM DNS cache TTL so the client notices endpoint changes.
    java.security.Security.setProperty("networkaddress.cache.ttl", "60");

    Server server = new Server();
    ServerConnector connector = new ServerConnector(server);
    connector.setPort(80);
    server.addConnector(connector);

    ResourceHandler resourceHandler = new ResourceHandler();
    resourceHandler.setResourceBase(".");

    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
    context.setContextPath("/");
    ServletHolder holderEvents = new ServletHolder("ws-events", MessageProxyServlet.class);
    context.addServlet(holderEvents, "/kinesisapp/*");

    HandlerList handlers = new HandlerList();
    handlers.setHandlers(new Handler[] { resourceHandler, context, new DefaultHandler() });
    server.setHandler(handlers);
    server.start();

    AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider();
    KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
            System.getProperty("kinesisapp.name"),
            System.getProperty("kinesisapp.stream"),
            credentialsProvider,
            Long.toString(System.currentTimeMillis()))
            .withKinesisEndpoint(System.getProperty("kinesisapp.endpoint"));

    IRecordProcessorFactory factory = new Factory();
    Worker worker = new Worker(factory, kinesisConfig);
    worker.run();
}
public int run() throws Exception {
    configure();

    System.out.println(String.format("Starting %s", appName));
    LOG.info(String.format("Running %s to process stream %s", appName, streamName));

    IRecordProcessorFactory recordProcessorFactory = new AggregatorProcessorFactory(aggGroup);
    worker = new Worker(recordProcessorFactory, this.config);

    int exitCode = 0;
    int failures = 0;

    // Run the worker, tolerating as many failures as is configured
    // (failuresToTolerate == -1 means retry forever).
    while (failures < failuresToTolerate || failuresToTolerate == -1) {
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            failures++;
            if (failures < failuresToTolerate) {
                LOG.error("Restarting...");
            } else {
                shutdown();
            }
            exitCode = 1;
        }
    }
    return exitCode;
}
/**
 * @param args Property file with config overrides (e.g. application name, stream name)
 * @throws IOException Thrown if we can't read properties from the specified properties file
 */
public static void main(String[] args) throws IOException {
    String propertiesFile = null;
    if (args.length > 1) {
        System.err.println("Usage: java " + KinesisApplication.class.getName() + " <propertiesFile>");
        System.exit(1);
    } else if (args.length == 1) {
        propertiesFile = args[0];
    }
    configure(propertiesFile);

    System.out.println("Starting " + applicationName);
    LOG.info("Running " + applicationName + " to process stream " + streamName);

    IRecordProcessorFactory recordProcessorFactory =
            new KinesisRecordProcessorFactory(redisEndpoint, redisPort);
    Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
/**
 * Subscribe to a Kinesis topic; all consumed events are submitted to the provided handler for processing.
 */
@Override
public synchronized void subscribe(final EventHandler handler) {
    Preconditions.checkNotNull(handler, "An eventHandler is required to handle consumed events");
    if (this.worker == null) {
        this.worker = new Worker(
                new KinesisRecordProcessorFactory(this.topic, handler, this.mapper),
                this.config,
                this.client,
                this.dynamoClient,
                this.cloudwatchClient);
    }
    this.executor.execute(this.worker);
}
@Override
public void open(VDSConfiguration ctx) throws Exception {
    _logger.info("Initializing Kinesis Source");

    Executor executor = Executors.newCachedThreadPool();
    Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor);
    disruptor.handleEventsWith(new EventHandler<KinesisEvent>() {
        @Override
        public void onEvent(KinesisEvent kinesisEvent, long sequence, boolean endOfBatch) throws Exception {
            _logger.debug("Received a Kinesis Event");
            _eventList.put(kinesisEvent);
            _logger.debug("eventlist size is now {}", _eventList.size());
        }
    });
    RingBuffer<KinesisEvent> buffer = disruptor.start();

    // Generate a unique worker ID
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

    String accessid = ctx.getString("aws-access-key-id");
    String secretkey = ctx.getString("aws-secret-key");
    String streamname = ctx.getString("aws-kinesis-stream-name");
    String appName = ctx.optString("application-name", System.getProperty("nodename"));

    BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey);
    CredProvider credprovider = new CredProvider(creds);
    KinesisClientLibConfiguration config =
            new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId)
                    .withMaxRecords(MAX_EVENTS);

    _worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory());
    new Thread(_worker).start();
}
/**
 * @param args Property file with config overrides (e.g. application name, stream name)
 * @throws IOException Thrown if we can't read properties from the specified properties file
 */
public static void main(String[] args) throws IOException {
    String propertiesFile = null;
    if (args.length > 1) {
        System.err.println("Usage: java " + TweetamoServer.class.getName() + " <propertiesFile>");
        System.exit(1);
    } else if (args.length == 1) {
        propertiesFile = args[0];
    }
    configure(propertiesFile);

    System.out.println("Starting " + applicationName);
    LOG.info("Running " + applicationName + " to process stream " + streamName);

    IRecordProcessorFactory recordProcessorFactory = new TweetamoRecordProcessorFactory();
    Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
@Override
public void destroy() {
    Optional.ofNullable(worker).ifPresent(Worker::shutdown);
    Optional.ofNullable(executor).ifPresent(ExecutorService::shutdownNow);
    super.destroy();
}
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    // Join the Ignite cluster as a client and acquire the target cache.
    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);

    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(
            Properties.getString("applicationName"), streamArn, streamsCredentials, "ddbstreamsworker")
            .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
            .withInitialPositionInStream(
                    InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory =
            new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);

    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration(
            "streams-adapter-demo", streamArn, streamsCredentials, "streams-demo-worker")
            .withMaxRecords(1)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    // Give the worker time to replicate the source table, then shut it down
    // and compare the two tables.
    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
            .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}