@Test public void testSerializeDateIndexName() throws UnsupportedEncodingException, IOException { ElasticSearchTransportSerializer serializer = new ElasticSearchTransportSerializer(false, "event", "log-", "yyyy-MM-dd"); KinesisEvent kevent = TestUtils.createEvent(this.getClass(), "basic_event.json"); String payload = new String(kevent.getRecords().get(0).getKinesis().getData().array()); InternalEvent record = new DummyEvent(payload, 1478737790000l); String actual = new String(serializer.serialize(record)); String expected = TestUtils.getResourceString(this.getClass(), "datetime_output.txt"); assertEquals(expected, actual); }
public void handler(KinesisEvent event, Context context) throws HandlerException { if (!initialized) { init(context); } this.recordIterator = new KinesisEventIterator(context, event.getRecords()); /* * Get processors based on the source stream ARN */ KinesisEventRecord firstRecord = event.getRecords().get(0); this.source = SourceUtils.getSource(firstRecord.getEventSourceARN(), sources); super.process(context); }
public static KinesisEvent createEvent(Class clazz, String resource) throws UnsupportedEncodingException, IOException { /* * Create a kinesis record from a sample JSON file */ String json = IOUtils.toString(new InputStreamReader(clazz.getResourceAsStream(resource), "UTF-8")); Date approximateArrivalTimestamp = new Date(); approximateArrivalTimestamp.setTime(1478737790000l); Record rec = new Record(); rec.withPartitionKey("1").withSequenceNumber("2").withData(ByteBuffer.wrap(json.getBytes())) .withApproximateArrivalTimestamp(approximateArrivalTimestamp); /* * Create a KinesisEventRecord and add single Record */ KinesisEventRecord krecord = new KinesisEventRecord(); krecord.setKinesis(rec); krecord.setEventSourceARN("arn:aws:kinesis:us-east-1:1234:stream/test-events-stream"); krecord.setEventID("shardId-000000000000:1234"); /* * Add single KinesisEventRecord to a KinesisEvent */ KinesisEvent kevent = new KinesisEvent(); List<KinesisEventRecord> events = new ArrayList<KinesisEventRecord>(1); events.add(krecord); kevent.setRecords(events); return kevent; }
@Override public String handleRequest(KinesisEvent event, Context context) { if (eventProcessor instanceof LambdaContextAwareEventProcessor) { ((LambdaContextAwareEventProcessor)eventProcessor).initialize(context); } Observable.from(event.getRecords()) .map(this::toBytes) .compose(eventProcessor) .count() .toBlocking() .subscribe(); return "Done"; }
public void kinesisHandler(KinesisEvent event, Context context){ logger = context.getLogger(); setup(); for(KinesisEvent.KinesisEventRecord rec : event.getRecords()) { logger.log("Got message "); String msg = new String(rec.getKinesis().getData().array())+"\n"; Record deliveryStreamRecord = new Record().withData(ByteBuffer.wrap(msg.getBytes())); PutRecordRequest putRecordRequest = new PutRecordRequest() .withDeliveryStreamName(deliveryStreamName) .withRecord(deliveryStreamRecord); logger.log("Putting message"); firehoseClient.putRecord(putRecordRequest); logger.log("Successful Put"); } }
@Override protected List<KinesisEventRecord> convertEvent(KinesisEvent event) { // TODO: maybe convert to List<Message> return event.getRecords(); }
@Override public KinesisEvent getTestEvent() throws Exception { return TestUtils.createEvent(this.getClass(), "basic_input.json"); }
protected BytesEvent toBytes(KinesisEvent.KinesisEventRecord record) { BytesEvent bytesEvent = Codecs.BYTES.from(record.getKinesis().getData().array()); bytesEvent.put(METADATA_KINESIS_EVENT_RECORD, record); bytesEvent.put(METADATA_MILLIS_BEHIND_LATEST, System.currentTimeMillis() - record.getKinesis().getApproximateArrivalTimestamp().getTime()); return bytesEvent; }