@Test
public void readsDataFromMockKinesis() {
  int noOfShards = 3;
  int noOfEventsPerShard = 100;
  List<List<AmazonKinesisMock.TestData>> testData =
      provideTestData(noOfShards, noOfEventsPerShard);

  PCollection<AmazonKinesisMock.TestData> result = p
      .apply(
          KinesisIO.read()
              .withStreamName("stream")
              .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
              .withAWSClientsProvider(new AmazonKinesisMock.Provider(testData, 10))
              .withMaxNumRecords(noOfShards * noOfEventsPerShard))
      .apply(ParDo.of(new KinesisRecordToTestData()));

  PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
  p.run();
}
private void startKinesisConsumer() throws Exception {
    AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    String region = "eu-west-1";
    logger.info("Starting in Region " + region);
    String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
    KinesisClientLibConfiguration kinesisClientLibConfiguration =
        new KinesisClientLibConfiguration(this.getClass().getName(), TestConstants.stream,
            credentialsProvider, workerId)
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withRegionName(region);

    IRecordProcessorFactory recordProcessorFactory = new RecordFactory();
    worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);

    es = Executors.newSingleThreadExecutor();
    es.execute(worker);
}
public JKinesisReceiver(String applicationName, String streamName,
                        String endpointUrl, String regionName,
                        Duration checkpoint, InitialPositionInStream position) {
    super(StorageLevel.MEMORY_ONLY_SER());

    this.workerId = getHostname() + ":" + String.valueOf(UUID.randomUUID());
    this.checkpointInterval = checkpoint;
    this.initialPosition = position;

    Region region = RegionUtils.getRegion(regionName);

    try {
        this.kclConfig = new KinesisClientLibConfiguration(applicationName, streamName,
                getCredsProvider(), workerId)
                .withCommonClientConfig(CLIENT_CONF)
                .withRegionName(region.getName())
                .withKinesisEndpoint(endpointUrl)
                .withInitialPositionInStream(InitialPositionInStream.LATEST)
                .withTaskBackoffTimeMillis(500);
    } catch (Exception ex) {
        // do absolutely nothing - and feel good about it!
        // but ...
        // we'd do something meaningful in a PROD context
    }
}
/**
 * Get KCL config for a given system stream.
 * @param system name of the system
 * @param stream name of the stream
 * @param appName name of the application
 * @return stream-scoped KCL configs required to build
 *         {@link KinesisClientLibConfiguration}
 */
