/**
 * Creates the event store: sets up the Kinesis client from the AWS settings and
 * creates a {@code ClickHouseEventStore} that is stored in {@code bulkClient}.
 *
 * <p>No {@code KinesisProducer} is created by this constructor. The previously built
 * (and never used) {@code KinesisProducerConfiguration} has been removed as dead code.
 *
 * @param config           AWS settings: credentials, region and an optional Kinesis endpoint override
 * @param projectConfig    project configuration; retained and passed to the ClickHouse store
 * @param clickHouseConfig ClickHouse settings passed to the ClickHouse store
 */
@Inject
public AWSKinesisClickhouseEventStore(AWSConfig config, ProjectConfig projectConfig, ClickHouseConfig clickHouseConfig) {
    kinesis = new AmazonKinesisClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    // Override the endpoint only when one is explicitly configured.
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }

    this.config = config;
    this.projectConfig = projectConfig;
    this.bulkClient = new ClickHouseEventStore(projectConfig, clickHouseConfig);
}
private KinesisProducer getKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(regionName); config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); config.setMaxConnections(maxConnections); config.setAggregationEnabled(aggregration); // Limits the maximum allowed put rate for a shard, as a percentage of // the // backend limits. config.setRateLimit(rateLimit); // Maximum amount of time (milliseconds) a record may spend being // buffered // before it gets sent. Records may be sent sooner than this depending // on the // other buffering limits config.setRecordMaxBufferedTime(maxBufferedTime); // Set a time-to-live on records (milliseconds). Records that do not get // successfully put within the limit are failed. config.setRecordTtl(ttl); // Controls the number of metrics that are uploaded to CloudWatch. // Expected pattern: none|summary|detailed config.setMetricsLevel(metricsLevel); // Controls the granularity of metrics that are uploaded to CloudWatch. // Greater granularity produces more metrics. // Expected pattern: global|stream|shard config.setMetricsGranularity(metricsGranuality); // The namespace to upload metrics under. config.setMetricsNamespace(metricsNameSpace); return new KinesisProducer(config); }
/** * All what you need: * https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java * @return KinesisProducer instance used to put records. */ public KinesisProducer createKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(region); config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); config.setMaxConnections(24); // Raise it if you have expired records config.setRequestTimeout(60000); config.setAggregationEnabled(true); config.setAggregationMaxCount(2); // Usually a higher value is far more efficent config.setAggregationMaxSize(1024*100); config.setRecordMaxBufferedTime(5000); producer = new KinesisProducer(config); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { log.info("Flushing remaining records."); producer.flushSync(); log.info("All records flushed."); producer.destroy(); log.info("Producer finished."); } }) { }); return producer; }
@Override public void open(Configuration parameters) throws Exception { super.open(parameters); // check and pass the configuration properties KinesisProducerConfiguration producerConfig = KinesisConfigUtil.getValidatedProducerConfiguration(configProps); producer = getKinesisProducer(producerConfig); callback = new FutureCallback<UserRecordResult>() { @Override public void onSuccess(UserRecordResult result) { if (!result.isSuccessful()) { if (failOnError) { // only remember the first thrown exception if (thrownException == null) { thrownException = new RuntimeException("Record was not sent successful"); } } else { LOG.warn("Record was not sent successful"); } } } @Override public void onFailure(Throwable t) { if (failOnError) { thrownException = t; } else { LOG.warn("An exception occurred while processing a record", t); } } }; if (this.customPartitioner != null) { this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion()); }
/** * Validate configuration properties for {@link FlinkKinesisProducer}, * and return a constructed KinesisProducerConfiguration. */ public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) { checkNotNull(config, "config can not be null"); validateAwsConfiguration(config); KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config); kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION)); kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config)); // we explicitly lower the credential refresh delay (default is 5 seconds) // to avoid an ignorable interruption warning that occurs when shutting down the // KPL client. See https://github.com/awslabs/amazon-kinesis-producer/issues/10. kpc.setCredentialsRefreshDelay(100); // Override default values if they aren't specified by users if (!config.containsKey(RATE_LIMIT)) { kpc.setRateLimit(DEFAULT_RATE_LIMIT); } if (!config.containsKey(THREADING_MODEL)) { kpc.setThreadingModel(DEFAULT_THREADING_MODEL); } if (!config.containsKey(THREAD_POOL_SIZE)) { kpc.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE); } return kpc; }
/**
 * Checks the rate limit of the validated producer configuration: 100 when the property
 * is not supplied, and the supplied value (150) when it is.
 */
