Java 类com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration 实例源码

项目:rakam    文件:AWSKinesisClickhouseEventStore.java   
/**
     * Wires up the Kinesis client and the ClickHouse-backed bulk store from the injected configuration.
     *
     * @param config           AWS settings (credentials, region, optional Kinesis endpoint)
     * @param projectConfig    project settings, also handed to the bulk store
     * @param clickHouseConfig ClickHouse settings handed to the bulk store
     */
    @Inject
    public AWSKinesisClickhouseEventStore(AWSConfig config, ProjectConfig projectConfig, ClickHouseConfig clickHouseConfig) {
        kinesis = new AmazonKinesisClient(config.getCredentials());
        kinesis.setRegion(config.getAWSRegion());

        // An explicit endpoint is optional; apply it only when one is configured.
        final boolean hasCustomEndpoint = config.getKinesisEndpoint() != null;
        if (hasCustomEndpoint) {
            kinesis.setEndpoint(config.getKinesisEndpoint());
        }

        this.config = config;
        this.projectConfig = projectConfig;
        this.bulkClient = new ClickHouseEventStore(projectConfig, clickHouseConfig);

        // The producer configuration is still assembled, but creating the producer itself
        // is currently disabled:
        //     producer = new KinesisProducer(producerSettings);
        KinesisProducerConfiguration producerSettings = new KinesisProducerConfiguration();
        producerSettings.setRegion(config.getRegion());
        producerSettings.setCredentialsProvider(config.getCredentials());
    }
项目:kinesis-kafka-connector    文件:AmazonKinesisSinkTask.java   
/**
 * Builds a {@link KinesisProducer} from the settings held by this task
 * (region, connection limit, aggregation, rate limit, buffering, TTL and metrics options).
 *
 * @return a new producer created from the assembled configuration
 */
private KinesisProducer getKinesisProducer() {
    final KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();

    // Connectivity: region, default credential provider chain, connection limit.
    producerConfig.setRegion(regionName);
    producerConfig.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
    producerConfig.setMaxConnections(maxConnections);

    // Record aggregation on/off.
    producerConfig.setAggregationEnabled(aggregration);

    // Maximum allowed put rate for a shard, expressed as a percentage of the backend limits.
    producerConfig.setRateLimit(rateLimit);

    // Upper bound (milliseconds) on how long a record may sit in the buffer before being sent.
    // Other buffering limits may cause a record to be sent sooner.
    producerConfig.setRecordMaxBufferedTime(maxBufferedTime);

    // Time-to-live for records (milliseconds); records not successfully put within it are failed.
    producerConfig.setRecordTtl(ttl);

    // CloudWatch metrics volume. Expected pattern: none|summary|detailed
    producerConfig.setMetricsLevel(metricsLevel);

    // CloudWatch metrics granularity; greater granularity produces more metrics.
    // Expected pattern: global|stream|shard
    producerConfig.setMetricsGranularity(metricsGranuality);

    // Namespace under which metrics are uploaded.
    producerConfig.setMetricsNamespace(metricsNameSpace);

    return new KinesisProducer(producerConfig);
}
项目:aws-kinesis-zombies    文件:Drone.java   
/**
 * Creates the {@link KinesisProducer}, stores it in {@code producer}, and registers a JVM
 * shutdown hook that flushes any remaining records and then destroys the producer.
 *
 * <p>Reference for every available setting:
 * https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java
 *
 * @return KinesisProducer instance used to put records.
 */
