/**
 * Returns the collection of ancestors from the table in the given database.
 *
 * @param spark the spark session
 * @param database name of the database containing the ancestors table
 * @return a Hierarchies instance.
 */
public static Hierarchies getFromDatabase(SparkSession spark, String database) {

  // Use the catalog API rather than concatenating the database name into a raw
  // SQL string; this is equivalent to "SELECT * FROM <database>.<table>" but
  // avoids SQL string building with an externally supplied identifier.
  Dataset<Ancestor> ancestors = spark.table(database + "." + ANCESTORS_TABLE)
      .as(ANCESTOR_ENCODER);

  // Hierarchy members are the distinct (uri, version) pairs among ancestors
  // whose URI carries the hierarchy prefix; "uri" is renamed to "url" so the
  // result matches the UrlAndVersion bean.
  Dataset<UrlAndVersion> members = ancestors
      .filter((FilterFunction<Ancestor>) ancestor ->
          ancestor.getUri().startsWith(HIERARCHY_URI_PREFIX))
      .select(col("uri").alias("url"), col("version"))
      .distinct()
      .as(URI_AND_VERSION_ENCODER);

  return new Hierarchies(spark, members, ancestors);
}
/**
 * Loads the graph elements for the given operation as a {@link Dataset} of rows.
 *
 * <p>Only the empty (all-elements) view is currently supported; any other view
 * results in an {@link OperationException}. When a visibility string is present,
 * rows are filtered against the supplied authorisations.
 *
 * @param operation the get-dataframe operation, whose view must be empty
 * @param store the parquet store holding the graph data and schema utilities
 * @param spark the spark session used to read the parquet files
 * @param auths the authorisations used to evaluate row visibility
 * @param visibility the visibility property name; empty string disables filtering
 * @return the loaded (and possibly visibility-filtered) dataframe, or an empty
 *         dataframe when the store has no graph index yet
 * @throws OperationException if a non-empty view is supplied
 */
private Dataset<Row> doOperation(final GetDataFrameOfElements operation,
                                 final ParquetStore store,
                                 final SparkSession spark,
                                 final Authorisations auths,
                                 final String visibility) throws OperationException {
    // Guard: custom views are not implemented for this operation.
    if (!operation.getView().equals(store.getSchemaUtils().getEmptyView())) {
        throw new OperationException("Views are not supported by this operation yet");
    }
    LOGGER.debug("Retrieving elements as a dataframe");

    // Without a graph index there is no snapshot to read from.
    if (null == store.getGraphIndex()) {
        return spark.emptyDataFrame();
    }
    final String rootDir =
            store.getDataDir() + "/" + store.getGraphIndex().getSnapshotTimestamp() + "/";

    // Load the snapshot, merging schemas across the parquet files.
    final Dataset<Row> loaded = spark
            .read()
            .option("mergeSchema", true)
            .parquet(rootDir + ParquetStoreConstants.GRAPH);

    // Apply the row-level visibility filter only when a visibility property is set.
    final Dataset<Row> dataset;
    if (visibility.isEmpty()) {
        dataset = loaded;
    } else {
        final FilterFunction<Row> filter = e -> isVisible(e, visibility, auths);
        dataset = loaded.filter(filter);
    }

    LOGGER.debug("The merged schema that the data is being loaded using is: {}",
            dataset.schema().treeString());
    return dataset;
}
/**
 * Returns a dataset with the values for each element in the map of uri to version.
 *
 * @param uriToVersion a map of value set URI to the version to load
 * @return a dataset of values for the given URIs and versions.
 */
public Dataset<Value> getValues(Map<String,String> uriToVersion) {

  // Broadcast the uri-to-version map so each executor gets a single copy
  // instead of serializing it into every filter closure.
  JavaSparkContext javaContext = new JavaSparkContext(this.spark.sparkContext());
  Broadcast<Map<String,String>> versionsByUri = javaContext.broadcast(uriToVersion);

  // Keep only values whose value set URI appears in the map AND whose
  // version matches the requested one for that URI.
  FilterFunction<Value> matchesRequestedVersion = value -> {
    String requested = versionsByUri.getValue().get(value.getValueSetUri());
    return requested != null && requested.equals(value.getValueSetVersion());
  };

  return this.values.filter(matchesRequestedVersion);
}
/**
 * Returns a dataset with the mappings for each uri and version.
 *
 * @param uriToVersion a map of concept map URI to the version to load
 * @return a dataset of mappings for the given URIs and versions.
 */
public Dataset<Mapping> getMappings(Map<String,String> uriToVersion) {

  // Broadcast the uri-to-version map so each executor gets a single copy
  // instead of serializing it into every filter closure.
  JavaSparkContext javaContext = new JavaSparkContext(this.spark.sparkContext());
  Broadcast<Map<String,String>> versionsByUri = javaContext.broadcast(uriToVersion);

  // Keep only mappings whose concept map URI appears in the map AND whose
  // version matches the requested one for that URI.
  FilterFunction<Mapping> matchesRequestedVersion = mapping -> {
    String requested = versionsByUri.getValue().get(mapping.getConceptMapUri());
    return requested != null && requested.equals(mapping.getConceptMapVersion());
  };

  return this.mappings.filter(matchesRequestedVersion);
}