/**
 * Creates a record processor from the given buffer, filter, emitter and transformer.
 *
 * @param buffer
 *        Stored in the {@code buffer} field; must not be null
 * @param filter
 *        Stored in the {@code filter} field; must not be null
 * @param emitter
 *        Stored in the {@code emitter} field; must not be null
 * @param transformer
 *        Stored in the {@code transformer} field; must not be null
 * @param configuration
 *        Source of {@code RETRY_LIMIT} and {@code BACKOFF_INTERVAL}; must not be null
 * @throws IllegalArgumentException
 *         if any argument is null
 */
public KinesisConnectorRecordProcessor(IBuffer<T> buffer,
        IFilter<T> filter,
        IEmitter<U> emitter,
        ITransformerBase<T, U> transformer,
        KinesisConnectorConfiguration configuration) {
    if (buffer == null || filter == null || emitter == null || transformer == null) {
        throw new IllegalArgumentException("buffer, filter, emitter, and transformer must not be null");
    }
    // Fail with the same exception type as the other arguments instead of an
    // unexplained NullPointerException when the configuration is dereferenced below.
    if (configuration == null) {
        throw new IllegalArgumentException("configuration must not be null");
    }
    this.buffer = buffer;
    this.filter = filter;
    this.emitter = emitter;
    this.transformer = transformer;
    // Limit must be greater than zero: a non-positive configured value falls back to 1.
    this.retryLimit = configuration.RETRY_LIMIT <= 0 ? 1 : configuration.RETRY_LIMIT;
    this.backoffInterval = configuration.BACKOFF_INTERVAL;
}
/** * Creates a new StreamSource. * * @param config * Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider} * @param inputFile * File containing record data to emit on each line * @param loopOverStreamSource * Loop over the stream source to continually put records */ public StreamSource(KinesisConnectorConfiguration config, String inputFile, boolean loopOverStreamSource) { this.config = config; this.inputFile = inputFile; this.loopOverInputFile = loopOverStreamSource; this.objectMapper = new ObjectMapper(); kinesisClient = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER); kinesisClient.setRegion(RegionUtils.getRegion(config.REGION_NAME)); if (config.KINESIS_ENDPOINT != null) { kinesisClient.setEndpoint(config.KINESIS_ENDPOINT); } KinesisUtils.createInputStream(config); }
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_INPUT_STREAM.
 *
 * <p>The shard count is taken from config.KINESIS_INPUT_STREAM_SHARD_COUNT and the work is
 * delegated to {@code createAndWaitForStreamToBecomeAvailable}.
 *
 * @param config
 *        The configuration with the specified input stream name, shard count and
 *        {@link AWSCredentialsProvider}
 */
public static void createInputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    // The endpoint is only set on the client when the configuration provides one.
    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(client,
            config.KINESIS_INPUT_STREAM,
            config.KINESIS_INPUT_STREAM_SHARD_COUNT);
}
/**
 * Creates the Amazon Kinesis stream specified by config.KINESIS_OUTPUT_STREAM.
 *
 * <p>The shard count is taken from config.KINESIS_OUTPUT_STREAM_SHARD_COUNT and the work is
 * delegated to {@code createAndWaitForStreamToBecomeAvailable}.
 *
 * @param config
 *        The configuration with the specified output stream name, shard count and
 *        {@link AWSCredentialsProvider}
 */
public static void createOutputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    // The endpoint is only set on the client when the configuration provides one.
    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }
    createAndWaitForStreamToBecomeAvailable(client,
            config.KINESIS_OUTPUT_STREAM,
            config.KINESIS_OUTPUT_STREAM_SHARD_COUNT);
}
/**
 * Deletes the input stream specified by config.KINESIS_INPUT_STREAM.
 *
 * <p>A new {@link AmazonKinesisClient} is built from the configuration and handed to
 * {@code deleteStream} together with the stream name.
 *
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteInputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    // The endpoint is only set on the client when the configuration provides one.
    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(client, config.KINESIS_INPUT_STREAM);
}
/**
 * Deletes the output stream specified by config.KINESIS_OUTPUT_STREAM.
 *
 * <p>A new {@link AmazonKinesisClient} is built from the configuration and handed to
 * {@code deleteStream} together with the stream name.
 *
 * @param config
 *        The configuration containing the stream name and {@link AWSCredentialsProvider}
 */