public KinesisClientLibConfiguration getKinesisClientLibConfig(String system, String stream, String appName) {
    ClientConfiguration clientConfig = getAWSClientConfig(system);
    String workerId = appName + "-" + UUID.randomUUID();
    InitialPositionInStream startPos = InitialPositionInStream.LATEST;
    AWSCredentialsProvider provider = credentialsProviderForStream(system, stream);
    KinesisClientLibConfiguration kinesisClientLibConfiguration =
        new KinesisClientLibConfiguration(appName, stream, provider, workerId)
            .withRegionName(getRegion(system, stream).getName())
            .withKinesisClientConfig(clientConfig)
            .withCloudWatchClientConfig(clientConfig)
            .withDynamoDBClientConfig(clientConfig)
            .withInitialPositionInStream(startPos)
            .withCallProcessRecordsEvenForEmptyRecordList(true); // For health monitoring metrics.

    // First, get system scoped configs for KCL and override with configs set at stream scope.
    setKinesisClientLibConfigs(
        subset(String.format(CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, system)),
        kinesisClientLibConfiguration);
    setKinesisClientLibConfigs(
        subset(String.format(CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, system, stream)),
        kinesisClientLibConfiguration);
    return kinesisClientLibConfiguration;
}
private void upgradeToConsumerConfigBeanV1(List<Config> configs) {
    for (Config config : configs) {
        // Migrate existing configs that were moved into the Kinesis Consumer config bean
        switch (config.getName()) {
            case "applicationName":
                // fall through
            case "maxBatchSize":
                // fall through
            case "idleTimeBetweenReads":
                // fall through
            case "maxWaitTime":
                // fall through
            case "previewWaitTime":
                moveConfigToBean(config, KINESIS_CONFIG_BEAN);
                break;
            default:
                // no-op
        }
    }
    commitMove(configs);

    configs.add(new Config(KINESIS_CONFIG_BEAN + ".initialPositionInStream",
        InitialPositionInStream.LATEST));
}
private KinesisConsumerConfigBean getKinesisConsumerConfig(String streamName) {
    KinesisConsumerConfigBean conf = new KinesisConsumerConfigBean();
    conf.dataFormatConfig = new DataParserFormatConfig();
    conf.awsConfig = new AWSConfig();
    conf.awsConfig.awsAccessKeyId = () -> "foo";
    conf.awsConfig.awsSecretAccessKey = () -> "boo";
    conf.region = AWSRegions.OTHER;
    conf.endpoint = getKinesisEndpoint();
    conf.streamName = streamName;
    conf.dataFormat = DataFormat.JSON;
    conf.dataFormatConfig.jsonContent = JsonMode.MULTIPLE_OBJECTS;
    conf.dataFormatConfig.charset = "UTF-8";
    conf.dataFormatConfig.jsonMaxObjectLen = 1024;
    conf.applicationName = UUID.randomUUID().toString();
    conf.idleTimeBetweenReads = 250;
    conf.initialPositionInStream = InitialPositionInStream.TRIM_HORIZON;
    conf.maxBatchSize = 1000;
    conf.maxRecordProcessors = 2; // Must be at least 1
    return conf;
}
private void initKinesis() {
    String pid = ManagementFactory.getRuntimeMXBean().getName();
    pid = pid.indexOf('@') == -1 ? pid : pid.substring(0, pid.indexOf('@'));

    log.info("Creating kinesis consumer with pid {}.", pid);

    KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
            "Zombies" /* application name */,
            streamName,
            new DefaultAWSCredentialsProviderChain(),
            "ZombieConsumer_" + pid /* worker id */)
            .withRegionName(region)
            .withFailoverTimeMillis(1000 * 30) // after 30 seconds this worker is considered dead
            .withMaxLeasesForWorker(2) // forced to read only 1 shard for demo reasons
            .withMaxRecords(500) // max records per GetRecords call
            .withCallProcessRecordsEvenForEmptyRecordList(false) // no records -> no processing
            .withInitialLeaseTableWriteCapacity(10) // DynamoDB lease table capacity
            .withInitialLeaseTableReadCapacity(10)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    final Worker worker = new Worker.Builder()
            .recordProcessorFactory(zombieRecordFactory)
            .config(config)
            .build();

    new Thread() {
        @Override
        public void run() {
            worker.run();
        }
    }.start();
}
@Test
public void shouldMapAllShardsToCheckpoints() throws Exception {
  given(shard1.getShardId()).willReturn("shard-01");
  given(shard2.getShardId()).willReturn("shard-02");
  given(shard3.getShardId()).willReturn("shard-03");
  given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
  StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
  DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream", startingPoint);

  KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);

  assertThat(checkpoint).hasSize(3);
}
public void start() {
    final JavaStreamingContext context = new JavaStreamingContext(conf, checkpointInterval);

    // for graceful shutdown of the application ...
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("Shutting down streaming app...");
            context.stop(true, true);
            System.out.println("Shutdown of streaming app complete.");
        }
    });

    JKinesisReceiver receiver = new JKinesisReceiver(appName, streamName,
            endpointUrl, regionName, checkpointInterval,
            InitialPositionInStream.LATEST);

    JavaDStream<String> dstream = context.receiverStream(receiver);
    JavaDStream<EventRecord> recs = dstream.map(new EventRecordMapFunc());
    recs.print();

    // persist the DStream to Cassandra
    javaFunctions(recs)
            .writerBuilder("canary", "eventrecord", mapToRow(EventRecord.class))
            .saveToCassandra();

    System.out.println("Start Spark Stream Processing...");

    context.start();
    context.awaitTermination();
}
private void setKinesisClientLibConfigs(Map<String, String> config,
    KinesisClientLibConfiguration kinesisLibConfig) {
  for (Entry<String, String> entry : config.entrySet()) {
    boolean found = false;
    String key = entry.getKey();
    String value = entry.getValue();
    if (StringUtils.isEmpty(value)) {
      continue;
    }
    for (Method method : KinesisClientLibConfiguration.class.getMethods()) {
      if (method.getName().equals("with" + key)) {
        found = true;
        Class<?> type = method.getParameterTypes()[0];
        try {
          if (type == long.class) {
            method.invoke(kinesisLibConfig, Long.valueOf(value));
          } else if (type == int.class) {
            method.invoke(kinesisLibConfig, Integer.valueOf(value));
          } else if (type == boolean.class) {
            method.invoke(kinesisLibConfig, Boolean.valueOf(value));
          } else if (type == String.class) {
            method.invoke(kinesisLibConfig, value);
          } else if (type == InitialPositionInStream.class) {
            method.invoke(kinesisLibConfig, InitialPositionInStream.valueOf(value.toUpperCase()));
          }
          LOG.info("Loaded property " + key + " = " + value);
          break;
        } catch (Exception e) {
          throw new IllegalArgumentException(
              String.format("Error trying to set field %s with the value '%s'", key, value), e);
        }
      }
    }
    if (!found) {
      LOG.warn("Property " + key + " ignored as there is no corresponding set method");
    }
  }
}
@Test
public void testKclConfigs() {
  Map<String, String> kv = new HashMap<>();
  String system = "kinesis";
  String stream = "kinesis-stream";
  String systemConfigPrefix = String.format("systems.%s.", system);

  // region config is required for setting kcl config.
  kv.put(systemConfigPrefix + "aws.region", "us-east-1");

  // Kcl Configs
  kv.put(systemConfigPrefix + "aws.kcl.TableName", "sample-table");
  kv.put(systemConfigPrefix + "aws.kcl.MaxRecords", "100");
  kv.put(systemConfigPrefix + "aws.kcl.CallProcessRecordsEvenForEmptyRecordList", "true");
  kv.put(systemConfigPrefix + "aws.kcl.InitialPositionInStream", "TRIM_HORIZON");
  // override one of the Kcl configs for kinesis-stream1
  kv.put(systemConfigPrefix + "streams.kinesis-stream1.aws.kcl.InitialPositionInStream", "LATEST");

  Config config = new MapConfig(kv);
  KinesisConfig kConfig = new KinesisConfig(config);
  KinesisClientLibConfiguration kclConfig =
      kConfig.getKinesisClientLibConfig(system, stream, "sample-app");

  assertEquals("sample-table", kclConfig.getTableName());
  assertEquals(100, kclConfig.getMaxRecords());
  assertTrue(kclConfig.shouldCallProcessRecordsEvenForEmptyRecordList());
  assertEquals(InitialPositionInStream.TRIM_HORIZON, kclConfig.getInitialPositionInStream());

  // verify that the overridden config is applied for kinesis-stream1
  kclConfig = kConfig.getKinesisClientLibConfig(system, "kinesis-stream1", "sample-app");
  assertEquals(InitialPositionInStream.LATEST, kclConfig.getInitialPositionInStream());
}
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration clientConfig =
        new KinesisClientLibConfiguration(appName, streamName,
            new DefaultAWSCredentialsProviderChain(), appName)
            .withRegionName(config.getRegion())
            .withInitialPositionInStream(position);

    this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
    KinesisClientLibConfiguration kclConfig =
        new KinesisClientLibConfiguration(
            conf.applicationName,
            conf.streamName,
            credentials,
            getWorkerId()
        );

    kclConfig
        .withMaxRecords(maxBatchSize)
        .withCallProcessRecordsEvenForEmptyRecordList(false)
        .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
        .withInitialPositionInStream(conf.initialPositionInStream)
        .withKinesisClientConfig(clientConfiguration);

    if (conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP) {
        kclConfig.withTimestampAtInitialPositionInStream(new Date(conf.initialTimestamp));
    }

    if (conf.region == AWSRegions.OTHER) {
        kclConfig.withKinesisEndpoint(conf.endpoint);
    } else {
        kclConfig.withRegionName(conf.region.getLabel());
    }

    return new Worker.Builder()
        .recordProcessorFactory(recordProcessorFactory)
        .metricsFactory(metricsFactory)
        .dynamoDBClient(dynamoDBClient)
        .cloudWatchClient(cloudWatchClient)
        .execService(executor)
        .config(kclConfig)
        .build();
}
private InitialPositionInStream convertToPosition(KinesisInputProperties.OffsetType offsetType) {
    switch (offsetType) {
        case LATEST:
            return InitialPositionInStream.LATEST;
        case EARLIEST:
            return InitialPositionInStream.TRIM_HORIZON;
        default:
            TalendRuntimeException.build(CommonErrorCodes.UNEXPECTED_ARGUMENT).setAndThrow(
                String.format("Do not support OffsetType %s", offsetType));
            return null;
    }
}
/**
 * @param propertiesFile path of the properties file holding configuration overrides
 * @throws IOException Thrown when we run into issues reading properties
 */
