private KinesisWrapper(final InternalEvent internal) { KinesisEventRecord eventRecord = ((KinesisInternalEvent) internal).getRecord(); Record record = eventRecord.getKinesis(); this.partitionKey = record.getPartitionKey(); this.sequenceNumber = record.getSequenceNumber(); this.eventSource = eventRecord.getEventSource(); this.sourceArn = eventRecord.getEventSourceARN(); this.functionName = internal.getCtx().getFunctionName(); this.functionVersion = internal.getCtx().getFunctionVersion(); this.processingTime = System.currentTimeMillis(); this.arrivalTime = record.getApproximateArrivalTimestamp().getTime(); this.timestamp = internal.getEventTime(); this.processingDelay = processingTime - timestamp; if (internal.getEventObj() != null) { this.payload = internal.getEventObj().getPayload(); } else { this.payload = null; } }
public void handler(KinesisEvent event, Context context) throws HandlerException { if (!initialized) { init(context); } this.recordIterator = new KinesisEventIterator(context, event.getRecords()); /* * Get processors based on the source stream ARN */ KinesisEventRecord firstRecord = event.getRecords().get(0); this.source = SourceUtils.getSource(firstRecord.getEventSourceARN(), sources); super.process(context); }
public static KinesisEvent createEvent(Class clazz, String resource) throws UnsupportedEncodingException, IOException { /* * Create a kinesis record from a sample JSON file */ String json = IOUtils.toString(new InputStreamReader(clazz.getResourceAsStream(resource), "UTF-8")); Date approximateArrivalTimestamp = new Date(); approximateArrivalTimestamp.setTime(1478737790000l); Record rec = new Record(); rec.withPartitionKey("1").withSequenceNumber("2").withData(ByteBuffer.wrap(json.getBytes())) .withApproximateArrivalTimestamp(approximateArrivalTimestamp); /* * Create a KinesisEventRecord and add single Record */ KinesisEventRecord krecord = new KinesisEventRecord(); krecord.setKinesis(rec); krecord.setEventSourceARN("arn:aws:kinesis:us-east-1:1234:stream/test-events-stream"); krecord.setEventID("shardId-000000000000:1234"); /* * Add single KinesisEventRecord to a KinesisEvent */ KinesisEvent kevent = new KinesisEvent(); List<KinesisEventRecord> events = new ArrayList<KinesisEventRecord>(1); events.add(krecord); kevent.setRecords(events); return kevent; }
@Override protected List<KinesisEventRecord> convertEvent(KinesisEvent event) { // TODO: maybe convert to List<Message> return event.getRecords(); }
public KinesisInternalEvent(KinesisEventRecord record, Context context) { super(new String(record.getKinesis().getData().array()), context, record.getKinesis().getApproximateArrivalTimestamp().getTime()); this.record = record; }
public KinesisEventRecord getRecord() { return record; }
public KinesisEventIterator(Context context, List<KinesisEventRecord> records) { this.iterator = records.iterator(); this.context = context; }