Example source code for the Java class org.apache.spark.api.java.function.MapFunction

Project: bunsen    File: Loinc.java
/**
 * Reads the LOINC multiaxial hierarchy file and converts it to a {@link HierarchicalElement}
 * dataset.
 *
 * @param spark the Spark session
 * @param loincHierarchyPath path to the multiaxial hierarchy CSV
 * @return a dataset of {@link HierarchicalElement} representing the hierarchical relationship.
 */
public static Dataset<HierarchicalElement> readMultiaxialHierarchyFile(SparkSession spark,
    String loincHierarchyPath) {

  return spark.read()
      .option("header", true)
      .csv(loincHierarchyPath)
      .select(col("IMMEDIATE_PARENT"), col("CODE"))
      .where(col("IMMEDIATE_PARENT").isNotNull()
          .and(col("IMMEDIATE_PARENT").notEqual(lit(""))))
      .where(col("CODE").isNotNull()
          .and(col("CODE").notEqual(lit(""))))
      .map((MapFunction<Row, HierarchicalElement>) row -> {

        HierarchicalElement element = new HierarchicalElement();

        element.setAncestorSystem(LOINC_CODE_SYSTEM_URI);
        element.setAncestorValue(row.getString(0));

        element.setDescendantSystem(LOINC_CODE_SYSTEM_URI);
        element.setDescendantValue(row.getString(1));

        return element;
      }, Hierarchies.getHierarchicalElementEncoder());
}
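The cast to MapFunction in the snippet above selects the typed map overload that takes an explicit Encoder for the result type. Below is a minimal, self-contained sketch of the same pattern; it is an illustration only, and the CSV path /tmp/example.csv plus the column names PARENT and CHILD are hypothetical, not part of Bunsen.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MapFunctionCsvExample {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("MapFunctionCsvExample")
        .master("local[*]")
        .getOrCreate();

    // Read a two-column CSV (hypothetical path and column names) and map each
    // Row to a String, supplying an explicit Encoder for the result type.
    Dataset<String> edges = spark.read()
        .option("header", true)
        .csv("/tmp/example.csv")
        .select(col("PARENT"), col("CHILD"))
        .map((MapFunction<Row, String>) row ->
            row.getString(0) + " -> " + row.getString(1),
            Encoders.STRING());

    edges.show();
    spark.stop();
  }
}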
Project: bunsen    File: ConceptMaps.java
/**
 * Returns a new ConceptMaps instance that includes the given maps.
 *
 * @param conceptMaps concept maps to add to the returned collection.
 * @return a new ConceptMaps instance with the values added.
 */
public ConceptMaps withConceptMaps(Dataset<ConceptMap> conceptMaps) {

  Dataset<UrlAndVersion> newMembers = getUrlAndVersions(conceptMaps);

  if (hasDuplicateUrlAndVersions(newMembers) || conceptMaps.count() != newMembers.count()) {

    throw new IllegalArgumentException(
        "Cannot add concept maps having duplicate conceptMapUri and conceptMapVersion");
  }

  // Remove the concept contents for persistence. This is most easily done in the ConceptMap
  // object by setting the group to an empty list.
  Dataset<ConceptMap> withoutConcepts = conceptMaps
      .map((MapFunction<ConceptMap,ConceptMap>) conceptMap -> {

        // Remove the elements rather than the groups to preserve the
        // "unmapped" structure in a group that can refer to other
        // concept maps.
        ConceptMap withoutElements = conceptMap.copy();

        List<ConceptMapGroupComponent> updatedGroups = new ArrayList<>();

        for (ConceptMapGroupComponent group: withoutElements.getGroup()) {

          group.setElement(new ArrayList<>());
          updatedGroups.add(group);
        }

        withoutElements.setGroup(updatedGroups);

        return withoutElements;
      }, CONCEPT_MAP_ENCODER);

  Dataset<Mapping> newMappings = conceptMaps.flatMap(ConceptMaps::expandMappingsIterator,
      MAPPING_ENCODER);

  return withConceptMaps(withoutConcepts, newMappings);
}
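The same copy-and-clear pattern works for any JavaBean mapped with a bean encoder. A hedged sketch under assumed names follows: Book is a hypothetical bean and the surrounding imports (java.io.Serializable, java.util collections, Spark's Dataset, Encoders, and MapFunction) are presumed; none of this comes from Bunsen.

// Hypothetical bean (assumed, not from Bunsen); Encoders.bean needs a public
// class with getters, setters, and a no-arg constructor.
public static class Book implements Serializable {
  private String title;
  private List<String> chapters = new ArrayList<>();

  public String getTitle() { return title; }
  public void setTitle(String title) { this.title = title; }
  public List<String> getChapters() { return chapters; }
  public void setChapters(List<String> chapters) { this.chapters = chapters; }
}

// Mirror of withConceptMaps above: copy each bean and clear the nested list
// before persisting, leaving the rest of the record intact.
public static Dataset<Book> withoutChapters(Dataset<Book> books) {
  return books.map((MapFunction<Book, Book>) book -> {
    Book copy = new Book();
    copy.setTitle(book.getTitle());
    copy.setChapters(new ArrayList<>());
    return copy;
  }, Encoders.bean(Book.class));
}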
Project: gatk    File: SparkSharder.java
/**
 * Join an RDD of locatables with a set of intervals, and apply a function to process the locatables that overlap each interval.
 * @param ctx the Spark Context
 * @param locatables the locatables RDD, must be coordinate sorted
 * @param locatableClass the class of the locatables, must be a subclass of {@link Locatable}
 * @param sequenceDictionary the sequence dictionary to use to find contig lengths
 * @param intervals the collection of intervals to apply the function to
 * @param maxLocatableLength the maximum length of a {@link Locatable}, if any is larger than this size then an exception will be thrown
 * @param f the function to process intervals and overlapping locatables with
 * @param <L> the {@link Locatable} type
 * @param <I> the interval type
 * @param <T> the return type of <code>f</code>
 * @return an RDD of the values produced by applying <code>f</code> to each interval and its overlapping locatables
 */
private static <L extends Locatable, I extends Locatable, T> JavaRDD<T> joinOverlapping(JavaSparkContext ctx, JavaRDD<L> locatables, Class<L> locatableClass,
                                                                                        SAMSequenceDictionary sequenceDictionary, List<I> intervals,
                                                                                        int maxLocatableLength, MapFunction<Tuple2<I, Iterable<L>>, T> f) {
    return joinOverlapping(ctx, locatables, locatableClass, sequenceDictionary, intervals, maxLocatableLength,
            (FlatMapFunction2<Iterator<L>, Iterator<I>, T>) (locatablesIterator, shardsIterator) ->
                Iterators.transform(
                    locatablesPerShard(locatablesIterator, shardsIterator, sequenceDictionary, maxLocatableLength),
                    new Function<Tuple2<I, Iterable<L>>, T>() {
                        @Nullable
                        @Override
                        public T apply(@Nullable Tuple2<I, Iterable<L>> input) {
                            try {
                                return f.call(input);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }));
}
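Here MapFunction is used as an ordinary callback rather than being passed to Dataset.map, so its checked exception from call() has to be wrapped before the value can flow through Iterators.transform. A minimal sketch of that adapter pattern in isolation; the unchecked helper and the MapFunctions class are hypothetical, not part of GATK.

import org.apache.spark.api.java.function.MapFunction;

final class MapFunctions {

  // Adapts a MapFunction (whose call() declares a checked Exception) to a
  // java.util.function.Function, rethrowing failures as RuntimeException,
  // exactly as the anonymous Function above does around f.call(input).
  static <A, B> java.util.function.Function<A, B> unchecked(MapFunction<A, B> f) {
    return input -> {
      try {
        return f.call(input);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    };
  }
}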
Project: bunsen    File: Snomed.java
/**
 * Reads a Snomed relationship file and converts it to a {@link HierarchicalElement} dataset.
 *
 * @param spark the Spark session
 * @param snomedRelationshipPath path to the SNOMED relationship file
 * @return a dataset of {@link HierarchicalElement} representing the hierarchical relationship.
 */
public static Dataset<HierarchicalElement> readRelationshipFile(SparkSession spark,
    String snomedRelationshipPath) {

  return spark.read()
      .option("header", true)
      .option("delimiter", "\t")
      .csv(snomedRelationshipPath)
      .where(col("typeId").equalTo(lit(SNOMED_ISA_RELATIONSHIP_ID)))
      .where(col("active").equalTo(lit("1")))
      .select(col("destinationId"), col("sourceId"))
      .where(col("destinationId").isNotNull()
          .and(col("destinationId").notEqual(lit(""))))
      .where(col("sourceId").isNotNull()
          .and(col("sourceId").notEqual(lit(""))))
      .map((MapFunction<Row, HierarchicalElement>) row -> {

        HierarchicalElement element = new HierarchicalElement();

        element.setAncestorSystem(SNOMED_CODE_SYSTEM_URI);
        element.setAncestorValue(row.getString(0));

        element.setDescendantSystem(SNOMED_CODE_SYSTEM_URI);
        element.setDescendantValue(row.getString(1));

        return element;
      }, Hierarchies.getHierarchicalElementEncoder());
}
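Calling the method is straightforward once a SparkSession exists; a brief usage sketch, where the relationship file path is illustrative only:

// Hypothetical usage; the path is an example, not a real location.
Dataset<HierarchicalElement> isaEdges =
    Snomed.readRelationshipFile(spark, "/data/snomed/sct2_Relationship_Full.txt");
isaEdges.show(10, false);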
Project: video-stream-analytics    File: VideoStreamProcessor.java
public static void main(String[] args) throws Exception {
  // Read properties
  Properties prop = PropertyFileReader.readPropertyFile();

  // SparkSession
  SparkSession spark = SparkSession
      .builder()
      .appName("VideoStreamProcessor")
      .master(prop.getProperty("spark.master.url"))
      .getOrCreate();

  // Directory to save image files with motion detected
  final String processedImageDir = prop.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to " + processedImageDir
      + ". This is configured in the processed.output.dir key of the property file.");

  // Create schema for the JSON message
  StructType schema = DataTypes.createStructType(new StructField[] {
      DataTypes.createStructField("cameraId", DataTypes.StringType, true),
      DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
      DataTypes.createStructField("rows", DataTypes.IntegerType, true),
      DataTypes.createStructField("cols", DataTypes.IntegerType, true),
      DataTypes.createStructField("type", DataTypes.IntegerType, true),
      DataTypes.createStructField("data", DataTypes.StringType, true)
  });

  // Create a Dataset from the stream of Kafka messages
  Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"), schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));

  // Key-value pairs of cameraId -> VideoEventData
  KeyValueGroupedDataset<String, VideoEventData> kvDataset = ds.groupByKey(
      new MapFunction<VideoEventData, String>() {
        @Override
        public String call(VideoEventData value) throws Exception {
          return value.getCameraId();
        }
      }, Encoders.STRING());

  // Process each camera's events, keeping the last processed frame as state
  Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(
      new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
        @Override
        public VideoEventData call(String key, Iterator<VideoEventData> values,
            GroupState<VideoEventData> state) throws Exception {
          logger.warn("CameraId=" + key + " PartitionId=" + TaskContext.getPartitionId());
          VideoEventData existing = null;
          // Check previous state
          if (state.exists()) {
            existing = state.get();
          }
          // Detect motion
          VideoEventData processed =
              VideoMotionDetector.detectMotion(key, values, processedImageDir, existing);

          // Update last processed
          if (processed != null) {
            state.update(processed);
          }
          return processed;
        }
      }, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));

  // Start the streaming query
  StreamingQuery query = processedDataset.writeStream()
      .outputMode("update")
      .format("console")
      .start();

  // Await termination
  query.awaitTermination();
}
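Since MapFunction is a functional interface, the anonymous class passed to groupByKey above can also be written as a method reference cast to MapFunction. A brief equivalent sketch, reusing the ds Dataset from the snippet:

// Equivalent to the anonymous MapFunction above: group events by camera id.
KeyValueGroupedDataset<String, VideoEventData> byCamera = ds.groupByKey(
    (MapFunction<VideoEventData, String>) VideoEventData::getCameraId,
    Encoders.STRING());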
Project: bunsen    File: ValueSets.java
/**
 * Returns a new ValueSets instance that includes the given value sets.
 *
 * @param valueSets the value sets to add to the returned collection.
 * @return a new ValueSets instance with the added value sets.
 */
public ValueSets withValueSets(Dataset<ValueSet> valueSets) {

  Dataset<UrlAndVersion> newMembers = getUrlAndVersions(valueSets);

  // Ensure that there are no duplicates among the value sets
  if (hasDuplicateUrlAndVersions(newMembers) || valueSets.count() != newMembers.count()) {

    throw new IllegalArgumentException(
        "Cannot add value sets having duplicate valueSetUri and valueSetVersion");
  }

  // The value set concepts will be stored in the values table for persistence, so we remove
  // them from the individual value sets. This can be done most easily by setting concepts to an
  // empty list.
  Dataset<ValueSet> withoutConcepts = valueSets.map((MapFunction<ValueSet,ValueSet>) valueSet -> {
    ValueSet valueSetWithoutConcepts = valueSet.copy();

    List<ConceptSetComponent> updatedInclusions = new ArrayList<>();

    for (ConceptSetComponent inclusion: valueSet.getCompose().getInclude()) {

      ConceptSetComponent inclusionWithoutConcepts = inclusion.copy();

      inclusionWithoutConcepts.setConcept(new ArrayList<>());
      updatedInclusions.add(inclusionWithoutConcepts);
    }

    valueSetWithoutConcepts.getCompose().setInclude(updatedInclusions);

    return valueSetWithoutConcepts;
  }, VALUE_SET_ENCODER);

  Dataset<Value> newValues = valueSets.flatMap(ValueSets::expandValuesIterator, VALUE_ENCODER);

  return withValueSets(withoutConcepts, newValues);
}
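For contrast with MapFunction, the flatMap call above takes a FlatMapFunction, whose call() returns an Iterator so that one input row can expand into many output rows. A minimal, self-contained sketch of that interface follows; the word-splitting example is illustrative only and unrelated to Bunsen.

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class FlatMapFunctionExample {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("FlatMapFunctionExample")
        .master("local[*]")
        .getOrCreate();

    // Each input String yields an Iterator of words, so the result
    // contains more rows than the input Dataset.
    Dataset<String> words = spark
        .createDataset(Arrays.asList("value set", "concept map"), Encoders.STRING())
        .flatMap((FlatMapFunction<String, String>) line ->
            Arrays.asList(line.split(" ")).iterator(),
            Encoders.STRING());

    words.show();
    spark.stop();
  }
}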