@Override public void close() throws Exception { LOG.info("Closing producer"); super.close(); KinesisProducer kp = this.producer; this.producer = null; if (kp != null) { LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount()); // try to flush all outstanding records while (kp.getOutstandingRecordsCount() > 0) { kp.flush(); try { Thread.sleep(500); } catch (InterruptedException e) { LOG.warn("Flushing was interrupted."); // stop the blocking flushing and destroy producer immediately break; } } LOG.info("Flushing done. Destroying producer instance."); kp.destroy(); } }
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey, boolean usePartitionAsHashKey, SinkRecord sinkRecord) { // If configured use kafka partition key as explicit hash key // This will be useful when sending data from same partition into // same shard if (usePartitionAsHashKey) return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()), DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value())); else return kp.addUserRecord(streamName, partitionKey, DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value())); }
@Override public void stop() { // destroying kinesis producers which were not closed as part of close if (singleKinesisProducerPerPartition) { for (KinesisProducer kp : producerMap.values()) { kp.flushSync(); kp.destroy(); } } else { kinesisProducer.destroy(); } }
private KinesisProducer getKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(regionName); config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); config.setMaxConnections(maxConnections); config.setAggregationEnabled(aggregration); // Limits the maximum allowed put rate for a shard, as a percentage of // the // backend limits. config.setRateLimit(rateLimit); // Maximum amount of time (milliseconds) a record may spend being // buffered // before it gets sent. Records may be sent sooner than this depending // on the // other buffering limits config.setRecordMaxBufferedTime(maxBufferedTime); // Set a time-to-live on records (milliseconds). Records that do not get // successfully put within the limit are failed. config.setRecordTtl(ttl); // Controls the number of metrics that are uploaded to CloudWatch. // Expected pattern: none|summary|detailed config.setMetricsLevel(metricsLevel); // Controls the granularity of metrics that are uploaded to CloudWatch. // Greater granularity produces more metrics. // Expected pattern: global|stream|shard config.setMetricsGranularity(metricsGranuality); // The namespace to upload metrics under. config.setMetricsNamespace(metricsNameSpace); return new KinesisProducer(config); }
/** * All what you need: * https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java * @return KinesisProducer instance used to put records. */ public KinesisProducer createKinesisProducer() { KinesisProducerConfiguration config = new KinesisProducerConfiguration(); config.setRegion(region); config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain()); config.setMaxConnections(24); // Raise it if you have expired records config.setRequestTimeout(60000); config.setAggregationEnabled(true); config.setAggregationMaxCount(2); // Usually a higher value is far more efficent config.setAggregationMaxSize(1024*100); config.setRecordMaxBufferedTime(5000); producer = new KinesisProducer(config); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { log.info("Flushing remaining records."); producer.flushSync(); log.info("All records flushed."); producer.destroy(); log.info("Producer finished."); } }) { }); return producer; }
/**
 * Builds a producer whose KPL tuning comes entirely from an external properties file.
 *
 * @param format            name of the event format, resolved via {@code FormatFactory}
 * @param cmd               parsed command line handed to the format implementation
 * @param propertiesFile    path to the KPL configuration properties
 * @param streamName        target Kinesis stream
 * @param throttleQueueSize capacity passed to the throttling constructor
 * @param appName           application name used for metrics reporting
 */
public KinesisEventProducer(String format, CommandLine cmd, String propertiesFile,
        String streamName, int throttleQueueSize, String appName) {
    this(throttleQueueSize);
    KinesisProducerConfiguration kplConfig =
            KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    this.streamName = streamName;
    this.producer = new KinesisProducer(kplConfig);
    this.metrics = new KouplerMetrics(this, kplConfig, appName);
    this.format = FormatFactory.getFormat(format, cmd);
}
// Verifies that an oversized record is routed to the error sink while the
// remaining valid records in the same batch are still written.
@SuppressWarnings("unchecked")
@Test
public void testRecordTooLarge() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    KinesisTarget target = new KinesisTarget(config);
    // TO_ERROR: bad records must be diverted, not abort the batch.
    TargetRunner targetRunner = new TargetRunner.Builder(
        KinesisDTarget.class,
        target
    ).setOnRecordError(OnRecordError.TO_ERROR).build();
    KinesisTestUtil.mockKinesisUtil(1);
    KinesisProducer producer = mock(KinesisProducer.class);
    // Inject the mock before runInit so the target never builds a real KPL producer.
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    // Stub every put to resolve to a successful result.
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
        .thenReturn(future);
    // One record too large to put, followed by three valid ones.
    List<Record> records = new ArrayList<>(4);
    records.add(KinesisTestUtil.getTooLargeRecord());
    records.addAll(KinesisTestUtil.getProducerTestRecords(3));
    targetRunner.runWrite(records);
    // Verify we added 3 good records at the end of the batch but not the bad one
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    assertEquals(1, targetRunner.getErrorRecords().size());
    targetRunner.runDestroy();
}
/**
 * Wires up the Kinesis async client, the S3 bulk store, and the KPL producer.
 *
 * <p>When a custom Kinesis endpoint is configured (e.g. a local stack), both the
 * async client and the KPL producer are pointed at it; certificate verification
 * is disabled for the producer since such endpoints are typically self-signed.
 *
 * @throws IllegalStateException if the configured Kinesis endpoint is not a valid URL
 */