public static void deleteOutputStream(KinesisConnectorConfiguration config) {
    final AmazonKinesisClient client = new AmazonKinesisClient(config.AWS_CREDENTIALS_PROVIDER);
    client.setRegion(RegionUtils.getRegion(config.REGION_NAME));
    // The endpoint is only set on the client when the configuration provides one.
    final boolean endpointConfigured = config.KINESIS_ENDPOINT != null;
    if (endpointConfigured) {
        client.setEndpoint(config.KINESIS_ENDPOINT);
    }
    deleteStream(client, config.KINESIS_OUTPUT_STREAM);
}
/**
 * Creates a BatchedStreamSource by delegating to the three-argument constructor with
 * {@code loopOverStreamSource} set to {@code false}.
 *
 * @param config
 *        Configuration forwarded to the three-argument constructor
 * @param inputFile
 *        Input file name forwarded to the three-argument constructor
 */
public BatchedStreamSource(
        KinesisConnectorConfiguration config,
        String inputFile) {
    this(config, inputFile, false);
}
/**
 * Creates a BatchedStreamSource: passes all arguments to the superclass constructor and then
 * starts with an empty {@link ArrayList} as the record buffer.
 *
 * @param config
 *        Configuration forwarded to the superclass constructor
 * @param inputFile
 *        Input file name forwarded to the superclass constructor
 * @param loopOverStreamSource
 *        Looping flag forwarded to the superclass constructor
 */
public BatchedStreamSource(
        KinesisConnectorConfiguration config,
        String inputFile,
        boolean loopOverStreamSource) {
    super(config, inputFile, loopOverStreamSource);
    this.buffer = new ArrayList<SimpleKinesisMessageModel>();
}
/**
 * Creates a factory that remembers the given pipeline and configuration.
 *
 * @param pipeline
 *        Stored in the {@code pipeline} field
 * @param configuration
 *        Stored in the {@code configuration} field
 */
public KinesisConnectorRecordProcessorFactory(
        IKinesisConnectorPipeline<T, U> pipeline,
        KinesisConnectorConfiguration configuration) {
    this.pipeline = pipeline;
    this.configuration = configuration;
}
/**
 * Returns a newly constructed {@code SumologicEmitter} built from the given configuration.
 *
 * @param configuration
 *        Passed unchanged to the {@code SumologicEmitter} constructor
 * @return a new emitter instance on every call
 */
@Override
public IEmitter<String> getEmitter(KinesisConnectorConfiguration configuration) {
    final IEmitter<String> emitter = new SumologicEmitter(configuration);
    return emitter;
}
/**
 * Returns a newly constructed {@code BasicMemoryBuffer} built from the given configuration.
 *
 * @param configuration
 *        Passed unchanged to the {@code BasicMemoryBuffer} constructor
 * @return a new buffer instance on every call
 */
@Override
public IBuffer<SimpleKinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
    final IBuffer<SimpleKinesisMessageModel> memoryBuffer =
            new BasicMemoryBuffer<SimpleKinesisMessageModel>(configuration);
    return memoryBuffer;
}
/**
 * Returns a newly constructed {@code AllPassFilter}.
 *
 * @param configuration
 *        Not used by this implementation
 * @return a new filter instance on every call
 */
@Override
public IFilter<SimpleKinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    final IFilter<SimpleKinesisMessageModel> passThrough =
            new AllPassFilter<SimpleKinesisMessageModel>();
    return passThrough;
}
public SumologicEmitter(KinesisConnectorConfiguration configuration) { this.config = (KinesisConnectorForSumologicConfiguration) configuration; sender = new SumologicSender(this.config.SUMOLOGIC_URL); batchSize = this.config.BUFFER_RECORD_COUNT_LIMIT; }
/**
 * Creates a KinesisToS3 instance from a configuration file.
 *
 * <p>The file is read through {@code readProperties}, combined with a
 * {@code DefaultAWSCredentialsProviderChain} into a {@code KinesisConnectorConfiguration}, and the
 * result is passed to {@code initialize}.
 *
 * @param configurationFile
 *        Argument handed to {@code readProperties}
 * @throws IOException
 *         declared for failures while obtaining the properties
 */
public KinesisToS3(String configurationFile) throws IOException {
    final Properties loadedProperties = readProperties(configurationFile);
    final DefaultAWSCredentialsProviderChain credentialsProvider = new DefaultAWSCredentialsProviderChain();
    configuration = new KinesisConnectorConfiguration(loadedProperties, credentialsProvider);
    initialize(configuration);
}
/**
 * Returns a newly constructed {@code S3Emitter} built from the given configuration.
 *
 * @param configuration
 *        Passed unchanged to the {@code S3Emitter} constructor
 * @return a new emitter instance on every call
 */
@Override
public IEmitter<byte[]> getEmitter(KinesisConnectorConfiguration configuration) {
    final IEmitter<byte[]> emitter = new S3Emitter(configuration);
    return emitter;
}
/**
 * Returns a newly constructed {@code BasicMemoryBuffer} built from the given configuration.
 *
 * @param configuration
 *        Passed unchanged to the {@code BasicMemoryBuffer} constructor
 * @return a new buffer instance on every call
 */
