/**
 * Starts the Kinesis consumer.
 *
 * <p>Logs the JVM's maximum memory, creates the Kinesis, DynamoDB and CloudWatch clients from
 * {@code kinesisCfg}, assembles a {@link Worker} around them and then calls its {@code run()}
 * method directly.
 */
public void start() {
    final int bytesPerMb = 1024 * 1024;
    LOG.info("Max memory: {} mb", Runtime.getRuntime().maxMemory() / bytesPerMb);
    LOG.info("Starting up Kinesis Consumer... (may take a few seconds)");

    // One SDK client per AWS service, each with its own credentials provider and client configuration.
    AmazonKinesisClient kinesis = new AmazonKinesisClient(
            kinesisCfg.getKinesisCredentialsProvider(),
            kinesisCfg.getKinesisClientConfiguration());
    AmazonDynamoDBClient dynamoDb = new AmazonDynamoDBClient(
            kinesisCfg.getDynamoDBCredentialsProvider(),
            kinesisCfg.getDynamoDBClientConfiguration());
    AmazonCloudWatch cloudWatch = new AmazonCloudWatchClient(
            kinesisCfg.getCloudWatchCredentialsProvider(),
            kinesisCfg.getCloudWatchClientConfiguration());

    new Worker.Builder()
            .recordProcessorFactory(
                    () -> new RecordProcessor(unitOfWorkListener, exceptionStrategy, metricsCallback, dry))
            .config(kinesisCfg)
            .kinesisClient(kinesis)
            .dynamoDBClient(dynamoDb)
            .cloudWatchClient(cloudWatch)
            .build()
            .run();
}
/** * Creates an Amazon Kinesis Client. * @param configProps configuration properties containing the access key, secret key, and region * @return a new Amazon Kinesis Client */ public static AmazonKinesisClient createKinesisClient(Properties configProps) { // set a Flink-specific user agent ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig(); awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector"); // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider AmazonKinesisClient client = new AmazonKinesisClient( AWSUtil.getCredentialsProvider(configProps), awsClientConfig); client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)))); if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT)); } return client; }
/**
 * Puts a single fixed test record onto the stream named by {@code TestConstants.stream}
 * in region {@code eu-west-1} and logs the result.
 */
private void sendDataToKinesis() {
    AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
    AmazonKinesisClient kinesis = new AmazonKinesisClient(provider);
    kinesis.setRegion(Region.getRegion(Regions.fromName("eu-west-1")));

    // The one and only record of the batch.
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap("This is just a test".getBytes()));
    entry.setPartitionKey("partitionKey-1");

    List<PutRecordsRequestEntry> entries = new ArrayList<>();
    entries.add(entry);

    PutRecordsRequest request = new PutRecordsRequest();
    request.setStreamName(TestConstants.stream);
    request.setRecords(entries);

    PutRecordsResult result = kinesis.putRecords(request);
    logger.info("Put Result" + result);
}
/**
 * Requests deletion of the stream named by {@code TestConstants.stream} in region
 * {@code eu-west-1} and logs that the request was made.
 */
private void deleteKinesisStream() {
    AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
    AmazonKinesisClient amazonKinesisClient = new AmazonKinesisClient(credentialsProvider);
    amazonKinesisClient.setRegion(Region.getRegion(Regions.fromName("eu-west-1")));

    DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest();
    deleteStreamRequest.setStreamName(TestConstants.stream);
    amazonKinesisClient.deleteStream(deleteStreamRequest);

    // NOTE(review): this is logged as soon as deleteStream() returns; the stream state is not
    // checked afterwards.
    logger.info("Stream " + TestConstants.stream + " deleted");
}
/**
 * Entry point: validates the arguments, checks the target stream and then puts event records
 * onto it forever, pausing a random 650-1149 ms between puts.
 *
 * @param args {@code args[0]} is the stream name, {@code args[1]} the region name
 * @throws Exception if anything fails; nothing is caught here
 */
