Java 类com.amazonaws.services.lambda.runtime.events.KinesisEvent 实例源码

项目:bender    文件:ElasticSearchTansportSerializerTest.java   
@Test
public void testSerializeDateIndexName() throws UnsupportedEncodingException, IOException {
  ElasticSearchTransportSerializer serializer =
      new ElasticSearchTransportSerializer(false, "event", "log-", "yyyy-MM-dd");

  KinesisEvent kevent = TestUtils.createEvent(this.getClass(), "basic_event.json");
  String payload = new String(kevent.getRecords().get(0).getKinesis().getData().array());
  InternalEvent record = new DummyEvent(payload, 1478737790000l);

  String actual = new String(serializer.serialize(record));
  String expected = TestUtils.getResourceString(this.getClass(), "datetime_output.txt");
  assertEquals(expected, actual);
}
项目:bender    文件:KinesisHandler.java   
public void handler(KinesisEvent event, Context context) throws HandlerException {
  if (!initialized) {
    init(context);
  }

  this.recordIterator = new KinesisEventIterator(context, event.getRecords());

  /*
   * Get processors based on the source stream ARN
   */
  KinesisEventRecord firstRecord = event.getRecords().get(0);
  this.source = SourceUtils.getSource(firstRecord.getEventSourceARN(), sources);

  super.process(context);
}
项目:bender    文件:TestUtils.java   
public static KinesisEvent createEvent(Class clazz, String resource)
    throws UnsupportedEncodingException, IOException {
  /*
   * Create a kinesis record from a sample JSON file
   */
  String json =
      IOUtils.toString(new InputStreamReader(clazz.getResourceAsStream(resource), "UTF-8"));

  Date approximateArrivalTimestamp = new Date();
  approximateArrivalTimestamp.setTime(1478737790000l);

  Record rec = new Record();
  rec.withPartitionKey("1").withSequenceNumber("2").withData(ByteBuffer.wrap(json.getBytes()))
      .withApproximateArrivalTimestamp(approximateArrivalTimestamp);

  /*
   * Create a KinesisEventRecord and add single Record
   */
  KinesisEventRecord krecord = new KinesisEventRecord();
  krecord.setKinesis(rec);
  krecord.setEventSourceARN("arn:aws:kinesis:us-east-1:1234:stream/test-events-stream");
  krecord.setEventID("shardId-000000000000:1234");

  /*
   * Add single KinesisEventRecord to a KinesisEvent
   */
  KinesisEvent kevent = new KinesisEvent();
  List<KinesisEventRecord> events = new ArrayList<KinesisEventRecord>(1);
  events.add(krecord);
  kevent.setRecords(events);

  return kevent;
}
项目:lumber-mill    文件:KinesisLambda.java   
@Override
public String handleRequest(KinesisEvent event, Context context) {
    if (eventProcessor instanceof LambdaContextAwareEventProcessor) {
        ((LambdaContextAwareEventProcessor)eventProcessor).initialize(context);
    }

    Observable.from(event.getRecords())
            .map(this::toBytes)
            .compose(eventProcessor)
            .count()
            .toBlocking()
            .subscribe();
    return "Done";
}
项目:aws-big-data-blog    文件:KinesisToFirehose.java   
public void kinesisHandler(KinesisEvent event, Context context){
    logger = context.getLogger();
    setup();
    for(KinesisEvent.KinesisEventRecord rec : event.getRecords()) {
        logger.log("Got message ");
        String msg = new String(rec.getKinesis().getData().array())+"\n";
        Record deliveryStreamRecord = new Record().withData(ByteBuffer.wrap(msg.getBytes()));

        PutRecordRequest putRecordRequest = new PutRecordRequest()
                .withDeliveryStreamName(deliveryStreamName)
                .withRecord(deliveryStreamRecord);

        logger.log("Putting message");
        firehoseClient.putRecord(putRecordRequest);
        logger.log("Successful Put");
    }
}
项目:service-block-samples    文件:SpringBootKinesisEventHandler.java   
@Override
protected List<KinesisEventRecord> convertEvent(KinesisEvent event) {
    // TODO: maybe convert to List<Message>
    return event.getRecords();
}
项目:bender    文件:KinesisHandlerTest.java   
@Override
public KinesisEvent getTestEvent() throws Exception {
  return TestUtils.createEvent(this.getClass(), "basic_input.json");
}
项目:bender    文件:KinesisHandlerTest.java   
@Override
public KinesisEvent getTestEvent() throws Exception {
  return TestUtils.createEvent(this.getClass(), "basic_input.json");
}
项目:lumber-mill    文件:KinesisLambda.java   
protected BytesEvent toBytes(KinesisEvent.KinesisEventRecord record) {
    BytesEvent bytesEvent = Codecs.BYTES.from(record.getKinesis().getData().array());
    bytesEvent.put(METADATA_KINESIS_EVENT_RECORD, record);
    bytesEvent.put(METADATA_MILLIS_BEHIND_LATEST, System.currentTimeMillis() - record.getKinesis().getApproximateArrivalTimestamp().getTime());
    return bytesEvent;
}
项目:spring-cloud-function    文件:SpringBootKinesisEventHandler.java   
@Override
protected List<KinesisEventRecord> convertEvent(KinesisEvent event) {
    // TODO: maybe convert to List<Message>
    return event.getRecords();
}