private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) { Map<String, KinesisRecordProcessor> processorMap = new HashMap<>(); IntStream.range(0, numShards) .forEach(p -> { String shardId = String.format("shard-%05d", p); // Create Kinesis processor KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor(); // Initialize the shard ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000"); InitializationInput initializationInput = new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum); processor.initialize(initializationInput); processorMap.put(shardId, processor); }); return processorMap; }
/**
 * Called by the Amazon Kinesis Client Library before any data records are delivered to this
 * RecordProcessor instance (via processRecords).
 *
 * <p>Fails if no listener has been set, then records the shard id and the starting extended
 * sequence number from the initialization input and logs them.
 *
 * @param initializationInput Provides information related to initialization
 */
@Override
public void initialize(InitializationInput initializationInput) {
    // A processor without a listener is rejected before any state is stored.
    Validate.isTrue(this.listener != null, "There is no listener set for the processor.");

    this.shardId = initializationInput.getShardId();
    this.initSeqNumber = initializationInput.getExtendedSequenceNumber();

    LOG.info("Initialization done for {} with sequence {}", this, this.initSeqNumber.getSequenceNumber());
}
/**
 * {@inheritDoc}
 *
 * <p>Stores the shard id from the initialization input and, when debug logging is enabled,
 * logs the starting extended sequence number and the shard id.
 */
@Override
public void initialize(InitializationInput initializationInput) {
    shardId = initializationInput.getShardId();

    // Nothing else to do unless debug output is wanted.
    if (!LOG.isDebugEnabled()) {
        return;
    }

    String startingSequence = initializationInput.getExtendedSequenceNumber().toString();
    LOG.debug("Initializing record processor at: {}", startingSequence);
    LOG.debug("Initializing record processor for shard: {}", shardId);
}
/**
 * No-op: this implementation performs no work on initialization and ignores its argument.
 *
 * @param initializationInput initialization information; unused here
 */
@Override public void initialize(InitializationInput initializationInput) { }
/**
 * Logs the shard id and the sub-sequence number this processor starts from.
 *
 * @param initializationInput supplies the shard id and the starting extended sequence number
 */
@Override
public void initialize(InitializationInput initializationInput) {
    final String shardId = initializationInput.getShardId();
    log.info(
            "Processing from shard {} beginning with subsequence {}.",
            shardId,
            initializationInput.getExtendedSequenceNumber().getSubSequenceNumber());
}
/**
 * Logs the shard id taken from the initialization input and stores it in {@code kinesisShardId}.
 *
 * @param initializationInput supplies the id of the shard assigned to this processor
 */
@Override
public void initialize(InitializationInput initializationInput) {
    final String assignedShardId = initializationInput.getShardId();
    LOG.info("Init RecordProcessor " + assignedShardId);
    this.kinesisShardId = assignedShardId;
}