public static void main(String[] args) throws Exception {
    verify(args);

    final String stream = args[0];
    final Region region = RegionUtils.getRegion(args[1]);
    final AWSCredentials credentials = getCreds();

    AmazonKinesis client = new AmazonKinesisClient(credentials, CLIENT_CONF);
    client.setRegion(region);
    checkStream(client.describeStream(stream));

    System.out.println("Let's start putting records!");

    Random jitter = new Random(System.currentTimeMillis());
    while (true) {
        putEventRecord(client, stream);
        int pauseMillis = 650 + jitter.nextInt(500);
        Thread.sleep(pauseMillis);
    }
}
/**
 * Collect data for Kinesis.
 *
 * <p>Lists the stream names visible to the account in the given region and records each one in
 * {@code stats} as an {@code AwsResourceType.Kinesis} resource. An {@link AmazonServiceException}
 * is logged (with its stack trace) and otherwise ignored, so a failing region does not abort the
 * caller.
 *
 * @param stats
 *            current statistics object.
 * @param account
 *            currently used credentials object.
 * @param region
 *            currently used aws region.
 */
public static void scanKinesis(AwsStats stats, AwsAccount account, Regions region) {
    LOG.debug("Scan for Kinesis in region " + region.getName() + " in account " + account.getAccountId());
    try {
        AmazonKinesis kinesis = new AmazonKinesisClient(account.getCredentials());
        kinesis.setRegion(Region.getRegion(region));

        // NOTE(review): only the result of a single listStreams() call is read; if the service
        // reports more streams than fit in one response they are not counted - confirm.
        List<String> streamNames = kinesis.listStreams().getStreamNames();
        for (String streamName : streamNames) {
            stats.add(new AwsResource(streamName, account.getAccountId(), AwsResourceType.Kinesis, region));
        }
        LOG.info(streamNames.size() + " Kinesis streams in region " + region.getName() + " in account "
                + account.getAccountId());
    } catch (AmazonServiceException ase) {
        // Pass the exception itself so the stack trace and cause are not lost.
        LOG.error("Exception of Kinesis: " + ase.getMessage(), ase);
    }
}
/**
 * Gets a list of all Amazon Kinesis streams, fetching them ten at a time until the service
 * reports that no more streams remain.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read privileges
 * @return list of Amazon Kinesis streams
 */
