小编典典

使用Spark / java的ST_geomfromtext函数

sql

由于ST_GeomFromText不是org.apache.spark.sql.functions的一部分,因此它不会在内部识别它。我需要首先为此函数定义UDF。意味着我需要定义该函数的定义,然后将该函数向spark注册为UDF,然后只有我才能使用此函数。

我陷入了开始定义此功能的困境,需要使用什么参数。

编辑

我使用的代码如下:

 sparkSession.udf().register("ST_GeomFromText", new UDF1<String, String>() {
        @Override
        public String call(String txt ) {
            return (new ST_GeomFromText(txt));
        }
    }, DataTypes.StringType);

我真的需要你的帮助。

谢谢


阅读 374

收藏
2021-04-07

共1个答案

小编典典

我认为,您没有完全按照GeoSparkSQL-Overview /#quick-
start
进行操作-

  1. 按照快速入门,您需要将GeoSpark-core和GeoSparkSQL添加到项目POM.xml或build.sbt中

    <!– Geo spark lib doc - https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Overview/#quick-start-->

    org.datasyslab
    geospark-sql_2.3
    1.3.1



    com.vividsolutions
    jts
    1.13



    org.datasyslab
    geospark-viz_2.3
    1.3.1


    org.datasyslab
    geospark
    1.3.1

  2. 声明您的Spark会话

    SparkSession sparkSession = SparkSession.builder()
    .config(“spark.serializer”, KryoSerializer.class.getName())
    .config(“spark.kryo.registrator”, GeoSparkKryoRegistrator.class.getName())
    .master(“local[*]”)
    .appName(“myGeoSparkSQLdemo”)
    .getOrCreate();

  3. 从注册所有功能geospark-sql_2.3sparkSession,这样它可以用来直接火花SQL

    // register all functions from geospark-sql_2.3 to sparkSession
    GeoSparkSQLRegistrator.registerAll(sparkSession);

现在,这是工作示例-

   SparkSession sparkSession = SparkSession.builder()
                .config("spark.serializer", KryoSerializer.class.getName())
                .config("spark.kryo.registrator", GeoSparkKryoRegistrator.class.getName())
                .master("local[*]")
                .appName("myGeoSparkSQLdemo")
                .getOrCreate();

        // register all functions from geospark-sql_2.3 to sparkSession
        GeoSparkSQLRegistrator.registerAll(sparkSession);
        try {
            System.out.println(sparkSession.catalog().getFunction("ST_Geomfromtext"));
            // Function[name='ST_GeomFromText', className='org.apache.spark.sql.geosparksql.expressions.ST_GeomFromText$', isTemporary='true']
        } catch (Exception e) {
            e.printStackTrace();
        }
        // https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Function/
        Dataset<Row> dataframe = sparkSession.sql("select ST_GeomFromText('POINT(-7.07378166 33.826661)')");
        dataframe.show(false);
        dataframe.printSchema();
        /**
         * +---------------------------------------------+
         * |st_geomfromtext(POINT(-7.07378166 33.826661))|
         * +---------------------------------------------+
         * |POINT (-7.07378166 33.826661)                |
         * +---------------------------------------------+
         */

        // using longitude and latitude column from existing dataframe
        Dataset<Row> df = sparkSession.sql("select -7.07378166 as longitude, 33.826661 as latitude");
        df.withColumn("ST_Geomfromtext ",
                expr("ST_GeomFromText(CONCAT('POINT(',longitude,' ',latitude,')'))"))
        .show(false);
        /**
         * +-----------+---------+-----------------------------+
         * |longitude  |latitude |ST_Geomfromtext              |
         * +-----------+---------+-----------------------------+
         * |-7.07378166|33.826661|POINT (-7.07378166 33.826661)|
         * +-----------+---------+-----------------------------+
         */
2021-04-07