由于ST_GeomFromText不是org.apache.spark.sql.functions的一部分,因此它不会在内部识别它。我需要首先为此函数定义UDF。意味着我需要定义该函数的定义,然后将该函数向spark注册为UDF,然后只有我才能使用此函数。
我陷入了开始定义此功能的困境,需要使用什么参数。
编辑
我使用的代码如下:
sparkSession.udf().register("ST_GeomFromText", new UDF1<String, String>() { @Override public String call(String txt ) { return (new ST_GeomFromText(txt)); } }, DataTypes.StringType);
我真的需要你的帮助。
谢谢
我认为,您没有完全按照GeoSparkSQL-Overview /#quick- start进行操作-
按照快速入门,您需要将GeoSpark-core和GeoSparkSQL添加到项目POM.xml或build.sbt中
<!– Geo spark lib doc - https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Overview/#quick-start--> org.datasyslab geospark-sql_2.3 1.3.1 com.vividsolutions jts 1.13 org.datasyslab geospark-viz_2.3 1.3.1 org.datasyslab geospark 1.3.1
声明您的Spark会话
SparkSession sparkSession = SparkSession.builder() .config(“spark.serializer”, KryoSerializer.class.getName()) .config(“spark.kryo.registrator”, GeoSparkKryoRegistrator.class.getName()) .master(“local[*]”) .appName(“myGeoSparkSQLdemo”) .getOrCreate();
从注册所有功能geospark-sql_2.3的sparkSession,这样它可以用来直接火花SQL
geospark-sql_2.3
sparkSession
// register all functions from geospark-sql_2.3 to sparkSession GeoSparkSQLRegistrator.registerAll(sparkSession);
SparkSession sparkSession = SparkSession.builder() .config("spark.serializer", KryoSerializer.class.getName()) .config("spark.kryo.registrator", GeoSparkKryoRegistrator.class.getName()) .master("local[*]") .appName("myGeoSparkSQLdemo") .getOrCreate(); // register all functions from geospark-sql_2.3 to sparkSession GeoSparkSQLRegistrator.registerAll(sparkSession); try { System.out.println(sparkSession.catalog().getFunction("ST_Geomfromtext")); // Function[name='ST_GeomFromText', className='org.apache.spark.sql.geosparksql.expressions.ST_GeomFromText$', isTemporary='true'] } catch (Exception e) { e.printStackTrace(); } // https://datasystemslab.github.io/GeoSpark/api/sql/GeoSparkSQL-Function/ Dataset<Row> dataframe = sparkSession.sql("select ST_GeomFromText('POINT(-7.07378166 33.826661)')"); dataframe.show(false); dataframe.printSchema(); /** * +---------------------------------------------+ * |st_geomfromtext(POINT(-7.07378166 33.826661))| * +---------------------------------------------+ * |POINT (-7.07378166 33.826661) | * +---------------------------------------------+ */ // using longitude and latitude column from existing dataframe Dataset<Row> df = sparkSession.sql("select -7.07378166 as longitude, 33.826661 as latitude"); df.withColumn("ST_Geomfromtext ", expr("ST_GeomFromText(CONCAT('POINT(',longitude,' ',latitude,')'))")) .show(false); /** * +-----------+---------+-----------------------------+ * |longitude |latitude |ST_Geomfromtext | * +-----------+---------+-----------------------------+ * |-7.07378166|33.826661|POINT (-7.07378166 33.826661)| * +-----------+---------+-----------------------------+ */