private static void loadProperties(String propertiesFile) throws IOException {
    FileInputStream inputStream = new FileInputStream(propertiesFile);
    Properties properties = new Properties();
    try {
        properties.load(inputStream);
    } finally {
        inputStream.close();
    }

    String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
    if (appNameOverride != null) {
        applicationName = appNameOverride;
    }
    LOG.info("Using application name " + applicationName);

    String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
    if (streamNameOverride != null) {
        streamName = streamNameOverride;
    }
    LOG.info("Using stream name " + streamName);

    String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
    if (kinesisEndpointOverride != null) {
        kinesisEndpoint = kinesisEndpointOverride;
    }

    String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
    if (initialPositionOverride != null) {
        initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
    }
    LOG.info("Using initial position " + initialPositionInStream.toString()
            + " (if a checkpoint is not found).");
}
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        try {
            String userAgent = "AWSKinesisManagedConsumer/" + this.version;

            if (this.positionInStream != null) {
                streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
            } else {
                streamPosition = InitialPositionInStream.LATEST;
            }

            // append the environment name to the application name
            if (environmentName != null) {
                appName = String.format("%s-%s", appName, environmentName);
            }

            // ensure the JVM will refresh the cached IP values of AWS resources
            // (e.g. service endpoints).
            java.security.Security.setProperty("networkaddress.cache.ttl", "60");

            String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
            LOG.info("Using Worker ID: " + workerId);

            // obtain credentials using the default provider chain or the
            // credentials provider supplied
            AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
                    ? new DefaultAWSCredentialsProviderChain()
                    : this.credentialsProvider;

            LOG.info("Using credentials with Access Key ID: "
                    + credentialsProvider.getCredentials().getAWSAccessKeyId());

            config = new KinesisClientLibConfiguration(appName, streamName,
                    credentialsProvider, workerId)
                    .withInitialPositionInStream(streamPosition)
                    .withKinesisEndpoint(kinesisEndpoint);

            config.getKinesisClientConfiguration().setUserAgent(userAgent);

            if (regionName != null) {
                Region region = Region.getRegion(Regions.fromName(regionName));
                config.withRegionName(region.getName());
            }

            if (this.maxRecords != -1) {
                config.withMaxRecords(maxRecords);
            }

            if (this.positionInStream != null) {
                config.withInitialPositionInStream(InitialPositionInStream.valueOf(this.positionInStream));
            }

            LOG.info(String.format(
                    "Amazon Kinesis Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                    config.getApplicationName(), config.getStreamName(), config.getRegionName(),
                    config.getWorkerIdentifier(), config.getMaxRecords()));

            isConfigured = true;
        } catch (Exception e) {
            throw new InvalidConfigurationException(e);
        }
    }
}
public static Builder newBuilder() {
    return new Builder()
        .checkpointIntervalMillis(2000)
        .initialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2());
}
/**
 * Specify reading from some initial position in stream.
 */