@Test
public void testRateLimitInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // Rate limit not supplied.
    KinesisProducerConfiguration withoutOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(100, withoutOverride.getRateLimit());

    // Rate limit supplied explicitly.
    props.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(150, withOverride.getRateLimit());
}
/**
 * Checks the threading model of the validated producer configuration: {@code POOLED} when
 * the property is not supplied, and {@code PER_REQUEST} when that value is supplied.
 */
@Test
public void testThreadingModelInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // Threading model not supplied.
    KinesisProducerConfiguration withoutOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, withoutOverride.getThreadingModel());

    // Threading model supplied explicitly.
    props.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, withOverride.getThreadingModel());
}
/**
 * Checks the thread pool size of the validated producer configuration: 10 when the
 * property is not supplied, and the supplied value (12) when it is.
 */
@Test
public void testThreadPoolSizeInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // Thread pool size not supplied.
    KinesisProducerConfiguration withoutOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(10, withoutOverride.getThreadPoolSize());

    // Thread pool size supplied explicitly.
    props.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(12, withOverride.getThreadPoolSize());
}
/**
 * Checks that the region given through {@code AWSConfigConstants.AWS_REGION} is the region
 * reported by the validated producer configuration.
 */
@Test
public void testCorrectlySetRegionInProducerConfiguration() {
    final String region = "us-east-1";

    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, region);

    String actualRegion = KinesisConfigUtil.getValidatedProducerConfiguration(props).getRegion();
    assertEquals("incorrect region", region, actualRegion);
}
/**
 * Prepares a worker builder that consumes {@code streamName} with this object as the
 * record processor factory.
 *
 * @param propertiesFile  producer properties file; read here to obtain the region
 * @param streamName      stream to consume
 * @param appName         passed as both the first and the last argument of the
 *                        {@code KinesisClientLibConfiguration} constructor
 * @param initialPosition name of an {@code InitialPositionInStream} constant
 */
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration producerSettings = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    InitialPositionInStream startAt = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
            appName, streamName, new DefaultAWSCredentialsProviderChain(), appName);
    kclConfig = kclConfig.withRegionName(producerSettings.getRegion());
    kclConfig = kclConfig.withInitialPositionInStream(startAt);

    this.builder = new Worker.Builder()
            .recordProcessorFactory(this)
            .config(kclConfig);
}
/**
 * Creates a producer that writes to {@code streamName}, configured from a properties file.
 *
 * @param format            format identifier resolved through {@code FormatFactory}
 * @param cmd               command line passed to {@code FormatFactory} together with {@code format}
 * @param propertiesFile    file the {@code KinesisProducerConfiguration} is loaded from
 * @param streamName        target stream name
 * @param throttleQueueSize forwarded to the single-argument constructor
 * @param appName           passed to the {@code KouplerMetrics} constructor
 */
public KinesisEventProducer(String format, CommandLine cmd, String propertiesFile, String streamName,
        int throttleQueueSize, String appName) {
    this(throttleQueueSize);

    final KinesisProducerConfiguration kplSettings =
            KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);

    // Assignment order is kept as-is: `this` is handed to KouplerMetrics before `format` is set.
    this.streamName = streamName;
    this.producer = new KinesisProducer(kplSettings);
    this.metrics = new KouplerMetrics(this, kplSettings, appName);
    this.format = FormatFactory.getFormat(format, cmd);
}
/**
 * Creates the event store: sets up the async Kinesis client, an {@code S3BulkEventStore}
 * stored in {@code bulkClient}, and a {@link KinesisProducer}.
 *
 * <p>When a Kinesis endpoint is configured it is applied to both the client and the
 * producer. For the producer the endpoint URL is split into host and port; if the URL
 * carries no explicit port, the protocol's default port is used.
 *
 * @param config          AWS settings: credentials, region and an optional Kinesis endpoint override
 * @param metastore       passed to the {@code S3BulkEventStore}
 * @param fieldDependency passed to the {@code S3BulkEventStore}
 * @throws IllegalStateException if the configured Kinesis endpoint is not a valid URL
 */
