/**
 * Starts consuming from the configured Kinesis stream.
 *
 * <p>The KCL worker is identified by {@code <canonical-host>:<random-uuid>}, or by a bare random
 * UUID when the local host name cannot be resolved, and is handed to {@code executor} to run.
 *
 * @return this collector, for chaining
 */
@Override
public KinesisCollector start() {
    String workerId;
    try {
        String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        workerId = hostName + ":" + UUID.randomUUID();
    } catch (UnknownHostException e) {
        // Host lookup failed: a bare UUID still yields a unique worker id.
        workerId = UUID.randomUUID().toString();
    }

    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId);
    processor = new KinesisRecordProcessorFactory(collector);
    worker = new Worker.Builder()
            .recordProcessorFactory(processor)
            .config(kclConfig)
            .build();
    executor.execute(worker);
    return this;
}
/**
 * Creates the bootstrap.
 *
 * <p>When the {@code https_proxy} environment variable is non-empty, it is parsed as a URI and its
 * host and port are applied to the Kinesis, DynamoDB and CloudWatch client configurations of
 * {@code kinesisCfg}. {@link URI#getPort()} yields -1 when the URI has no explicit port.
 *
 * @param kinesisCfg KCL configuration; its client configurations are mutated when a proxy is set
 * @param unitOfWorkListener listener stored for later use
 * @param exceptionStrategy exception strategy stored for later use
 * @param metricsCallback metrics callback stored for later use
 * @param dry dry-run flag stored for later use
 */
public KinesisConsumerBootstrap(KinesisClientLibConfiguration kinesisCfg, UnitOfWorkListener unitOfWorkListener,
        ExceptionStrategy exceptionStrategy, Metrics metricsCallback, boolean dry) {
    this.kinesisCfg = kinesisCfg;
    this.unitOfWorkListener = unitOfWorkListener;
    this.exceptionStrategy = exceptionStrategy;
    this.metricsCallback = metricsCallback;
    this.dry = dry;

    String httpsProxy = System.getenv("https_proxy");
    if (StringUtils.isEmpty(httpsProxy)) {
        // No proxy configured: leave the client configurations untouched.
        return;
    }

    URI proxyUri = URI.create(httpsProxy);
    String proxyHost = proxyUri.getHost();
    int proxyPort = proxyUri.getPort();

    kinesisCfg.getKinesisClientConfiguration().setProxyHost(proxyHost);
    kinesisCfg.getKinesisClientConfiguration().setProxyPort(proxyPort);
    kinesisCfg.getDynamoDBClientConfiguration().setProxyHost(proxyHost);
    kinesisCfg.getDynamoDBClientConfiguration().setProxyPort(proxyPort);
    kinesisCfg.getCloudWatchClientConfiguration().setProxyHost(proxyHost);
    kinesisCfg.getCloudWatchClientConfiguration().setProxyPort(proxyPort);
}
/**
 * Starts a KCL worker for {@code TestConstants.stream} on a new single-thread executor.
 *
 * <p>The worker reads from the LATEST position in region eu-west-1. It uses the default AWS
 * credentials provider chain and the worker id {@code <canonical-host>:<random-uuid>}.
 *
 * @throws Exception if the local host name cannot be resolved
 */
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain();
    final String region = "eu-west-1";
    logger.info("Starting in Region " + region);

    String hostName = InetAddress.getLocalHost().getCanonicalHostName();
    String workerId = hostName + ":" + UUID.randomUUID();

    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(getClass().getName(), TestConstants.stream, credentials, workerId);
    kclConfig.withInitialPositionInStream(InitialPositionInStream.LATEST);
    kclConfig.withRegionName(region);

    IRecordProcessorFactory factory = new RecordFactory();
    worker = new Worker(factory, kclConfig);
    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
/**
 * Creates a Spark receiver backed by a KCL configuration for {@code streamName}.
 *
 * <p>The worker id is {@code <hostname>:<random-uuid>}. The KCL configuration starts from
 * {@code position}, or from LATEST when {@code position} is null.
 *
 * @param applicationName KCL application name
 * @param streamName Kinesis stream to read
 * @param endpointUrl Kinesis endpoint URL
 * @param regionName AWS region name, resolved through {@code RegionUtils.getRegion}
 * @param checkpoint checkpoint interval, stored on the receiver
 * @param position initial position in the stream; null means LATEST
 */
