Java class com.amazonaws.services.kinesis.producer.KinesisProducer example source code

Project: flink    File: FlinkKinesisProducer.java
@Override
public void close() throws Exception {
    LOG.info("Closing producer");
    super.close();
    KinesisProducer kp = this.producer;
    this.producer = null;
    if (kp != null) {
        LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount());
        // try to flush all outstanding records
        while (kp.getOutstandingRecordsCount() > 0) {
            kp.flush();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                LOG.warn("Flushing was interrupted.");
                // stop the blocking flushing and destroy producer immediately
                break;
            }
        }
        LOG.info("Flushing done. Destroying producer instance.");
        kp.destroy();
    }
}
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
        boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

    // If configured, use the Kafka partition key as the explicit hash key.
    // This is useful when sending data from the same partition into the same shard.
    if (usePartitionAsHashKey)
        return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    else
        return kp.addUserRecord(streamName, partitionKey,
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));

}
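Since addUserRecord returns a Guava ListenableFuture, callers usually register a completion callback on it rather than blocking. A minimal sketch, assuming a placeholder stream name, partition key, and payload (imports from java.nio and com.google.common.util.concurrent omitted to match the surrounding excerpts):

private void putWithCallback(KinesisProducer kp) {
    // Placeholder stream, partition key, and payload; not part of the connector.
    ListenableFuture<UserRecordResult> future = kp.addUserRecord(
            "example-stream", "example-partition-key",
            ByteBuffer.wrap("example-payload".getBytes(StandardCharsets.UTF_8)));

    Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            // result.getShardId() and result.getSequenceNumber() identify where the record landed
        }

        @Override
        public void onFailure(Throwable t) {
            // t is typically a UserRecordFailedException carrying the failed attempts
        }
    }, MoreExecutors.directExecutor());
}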
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
@Override
public void stop() {
    // Destroy Kinesis producers that were not closed as part of close()
    if (singleKinesisProducerPerPartition) {
        for (KinesisProducer kp : producerMap.values()) {
            kp.flushSync();
            kp.destroy();
        }
    } else {
        kinesisProducer.destroy();
    }

}
Project: kinesis-kafka-connector    File: AmazonKinesisSinkTask.java
private KinesisProducer getKinesisProducer() {
    KinesisProducerConfiguration config = new KinesisProducerConfiguration();
    config.setRegion(regionName);
    config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
    config.setMaxConnections(maxConnections);

    config.setAggregationEnabled(aggregration);

    // Limits the maximum allowed put rate for a shard, as a percentage of
    // the backend limits.
    config.setRateLimit(rateLimit);

    // Maximum amount of time (milliseconds) a record may spend being buffered
    // before it gets sent. Records may be sent sooner than this depending on
    // the other buffering limits.
    config.setRecordMaxBufferedTime(maxBufferedTime);

    // Set a time-to-live on records (milliseconds). Records that do not get
    // successfully put within the limit are failed.
    config.setRecordTtl(ttl);

    // Controls the number of metrics that are uploaded to CloudWatch.
    // Expected pattern: none|summary|detailed
    config.setMetricsLevel(metricsLevel);

    // Controls the granularity of metrics that are uploaded to CloudWatch.
    // Greater granularity produces more metrics.
    // Expected pattern: global|stream|shard
    config.setMetricsGranularity(metricsGranuality);

    // The namespace to upload metrics under.
    config.setMetricsNamespace(metricsNameSpace);

    return new KinesisProducer(config);

}
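A hedged usage sketch for the producer built above; the stream name, payload, and the in-flight cap of 10 000 records are illustrative values, not part of the connector:

private void putSingleRecord() throws Exception {
    KinesisProducer kp = getKinesisProducer();

    // The KPL buffers asynchronously; callers usually cap the number of
    // in-flight records before adding more to avoid unbounded memory growth.
    while (kp.getOutstandingRecordsCount() > 10_000) {
        Thread.sleep(1);
    }

    ByteBuffer data = ByteBuffer.wrap("{\"hello\":\"kinesis\"}".getBytes(StandardCharsets.UTF_8));
    kp.addUserRecord("example-stream", "example-partition-key", data);

    kp.flushSync();   // block until everything buffered so far has been sent
    kp.destroy();     // release the native KPL child process
}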
Project: aws-kinesis-zombies    File: Drone.java
/**
 * Everything you need:
 * https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java
 * @return KinesisProducer instance used to put records.
 */
public KinesisProducer createKinesisProducer() {
    KinesisProducerConfiguration config = new KinesisProducerConfiguration();

    config.setRegion(region);
    config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
    config.setMaxConnections(24);           // Raise it if you have expired records
    config.setRequestTimeout(60000);        
    config.setAggregationEnabled(true); 
    config.setAggregationMaxCount(2);       // Usually a higher value is far more efficient
    config.setAggregationMaxSize(1024*100);
    config.setRecordMaxBufferedTime(5000);
    producer = new KinesisProducer(config);

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            log.info("Flushing remaining records.");
            producer.flushSync();
            log.info("All records flushed.");
            producer.destroy();
            log.info("Producer finished.");
        }
    }));

    return producer;
}
Project: koupler    File: KinesisEventProducer.java
public KinesisEventProducer(String format, CommandLine cmd,
                            String propertiesFile, String streamName,
                            int throttleQueueSize, String appName) {
    this(throttleQueueSize);
    KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile);
    this.streamName = streamName;
    this.producer = new KinesisProducer(config);
    this.metrics = new KouplerMetrics(this, config, appName);
    this.format = FormatFactory.getFormat(format, cmd);
}
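For reference, KinesisProducerConfiguration.fromPropertiesFile maps each key in the file to the configuration setter of the same name (without the set prefix). A minimal sketch; the file name and values are assumptions, not koupler's shipped defaults:

// Sketch: a properties file containing, for example,
//
//   Region = us-east-1
//   AggregationEnabled = true
//   RecordMaxBufferedTime = 5000
//   MaxConnections = 24
//
// can be loaded with:
KinesisProducerConfiguration config =
        KinesisProducerConfiguration.fromPropertiesFile("kinesis-producer.properties");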
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testRecordTooLarge() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(
      KinesisDTarget.class,
      target
  ).setOnRecordError(OnRecordError.TO_ERROR).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  List<Record> records = new ArrayList<>(4);
  records.add(KinesisTestUtil.getTooLargeRecord());
  records.addAll(KinesisTestUtil.getProducerTestRecords(3));
  targetRunner.runWrite(records);

  // Verify we added 3 good records at the end of the batch but not the bad one
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));

  assertEquals(1, targetRunner.getErrorRecords().size());
  targetRunner.runDestroy();
}
Project: rakam    File: AWSKinesisEventStore.java
@Inject
public AWSKinesisEventStore(AWSConfig config,
                            Metastore metastore,
                            FieldDependency fieldDependency) {
    kinesis = new AmazonKinesisAsyncClient(config.getCredentials());
    kinesis.setRegion(config.getAWSRegion());
    if (config.getKinesisEndpoint() != null) {
        kinesis.setEndpoint(config.getKinesisEndpoint());
    }
    this.config = config;
    this.bulkClient = new S3BulkEventStore(metastore, config, fieldDependency);

    KinesisProducerConfiguration producerConfiguration = new KinesisProducerConfiguration()
            .setRegion(config.getRegion())
            .setCredentialsProvider(config.getCredentials());
    if (config.getKinesisEndpoint() != null) {
        try {
            URL url = new URL(config.getKinesisEndpoint());
            producerConfiguration.setKinesisEndpoint(url.getHost());
            producerConfiguration.setKinesisPort(url.getPort());
            producerConfiguration.setVerifyCertificate(false);
        } catch (MalformedURLException e) {
            throw new IllegalStateException(String.format("Kinesis endpoint is invalid: %s", config.getKinesisEndpoint()));
        }
    }
    producer = new KinesisProducer(producerConfiguration);
}
Project: flink    File: FlinkKinesisProducer.java
/**
 * Creates a {@link KinesisProducer}.
 * Exposed so that tests can inject mock producers easily.
 */
@VisibleForTesting
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    return new KinesisProducer(producerConfig);
}
Project: flink    File: FlinkKinesisProducerTest.java
@Override
protected KinesisProducer getKinesisProducer(KinesisProducerConfiguration producerConfig) {
    return mockProducer;
}
Project: flink    File: FlinkKinesisProducer.java
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration();

    producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION));
    producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
    if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) {
        producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
    }
    if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) {
        producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
                ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
    }

    producer = new KinesisProducer(producerConfig);
    callback = new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) {
            if (!result.isSuccessful()) {
                if (failOnError) {
                    thrownException = new RuntimeException("Record was not sent successfully");
                } else {
                    LOG.warn("Record was not sent successfully");
                }
            }
        }

        @Override
        public void onFailure(Throwable t) {
            if (failOnError) {
                thrownException = t;
            } else {
                LOG.warn("An exception occurred while processing a record", t);
            }
        }
    };

    if (this.customPartitioner != null) {
        this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }

    LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion());
}
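For context, a rough sketch of how the callback built in open() would typically be used when a record is produced. This is not the actual FlinkKinesisProducer.invoke(); serialize(value), the stream name, and the partition key are placeholders, and OUT stands for the sink's input type parameter:

// Rough sketch, not FlinkKinesisProducer's actual invoke(): surface any earlier
// asynchronous failure, then send the record and register the callback from open().
private void produce(OUT value) {
    if (thrownException != null) {
        if (failOnError) {
            throw new RuntimeException("An exception was thrown while processing a record", thrownException);
        }
        LOG.warn("An exception was thrown while processing a record", thrownException);
        thrownException = null;
    }

    // serialize(value) is a placeholder for the sink's serialization schema.
    ListenableFuture<UserRecordResult> addResult =
            producer.addUserRecord("example-stream", "example-partition-key", serialize(value));
    Futures.addCallback(addResult, callback, MoreExecutors.directExecutor());
}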
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testInOrderProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();
  config.preserveOrdering = true;

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  PowerMockito.mockStatic(KinesisUtil.class);

  when(KinesisUtil.checkStreamExists(
      any(ClientConfiguration.class),
      any(KinesisConfigBean.class),
      any(String.class),
      any(List.class),
      any(Stage.Context.class)
      )
  ).thenReturn(1L);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);
  when(result.getShardId()).thenReturn("shardId-000000000000");

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records to the test stream
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // With preserveOrdering we should call flushSync for each record, plus once more for the batch.
  // The last invocation has no effect as no records should be pending.
  verify(producer, times(4)).flushSync();

  targetRunner.runDestroy();
}
Project: aws-big-data-blog    File: AdvancedKPLClickEventsToKinesis.java
protected AdvancedKPLClickEventsToKinesis(
        BlockingQueue<ClickEvent> inputQueue) {
    super(inputQueue);
    kinesis = new KinesisProducer(new KinesisProducerConfiguration()
            .setRegion(REGION));
}
Project: aws-big-data-blog    File: KPLClickEventsToKinesis.java
public KPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
    super(inputQueue);
    kinesis = new KinesisProducer(new KinesisProducerConfiguration()
            .setRegion(REGION)
            .setRecordMaxBufferedTime(5000));
}
Project: datacollector    File: TestKinesisTarget.java
@SuppressWarnings("unchecked")
@Test
public void testDefaultProduce() throws Exception {
  KinesisProducerConfigBean config = getKinesisTargetConfig();

  KinesisTarget target = new KinesisTarget(config);
  TargetRunner targetRunner = new TargetRunner.Builder(KinesisDTarget.class, target).build();

  KinesisTestUtil.mockKinesisUtil(1);

  KinesisProducer producer = mock(KinesisProducer.class);
  Whitebox.setInternalState(target, "kinesisProducer", producer);

  targetRunner.runInit();

  ListenableFuture<UserRecordResult> future = mock(ListenableFuture.class);

  UserRecordResult result = mock(UserRecordResult.class);

  when(result.isSuccessful()).thenReturn(true);

  when(future.get()).thenReturn(result);

  when(producer.addUserRecord(any(String.class), any(String.class), any(ByteBuffer.class)))
      .thenReturn(future);

  targetRunner.runWrite(KinesisTestUtil.getProducerTestRecords(3));

  // Verify we added 3 records
  verify(producer, times(3)).addUserRecord(eq(STREAM_NAME), any(String.class), any(ByteBuffer.class));
  // By default we should only call flushSync one time per batch.
  verify(producer, times(1)).flushSync();

  targetRunner.runDestroy();
}