@Inject
public AWSKinesisEventStore(AWSConfig config, Metastore metastore, FieldDependency fieldDependency) {
    kinesis = new AmazonKinesisAsyncClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }

    this.config = config;
    this.bulkClient = new S3BulkEventStore(metastore, config, fieldDependency);

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
            .setRegion(config.getRegion())
            .setCredentialsProvider(config.getCredentials());

    if (config.getKinesisEndpoint() != null) {
        try {
            URL url = new URL(config.getKinesisEndpoint());
            producerConfiguration.setKinesisEndpoint(url.getHost());
            // URL.getPort() is -1 when the endpoint has no explicit port (e.g. "https://host");
            // fall back to the protocol default instead of passing -1 on.
            int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
            producerConfiguration.setKinesisPort(port);
            // NOTE(review): certificate verification is turned off for every custom endpoint.
            // Kept as in the original; confirm this is intended outside local/test setups.
            producerConfiguration.setVerifyCertificate(false);
        } catch (MalformedURLException e) {
            // Keep the original exception as the cause so the parse error is not lost.
            throw new IllegalStateException(
                    String.format("Kinesis endpoint is invalid: %s", config.getKinesisEndpoint()), e);
        }
    }

    producer = new KinesisProducer(producerConfiguration);
}
/**
 * Creates a {@link KinesisProducer} by passing the given configuration to its constructor.
 * Exposed so that tests can inject mock producers easily.
 *
 * @param producerConfig configuration handed to the {@link KinesisProducer} constructor
 * @return a newly constructed {@link KinesisProducer}
 */
@VisibleForTesting protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) { return new KinesisProducer(producerConfig); }
/**
 * Override that returns {@code mockProducer} instead of constructing a producer.
 *
 * @param producerConfig not used by this override
 * @return the value of {@code mockProducer}
 */
@Override protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) { return mockProducer; }
/**
 * Opens the sink: builds the producer configuration from {@code configProps}, creates the
 * Kinesis producer, installs the callback that handles per-record results and, when a
 * custom partitioner is set, initializes it with this subtask's index and the number of
 * parallel subtasks.
 *
 * @param parameters passed through to {@code super.open}
 * @throws Exception declared by the overridden method
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    final KinesisProducerConfiguration kplConfig = new KinesisProducerConfiguration();
    kplConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    kplConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));

    // Optional tuning values: applied only when the key is present; the configuration's
    // current value is handed to getLong as the default argument.
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        final long collectionMaxCount = PropertiesUtil.getLong(
                configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT,
                kplConfig.getCollectionMaxCount(),
                LOG);
        kplConfig.setCollectionMaxCount(collectionMaxCount);
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        final long aggregationMaxCount = PropertiesUtil.getLong(
                configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT,
                kplConfig.getAggregationMaxCount(),
                LOG);
        kplConfig.setAggregationMaxCount(aggregationMaxCount);
    }

    producer = new KinesisProducer(kplConfig);

    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (result.isSuccessful()) {
                return;
            }
            if (!failOnError) {
                LOG.warn("Record was not sent successful");
                return;
            }
            thrownException = new RuntimeException("Record was not sent successful");
        }

        @Override
        public void onFailure(Throwable t) {
            if (!failOnError) {
                LOG.warn("An exception occurred while processing a record", t);
                return;
            }
            thrownException = t;
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", kplConfig.getRegion());
}
/**
 * Creates metrics for the given producer and sets up a CloudWatch client in the region
 * named by the producer configuration.
 *
 * @param producer forwarded to the two-argument constructor
 * @param config   producer configuration; only its region is read here
 * @param appName  forwarded to the two-argument constructor
 */
public KouplerMetrics(KinesisEventProducer producer, KinesisProducerConfiguration config, String appName) {
    this(producer, appName);

    cloudWatch = new AmazonCloudWatchClient();
    cloudWatch.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
}
protected AdvancedKPLClickEventsToKinesis( BlockingQueue<ClickEvent> inputQueue) { super(inputQueue); kinesis = new KinesisProducer(new KinesisProducerConfiguration() .setRegion(REGION)); }
public KPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) { super(inputQueue); kinesis = new KinesisProducer(new KinesisProducerConfiguration() .setRegion(REGION) .setRecordMaxBufferedTime(5000)); }