private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) { Map<String, KinesisRecordProcessor> processorMap = new HashMap<>(); IntStream.range(0, numShards) .forEach(p -> { String shardId = String.format("shard-%05d", p); // Create Kinesis processor KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor(); // Initialize the shard ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000"); InitializationInput initializationInput = new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum); processor.initialize(initializationInput); processorMap.put(shardId, processor); }); return processorMap; }
/**
 * Builds the {@link ExtendedSequenceNumber} for this object.
 *
 * <p>When {@code sequenceNumber} is {@code null}, the string form of
 * {@code shardIteratorType} is used in its place.
 *
 * @return extended sequence number built from the (possibly substituted) sequence number
 *     and {@code subSequenceNumber}
 */
private ExtendedSequenceNumber extendedSequenceNumber() {
    String effectiveSequenceNumber =
        sequenceNumber != null ? sequenceNumber : shardIteratorType.toString();
    return new ExtendedSequenceNumber(effectiveSequenceNumber, subSequenceNumber);
}
/**
 * Verifies {@code ShardCheckpoint.isBeforeOrAt} against records that carry an
 * {@link ExtendedSequenceNumber} (sequence number plus sub-sequence number).
 */
@Test
public void testComparisonWithExtendedSequenceNumber() {
    // LATEST and TRIM_HORIZON checkpoints are expected to be before-or-at record (100, 0).
    ShardCheckpoint latest = new ShardCheckpoint("", "", new StartingPoint(LATEST));
    assertThat(latest.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isTrue();

    ShardCheckpoint trimHorizon = new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON));
    assertThat(trimHorizon.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isTrue();

    // Checkpoints expected to be before-or-at record (100, 0).
    ShardCheckpoint afterTenOne = checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L);
    assertThat(afterTenOne.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isTrue();

    ShardCheckpoint atHundredZero = checkpoint(AT_SEQUENCE_NUMBER, "100", 0L);
    assertThat(atHundredZero.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isTrue();

    // Checkpoints expected NOT to be before-or-at the given record.
    ShardCheckpoint afterHundredZero = checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L);
    assertThat(afterHundredZero.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isFalse();

    ShardCheckpoint atHundredOne = checkpoint(AT_SEQUENCE_NUMBER, "100", 1L);
    assertThat(atHundredOne.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("100", 0L))))
        .isFalse();

    ShardCheckpoint afterHundredZeroAgain = checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L);
    assertThat(afterHundredZeroAgain.isBeforeOrAt(recordWith(new ExtendedSequenceNumber("99", 1L))))
        .isFalse();
}
/** * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the * application. Upon fail over, the new instance will get records with sequence number greater than the checkpoint * position for each partition key. * * @param processRecordsInput Provides the records to be processed as well as information and capabilities related * to them (eg checkpointing). */ @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // KCL does not send any records to the processor that was shutdown. Validate.isTrue(!shutdownRequested, String.format("KCL returned records after shutdown is called on the processor %s.", this)); // KCL aways gives reference to the same checkpointer instance for a given processor instance. checkpointer = processRecordsInput.getCheckpointer(); List<Record> records = processRecordsInput.getRecords(); // Empty records are expected when KCL config has CallProcessRecordsEvenForEmptyRecordList set to true. if (!records.isEmpty()) { lastProcessedRecordSeqNumber = new ExtendedSequenceNumber(records.get(records.size() - 1).getSequenceNumber()); listener.onReceiveRecords(ssp, records, processRecordsInput.getMillisBehindLatest()); } }
/**
 * Combines this object's sequence number and sub-sequence number into a new
 * {@link ExtendedSequenceNumber}.
 *
 * @return a newly created extended sequence number built from {@code getSequenceNumber()}
 *     and {@code getSubSequenceNumber()}
 */
public ExtendedSequenceNumber getExtendedSequenceNumber() {
    final var sequence = getSequenceNumber();
    final var subSequence = getSubSequenceNumber();
    return new ExtendedSequenceNumber(sequence, subSequence);
}
/**
 * Creates a mocked {@link KinesisRecord} whose {@code getExtendedSequenceNumber()} is
 * stubbed to return the given value.
 *
 * @param extendedSequenceNumber value the mocked record should report
 * @return the stubbed mock record
 */
private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
    KinesisRecord mockedRecord = mock(KinesisRecord.class);
    given(mockedRecord.getExtendedSequenceNumber())
        .willReturn(extendedSequenceNumber);
    return mockedRecord;
}