@Inject
public AWSKinesisEventStore(AWSConfig config,
                            Metastore metastore,
                            FieldDependency fieldDependency) {
    kinesis = new AmazonKinesisAsyncClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }
    this.config = config;
    this.bulkClient = new S3BulkEventStore(metastore, config, fieldDependency);

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
            .setRegion(config.getRegion())
            .setCredentialsProvider(config.getCredentials());
    if (config.getKinesisEndpoint() != null) {
        try {
            URL url = new URL(config.getKinesisEndpoint());
            producerConfiguration.setKinesisEndpoint(url.getHost());
            producerConfiguration.setKinesisPort(url.getPort());
            producerConfiguration.setVerifyCertificate(false);
        } catch (MalformedURLException e) {
            // Preserve the cause so the underlying parse failure is not lost.
            throw new IllegalStateException(
                    String.format("Kinesis endpoint is invalid: %s", config.getKinesisEndpoint()), e);
        }
    }
    producer = new KinesisProducer(producerConfiguration);
}
/**
 * Factory for {@link KinesisProducer} instances built from the given configuration.
 * Exposed so that tests can inject mock producers easily.
 *
 * @param producerConfig KPL configuration to construct the producer with
 * @return a freshly constructed producer
 */
@VisibleForTesting
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    final KinesisProducer kinesisProducer = new KinesisProducer(producerConfig);
    return kinesisProducer;
}
/** Test override: ignores the supplied configuration and always hands back the mock. */
@Override
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    return this.mockProducer;
}
// Flink operator lifecycle hook: builds the KPL producer from the task's
// properties, installs the async put callback, and initializes the optional
// custom partitioner with this subtask's index.
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    // Region and credentials are mandatory; collection/aggregation limits are
    // optional overrides falling back to the KPL defaults already in the config.
    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();
    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }

    producer = new KinesisProducer(producerConfig);

    // Callback invoked by the KPL for every put. Depending on failOnError it
    // either records the failure in thrownException (presumably checked and
    // rethrown elsewhere in this sink — confirm against the invoke path) or
    // merely logs a warning.
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // A completed future can still carry an unsuccessful put result.
            if (!result.isSuccessful()) {
                if(failOnError) {
                    thrownException = new RuntimeException("Record was not sent successful");
                } else {
                    LOG.warn("Record was not sent successful");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
// Verifies that with preserveOrdering enabled the target calls flushSync after
// every record (plus once for the batch), forcing in-order delivery.
@SuppressWarnings("unchecked")
@Test
public void testInOrderProduce() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    config.preserveOrdering = true;
    KinesisTarget target = new KinesisTarget(config);
    TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();
    // Static-mock the stream-existence check so init succeeds without AWS access.
    PowerMockito.mockStatic(KinesisUtil.class);
    when(KinesisUtil.checkStreamExists(
            any(ClientConfiguration.class),
            any(KinesisConfigBean.class),
            any(String.class),
            any(List.class),
            any(Stage.Context.class)
        )
    ).thenReturn(1L);
    KinesisProducer producer = mock(KinesisProducer.class);
    // Inject the mock before runInit so no real KPL producer is created.
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    // Stub every put to resolve to a successful result on a fixed shard.
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(result.getShardId()).thenReturn("shardId-000000000000");
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
        .thenReturn(future);
    targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));
    // Verify we added 3 records to stream test
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    // With preserveOrdering we should call flushSync for each record, plus once more for the batch.
    // The last invocation has no effect as no records should be pending.
    verify(producer, times(4)).flushSync();
    targetRunner.runDestroy();
}
protected AdvancedKPLClickEventsToKinesis( BlockingQueue<ClickEvent> inputQueue) { super(inputQueue); kinesis = new KinesisProducer(new KinesisProducerConfiguration() .setRegion(REGION)); }
public KPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) { super(inputQueue); kinesis = new KinesisProducer(new KinesisProducerConfiguration() .setRegion(REGION) .setRecordMaxBufferedTime(5000)); }
// Verifies the default (non-ordered) path: all records are put and flushSync
// is called exactly once per batch.
@SuppressWarnings("unchecked")
@Test
public void testDefaultProduce() throws Exception {
    KinesisProducerConfigBean config = getKinesisTargetConfig();
    KinesisTarget target = new KinesisTarget(config);
    TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();
    KinesisTestUtil.mockKinesisUtil(1);
    KinesisProducer producer = mock(KinesisProducer.class);
    // Inject the mock before runInit so no real KPL producer is created.
    Whitebox.setInternalState(target, "kinesisProducer", producer);
    targetRunner.runInit();
    // Stub every put to resolve to a successful result.
    ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);
    UserRecordResult result = mock(UserRecordResult.class);
    when(result.isSuccessful()).thenReturn(true);
    when(future.get()).thenReturn(result);
    when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
        .thenReturn(future);
    targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));
    // Verify we added 3 records
    verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
    // By default we should only call flushSync one time per batch.
    verify(producer, times(1)).flushSync();
    targetRunner.runDestroy();
}