@Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { LOG.info(String.format("Received %s Records", records.size())); // add a call to your business logic here! // // myLinkedClasses.doSomething(records) // // try { checkpointer.checkpoint(); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { e.printStackTrace(); super.shutdown(checkpointer, ShutdownReason.ZOMBIE); } }
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard, List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException { Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>(); processors.forEach(processor -> { try { // Create records and call process records IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class); doNothing().when(checkpointer).checkpoint(anyString()); doNothing().when(checkpointer).checkpoint(); ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class); when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer); when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L); List<Record> inputRecords = createRecords(numRecordsPerShard); processorRecordMap.put(processor, inputRecords); when(processRecordsInput.getRecords()).thenReturn(inputRecords); processor.processRecords(processRecordsInput); } catch (ShutdownException | InvalidStateException ex) { throw new RuntimeException(ex); } }); return processorRecordMap; }
/**
 * Shuts this record processor down.
 *
 * <ul>
 *   <li>If the processor is already marked as shut down, a warning is logged and nothing else happens.</li>
 *   <li>{@code TERMINATE}: the buffered records are transformed and emitted, then a checkpoint is
 *       attempted; a checkpoint failure is logged and does not abort the shutdown.</li>
 *   <li>{@code ZOMBIE}: nothing is emitted or checkpointed.</li>
 * </ul>
 * Afterwards the emitter is shut down and the processor is marked as shut down.
 *
 * @param checkpointer checkpointer used when the reason is {@code TERMINATE}
 * @param reason       why the processor is being shut down
 * @throws IllegalStateException if {@code reason} is neither {@code TERMINATE} nor {@code ZOMBIE}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor with shardId: " + shardId + " with reason " + reason);
    if (isShutdown) {
        LOG.warn("Record processor for shardId: " + shardId + " has been shutdown multiple times.");
        return;
    }
    switch (reason) {
        case TERMINATE:
            emit(checkpointer, transformToOutput(buffer.getRecords()));
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException
                    | ShutdownException e) {
                // Log with a descriptive message and the exception as the cause, instead of
                // passing the exception as the message object.
                LOG.error("Failed to checkpoint while shutting down record processor for shardId: "
                        + shardId, e);
            }
            break;
        case ZOMBIE:
            break;
        default:
            throw new IllegalStateException("invalid shutdown reason");
    }
    emitter.shutdown();
    isShutdown = true;
}
/**
 * Publishes the payload of every record into {@code _buffer} and then checkpoints.
 *
 * <p>For each record the payload is decoded to text, the next sequence is claimed from
 * {@code _buffer}, the event at that sequence receives the text, and the sequence is published.
 * A checkpoint failure is logged as a warning and otherwise ignored.
 *
 * @param list records to publish
 * @param irpc checkpointer invoked once after all records were published
 */
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {
    _logger.info("Processing {} records", list.size());
    for (Record r : list) {
        // Decode only the buffer's readable bytes with an explicit charset. Calling array() would
        // ignore position/limit/offset and fails for read-only or direct buffers; duplicate()
        // keeps the record's own buffer position untouched.
        // NOTE(review): payloads are assumed to be UTF-8 text - confirm against the producer.
        String data = java.nio.charset.StandardCharsets.UTF_8.decode(r.getData().duplicate()).toString();
        long seq = _buffer.next();
        KinesisEvent evt = _buffer.get(seq);
        evt.setData(data);
        _buffer.publish(seq);
    }
    try {
        irpc.checkpoint();
    } catch (InvalidStateException | KinesisClientLibDependencyException | ShutdownException
            | ThrottlingException ex) {
        _logger.warn("Exception while checkpointing", ex);
    }
}
/**
 * Runs {@code testProcessRecordsHelper} for system "kinesis" and stream "stream" with 2 shards and
 * 5 records per shard.
 */
@Test
public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
        NoSuchFieldException, IllegalAccessException {
    final int shardCount = 2;
    final int recordsPerShard = 5;
    testProcessRecordsHelper("kinesis", "stream", shardCount, recordsPerShard);
}
/**
 * Runs {@code testProcessRecordsHelper} for system "kinesis" and stream "stream" with a single shard
 * and zero records per shard.
 */
@Test
public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
        InvalidStateException, NoSuchFieldException, IllegalAccessException {
    final int shardCount = 1;
    final int recordsPerShard = 0;
    testProcessRecordsHelper("kinesis", "stream", shardCount, recordsPerShard);
}
private void finishBatch(IRecordProcessorCheckpointer checkpointer, Record checkpointRecord) { try { if (!context.processBatch(batchContext, shardId, KinesisUtil.createKinesisRecordId(shardId, checkpointRecord))) { throw Throwables.propagate(new StageException(Errors.KINESIS_04)); } // Checkpoint iff batch processing succeeded checkpointer.checkpoint(checkpointRecord); if (LOG.isDebugEnabled()) { LOG.debug("Checkpointed batch at record {}", checkpointRecord.toString()); } } catch (InvalidStateException | ShutdownException e) { LOG.error("Error checkpointing batch: {}", e.toString(), e); } }
/** * We don't checkpoint on SHUTDOWN_REQUESTED because we currently always * checkpoint each batch in {@link #processRecords}. * * @param shutdownInput {@inheritDoc} */ @Override public void shutdown(ShutdownInput shutdownInput) { LOG.info("Shutting down record processor for shard: {}", shardId); if (ShutdownReason.TERMINATE.equals(shutdownInput.getShutdownReason())) { // Shard is closed / finished processing. Checkpoint all processing up to here. try { shutdownInput.getCheckpointer().checkpoint(); LOG.debug("Checkpointed due to record processor shutdown request."); } catch (InvalidStateException | ShutdownException e) { LOG.error("Error checkpointing batch: {}", e.toString(), e); } } }
/** * Helper to simulate and test the life-cycle of record processing from a kinesis stream with a given number of shards * 1. Creation of record processors. * 2. Initialization of record processors. * 3. Processing records via record processors. * 4. Calling checkpoint on record processors. * 5. Shutting down (due to re-assignment or lease expiration) record processors. */ private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard) throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException, IllegalAccessException { KinesisConfig kConfig = new KinesisConfig(new MapConfig()); // Create consumer KinesisSystemConsumer consumer = new KinesisSystemConsumer(system, kConfig, new NoOpMetricsRegistry()); initializeMetrics(consumer, stream); List<SystemStreamPartition> ssps = new LinkedList<>(); IntStream.range(0, numShards) .forEach(p -> { SystemStreamPartition ssp = new SystemStreamPartition(system, stream, new Partition(p)); ssps.add(ssp); }); ssps.forEach(ssp -> consumer.register(ssp, SYSTEM_CONSUMER_REGISTER_OFFSET)); // Create Kinesis record processor factory IRecordProcessorFactory factory = consumer.createRecordProcessorFactory(stream); // Create and initialize Kinesis record processor Map<String, KinesisRecordProcessor> processorMap = createAndInitProcessors(factory, numShards); List<KinesisRecordProcessor> processorList = new ArrayList<>(processorMap.values()); // Generate records to Kinesis record processor Map<KinesisRecordProcessor, List<Record>> inputRecordMap = generateRecords(numRecordsPerShard, processorList); // Verification steps // Read events from the BEM queue Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = readEvents(new HashSet<>(ssps), consumer, numRecordsPerShard); if (numRecordsPerShard > 0) { Assert.assertEquals(messages.size(), numShards); } else { // No input records and hence no messages Assert.assertEquals(messages.size(), 0); return; } 
Map<SystemStreamPartition, KinesisRecordProcessor> sspToProcessorMap = getProcessorMap(consumer); ssps.forEach(ssp -> { try { KinesisRecordProcessor processor = sspToProcessorMap.get(ssp); if (numRecordsPerShard > 0) { // Verify that the read messages are received in order and are the same as input records Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard); List<IncomingMessageEnvelope> envelopes = messages.get(ssp); List<Record> inputRecords = inputRecordMap.get(processor); verifyRecords(envelopes, inputRecords, processor.getShardId()); // Call checkpoint on consumer and verify that the checkpoint is called with the right offset IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1); consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset())); ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class); verify(getCheckpointer(processor)).checkpoint(argument.capture()); Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue()); } // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping shutDownProcessor(processor, ShutdownReason.ZOMBIE); Assert.assertTrue(!sspToProcessorMap.containsValue(processor)); Assert.assertTrue(isSspAvailable(consumer, ssp)); } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) { throw new RuntimeException(ex); } }); }
/**
 * Runs {@code testLifeCycleHelper} with an argument of 5 (the "with events" case).
 */
@Test
public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
        NoSuchFieldException, IllegalAccessException {
    final int eventCount = 5;
    testLifeCycleHelper(eventCount);
}
/**
 * Runs {@code testLifeCycleHelper} with an argument of 0 (the "no events" case).
 */
@Test
public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
        NoSuchFieldException, IllegalAccessException {
    final int eventCount = 0;
    testLifeCycleHelper(eventCount);
}
/**
 * Runs {@code testShutdownDuringReshardHelper} with an argument of 5 (the "with events" case).
 */
@Test
public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
        InvalidStateException, NoSuchFieldException, IllegalAccessException {
    final int eventCount = 5;
    testShutdownDuringReshardHelper(eventCount);
}
/**
 * Runs {@code testShutdownDuringReshardHelper} with an argument of 0 (the "no events" case).
 */
@Test
public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
        InvalidStateException, NoSuchFieldException, IllegalAccessException {
    final int eventCount = 0;
    testShutdownDuringReshardHelper(eventCount);
}