Java 类com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration 实例源码

项目:sumologic-kinesis-connector    文件:KinesisConnectorRecordProcessor.java   
public KinesisConnectorRecordProcessor(IBuffer<T> buffer,
        IFilter<T> filter,
        IEmitter<U> emitter,
        ITransformerBase<T, U> transformer,
        KinesisConnectorConfiguration configuration) {
    if (buffer == null || filter == null || emitter == null || transformer == null) {
        throw new IllegalArgumentException("buffer, filter, emitter, and transformer must not be null");
    }
    this.buffer = buffer;
    this.filter = filter;
    this.emitter = emitter;
    this.transformer = transformer;
    // Limit must be greater than zero
    if (configuration.RETRY_LIMIT <= 0) {
        retryLimit = 1;
    } else {
        retryLimit = configuration.RETRY_LIMIT;
    }
    this.backoffInterval = configuration.BACKOFF_INTERVAL;
}
项目:sumologic-kinesis-connector    文件:StreamSource.java   
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 * @param loopOverStreamSource
 *        Loop over the stream source to continually put records
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    this.config = config;
    this.inputFile = inputFile;
    this.loopOverInputFile = loopOverStreamSource;
    this.objectMapper = new ObjectMapper();
    kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    KinesisUtils.createInputStream(config);
}
项目:sumologic-kinesis-connector    文件:KinesisUtils.java   
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration with the specified input stream name and {@link AWSCredentialsProvider}
 * @param shardCount
 *        The shard count to create the stream with
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
项目:sumologic-kinesis-connector    文件:KinesisUtils.java   
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM.
 * 
 * @param config
 *        The configuration with the specified output stream name and {@link AWSCredentialsProvider}
 * @param shardCount
 *        The shard count to create the stream with
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
项目:sumologic-kinesis-connector    文件:KinesisUtils.java   
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
项目:sumologic-kinesis-connector    文件:KinesisUtils.java   
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
项目:aws-big-data-blog    文件:StreamSource.java   
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 * @param loopOverStreamSource
 *        Loop over the stream source to continually put records
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    this.config = config;
    this.inputFile = inputFile;
    this.loopOverInputFile = loopOverStreamSource;
    this.objectMapper = new ObjectMapper();
    kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    KinesisUtils.createInputStream(config);
}
项目:aws-big-data-blog    文件:KinesisUtils.java   
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration with the specified input stream name and {@link AWSCredentialsProvider}
 * @param shardCount
 *        The shard count to create the stream with
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
项目:aws-big-data-blog    文件:KinesisUtils.java   
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM.
 * 
 * @param config
 *        The configuration with the specified output stream name and {@link AWSCredentialsProvider}
 * @param shardCount
 *        The shard count to create the stream with
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(kinesisClient,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
项目:aws-big-data-blog    文件:KinesisUtils.java   
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_INPUT_STREAM);
}
项目:aws-big-data-blog    文件:KinesisUtils.java   
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM
 * 
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    AmazonKinesisClient kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    if (config.KINESIS_ENDPOINT != null) {
        kinesisClient.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(kinesisClient, config.KINESIS_OUTPUT_STREAM);
}
项目:sumologic-kinesis-connector    文件:BatchedStreamSource.java   
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile) {
    this(config, inputFile, false);
}
项目:sumologic-kinesis-connector    文件:BatchedStreamSource.java   
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    super(config, inputFile, loopOverStreamSource);
    buffer = new ArrayList<SimpleKinesisMessageModel>();
}
项目:sumologic-kinesis-connector    文件:KinesisConnectorRecordProcessorFactory.java   
public KinesisConnectorRecordProcessorFactory(IKinesisConnectorPipeline<T, U> pipeline,
        KinesisConnectorConfiguration configuration) {
    this.configuration = configuration;
    this.pipeline = pipeline;
}
项目:sumologic-kinesis-connector    文件:SumologicMessageModelPipeline.java   
@Override
public IEmitter<String> getEmitter(KinesisConnectorConfiguration configuration) {
    return new SumologicEmitter(configuration);
}
项目:sumologic-kinesis-connector    文件:SumologicMessageModelPipeline.java   
@Override
public IBuffer<SimpleKinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
    return new BasicMemoryBuffer<SimpleKinesisMessageModel>(configuration);
}
项目:sumologic-kinesis-connector    文件:SumologicMessageModelPipeline.java   
@Override
public IFilter<SimpleKinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<SimpleKinesisMessageModel>();
}
项目:sumologic-kinesis-connector    文件:SumologicEmitter.java   
public SumologicEmitter(KinesisConnectorConfiguration configuration) {
    this.config = (KinesisConnectorForSumologicConfiguration) configuration;
    sender = new SumologicSender(this.config.SUMOLOGIC_URL);
    batchSize = this.config.BUFFER_RECORD_COUNT_LIMIT;
}
项目:kinesis-to-s3    文件:KinesisToS3.java   
public KinesisToS3(String configurationFile) throws IOException {
    final Properties properties = readProperties(configurationFile);
    configuration = new KinesisConnectorConfiguration(properties, new DefaultAWSCredentialsProviderChain());
    initialize(configuration);
}
项目:kinesis-to-s3    文件:S3Pipeline.java   
@Override
public IEmitter<byte[]> getEmitter(KinesisConnectorConfiguration configuration) {
    return new S3Emitter(configuration);
}
项目:kinesis-to-s3    文件:S3Pipeline.java   
@Override
public IBuffer<byte[]> getBuffer(KinesisConnectorConfiguration configuration) {
    return new BasicMemoryBuffer<>(configuration);
}
项目:kinesis-to-s3    文件:S3Pipeline.java   
@Override
public ITransformer<byte[], byte[]> getTransformer(KinesisConnectorConfiguration configuration) {
    return new ByteArrayNoopTransformer();
}
项目:kinesis-to-s3    文件:S3Pipeline.java   
@Override
public IFilter<byte[]> getFilter(KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<>();
}
项目:aws-big-data-blog    文件:BatchedStreamSource.java   
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile) {
    this(config, inputFile, false);
}
项目:aws-big-data-blog    文件:BatchedStreamSource.java   
public BatchedStreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) {
    super(config, inputFile, loopOverStreamSource);
    buffer = new ArrayList<KinesisMessageModel>();
}
项目:aws-big-data-blog    文件:HBasePipeline.java   
@Override
public IEmitter<Map<String,String>> getEmitter(KinesisConnectorConfiguration configuration) {
    return new HBaseEmitter((EMRHBaseKinesisConnectorConfiguration) configuration);
}
项目:aws-big-data-blog    文件:HBasePipeline.java   
@Override
public IBuffer<KinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
    return new BasicMemoryBuffer<KinesisMessageModel>(configuration);
}
项目:aws-big-data-blog    文件:HBasePipeline.java   
@Override
public ITransformer<KinesisMessageModel, Map<String,String>> getTransformer(KinesisConnectorConfiguration configuration) {
    return new KinesisMessageModelHBaseTransformer();
}
项目:aws-big-data-blog    文件:HBasePipeline.java   
@Override
public IFilter<KinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<KinesisMessageModel>();
}
项目:sumologic-kinesis-connector    文件:StreamSource.java   
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile) {
    this(config, inputFile, false);
}
项目:sumologic-kinesis-connector    文件:KinesisConnectorExecutorBase.java   
/**
 * Initialize the Amazon Kinesis Client Library configuration and worker
 * 
 * @param kinesisConnectorConfiguration Amazon Kinesis connector configuration
 */
protected void initialize(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
    initialize(kinesisConnectorConfiguration, new NullMetricsFactory());
}
项目:aws-big-data-blog    文件:StreamSource.java   
/**
 * Creates a new StreamSource.
 * 
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 */
public StreamSource(KinesisConnectorConfiguration config, String inputFile) {
    this(config, inputFile, false);
}