The following 50 code examples, extracted from Python open source projects, illustrate how to use pyspark.sql.Row().
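Before the extracted examples, here is a minimal sketch of the basic Row API itself. The field names, the Person helper, and the `spark` session here are illustrative assumptions, not taken from the examples below.

from pyspark.sql import Row

# A Row behaves like a named tuple: fields can be read by attribute or by key.
person = Row(name='Alice', age=2)
print(person.name)      # Alice
print(person['age'])    # 2

# A Row "class" can also be declared from field names first and instantiated later.
Person = Row("name", "age")
people = [Person('Alice', 2), Person('Bob', 5)]

# Rows are commonly passed to createDataFrame (assumes an active SparkSession named `spark`).
df = spark.createDataFrame(people)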
def trainModel(docMatrix, savemodel, k, iterations=10, parallelization=16):
    data = mmread(docMatrix)
    rowRange = sc.parallelize(xrange(data.shape[0]), parallelization)
    dataSpark = spark.createDataFrame(rowRange
        .map(lambda i: Row(label=i, features=sparkToScipySparse(data.getrow(i)))))
    lda = LDA(k=k, maxIter=iterations)
    model = lda.fit(dataSpark)
    model.save(savemodel)
    topicMatrix = model.topicsMatrix().toArray()
    topicMatrix = topicMatrix.T
    topicMatrix = topicMatrix / topicMatrix.sum(axis=0)

    print 'TODO: give wordXtopic.mtx a path'
    mmwrite('wordXtopic.mtx', topicMatrix)

    print 'TODO: give docXtopic.mtx a path'
    docXTopics = model.transform(dataSpark)
    dxT = docXTopics.collect()
    dxT_v2 = np.array([dxtI['topicDistribution'] for dxtI in dxT])
    mmwrite('docXtopic.mtx', dxT_v2)

# Main script
def test_stopwordsremover(self):
    dataset = self.spark.createDataFrame([Row(input=["a", "panda"])])
    stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
    # Default
    self.assertEqual(stopWordRemover.getInputCol(), "input")
    transformedDF = stopWordRemover.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["panda"])
    self.assertEqual(type(stopWordRemover.getStopWords()), list)
    self.assertTrue(isinstance(stopWordRemover.getStopWords()[0], basestring))
    # Custom
    stopwords = ["panda"]
    stopWordRemover.setStopWords(stopwords)
    self.assertEqual(stopWordRemover.getInputCol(), "input")
    self.assertEqual(stopWordRemover.getStopWords(), stopwords)
    transformedDF = stopWordRemover.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["a"])
    # with language selection
    stopwords = StopWordsRemover.loadDefaultStopWords("turkish")
    dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])])
    stopWordRemover.setStopWords(stopwords)
    self.assertEqual(stopWordRemover.getStopWords(), stopwords)
    transformedDF = stopWordRemover.transform(dataset)
    self.assertEqual(transformedDF.head().output, [])
def test_infer_schema(self):
    rdd = self.sc.parallelize([Row(label=1.0, features=self.dv1),
                               Row(label=0.0, features=self.sv1)])
    df = rdd.toDF()
    schema = df.schema
    field = [f for f in schema.fields if f.name == "features"][0]
    self.assertEqual(field.dataType, self.udt)
    vectors = df.rdd.map(lambda p: p.features).collect()
    self.assertEqual(len(vectors), 2)
    for v in vectors:
        if isinstance(v, SparseVector):
            self.assertEqual(v, self.sv1)
        elif isinstance(v, DenseVector):
            self.assertEqual(v, self.dv1)
        else:
            raise TypeError("expecting a vector but got %r of type %r" % (v, type(v)))
def crossJoin(self, other):
    """Returns the cartesian product with another :class:`DataFrame`.

    :param other: Right side of the cartesian product.

    >>> df.select("age", "name").collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    >>> df2.select("name", "height").collect()
    [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
    >>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect()
    [Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
     Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
    """
    jdf = self._jdf.crossJoin(other._jdf)
    return DataFrame(jdf, self.sql_ctx)
def sort(self, *cols, **kwargs):
    """Returns a new :class:`DataFrame` sorted by the specified column(s).

    :param cols: list of :class:`Column` or column names to sort by.
    :param ascending: boolean or list of boolean (default True).
        Sort ascending vs. descending. Specify list for multiple sort orders.
        If a list is specified, length of the list must equal length of the `cols`.

    >>> df.sort(df.age.desc()).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.sort("age", ascending=False).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    """
    jdf = self._jdf.sort(self._sort_cols(cols, kwargs))
    return DataFrame(jdf, self.sql_ctx)
def __getitem__(self, item):
    """Returns the column as a :class:`Column`.

    >>> df.select(df['age']).collect()
    [Row(age=2), Row(age=5)]
    >>> df[ ["name", "age"]].collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df[ df.age > 3 ].collect()
    [Row(age=5, name=u'Bob')]
    >>> df[df[0] > 3].collect()
    [Row(age=5, name=u'Bob')]
    """
    if isinstance(item, basestring):
        jc = self._jdf.apply(item)
        return Column(jc)
    elif isinstance(item, Column):
        return self.filter(item)
    elif isinstance(item, (list, tuple)):
        return self.select(*item)
    elif isinstance(item, int):
        jc = self._jdf.apply(self.columns[item])
        return Column(jc)
    else:
        raise TypeError("unexpected item type: %s" % type(item))
def select(self, *cols):
    """Projects a set of expressions and returns a new :class:`DataFrame`.

    :param cols: list of column names (string) or expressions (:class:`Column`).
        If one of the column names is '*', that column is expanded to include all columns
        in the current DataFrame.

    >>> df.select('*').collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
    """
    jdf = self._jdf.select(self._jcols(*cols))
    return DataFrame(jdf, self.sql_ctx)
def groupBy(self, *cols):
    """Groups the :class:`DataFrame` using the specified columns,
    so we can run aggregation on them. See :class:`GroupedData`
    for all the available aggregate functions.

    :func:`groupby` is an alias for :func:`groupBy`.

    :param cols: list of columns to group by.
        Each element should be a column name (string) or an expression (:class:`Column`).

    >>> df.groupBy().avg().collect()
    [Row(avg(age)=3.5)]
    >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(df.name).avg().collect())
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(['name', df.age]).count().collect())
    [Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
    """
    jgd = self._jdf.groupBy(self._jcols(*cols))
    from pyspark.sql.group import GroupedData
    return GroupedData(jgd, self.sql_ctx)
def range(self, start, end=None, step=1, numPartitions=None):
    """
    Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
    ``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
    step value ``step``.

    :param start: the start value
    :param end: the end value (exclusive)
    :param step: the incremental step (default: 1)
    :param numPartitions: the number of partitions of the DataFrame
    :return: :class:`DataFrame`

    >>> sqlContext.range(1, 7, 2).collect()
    [Row(id=1), Row(id=3), Row(id=5)]

    If only one argument is specified, it will be used as the end value.

    >>> sqlContext.range(3).collect()
    [Row(id=0), Row(id=1), Row(id=2)]
    """
    return self.sparkSession.range(start, end, step, numPartitions)
def tables(self, dbName=None):
    """Returns a :class:`DataFrame` containing names of tables in the given database.

    If ``dbName`` is not specified, the current database will be used.

    The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
    (a column with :class:`BooleanType` indicating if a table is a temporary one or not).

    :param dbName: string, name of the database to use.
    :return: :class:`DataFrame`

    >>> sqlContext.registerDataFrameAsTable(df, "table1")
    >>> df2 = sqlContext.tables()
    >>> df2.filter("tableName = 'table1'").first()
    Row(database=u'', tableName=u'table1', isTemporary=True)
    """
    if dbName is None:
        return DataFrame(self._ssql_ctx.tables(), self)
    else:
        return DataFrame(self._ssql_ctx.tables(dbName), self)
def date_format(date, format):
    """
    Converts a date/timestamp/string to a value of string in the format specified by the date
    format given by the second argument.

    A pattern could be for instance `dd.MM.yyyy` and could return a string like '18.03.1993'. All
    pattern letters of the Java class `java.text.SimpleDateFormat` can be used.

    .. note:: Whenever possible, use specialized functions like `year`. These benefit from a
        specialized implementation.

    >>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
    [Row(date=u'04/08/2015')]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.date_format(_to_java_column(date), format))
def locate(substr, str, pos=1):
    """
    Locate the position of the first occurrence of substr in a string column, after position pos.

    .. note:: The position is not zero based, but 1 based index. Returns 0 if substr
        could not be found in str.

    :param substr: a string
    :param str: a Column of :class:`pyspark.sql.types.StringType`
    :param pos: start position (1 based)

    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(locate('b', df.s, 1).alias('s')).collect()
    [Row(s=2)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.locate(substr, _to_java_column(str), pos))
def regexp_extract(str, pattern, idx):
    """Extract a specific group matched by a Java regex, from the specified string column.
    If the regex did not match, or the specified group did not match, an empty string is returned.

    >>> df = spark.createDataFrame([('100-200',)], ['str'])
    >>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
    [Row(d=u'100')]
    >>> df = spark.createDataFrame([('foo',)], ['str'])
    >>> df.select(regexp_extract('str', '(\d+)', 1).alias('d')).collect()
    [Row(d=u'')]
    >>> df = spark.createDataFrame([('aaaac',)], ['str'])
    >>> df.select(regexp_extract('str', '(a+)(b)?(c)', 2).alias('d')).collect()
    [Row(d=u'')]
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.regexp_extract(_to_java_column(str), pattern, idx)
    return Column(jc)
def create_map(*cols):
    """Creates a new map column.

    :param cols: list of column names (string) or list of :class:`Column` expressions that are
        grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).

    >>> df.select(create_map('name', 'age').alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    >>> df.select(create_map([df.name, df.age]).alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    """
    sc = SparkContext._active_spark_context
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        cols = cols[0]
    jc = sc._jvm.functions.map(_to_seq(sc, cols, _to_java_column))
    return Column(jc)
def explode(col):
    """Returns a new row for each element in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]

    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    b|
    +---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.explode(_to_java_column(col))
    return Column(jc)
def posexplode(col):
    """Returns a new row for each element with position in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(posexplode(eDF.intlist)).collect()
    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]

    >>> eDF.select(posexplode(eDF.mapfield)).show()
    +---+---+-----+
    |pos|key|value|
    +---+---+-----+
    |  0|  a|    b|
    +---+---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.posexplode(_to_java_column(col))
    return Column(jc)
def get_json_object(col, path):
    """
    Extracts json object from a json string based on json path specified, and returns json string
    of the extracted json object. It will return null if the input json string is invalid.

    :param col: string column in json format
    :param path: path to the json object to extract

    >>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
    >>> df = spark.createDataFrame(data, ("key", "jstring"))
    >>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \\
    ...                   get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
    [Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
    return Column(jc)
def to_json(col, options={}):
    """
    Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
    in the case of an unsupported type.

    :param col: name of column containing the struct
    :param options: options to control converting. accepts the same options as the json datasource

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.types import *
    >>> data = [(1, Row(name='Alice', age=2))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(to_json(df.value).alias("json")).collect()
    [Row(json=u'{"age":2,"name":"Alice"}')]
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.to_json(_to_java_column(col), options)
    return Column(jc)
def sort_array(col, asc=True):
    """
    Collection function: sorts the input array in ascending or descending order according
    to the natural ordering of the array elements.

    :param col: name of column or expression

    >>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
    >>> df.select(sort_array(df.data).alias('r')).collect()
    [Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
    >>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
    [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc))


# ---------------------------- User Defined Function ----------------------------------
def udf(f, returnType=StringType()):
    """Creates a :class:`Column` expression representing a user defined function (UDF).

    .. note:: The user-defined functions must be deterministic. Due to optimization,
        duplicate invocations may be eliminated or the function may even be invoked more times
        than it is present in the query.

    :param f: python function
    :param returnType: a :class:`pyspark.sql.types.DataType` object

    >>> from pyspark.sql.types import IntegerType
    >>> slen = udf(lambda s: len(s), IntegerType())
    >>> df.select(slen(df.name).alias('slen')).collect()
    [Row(slen=5), Row(slen=3)]
    """
    return UserDefinedFunction(f, returnType)
def _test():
    import doctest
    from pyspark.sql import Row, SparkSession
    import pyspark.sql.functions
    globs = pyspark.sql.functions.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.functions tests")\
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.functions, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    spark.stop()
    if failure_count:
        exit(-1)
def test_udf_with_aggregate_function(self):
    df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
    from pyspark.sql.functions import udf, col, sum
    from pyspark.sql.types import BooleanType

    my_filter = udf(lambda a: a == 1, BooleanType())
    sel = df.select(col("key")).distinct().filter(my_filter(col("key")))
    self.assertEqual(sel.collect(), [Row(key=1)])

    my_copy = udf(lambda x: x, IntegerType())
    my_add = udf(lambda a, b: int(a + b), IntegerType())
    my_strlen = udf(lambda x: len(x), IntegerType())
    sel = df.groupBy(my_copy(col("key")).alias("k"))\
        .agg(sum(my_strlen(col("value"))).alias("s"))\
        .select(my_add(col("k"), col("s")).alias("t"))
    self.assertEqual(sel.collect(), [Row(t=4), Row(t=3)])
def test_serialize_nested_array_and_map(self):
    d = [Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})]
    rdd = self.sc.parallelize(d)
    df = self.spark.createDataFrame(rdd)
    row = df.head()
    self.assertEqual(1, len(row.l))
    self.assertEqual(1, row.l[0].a)
    self.assertEqual("2", row.d["key"].d)

    l = df.rdd.map(lambda x: x.l).first()
    self.assertEqual(1, len(l))
    self.assertEqual('s', l[0].b)

    d = df.rdd.map(lambda x: x.d).first()
    self.assertEqual(1, len(d))
    self.assertEqual(1.0, d["key"].c)

    row = df.rdd.map(lambda x: x.d["key"]).first()
    self.assertEqual(1.0, row.c)
    self.assertEqual("2", row.d)
def test_infer_schema(self):
    d = [Row(l=[], d={}, s=None),
         Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
    rdd = self.sc.parallelize(d)
    df = self.spark.createDataFrame(rdd)
    self.assertEqual([], df.rdd.map(lambda r: r.l).first())
    self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
    df.createOrReplaceTempView("test")
    result = self.spark.sql("SELECT l[0].a from test where d['key'].d = '2'")
    self.assertEqual(1, result.head()[0])

    df2 = self.spark.createDataFrame(rdd, samplingRatio=1.0)
    self.assertEqual(df.schema, df2.schema)
    self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
    self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
    df2.createOrReplaceTempView("test2")
    result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
    self.assertEqual(1, result.head()[0])
def test_udf_with_udt(self):
    from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
    row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
    df = self.spark.createDataFrame([row])
    self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
    udf = UserDefinedFunction(lambda p: p.y, DoubleType())
    self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
    udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
    self.assertEqual(ExamplePoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])

    row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
    df = self.spark.createDataFrame([row])
    self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
    udf = UserDefinedFunction(lambda p: p.y, DoubleType())
    self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
    udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
    self.assertEqual(PythonOnlyPoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])
def test_parquet_with_udt(self):
    from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
    row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
    df0 = self.spark.createDataFrame([row])
    output_dir = os.path.join(self.tempdir.name, "labeled_point")
    df0.write.parquet(output_dir)
    df1 = self.spark.read.parquet(output_dir)
    point = df1.head().point
    self.assertEqual(point, ExamplePoint(1.0, 2.0))

    row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
    df0 = self.spark.createDataFrame([row])
    df0.write.parquet(output_dir, mode='overwrite')
    df1 = self.spark.read.parquet(output_dir)
    point = df1.head().point
    self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))
def test_union_with_udt(self):
    from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
    row1 = (1.0, ExamplePoint(1.0, 2.0))
    row2 = (2.0, ExamplePoint(3.0, 4.0))
    schema = StructType([StructField("label", DoubleType(), False),
                         StructField("point", ExamplePointUDT(), False)])
    df1 = self.spark.createDataFrame([row1], schema)
    df2 = self.spark.createDataFrame([row2], schema)

    result = df1.union(df2).orderBy("label").collect()
    self.assertEqual(
        result,
        [
            Row(label=1.0, point=ExamplePoint(1.0, 2.0)),
            Row(label=2.0, point=ExamplePoint(3.0, 4.0))
        ]
    )
def test_first_last_ignorenulls(self):
    from pyspark.sql import functions
    df = self.spark.range(0, 100)
    df2 = df.select(functions.when(df.id % 3 == 0, None).otherwise(df.id).alias("id"))
    df3 = df2.select(functions.first(df2.id, False).alias('a'),
                     functions.first(df2.id, True).alias('b'),
                     functions.last(df2.id, False).alias('c'),
                     functions.last(df2.id, True).alias('d'))
    self.assertEqual([Row(a=None, b=1, c=None, d=98)], df3.collect())
def test_infer_long_type(self):
    longrow = [Row(f1='a', f2=100000000000000)]
    df = self.sc.parallelize(longrow).toDF()
    self.assertEqual(df.schema.fields[1].dataType, LongType())

    # this saving as Parquet caused issues as well.
    output_dir = os.path.join(self.tempdir.name, "infer_long_type")
    df.write.parquet(output_dir)
    df1 = self.spark.read.parquet(output_dir)
    self.assertEqual('a', df1.first().f1)
    self.assertEqual(100000000000000, df1.first().f2)

    self.assertEqual(_infer_type(1), LongType())
    self.assertEqual(_infer_type(2**10), LongType())
    self.assertEqual(_infer_type(2**20), LongType())
    self.assertEqual(_infer_type(2**31 - 1), LongType())
    self.assertEqual(_infer_type(2**31), LongType())
    self.assertEqual(_infer_type(2**61), LongType())
    self.assertEqual(_infer_type(2**71), LongType())
def text(self, paths):
    """
    Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.

    Each line in the text file is a new row in the resulting DataFrame.

    :param paths: string, or list of strings, for input path(s).

    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value=u'hello'), Row(value=u'this')]
    """
    if isinstance(paths, basestring):
        paths = [paths]
    return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
def _monkey_patch_RDD(sparkSession):
    def toDF(self, schema=None, sampleRatio=None):
        """
        Converts current :class:`RDD` into a :class:`DataFrame`

        This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``

        :param schema: a :class:`pyspark.sql.types.StructType` or list of names of columns
        :param samplingRatio: the sample ratio of rows used for inferring
        :return: a DataFrame

        >>> rdd.toDF().collect()
        [Row(name=u'Alice', age=1)]
        """
        return sparkSession.createDataFrame(self, schema, sampleRatio)

    RDD.toDF = toDF
def _inferSchemaFromList(self, data):
    """
    Infer schema from list of Row or tuple.

    :param data: list of Row or tuple
    :return: :class:`pyspark.sql.types.StructType`
    """
    if not data:
        raise ValueError("can not infer schema from empty dataset")
    first = data[0]
    if type(first) is dict:
        warnings.warn("inferring schema from dict is deprecated,"
                      "please use pyspark.sql.Row instead")
    schema = reduce(_merge_type, map(_infer_schema, data))
    if _has_nulltype(schema):
        raise ValueError("Some of types cannot be determined after inferring")
    return schema
def _test():
    import os
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row
    import pyspark.sql.session

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.session.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['spark'] = SparkSession(sc)
    globs['rdd'] = rdd = sc.parallelize(
        [Row(field1=1, field2="row1"),
         Row(field1=2, field2="row2"),
         Row(field1=3, field2="row3")])
    globs['df'] = rdd.toDF()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.session, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame.
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = \
            spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass
def schema_inference_example(spark):
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
def test_map_rows_sql_1(self):
    data = [Row(x=float(x)) for x in range(5)]
    df = self.sql.createDataFrame(data)
    with IsolatedSession() as issn:
        # The placeholder that corresponds to column 'x' as a whole column
        x = tf.placeholder(tf.double, shape=[], name="x")
        # The output that adds 3 to x
        z = tf.add(x, 3, name='z')
        # Let's register these computations in SQL.
        makeGraphUDF(issn.graph, "map_rows_sql_1", [z])

    # Here we go, for the SQL users, straight from PySpark.
    df2 = df.selectExpr("map_rows_sql_1(x) AS z")
    print("df2 = %s" % df2)
    data2 = df2.collect()
    assert data2[0].z == 3.0, data2
def test_map_blocks_sql_1(self):
    data = [Row(x=float(x)) for x in range(5)]
    df = self.sql.createDataFrame(data)
    with IsolatedSession() as issn:
        # The placeholder that corresponds to column 'x' as a whole column
        x = tf.placeholder(tf.double, shape=[None], name="x")
        # The output that adds 3 to x
        z = tf.add(x, 3, name='z')
        # Let's register these computations in SQL.
        makeGraphUDF(issn.graph, "map_blocks_sql_1", [z], blocked=True)

    # Here we go, for the SQL users, straight from PySpark.
    df2 = df.selectExpr("map_blocks_sql_1(x) AS z")
    print("df2 = %s" % df2)
    data2 = df2.collect()
    assert len(data2) == 5, data2
    assert data2[0].z == 3.0, data2
def df_to_rdd(line):
    """Converts a pyspark ``Row`` element to a numpy array.

    Parameters
    ----------
    line : pyspark.sql.types.Row
        A line of ``dataframe.rdd``

    .. note:: The dataframe should contain only numbers. Also, the method can be invoked
        on ``dataframe.rdd``, as ``dataFrame`` objects have no attribute 'map'.

    Returns
    -------
    numpy.ndarray
        A numpy array representation of the data contained in ``line``
    """
    return np.append(line.label, np.array(line.features))
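A hedged usage sketch for the function above: it assumes a DataFrame `df` whose rows carry numeric `label` and `features` fields and that numpy is imported as `np`; these names are illustrative assumptions, not part of the original snippet.

# Illustrative only: apply df_to_rdd across the RDD view of such a DataFrame.
feature_rdd = df.rdd.map(df_to_rdd)    # each element is np.array([label, f0, f1, ...])
first_example = feature_rdd.first()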
def schema_inference_example(spark):
    # $example on:schema_inferring$
    # Get the SparkContext.
    sc = spark.sparkContext

    # Load the text file and convert each line to a Row.
    lines = sc.textFile("/Users/xieenze/Desktop/spark/sparksql/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema and register the DataFrame as a temporary view.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    # The result of the SQL query is a DataFrame;
    # p is a Row (row.name, row.age).
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
def pivot(self, pivot_col, values=None):
    """
    Pivots a column of the current [[DataFrame]] and performs the specified aggregation.
    There are two versions of pivot function: one that requires the caller to specify the list
    of distinct values to pivot on, and one that does not. The latter is more concise but less
    efficient, because Spark needs to first compute the list of distinct values internally.

    :param pivot_col: Name of the column to pivot.
    :param values: List of values that will be translated to columns in the output DataFrame.

    // Compute the sum of earnings for each year by course with each course as a separate column

    >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
    [Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]

    // Or without specifying column values (less efficient)

    >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
    [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
    """
    if values is None:
        jgd = self._jdf.pivot(pivot_col)
    else:
        jgd = self._jdf.pivot(pivot_col, values)
    return GroupedData(jgd, self.sql_ctx)
def head(self, n=None):
    """Returns the first ``n`` rows.

    :param n: int, default 1. Number of rows to return.
    :return: If n is greater than 1, return a list of :class:`Row`.
        If n is 1, return a single Row.

    >>> df.head()
    Row(age=2, name=u'Alice')
    >>> df.head(1)
    [Row(age=2, name=u'Alice')]
    """
    if n is None:
        rs = self.head(1)
        return rs[0] if rs else None
    return self.take(n)
def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]),
               name=str(fields[1].encode("utf-8")),
               age=int(fields[2]),
               numFriends=int(fields[3]))
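A hedged usage sketch for this mapper: the SparkSession `spark` and the input file name are assumptions for illustration only, not taken from the original project.

# Illustrative only: parse ID,name,age,numFriends CSV lines into a DataFrame.
lines = spark.sparkContext.textFile("fakefriends.csv")   # hypothetical input path
people = spark.createDataFrame(lines.map(mapper)).cache()
people.createOrReplaceTempView("people")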
def create_test_df(self):
    test_list = [(1, 2), (2, 3), (3, 4)]
    rdd = self.sc.parallelize(test_list)
    rdd_f = rdd.map(lambda x: Row(value=x[0], value2=x[1]))
    return self.sqlCtx.createDataFrame(rdd_f)
def test_add_column_non_numeric(self):
    """Should raise a ValueError if a non-numeric column is added"""
    test_list = ['a', 'b']
    rdd = self.sc.parallelize(test_list)
    rdd_f = rdd.map(lambda x: Row(value=x))
    spark_df = self.sqlCtx.createDataFrame(rdd_f)
    hist = Histogram()
    with self.assertRaises(ValueError):
        hist.add_column(spark_df)
def test_java_params(self):
    """
    This tests a bug fixed by SPARK-18274 which causes multiple copies of a Params instance
    in Python to be linked to the same Java instance.
    """
    evaluator = RegressionEvaluator(metricName="r2")
    df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
    evaluator.evaluate(df)
    self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
    evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
    evaluator.evaluate(df)
    evaluatorCopy.evaluate(df)
    self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
    self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")
def test_ngram(self):
    dataset = self.spark.createDataFrame([
        Row(input=["a", "b", "c", "d", "e"])])
    ngram0 = NGram(n=4, inputCol="input", outputCol="output")
    self.assertEqual(ngram0.getN(), 4)
    self.assertEqual(ngram0.getInputCol(), "input")
    self.assertEqual(ngram0.getOutputCol(), "output")
    transformedDF = ngram0.transform(dataset)
    self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])
def agg(self, *exprs):
    """Compute aggregates and returns the result as a :class:`DataFrame`.

    The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.

    If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
    is the column to perform aggregation on, and the value is the aggregate function.

    Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.

    :param exprs: a dict mapping from column name (string) to aggregate functions (string),
        or a list of :class:`Column`.

    >>> gdf = df.groupBy(df.name)
    >>> sorted(gdf.agg({"*": "count"}).collect())
    [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]

    >>> from pyspark.sql import functions as F
    >>> sorted(gdf.agg(F.min(df.age)).collect())
    [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
    """
    assert exprs, "exprs should not be empty"
    if len(exprs) == 1 and isinstance(exprs[0], dict):
        jdf = self._jgd.agg(exprs[0])
    else:
        # Columns
        assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
        jdf = self._jgd.agg(exprs[0]._jc,
                            _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
    return DataFrame(jdf, self.sql_ctx)
def mean(self, *cols):
    """Computes average values for each numeric column for each group.

    :func:`mean` is an alias for :func:`avg`.

    :param cols: list of column names (string). Non-numeric columns are ignored.

    >>> df.groupBy().mean('age').collect()
    [Row(avg(age)=3.5)]
    >>> df3.groupBy().mean('age', 'height').collect()
    [Row(avg(age)=3.5, avg(height)=82.5)]
    """
def avg(self, *cols):
    """Computes average values for each numeric column for each group.

    :func:`mean` is an alias for :func:`avg`.

    :param cols: list of column names (string). Non-numeric columns are ignored.

    >>> df.groupBy().avg('age').collect()
    [Row(avg(age)=3.5)]
    >>> df3.groupBy().avg('age', 'height').collect()
    [Row(avg(age)=3.5, avg(height)=82.5)]
    """