public KinesisProducer createKinesisProducer() {
    KinesisProducerConfiguration config = new KinesisProducerConfiguration();

    config.setRegion(region);
    config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
    config.setMaxConnections(24);           // Raise it if you have expired records
    config.setRequestTimeout(60000);
    config.setAggregationEnabled(true);
    config.setAggregationMaxCount(2);       // Usually a higher value is far more efficient
    config.setAggregationMaxSize(1024*100);
    config.setRecordMaxBufferedTime(5000);
    producer = new KinesisProducer(config);

    // Plain Thread wrapping the Runnable; the previous empty anonymous Thread subclass added nothing.
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            log.info("Flushing remaining records.");
            producer.flushSync();
            log.info("All records flushed.");
            producer.destroy();
            log.info("Producer finished.");
        }
    }));

    return producer;
}
项目:flink    文件:FlinkKinesisProducer.java   
/**
 * Opens the sink: validates the producer properties, obtains the Kinesis producer,
 * installs the result callback and initializes the custom partitioner if one is set.
 *
 * @param parameters configuration passed on to {@code super.open}
 * @throws Exception if {@code super.open} or any later step throws
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // Validate the user-supplied properties and turn them into a producer configuration.
    final KinesisProducerConfiguration validatedConfig =
            KinesisConfigUtil.getValidatedProducerConfiguration(configProps);

    producer = getKinesisProducer(validatedConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (result.isSuccessful()) {
                return;
            }
            if (!failOnError) {
                LOG.warn("Record was not sent successful");
                return;
            }
            // Keep only the first recorded exception.
            if (thrownException == null) {
                thrownException = new RuntimeException("Record was not sent successful");
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (!failOnError) {
                LOG.warn("An exception occurred while processing a record", t);
                return;
            }
            thrownException = t;
        }
    };

    if (this.customPartitioner != null) {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        final int parallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
        this.customPartitioner.initialize(subtaskIndex, parallelSubtasks);
    }

    LOG.info("Started Kinesis producer instance for region '{}'", validatedConfig.getRegion());
}
项目:flink    文件:KinesisConfigUtil.java   
/**
 * Validates the configuration properties for {@link FlinkKinesisProducer} and builds the
 * corresponding KinesisProducerConfiguration.
 *
 * @param config user-supplied properties; must not be null
 * @return the producer configuration with region, credentials and defaults applied
 */
public static KinesisProducerConfiguration getValidatedProducerConfiguration(Properties config) {
    checkNotNull(config, "config can not be null");

    validateAwsConfiguration(config);

    final KinesisProducerConfiguration producerConfig = KinesisProducerConfiguration.fromProperties(config);

    // Region and credentials are taken from the properties.
    producerConfig.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(config));

    // The credential refresh delay is lowered on purpose (default is 5 seconds) to avoid an
    // ignorable interruption warning that shows up when the KPL client is shut down.
    // See https://github.com/awslabs/amazon-kinesis-producer/issues/10.
    producerConfig.setCredentialsRefreshDelay(100);

    // Fall back to our own defaults for every setting the user left unspecified.
    final boolean userSetRateLimit = config.containsKey(RATE_LIMIT);
    if (!userSetRateLimit) {
        producerConfig.setRateLimit(DEFAULT_RATE_LIMIT);
    }

    final boolean userSetThreadingModel = config.containsKey(THREADING_MODEL);
    if (!userSetThreadingModel) {
        producerConfig.setThreadingModel(DEFAULT_THREADING_MODEL);
    }

    final boolean userSetThreadPoolSize = config.containsKey(THREAD_POOL_SIZE);
    if (!userSetThreadPoolSize) {
        producerConfig.setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE);
    }

    return producerConfig;
}
项目:flink    文件:KinesisConfigUtilTest.java   
/**
 * Checks that the rate limit is 100 when unspecified and follows the property when it is set.
 */
@Test
public void testRateLimitInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // No rate limit given: expect the value 100.
    KinesisProducerConfiguration withDefault = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(100, withDefault.getRateLimit());

    // Explicit rate limit: expect it to be used.
    props.setProperty(KinesisConfigUtil.RATE_LIMIT, "150");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(150, withOverride.getRateLimit());
}
项目:flink    文件:KinesisConfigUtilTest.java   
/**
 * Checks that the threading model is POOLED when unspecified and follows the property when it is set.
 */