public static List<String> listAllStreams(AmazonKinesisClient kinesisClient) {
    ListStreamsRequest pageRequest = new ListStreamsRequest();
    pageRequest.setLimit(10);

    ListStreamsResult page = kinesisClient.listStreams(pageRequest);
    // Names from later pages are appended to the list that came back with the first page.
    List<String> allNames = page.getStreamNames();

    while (page.isHasMoreStreams()) {
        int collected = allNames.size();
        if (collected > 0) {
            // Continue after the last name seen so far.
            pageRequest.setExclusiveStartStreamName(allNames.get(collected - 1));
        }
        page = kinesisClient.listStreams(pageRequest);
        allNames.addAll(page.getStreamNames());
    }
    return allNames;
}
@Inject KinesisClientManager(KinesisConnectorConfig kinesisConnectorConfig) { log.info("Creating new client for Consumer"); if (nonEmpty(kinesisConnectorConfig.getAccessKey()) && nonEmpty(kinesisConnectorConfig.getSecretKey())) { this.kinesisAwsCredentials = new KinesisAwsCredentials(kinesisConnectorConfig.getAccessKey(), kinesisConnectorConfig.getSecretKey()); this.client = new AmazonKinesisClient(this.kinesisAwsCredentials); this.amazonS3Client = new AmazonS3Client(this.kinesisAwsCredentials); this.dynamoDBClient = new AmazonDynamoDBClient(this.kinesisAwsCredentials); } else { this.kinesisAwsCredentials = null; DefaultAWSCredentialsProviderChain defaultChain = new DefaultAWSCredentialsProviderChain(); this.client = new AmazonKinesisClient(defaultChain); this.amazonS3Client = new AmazonS3Client(defaultChain); this.dynamoDBClient = new AmazonDynamoDBClient(defaultChain); } this.client.setEndpoint("kinesis." + kinesisConnectorConfig.getAwsRegion() + ".amazonaws.com"); this.dynamoDBClient.setEndpoint("dynamodb." + kinesisConnectorConfig.getAwsRegion() + ".amazonaws.com"); }
public static void main(String[] args) { AmazonKinesisClient kinesisClient = Helper.setupKinesisClient(); setupHosebirdClient(); hosebirdClient.connect(); while (!hosebirdClient.isDone()) { try { String tweetText = msgQueue.take(); // Add Data to a Stream PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(Helper.properties().getProperty("kinesisStreamName")); putRecordRequest.setData(ByteBuffer.wrap(tweetText.getBytes())); putRecordRequest.setPartitionKey(String.format("partitionKey-%s", "tweets")); PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest); System.out.println(String.format("Seq No: %s - %s", putRecordResult.getSequenceNumber(), tweetText)); } catch (InterruptedException e) { e.printStackTrace(); } } }
/**
 * Gets a list of all Amazon Kinesis streams. Streams are requested in pages of ten; each further
 * page starts after the last stream name collected so far.
 *
 * @param kinesisClient The {@link com.amazonaws.services.kinesis.AmazonKinesisClient} with Amazon Kinesis read privileges
 * @return list of Amazon Kinesis streams
 */
public static List<String> listAllStreams(AmazonKinesisClient kinesisClient) {
    final ListStreamsRequest request = new ListStreamsRequest().withLimit(10);

    ListStreamsResult current = kinesisClient.listStreams(request);
    // The first page's own list is reused as the accumulator for all following pages.
    final List<String> names = current.getStreamNames();

    for (;;) {
        if (!current.isHasMoreStreams()) {
            return names;
        }
        if (!names.isEmpty()) {
            String lastSeen = names.get(names.size() - 1);
            request.setExclusiveStartStreamName(lastSeen);
        }
        current = kinesisClient.listStreams(request);
        names.addAll(current.getStreamNames());
    }
}
public S3BulkEventStore(Metastore metastore, AWSConfig config, FieldDependencyBuilder.FieldDependency fieldDependency) { this.metastore = metastore; this.config = config; this.s3Client = new AmazonS3Client(config.getCredentials()); s3Client.setRegion(config.getAWSRegion()); if (config.getS3Endpoint() != null) { s3Client.setEndpoint(config.getS3Endpoint()); } kinesis = new AmazonKinesisClient(config.getCredentials()); kinesis.setRegion(config.getAWSRegion()); if (config.getKinesisEndpoint() != null) { kinesis.setEndpoint(config.getKinesisEndpoint()); } cloudWatchClient = new AmazonCloudWatchAsyncClient(config.getCredentials()); cloudWatchClient.setRegion(config.getAWSRegion()); this.conditionalMagicFieldsSize = fieldDependency.dependentFields.size(); }
/**
 * Creates the event store: a Kinesis client configured from {@code config} (credentials, region
 * and optional endpoint override) plus a {@code ClickHouseEventStore} used as the bulk client.
 *
 * @param config           AWS configuration
 * @param projectConfig    project configuration, also passed to the bulk client
 * @param clickHouseConfig ClickHouse configuration for the bulk client
 */
@Inject
public AWSKinesisClickhouseEventStore(AWSConfig config, ProjectConfig projectConfig, ClickHouseConfig clickHouseConfig) {
    kinesis = new AmazonKinesisClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }

    this.config = config;
    this.projectConfig = projectConfig;
    this.bulkClient = new ClickHouseEventStore(projectConfig, clickHouseConfig);
    // The KinesisProducerConfiguration that used to be built here was never handed to a
    // producer (the producer creation was commented out), so it has been dropped.
}
/**
 * Generates sensor readings and puts each one onto a Kinesis stream.
 *
 * <p>At least one reading is always sent, because the count is checked after each send. When
 * Kinesis rejects a put with {@link ProvisionedThroughputExceededException}, the same reading is
 * retried after sleeping {@code BACKOFF}; it is only printed and counted once the put has
 * succeeded.
 *
 * @param events     number of readings to send
 * @param format     output format passed to {@code nextSensorReading}
 * @param streamName target Kinesis stream
 * @param region     AWS region name of the stream
 * @throws Exception if reading generation, the put, or the back-off sleep fails
 */
private void run(final int events, final OutputFormat format, final String streamName, final String region)
        throws Exception {
    AmazonKinesis kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
    kinesisClient.setRegion(Region.getRegion(Regions.fromName(region)));

    int count = 0;
    do {
        SensorReading r = nextSensorReading(format);
        // Key and payload are fixed once per reading so that a retry re-sends the same record.
        String partitionKey = "" + rand.nextLong();
        byte[] payload = r.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);

        boolean delivered = false;
        while (!delivered) {
            try {
                PutRecordRequest req = new PutRecordRequest()
                        .withPartitionKey(partitionKey)
                        .withStreamName(streamName)
                        .withData(ByteBuffer.wrap(payload));
                kinesisClient.putRecord(req);
                delivered = true;
            } catch (ProvisionedThroughputExceededException e) {
                // Throttled: wait, then try this record again instead of dropping it.
                Thread.sleep(BACKOFF);
            }
        }

        System.out.println(r);
        count++;
    } while (count < events);
}
/**
 * Creates a consumer. Every argument is required; each one is passed through
 * {@code Preconditions.checkNotNull} in declaration order before being stored.
 *
 * @param topic            the Kinesis topic
 * @param config           the Kinesis Client Library configuration
 * @param client           the Kinesis client
 * @param dynamoClient     the DynamoDB client
 * @param cloudwatchClient the CloudWatch client
 * @param mapper           the object mapper
 */
public KinesisConsumer(
        final String topic,
        final KinesisClientLibConfiguration config,
        final AmazonKinesisClient client,
        final AmazonDynamoDBClient dynamoClient,
        final AmazonCloudWatchClient cloudwatchClient,
        final ObjectMapper mapper) {
    this.topic =
            Preconditions.checkNotNull(topic, "A valid kinesis topic is required");
    this.config =
            Preconditions.checkNotNull(config, "KinesisClientLibConfiguration is required");
    this.client =
            Preconditions.checkNotNull(client, "AmazonKinesisClient is required");
    this.dynamoClient =
            Preconditions.checkNotNull(dynamoClient, "AmazonDynamoDBClient is required");
    this.cloudwatchClient =
            Preconditions.checkNotNull(cloudwatchClient, "AmazonCloudWatchClient is required");
    this.mapper =
            Preconditions.checkNotNull(mapper, "ObjectMapper is required");
}
/**
 * Creates the Kinesis service client, trying three sources in order and stopping at the first
 * that yields a client:
 * <ol>
 *   <li>the factory method named in {@code config.clientFactoryMethod};</li>
 *   <li>the SDK builder's {@code defaultClient} method, attempted only when no explicit
 *       endpoint is configured;</li>
 *   <li>the {@code AmazonKinesisClient} constructor, followed by endpoint/region configuration.</li>
 * </ol>
 */
@Override
protected void createAWSClient() {
    client = tryClientFactory(config.clientFactoryMethod, AmazonKinesis.class, true);
    if (client != null) {
        return;
    }

    if (config.clientEndpoint == null) {
        client = tryClientFactory(
                "com.amazonaws.services.kinesis.AmazonKinesisClientBuilder.defaultClient",
                AmazonKinesis.class,
                false);
        if (client != null) {
            return;
        }
    }

    LogLog.debug(getClass().getSimpleName() + ": creating service client via constructor");
    client = tryConfigureEndpointOrRegion(new AmazonKinesisClient(), config.clientEndpoint);
}
/**
 * Builds a Kinesis client using the credentials provider and region from the connector
 * configuration.
 *
 * @param config source connector configuration
 * @return the newly built client
 */
@Override
public AmazonKinesis create(KinesisSourceConnectorConfig config) {
    return AmazonKinesisClient.builder()
            .withCredentials(config.awsCredentialsProvider())
            .withRegion(config.kinesisRegion)
            .build();
}
public void split(final String streamName, final String awsAccessKey, final String awsSecretKey, long secsToWait) throws InterruptedException { AWSCredentialsProvider creds = createAwsCredentialsProvider(awsAccessKey, awsSecretKey); AmazonKinesisClient client = new AmazonKinesisClient(creds); // Describes the stream to get the information about each shard. DescribeStreamResult result = client.describeStream(streamName); List<Shard> shards = result.getStreamDescription().getShards(); log.log(Level.INFO, "Splitting the Stream: [{0}], there are [{1}] shards to split.", new Object[]{streamName, shards.size()}); for (final Shard shard : shards) { // Gets the new shard start key. BigInteger startKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey()); BigInteger endKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey()); String newStartKey = startKey.add(endKey).divide(DENOMINATOR).toString(); log.log(Level.INFO, "Processing the Shard:[{0}], StartKey:[{1}] EndKey:[{2}] - NewStartKey:[{3}]", new String[]{shard.getShardId(), shard.getHashKeyRange().getStartingHashKey(), shard.getHashKeyRange().getEndingHashKey(), newStartKey}); // Split the shard. client.splitShard(new SplitShardRequest() .withStreamName(streamName) .withShardToSplit(shard.getShardId()) .withNewStartingHashKey(newStartKey)); // Give some time to kinesis to process. TimeUnit.SECONDS.sleep(secsToWait); } log.info("Done!"); }
/** * Creates a new StreamSource. * * @param config * Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider} * @param inputFile * File containing record data to emit on each line * @param loopOverStreamSource * Loop over the stream source to continually put records */ public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) { this.config = config; this.inputFile = inputFile; this.loopOverInputFile = loopOverStreamSource; this.objectMapper = new ObjectMapper(); kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER); kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME)); if (config.KINESIS_ENDPOINT != null) { kinesisClient.setEndpoint(config.KINESIS_ENDPOINT); } KinesisUtils.createInputStream(config); }
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM with
 * config.KINESIS_INPUT_STREAM_SHARD_COUNT shards, delegating to
 * {@code createAndWaitForStreamToBecomeAvailable}.
 *
 * @param config
 *        The configuration with the specified input stream name and {@link AWSCredentialsProvider}
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));

    // An explicit endpoint, when configured, is applied after the region.
    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }

    createAndWaitForStreamToBecomeAvailable(
            client,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM with
 * config.KINESIS_OUTPUT_STREAM_SHARD_COUNT shards, delegating to
 * {@code createAndWaitForStreamToBecomeAvailable}.
 *
 * @param config
 *        The configuration with the specified output stream name and {@link AWSCredentialsProvider}
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient outputClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    outputClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));

    if (null != config.KINESIS_ENDPOINT) {
        // Endpoint override from the configuration.
        outputClient.setEndpoint(config.KINESIS_ENDPOINT);
    }

    createAndWaitForStreamToBecomeAvailable(
            outputClient,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
/**
 * Helper method to determine if an Amazon Kinesis stream exists. The stream is considered
 * missing only when describing it raises {@link ResourceNotFoundException}; any other exception
 * propagates to the caller.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName
 *        The Amazon Kinesis stream to check for
 * @return true if the Amazon Kinesis stream exists, otherwise return false
 */
private static boolean streamExists(AmazonKinesisClient kinesisClient, String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(streamName);
    boolean exists;
    try {
        kinesisClient.describeStream(request);
        exists = true;
    } catch (ResourceNotFoundException notFound) {
        exists = false;
    }
    return exists;
}
/**
 * Return the state of a Amazon Kinesis stream.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName
 *        The Amazon Kinesis stream to get the state of
 * @return String representation of the Stream state, or {@code null} when describing the stream
 *         raises an {@link AmazonServiceException}
 */
private static String streamState(AmazonKinesisClient kinesisClient, String streamName) {
    DescribeStreamRequest request = new DescribeStreamRequest().withStreamName(streamName);
    String status;
    try {
        status = kinesisClient.describeStream(request).getStreamDescription().getStreamStatus();
    } catch (AmazonServiceException serviceFailure) {
        status = null;
    }
    return status;
}
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM, using a client built from
 * the configuration's credentials provider, region and optional endpoint.
 *
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));

    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }

    deleteStream(client, config.KINESIS_INPUT_STREAM);
}
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM, using a client built from
 * the configuration's credentials provider, region and optional endpoint.
 *
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient outputClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    outputClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));

    if (null != config.KINESIS_ENDPOINT) {
        // Endpoint override from the configuration.
        outputClient.setEndpoint(config.KINESIS_ENDPOINT);
    }

    deleteStream(outputClient, config.KINESIS_OUTPUT_STREAM);
}
/**
 * Deletes an Amazon Kinesis stream if it exists; otherwise logs a warning and does nothing.
 *
 * @param kinesisClient
 *        The {@link AmazonKinesisClient} with Amazon Kinesis read and write privileges
 * @param streamName
 *        The Amazon Kinesis stream to delete
 */
