@Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { LOG.info(String.format("Received %s Records", records.size())); // add a call to your business logic here! // // myLinkedClasses.doSomething(records) // // try { checkpointer.checkpoint(); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { e.printStackTrace(); super.shutdown(checkpointer, ShutdownReason.ZOMBIE); } }
/** * {@inheritDoc} */ @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor for shard: " + kinesisShardId); // Important to checkpoint after reaching end of shard, so we can start // processing data from child shards. if (reason == ShutdownReason.TERMINATE) { try { checkpoint(checkpointer); } catch (Exception e) { e.printStackTrace(); } } }
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor with shardId: " + shardId + " with reason " + reason); if (isShutdown) { LOG.warn("Record processor for shardId: " + shardId + " has been shutdown multiple times."); return; } switch (reason) { case TERMINATE: emit(checkpointer, transformToOutput(buffer.getRecords())); try { checkpointer.checkpoint(); } catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException e) { LOG.error(e); } break; case ZOMBIE: break; default: throw new IllegalStateException("invalid shutdown reason"); } emitter.shutdown(); isShutdown = true; }
/** * {@inheritDoc} */ @Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { LOG.info("Aggregating " + records.size() + " records for Kinesis Shard " + kinesisShardId); try { // run data into the aggregator agg.aggregate(records); // checkpoint the aggregator and kcl agg.checkpoint(); checkpointer.checkpoint(records.get(records.size() - 1)); LOG.debug("Kinesis Checkpoint for Shard " + kinesisShardId + " Complete"); } catch (Exception e) { e.printStackTrace(); LOG.error(e); shutdown(checkpointer, ShutdownReason.ZOMBIE); } }
@Override @SneakyThrows public void shutdown(ShutdownInput shutdownInput) { IRecordProcessorCheckpointer checkpointer = shutdownInput.getCheckpointer(); ShutdownReason reason = shutdownInput.getShutdownReason(); log.info("Finalizado trabajo: {}.", reason); checkpointer.checkpoint(); }
@Override public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) { if (producer != null) { producer.close(); } logger.info("Shutting down record processor for shard: " + shardId); // Important to checkpoint after reaching end of shard, so we can start processing data from child shards. if (shutdownReason == ShutdownReason.TERMINATE) { checkpoint(iRecordProcessorCheckpointer); } }
/** * {@inheritDoc} */ @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { // Important to checkpoint after reaching end of shard, // so we can start processing data from child shards. if (reason == ShutdownReason.TERMINATE) { performCheckpoint(checkpointer); } }
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if(reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
/** * {@inheritDoc} */ @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { logger.info("Shutting down record processor for shard: " + kinesisShardId); // Important to checkpoint after reaching end of shard, so we can start processing data from child shards. if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
/** * {@inheritDoc} */ public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor for shard: " + kinesisShardId); // Important to checkpoint after reaching end of shard, so we can start processing data from child shards. if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor for shard: " + kinesisShardId); // Important to checkpoint after reaching end of shard, so we can start // processing data from child shards. if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
/** * {@inheritDoc} */ @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOGGER.info("Shutting down record processor for shard: " + kinesisShardId); /** Important to checkpoint after reaching end of shard, so we can start processing data from child shards. **/ if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
/** * {@inheritDoc} */ @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { LOG.info("Shutting down record processor for shard: " + kinesisShardId); // Important to checkpoint after reaching end of shard, so we can start processing data from child shards. if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } }
@Override public void shutdown(IRecordProcessorCheckpointer arg0, ShutdownReason arg1) { }
@Override public void shutdown(IRecordProcessorCheckpointer irpc, ShutdownReason sr) { _logger.info("Shutting down record processor"); }