@Test
public void testThreadingModelInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // No threading model given: expect POOLED.
    KinesisProducerConfiguration withDefault = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.POOLED, withDefault.getThreadingModel());

    // Explicit threading model: expect it to be used.
    props.setProperty(KinesisConfigUtil.THREADING_MODEL, "PER_REQUEST");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(KinesisProducerConfiguration.ThreadingModel.PER_REQUEST, withOverride.getThreadingModel());
}
项目:flink    文件:KinesisConfigUtilTest.java   
/**
 * Checks that the thread pool size is 10 when unspecified and follows the property when it is set.
 */
@Test
public void testThreadPoolSizeInProducerConfiguration() {
    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

    // No pool size given: expect the value 10.
    KinesisProducerConfiguration withDefault = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(10, withDefault.getThreadPoolSize());

    // Explicit pool size: expect it to be used.
    props.setProperty(KinesisConfigUtil.THREAD_POOL_SIZE, "12");
    KinesisProducerConfiguration withOverride = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    assertEquals(12, withOverride.getThreadPoolSize());
}
项目:flink    文件:KinesisConfigUtilTest.java   
/**
 * Checks that the region given in the properties ends up in the producer configuration.
 */
@Test
public void testCorrectlySetRegionInProducerConfiguration() {
    final String expectedRegion = "us-east-1";

    Properties props = new Properties();
    props.setProperty(AWSConfigConstants.AWS_REGION, expectedRegion);

    KinesisProducerConfiguration validated = KinesisConfigUtil.getValidatedProducerConfiguration(props);
    String actualRegion = validated.getRegion();

    assertEquals("incorrect region", expectedRegion, actualRegion);
}
项目:koupler    文件:KinesisEventConsumer.java   
/**
 * Prepares a worker builder that consumes {@code streamName}, using this instance as the
 * record processor factory.
 *
 * @param propertiesFile  producer properties file; only its region is read here
 * @param streamName      stream to consume
 * @param appName         application name, also passed as the fourth constructor argument
 *                        of the client library configuration
 * @param initialPosition name of an {@link InitialPositionInStream} constant
 */
public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) {
    KinesisProducerConfiguration producerProps = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);

    InitialPositionInStream startAt = InitialPositionInStream.valueOf(initialPosition);

    KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration(
            appName, streamName, new DefaultAWSCredentialsProviderChain(), appName);
    kclConfig = kclConfig.withRegionName(producerProps.getRegion());
    kclConfig = kclConfig.withInitialPositionInStream(startAt);

    Worker.Builder workerBuilder = new Worker.Builder();
    this.builder = workerBuilder.recordProcessorFactory(this).config(kclConfig);
}
项目:koupler    文件:KinesisEventProducer.java   
/**
 * Creates an event producer for {@code streamName} configured from a properties file.
 *
 * @param format            format identifier passed to {@code FormatFactory.getFormat}
 * @param cmd               command line passed to {@code FormatFactory.getFormat}
 * @param propertiesFile    file the producer configuration is loaded from
 * @param streamName        stream name stored on this instance
 * @param throttleQueueSize value forwarded to the single-argument constructor
 * @param appName           application name passed to the metrics object
 */
public KinesisEventProducer(String format, CommandLine cmd,
                            String propertiesFile, String streamName,
                            int throttleQueueSize, String appName) {
    // Delegate to the single-argument constructor first.
    this(throttleQueueSize);
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    this.streamName = streamName;
    this.producer = new KinesisProducer(config);
    // The metrics object receives this instance together with the producer configuration.
    this.metrics = new KouplerMetrics(this, config, appName);
    this.format = FormatFactory.getFormat(format, cmd);
}
项目:rakam    文件:AWSKinesisEventStore.java   
/**
 * Wires up the async Kinesis client, the S3 bulk store and the Kinesis producer from the
 * injected configuration.
 *
 * @param config          AWS settings (credentials, region, optional Kinesis endpoint)
 * @param metastore       handed to the S3 bulk store
 * @param fieldDependency handed to the S3 bulk store
 * @throws IllegalStateException if the configured Kinesis endpoint is not a valid URL
 */