public static void deleteStream(AmazonKinesisClient kinesisClient, String streamName) {
    if (!streamExists(kinesisClient, streamName)) {
        LOG.warn("Stream " + streamName + " does not exist");
        return;
    }

    DeleteStreamRequest request = new DeleteStreamRequest().withStreamName(streamName);
    kinesisClient.deleteStream(request);
    LOG.info("Deleting stream " + streamName);
}
/**
 * Starts the sink. A Kinesis client is created from the configured access key and secret only
 * when none has been supplied already; the configured endpoint is then applied to whichever
 * client is in use before the superclass is started.
 */
@Override
public synchronized void start() {
    logger.info("Starting KinesisSink: " + this.getName());

    if (kinesisClient == null) {
        BasicAWSCredentials staticCredentials = new BasicAWSCredentials(accessKey, accessSecretKey);
        kinesisClient = new AmazonKinesisClient(staticCredentials);
    }
    kinesisClient.setEndpoint(kinesisEndpoint);

    super.start();
}
/**
 * Happy path: one event is taken from the channel and put to Kinesis, and the transaction is
 * begun, committed and closed exactly once without a rollback.
 */
@Test
public void simpleProcessTest() throws EventDeliveryException {
    // --- collaborators ---
    Channel channel = mock(Channel.class);
    Transaction transaction = mock(Transaction.class);
    AmazonKinesisClient kinesis = mock(AmazonKinesisClient.class);
    PutRecordResult putResult = mock(PutRecordResult.class);

    // --- a single event carrying the bytes of "body" ---
    byte[] body = new byte[]{'b', 'o', 'd', 'y'};
    Event event = new SimpleEvent();
    event.setBody(body);

    // --- stubbing ---
    when(channel.getTransaction()).thenReturn(transaction);
    when(channel.take()).thenReturn(event);
    when(kinesis.putRecord(any(PutRecordRequest.class))).thenReturn(putResult);

    // --- sink under test ---
    KinesisSink sink = new KinesisSink(kinesis);
    sink.setChannel(channel);

    Context sinkContext = new Context();
    sinkContext.put(KinesisSinkConfigurationConstant.ACCESS_KEY, "default");
    sinkContext.put(KinesisSinkConfigurationConstant.ACCESS_SECRET_KEY, "default");
    sinkContext.put(KinesisSinkConfigurationConstant.STREAM_NAME, "default");
    sink.configure(sinkContext);

    sink.start();
    sink.process();

    // --- channel interaction ---
    verify(channel, times(1)).getTransaction();
    verify(channel, times(1)).take();

    // --- transaction lifecycle ---
    verify(transaction, times(1)).begin();
    verify(transaction, times(1)).close();
    verify(transaction, times(1)).commit();
    verify(transaction, times(0)).rollback();
}
/**
 * Entry point: opens the JDBC connection described by the {@code kinesisapp.*} system
 * properties, then puts 100 generated JSON records onto the {@code test} stream in us-east-1
 * and prints the elapsed time.
 *
 * <p>A failed put does not stop the run, but it is now reported on standard error and counted
 * instead of being silently discarded.
 *
 * @param args unused
 * @throws SQLException if the database connection cannot be opened or configured
 */
