@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info(String.format("Received %s Records", records.size()));

    // Add a call to your business logic here!
    //
    // myLinkedClasses.doSomething(records)
    //
    try {
        checkpointer.checkpoint();
    } catch (KinesisClientLibDependencyException | InvalidStateException
            | ThrottlingException | ShutdownException e) {
        LOG.error("Checkpoint failed", e);
        super.shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
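// A minimal sketch of how a processor with the processRecords(List, checkpointer)
// signature above is typically wired into a KCL 1.x Worker. The application name,
// stream name, and MyRecordProcessor class are hypothetical, and the v1-factory
// Worker constructor used here is the deprecated-but-available one in KCL 1.x.
import java.util.UUID;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class WorkerMain {
    public static void main(String[] args) {
        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
            "sample-app", "sample-stream",
            new DefaultAWSCredentialsProviderChain(),
            UUID.randomUUID().toString());
        // The v1 factory is a single-method interface, so a lambda suffices.
        IRecordProcessorFactory factory = () -> new MyRecordProcessor();
        Worker worker = new Worker(factory, config);
        worker.run(); // blocks: takes shard leases and dispatches processRecords()
    }
}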
private KinesisWrapper(final InternalEvent internal) {
    KinesisEventRecord eventRecord = ((KinesisInternalEvent) internal).getRecord();
    Record record = eventRecord.getKinesis();

    this.partitionKey = record.getPartitionKey();
    this.sequenceNumber = record.getSequenceNumber();
    this.eventSource = eventRecord.getEventSource();
    this.sourceArn = eventRecord.getEventSourceARN();
    this.functionName = internal.getCtx().getFunctionName();
    this.functionVersion = internal.getCtx().getFunctionVersion();
    this.processingTime = System.currentTimeMillis();
    this.arrivalTime = record.getApproximateArrivalTimestamp().getTime();
    this.timestamp = internal.getEventTime();
    this.processingDelay = processingTime - timestamp;

    if (internal.getEventObj() != null) {
        this.payload = internal.getEventObj().getPayload();
    } else {
        this.payload = null;
    }
}
@SneakyThrows
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    // Used to update the last processed record
    IRecordProcessorCheckpointer checkpointer = processRecordsInput.getCheckpointer();
    log.info("Retrieving records from Kinesis.");
    for (Record r : records) {
        try {
            int len = r.getData().remaining();
            byte[] buffer = new byte[len];
            r.getData().get(buffer);
            String json = new String(buffer, "UTF-8");
            ZombieLecture lecture = mapper.readValue(json, ZombieLecture.class);
            this.processZombieLecture(lecture);
            log.debug(processedRecords++ + ": " + json);
            if (processedRecords % 1000 == 999) {
                // Checkpoint every 1000 records to keep track of the processed lectures.
                checkpointer.checkpoint();
            }
        } catch (UnsupportedEncodingException | MessagingException ex) {
            log.warn(ex.getMessage());
        }
    }
}
@Override
public AmazonKinesis getKinesisClient() {
    return new AmazonKinesisMock(transform(shardedData,
        new Function<List<TestData>, List<Record>>() {
            @Override
            public List<Record> apply(@Nullable List<TestData> testDatas) {
                return transform(testDatas, new Function<TestData, Record>() {
                    @Override
                    public Record apply(@Nullable TestData testData) {
                        return testData.convertToRecord();
                    }
                });
            }
        }), numberOfRecordsPerGet);
}
@Test
public void recordsAreSentToTheProcessor() throws Exception {
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record().withSequenceNumber("1"),
                         new Record().withSequenceNumber("2")));

    int messageCount = undertest.poll();
    assertThat(messageCount, is(2));

    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
    verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1"));
    assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2"));
}
@Test
public void exchangePropertiesAreSet() throws Exception {
    String partitionKey = "partitionKey";
    String sequenceNumber = "1";
    when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
        .thenReturn(new GetRecordsResult()
            .withNextShardIterator("nextShardIterator")
            .withRecords(new Record()
                .withSequenceNumber(sequenceNumber)
                .withApproximateArrivalTimestamp(new Date(42))
                .withPartitionKey(partitionKey)));

    undertest.poll();

    final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
    verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
    assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
}
/**
 * Implements the InputOperator interface.
 */
@Override
public void emitTuples() {
    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
        return;
    }
    int count = consumer.getQueueSize();
    if (maxTuplesPerWindow > 0) {
        count = Math.min(count, maxTuplesPerWindow - emitCount);
    }
    for (int i = 0; i < count; i++) {
        Pair<String, Record> data = consumer.pollRecord();
        String shardId = data.getFirst();
        String recordId = data.getSecond().getSequenceNumber();
        emitTuple(data);
        MutablePair<String, Integer> shardOffsetAndCount = currentWindowRecoveryState.get(shardId);
        if (shardOffsetAndCount == null) {
            currentWindowRecoveryState.put(shardId, new MutablePair<String, Integer>(recordId, 1));
        } else {
            shardOffsetAndCount.setRight(shardOffsetAndCount.right + 1);
        }
        shardPosition.put(shardId, recordId);
    }
    emitCount += count;
}
public String processNextIterator(String iterator) {
    GetRecordsRequest getRequest = new GetRecordsRequest();
    getRequest.setLimit(1000);
    getRequest.setShardIterator(iterator);

    // Call the "get" operation and fetch everything in this shard range.
    GetRecordsResult getResponse = client.getRecords(getRequest);
    iterator = getResponse.getNextShardIterator();
    List<Record> records = getResponse.getRecords();
    processResponseRecords(records);

    return iterator;
}
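// A hedged sketch of driving processNextIterator from an initial iterator;
// streamName and shardId are assumed inputs. Note that against an open shard
// the next iterator never becomes null, so a real loop needs its own stop
// condition (and a short sleep to respect per-shard read limits).
String iterator = client
    .getShardIterator(streamName, shardId, "TRIM_HORIZON")
    .getShardIterator();
while (iterator != null) {
    // null is only returned once the shard is closed and fully consumed
    iterator = processNextIterator(iterator);
}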
protected void processResponseRecords(List<Record> records) {
    if (records == null || records.isEmpty()) {
        return;
    }
    receiveCount += records.size();
    logger.debug("ReceiveCount= {}", receiveCount);
    for (Record record : records) {
        holdingBuffer.add(record);
        if (shouldProcessRecord) {
            processRecord(record);
        }
        if (doneLatch != null) {
            doneLatch.countDown();
        }
    }
}
@Override
protected void processRecord(Record record) {
    String partitionKey = record.getPartitionKey();
    ByteBuffer data = record.getData();
    logger.info("partitionKey={} ", partitionKey);

    byte[] dataBytes = new byte[data.remaining()];
    data.get(dataBytes, 0, dataBytes.length);

    long key = Long.valueOf(partitionKey);
    TestPOJO expected = new TestPOJO(key);
    TestPOJO read = (TestPOJO) fieldValueGenerator.deserializeObject(dataBytes);

    if (!read.outputFieldsEquals(expected)) {
        logger.error("read is not same as expected. read={}, expected={}", read, expected);
        Assert.fail("read is not same as expected");
    } else {
        logger.info("read is same as expected. read={}, expected={}", read, expected);
    }
}
/**
 * Process a single record.
 *
 * @param record The record to be processed.
 */
private void processSingleRecord(Record record) {
    if (producer == null) {
        producer = createProducer();
    }
    String kafkaServers = System.getenv().get("KAFKA_SERVERS");
    if (null == kafkaServers) {
        kafkaServers = Constants.KAFKA_SERVERS;
    }
    ByteBuffer data = record.getData();
    retryCounter = 0;

    // Send the event to the Kafka queue. Note that kafkaServers is resolved
    // above but not used here; the topic and partition come from TestConstants.
    ProducerRecord<String, byte[]> keyedMessage = new ProducerRecord<>(
        TestConstants.topic, TestConstants.partition, data.array());
    this.sendMessage(keyedMessage);
}
private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Record> inputRecords, String shardId) {
    Iterator<IncomingMessageEnvelope> outputRecordsIter = outputRecords.iterator();
    inputRecords.forEach(record -> {
        IncomingMessageEnvelope envelope = outputRecordsIter.next();
        String outputKey = (String) envelope.getKey();
        KinesisIncomingMessageEnvelope kinesisMessageEnvelope = (KinesisIncomingMessageEnvelope) envelope;
        Assert.assertEquals(outputKey, record.getPartitionKey());
        Assert.assertEquals(kinesisMessageEnvelope.getSequenceNumber(), record.getSequenceNumber());
        Assert.assertEquals(kinesisMessageEnvelope.getApproximateArrivalTimestamp(),
            record.getApproximateArrivalTimestamp());
        Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);

        ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
        record.getData().rewind();
        Assert.assertTrue(outputData.equals(record.getData()));

        verifyOffset(envelope.getOffset(), record, shardId);
    });
}
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
        List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
    Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
    processors.forEach(processor -> {
        try {
            // Create records and call processRecords()
            IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
            doNothing().when(checkpointer).checkpoint(anyString());
            doNothing().when(checkpointer).checkpoint();

            ProcessRecordsInput processRecordsInput = Mockito.mock(ProcessRecordsInput.class);
            when(processRecordsInput.getCheckpointer()).thenReturn(checkpointer);
            when(processRecordsInput.getMillisBehindLatest()).thenReturn(1000L);

            List<Record> inputRecords = createRecords(numRecordsPerShard);
            processorRecordMap.put(processor, inputRecords);
            when(processRecordsInput.getRecords()).thenReturn(inputRecords);
            processor.processRecords(processRecordsInput);
        } catch (ShutdownException | InvalidStateException ex) {
            throw new RuntimeException(ex);
        }
    });
    return processorRecordMap;
}
private static List<Record> createRecords(int numRecords) {
    List<Record> records = new ArrayList<>(numRecords);
    Random rand = new Random();
    for (int i = 0; i < numRecords; i++) {
        String dataStr = "testData-" + System.currentTimeMillis();
        ByteBuffer data = ByteBuffer.wrap(dataStr.getBytes(StandardCharsets.UTF_8));
        String key = String.format("partitionKey-%d", rand.nextLong());
        String seqNum = String.format("%04d", 5 * i + 1);
        Record record = new Record()
            .withData(data)
            .withPartitionKey(key)
            .withSequenceNumber(seqNum)
            .withApproximateArrivalTimestamp(new Date());
        records.add(record);
    }
    return records;
}
@Test
public void theTransformerShouldFailGracefullyWhenUnableToCompress() {
    CloudWatchMessageModelSumologicTransformer transformer = new CloudWatchMessageModelSumologicTransformer();

    String randomData = "Some random string without GZIP compression";
    ByteBuffer bufferedData = null;
    try {
        bufferedData = encoder.encode(CharBuffer.wrap(randomData));
    } catch (Exception e) {
        Assert.fail("Getting error: " + e.getMessage());
    }

    Record mockedRecord = new Record();
    mockedRecord.setData(bufferedData);

    CloudWatchLogsMessageModel messageModel = transformer.toClass(mockedRecord);
    Assert.assertNull(messageModel);
}
@Override
public List<Object> getTuple(Record record) {
    CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
    List<Object> tuple = new ArrayList<>();
    tuple.add(record.getPartitionKey());
    tuple.add(record.getSequenceNumber());
    try {
        String data = decoder.decode(record.getData()).toString();
        tuple.add(data);
    } catch (CharacterCodingException e) {
        LOG.warn("Exception occurred. Emitting tuple with empty string data", e);
        tuple.add("");
    }
    return tuple;
}
public static List<com.amazonaws.services.kinesis.model.Record> getPreviewRecords(
    ClientConfiguration awsClientConfig,
    KinesisConfigBean conf,
    int maxBatchSize,
    GetShardIteratorRequest getShardIteratorRequest
) throws StageException {
    AmazonKinesis kinesisClient = getKinesisClient(awsClientConfig, conf);

    GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
    String shardIterator = getShardIteratorResult.getShardIterator();

    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(maxBatchSize);

    GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
    return getRecordsResult.getRecords();
}
public static List<com.streamsets.pipeline.api.Record> processKinesisRecord(
    String shardId,
    Record kRecord,
    DataParserFactory parserFactory
) throws DataParserException, IOException {
    final String recordId = createKinesisRecordId(shardId, kRecord);
    DataParser parser = parserFactory.getParser(recordId, kRecord.getData().array());

    List<com.streamsets.pipeline.api.Record> records = new ArrayList<>();
    com.streamsets.pipeline.api.Record r;
    while ((r = parser.parse()) != null) {
        records.add(r);
    }
    parser.close();
    return records;
}
public PutRecordResult putRecord(ByteBuffer data, String partitionKey) {
    // Create a record and insert it into the shards. Initially just do it
    // on a round-robin basis.
    long ts = System.currentTimeMillis() - 50000;
    Record rec = new Record()
        .withData(data)
        .withPartitionKey(partitionKey)
        .withSequenceNumber(String.valueOf(sequenceNo));
    rec.setApproximateArrivalTimestamp(new Date(ts));

    if (nextShard == shards.size()) {
        nextShard = 0;
    }
    InternalShard shard = shards.get(nextShard);
    shard.addRecord(rec);

    PutRecordResult result = new PutRecordResult();
    result.setSequenceNumber(String.valueOf(sequenceNo));
    result.setShardId(shard.getShardId());

    nextShard++;
    sequenceNo++;
    return result;
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    counter += records.size();
    if (counter > target) {
        System.out.println("Received : " + counter + " records");
        target += target; // double the reporting threshold each time it is crossed
    }
    for (Record rec : records) {
        try {
            verifyRecord(rec.getData());
        } catch (JSONException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    LOG.info("Aggregating " + records.size() + " records for Kinesis Shard " + kinesisShardId);
    try {
        // run data into the aggregator
        agg.aggregate(records);

        // checkpoint the aggregator and kcl
        agg.checkpoint();
        checkpointer.checkpoint(records.get(records.size() - 1));
        LOG.debug("Kinesis Checkpoint for Shard " + kinesisShardId + " Complete");
    } catch (Exception e) {
        LOG.error(e);
        shutdown(checkpointer, ShutdownReason.ZOMBIE);
    }
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    Record record = (Record) input.getValueByField(DefaultKinesisRecordScheme.FIELD_RECORD);
    ByteBuffer buffer = record.getData();
    String data = null;
    try {
        data = decoder.decode(buffer).toString();
        JSONObject jsonObject = new JSONObject(data);
        String referrer = jsonObject.getString("referrer");
        int firstIndex = referrer.indexOf('.');
        int nextIndex = referrer.indexOf('.', firstIndex + 1);
        collector.emit(new Values(referrer.substring(firstIndex + 1, nextIndex)));
    } catch (CharacterCodingException | JSONException | IllegalStateException e) {
        LOG.error("Exception when decoding record ", e);
    }
}
private void processRecordsWithRetries(List<Record> records) throws Exception {
    for (Record record : records) {
        int tryCount = 0;
        boolean processedOk = false;
        // Retry until the record is processed or the attempt budget is spent.
        while (!processedOk && tryCount < NUM_RETRIES) {
            try {
                try {
                    processSingleRecord(record);
                    processedOk = true;
                } catch (Throwable t) {
                    System.out.println("Caught throwable " + t + " while processing record " + record);
                    // exponential backoff: 1x, 2x, 4x, ... of BACKOFF_TIME_IN_MILLIS
                    Thread.sleep((long) (Math.pow(2, tryCount) * BACKOFF_TIME_IN_MILLIS));
                }
            } catch (InterruptedException e) {
                throw e;
            }
            tryCount++;
        }
        if (!processedOk) {
            throw new Exception("Unable to process record " + record.getPartitionKey()
                + " after " + NUM_RETRIES + " attempts");
        }
    }
}
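// For concreteness, the sleep above grows geometrically with the attempt number.
// A standalone sketch with illustrative constants (not values from the original):
public class BackoffDemo {
    public static void main(String[] args) {
        long BACKOFF_TIME_IN_MILLIS = 100; // hypothetical base delay
        int NUM_RETRIES = 4;               // hypothetical attempt cap
        for (int tryCount = 0; tryCount < NUM_RETRIES; tryCount++) {
            long delay = (long) (Math.pow(2, tryCount) * BACKOFF_TIME_IN_MILLIS);
            System.out.println("attempt " + (tryCount + 1) + " backs off " + delay + " ms");
        }
        // prints 100, 200, 400, 800 ms: a worst-case total wait of 1500 ms
    }
}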
/**
 * Process records performing retries as needed. Skip "poison pill" records.
 *
 * @param records the records to process
 */
private void processRecordsWithRetries(List<Record> records) {
    for (Record record : records) {
        boolean processedSuccessfully = false;
        for (int i = 0; i < NUM_RETRIES; i++) {
            processedSuccessfully = this.processRecord(record);
            if (processedSuccessfully) {
                break;
            }
            // A failure occurred; back off before the next attempt.
            this.backOff();
        }
        if (!processedSuccessfully) {
            LOGGER.error("Couldn't process record {} - skipping record", record);
            // TODO add an optional failure handler here, such as a dead letter queue
        }
    }
}
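// A hedged sketch of the optional failure handler the TODO above mentions:
// forwarding the poison record to an SQS dead-letter queue. SQS itself, the
// queue URL, and this class are assumptions, not part of the original snippet.
import java.nio.charset.StandardCharsets;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

public class DeadLetterHandler {
    private final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    private final String queueUrl;

    public DeadLetterHandler(String queueUrl) {
        this.queueUrl = queueUrl; // e.g. resolved from configuration
    }

    public void handle(Record record) {
        // Forward the unprocessable payload for offline inspection; a read-only
        // view leaves the record's buffer position untouched.
        String body = StandardCharsets.UTF_8.decode(record.getData().asReadOnlyBuffer()).toString();
        sqs.sendMessage(queueUrl, body);
    }
}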
@Override
public void processRecords(List<Record> list, IRecordProcessorCheckpointer irpc) {
    _logger.info("Processing {} records", list.size());
    for (Record r : list) {
        // Note: decoding with new String(byte[]) uses the platform default charset.
        String data = new String(r.getData().array());
        long seq = _buffer.next();
        KinesisEvent evt = _buffer.get(seq);
        evt.setData(data);
        _buffer.publish(seq);
    }
    try {
        irpc.checkpoint();
    } catch (InvalidStateException | KinesisClientLibDependencyException
            | ShutdownException | ThrottlingException ex) {
        _logger.warn("Exception while checkpointing", ex);
    }
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        byte[] bytes = new byte[record.getData().remaining()];
        record.getData().get(bytes);
        System.out.write(bytes, 0, bytes.length);
        System.out.println();
    });
}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
    records.forEach(record -> {
        try {
            byte[] bytes = new byte[record.getData().remaining()];
            record.getData().get(bytes);
            System.out.println(mapper.writeValueAsString(mapper.readTree(bytes)));
        } catch (IOException e) {
            logger.error("", e);
        }
    });
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> records;
    try {
        GetRecordsResult recordsResult = this.kinesisClient.getRecords(this.recordsRequest);
        records = new ArrayList<>(recordsResult.getRecords().size());
        log.trace("poll() - {} record(s) returned from shard {}.",
            recordsResult.getRecords().size(), this.config.kinesisShardId);

        for (Record record : recordsResult.getRecords()) {
            SourceRecord sourceRecord = this.recordConverter.sourceRecord(
                this.config.kinesisStreamName, this.config.kinesisShardId, record);
            records.add(sourceRecord);
        }

        log.trace("poll() - Changing shard iterator to {}", recordsResult.getNextShardIterator());
        this.recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    } catch (ProvisionedThroughputExceededException ex) {
        log.warn("poll() - Throughput exceeded sleeping {} ms",
            this.config.kinesisThroughputExceededBackoffMs, ex);
        this.time.sleep(this.config.kinesisThroughputExceededBackoffMs);
        return new ArrayList<>();
    }
    if (records.isEmpty()) {
        log.trace("poll() - No records returned. Sleeping {} ms.", this.config.kinesisEmptyRecordsBackoffMs);
        this.time.sleep(this.config.kinesisEmptyRecordsBackoffMs);
    }
    return records;
}
public static Record record() {
    return new Record()
        .withApproximateArrivalTimestamp(EXPECTED_APPROXIMATE_ARRIVAL_TIMESTAMP)
        .withData(ByteBuffer.wrap(EXPECTED_DATA))
        .withPartitionKey(EXPECTED_PARTITION_KEY)
        .withSequenceNumber(EXPECTED_SEQUENCE_NUMBER);
}
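// Illustrative use of the record() fixture; the test name and assertions are
// assumptions, but they show why pinning every field to an EXPECTED_* constant
// makes converter tests straightforward.
@Test
public void fixtureCarriesTheExpectedFields() {
    Record record = record();
    assertEquals(EXPECTED_PARTITION_KEY, record.getPartitionKey());
    assertEquals(EXPECTED_SEQUENCE_NUMBER, record.getSequenceNumber());
    assertEquals(ByteBuffer.wrap(EXPECTED_DATA), record.getData());
}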
@Test
public void collectorFailsWhenRecordEncodedAsSingleSpan() {
    Span span = TestObjects.LOTS_OF_SPANS[0];
    byte[] encodedSpan = Codec.THRIFT.writeSpan(span);
    Record kinesisRecord = new Record().withData(ByteBuffer.wrap(encodedSpan));
    ProcessRecordsInput kinesisInput =
        new ProcessRecordsInput().withRecords(Collections.singletonList(kinesisRecord));

    kinesisSpanProcessor.processRecords(kinesisInput);

    assertThat(storage.spanStore().getTraces().size()).isEqualTo(0);
    assertThat(metrics.messagesDropped()).isEqualTo(1);
    assertThat(metrics.bytes()).isEqualTo(encodedSpan.length);
}
private ProcessRecordsInput createTestData(int count) {
    List<Record> records = new ArrayList<>();
    Span[] spans = Arrays.copyOfRange(TestObjects.LOTS_OF_SPANS, 0, count);

    Arrays.stream(spans)
        .map(s -> ByteBuffer.wrap(Codec.THRIFT.writeSpans(Collections.singletonList(s))))
        .map(b -> new Record().withData(b))
        .forEach(records::add);

    return new ProcessRecordsInput().withRecords(records);
}
public Record convertToRecord() {
    return new Record()
        .withApproximateArrivalTimestamp(arrivalTimestamp.toDate())
        .withData(ByteBuffer.wrap(data.getBytes()))
        .withSequenceNumber(sequenceNumber)
        .withPartitionKey("");
}
@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
    // Shard iterators are encoded as "shardId:recordIndex".
    String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
    int shardId = parseInt(shardIteratorParts[0]);
    int startingRecord = parseInt(shardIteratorParts[1]);

    List<Record> shardData = shardedData.get(shardId);
    int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
    int fromIndex = min(startingRecord, toIndex);
    return new GetRecordsResult()
        .withRecords(shardData.subList(fromIndex, toIndex))
        .withNextShardIterator(String.format("%s:%s", shardId, toIndex))
        .withMillisBehindLatest(0L);
}
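// Hypothetical companion override for the mock above (not in the original):
// seed the first read of each shard with an iterator in the same
// "shardId:recordIndex" format that getRecords() parses.
@Override
public GetShardIteratorResult getShardIterator(GetShardIteratorRequest request) {
    return new GetShardIteratorResult()
        .withShardIterator(request.getShardId() + ":0");
}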
public static List<Record> createRecordBatchWithRange(int min, int max) {
    List<Record> batch = new LinkedList<>();
    for (int i = min; i < max; i++) {
        batch.add(new Record()
            .withData(ByteBuffer.wrap(String.valueOf(i).getBytes(ConfigConstants.DEFAULT_CHARSET)))
            .withPartitionKey(UUID.randomUUID().toString())
            .withApproximateArrivalTimestamp(new Date(System.currentTimeMillis()))
            .withSequenceNumber(String.valueOf(i)));
    }
    return batch;
}
/**
 * {@inheritDoc}
 */
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    try {
        List<Record> records = processRecordsInput.getRecords();
        Thread.currentThread().setName(kinesisShardId);
        int bytes = calculateSize(records);
        LOG.debug("Got {} records ({} bytes); behind latest by {}",
            records.size(), bytes, printTextBehindLatest(processRecordsInput));
        metricsCallback.shardBehindMs(kinesisShardId, processRecordsInput.getMillisBehindLatest());

        Observable observable = Observable.create(subscriber -> {
            try {
                for (Record record : records) {
                    subscriber.onNext(Codecs.BYTES.from(record.getData().array())
                        .put("_shardId", kinesisShardId));
                }
                subscriber.onCompleted();
                metricsCallback.recordsProcessed(kinesisShardId, records.size());
                metricsCallback.bytesProcessed(kinesisShardId, bytes);
            } catch (RuntimeException e) {
                subscriber.onError(e);
            }
        });
        unitOfWorkListener.apply(observable).toBlocking().subscribe();
        transaction.checkpoint(processRecordsInput.getCheckpointer());
    } catch (RuntimeException t) {
        doOnError(t);
    }
}
private int calculateSize(List<Record> records) {
    int bytes = 0;
    for (Record r : records) {
        bytes += r.getData().remaining();
    }
    // We receive the payload in binary, but it travels as Base64; scale up to
    // approximate the wire size (Base64's real overhead is 4/3, so the 3/2
    // factor is a slight overestimate).
    return bytes * 3 / 2;
}
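// To make the overhead concrete, a quick check with the JDK's java.util.Base64
// (the payload size is illustrative, not from the original):
import java.util.Base64;

public class Base64Overhead {
    public static void main(String[] args) {
        int payloadBytes = 1000; // hypothetical record size
        int encodedLength = Base64.getEncoder().encode(new byte[payloadBytes]).length;
        // Base64 emits 4 output bytes per 3 input bytes, padded to a multiple of 4:
        // 4 * ceil(1000 / 3) = 1336 (~1.34x), vs. 1500 from the 3/2 estimate.
        System.out.println(encodedLength); // prints 1336
    }
}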
public Exchange createExchange(Record record) {
    Exchange exchange = super.createExchange();
    exchange.getIn().setBody(record);
    exchange.getIn().setHeader(KinesisConstants.APPROX_ARRIVAL_TIME, record.getApproximateArrivalTimestamp());
    exchange.getIn().setHeader(KinesisConstants.PARTITION_KEY, record.getPartitionKey());
    exchange.getIn().setHeader(KinesisConstants.SEQUENCE_NUMBER, record.getSequenceNumber());
    return exchange;
}