/**
 * Reads the LOINC multiaxial hierarchy file and converts it to a {@link HierarchicalElement}
 * dataset.
 *
 * @param spark the Spark session
 * @param loincHierarchyPath path to the multiaxial hierarchy CSV
 * @return a dataset of {@link HierarchicalElement} representing the hierarchical relationships.
 */
public static Dataset<HierarchicalElement> readMultiaxialHierarchyFile(SparkSession spark,
    String loincHierarchyPath) {

  return spark.read()
      .option("header", true)
      .csv(loincHierarchyPath)
      .select(col("IMMEDIATE_PARENT"), col("CODE"))
      // Keep only rows with non-empty parent and child codes.
      .where(col("IMMEDIATE_PARENT").isNotNull()
          .and(col("IMMEDIATE_PARENT").notEqual(lit(""))))
      .where(col("CODE").isNotNull()
          .and(col("CODE").notEqual(lit(""))))
      .map((MapFunction<Row, HierarchicalElement>) row -> {

        HierarchicalElement element = new HierarchicalElement();

        element.setAncestorSystem(LOINC_CODE_SYSTEM_URI);
        element.setAncestorValue(row.getString(0));

        element.setDescendantSystem(LOINC_CODE_SYSTEM_URI);
        element.setDescendantValue(row.getString(1));

        return element;
      }, Hierarchies.getHierarchicalElementEncoder());
}
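A minimal usage sketch for the reader above. It assumes the method lives in a helper class named Loinc and that the CSV path is illustrative; HierarchicalElement is the bean used throughout these examples.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class LoincHierarchyExample {

  public static void main(String[] args) {

    SparkSession spark = SparkSession.builder()
        .appName("LoincHierarchyExample")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical path to the LOINC multiaxial hierarchy CSV.
    String hierarchyPath = "/data/loinc/multiaxial_hierarchy.csv";

    // Loinc is an assumed name for the class containing readMultiaxialHierarchyFile.
    Dataset<HierarchicalElement> hierarchy =
        Loinc.readMultiaxialHierarchyFile(spark, hierarchyPath);

    // Each row is one ancestor/descendant pair in the LOINC hierarchy.
    hierarchy.show(10, false);

    spark.stop();
  }
}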
/**
 * Returns a new ConceptMaps instance that includes the given maps.
 *
 * @param conceptMaps concept maps to add to the returned collection.
 * @return a new ConceptMaps instance with the values added.
 */
public ConceptMaps withConceptMaps(Dataset<ConceptMap> conceptMaps) {

  Dataset<UrlAndVersion> newMembers = getUrlAndVersions(conceptMaps);

  if (hasDuplicateUrlAndVersions(newMembers) || conceptMaps.count() != newMembers.count()) {
    throw new IllegalArgumentException(
        "Cannot add concept maps having duplicate conceptMapUri and conceptMapVersion");
  }

  // Remove the concept contents for persistence. This is most easily done in the ConceptMap
  // object by setting the group to an empty list.
  Dataset<ConceptMap> withoutConcepts = conceptMaps
      .map((MapFunction<ConceptMap, ConceptMap>) conceptMap -> {

        // Remove the elements rather than the groups to preserve the
        // "unmapped" structure in a group that can refer to other
        // concept maps.
        ConceptMap withoutElements = conceptMap.copy();

        List<ConceptMapGroupComponent> updatedGroups = new ArrayList<>();

        for (ConceptMapGroupComponent group: withoutElements.getGroup()) {

          group.setElement(new ArrayList<>());
          updatedGroups.add(group);
        }

        withoutElements.setGroup(updatedGroups);

        return withoutElements;
      }, CONCEPT_MAP_ENCODER);

  Dataset<Mapping> newMappings = conceptMaps.flatMap(ConceptMaps::expandMappingsIterator,
      MAPPING_ENCODER);

  return withConceptMaps(withoutConcepts, newMappings);
}
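A short sketch of how the method above might be called, assuming HAPI FHIR STU3 model classes and an available Encoder<ConceptMap> (named conceptMapEncoder here); the url, version, and variable names are illustrative.

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.ConceptMap;

public class ConceptMapsExample {

  // `existing` is a previously built ConceptMaps collection; `conceptMapEncoder` stands in
  // for whatever FHIR encoder the collection itself uses (CONCEPT_MAP_ENCODER above).
  static ConceptMaps addExampleMap(SparkSession spark,
                                   ConceptMaps existing,
                                   Encoder<ConceptMap> conceptMapEncoder) {

    ConceptMap conceptMap = new ConceptMap();
    conceptMap.setUrl("urn:example:conceptmap");
    conceptMap.setVersion("0.1");

    Dataset<ConceptMap> newMaps =
        spark.createDataset(Collections.singletonList(conceptMap), conceptMapEncoder);

    // Throws IllegalArgumentException if the (url, version) pairs are not unique.
    return existing.withConceptMaps(newMaps);
  }
}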
/**
 * Join an RDD of locatables with a set of intervals, and apply a function to process the
 * locatables that overlap each interval.
 *
 * @param ctx the Spark Context
 * @param locatables the locatables RDD, must be coordinate sorted
 * @param locatableClass the class of the locatables, must be a subclass of {@link Locatable}
 * @param sequenceDictionary the sequence dictionary to use to find contig lengths
 * @param intervals the collection of intervals to apply the function to
 * @param maxLocatableLength the maximum length of a {@link Locatable}; if any is larger than
 *                           this size then an exception will be thrown
 * @param f the function to process intervals and overlapping locatables with
 * @param <L> the {@link Locatable} type
 * @param <I> the interval type
 * @param <T> the return type of <code>f</code>
 * @return an RDD of the results of applying <code>f</code> to each interval and the locatables
 *         that overlap it
 */
private static <L extends Locatable, I extends Locatable, T> JavaRDD<T> joinOverlapping(
    JavaSparkContext ctx, JavaRDD<L> locatables, Class<L> locatableClass,
    SAMSequenceDictionary sequenceDictionary, List<I> intervals,
    int maxLocatableLength, MapFunction<Tuple2<I, Iterable<L>>, T> f) {

  return joinOverlapping(ctx, locatables, locatableClass, sequenceDictionary, intervals,
      maxLocatableLength,
      (FlatMapFunction2<Iterator<L>, Iterator<I>, T>) (locatablesIterator, shardsIterator) ->
          Iterators.transform(
              locatablesPerShard(locatablesIterator, shardsIterator, sequenceDictionary,
                  maxLocatableLength),
              new Function<Tuple2<I, Iterable<L>>, T>() {
                @Nullable
                @Override
                public T apply(@Nullable Tuple2<I, Iterable<L>> input) {
                  try {
                    return f.call(input);
                  } catch (Exception e) {
                    throw new RuntimeException(e);
                  }
                }
              }));
}
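The anonymous Function above exists only to adapt the checked-exception MapFunction to Guava's Function interface. A standalone sketch of that adapter pattern, with illustrative names, is shown here.

import com.google.common.base.Function;
import org.apache.spark.api.java.function.MapFunction;

public final class MapFunctionAdapter {

  private MapFunctionAdapter() {}

  // Wraps a Spark MapFunction (whose call method declares a checked exception) so it can be
  // used where a Guava Function is expected, e.g. by Iterators.transform.
  public static <A, B> Function<A, B> toGuavaFunction(MapFunction<A, B> f) {
    return input -> {
      try {
        return f.call(input);
      } catch (Exception e) {
        // Mirror the anonymous class above: rethrow as unchecked.
        throw new RuntimeException(e);
      }
    };
  }
}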
/**
 * Reads a SNOMED relationship file and converts it to a {@link HierarchicalElement} dataset.
 *
 * @param spark the Spark session
 * @param snomedRelationshipPath path to the SNOMED relationship file
 * @return a dataset of {@link HierarchicalElement} representing the hierarchical relationships.
 */
public static Dataset<HierarchicalElement> readRelationshipFile(SparkSession spark,
    String snomedRelationshipPath) {

  return spark.read()
      .option("header", true)
      .option("delimiter", "\t")
      .csv(snomedRelationshipPath)
      // Keep only active "is a" relationships with non-empty source and destination codes.
      .where(col("typeId").equalTo(lit(SNOMED_ISA_RELATIONSHIP_ID)))
      .where(col("active").equalTo(lit("1")))
      .select(col("destinationId"), col("sourceId"))
      .where(col("destinationId").isNotNull()
          .and(col("destinationId").notEqual(lit(""))))
      .where(col("sourceId").isNotNull()
          .and(col("sourceId").notEqual(lit(""))))
      .map((MapFunction<Row, HierarchicalElement>) row -> {

        HierarchicalElement element = new HierarchicalElement();

        element.setAncestorSystem(SNOMED_CODE_SYSTEM_URI);
        element.setAncestorValue(row.getString(0));

        element.setDescendantSystem(SNOMED_CODE_SYSTEM_URI);
        element.setDescendantValue(row.getString(1));

        return element;
      }, Hierarchies.getHierarchicalElementEncoder());
}
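Because both readers emit the same HierarchicalElement type, their outputs can be combined into one dataset. A brief sketch, assuming the LOINC and SNOMED readers above live in helper classes named Loinc and Snomed and that the file paths are illustrative:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;

public class CombinedHierarchyExample {

  public static void main(String[] args) {

    SparkSession spark = SparkSession.builder()
        .appName("CombinedHierarchyExample")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical input locations.
    Dataset<HierarchicalElement> loinc =
        Loinc.readMultiaxialHierarchyFile(spark, "/data/loinc/multiaxial_hierarchy.csv");

    Dataset<HierarchicalElement> snomed =
        Snomed.readRelationshipFile(spark, "/data/snomed/sct2_Relationship_Full.txt");

    // Both datasets share the HierarchicalElement schema, so a union is enough to work
    // with LOINC and SNOMED ancestor/descendant pairs together.
    Dataset<HierarchicalElement> combined = loinc.union(snomed);

    combined.groupBy("ancestorSystem").count().show();

    spark.stop();
  }
}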
public static void main(String[] args) throws Exception {

  // Read properties
  Properties prop = PropertyFileReader.readPropertyFile();

  // Spark session
  SparkSession spark = SparkSession
      .builder()
      .appName("VideoStreamProcessor")
      .master(prop.getProperty("spark.master.url"))
      .getOrCreate();

  // Directory to save image files with motion detected
  final String processedImageDir = prop.getProperty("processed.output.dir");
  logger.warn("Output directory for saving processed images is set to " + processedImageDir
      + ". This is configured in the processed.output.dir key of the property file.");

  // Create schema for the JSON message
  StructType schema = DataTypes.createStructType(new StructField[] {
      DataTypes.createStructField("cameraId", DataTypes.StringType, true),
      DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
      DataTypes.createStructField("rows", DataTypes.IntegerType, true),
      DataTypes.createStructField("cols", DataTypes.IntegerType, true),
      DataTypes.createStructField("type", DataTypes.IntegerType, true),
      DataTypes.createStructField("data", DataTypes.StringType, true)
  });

  // Create a Dataset from the stream of Kafka messages
  Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"), schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));

  // Key-value pairs of cameraId -> VideoEventData
  KeyValueGroupedDataset<String, VideoEventData> kvDataset =
      ds.groupByKey(new MapFunction<VideoEventData, String>() {
        @Override
        public String call(VideoEventData value) throws Exception {
          return value.getCameraId();
        }
      }, Encoders.STRING());

  // Process each camera's events, keeping the last processed frame as state
  Dataset<VideoEventData> processedDataset = kvDataset.mapGroupsWithState(
      new MapGroupsWithStateFunction<String, VideoEventData, VideoEventData, VideoEventData>() {
        @Override
        public VideoEventData call(String key, Iterator<VideoEventData> values,
            GroupState<VideoEventData> state) throws Exception {

          logger.warn("CameraId=" + key + " PartitionId=" + TaskContext.getPartitionId());

          VideoEventData existing = null;

          // Check previous state
          if (state.exists()) {
            existing = state.get();
          }

          // Detect motion
          VideoEventData processed =
              VideoMotionDetector.detectMotion(key, values, processedImageDir, existing);

          // Update last processed
          if (processed != null) {
            state.update(processed);
          }

          return processed;
        }
      }, Encoders.bean(VideoEventData.class), Encoders.bean(VideoEventData.class));

  // Start the streaming query
  StreamingQuery query = processedDataset.writeStream()
      .outputMode("update")
      .format("console")
      .start();

  // Await termination
  query.awaitTermination();
}
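The stream above expects each Kafka record's value to be a JSON document matching the schema (cameraId, timestamp, rows, cols, type, data). A minimal producer sketch follows; the broker address, topic name, and payload are illustrative, and the data field would normally carry the Base64-encoded frame bytes.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class VideoEventProducerExample {

  public static void main(String[] args) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // One message per frame, shaped like the schema defined in the stream processor.
    String json = "{\"cameraId\":\"cam-01\","
        + "\"timestamp\":\"2019-01-01 00:00:00\","
        + "\"rows\":480,\"cols\":640,\"type\":16,"
        + "\"data\":\"<base64-encoded frame bytes>\"}";

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // The topic must match the kafka.topic key read from the property file.
      producer.send(new ProducerRecord<>("video-stream-event", "cam-01", json));
    }
  }
}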
/**
 * Returns a new ValueSets instance that includes the given value sets.
 *
 * @param valueSets the value sets to add to the returned collection.
 * @return a new ValueSets instance with the added value sets.
 */
public ValueSets withValueSets(Dataset<ValueSet> valueSets) {

  Dataset<UrlAndVersion> newMembers = getUrlAndVersions(valueSets);

  // Ensure that there are no duplicates among the value sets
  if (hasDuplicateUrlAndVersions(newMembers) || valueSets.count() != newMembers.count()) {
    throw new IllegalArgumentException(
        "Cannot add value sets having duplicate valueSetUri and valueSetVersion");
  }

  // The value set concepts will be stored in the values table for persistence, so we remove
  // them from the individual value sets. This can be done most easily by setting concepts to
  // an empty list.
  Dataset<ValueSet> withoutConcepts = valueSets.map(
      (MapFunction<ValueSet, ValueSet>) valueSet -> {

        ValueSet valueSetWithoutConcepts = valueSet.copy();

        List<ConceptSetComponent> updatedInclusions = new ArrayList<>();

        for (ConceptSetComponent inclusion: valueSet.getCompose().getInclude()) {

          ConceptSetComponent inclusionWithoutConcepts = inclusion.copy();

          inclusionWithoutConcepts.setConcept(new ArrayList<>());
          updatedInclusions.add(inclusionWithoutConcepts);
        }

        valueSetWithoutConcepts.getCompose().setInclude(updatedInclusions);

        return valueSetWithoutConcepts;
      }, VALUE_SET_ENCODER);

  Dataset<Value> newValues = valueSets.flatMap(ValueSets::expandValuesIterator, VALUE_ENCODER);

  return withValueSets(withoutConcepts, newValues);
}
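A short sketch of the duplicate guard above, assuming HAPI FHIR STU3 model classes, an available Encoder<ValueSet> (valueSetEncoder), and that hasDuplicateUrlAndVersions checks new members against those already in the collection; all names are illustrative.

import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.SparkSession;
import org.hl7.fhir.dstu3.model.ValueSet;

public class ValueSetsExample {

  static void demonstrateDuplicateGuard(SparkSession spark,
                                        ValueSets valueSets,
                                        Encoder<ValueSet> valueSetEncoder) {

    ValueSet valueSet = new ValueSet();
    valueSet.setUrl("urn:example:valueset");
    valueSet.setVersion("0.1");

    Dataset<ValueSet> newSets =
        spark.createDataset(Collections.singletonList(valueSet), valueSetEncoder);

    ValueSets updated = valueSets.withValueSets(newSets);

    // Adding the same url/version a second time is rejected.
    try {
      updated.withValueSets(newSets);
    } catch (IllegalArgumentException expected) {
      System.out.println("Rejected duplicate: " + expected.getMessage());
    }
  }
}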