@Override
public IBuffer<byte[]> getBuffer(KinesisConnectorConfiguration configuration) {
    final IBuffer<byte[]> memoryBuffer = new BasicMemoryBuffer<>(configuration);
    return memoryBuffer;
}
/**
 * Returns a newly constructed {@code ByteArrayNoopTransformer}.
 *
 * @param configuration
 *        Not used by this implementation
 * @return a new transformer instance on every call
 */
@Override
public ITransformer<byte[], byte[]> getTransformer(KinesisConnectorConfiguration configuration) {
    final ITransformer<byte[], byte[]> transformer = new ByteArrayNoopTransformer();
    return transformer;
}
/**
 * Returns a newly constructed {@code AllPassFilter}.
 *
 * @param configuration
 *        Not used by this implementation
 * @return a new filter instance on every call
 */
@Override
public IFilter<byte[]> getFilter(KinesisConnectorConfiguration configuration) {
    final IFilter<byte[]> passThrough = new AllPassFilter<>();
    return passThrough;
}
/**
 * Creates a BatchedStreamSource: passes all arguments to the superclass constructor and then
 * starts with an empty {@link ArrayList} as the record buffer.
 *
 * @param config
 *        Configuration forwarded to the superclass constructor
 * @param inputFile
 *        Input file name forwarded to the superclass constructor
 * @param loopOverStreamSource
 *        Looping flag forwarded to the superclass constructor
 */
public BatchedStreamSource(
        KinesisConnectorConfiguration config,
        String inputFile,
        boolean loopOverStreamSource) {
    super(config, inputFile, loopOverStreamSource);
    this.buffer = new ArrayList<KinesisMessageModel>();
}
/**
 * Returns a newly constructed {@code HBaseEmitter}.
 *
 * <p>The configuration is cast to {@code EMRHBaseKinesisConnectorConfiguration} before being
 * handed to the emitter's constructor.
 *
 * @param configuration
 *        Must be an {@code EMRHBaseKinesisConnectorConfiguration}
 * @return a new emitter instance on every call
 * @throws ClassCastException
 *         if {@code configuration} is of a different type
 */
@Override
public IEmitter<Map<String, String>> getEmitter(KinesisConnectorConfiguration configuration) {
    final EMRHBaseKinesisConnectorConfiguration hbaseConfiguration =
            (EMRHBaseKinesisConnectorConfiguration) configuration;
    return new HBaseEmitter(hbaseConfiguration);
}
/**
 * Returns a newly constructed {@code BasicMemoryBuffer} built from the given configuration.
 *
 * @param configuration
 *        Passed unchanged to the {@code BasicMemoryBuffer} constructor
 * @return a new buffer instance on every call
 */
@Override
public IBuffer<KinesisMessageModel> getBuffer(KinesisConnectorConfiguration configuration) {
    final IBuffer<KinesisMessageModel> memoryBuffer =
            new BasicMemoryBuffer<KinesisMessageModel>(configuration);
    return memoryBuffer;
}
/**
 * Returns a newly constructed {@code KinesisMessageModelHBaseTransformer}.
 *
 * @param configuration
 *        Not used by this implementation
 * @return a new transformer instance on every call
 */
@Override
public ITransformer<KinesisMessageModel, Map<String, String>> getTransformer(
        KinesisConnectorConfiguration configuration) {
    final ITransformer<KinesisMessageModel, Map<String, String>> transformer =
            new KinesisMessageModelHBaseTransformer();
    return transformer;
}
/**
 * Returns a newly constructed {@code AllPassFilter}.
 *
 * @param configuration
 *        Not used by this implementation
 * @return a new filter instance on every call
 */
@Override
public IFilter<KinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    final IFilter<KinesisMessageModel> passThrough = new AllPassFilter<KinesisMessageModel>();
    return passThrough;
}
/**
 * Creates a new StreamSource by delegating to the three-argument constructor with looping
 * disabled ({@code false}).
 *
 * @param config
 *        Configuration to determine which stream to put records to and get {@link AWSCredentialsProvider}
 * @param inputFile
 *        File containing record data to emit on each line
 */
public StreamSource(
        KinesisConnectorConfiguration config,
        String inputFile) {
    this(config, inputFile, false);
}
/**
 * Initialize the Amazon Kinesis Client Library configuration and worker.
 *
 * <p>Delegates to the two-argument {@code initialize} overload, supplying a new
 * {@code NullMetricsFactory} as the metrics factory.
 *
 * @param kinesisConnectorConfiguration Amazon Kinesis connector configuration
 */
protected void initialize(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
    final NullMetricsFactory metricsFactory = new NullMetricsFactory();
    initialize(kinesisConnectorConfiguration, metricsFactory);
}