/**
 * Handles the shutdown notification for this shard's record processor.
 *
 * <p>Renames the current thread to the shard id, then either just logs (lease lost) or
 * logs and attempts a final checkpoint (any other shutdown reason).
 *
 * @param input carries the shutdown reason and the checkpointer used for the final checkpoint
 */
@Override
public void shutdown(ShutdownInput input) {
  Thread.currentThread().setName(kinesisShardId);

  ShutdownReason reason = input.getShutdownReason();
  if (reason != ShutdownReason.ZOMBIE) {
    /* This happens when a shard is split or merged, meaning that it stops existing
     * and we have other shards to process instead. Very rare. */
    LOG.warn("Shard is shutting down, reason: {}", reason);
    try {
      input.getCheckpointer().checkpoint();
    } catch (Exception e) {
      // Best effort: a failed final checkpoint is reported but does not abort the shutdown.
      LOG.error("Failed to checkpoint after shard shutdown", e);
    }
  } else {
    /* This happens because we have lost our lease. Either because of re-balancing
     * (there is another NomNom running on another host), or because we have
     * failed to renew our leases, which would happen if we are too busy.
     *
     * It happens when a new version of NomNom is deployed, since the two versions
     * will run side by side for a few seconds before the old one is terminated. And
     * the new one will try to take a few leases at startup. */
    LOG.warn("We're a ZOMBIE - someone stole our lease. Quitting.");
  }

  LOG.info("");
}
/** * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this * RecordProcessor instance. * * @param shutdownInput Provides information and capabilities (eg checkpointing) related to shutdown of this record * processor. */ @Override public void shutdown(ShutdownInput shutdownInput) { LOG.info("Shutting down {} with reason:{}", this, shutdownInput.getShutdownReason()); Validate.isTrue(!shutdownRequested, String.format("KCL called shutdown more than once for processor %s.", this)); shutdownRequested = true; // shutdown reason TERMINATE indicates that the shard is closed due to re-sharding. It also indicates that all the // records from the shard have been delivered to the consumer and the consumer is expected to checkpoint the // progress. if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { // We need to ensure that all records are processed and checkpointed before going ahead and marking the processing // complete by calling checkpoint() on KCL. We need to checkpoint the completion state for parent shard, for KCL // to consume from the child shard(s). try { LOG.info("Waiting for all the records for {} to be processed.", this); // Let's poll periodically and block until the last processed record is checkpointed. Also handle the case // where there are no records received for the processor, in which case the lastProcessedRecordSeqNumber will // be null. while (lastProcessedRecordSeqNumber != null && !lastProcessedRecordSeqNumber.equals(lastCheckpointedRecordSeqNumber)) { Thread.sleep(POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS); } LOG.info("Final checkpoint for {} before shutting down.", this); shutdownInput.getCheckpointer().checkpoint(); } catch (Exception e) { LOG.warn("An error occurred while committing the final checkpoint in the parent shard {}", this, e); } } listener.onShutdown(ssp); }
/**
 * Test helper: invokes {@code processor.shutdown(...)} with a mocked {@link ShutdownInput} that reports the
 * given reason and hands out the checkpointer returned by {@code getCheckpointer(processor)}.
 *
 * @param processor the record processor to shut down
 * @param reason    the shutdown reason the mocked input should report
 * @throws RuntimeException wrapping any exception raised while building the mock or shutting down
 */
static void shutDownProcessor(KinesisRecordProcessor processor, ShutdownReason reason) {
  try {
    ShutdownInput mockedInput = Mockito.mock(ShutdownInput.class);
    Mockito.when(mockedInput.getShutdownReason()).thenReturn(reason);
    Mockito.when(mockedInput.getCheckpointer()).thenReturn(getCheckpointer(processor));

    processor.shutdown(mockedInput);
  } catch (Exception failure) {
    // Surface any failure as unchecked so callers (including lambdas) need no throws clause.
    throw new RuntimeException(failure);
  }
}
/** * We don't checkpoint on SHUTDOWN_REQUESTED because we currently always * checkpoint each batch in {@link #processRecords}. * * @param shutdownInput {@inheritDoc} */ @Override public void shutdown(ShutdownInput shutdownInput) { LOG.info("Shutting down record processor for shard: {}", shardId); if (ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason())) { // Shard is closed / finished processing. Checkpoint all processing up to here. try { shutdownInput.getCheckpointer().checkpoint(); LOG.debug("Checkpointed due to record processor shutdown request."); } catch (InvalidStateException | ShutdownException e) { LOG.error("Error checkpointing batch: {}", e.toString(), e); } } }
/**
 * No-op shutdown hook: the body is empty, so no checkpoint is taken and nothing is logged or released here,
 * whatever the reason.
 *
 * @param checkpointer checkpointer supplied by the caller; unused
 * @param reason       why the processor is being shut down; unused
 */
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { }
/** * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards * 1. Creation of record processors. * 2. Initialization of record processors. * 3. Processing records via record processors. * 4. Calling checkpoint on record processors. * 5. Shutting down (due to re-assignment or lease expiration) record processors. */ private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard) throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException, IllegalAccessException { KinesisConfig kConfig = new KinesisConfig(new MapConfig()); // Create consumer KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry()); initializeMetrics(consumer, stream); List<SystemStreamPartition> ssps = new LinkedList<>(); IntStream.range(0, numShards) .forEach(p -> { SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p)); ssps.add(ssp); }); ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET)); // Create Kinesis record processor factory IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream); // Create and initialize Kinesis record processor Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards); List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values()); // Generate records to Kinesis record processor Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList); // Verification steps // Read events from the BEM queue Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard); if (numRecordsPerShard > 0) { Assert.assertEquals(messages.size(), numShards); } else { // No input records and hence no messages Assert.assertEquals(messages.size(), 0); return; } 
Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer); ssps.forEach(ssp -> { try { KinesisRecordProcessor processor = sspToProcessorMap.get(ssp); if (numRecordsPerShard > 0) { // Verify that the read messages are received in order and are the same as input records Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard); List<IncomingMessageEnvelope> envelopes = messages.get(ssp); List<Record> inputRecords = inputRecordMap.get(processor); verifyRecords(envelopes, inputRecords, processor.getShardId()); // Call checkpoint on consumer and verify that the checkpoint is called with the right offset IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1); consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset())); ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class); verify(getCheckpointer(processor)).checkpoint(argument.capture()); Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue()); } // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping shutDownProcessor(processor, ShutdownReason.ZOMBIE); Assert.assertTrue(!sspToProcessorMap.containsValue(processor)); Assert.assertTrue(isSspAvailable(consumer, ssp)); } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) { throw new RuntimeException(ex); } }); }