public Read withInitialPositionInStream(InitialPositionInStream initialPosition) {
  return toBuilder()
      .setInitialPosition(new StartingPoint(initialPosition))
      .build();
}
public StartingPoint(InitialPositionInStream position) {
  this.position = checkNotNull(position, "position");
  this.timestamp = null;
}
public InitialPositionInStream getPosition() { return position; }
public InitialPositionInStream getInitialStreamPosition() { return initialStreamPosition; }
public void setInitialStreamPosition(InitialPositionInStream initialPosition) { initialStreamPosition = initialPosition; }
public InitialPositionInStreamChooserValues() { super(InitialPositionInStream.values()); }
public void configure() throws Exception {
    if (!isConfigured) {
        validateConfig();

        if (this.positionInStream != null) {
            streamPosition = InitialPositionInStream.valueOf(this.positionInStream);
        } else {
            streamPosition = InitialPositionInStream.LATEST;
        }

        // append the environment name to the application name
        if (environmentName != null) {
            appName = String.format("%s-%s", appName, environmentName);
        }

        // ensure the JVM will refresh the cached IP values of AWS resources
        // (e.g. service endpoints).
        java.security.Security.setProperty("networkaddress.cache.ttl", "60");

        String workerId = NetworkInterface.getNetworkInterfaces() + ":" + UUID.randomUUID();
        LOG.info("Using Worker ID: " + workerId);

        // obtain credentials using the default provider chain or the
        // credentials provider supplied
        AWSCredentialsProvider credentialsProvider = this.credentialsProvider == null
                ? new DefaultAWSCredentialsProviderChain()
                : this.credentialsProvider;

        LOG.info("Using credentials with Access Key ID: "
                + credentialsProvider.getCredentials().getAWSAccessKeyId());

        config = new KinesisClientLibConfiguration(appName, streamName,
                credentialsProvider, workerId)
                .withInitialPositionInStream(streamPosition)
                .withKinesisEndpoint(kinesisEndpoint);

        config.getKinesisClientConfiguration().setUserAgent(StreamAggregator.AWSApplication);

        if (regionName != null) {
            Region region = Region.getRegion(Regions.fromName(regionName));
            config.withRegionName(region.getName());
        }

        if (maxRecords != -1) {
            config.withMaxRecords(maxRecords);
        }

        // initialise the Aggregators
        aggGroup = buildAggregatorsFromConfig();

        LOG.info(String.format(
                "Amazon Kinesis Aggregators Managed Client prepared for %s on %s in %s (%s) using %s Max Records",
                config.getApplicationName(), config.getStreamName(), config.getRegionName(),
                config.getWorkerIdentifier(), config.getMaxRecords()));

        isConfigured = true;
    }
}
/**
 * @param propertiesFile path of the properties file holding configuration overrides
 * @throws IOException Thrown when we run into issues reading properties
 */