public static void main(String[] args) throws SQLException {
    java.security.Security.setProperty("networkaddress.cache.ttl" , "60");

    query = System.getProperty("kinesisapp.query");
    conn = DriverManager.getConnection(
            System.getProperty("kinesisapp.jdbcurl"),
            System.getProperty("kinesisapp.dbuser"),
            System.getProperty("kinesisapp.dbpassword"));
    conn.setAutoCommit(true);

    AmazonKinesisClient client = new AmazonKinesisClient();
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com");

    String stream = "test";
    int iteration = 100;
    // Common prefix of every record; the latitude/longitude fractions are appended per record.
    String data = "{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35.";
    Random rand = new Random();

    try {
        long start = System.currentTimeMillis();
        String myKey = Long.toString(Thread.currentThread().getId());
        int failures = 0;

        for (int i = 0; i < iteration; i++) {
            try {
                String json = data + Integer.toString(rand.nextInt(19) + 52)
                        + ",\"longitude\":139." + Integer.toString(rand.nextInt(39) + 51) + "}";

                PutRecordRequest putRecordRequest = new PutRecordRequest();
                putRecordRequest.setStreamName(stream);
                putRecordRequest.setData(ByteBuffer.wrap(json.getBytes()));
                putRecordRequest.setPartitionKey(myKey);
                client.putRecord(putRecordRequest);
            } catch (Exception iex) {
                // Best effort: keep going, but make the failure visible.
                failures++;
                System.err.println("putRecord failed on iteration " + i + ": " + iex);
            }
        }

        if (failures > 0) {
            System.err.println(failures + " of " + iteration + " puts failed");
        }
        System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : "
                + (System.currentTimeMillis() - start));
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Entry point: puts 100 generated JSON records onto the {@code test} stream in us-east-1 and
 * prints the elapsed time.
 *
 * <p>A failed put does not stop the run, but it is now reported on standard error and counted
 * instead of being silently discarded.
 *
 * @param args unused
 */
public static void main(String[] args) {
    java.security.Security.setProperty("networkaddress.cache.ttl" , "60");

    AmazonKinesisClient client = new AmazonKinesisClient();
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com");

    String stream = "test";
    int iteration = 100;
    // Common prefix of every record; the latitude/longitude fractions are appended per record.
    String data = "{\"user\":\"10125\",\"line\":\"aaa\",\"station\":\"bbb\",\"latitude\":35.";
    Random rand = new Random();

    try {
        long start = System.currentTimeMillis();
        String myKey = Long.toString(Thread.currentThread().getId());
        int failures = 0;

        for (int i = 0; i < iteration; i++) {
            try {
                String json = data + Integer.toString(rand.nextInt(19) + 52)
                        + ",\"longitude\":139." + Integer.toString(rand.nextInt(39) + 51) + "}";

                PutRecordRequest putRecordRequest = new PutRecordRequest();
                putRecordRequest.setStreamName(stream);
                putRecordRequest.setData(ByteBuffer.wrap(json.getBytes()));
                putRecordRequest.setPartitionKey(myKey);
                client.putRecord(putRecordRequest);
            } catch (Exception iex) {
                // Best effort: keep going, but make the failure visible.
                failures++;
                System.err.println("putRecord failed on iteration " + i + ": " + iex);
            }
        }

        if (failures > 0) {
            System.err.println(failures + " of " + iteration + " puts failed");
        }
        System.out.println("Elapsed time(ms) for task " + Thread.currentThread().getId() + " : "
                + (System.currentTimeMillis() - start));
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
/**
 * Helper method to determine if an Amazon Kinesis stream exists. Only a
 * {@link ResourceNotFoundException} from the describe call is treated as "does not exist";
 * other exceptions are not caught here.
 *
 * @param kinesisClient The {@link com.amazonaws.services.kinesis.AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName    The Amazon Kinesis stream to check for
 * @return true if the Amazon Kinesis stream exists, otherwise return false
 */
private static boolean streamExists(AmazonKinesisClient kinesisClient, String streamName) {
    final DescribeStreamRequest lookup = new DescribeStreamRequest();
    lookup.setStreamName(streamName);

    boolean found = true;
    try {
        kinesisClient.describeStream(lookup);
    } catch (ResourceNotFoundException missing) {
        found = false;
    }
    return found;
}
/**
 * Return the state of a Amazon Kinesis stream.
 *
 * @param kinesisClient The {@link com.amazonaws.services.kinesis.AmazonKinesisClient} with Amazon Kinesis read privileges
 * @param streamName    The Amazon Kinesis stream to get the state of
 * @return String representation of the Stream state, or {@code null} if the describe call fails
 *         with an {@link AmazonServiceException}
 */
private static String streamState(AmazonKinesisClient kinesisClient, String streamName) {
    final DescribeStreamRequest lookup = new DescribeStreamRequest();
    lookup.setStreamName(streamName);

    String state = null;
    try {
        state = kinesisClient.describeStream(lookup).getStreamDescription().getStreamStatus();
    } catch (AmazonServiceException serviceError) {
        // Leave state as null to signal that the status could not be read.
        state = null;
    }
    return state;
}
/**
 * Deletes an Amazon Kinesis stream if it exists. A stream that does not exist only produces a
 * warning in the log.
 *
 * @param kinesisClient The {@link com.amazonaws.services.kinesis.AmazonKinesisClient} with Amazon Kinesis read and write privileges
 * @param streamName    The Amazon Kinesis stream to delete
 */
public static void deleteStream(AmazonKinesisClient kinesisClient, String streamName) {
    final boolean present = streamExists(kinesisClient, streamName);
    if (present) {
        final DeleteStreamRequest removal = new DeleteStreamRequest();
        removal.setStreamName(streamName);
        kinesisClient.deleteStream(removal);
        LOG.info("Deleting stream " + streamName);
        return;
    }
    LOG.warn("Stream " + streamName + " does not exist");
}
/**
 * Creates a data store that simply records the supplied collaborators and names; no validation
 * or remote call happens here.
 *
 * @param dynamoClient   DynamoDB client
 * @param kinesisClient  Kinesis client
 * @param aggregatorType aggregator type
 * @param streamName     stream name
 * @param tableName      table name
 * @param labelAttribute label attribute name
 * @param dateAttribute  date attribute name
 */
public DynamoDataStore(
        AmazonDynamoDB dynamoClient,
        AmazonKinesisClient kinesisClient,
        AggregatorType aggregatorType,
        String streamName,
        String tableName,
        String labelAttribute,
        String dateAttribute) {
    // AWS clients
    this.dynamoClient = dynamoClient;
    this.kinesisClient = kinesisClient;

    // Aggregation settings
    this.aggregatorType = aggregatorType;

    // Resource and attribute names
    this.streamName = streamName;
    this.tableName = tableName;
    this.labelAttribute = labelAttribute;
    this.dateAttribute = dateAttribute;
}