private static List<Shard> getShards(AmazonKinesis client, String stream) { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(stream); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId); DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest); shards.addAll(describeStreamResult.getStreamDescription().getShards()); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while (exclusiveStartShardId != null); return shards; }
@Test public void testProvisionProducerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards( Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionConsumerSuccessfulWithExistingStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeStreamResult); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock) .describeStream(any(DescribeStreamRequest.class)); assertThat(destination.getName()).isEqualTo(name); }
@BeforeEach public void before() { this.sourceTaskContext = mock(SourceTaskContext.class); this.offsetStorageReader = mock(OffsetStorageReader.class); when(this.sourceTaskContext.offsetStorageReader()).thenReturn(this.offsetStorageReader); this.task = new KinesisSourceTask(); this.task.initialize(this.sourceTaskContext); this.kinesisClient = mock(AmazonKinesis.class); this.task.time = mock(Time.class); this.task.kinesisClientFactory = mock(KinesisClientFactory.class); when(this.task.kinesisClientFactory.create(any())).thenReturn(this.kinesisClient); this.settings = TestData.settings(); this.config = new KinesisSourceConnectorConfig(this.settings); }
KinesisStreamFactory(Builder builder) { this.stream = builder.stream; this.app = builder.app; this.regionName = builder.awsRegion; this.endpoint = builder.awsEndpoint != null ? builder.awsEndpoint : Region.getRegion(Regions.fromName(regionName)) .getServiceEndpoint(AmazonKinesis.ENDPOINT_PREFIX); this.checkpointInterval = builder.checkpointInterval; this.initialPositionInStream = builder.initialPositionInStream; this.storageLevel = builder.storageLevel; this.awsAccessKeyId = builder.awsAccessKeyId; this.awsSecretKey = builder.awsSecretKey; }
@Override public AmazonKinesis getKinesisClient() { return new AmazonKinesisMock(transform(shardedData, new Function<List<TestData>, List<Record>>() { @Override public List<Record> apply(@Nullable List<TestData> testDatas) { return transform(testDatas, new Function<TestData, Record>() { @Override public Record apply(@Nullable TestData testData) { return testData.convertToRecord(); } }); } }), numberOfRecordsPerGet); }
/** * Creates an AmazonKinesis client. * @param configProps configuration properties containing the access key, secret key, and region * @return a new AmazonKinesis client */ public static AmazonKinesis createKinesisClient(Properties configProps) { // set a Flink-specific user agent ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig() .withUserAgentPrefix(String.format(USER_AGENT_FORMAT, EnvironmentInformation.getVersion(), EnvironmentInformation.getRevisionInformation().commitId)); // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard() .withCredentials(AWSUtil.getCredentialsProvider(configProps)) .withClientConfiguration(awsClientConfig) .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))); if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { // Set signingRegion as null, to facilitate mocking Kinesis for local tests builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)); } return builder.build(); }
public static void main(String[] args) throws Exception { verify(args); String stream = args[0]; Region region = RegionUtils.getRegion(args[1]); AWSCredentials credentials = getCreds(); AmazonKinesis client = new AmazonKinesisClient(credentials, CLIENT_CONF); client.setRegion(region); checkStream(client.describeStream(stream)); System.out.println("Let's start putting records!"); Random rnd = new Random(System.currentTimeMillis()); for (;;) { putEventRecord(client, stream); Thread.sleep(rnd.nextInt(500) + 650); } }
/** * Collect data for Kinesis. * * @param stats * current statistics object. * @param account * currently used credentials object. * @param region * currently used aws region. */ public static void scanKinesis(AwsStats stats, AwsAccount account, Regions region) { LOG.debug("Scan for Kinesis in region " + region.getName() + " in account " + account.getAccountId()); try { AmazonKinesis kinesis = new AmazonKinesisClient(account.getCredentials()); kinesis.setRegion(Region.getRegion(region)); List<String> list = kinesis.listStreams().getStreamNames(); int totalItems = list.size(); for (String streamName : list) { stats.add(new AwsResource(streamName, account.getAccountId(), AwsResourceType.Kinesis, region)); } LOG.info(totalItems + " Kinesis streams in region " + region.getName() + " in account " + account.getAccountId()); } catch (AmazonServiceException ase) { LOG.error("Exception of Kinesis: " + ase.getMessage()); } }
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords( ClientConfiguration awsClientConfig, KinesisConfigBean conf, int maxBatchSize, GetShardIteratorRequest getShardIteratorRequest ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); String shardIterator = getShardIteratorResult.getShardIterator(); GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(maxBatchSize); GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest); return getRecordsResult.getRecords(); }
private void run(final int events, final OutputFormat format, final String streamName, final String region) throws Exception { AmazonKinesis kinesisClient = new AmazonKinesisClient( new DefaultAWSCredentialsProviderChain()); kinesisClient.setRegion(Region.getRegion(Regions.fromName(region))); int count = 0; SensorReading r = null; do { r = nextSensorReading(format); try { PutRecordRequest req = new PutRecordRequest() .withPartitionKey("" + rand.nextLong()) .withStreamName(streamName) .withData(ByteBuffer.wrap(r.toString().getBytes())); kinesisClient.putRecord(req); } catch (ProvisionedThroughputExceededException e) { Thread.sleep(BACKOFF); } System.out.println(r); count++; } while (count < events); }
private static String getShardIterator(AmazonKinesis client, String stream, Shard shard, String start) { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(stream); getShardIteratorRequest.setShardId(shard.getShardId()); if (!Strings.isNullOrEmpty(start)) { getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_TIMESTAMP); getShardIteratorRequest.setTimestamp(new Date(System.currentTimeMillis() - Duration.parse(start).toMillis())); } else { getShardIteratorRequest.setShardIteratorType(ShardIteratorType.LATEST); } GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); return getShardIteratorResult.getShardIterator(); }
private static String getShardIteratorAtSequenceNumber(AmazonKinesis client, String stream, Shard shard, String sequenceNumber) { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(stream); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER); getShardIteratorRequest.setStartingSequenceNumber(sequenceNumber); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); return getShardIteratorResult.getShardIterator(); }
private static String getOldestShardIterator(AmazonKinesis client, String stream, Shard shard) { GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(stream); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); return getShardIteratorResult.getShardIterator(); }
@Override protected void createAWSClient() { client = tryClientFactory(config.clientFactoryMethod, AmazonKinesis.class, true); if ((client == null) && (config.clientEndpoint == null)) { client = tryClientFactory("com.amazonaws.services.kinesis.AmazonKinesisClientBuilder.defaultClient", AmazonKinesis.class, false); } if (client == null) { LogLog.debug(getClass().getSimpleName() + ": creating service client via constructor"); client = tryConfigureEndpointOrRegion(new AmazonKinesisClient(), config.clientEndpoint); } }
/** * Creates a client proxy outside of the writer factory. */ public AmazonKinesis createClient() { return (AmazonKinesis)Proxy.newProxyInstance( getClass().getClassLoader(), new Class<?>[] { AmazonKinesis.class }, MockKinesisClient.this); }
public KinesisStreamProvisioner(AmazonKinesis amazonKinesis, KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) { Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null"); Assert.notNull(kinesisBinderConfigurationProperties, "'kinesisBinderConfigurationProperties' must not be null"); this.amazonKinesis = amazonKinesis; this.configurationProperties = kinesisBinderConfigurationProperties; }
@Test public void testProvisionConsumerExistingStreamUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionProducerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(new KinesisProducerProperties()); String name = "test-stream"; Integer shards = 1; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, shards)) .thenReturn(new CreateStreamResult()); ProducerDestination destination = provisioner.provisionProducerDestination(name, extendedProducerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, shards); assertThat(destination.getName()).isEqualTo(name); }
@Test public void testProvisionProducerUpdateShards() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); ArgumentCaptor<UpdateShardCountRequest> updateShardCaptor = ArgumentCaptor.forClass(UpdateShardCountRequest.class); String name = "test-stream"; String group = "test-group"; int targetShardCount = 2; KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); binderProperties.setMinShardCount(targetShardCount); binderProperties.setAutoAddShards(true); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); DescribeStreamResult describeOriginalStream = describeStreamResultWithShards(Collections.singletonList(new Shard())); DescribeStreamResult describeUpdatedStream = describeStreamResultWithShards(Arrays.asList(new Shard(), new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenReturn(describeOriginalStream) .thenReturn(describeUpdatedStream); provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(1)) .updateShardCount(updateShardCaptor.capture()); assertThat(updateShardCaptor.getValue().getStreamName()).isEqualTo(name); assertThat(updateShardCaptor.getValue().getScalingType()).isEqualTo(ScalingType.UNIFORM_SCALING.name()); assertThat(updateShardCaptor.getValue().getTargetShardCount()).isEqualTo(targetShardCount); }
@Test public void testProvisionConsumerSuccessfulWithNewStream() { AmazonKinesis amazonKinesisMock = mock(AmazonKinesis.class); KinesisBinderConfigurationProperties binderProperties = new KinesisBinderConfigurationProperties(); KinesisStreamProvisioner provisioner = new KinesisStreamProvisioner(amazonKinesisMock, binderProperties); int instanceCount = 1; int concurrency = 1; ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties = new ExtendedConsumerProperties<>(new KinesisConsumerProperties()); extendedConsumerProperties.setInstanceCount(instanceCount); extendedConsumerProperties.setConcurrency(concurrency); String name = "test-stream"; String group = "test-group"; DescribeStreamResult describeStreamResult = describeStreamResultWithShards(Collections.singletonList(new Shard())); when(amazonKinesisMock.describeStream(any(DescribeStreamRequest.class))) .thenThrow(new ResourceNotFoundException("I got nothing")) .thenReturn(describeStreamResult); when(amazonKinesisMock.createStream(name, instanceCount * concurrency)) .thenReturn(new CreateStreamResult()); ConsumerDestination destination = provisioner.provisionConsumerDestination(name, group, extendedConsumerProperties); verify(amazonKinesisMock, times(2)) .describeStream(any(DescribeStreamRequest.class)); verify(amazonKinesisMock) .createStream(name, instanceCount * concurrency); assertThat(destination.getName()).isEqualTo(name); }
@Override public AmazonKinesis create(KinesisSourceConnectorConfig config) { AmazonKinesisClientBuilder builder = AmazonKinesisClient.builder() .withCredentials(config.awsCredentialsProvider()) .withRegion(config.kinesisRegion); return builder.build(); }
@BeforeEach public void setup() { this.kinesisClient = mock(AmazonKinesis.class, withSettings().verboseLogging()); this.connector = new KinesisSourceConnector(); this.connector.kinesisClientFactory = mock(KinesisClientFactory.class); when(this.connector.kinesisClientFactory.create(any())).thenReturn(this.kinesisClient); }
@Override public AmazonKinesis getKinesisClient() { AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard().withCredentials(getCredentialsProvider()); if (serviceEndpoint == null) { clientBuilder.withRegion(region); } else { clientBuilder.withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region.getName())); } return clientBuilder.build(); }
private static boolean streamExists(AmazonKinesis client, String streamName) { try { DescribeStreamResult describeStreamResult = client.describeStream(streamName); return (describeStreamResult != null && describeStreamResult.getSdkHttpMetadata().getHttpStatusCode() == 200); } catch (Exception e) { LOG.warn("Error checking whether stream {} exists.", streamName, e); } return false; }
public static void putEventRecord(AmazonKinesis client, String stream) throws Exception { String eventRecord = getEventRecord(); PutRecordRequest put = new PutRecordRequest(); put.setStreamName(stream); put.setPartitionKey("test:650"); put.setData(ByteBuffer.wrap(eventRecord.getBytes("UTF-8"))); try { PutRecordResult result = client.putRecord(put); System.out.println(result); } catch (AmazonClientException ex) { System.out.println("PutRecord failed."); } }
public static long getShardCount( ClientConfiguration awsClientConfig, KinesisConfigBean conf, String streamName ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); try { long numShards = 0; String lastShardId = null; StreamDescription description; do { if (lastShardId == null) { description = kinesisClient.describeStream(streamName).getStreamDescription(); } else { description = kinesisClient.describeStream(streamName, lastShardId).getStreamDescription(); } for (Shard shard : description.getShards()) { if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) { // Then this shard is open, so we should count it. Shards with an ending sequence number // are closed and cannot be written to, so we skip counting them. ++numShards; } } int pageSize = description.getShards().size(); lastShardId = description.getShards().get(pageSize - 1).getShardId(); } while (description.getHasMoreShards()); LOG.debug("Connected successfully to stream: '{}' with '{}' shards.", streamName, numShards); return numShards; } finally { kinesisClient.shutdown(); } }
private static AmazonKinesis getKinesisClient(ClientConfiguration awsClientConfig, KinesisConfigBean conf) throws StageException { AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder .standard() .withClientConfiguration(checkNotNull(awsClientConfig)) .withCredentials(AWSUtil.getCredentialsProvider(conf.awsConfig)); if (AWSRegions.OTHER == conf.region) { builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(conf.endpoint, null)); } else { builder.withRegion(conf.region.getLabel()); } return builder.build(); }
/** * Get the last shard Id in the given stream * In preview mode, kinesis source uses the last Shard Id to get records from kinesis * @param awsClientConfig generic AWS client configuration * @param conf * @param streamName */ public static String getLastShardId( ClientConfiguration awsClientConfig, KinesisConfigBean conf, String streamName ) throws StageException { AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf); String lastShardId = null; try { StreamDescription description; do { if (lastShardId == null) { description = kinesisClient.describeStream(streamName).getStreamDescription(); } else { description = kinesisClient.describeStream(streamName, lastShardId).getStreamDescription(); } int pageSize = description.getShards().size(); lastShardId = description.getShards().get(pageSize - 1).getShardId(); } while (description.getHasMoreShards()); return lastShardId; } finally { kinesisClient.shutdown(); } }
private static boolean streamActive(AmazonKinesis client, String streamName) { try { DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(streamName); DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest); String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); if ("ACTIVE".equals(streamStatus)) { return true; } } catch (Exception e) { return false; } return false; }
@Override public Iterable<ValidationResult> doHealthChecks(RuntimeContainer container) { AmazonKinesis amazonKinesis = KinesisClient.create(properties); try { ListStreamsResult listStreamsResult = amazonKinesis.listStreams(); return Arrays.asList(ValidationResult.OK); } catch (Exception e) { return Arrays.asList(new ValidationResult(ValidationResult.Result.ERROR, e.getMessage())); } }
@Override public Set<String> listStreams() { AmazonKinesis amazonKinesis = KinesisClient.create(properties); ListStreamsResult listStreamsResult = amazonKinesis.listStreams(); List<String> streamNames = listStreamsResult.getStreamNames(); Set<String> streamNamesCollection = new HashSet(streamNames); while (listStreamsResult.isHasMoreStreams() && !streamNames.isEmpty()) { listStreamsResult = amazonKinesis.listStreams(streamNames.get(streamNames.size() - 1)); streamNames = listStreamsResult.getStreamNames(); streamNamesCollection.addAll(streamNames); } return streamNamesCollection; }
/** * @param eventsQueue The queue that holds the records to send to Kinesis * @param kinesisClient Reference to the Kinesis client * @param streamName The stream name to send items to */ public ProducerBase(BlockingQueue<Event> eventsQueue, AmazonKinesis kinesisClient, String streamName) { this.eventsQueue = eventsQueue; this.kinesisClient = kinesisClient; this.streamName = streamName; }
private static Map<Shard, String> getShardIterators(AmazonKinesis client, String stream, String start) { Map<Shard, String> shardIterators = new HashMap<>(); getShards(client, stream).forEach(shard -> shardIterators.put(shard, getShardIterator(client, stream, shard, start))); return shardIterators; }
public static AmazonKinesis createMockClient() { staticFactoryMock = new MockKinesisClient(); return staticFactoryMock.createClient(); }
/** * Factory method called by smoketest */ public static AmazonKinesis createClient() { return AmazonKinesisClientBuilder.defaultClient(); }
public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch cloudWatch) { this.kinesis = checkNotNull(kinesis, "kinesis"); this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch"); }
AmazonKinesis getClient() { return amazonKinesisClient; }
public AmazonKinesis getAmazonKinesisClient() { return amazonKinesisClient; }
public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) { this.amazonKinesisClient = amazonKinesisClient; }