IRecordProcessorFactory createRecordProcessorFactory(String stream) { return () -> { // This code is executed in Kinesis thread context. try { SystemStreamPartition ssp = sspAllocator.allocate(stream); KinesisRecordProcessor processor = new KinesisRecordProcessor(ssp, KinesisSystemConsumer.this); KinesisRecordProcessor prevProcessor = processors.put(ssp, processor); Validate.isTrue(prevProcessor == null, String.format("Adding new kinesis record processor %s while the" + " previous processor %s for the same ssp %s is still active.", processor, prevProcessor, ssp)); return processor; } catch (Exception e) { callbackException = e; // This exception is the result of kinesis dynamic shard splits due to which sspAllocator ran out of free ssps. // Set the failed state in consumer which will eventually result in stopping the container. A manual job restart // will be required at this point. After the job restart, the newly created shards will be discovered and enough // ssps will be added to sspAllocator freePool. throw new SamzaException(e); } }; }
private Map<String, KinesisRecordProcessor> createAndInitProcessors(IRecordProcessorFactory factory, int numShards) { Map<String, KinesisRecordProcessor> processorMap = new HashMap<>(); IntStream.range(0, numShards) .forEach(p -> { String shardId = String.format("shard-%05d", p); // Create Kinesis processor KinesisRecordProcessor processor = (KinesisRecordProcessor) factory.createProcessor(); // Initialize the shard ExtendedSequenceNumber seqNum = new ExtendedSequenceNumber("0000"); InitializationInput initializationInput = new InitializationInput().withShardId(shardId).withExtendedSequenceNumber(seqNum); processor.initialize(initializationInput); processorMap.put(shardId, processor); }); return processorMap; }
/**
 * Creates the consumer, stores its settings and then calls {@code initKinesis()}.
 *
 * @param streamName stream name, injected from the {@code stream} property
 * @param region region, injected from the {@code region} property
 * @param zombieRecordFactory record processor factory kept by this consumer
 */
@Autowired
public Consumer(
    @Value("${stream}") String streamName,
    @Value("${region}") String region,
    IRecordProcessorFactory zombieRecordFactory) {
  this.zombieRecordFactory = zombieRecordFactory;
  this.region = region;
  this.streamName = streamName;
  // All fields are assigned before initialization runs.
  initKinesis();
}
/**
 * Builds a KCL {@link Worker} configured from {@code conf} and this instance's clients.
 *
 * @param recordProcessorFactory factory the worker uses to create record processors
 * @param maxBatchSize value passed to the KCL configuration as the maximum number of records
 * @return the built worker
 */
private Worker createKinesisWorker(IRecordProcessorFactory recordProcessorFactory, int maxBatchSize) {
  KinesisClientLibConfiguration kclConfig =
      new KinesisClientLibConfiguration(conf.applicationName, conf.streamName, credentials, getWorkerId());

  kclConfig
      .withMaxRecords(maxBatchSize)
      .withCallProcessRecordsEvenForEmptyRecordList(false)
      .withIdleTimeBetweenReadsInMillis(conf.idleTimeBetweenReads)
      .withInitialPositionInStream(conf.initialPositionInStream)
      .withKinesisClientConfig(clientConfiguration);

  // A timestamp is only supplied when the stream is read from a point in time.
  boolean startsAtTimestamp = conf.initialPositionInStream == InitialPositionInStream.AT_TIMESTAMP;
  if (startsAtTimestamp) {
    Date initialTimestamp = new Date(conf.initialTimestamp);
    kclConfig.withTimestampAtInitialPositionInStream(initialTimestamp);
  }

  // Named regions are configured by label; the OTHER region uses the explicit endpoint instead.
  if (conf.region != AWSRegions.OTHER) {
    kclConfig.withRegionName(conf.region.getLabel());
  } else {
    kclConfig.withKinesisEndpoint(conf.endpoint);
  }

  Worker.Builder workerBuilder = new Worker.Builder()
      .recordProcessorFactory(recordProcessorFactory)
      .metricsFactory(metricsFactory)
      .dynamoDBClient(dynamoDBClient)
      .cloudWatchClient(cloudWatchClient)
      .execService(executor)
      .config(kclConfig);
  return workerBuilder.build();
}
/** * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards * 1. Creation of record processors. * 2. Initialization of record processors. * 3. Processing records via record processors. * 4. Calling checkpoint on record processors. * 5. Shutting down (due to re-assignment or lease expiration) record processors. */ private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard) throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException, IllegalAccessException { KinesisConfig kConfig = new KinesisConfig(new MapConfig()); // Create consumer KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry()); initializeMetrics(consumer, stream); List<SystemStreamPartition> ssps = new LinkedList<>(); IntStream.range(0, numShards) .forEach(p -> { SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p)); ssps.add(ssp); }); ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET)); // Create Kinesis record processor factory IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream); // Create and initialize Kinesis record processor Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards); List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values()); // Generate records to Kinesis record processor Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList); // Verification steps // Read events from the BEM queue Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard); if (numRecordsPerShard > 0) { Assert.assertEquals(messages.size(), numShards); } else { // No input records and hence no messages Assert.assertEquals(messages.size(), 0); return; } 
Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer); ssps.forEach(ssp -> { try { KinesisRecordProcessor processor = sspToProcessorMap.get(ssp); if (numRecordsPerShard > 0) { // Verify that the read messages are received in order and are the same as input records Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard); List<IncomingMessageEnvelope> envelopes = messages.get(ssp); List<Record> inputRecords = inputRecordMap.get(processor); verifyRecords(envelopes, inputRecords, processor.getShardId()); // Call checkpoint on consumer and verify that the checkpoint is called with the right offset IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1); consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset())); ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class); verify(getCheckpointer(processor)).checkpoint(argument.capture()); Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue()); } // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping shutDownProcessor(processor, ShutdownReason.ZOMBIE); Assert.assertTrue(!sspToProcessorMap.containsValue(processor)); Assert.assertTrue(isSspAvailable(consumer, ssp)); } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) { throw new RuntimeException(ex); } }); }