Java class org.apache.spark.api.java.function.FilterFunction — example source code

Project: bunsen    File: Hierarchies.java
/**
 * Returns the collection of ancestors from the table in the given database.
 *
 * @param spark the spark session
 * @param database name of the database containing the ancestors table
 * @return a Hierarchies instance.
 */
public static Hierarchies getFromDatabase(SparkSession spark, String database) {

  // Fully-qualified ancestors table in the requested database.
  String tableName = database + "." + ANCESTORS_TABLE;

  Dataset<Ancestor> ancestors = spark
      .sql("SELECT * FROM " + tableName)
      .as(ANCESTOR_ENCODER);

  // Members of a hierarchy are the distinct (uri, version) pairs whose uri
  // carries the hierarchy prefix; "uri" is exposed as "url" to fit UrlAndVersion.
  FilterFunction<Ancestor> isHierarchy = ancestor ->
      ancestor.getUri().startsWith(HIERARCHY_URI_PREFIX);

  Dataset<UrlAndVersion> members = ancestors
      .filter(isHierarchy)
      .select(col("uri").alias("url"), col("version"))
      .distinct()
      .as(URI_AND_VERSION_ENCODER);

  return new Hierarchies(spark, members, ancestors);
}
Project: Gaffer    File: GetDataframeOfElementsHandler.java
/**
 * Loads the store's Parquet graph data as a dataframe, applying a visibility
 * filter when a visibility property is configured.
 *
 * @param operation the operation being handled; only the empty view is supported
 * @param store the Parquet store holding the graph data
 * @param spark the spark session used to read the data
 * @param auths the authorisations used to evaluate row visibility
 * @param visibility the visibility property name; empty means no filtering
 * @return the (possibly filtered) dataframe, or an empty dataframe when no
 *         graph index exists yet
 * @throws OperationException if the operation carries a non-empty view
 */
private Dataset<Row> doOperation(final GetDataFrameOfElements operation,
                                 final ParquetStore store,
                                 final SparkSession spark,
                                 final Authorisations auths,
                                 final String visibility) throws OperationException {
    // Guard clause: anything other than the empty view is unsupported.
    if (!operation.getView().equals(store.getSchemaUtils().getEmptyView())) {
        throw new OperationException("Views are not supported by this operation yet");
    }
    LOGGER.debug("Retrieving elements as a dataframe");

    // Without a graph index there is no snapshot directory to read from.
    if (null == store.getGraphIndex()) {
        return spark.emptyDataFrame();
    }
    final String rootDir =
            store.getDataDir() + "/" + store.getGraphIndex().getSnapshotTimestamp() + "/";

    Dataset<Row> dataset = spark
            .read()
            .option("mergeSchema", true)
            .parquet(rootDir + ParquetStoreConstants.GRAPH);

    // Only filter rows when a visibility property is actually set.
    if (!visibility.isEmpty()) {
        dataset = dataset.filter((FilterFunction<Row>) row -> isVisible(row, visibility, auths));
    }

    LOGGER.debug("The merged schema that the data is being loaded using is: {}", dataset.schema().treeString());
    return dataset;
}
Project: bunsen    File: ValueSets.java
/**
 * Returns a dataset with the values for each element in the map of uri to version.
 *
 * @param uriToVersion a map of value set URI to the version to load
 * @return a dataset of values for the given URIs and versions.
 */
public Dataset<Value> getValues(Map<String,String> uriToVersion) {

  JavaSparkContext javaContext = new JavaSparkContext(this.spark.sparkContext());

  // Broadcast the lookup so executors receive it once rather than per task.
  final Broadcast<Map<String,String>> versionsByUri = javaContext.broadcast(uriToVersion);

  // Keep only values whose value set uri is requested AND whose version
  // matches the requested version exactly.
  FilterFunction<Value> matchesRequestedVersion = value -> {
    String requested = versionsByUri.getValue().get(value.getValueSetUri());
    return requested != null && requested.equals(value.getValueSetVersion());
  };

  return this.values.filter(matchesRequestedVersion);
}
Project: bunsen    File: ConceptMaps.java
/**
 * Returns a dataset with the mappings for each uri and version.
 *
 * @param uriToVersion a map of concept map URI to the version to load
 * @return a dataset of mappings for the given URIs and versions.
 */
public Dataset<Mapping> getMappings(Map<String,String> uriToVersion) {

  JavaSparkContext javaContext = new JavaSparkContext(this.spark.sparkContext());

  // Ship the uri-to-version lookup to executors once via a broadcast variable.
  final Broadcast<Map<String,String>> requestedVersions = javaContext.broadcast(uriToVersion);

  // A mapping is retained only when its concept map uri was requested and the
  // mapping's version equals the version asked for.
  return this.mappings.filter((FilterFunction<Mapping>) mapping -> {
    String version = requestedVersions.getValue().get(mapping.getConceptMapUri());
    return version != null && version.equals(mapping.getConceptMapVersion());
  });
}