@Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { LOG.info(String.format("Received %s Records", records.size())); // add a call to your business logic here! // // myLinkedClasses.doSomething(records) // // try { checkpointer.checkpoint(); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { e.printStackTrace(); super.shutdown(checkpointer, ShutdownReason.ZOMBIE); } }
/**
 * Shuts this record processor down, at most once.
 *
 * <p>Behavior by reason:
 * <ul>
 *   <li>{@code TERMINATE}: the records still held in {@code buffer} are transformed
 *       and emitted, then a checkpoint is attempted. A checkpoint failure is logged
 *       and does not abort the shutdown.</li>
 *   <li>{@code ZOMBIE}: nothing is emitted and no checkpoint is attempted.</li>
 * </ul>
 * In both cases the emitter is shut down afterwards and the processor is marked as
 * shut down. A repeated call only logs a warning and returns.
 *
 * @param checkpointer used to emit and checkpoint on {@code TERMINATE}
 * @param reason       why the processor is being shut down
 * @throws IllegalStateException if {@code reason} is neither {@code TERMINATE}
 *         nor {@code ZOMBIE}; in that case the emitter is not shut down and the
 *         processor is not marked as shut down
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor with shardId: " + shardId + " with reason " + reason);
    if (isShutdown) {
        LOG.warn("Record processor for shardId: " + shardId + " has been shutdown multiple times.");
        return;
    }
    switch (reason) {
        case TERMINATE:
            // Flush whatever is still buffered before the final checkpoint.
            emit(checkpointer, transformToOutput(buffer.getRecords()));
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException | InvalidStateException
                    | ThrottlingException | ShutdownException e) {
                // Pass the exception as the throwable argument so the stack trace is
                // logged, and include the shard id so the failure can be located.
                LOG.error("Failed to checkpoint during shutdown of record processor for shardId: "
                        + shardId, e);
            }
            break;
        case ZOMBIE:
            // Deliberately no emit and no checkpoint for this reason.
            break;
        default:
            throw new IllegalStateException("invalid shutdown reason");
    }
    emitter.shutdown();
    isShutdown = true;
}
/**
 * Copies each record's payload into the next free slot of {@code _buffer} and
 * publishes it, then checkpoints once for the whole batch.
 *
 * <p>A checkpoint failure is logged as a warning and otherwise ignored, so the
 * records in the batch have already been published by the time it is reported.
 *
 * @param list the batch of records delivered to this processor
 * @param irpc used to record progress after the batch has been published
 */
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {
    _logger.info("Processing {} records", list.size());

    for (Record r : list) {
        // Decode BEFORE claiming a slot, so a decoding failure cannot leave a
        // claimed-but-unpublished sequence behind.
        // duplicate() leaves the record's own buffer position untouched, and copying
        // the remaining bytes honours position/limit and works for read-only or
        // direct buffers, where array() would throw or return unrelated bytes.
        java.nio.ByteBuffer payload = r.getData().duplicate();
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        // NOTE(review): assumes producers write UTF-8 text; confirm against the producer.
        // An explicit charset keeps decoding independent of the host's default.
        String data = new String(bytes, java.nio.charset.StandardCharsets.UTF_8);

        long seq = _buffer.next();
        try {
            KinesisEvent evt = _buffer.get(seq);
            evt.setData(data);
        } finally {
            // Always publish a claimed sequence. _buffer presumably is an LMAX Disruptor
            // RingBuffer (verify), where an unpublished sequence stalls all consumers.
            _buffer.publish(seq);
        }
    }

    try {
        irpc.checkpoint();
    } catch (InvalidStateException | KinesisClientLibDependencyException
            | ShutdownException | ThrottlingException ex) {
        _logger.warn("Exception while checkpointing", ex);
    }
}