@Inject
public AWSKinesisEventStore(AWSConfig config,
                            Metastore metastore,
                            FieldDependency fieldDependency) {
    kinesis = new AmazonKinesisAsyncClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }
    this.config = config;
    this.bulkClient = new S3BulkEventStore(metastore, config, fieldDependency);

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
            .setRegion(config.getRegion())
            .setCredentialsProvider(config.getCredentials());
    if (config.getKinesisEndpoint() != null) {
        try {
            URL url = new URL(config.getKinesisEndpoint());
            producerConfiguration.setKinesisEndpoint(url.getHost());

            // URL.getPort() returns -1 when the endpoint has no explicit port; fall back to the
            // protocol's default port, and leave the producer's own default if neither is known.
            int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
            if (port != -1) {
                producerConfiguration.setKinesisPort(port);
            }

            // NOTE(review): certificate verification is switched off whenever a custom endpoint
            // is configured; confirm this is only intended for local/test endpoints.
            producerConfiguration.setVerifyCertificate(false);
        } catch (MalformedURLException e) {
            // Keep the original exception as the cause so the parse failure is not lost.
            throw new IllegalStateException(
                    String.format("Kinesis endpoint is invalid: %s", config.getKinesisEndpoint()), e);
        }
    }
    producer = new KinesisProducer(producerConfiguration);
}
项目:flink    文件:FlinkKinesisProducer.java   
/**
 * Factory hook for the {@link KinesisProducer}.
 * Kept overridable so tests can substitute a mock producer.
 *
 * @param producerConfig configuration the producer is created from
 * @return a new producer built from {@code producerConfig}
 */
@VisibleForTesting
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    final KinesisProducer created = new KinesisProducer(producerConfig);
    return created;
}
项目:flink    文件:FlinkKinesisProducerTest.java   
@Override
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    // Test override: producerConfig is ignored and the mockProducer reference is returned as-is.
    return mockProducer;
}
项目:flink    文件:FlinkKinesisProducer.java   
/**
 * Opens the sink: builds the producer configuration from {@code configProps}, creates the
 * Kinesis producer, installs the result callback and initializes the custom partitioner
 * if one is set.
 *
 * @param parameters configuration passed on to {@code super.open}
 * @throws Exception if {@code super.open} or any later step throws
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();

    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    // Optional limits: only override the producer's own value when the property is present.
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }

    producer = new KinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    // Only remember the first exception, so a later unsuccessful record does not
                    // replace an earlier recorded cause with this generic one.
                    if (thrownException == null) {
                        thrownException = new RuntimeException("Record was not sent successful");
                    }
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
项目:koupler    文件:KouplerMetrics.java   
/**
 * Creates the metrics object and a CloudWatch client bound to the region named in the
 * producer configuration.
 *
 * @param producer forwarded to the two-argument constructor
 * @param config   producer configuration whose region is used for CloudWatch
 * @param appName  forwarded to the two-argument constructor
 */
public KouplerMetrics(KinesisEventProducer producer, KinesisProducerConfiguration config, String appName) {
    this(producer, appName);
    cloudWatch = new AmazonCloudWatchClient();
    // Resolve the configured region name and apply it to the client in one step.
    cloudWatch.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
}
项目:aws-big-data-blog    文件:AdvancedKPLClickEventsToKinesis.java   
protected AdvancedKPLClickEventsToKinesis(
        BlockingQueue<ClickEvent> inputQueue) {
    super(inputQueue);
    kinesis = new KinesisProducer(new KinesisProducerConfiguration()
            .setRegion(REGION));
}
项目:aws-big-data-blog    文件:KPLClickEventsToKinesis.java   
public KPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
    super(inputQueue);
    kinesis = new KinesisProducer(new KinesisProducerConfiguration()
            .setRegion(REGION)
            .setRecordMaxBufferedTime(5000));
}