public JKinesisReceiver(String applicationName, String streamName, String endpointUrl, String regionName,
        Duration checkpoint, InitialPositionInStream position) {
    super(StorageLevel.MEMORY_ONLY_SER());
    this.workerId = getHostname() + ":" + String.valueOf(UUID.randomUUID());
    this.checkpointInterval = checkpoint;
    this.initialPosition = position;
    Region region = RegionUtils.getRegion(regionName);
    // Use the caller-supplied start position (it was previously stored but never applied).
    // NOTE(review): AT_TIMESTAMP also needs a timestamp, which this constructor does not take.
    InitialPositionInStream startPosition = position != null ? position : InitialPositionInStream.LATEST;
    try {
        this.kclConfig = new KinesisClientLibConfiguration(applicationName, streamName, getCredsProvider(), workerId)
                .withCommonClientConfig(CLIENT_CONF)
                .withRegionName(region.getName())
                .withKinesisEndpoint(endpointUrl)
                .withInitialPositionInStream(startPosition)
                .withTaskBackoffTimeMillis(500);
    } catch (Exception ex) {
        // Deliberately swallowed (demo code): any failure, e.g. an unknown region name leaving
        // `region` null, leaves kclConfig unset. A production receiver should surface this error.
    }
}
/** * Get KCL config for a given system stream. * @param system name of the system * @param stream name of the stream * @param appName name of the application * @return Stream scoped KCL configs required to build * {@link KinesisClientLibConfiguration} */ public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) { ClientConfiguration clientConfig = getAWSClientConfig(system); String workerId = appName + "-" + UUID.randomUUID(); InitialPositionInStream startPos = InitialPositionInStream.LATEST; AWSCredentialsProvider provider = credentialsProviderForStream(system, stream); KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(appName, stream, provider, workerId) .withRegionName(getRegion(system, stream).getName()) .withKinesisClientConfig(clientConfig) .withCloudWatchClientConfig(clientConfig) .withDynamoDBClientConfig(clientConfig) .withInitialPositionInStream(startPos) .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics. // First, get system scoped configs for KCL and override with configs set at stream scope. setKinesisClientLibConfigs( subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)), kinesisClientLibConfiguration); setKinesisClientLibConfigs(subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)), kinesisClientLibConfiguration); return kinesisClientLibConfiguration; }
/**
 * Creates an aggregator for delimited (CSV-style) string data.
 *
 * <p>The label and date attribute aliases are applied only when they are non-null and non-empty.
 *
 * @param streamName stream the aggregator reads from
 * @param appName application the aggregator belongs to
 * @param config KCL configuration of the containing worker
 * @param namespace namespace for the aggregated data
 * @param timeHorizons time horizons to aggregate over
 * @param aggregatorType type of aggregator to create
 * @param delimiter field delimiter of the input records
 * @param labelIndicies indices of the fields forming the aggregation label
 * @param labelAttributeAlias optional alias for the label attribute
 * @param dateIndex index of the field holding the event time
 * @param dateFormat format of the event time field
 * @param dateAlias optional alias for the date attribute
 * @param summaryIndicies indices or summary expressions of the values to aggregate
 * @return a configured {@link StreamAggregator}
 * @throws Exception if the extractor or aggregator cannot be configured
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, String delimiter, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    StringDataExtractor extractor = new CsvDataExtractor(labelIndicies)
            .withDelimiter(delimiter)
            .withDateValueIndex(dateIndex)
            .withDateFormat(dateFormat)
            .withSummaryIndicies(summaryIndicies);
    extractor.setAggregatorType(aggregatorType);

    boolean hasLabelAlias = labelAttributeAlias != null && !labelAttributeAlias.isEmpty();
    if (hasLabelAlias) {
        extractor.withLabelAttributeAlias(labelAttributeAlias);
    }
    boolean hasDateAlias = dateAlias != null && !dateAlias.isEmpty();
    if (hasDateAlias) {
        extractor.withDateAttributeAlias(dateAlias);
    }

    StreamAggregator aggregator = new StreamAggregator(streamName, appName, namespace, config, extractor);
    return aggregator.withTimeHorizon(timeHorizons).withAggregatorType(aggregatorType);
}
/**
 * Creates a consumer for one Kinesis topic (stream).
 *
 * <p>All arguments are required. They are checked in declaration order, and the first null one
 * triggers a {@link NullPointerException} whose message names the missing dependency.
 *
 * @param topic Kinesis stream to consume
 * @param config KCL configuration for the consumer
 * @param client Kinesis client
 * @param dynamoClient DynamoDB client
 * @param cloudwatchClient CloudWatch client
 * @param mapper object mapper used by the consumer
 * @throws NullPointerException if any argument is null
 */
public KinesisConsumer(final String topic, final KinesisClientLibConfiguration config,
        final AmazonKinesisClient client, final AmazonDynamoDBClient dynamoClient,
        final AmazonCloudWatchClient cloudwatchClient, final ObjectMapper mapper) {
    Preconditions.checkNotNull(topic, "A valid kinesis topic is required");
    Preconditions.checkNotNull(config, "KinesisClientLibConfiguration is required");
    Preconditions.checkNotNull(client, "AmazonKinesisClient is required");
    Preconditions.checkNotNull(dynamoClient, "AmazonDynamoDBClient is required");
    Preconditions.checkNotNull(cloudwatchClient, "AmazonCloudWatchClient is required");
    Preconditions.checkNotNull(mapper, "ObjectMapper is required");

    this.topic = topic;
    this.config = config;
    this.client = client;
    this.dynamoClient = dynamoClient;
    this.cloudwatchClient = cloudwatchClient;
    this.mapper = mapper;
}
public static Worker createWorker(File conf, EventHandler<KinesisEvent> handler, String appName)throws IOException{ Executor executor = Executors.newCachedThreadPool(); Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor); disruptor.handleEventsWith(handler); RingBuffer<KinesisEvent> buffer = disruptor.start(); Properties props = new Properties(); props.load(new FileReader(conf)); // Generate a unique worker ID String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); String accessid = props.getProperty("aws-access-key-id"); String secretkey = props.getProperty("aws-secret-key"); String streamname = props.getProperty("aws-kinesis-stream-name"); BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey); CredProvider credprovider = new CredProvider(creds); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId); Worker worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory()); return worker; }
private void initKinesis() { String pid = ManagementFactory.getRuntimeMXBean().getName(); pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@')); log.info("Creating kinesis consumer with pid {}.", pid); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration( "Zombies" /* aplication name */, streamName, new DefaultAWSCredentialsProviderChain(), "ZombieConsumer_" + pid /* worker id*/) .withRegionName(region) .withFailoverTimeMillis(1000 * 30) // after 30 seconds this worker is considered ko .withMaxLeasesForWorker(2) // forced to read only 1 shard for demo reasons. .withMaxRecords(500) // max records per GetRecords .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing .withInitialLeaseTableWriteCapacity(10) // Dynamodb lease table capacity .withInitialLeaseTableReadCapacity(10) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); final Worker worker = new Worker.Builder() .recordProcessorFactory(zombieRecordFactory) .config(config) .build(); new Thread() { @Override public void run() { worker.run(); } }.start(); }
/**
 * Enables a NEW_IMAGE stream on {@code sourceTable}, then starts a KCL worker for that stream on a
 * new thread.
 *
 * <p>The worker uses the adapter, DynamoDB and CloudWatch clients held by this object.
 */
public void run() {
    streamId = enableStreamForTable(dynamoDBClient, StreamViewType.NEW_IMAGE, sourceTable);

    KinesisClientLibConfiguration kclConfig =
            new KinesisClientLibConfiguration(KCL_APP_NAME, streamId, credentialsProvider, KCL_WORKER_NAME);
    kclConfig.withMaxRecords(maxRecordsPerRead);
    kclConfig.withIdleTimeBetweenReadsInMillis(idleTimeBetweenReads);
    kclConfig.withInitialPositionInStream(initialStreamPosition);
    workerConfig = kclConfig;

    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    workerThread = new Thread(worker);
    workerThread.start();
}
/**
 * Applies string-valued settings to a KCL configuration by reflection.
 *
 * <p>Each entry {@code key -> value} is applied by invoking a public single-argument method named
 * {@code with<key>} on {@link KinesisClientLibConfiguration}, with {@code value} converted to that
 * method's parameter type. Supported parameter types are {@code long}, {@code int},
 * {@code boolean}, {@link String} and {@link InitialPositionInStream}; enum names are matched
 * case-insensitively.
 *
 * <p>If several overloads share the name, the first one with a supported parameter type is used.
 * Entries with an empty value are skipped. Keys with no matching method, or whose matching methods
 * only take unsupported types, are logged as warnings and ignored.
 *
 * @param config settings keyed by the KCL setter suffix (for example {@code MaxRecords})
 * @param kinesisLibConfig configuration to mutate
 * @throws IllegalArgumentException if a value cannot be converted or the setter rejects it
 */
private void setKinesisClientLibConfigs(Map<String, String> config, KinesisClientLibConfiguration kinesisLibConfig) {
    for (Entry<String, String> entry : config.entrySet()) {
        String key = entry.getKey();
        String value = entry.getValue();
        if (StringUtils.isEmpty(value)) {
            continue;
        }
        String setterName = "with" + key;
        boolean nameFound = false;
        boolean applied = false;
        for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
            if (!method.getName().equals(setterName) || method.getParameterTypes().length != 1) {
                continue;
            }
            nameFound = true;
            Class<?> type = method.getParameterTypes()[0];
            try {
                Object argument;
                if (type == long.class) {
                    argument = Long.valueOf(value);
                } else if (type == int.class) {
                    argument = Integer.valueOf(value);
                } else if (type == boolean.class) {
                    argument = Boolean.valueOf(value);
                } else if (type == String.class) {
                    argument = value;
                } else if (type == InitialPositionInStream.class) {
                    // Locale.ROOT keeps enum names like "trim_horizon" convertible under any default locale.
                    argument = InitialPositionInStream.valueOf(value.toUpperCase(java.util.Locale.ROOT));
                } else {
                    // An overload we cannot convert to; keep looking for a supported one.
                    continue;
                }
                method.invoke(kinesisLibConfig, argument);
            } catch (Exception e) {
                throw new IllegalArgumentException(
                        String.format("Error trying to set field %s with the value '%s'", key, value), e);
            }
            applied = true;
            LOG.info("Loaded property " + key + " = " + value);
            break;
        }
        if (!nameFound) {
            LOG.warn("Property " + key + " ignored as there is no corresponding set method");
        } else if (!applied) {
            LOG.warn("Property " + key + " ignored as its set method takes an unsupported parameter type");
        }
    }
}
@Test public void testKclConfigs() { Map<String, String> kv = new HashMap<>(); String system = "kinesis"; String stream = "kinesis-stream"; String systemConfigPrefix = String.format("systems.%s.", system); // region config is required for setting kcl config. kv.put(systemConfigPrefix + "aws.region", "us-east-1"); // Kcl Configs kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table"); kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100"); kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true"); kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON"); // override one of the Kcl configs for kinesis-stream1 kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST"); Config config = new MapConfig(kv); KinesisConfig kConfig = new KinesisConfig(config); KinesisClientLibConfiguration kclConfig = kConfig.getKinesisClientLibConfig(system, stream, "sample-app"); assertEquals("sample-table", kclConfig.getTableName()); assertEquals(100, kclConfig.getMaxRecords()); assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList()); assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream()); // verify if the overriden config is applied for kinesis-stream1 kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app"); assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream()); }
/**
 * Prepares a KCL worker builder for {@code streamName}, with this object as the record processor
 * factory.
 *
 * <p>Only the region of the KPL properties file is used. The worker id is
 * {@code <appName>:<random-uuid>}, so several instances of the same application do not share one
 * worker identity.
 *
 * @param propertiesFile KPL properties file; only its region is read here
 * @param streamName Kinesis stream to consume
 * @param appName KCL application name
 * @param initialPosition name of an {@link InitialPositionInStream} constant (exact case)
 * @throws IllegalArgumentException if {@code initialPosition} is not a valid constant name
 */
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);
    // KCL uses the worker id to tell workers of one application apart; reusing appName for every
    // instance would make concurrent consumers indistinguishable.
    String workerId = appName + ":" + java.util.UUID.randomUUID();
    KinesisClientLibConfiguration clientConfig =
            new KinesisClientLibConfiguration(appName, streamName, new DefaultAWSCredentialsProviderChain(), workerId)
                    .withRegionName(config.getRegion())
                    .withInitialPositionInStream(position);
    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
/**
 * Builds a KCL worker for the configured stream.
 *
 * <p>KCL's {@code withInitialPositionInStream} accepts only LATEST and TRIM_HORIZON. AT_TIMESTAMP
 * is therefore configured solely through {@code withTimestampAtInitialPositionInStream}, which sets
 * both the position and the timestamp. When the region is {@code AWSRegions.OTHER}, the explicit
 * endpoint is used instead of a region name.
 *
 * @param recordProcessorFactory factory for the worker's record processors
 * @param maxBatchSize maximum records per GetRecords call
 * @return a configured, not yet running {@link Worker}
 */
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
    KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
            conf.applicationName,
            conf.streamName,
            credentials,
            getWorkerId()
    );

    kclConfig
            .withMaxRecords(maxBatchSize)
            .withCallProcessRecordsEvenForEmptyRecordList(false)
            .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
            .withKinesisClientConfig(clientConfiguration);

    if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
        kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
    } else {
        kclConfig.withInitialPositionInStream(conf.initialPositionInStream);
    }

    if (conf.region == AWSRegions.OTHER) {
        kclConfig.withKinesisEndpoint(conf.endpoint);
    } else {
        kclConfig.withRegionName(conf.region.getLabel());
    }

    return new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .metricsFactory(metricsFactory)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .execService(executor)
            .config(kclConfig)
            .build();
}
/**
 * Starts a Jetty server on port 80, then runs a KCL worker on the main thread.
 *
 * <p>The server serves static files from the working directory and mounts
 * {@code MessageProxyServlet} under {@code /kinesisapp/*}. The worker's application name, stream
 * and endpoint come from the {@code kinesisapp.name}, {@code kinesisapp.stream} and
 * {@code kinesisapp.endpoint} system properties. Credentials come from environment variables, and
 * the worker id is the current time in milliseconds.
 *
 * @param args unused
 * @throws Exception if the server cannot start or the worker fails
 */