private static void loadProperties(String propertiesFile) throws IOException {
    FileInputStream inputStream = new FileInputStream(propertiesFile);
    Properties properties = new Properties();
    try {
        properties.load(inputStream);
    } finally {
        inputStream.close();
    }

    String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
    if (appNameOverride != null) {
        applicationName = appNameOverride;
    }
    LOG.info("Using application name " + applicationName);

    String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
    if (streamNameOverride != null) {
        streamName = streamNameOverride;
    }
    LOG.info("Using stream name " + streamName);

    String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
    if (kinesisEndpointOverride != null) {
        kinesisEndpoint = kinesisEndpointOverride;
    }
    LOG.info("Using Kinesis endpoint " + kinesisEndpoint);

    String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
    if (initialPositionOverride != null) {
        initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
    }
    LOG.info("Using initial position " + initialPositionInStream.toString()
            + " (if a checkpoint is not found).");

    String redisEndpointOverride = properties.getProperty(ConfigKeys.REDIS_ENDPOINT);
    if (redisEndpointOverride != null) {
        redisEndpoint = redisEndpointOverride;
    }
    LOG.info("Using Redis endpoint " + redisEndpoint);

    String redisPortOverride = properties.getProperty(ConfigKeys.REDIS_PORT);
    if (redisPortOverride != null) {
        try {
            redisPort = Integer.parseInt(redisPortOverride);
        } catch (Exception e) {
            // keep the default Redis port if the override is not a valid integer
        }
    }
    LOG.info("Using Redis port " + redisPort);
}
public void run() throws Exception {
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);
    dynamoDBClient = new AmazonDynamoDBClient(new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);
    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
    List<String> hostList = Arrays.asList(Properties.getString("hostList").split(","));
    ipFinder.setAddresses(hostList);
    spi.setIpFinder(ipFinder);
    IgniteConfiguration cfg = new IgniteConfiguration();
    cfg.setDiscoverySpi(spi);
    cfg.setClientMode(true);
    cfg.setPeerClassLoadingEnabled(true);

    @SuppressWarnings("unused")
    Ignite ignite = Ignition.start(cfg);
    cache = Ignition.ignite().cache(Properties.getString("cacheName"));
    LOG.info(">>> cache acquired");

    recordProcessorFactory = new StreamsRecordProcessorFactory(cache);
    workerConfig = new KinesisClientLibConfiguration(Properties.getString("applicationName"),
            streamArn, streamsCredentials, "ddbstreamsworker")
            .withMaxRecords(Integer.parseInt(Properties.getString("maxRecords")))
            .withInitialPositionInStream(
                    InitialPositionInStream.valueOf(Properties.getString("initialPositionInStream")));

    LOG.info("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    LOG.info("Starting worker...");

    int exitCode = 0;
    try {
        worker.run();
    } catch (Throwable t) {
        LOG.error("Caught throwable while processing data.");
        t.printStackTrace();
        exitCode = 1;
    }
    System.exit(exitCode);
}
/**
 * @param args
 */
public static void main(String[] args) throws Exception {
    System.out.println("Starting demo...");

    String srcTable = tablePrefix + "-src";
    String destTable = tablePrefix + "-dest";
    streamsCredentials = new ProfileCredentialsProvider();
    dynamoDBCredentials = new ProfileCredentialsProvider();
    recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials,
            dynamodbEndpoint, serviceName, destTable);

    /* ===== REQUIRED =====
     * Users will have to explicitly instantiate and configure the adapter, then pass it to
     * the KCL worker.
     */
    adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
    adapterClient.setEndpoint(streamsEndpoint);

    dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
    dynamoDBClient.setEndpoint(dynamodbEndpoint);

    cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

    setUpTables();

    workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
            streamArn, streamsCredentials, "streams-demo-worker")
            .withMaxRecords(1)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

    System.out.println("Creating worker for stream: " + streamArn);
    worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
    System.out.println("Starting worker...");
    Thread t = new Thread(worker);
    t.start();

    Thread.sleep(25000);
    worker.shutdown();
    t.join();

    if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
            .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
        System.out.println("Scan result is equal.");
    } else {
        System.out.println("Tables are different!");
    }

    System.out.println("Done.");
    cleanupAndExit(0);
}