@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info(String.format("Received %s Records", records.size()));

    // add a call to your business logic here!
    //
    // myLinkedClasses.doSomething(records)
    //
    try {
        checkpointer.checkpoint();
    } catch (KinesisClientLibDependencyException | InvalidStateException
            | ThrottlingException | ShutdownException e) {
        e.printStackTrace();
        super.shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start
    // processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        try {
            checkpoint(checkpointer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@SneakyThrows
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    // Used to update the last processed record
    IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
    log.info("Retrieving records from Kinesis.");
    for (Record r : records) {
        try {
            int len = r.getData().remaining();
            byte[] buffer = new byte[len];
            r.getData().get(buffer);
            String json = new String(buffer, "UTF-8");
            ZombieLecture lecture = mapper.readValue(json, ZombieLecture.class);
            this.processZombieLecture(lecture);
            log.debug(processedRecords++ + ": " + json);
            if (processedRecords % 1000 == 999) {
                // Checkpoint every 1000 records to keep track of the processed lectures.
                checkpointer.checkpoint();
            }
        } catch (UnsupportedEncodingException | MessagingException ex) {
            log.warn(ex.getMessage());
        }
    }
}
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
        List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
    Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
    processors.forEach(processor -> {
        try {
            // Create records and call process records
            IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
            doNothing().when(checkpointer).checkpoint(anyString());
            doNothing().when(checkpointer).checkpoint();
            ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
            when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
            when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);
            List<Record> inputRecords = createRecords(numRecordsPerShard);
            processorRecordMap.put(processor, inputRecords);
            when(processRecordsInput.getRecords()).thenReturn(inputRecords);
            processor.processRecords(processRecordsInput);
        } catch (ShutdownException | InvalidStateException ex) {
            throw new RuntimeException(ex);
        }
    });
    return processorRecordMap;
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor with shardId: " + shardId + " with reason " + reason);
    if (isShutdown) {
        LOG.warn("Record processor for shardId: " + shardId + " has been shutdown multiple times.");
        return;
    }
    switch (reason) {
        case TERMINATE:
            emit(checkpointer, transformToOutput(buffer.getRecords()));
            try {
                checkpointer.checkpoint();
            } catch (KinesisClientLibDependencyException | InvalidStateException
                    | ThrottlingException | ShutdownException e) {
                LOG.error(e);
            }
            break;
        case ZOMBIE:
            break;
        default:
            throw new IllegalStateException("invalid shutdown reason");
    }
    emitter.shutdown();
    isShutdown = true;
}
@Override
public void processRecords(List<Record> arg0, IRecordProcessorCheckpointer arg1) {
    counter += arg0.size();
    if (counter > target) {
        System.out.println("Received : " + counter + " records");
        target += target;
    }
    for (Record rec : arg0) {
        try {
            verifyRecord(rec.getData());
        } catch (JSONException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Aggregating " + records.size() + " records for Kinesis Shard " + kinesisShardId);
    try {
        // run data into the aggregator
        agg.aggregate(records);
        // checkpoint the aggregator and kcl
        agg.checkpoint();
        checkpointer.checkpoint(records.get(records.size() - 1));
        LOG.debug("Kinesis Checkpoint for Shard " + kinesisShardId + " Complete");
    } catch (Exception e) {
        e.printStackTrace();
        LOG.error(e);
        shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {
    _logger.info("Processing {} records", list.size());
    for (Record r : list) {
        String data = new String(r.getData().array());
        long seq = _buffer.next();
        KinesisEvent evt = _buffer.get(seq);
        evt.setData(data);
        _buffer.publish(seq);
    }
    try {
        irpc.checkpoint();
    } catch (InvalidStateException | KinesisClientLibDependencyException
            | ShutdownException | ThrottlingException ex) {
        _logger.warn("Exception while checkpointing", ex);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        byte[] bytes = new byte[record.getData().remaining()];
        record.getData().get(bytes);
        System.out.write(bytes, 0, bytes.length);
        System.out.println();
    });
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        try {
            byte[] bytes = new byte[record.getData().remaining()];
            record.getData().get(bytes);
            System.out.println(mapper.writeValueAsString(mapper.readTree(bytes)));
        } catch (IOException e) {
            logger.error("", e);
        }
    });
}
@Override
@SneakyThrows
public void shutdown(ShutdownInput shutdownInput) {
    IRecordProcessorCheckpointer checkpointer = shutdownInput.getCheckpointer();
    ShutdownReason reason = shutdownInput.getShutdownReason();
    log.info("Job finished: {}.", reason);
    checkpointer.checkpoint();
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
    try {
        logger.info("Start processing records");
        processRecordsWithRetries(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(iRecordProcessorCheckpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    } catch (InvalidProtocolBufferException exc) {
        logger.error(exc);
    }
}
@Override
public void shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) {
    if (producer != null) {
        producer.close();
    }
    logger.info("Shutting down record processor for shard: " + shardId);
    // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
    if (shutdownReason == ShutdownReason.TERMINATE) {
        checkpoint(iRecordProcessorCheckpointer);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    if (!receiver.isStopped()) {
        // this processor works in tandem with the Spark Streaming receiver
        handleRecords(records);
        checkpointIfNeeded(checkpointer);
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    // Important to checkpoint after reaching end of shard,
    // so we can start processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        performCheckpoint(checkpointer);
    }
}
private void performCheckpoint(IRecordProcessorCheckpointer checkpointer) {
    try {
        checkpointer.checkpoint();
    } catch (Exception ex) {
        System.out.println("Failed to checkpoint: " + ex);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    for (Record record : records) {
        producer.send(messageFactory.createMessage(record, topic));
        if (++checkpointCounter % checkpointFrequency == 0) {
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    if (reason == ShutdownReason.TERMINATE) {
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    // Note: This method will be called even for empty record lists. This is needed for checking the buffer
    // time threshold.
    if (isShutdown) {
        LOG.warn("processRecords called on shutdown record processor for shardId: " + shardId);
        return;
    }
    if (shardId == null) {
        throw new IllegalStateException("Record processor not initialized");
    }
    // Transform each Amazon Kinesis Record and add the result to the buffer
    for (Record record : records) {
        try {
            if (transformer instanceof ITransformer) {
                ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer;
                filterAndBufferRecord(singleTransformer.toClass(record), record);
            } else if (transformer instanceof ICollectionTransformer) {
                ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer;
                Collection<T> transformedRecords = listTransformer.toClass(record);
                for (T transformedRecord : transformedRecords) {
                    filterAndBufferRecord(transformedRecord, record);
                }
            } else {
                throw new RuntimeException("Transformer must implement ITransformer or ICollectionTransformer");
            }
        } catch (IOException e) {
            LOG.error(e);
        }
    }
    if (buffer.shouldFlush()) {
        List<U> emitItems = transformToOutput(buffer.getRecords());
        emit(checkpointer, emitItems);
    }
}
private void finishBatch(IRecordProcessorCheckpointer checkpointer, Record checkpointRecord) {
    try {
        if (!context.processBatch(batchContext, shardId,
                KinesisUtil.createKinesisRecordId(shardId, checkpointRecord))) {
            throw Throwables.propagate(new StageException(Errors.KINESIS_04));
        }
        // Checkpoint iff batch processing succeeded
        checkpointer.checkpoint(checkpointRecord);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checkpointed batch at record {}", checkpointRecord.toString());
        }
    } catch (InvalidStateException | ShutdownException e) {
        LOG.error("Error checkpointing batch: {}", e.toString(), e);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    if (logger.isDebugEnabled()) {
        logger.debug("Processing " + records.size() + " records from " + kinesisShardId);
    }
    // Process records and perform all exception handling.
    processRecordsWithRetries(records);
    // Checkpoint once every checkpoint interval.
    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        checkpoint(checkpointer);
        nextCheckpointTimeInMillis = System.currentTimeMillis() + checkpointIntervalMillis;
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    logger.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        checkpoint(checkpointer);
    }
}
/**
 * {@inheritDoc}
 */
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Processing " + records.size() + " records from " + kinesisShardId);
    // Process records and perform all exception handling.
    processRecordsWithRetries(records);
    // Checkpoint once every checkpoint interval.
    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        checkpoint(checkpointer);
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }
}
/**
 * {@inheritDoc}
 */
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        checkpoint(checkpointer);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Processing " + records.size() + " records from " + kinesisShardId);
    // Process records and perform all exception handling.
    try {
        processRecordsWithRetries(records);
        checkpoint(checkpointer);
    } catch (Exception e) {
        System.err.println("Unhandled Exception while processing record set. Shutdown");
    }
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start
    // processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        checkpoint(checkpointer);
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
    LOGGER.info("Processing {} records from {}", records.size(), kinesisShardId);
    this.processRecordsWithRetries(records);
    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        this.checkpoint(checkpointer);
        this.nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOGGER.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        checkpoint(checkpointer);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    for (Record record : records) {
        String data = new String(record.getData().array(), Charset.forName("UTF-8"));
        System.out.println(data);
        if (record instanceof RecordAdapter) {
            com.amazonaws.services.dynamodbv2.model.Record streamRecord =
                    ((RecordAdapter) record).getInternalObject();
            switch (streamRecord.getEventName()) {
                case "INSERT":
                case "MODIFY":
                    StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                    break;
                case "REMOVE":
                    StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
            }
        }
        checkpointCounter += 1;
        if (checkpointCounter % 10 == 0) {
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Processing " + records.size() + " records for kinesisShardId " + kinesisShardId);
    // Process records and perform all exception handling.
    processRecordsWithRetries(records);
    // Checkpoint once every checkpoint interval.
    if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
        checkpoint(checkpointer);
        nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        // get the last 30 minutes of tweets, max of 30
        long start = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(30);
        try {
            if (persistentStore != null) {
                ScanResult scanResult = persistentStore.getSince(start, 30);
                if (scanResult != null) {
                    System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
                    System.out.println("Last 30 mins of Tweets, max of 30");
                    System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
                    for (Map<String, AttributeValue> item : scanResult.getItems()) {
                        printItem(item);
                    }
                    System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
                }
            }
        } catch (Exception e) {
            LOG.error("Error retrieving tweets.", e);
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    LOG.info("Shutting down record processor for shard: " + kinesisShardId);
    // Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
    if (reason == ShutdownReason.TERMINATE) {
        checkpoint(checkpointer);
    }
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
    // no-op
}
/**
 * {@inheritDoc}
 */
@Override
public abstract void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer);
private void checkpointIfNeeded(IRecordProcessorCheckpointer checkpointer) {
    if (checkpointState.shouldCheckpoint()) {
        performCheckpoint(checkpointer);
        checkpointState.advanceCheckpoint();
    }
}
static IRecordProcessorCheckpointer getCheckpointer(KinesisRecordProcessor processor)
        throws NoSuchFieldException, IllegalAccessException {
    Field f = processor.getClass().getDeclaredField("checkpointer");
    f.setAccessible(true);
    return (IRecordProcessorCheckpointer) f.get(processor);
}