/**
 * Checks that the given Kinesis stream exists and is usable by this appender.
 *
 * <p>The stream is described through the client returned by {@code getClient()}.
 * If its status is neither {@code ACTIVE} nor {@code UPDATING}, or the stream
 * cannot be found, initialization is flagged as failed and an error is recorded
 * via {@code addError}.
 *
 * @param client     Kinesis client supplied by the caller (not consulted here;
 *                   the lookup goes through {@code getClient()})
 * @param streamName name of the stream to validate
 */
@Override
protected void validateStreamName(AmazonKinesisAsyncClient client, String streamName) {
    try {
        String status = getClient()
                .describeStream(streamName)
                .getStreamDescription()
                .getStreamStatus();

        boolean streamReady = StreamStatus.ACTIVE.name().equals(status)
                || StreamStatus.UPDATING.name().equals(status);
        if (!streamReady) {
            setInitializationFailed(true);
            addError("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name);
        }
    } catch (ResourceNotFoundException rnfe) {
        // The service reported that no such stream exists.
        setInitializationFailed(true);
        addError("Stream " + streamName + " doesn't exist for appender: " + name, rnfe);
    }
}
private AmazonKinesisAsyncClient createClient() { // Building Kinesis configuration int connectionTimeout = ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT; int maxConnection = ClientConfiguration.DEFAULT_MAX_CONNECTIONS; RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY; int socketTimeout = ClientConfiguration.DEFAULT_SOCKET_TIMEOUT; boolean useReaper = ClientConfiguration.DEFAULT_USE_REAPER; String userAgent = ClientConfiguration.DEFAULT_USER_AGENT; ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setConnectionTimeout(connectionTimeout); clientConfiguration.setMaxConnections(maxConnection); clientConfiguration.setRetryPolicy(retryPolicy); clientConfiguration.setSocketTimeout(socketTimeout); clientConfiguration.setUseReaper(useReaper); clientConfiguration.setUserAgent(userAgent); // Reading credentials from ENV-variables AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain(); // Configuring Kinesis-client with configuration AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration); Regions myRegion = Regions.fromName(AmazonUtil.getInstance().getRegion()); kinesisAsyncClient.withRegion(Region.getRegion(myRegion)); return kinesisAsyncClient; }
/**
 * Builds the event store: an async Kinesis client, an S3-backed bulk store and
 * a Kinesis producer, all configured from {@code config}.
 *
 * <p>When {@code config.getKinesisEndpoint()} is non-null it overrides the
 * endpoint of both the async client and the producer. For the producer the
 * endpoint is parsed as a URL and split into host and port.
 *
 * @param config          AWS settings (credentials, region, optional Kinesis endpoint)
 * @param metastore       passed through to the bulk event store
 * @param fieldDependency passed through to the bulk event store
 * @throws IllegalStateException if the configured Kinesis endpoint is not a valid URL
 */
@Inject
public AWSKinesisEventStore(AWSConfig config, Metastore metastore, FieldDependency fieldDependency) {
    kinesis = new AmazonKinesisAsyncClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }
    this.config = config;
    this.bulkClient = new S3BulkEventStore(metastore, config, fieldDependency);

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
            .setRegion(config.getRegion())
            .setCredentialsProvider(config.getCredentials());
    if (config.getKinesisEndpoint() != null) {
        try {
            URL url = new URL(config.getKinesisEndpoint());
            producerConfiguration.setKinesisEndpoint(url.getHost());

            // URL.getPort() yields -1 when the endpoint omits an explicit port
            // (e.g. "https://host"); fall back to the protocol's default port
            // instead of handing -1 to the producer configuration.
            int port = url.getPort();
            if (port == -1) {
                port = url.getDefaultPort();
            }
            // If the protocol has no default port either, keep the producer's own default.
            if (port != -1) {
                producerConfiguration.setKinesisPort(port);
            }

            // NOTE(review): certificate verification is switched off for every
            // custom endpoint — presumably for local/test endpoints; confirm
            // this is intended for production overrides.
            producerConfiguration.setVerifyCertificate(false);
        } catch (MalformedURLException e) {
            // Keep the original exception as the cause so the parse failure is diagnosable.
            throw new IllegalStateException(
                    String.format("Kinesis endpoint is invalid: %s", config.getKinesisEndpoint()), e);
        }
    }
    producer = new KinesisProducer(producerConfiguration);
}
/**
 * Creates the Kinesis client used by this appender and stores it in {@code kinesis}.
 *
 * <p>The client is built from {@code getCredentials()}, {@code getClientConfiguration()}
 * and an executor obtained from {@code AppenderExecutors.newExecutor(this, getThreadPoolSize())}.
 * Its region is then set from the {@code region} field.
 */
@Override
protected void doStart() {
    kinesis = new AmazonKinesisAsyncClient(
        getCredentials(),
        getClientConfiguration(),
        AppenderExecutors.newExecutor(this, getThreadPoolSize())
    );
    // Resolve the configured region through RegionUtils and apply it to the new client.
    // NOTE(review): presumably `region` holds a region name such as "us-east-1" — confirm.
    kinesis.setRegion(RegionUtils.getRegion(region));
}
@Override public void open(VDSConfiguration ctx) throws Exception { String accessID = ctx.getString(ACCESS_KEY); String secretKey = ctx.getString(SECRET_KEY); _streamName = ctx.getString(STREAM_NAME); int tcount = ctx.optInt(THREAD_COUNT, 5); _threadpool = new ThreadPoolExecutor(tcount, tcount, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(100),new ThreadPoolExecutor.CallerRunsPolicy()); // TODO: make the queue length configurable BasicAWSCredentials creds = new BasicAWSCredentials(accessID, secretKey); _client = new AmazonKinesisAsyncClient(creds, _threadpool); _scheduler.scheduleAtFixedRate(_callback, 10, 10, TimeUnit.SECONDS); // TODO: make this configurable? _logger.info("Created connection to AWS Kinesis"); _logger.info("Stream name: " + _streamName); }
/**
 * Builds the asynchronous Kinesis client.
 *
 * <p>The client runs on a fixed thread pool sized to {@code config.getMaxConnections()},
 * with threads named {@code lumbermill-async-kinesis-N}. The region is read from the
 * {@code "region"} setting ({@code "eu-west-1"} is passed as the second argument to
 * {@code asString}, presumably as the fallback). A non-empty {@code "endpoint"}
 * setting, when present, overrides the client's endpoint.
 *
 * @param config        AWS client configuration
 * @param configuration settings wrapper holding {@code region} and optional {@code endpoint}
 * @return the configured client
 */
private AmazonKinesisAsync createClient(ClientConfiguration config, MapWrap configuration) {
    AmazonKinesisAsync client = new AmazonKinesisAsyncClient(
            getAwsCredentialsProvider(configuration, config),
            config,
            Executors.newFixedThreadPool(
                    config.getMaxConnections(),
                    new ThreadFactoryBuilder().setNameFormat("lumbermill-async-kinesis-%d").build()));

    client.setRegion(Region.getRegion(Regions.fromName(configuration.asString("region", "eu-west-1"))));

    // No endpoint setting at all: nothing more to configure.
    if (!configuration.exists("endpoint")) {
        return client;
    }

    // An empty endpoint value is treated the same as no override.
    String endpointOverride = configuration.asString("endpoint");
    if (!endpointOverride.isEmpty()) {
        client.setEndpoint(endpointOverride);
    }
    return client;
}
/**
 * Creates a new {@code AmazonKinesisAsyncClient} from the given arguments.
 *
 * @param credentials   credentials provider handed to the client
 * @param configuration client configuration handed to the client
 * @param executor      executor handed to the client
 * @return a new client instance; this method applies no further configuration
 */
@Override
protected AmazonKinesisAsyncClient createClient(AWSCredentialsProvider credentials, ClientConfiguration configuration, ThreadPoolExecutor executor) {
    return new AmazonKinesisAsyncClient(credentials, configuration, executor);
}
private AmazonKinesisAsyncClient createClient() { // Building Kinesis configuration int connectionTimeout = getOptionalIntConfig(CONNECTION_TIMEOUT, ClientConfiguration.DEFAULT_CONNECTION_TIMEOUT); int maxConnection = getOptionalIntConfig(MAX_CONNECTION, ClientConfiguration.DEFAULT_MAX_CONNECTIONS); // TODO: replace default retry policy RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY; int socketTimeout = getOptionalIntConfig(SOCKET_TIMEOUT, ClientConfiguration.DEFAULT_SOCKET_TIMEOUT); boolean useReaper = getOptionalBooleanConfig(USE_REAPER, ClientConfiguration.DEFAULT_USE_REAPER); String userAgent = getOptionalStringConfig(USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT); String endpoint = getOptionalStringConfig(ENDPOINT, null); streamName = getMandatoryStringConfig(STREAM_NAME); partitionKey = getMandatoryStringConfig(PARTITION_KEY); region = getMandatoryStringConfig(REGION); logger.info(" --- Stream name: " + streamName); logger.info(" --- Partition key: " + partitionKey); logger.info(" --- Region: " + region); if(endpoint != null) { logger.info(" --- Endpoint: " + endpoint); } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setConnectionTimeout(connectionTimeout); clientConfiguration.setMaxConnections(maxConnection); clientConfiguration.setRetryPolicy(retryPolicy); clientConfiguration.setSocketTimeout(socketTimeout); clientConfiguration.setUseReaper(useReaper); clientConfiguration.setUserAgent(userAgent); /* AWS credentials provider chain that looks for credentials in this order: Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY Java System Properties - aws.accessKeyId and aws.secretKey Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI Instance profile credentials delivered through the Amazon EC2 metadata service */ AWSCredentialsProvider awsCredentialsProvider = new DefaultAWSCredentialsProviderChain(); // Configuring 
Kinesis-client with configuration AmazonKinesisAsyncClient kinesisAsyncClient = new AmazonKinesisAsyncClient(awsCredentialsProvider, clientConfiguration); Region awsRegion = RegionUtils.getRegion(region); kinesisAsyncClient.setRegion(awsRegion); if(endpoint != null) { kinesisAsyncClient.setEndpoint(endpoint); } return kinesisAsyncClient; }
/**
 * Configures this appender instance and makes it ready for use by the
 * consumers. It validates mandatory parameters and confirms if the configured
 * stream is ready for publishing data yet.
 *
 * <p>Steps, in order: validate {@code streamName} and {@code layout}; build the
 * client configuration (proxy settings, retries, user agent); create a
 * fixed-size thread pool backed by a bounded task buffer; create the Kinesis
 * client; apply either the explicit endpoint or the region; describe the
 * stream and check that its status is ACTIVE or UPDATING; finally create the
 * async call stats reporter.
 *
 * Error details are made available through the fallback handler for this
 * appender
 *
 * @throws IllegalStateException
 *           if we encounter issues configuring this appender instance
 */
@Override
public void activateOptions() {
    // NOTE(review): after a validation failure the method keeps going unless
    // error(...) itself throws — error(...) is not visible here; confirm.
    if (streamName == null) {
        initializationFailed = true;
        error("Invalid configuration - streamName cannot be null for appender: " + name);
    }

    if (layout == null) {
        initializationFailed = true;
        error("Invalid configuration - No layout for appender: " + name);
    }

    ClientConfiguration clientConfiguration = new ClientConfiguration();
    clientConfiguration = setProxySettingsFromSystemProperties(clientConfiguration);

    // maxRetries is applied both as the max error retry count and inside the retry policy.
    clientConfiguration.setMaxErrorRetry(maxRetries);
    clientConfiguration.setRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
        PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY, maxRetries, true));
    clientConfiguration.setUserAgent(AppenderConstants.USER_AGENT_STRING);

    // Bounded task buffer (capacity bufferSize) feeding a pool of exactly threadCount threads;
    // tasks rejected by the pool are handled by BlockFastProducerPolicy.
    BlockingQueue<Runnable> taskBuffer = new LinkedBlockingDeque<Runnable>(bufferSize);
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount, threadCount,
        AppenderConstants.DEFAULT_THREAD_KEEP_ALIVE_SEC, TimeUnit.SECONDS, taskBuffer, new BlockFastProducerPolicy());
    // Start all core threads now rather than lazily on first task submission.
    threadPoolExecutor.prestartAllCoreThreads();
    kinesisClient = new AmazonKinesisAsyncClient(new CustomCredentialsProviderChain(), clientConfiguration,
        threadPoolExecutor);

    // Remember whether a region was explicitly configured before substituting the default.
    boolean regionProvided = !Validator.isBlank(region);
    if (!regionProvided) {
        region = AppenderConstants.DEFAULT_REGION;
    }
    if (!Validator.isBlank(endpoint)) {
        // An explicit endpoint takes precedence over the region's default endpoint;
        // warn only when the user supplied both.
        if (regionProvided) {
            LOGGER
                .warn("Received configuration for both region as well as Amazon Kinesis endpoint. (" + endpoint + ") will be used as endpoint instead of default endpoint for region (" + region + ")");
        }
        kinesisClient.setEndpoint(endpoint, AppenderConstants.DEFAULT_SERVICE_NAME, region);
    } else {
        kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));
    }

    // Confirm the stream exists and is in a usable state (ACTIVE or UPDATING).
    DescribeStreamResult describeResult = null;
    try {
        describeResult = kinesisClient.describeStream(streamName);
        String streamStatus = describeResult.getStreamDescription().getStreamStatus();
        if (!StreamStatus.ACTIVE.name().equals(streamStatus) && !StreamStatus.UPDATING.name().equals(streamStatus)) {
            initializationFailed = true;
            error("Stream " + streamName + " is not ready (in active/updating status) for appender: " + name);
        }
    } catch (ResourceNotFoundException rnfe) {
        initializationFailed = true;
        error("Stream " + streamName + " doesn't exist for appender: " + name, rnfe);
    }

    asyncCallHander = new AsyncPutCallStatsReporter(name);
}