@Override public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) { // Note: This method will be called even for empty record lists. This is needed for checking the buffer time // threshold. if (isShutdown) { LOG.warn("processRecords called on shutdown record processor for shardId: " + shardId); return; } if (shardId == null) { throw new IllegalStateException("Record processor not initialized"); } // Transform each Amazon Kinesis Record and add the result to the buffer for (Record record : records) { try { if (transformer instanceof ITransformer) { ITransformer<T, U> singleTransformer = (ITransformer<T, U>) transformer; filterAndBufferRecord(singleTransformer.toClass(record), record); } else if (transformer instanceof ICollectionTransformer) { ICollectionTransformer<T, U> listTransformer = (ICollectionTransformer<T, U>) transformer; Collection<T> transformedRecords = listTransformer.toClass(record); for (T transformedRecord : transformedRecords) { filterAndBufferRecord(transformedRecord, record); } } else { throw new RuntimeException("Transformer must implement ITransformer or ICollectionTransformer"); } } catch (IOException e) { LOG.error(e); } } if (buffer.shouldFlush()) { List<U> emitItems = transformToOutput(buffer.getRecords()); emit(checkpointer, emitItems); } }
@Override public ITransformer<byte[], byte[]> getTransformer(KinesisConnectorConfiguration configuration) { return new ByteArrayNoopTransformer(); }
@Override public ITransformer<KinesisMessageModel, Map<String,String>> getTransformer(KinesisConnectorConfiguration configuration) { return new KinesisMessageModelHBaseTransformer(); }