public static void main(String[] args) throws Exception {
    // Cap the JVM's DNS cache TTL at 60 seconds.
    java.security.Security.setProperty("networkaddress.cache.ttl", "60");

    Server server = new Server();
    ServerConnector connector = new ServerConnector(server);
    connector.setPort(80);
    server.addConnector(connector);

    ResourceHandler staticFiles = new ResourceHandler();
    staticFiles.setResourceBase(".");

    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
    context.setContextPath("/");
    context.addServlet(new ServletHolder("ws-events", MessageProxyServlet.class), "/kinesisapp/*");

    HandlerList handlers = new HandlerList();
    handlers.setHandlers(new Handler[] {staticFiles, context, new DefaultHandler()});
    server.setHandler(handlers);
    server.start();

    AWSCredentialsProvider credentials = new EnvironmentVariableCredentialsProvider();
    KinesisClientLibConfiguration kinesisConfig = new KinesisClientLibConfiguration(
            System.getProperty("kinesisapp.name"),
            System.getProperty("kinesisapp.stream"),
            credentials,
            Long.toString(System.currentTimeMillis()));
    kinesisConfig.withKinesisEndpoint(System.getProperty("kinesisapp.endpoint"));

    IRecordProcessorFactory factory = new Factory();
    Worker worker = new Worker(factory, kinesisConfig);
    // Runs the worker on the main thread.
    worker.run();
}
/** * Create a new Aggregator for Object Serialised Data based upon a Class * which is configured using Annotations from the base class. * * @param streamName The Stream Name that the Aggregator is receiving data * from. * @param appName The Application Name that an Aggregator is part of. * @param config The Kinesis Client Library Configuration to inherit * credentials and connectivity to the database from. * @param clazz The annotated class to use for configuration of the * aggregator * @return A Stream Aggregator which can process object serialised data * @throws Exception */ public static final StreamAggregator newInstance(String streamName, String appName, KinesisClientLibConfiguration config, Class clazz) throws Exception { AnnotationProcessor p = new AnnotationProcessor(clazz); ObjectExtractor dataExtractor = new ObjectExtractor(p.getLabelMethodNames(), clazz).withDateMethod(p.getDateMethodName()); dataExtractor.withSummaryConfig(p.getSummaryConfig()); //dataExtractor.withSummaryMethods(new ArrayList<>(p.getSummaryMethods().keySet())); StreamAggregator agg = new StreamAggregator(streamName, appName, p.getNamespace(), config, dataExtractor).withTimeHorizon(p.getTimeHorizon()).withAggregatorType(p.getType()).withRaiseExceptionOnDataExtractionErrors( p.shouldFailOnDataExtractionErrors()); // configure metrics service on the aggregator if it's been // configured if (p.shouldEmitMetrics() || (p.getMetricsEmitter() != null && !p.getMetricsEmitter().equals( CloudWatchMetricsEmitter.class))) { if (p.getMetricsEmitter() != null) { agg.withMetricsEmitter(p.getMetricsEmitter().newInstance()); } else { agg.withCloudWatchMetrics(); } } // create a new instance of the Data Store if one has been // configured. 
Currently we only support pluggable data stores that // are configured via their environment or have self defined // configuration models: only no args public constructors can be // called if (p.getDataStore() != null && !p.getDataStore().equals(DynamoDataStore.class)) { agg.withDataStore((IDataStore) p.getDataStore().newInstance()); } return agg; }
/**
 * Creates an aggregator by storing its collaborators; no other work is done here.
 *
 * @param streamName stream the aggregator reads from
 * @param applicationName application the aggregator belongs to
 * @param namespace namespace for the aggregated data
 * @param config KCL configuration of the containing worker
 * @param dataExtractor extractor that turns records into aggregatable data
 */
public StreamAggregator(String streamName, String applicationName, String namespace,
        KinesisClientLibConfiguration config, IDataExtractor dataExtractor) {
    this.config = config;
    this.dataExtractor = dataExtractor;
    this.namespace = namespace;
    this.streamName = streamName;
    this.applicationName = applicationName;
}
/**
 * Returns the subscriber for {@code topic}, creating and caching it on first request.
 *
 * <p>A null or empty client id is replaced by {@code DEFAULT_CLIENT_ID}. Subscribers are cached
 * per topic only: the client id affects the configuration only when the subscriber is first
 * created. Later calls for the same topic return the cached subscriber, whatever the client id.
 *
 * @param clientId client id used to derive the consumer configuration; may be null or empty
 * @param topic Kinesis topic (stream) to subscribe to
 * @return the cached or newly created subscriber
 */
@Override
public synchronized Subscriber createSubscriber(final String clientId, final String topic) {
    final String effectiveClientId = Strings.isNullOrEmpty(clientId) ? DEFAULT_CLIENT_ID : clientId;
    final Subscriber existing = this.subscribers.get(topic);
    if (existing != null) {
        return existing;
    }

    LOGGER.debug("creating kinsis subscriber for topic {} - clientId: {}", topic, effectiveClientId);
    final KinesisClientLibConfiguration config = createConsumerConfig(effectiveClientId, topic);
    final Subscriber created = new KinesisConsumer(topic, config, this.kinesisClient, this.dynamoDbClient,
            this.cloudwatchClient, this.mapper);
    this.subscribers.put(topic, created);
    return created;
}
/**
 * Builds the KCL configuration for a consumer of {@code topic}.
 *
 * <p>The KCL application name is {@code WORKER_ID_TEMPLATE} formatted with the topic and client
 * id. The worker id is a fresh random UUID, and credentials come from the default AWS provider
 * chain.
 *
 * @param clientId client id folded into the application name
 * @param topic Kinesis stream to consume
 * @return a new KCL configuration
 */
private KinesisClientLibConfiguration createConsumerConfig(final String clientId, final String topic) {
    final String applicationName = String.format(WORKER_ID_TEMPLATE, topic, clientId);
    return new KinesisClientLibConfiguration(
            applicationName,
            topic,
            new DefaultAWSCredentialsProviderChain(),
            UUID.randomUUID().toString());
}
@Override public void open(VDSConfiguration ctx) throws Exception { _logger.info("Initializing Kinesis Source"); Executor executor = Executors.newCachedThreadPool(); Disruptor<KinesisEvent> disruptor = new Disruptor<>(KinesisEvent.EVENT_FACTORY, 128, executor); disruptor.handleEventsWith(new EventHandler<KinesisEvent>() { @Override public void onEvent(KinesisEvent kinesisEvent, long l, boolean b) throws Exception { _logger.debug("Received a Kinesis Event"); _eventList.put(kinesisEvent); _logger.debug("eventlist size is now {}", _eventList.size()); } }); RingBuffer<KinesisEvent> buffer = disruptor.start(); // Generate a unique worker ID String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); String accessid = ctx.getString("aws-access-key-id"); String secretkey = ctx.getString("aws-secret-key"); String streamname = ctx.getString("aws-kinesis-stream-name"); String appName = ctx.optString("application-name", System.getProperty("nodename")); BasicAWSCredentials creds = new BasicAWSCredentials(accessid, secretkey); CredProvider credprovider = new CredProvider(creds); KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(appName, streamname, credprovider, workerId) .withMaxRecords(MAX_EVENTS); _worker = new Worker(new RecordProcessorFactory(buffer), config, new MetricsFactory()); new Thread(_worker).start(); }
public void configure() throws Exception { if (!isConfigured) { validateConfig(); try { String userAgent = "AWSKinesisManagedConsumer/" + this.version; if (this.positionInStream != null) { streamPosition = InitialPositionInStream.valueOf(this.positionInStream); } else { streamPosition = InitialPositionInStream.LATEST; } // append the environment name to the application name if (environmentName != null) { appName = String.format("%s-%s", appName, environmentName); } // ensure the JVM will refresh the cached IP values of AWS // resources // (e.g. service endpoints). java.security.Security.setProperty("networkaddress.cache.ttl", "60"); String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID(); LOG.info("Using Worker ID: " + workerId); // obtain credentials using the default provider chain or the // credentials provider supplied AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain() : this.credentialsProvider; LOG.info("Using credentials with Access Key ID: " + credentialsProvider.getCredentials().getAWSAccessKeyId()); config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId).withInitialPositionInStream(streamPosition).withKinesisEndpoint( kinesisEndpoint); config.getKinesisClientConfiguration().setUserAgent(userAgent); if (regionName != null) { Region region = Region.getRegion(Regions.fromName(regionName)); config.withRegionName(region.getName()); } if (this.maxRecords != -1) config.withMaxRecords(maxRecords); if (this.positionInStream != null) config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream)); LOG.info(String.format( "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records", config.getApplicationName(), config.getStreamName(), config.getRegionName(), config.getWorkerIdentifier(), config.getMaxRecords())); isConfigured = true; } catch (Exception e) { throw new 
InvalidConfigurationException(e); } } }
/**
 * Creates a {@code KCL} wrapper around the given configuration.
 *
 * @param configuration KCL configuration to wrap
 * @return a new instance
 */
public static KCL create(KinesisClientLibConfiguration configuration) {
    final KCL instance = new KCL(configuration);
    return instance;
}
/**
 * Stores the configuration. Private: instances are obtained through
 * {@link #create(KinesisClientLibConfiguration)}.
 *
 * @param configuration KCL configuration to wrap
 */
private KCL(KinesisClientLibConfiguration configuration) {
    config = configuration;
}
public void configure() throws Exception { if (!isConfigured) { validateConfig(); if (this.positionInStream != null) { streamPosition = InitialPositionInStream .valueOf(this.positionInStream); } else { streamPosition = InitialPositionInStream.LATEST; } // append the environment name to the application name if (environmentName != null) { appName = String.format("%s-%s", appName, environmentName); } // ensure the JVM will refresh the cached IP values of AWS resources // (e.g. service endpoints). java.security.Security .setProperty("networkaddress.cache.ttl", "60"); String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID(); LOG.info("Using Worker ID: " + workerId); // obtain credentials using the default provider chain or the // credentials provider supplied AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null ? new DefaultAWSCredentialsProviderChain() : this.credentialsProvider; LOG.info("Using credentials with Access Key ID: " + credentialsProvider.getCredentials().getAWSAccessKeyId()); config = new KinesisClientLibConfiguration(appName, streamName, credentialsProvider, workerId).withInitialPositionInStream( streamPosition).withKinesisEndpoint(kinesisEndpoint); config.getKinesisClientConfiguration().setUserAgent( StreamAggregator.AWSApplication); if (regionName != null) { Region region = Region.getRegion(Regions.fromName(regionName)); config.withRegionName(region.getName()); } if (maxRecords != -1) config.withMaxRecords(maxRecords); // initialise the Aggregators aggGroup = buildAggregatorsFromConfig(); LOG.info(String .format("Amazon Kinesis Aggregators Managed Client prepared for %s on %s in %s (%s) using %s Max Records", config.getApplicationName(), config.getStreamName(), config.getRegionName(), config.getWorkerIdentifier(), config.getMaxRecords())); isConfigured = true; } }
/**
 * Factory method that builds a regular-expression based aggregator over one or more time
 * horizons.
 *
 * <p>The label and date attribute aliases are applied only when they are non-null and non-empty.
 *
 * @param streamName The name of the Stream to aggregate against.
 * @param appName The application name to associate with the aggregator.
 * @param config The Kinesis Configuration used for the containing worker.
 * @param namespace The namespace to associate with the aggregated data.
 * @param timeHorizons The time horizons on which to aggregate data.
 * @param aggregatorType The type of aggregator to create.
 * @param regularExpression The regular expression used to extract data from the Kinesis Stream
 *     via Character Classes.
 * @param labelIndicies The indices of the extracted data to be used as the aggregation label.
 * @param labelAttributeAlias Optional alias for the label attribute.
 * @param dateIndex The index of the extracted data to be used as the time value.
 * @param dateFormat The format of the data which represents the event time when shipped as a
 *     String.
 * @param dateAlias Optional alias for the date attribute.
 * @param summaryIndicies The indices, or Summary Expressions on indices, which contain summary
 *     values to be aggregated.
 * @return a configured {@link StreamAggregator}
 * @throws Exception if the extractor or aggregator cannot be configured
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, String regularExpression, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    StringDataExtractor extractor = new RegexDataExtractor(regularExpression, labelIndicies)
            .withDateValueIndex(dateIndex)
            .withDateFormat(dateFormat)
            .withSummaryIndicies(summaryIndicies);
    extractor.setAggregatorType(aggregatorType);

    boolean hasLabelAlias = labelAttributeAlias != null && !labelAttributeAlias.isEmpty();
    if (hasLabelAlias) {
        extractor.withLabelAttributeAlias(labelAttributeAlias);
    }
    boolean hasDateAlias = dateAlias != null && !dateAlias.isEmpty();
    if (hasDateAlias) {
        extractor.withDateAttributeAlias(dateAlias);
    }

    StreamAggregator aggregator = new StreamAggregator(streamName, appName, namespace, config, extractor);
    return aggregator.withTimeHorizon(timeHorizons).withAggregatorType(aggregatorType);
}
/**
 * Builds the AWS clients, joins the Ignite cluster as a client node, and runs a
 * KCL worker over the DynamoDB stream {@code streamArn} on the calling thread.
 * The JVM is terminated via {@code System.exit} when the worker returns:
 * exit code 0 on normal return, 1 if the worker threw.
 *
 * @throws Exception Propagated from client, Ignite, or configuration setup.
 */
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);
    // NOTE(review): the adapter and DynamoDB clients are built without an
    // explicit credentials provider, while the CloudWatch client and the worker
    // config use dynamoDBCredentials / streamsCredentials. Confirm this is intended.
    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    // Split the comma-separated host list, tolerating whitespace around entries.
    List<String> hostList =
            Arrays.asList(Properties.getString("hostList").trim().split("\\s*,\\s*"));
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    ipFinder.setAddresses(hostList);
    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    spi.setIpFinder(ipFinder);

    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);
    // Use the handle returned by start() rather than looking the default
    // instance up again through Ignition.ignite().
    Ignite ignite = Ignition.start(cfg);
    cache = ignite.cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"),
            streamArn, streamsCredentials, "ddbstreamsworker")
            .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
            .withInitialPositionInStream(
                    InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient,
            cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        // Route the failure (with its stack trace) through the logger instead of stderr.
        LOG.error("Caught throwable while processing data.", t);
        exitCode = 1;
    }
    System.exit(exitCode);
}
/**
 * Runs the DynamoDB Streams adapter demo. It creates the clients and tables,
 * runs a KCL worker over {@code streamArn} on a background thread for a fixed
 * period, and then compares the contents of the source and destination tables.
 *
 * @param args Unused.
 * @throws Exception Propagated from client setup, table setup, or thread
 *         sleep/join.
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint,
            serviceName, destTable);

    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo", streamArn,
            streamsCredentials, "streams-demo-worker")
            .withMaxRecords(1)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient,
            cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    // Let the worker process stream records for a fixed window, then stop it
    // and wait for its thread to finish.
    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    // Scan order is not part of a table's contents, so compare the two result
    // sets without regard to order.
    if (sameItems(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems(),
            StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}

/** Returns true when both collections have the same size and hold the same elements, ignoring order. */
private static boolean sameItems(java.util.Collection<?> first, java.util.Collection<?> second) {
    return first.size() == second.size() && first.containsAll(second) && second.containsAll(first);
}
/**
 * Builds an Aggregator for records carried as JSON strings on a Kinesis
 * stream, aggregating at a single time horizon.
 * <p>
 * This is a convenience overload. It wraps {@code timeHorizon} in a
 * one-element list and delegates to the multi-horizon JSON factory.
 *
 * @param streamName Stream the Aggregator receives data from.
 * @param appName Application the Aggregator belongs to.
 * @param config KCL configuration whose credentials and connectivity the
 *        Aggregator inherits.
 * @param namespace Namespace separating this Aggregator's output from other
 *        aggregated data.
 * @param timeHorizon Granularity at which data is aggregated.
 * @param aggregatorType Kind of Aggregator to build.
 * @param labelAttributes JSON attribute names used as the aggregation label.
 * @param dateAttribute JSON attribute holding the event time; per the original
 *        documentation, {@code null} means the client receive time is used.
 * @param dateFormat {@link java.text.SimpleDateFormat}-style pattern for
 *        String-based dates.
 * @param summaryAttributes Attributes, or expressions on attributes, used for
 *        summary aggregation.
 * @return A Stream Aggregator able to process JSON data with the given
 *         attributes.
 * @throws Exception Propagated from the delegated factory.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, List<String> labelAttributes, String dateAttribute,
        String dateFormat, List<String> summaryAttributes) throws Exception {
    List<TimeHorizon> singleHorizon = Arrays.asList(timeHorizon);
    return newInstance(streamName, appName, config, namespace, singleHorizon, aggregatorType,
            labelAttributes, dateAttribute, dateFormat, summaryAttributes);
}
/**
 * Creates an Aggregator for data that is formatted as JSON strings on the
 * Kinesis stream, aggregating at every requested time horizon.
 *
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data.
 * @param timeHorizons The list of Time Horizon values to use for the
 *        aggregator. Data will be managed at ALL of the requested
 *        granularities using a prefixed namespace on dates.
 * @param aggregatorType The type of Aggregator to create.
 * @param labelAttributes The attribute names in the JSON document which
 *        should be used as the label value for aggregation.
 * @param dateAttribute The attribute name in the JSON document which should
 *        be used for the time element of the aggregation. If null then the
 *        client receive time will be used.
 * @param dateFormat The format of the dateAttribute, if String based dates
 *        are used. This should follow {@link java.text.SimpleDateFormat}
 *        convention.
 * @param summaryAttributes List of attributes or expressions on attributes
 *        which should be used for summary aggregation.
 * @return A Stream Aggregator which can process JSON data containing the
 *         indicated attributes.
 * @throws Exception Propagated from constructing the extractor or the
 *         aggregator.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, List<String> labelAttributes, String dateAttribute,
        String dateFormat, List<String> summaryAttributes) throws Exception {
    IDataExtractor dataExtractor = new JsonDataExtractor(labelAttributes)
            .withDateValueAttribute(dateAttribute)
            .withSummaryAttributes(summaryAttributes)
            .withDateFormat(dateFormat);
    dataExtractor.setAggregatorType(aggregatorType);

    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor)
            .withTimeHorizon(timeHorizons)
            .withAggregatorType(aggregatorType);
}
/**
 * Builds a regular-expression based Aggregator that works at a single time
 * horizon.
 * <p>
 * This is a convenience overload. It wraps {@code timeHorizon} in a
 * one-element list and delegates to the multi-horizon regex factory.
 *
 * @param streamName Stream to aggregate against.
 * @param appName Application name associated with the aggregator.
 * @param config KCL configuration of the containing worker.
 * @param namespace Namespace for the aggregated data.
 * @param timeHorizon Granularity at which data is aggregated.
 * @param aggregatorType Kind of aggregator to build.
 * @param regularExpression Pattern whose groups extract fields from each
 *        record.
 * @param labelIndicies Indices of extracted fields forming the label.
 * @param labelAttributeAlias Optional alias for the label attribute.
 * @param dateIndex Index of the extracted field holding the event time.
 * @param dateFormat Format of the event time when it is a String.
 * @param dateAlias Optional alias for the date attribute.
 * @param summaryIndicies Indices, or summary expressions on indices, to
 *        aggregate.
 * @return The Stream Aggregator built by the multi-horizon overload.
 * @throws Exception Propagated from the delegated factory.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, String regularExpression, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    List<TimeHorizon> singleHorizon = Arrays.asList(timeHorizon);
    return newInstance(streamName, appName, config, namespace, singleHorizon, aggregatorType,
            regularExpression, labelIndicies, labelAttributeAlias, dateIndex, dateFormat,
            dateAlias, summaryIndicies);
}
/**
 * Builds an Aggregator for delimiter-separated (CSV) records at a single time
 * horizon.
 * <p>
 * This is a convenience overload. It wraps {@code timeHorizon} in a
 * one-element list and delegates to the multi-horizon CSV factory.
 *
 * @param streamName Stream to aggregate against.
 * @param appName Application name associated with the aggregator.
 * @param config KCL configuration of the containing worker.
 * @param namespace Namespace for the aggregated data.
 * @param timeHorizon Granularity at which data is aggregated.
 * @param aggregatorType Kind of aggregator to build.
 * @param delimiter Character delimiter separating fields on the stream.
 * @param labelIndicies Field positions used as the aggregation label.
 * @param labelAttributeAlias Optional alias for the label attribute.
 * @param dateIndex Position of the field carrying the event time, either as a
 *        String (when {@code dateFormat} is given) or as epoch seconds.
 * @param dateFormat Format of the date field when it is a String.
 * @param dateAlias Optional alias for the date attribute.
 * @param summaryIndicies Field positions, or {@link SummaryCalculation}
 *        expressions on them, e.g. {@code 0,1,2} or
 *        {@code min(0),sum(1),max(2)}.
 * @return A new CSV Aggregator.
 * @throws Exception Propagated from the delegated factory.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, String delimiter, List<Integer> labelIndicies,
        String labelAttributeAlias, int dateIndex, String dateFormat, String dateAlias,
        List<Object> summaryIndicies) throws Exception {
    List<TimeHorizon> singleHorizon = Arrays.asList(timeHorizon);
    return newInstance(streamName, appName, config, namespace, singleHorizon, aggregatorType,
            delimiter, labelIndicies, labelAttributeAlias, dateIndex, dateFormat, dateAlias,
            summaryIndicies);
}
/**
 * Builds an Aggregator for records that are object-serialised onto the stream
 * with Jackson JSON serialisation, aggregating at a single time horizon.
 * <p>
 * This is a convenience overload. It wraps {@code timeHorizon} in a
 * one-element list and delegates to the multi-horizon object factory.
 *
 * @param streamName Stream the Aggregator receives data from.
 * @param appName Application the Aggregator belongs to.
 * @param config KCL configuration whose credentials and connectivity the
 *        Aggregator inherits.
 * @param namespace Namespace separating this Aggregator's output from other
 *        aggregated data.
 * @param timeHorizon Granularity at which data is aggregated.
 * @param aggregatorType Kind of Aggregator to build.
 * @param clazz Transfer-object class the stream data is deserialised into.
 * @param labelMethods Methods on {@code clazz} that produce the aggregation
 *        label.
 * @param dateMethod Method on the object that yields the event time; per the
 *        original documentation, {@code null} means the client receive time is
 *        used.
 * @param summaryMethods Summary method names or expressions, used as secondary
 *        aggregated data points when the AggregatorType is SUM.
 * @return A Stream Aggregator able to process object-serialised data.
 * @throws Exception Propagated from the delegated factory.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, TimeHorizon timeHorizon,
        AggregatorType aggregatorType, Class clazz, List<String> labelMethods,
        String dateMethod, List<String> summaryMethods) throws Exception {
    List<TimeHorizon> singleHorizon = Arrays.asList(timeHorizon);
    return newInstance(streamName, appName, config, namespace, singleHorizon, aggregatorType,
            clazz, labelMethods, dateMethod, summaryMethods);
}
/**
 * Create a new Aggregator for data which is object serialised on the stream
 * using Jackson JSON Serialisation, aggregating at every requested time
 * horizon.
 *
 * @param streamName The Stream Name that the Aggregator is receiving data
 *        from.
 * @param appName The Application Name that an Aggregator is part of.
 * @param config The Kinesis Client Library Configuration to inherit
 *        credentials and connectivity to the database from.
 * @param namespace The namespace used to separate this Aggregator's output
 *        data from other Aggregated data.
 * @param timeHorizons The list of Time Horizon values to use for the
 *        aggregator. Data will be managed at ALL of the requested
 *        granularities using a prefixed namespace on dates.
 * @param aggregatorType The type of Aggregator to create.
 * @param clazz The base class to use as a Transfer Object for the data
 *        stream.
 * @param labelMethods The methods on the base class to use to obtain the
 *        label for aggregation.
 * @param dateMethod The method on the object which should be used to
 *        establish the time. If null then the client receive time will be
 *        used.
 * @param summaryMethods List of summary method names or expressions to be
 *        used when the AggregatorType is SUM, as secondary aggregated data
 *        points.
 * @return A Stream Aggregator which can process object serialised data.
 * @throws Exception Propagated from constructing the extractor or the
 *         aggregator.
 */
public static final StreamAggregator newInstance(String streamName, String appName,
        KinesisClientLibConfiguration config, String namespace, List<TimeHorizon> timeHorizons,
        AggregatorType aggregatorType, Class clazz, List<String> labelMethods,
        String dateMethod, List<String> summaryMethods) throws Exception {
    IDataExtractor dataExtractor = new ObjectExtractor(labelMethods, clazz)
            .withDateMethod(dateMethod)
            .withSummaryMethods(summaryMethods);
    dataExtractor.setAggregatorType(aggregatorType);

    return new StreamAggregator(streamName, appName, namespace, config, dataExtractor)
            .withTimeHorizon(timeHorizons)
            .withAggregatorType(aggregatorType);
}