/**
 * Creates a record processor from the supplied pipeline components.
 *
 * <p>The retry limit is read from {@code configuration.RETRY_LIMIT}; a value of zero or
 * less is replaced by {@code 1}. The backoff interval is read from
 * {@code configuration.BACKOFF_INTERVAL} unchanged.
 *
 * @param buffer        buffer stored on this processor; must not be null
 * @param filter        filter stored on this processor; must not be null
 * @param emitter       emitter stored on this processor; must not be null
 * @param transformer   transformer stored on this processor; must not be null
 * @param configuration source of the retry limit and backoff interval; must not be null
 * @throws IllegalArgumentException if any argument is null
 */
public KinesisConnectorRecordProcessor(IBuffer<T> buffer,
        IFilter<T> filter,
        IEmitter<U> emitter,
        ITransformerBase<T, U> transformer,
        KinesisConnectorConfiguration configuration) {
    if (buffer == null || filter == null || emitter == null || transformer == null) {
        throw new IllegalArgumentException("buffer, filter, emitter, and transformer must not be null");
    }
    // Fail fast with the same exception type as the other arguments instead of
    // letting the field reads below raise a bare NullPointerException.
    if (configuration == null) {
        throw new IllegalArgumentException("configuration must not be null");
    }

    this.buffer = buffer;
    this.filter = filter;
    this.emitter = emitter;
    this.transformer = transformer;

    // Limit must be greater than zero
    this.retryLimit = configuration.RETRY_LIMIT > 0 ? configuration.RETRY_LIMIT : 1;
    this.backoffInterval = configuration.BACKOFF_INTERVAL;
}
/**
 * Builds a new {@code KinesisConnectorRecordProcessor} for this factory's pipeline.
 *
 * <p>The buffer, emitter, transformer and filter are requested from the pipeline, in that
 * order, using this factory's configuration, and are then handed to the processor
 * constructor together with the configuration.
 *
 * @return the newly constructed record processor
 * @throws RuntimeException wrapping any {@code Throwable} raised while obtaining the
 *         components or constructing the processor
 */
@Override
public IRecordProcessor createProcessor() {
    try {
        final IBuffer<T> pipelineBuffer = pipeline.getBuffer(configuration);
        final IEmitter<U> pipelineEmitter = pipeline.getEmitter(configuration);
        final ITransformerBase<T, U> pipelineTransformer = pipeline.getTransformer(configuration);
        final IFilter<T> pipelineFilter = pipeline.getFilter(configuration);

        return new KinesisConnectorRecordProcessor<T, U>(
                pipelineBuffer,
                pipelineFilter,
                pipelineEmitter,
                pipelineTransformer,
                configuration);
    } catch (Throwable failure) {
        // Every failure, checked or unchecked, is surfaced as an unchecked exception with its cause.
        throw new RuntimeException(failure);
    }
}
/**
 * Supplies the filter for {@code SimpleKinesisMessageModel} records.
 *
 * @param configuration connector configuration; not read by this implementation
 * @return a new {@code AllPassFilter} instance
 */
@Override
public IFilter<SimpleKinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    final IFilter<SimpleKinesisMessageModel> allPass = new AllPassFilter<SimpleKinesisMessageModel>();
    return allPass;
}
/**
 * Supplies the filter for raw {@code byte[]} records.
 *
 * @param configuration connector configuration; not read by this implementation
 * @return a new {@code AllPassFilter} instance
 */
@Override
public IFilter<byte[]> getFilter(KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<byte[]>();
}
/**
 * Supplies the filter for {@code KinesisMessageModel} records.
 *
 * @param configuration connector configuration; not read by this implementation
 * @return a new {@code AllPassFilter} instance
 */
@Override
public IFilter<KinesisMessageModel> getFilter(KinesisConnectorConfiguration configuration) {
    return new AllPassFilter<>();
}