def trainModel(docMatrix, savemodel, k, iterations=10, parallelization=16,
               wordTopicPath='wordXtopic.mtx', docTopicPath='docXtopic.mtx'):
    """Fit a Spark ML LDA model on a Matrix Market document-term matrix.

    Each row of the matrix becomes one document row with ``label`` set to the
    row index and ``features`` built by ``sparkToScipySparse`` from that row.

    :param docMatrix: path of a Matrix Market file, read with ``mmread``.
    :param savemodel: not used by this function (kept for caller compatibility).
    :param k: number of LDA topics.
    :param iterations: passed to LDA as ``maxIter``.
    :param parallelization: number of partitions of the row-index RDD.
    :param wordTopicPath: where the normalized topic matrix is written
        (default keeps the previous hard-coded ``'wordXtopic.mtx'``).
    :param docTopicPath: where the per-document ``topicDistribution`` rows are
        written (default keeps the previous hard-coded ``'docXtopic.mtx'``).
    """
    data = mmread(docMatrix)
    rowRange = sc.parallelize(xrange(data.shape[0]), parallelization)
    dataSpark = spark.createDataFrame(rowRange
            .map(lambda i: Row(label=i, features=sparkToScipySparse(data.getrow(i)))))
    lda = LDA(k=k, maxIter=iterations)
    model = lda.fit(dataSpark)

    # Transpose the model's topics matrix, then divide every column by its sum.
    topicMatrix = model.topicsMatrix().toArray()
    topicMatrix = topicMatrix.T
    topicMatrix = topicMatrix / topicMatrix.sum(axis=0)
    mmwrite(wordTopicPath, topicMatrix)

    # One row per input document: its inferred topic distribution.
    docXTopics = model.transform(dataSpark)
    dxT = docXTopics.collect()
    dxT_v2 = np.array([dxtI['topicDistribution'] for dxtI in dxT])
    mmwrite(docTopicPath, dxT_v2)

def test_stopwordsremover(self):
        """StopWordsRemover: default list, custom list, and a language-specific list."""
        dataset = self.spark.createDataFrame([Row(input=["a", "panda"])])
        stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
        # Default
        self.assertEqual(stopWordRemover.getInputCol(), "input")
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, ["panda"])
        self.assertEqual(type(stopWordRemover.getStopWords()), list)
        self.assertTrue(isinstance(stopWordRemover.getStopWords()[0], basestring))
        # Custom: the list must actually be installed before it can be asserted on.
        stopwords = ["panda"]
        stopWordRemover.setStopWords(stopwords)
        self.assertEqual(stopWordRemover.getInputCol(), "input")
        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, ["a"])
        # with language selection
        stopwords = StopWordsRemover.loadDefaultStopWords("turkish")
        dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])])
        stopWordRemover.setStopWords(stopwords)
        self.assertEqual(stopWordRemover.getStopWords(), stopwords)
        transformedDF = stopWordRemover.transform(dataset)
        self.assertEqual(transformedDF.head().output, [])
def test_infer_schema(self):
        """Vectors in a Row RDD are inferred as the vector UDT and round-trip intact."""
        rdd = self.sc.parallelize([Row(label=1.0, features=self.dv1),
                                   Row(label=0.0, features=self.sv1)])
        df = rdd.toDF()
        schema = df.schema
        field = [f for f in schema.fields if f.name == "features"][0]
        self.assertEqual(field.dataType, self.udt)
        vectors = df.rdd.map(lambda p: p.features).collect()
        self.assertEqual(len(vectors), 2)
        for v in vectors:
            if isinstance(v, SparseVector):
                self.assertEqual(v, self.sv1)
            elif isinstance(v, DenseVector):
                self.assertEqual(v, self.dv1)
            else:
                # Only reached for values that are neither vector type.
                raise TypeError("expecting a vector but got %r of type %r" % (v, type(v)))
def crossJoin(self, other):
        """Returns the cartesian product with another :class:`DataFrame`.

        :param other: Right side of the cartesian product.

        >>> df.select("age", "name").collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df2.select("name", "height").collect()
        [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
        >>> df.select("age", "name").crossJoin(df2.select("height")).select("age", "name", "height").collect()
        [Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
         Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
        """

        jdf = self._jdf.crossJoin(other._jdf)
        return DataFrame(jdf, self.sql_ctx)
def sort(self, *cols, **kwargs):
        """Returns a new :class:`DataFrame` sorted by the specified column(s).

        :param cols: list of :class:`Column` or column names to sort by.
        :param ascending: boolean or list of boolean (default True).
            Sort ascending vs. descending. Specify list for multiple sort orders.
            If a list is specified, length of the list must equal length of the `cols`.

        >>> df.sort(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.sort("age", ascending=False).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> from pyspark.sql.functions import *
        >>> df.sort(asc("age")).collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df.orderBy(desc("age"), "name").collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        """
        # _sort_cols turns cols plus the optional ``ascending`` kwarg into Java sort orders.
        jdf = self._jdf.sort(self._sort_cols(cols, kwargs))
        return DataFrame(jdf, self.sql_ctx)
def __getitem__(self, item):
        """Returns the column as a :class:`Column`.

        A string or int selects a single column; a :class:`Column` filters rows;
        a list or tuple selects several columns.

        >>> df.select(df['age']).collect()
        [Row(age=2), Row(age=5)]
        >>> df[ ["name", "age"]].collect()
        [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
        >>> df[ df.age > 3 ].collect()
        [Row(age=5, name=u'Bob')]
        >>> df[df[0] > 3].collect()
        [Row(age=5, name=u'Bob')]
        """
        if isinstance(item, basestring):
            jc = self._jdf.apply(item)
            return Column(jc)
        elif isinstance(item, Column):
            return self.filter(item)
        elif isinstance(item, (list, tuple)):
            return self.select(*item)
        elif isinstance(item, int):
            # Positional access: map the index to the column name first.
            jc = self._jdf.apply(self.columns[item])
            return Column(jc)
        else:
            raise TypeError("unexpected item type: %s" % type(item))
def select(self, *cols):
        """Projects a set of expressions and returns a new :class:`DataFrame`.

        :param cols: list of column names (string) or expressions (:class:`Column`).
            If one of the column names is '*', that column is expanded to include all columns
            in the current DataFrame.

        >>> df.select('*').collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df.select('name', 'age').collect()
        [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
        >>> df.select(df.name, (df.age + 10).alias('age')).collect()
        [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
        """
        jdf = self._jdf.select(self._jcols(*cols))
        return DataFrame(jdf, self.sql_ctx)
def groupBy(self, *cols):
        """Groups the :class:`DataFrame` using the specified columns,
        so we can run aggregation on them. See :class:`GroupedData`
        for all the available aggregate functions.

        :func:`groupby` is an alias for :func:`groupBy`.

        :param cols: list of columns to group by.
            Each element should be a column name (string) or an expression (:class:`Column`).

        >>> df.groupBy().avg().collect()
        [Row(avg(age)=3.5)]
        >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
        [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
        >>> sorted(df.groupBy(df.name).avg().collect())
        [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
        >>> sorted(df.groupBy(['name', df.age]).count().collect())
        [Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
        """
        jgd = self._jdf.groupBy(self._jcols(*cols))
        # Imported here rather than at module level (presumably to avoid an import cycle).
        from pyspark.sql.group import GroupedData
        return GroupedData(jgd, self.sql_ctx)
def range(self, start, end=None, step=1, numPartitions=None):
        """
        Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
        ``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
        step value ``step``.

        :param start: the start value
        :param end: the end value (exclusive)
        :param step: the incremental step (default: 1)
        :param numPartitions: the number of partitions of the DataFrame
        :return: :class:`DataFrame`

        >>> sqlContext.range(1, 7, 2).collect()
        [Row(id=1), Row(id=3), Row(id=5)]

        If only one argument is specified, it will be used as the end value.

        >>> sqlContext.range(3).collect()
        [Row(id=0), Row(id=1), Row(id=2)]
        """
        # Delegates entirely to the underlying SparkSession.
        return self.sparkSession.range(start, end, step, numPartitions)
def tables(self, dbName=None):
        """Returns a :class:`DataFrame` containing names of tables in the given database.

        If ``dbName`` is not specified, the current database will be used.

        As the example below shows, the returned DataFrame has the columns ``database``,
        ``tableName`` and ``isTemporary`` (a column with :class:`BooleanType` indicating
        if a table is a temporary one or not).

        :param dbName: string, name of the database to use.
        :return: :class:`DataFrame`

        >>> sqlContext.registerDataFrameAsTable(df, "table1")
        >>> df2 = sqlContext.tables()
        >>> df2.filter("tableName = 'table1'").first()
        Row(database=u'', tableName=u'table1', isTemporary=True)
        """
        if dbName is None:
            return DataFrame(self._ssql_ctx.tables(), self)
        else:
            return DataFrame(self._ssql_ctx.tables(dbName), self)
def date_format(date, format):
    """
    Converts a date/timestamp/string to a value of string in the format specified by the date
    format given by the second argument.

    A pattern could be for instance `dd.MM.yyyy` and could return a string like '18.03.1993'. All
    pattern letters of the Java class `java.text.SimpleDateFormat` can be used.

    .. note:: Whenever possible, use specialized functions like `year`. These benefit from a
        specialized implementation.

    >>> df = spark.createDataFrame([('2015-04-08',)], ['a'])
    >>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
    [Row(date=u'04/08/2015')]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.date_format(_to_java_column(date), format))
def locate(substr, str, pos=1):
    """
    Locate the position of the first occurrence of substr in a string column, after position pos.

    .. note:: The position is not zero based, but 1 based index. Returns 0 if substr
        could not be found in str.

    :param substr: a string
    :param str: a Column of :class:`pyspark.sql.types.StringType`
    :param pos: start position (1 based, like the returned position)

    >>> df = spark.createDataFrame([('abcd',)], ['s',])
    >>> df.select(locate('b', df.s, 1).alias('s')).collect()
    [Row(s=2)]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.locate(substr, _to_java_column(str), pos))
def regexp_extract(str, pattern, idx):
    r"""Extract a specific group matched by a Java regex, from the specified string column.
    If the regex did not match, or the specified group did not match, an empty string is returned.

    >>> df = spark.createDataFrame([('100-200',)], ['str'])
    >>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
    [Row(d=u'100')]
    >>> df = spark.createDataFrame([('foo',)], ['str'])
    >>> df.select(regexp_extract('str', '(\d+)', 1).alias('d')).collect()
    [Row(d=u'')]
    >>> df = spark.createDataFrame([('aaaac',)], ['str'])
    >>> df.select(regexp_extract('str', '(a+)(b)?(c)', 2).alias('d')).collect()
    [Row(d=u'')]
    """
    # Raw docstring so ``\d`` in the examples is not treated as a string escape.
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.regexp_extract(_to_java_column(str), pattern, idx)
    return Column(jc)
def create_map(*cols):
    """Creates a new map column.

    :param cols: list of column names (string) or list of :class:`Column` expressions that grouped
        as key-value pairs, e.g. (key1, value1, key2, value2, ...).

    >>> df.select(create_map('name', 'age').alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    >>> df.select(create_map([df.name, df.age]).alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    """
    sc = SparkContext._active_spark_context
    # Accept either varargs or a single list/set of columns.
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        cols = cols[0]
    jc = sc._jvm.functions.map(_to_seq(sc, cols, _to_java_column))
    return Column(jc)
def explode(col):
    """Returns a new row for each element in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]

    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    b|
    +---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.explode(_to_java_column(col))
    return Column(jc)
def posexplode(col):
    """Returns a new row for each element with position in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(posexplode(eDF.intlist)).collect()
    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]

    >>> eDF.select(posexplode(eDF.mapfield)).show()
    +---+---+-----+
    |pos|key|value|
    +---+---+-----+
    |  0|  a|    b|
    +---+---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.posexplode(_to_java_column(col))
    return Column(jc)
def get_json_object(col, path):
    """
    Extracts json object from a json string based on json path specified, and returns json string
    of the extracted json object. It will return null if the input json string is invalid.

    :param col: string column in json format
    :param path: path to the json object to extract

    >>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
    >>> df = spark.createDataFrame(data, ("key", "jstring"))
    >>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \\
    ...                   get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
    [Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.get_json_object(_to_java_column(col), path)
    return Column(jc)
def to_json(col, options=None):
    """
    Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
    in the case of an unsupported type.

    :param col: name of column containing the struct
    :param options: options to control converting. accepts the same options as the json datasource
        (``None``, the default, means no options)

    >>> from pyspark.sql import Row
    >>> from pyspark.sql.types import *
    >>> data = [(1, Row(name='Alice', age=2))]
    >>> df = spark.createDataFrame(data, ("key", "value"))
    >>> df.select(to_json(df.value).alias("json")).collect()
    [Row(json=u'{"age":2,"name":"Alice"}')]
    """
    if options is None:
        # Fresh dict per call instead of a shared mutable default argument.
        options = {}
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.to_json(_to_java_column(col), options)
    return Column(jc)
def sort_array(col, asc=True):
    """
    Collection function: sorts the input array in ascending or descending order according
    to the natural ordering of the array elements.

    :param col: name of column or expression
    :param asc: sort ascending when True (the default), descending otherwise

    >>> df = spark.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
    >>> df.select(sort_array(df.data).alias('r')).collect()
    [Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
    >>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
    [Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.sort_array(_to_java_column(col), asc))

# ---------------------------- User Defined Function ----------------------------------
def udf(f, returnType=StringType()):
    """Creates a :class:`Column` expression representing a user defined function (UDF).

    .. note:: The user-defined functions must be deterministic. Due to optimization,
        duplicate invocations may be eliminated or the function may even be invoked more times than
        it is present in the query.

    :param f: python function
    :param returnType: a :class:`pyspark.sql.types.DataType` object (default: StringType)

    >>> from pyspark.sql.types import IntegerType
    >>> slen = udf(lambda s: len(s), IntegerType())
    >>> df.select(slen(df.name).alias('slen')).collect()
    [Row(slen=5), Row(slen=3)]
    """
    return UserDefinedFunction(f, returnType)
def _test():
    """Run this module's doctests on a local SparkSession; exit with -1 on any failure."""
    import doctest
    import sys
    from pyspark.sql import Row, SparkSession
    import pyspark.sql.functions
    globs = pyspark.sql.functions.__dict__.copy()
    spark = SparkSession.builder\
        .master("local[4]")\
        .appName("sql.functions tests")\
        .getOrCreate()
    sc = spark.sparkContext
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
    try:
        (failure_count, test_count) = doctest.testmod(
            pyspark.sql.functions, globs=globs,
            optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    finally:
        # Release the local Spark resources even if doctest itself raises.
        spark.stop()
    if failure_count:
        sys.exit(-1)
def test_udf_with_aggregate_function(self):
        """Python UDFs used in filters, grouping keys and on top of aggregates."""
        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
        from pyspark.sql.functions import udf, col, sum
        from pyspark.sql.types import BooleanType

        my_filter = udf(lambda a: a == 1, BooleanType())
        sel = df.select(col("key")).distinct().filter(my_filter(col("key")))
        self.assertEqual(sel.collect(), [Row(key=1)])

        my_copy = udf(lambda x: x, IntegerType())
        my_add = udf(lambda a, b: int(a + b), IntegerType())
        my_strlen = udf(lambda x: len(x), IntegerType())
        # key 1 -> s = 1+1+1 = 3, t = 4; key 2 -> s = 1, t = 3.
        sel = df.groupBy(my_copy(col("key")).alias("k"))\
            .agg(sum(my_strlen(col("value"))).alias("s"))\
            .select(my_add(col("k"), col("s")).alias("t"))
        self.assertEqual(sel.collect(), [Row(t=4), Row(t=3)])
def test_serialize_nested_array_and_map(self):
        """Nested arrays of Rows and maps of Rows survive DataFrame <-> RDD conversion."""
        d = [Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})]
        rdd = self.sc.parallelize(d)
        df = self.spark.createDataFrame(rdd)
        row = df.head()
        self.assertEqual(1, len(row.l))
        self.assertEqual(1, row.l[0].a)
        self.assertEqual("2", row.d["key"].d)

        l = df.rdd.map(lambda x: x.l).first()
        self.assertEqual(1, len(l))
        self.assertEqual('s', l[0].b)

        d = df.rdd.map(lambda x: x.d).first()
        self.assertEqual(1, len(d))
        self.assertEqual(1.0, d["key"].c)

        row = df.rdd.map(lambda x: x.d["key"]).first()
        self.assertEqual(1.0, row.c)
        self.assertEqual("2", row.d)
def test_infer_schema(self):
        """Schema inference with empty/nested values, with and without samplingRatio."""
        d = [Row(l=[], d={}, s=None),
             Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}, s="")]
        rdd = self.sc.parallelize(d)
        df = self.spark.createDataFrame(rdd)
        self.assertEqual([], df.rdd.map(lambda r: r.l).first())
        self.assertEqual([None, ""], df.rdd.map(lambda r: r.s).collect())
        # The SQL below queries the view by name, so it must be registered first.
        df.createOrReplaceTempView("test")
        result = self.spark.sql("SELECT l[0].a from test where d['key'].d = '2'")
        self.assertEqual(1, result.head()[0])

        df2 = self.spark.createDataFrame(rdd, samplingRatio=1.0)
        self.assertEqual(df.schema, df2.schema)
        self.assertEqual({}, df2.rdd.map(lambda r: r.d).first())
        self.assertEqual([None, ""], df2.rdd.map(lambda r: r.s).collect())
        df2.createOrReplaceTempView("test2")
        result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
        self.assertEqual(1, result.head()[0])
def test_udf_with_udt(self):
        """UDFs can consume and produce UDT values (Scala-backed and Python-only UDTs)."""
        from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
        row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
        df = self.spark.createDataFrame([row])
        self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
        udf = UserDefinedFunction(lambda p: p.y, DoubleType())
        self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
        udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
        self.assertEqual(ExamplePoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])

        row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
        df = self.spark.createDataFrame([row])
        self.assertEqual(1.0, df.rdd.map(lambda r: r.point.x).first())
        udf = UserDefinedFunction(lambda p: p.y, DoubleType())
        self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
        udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
        self.assertEqual(PythonOnlyPoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])
def test_parquet_with_udt(self):
        """UDT columns round-trip through Parquet (Scala-backed and Python-only UDTs)."""
        from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
        row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
        df0 = self.spark.createDataFrame([row])
        output_dir = os.path.join(self.tempdir.name, "labeled_point")
        df0.write.parquet(output_dir)
        df1 = self.spark.read.parquet(output_dir)
        point = df1.head().point
        self.assertEqual(point, ExamplePoint(1.0, 2.0))

        row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
        df0 = self.spark.createDataFrame([row])
        # Same directory as above, hence mode='overwrite'.
        df0.write.parquet(output_dir, mode='overwrite')
        df1 = self.spark.read.parquet(output_dir)
        point = df1.head().point
        self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))
def test_union_with_udt(self):
        """union() of two DataFrames sharing a UDT column keeps the UDT values."""
        from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
        row1 = (1.0, ExamplePoint(1.0, 2.0))
        row2 = (2.0, ExamplePoint(3.0, 4.0))
        schema = StructType([StructField("label", DoubleType(), False),
                             StructField("point", ExamplePointUDT(), False)])
        df1 = self.spark.createDataFrame([row1], schema)
        df2 = self.spark.createDataFrame([row2], schema)

        result = df1.union(df2).orderBy("label").collect()
        self.assertEqual(
            result,
            [
                Row(label=1.0, point=ExamplePoint(1.0, 2.0)),
                Row(label=2.0, point=ExamplePoint(3.0, 4.0))
            ]
        )
def test_first_last_ignorenulls(self):
        """first()/last() honour the ignorenulls flag."""
        from pyspark.sql import functions
        df = self.spark.range(0, 100)
        # Null out every multiple of 3, so ids 0 and 99 (the ends) become null.
        df2 = df.select(functions.when(df.id % 3 == 0, None).otherwise(df.id).alias("id"))
        df3 = df2.select(functions.first(df2.id, False).alias('a'),
                         functions.first(df2.id, True).alias('b'),
                         functions.last(df2.id, False).alias('c'),
                         functions.last(df2.id, True).alias('d'))
        self.assertEqual([Row(a=None, b=1, c=None, d=98)], df3.collect())
# Project: MIT-Thesis    Author: alec-heif    | project source | file source
def test_infer_long_type(self):
        """Python ints are inferred as LongType and survive a Parquet round-trip."""
        longrow = [Row(f1='a', f2=100000000000000)]
        df = self.sc.parallelize(longrow).toDF()
        self.assertEqual(df.schema.fields[1].dataType, LongType())

        # this saving as Parquet caused issues as well.
        output_dir = os.path.join(self.tempdir.name, "infer_long_type")
        df.write.parquet(output_dir)
        df1 = self.spark.read.parquet(output_dir)
        self.assertEqual('a', df1.first().f1)
        self.assertEqual(100000000000000, df1.first().f2)

        self.assertEqual(_infer_type(1), LongType())
        self.assertEqual(_infer_type(2**10), LongType())
        self.assertEqual(_infer_type(2**20), LongType())
        self.assertEqual(_infer_type(2**31 - 1), LongType())
        self.assertEqual(_infer_type(2**31), LongType())
        self.assertEqual(_infer_type(2**61), LongType())
        self.assertEqual(_infer_type(2**71), LongType())
def text(self, paths):
        """
        Loads text files and returns a :class:`DataFrame` whose schema starts with a
        string column named "value", and followed by partitioned columns if there
        are any.

        Each line in the text file is a new row in the resulting DataFrame.

        :param paths: string, or list of strings, for input path(s).

        >>> df = spark.read.text('python/test_support/sql/text-test.txt')
        >>> df.collect()
        [Row(value=u'hello'), Row(value=u'this')]
        """
        # A single path is normalized to a one-element list before conversion to a Scala Seq.
        if isinstance(paths, basestring):
            paths = [paths]
        return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
def _monkey_patch_RDD(sparkSession):
    """Install ``RDD.toDF`` bound to the given ``sparkSession``."""
    def toDF(self, schema=None, sampleRatio=None):
        """
        Converts current :class:`RDD` into a :class:`DataFrame`

        This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``

        :param schema: a :class:`pyspark.sql.types.StructType` or list of names of columns
        :param sampleRatio: the sample ratio of rows used for inferring
        :return: a DataFrame

        >>> rdd.toDF().collect()
        [Row(name=u'Alice', age=1)]
        """
        return sparkSession.createDataFrame(self, schema, sampleRatio)

    RDD.toDF = toDF
def _inferSchemaFromList(self, data):
        """
        Infer schema from list of Row or tuple.

        :param data: list of Row or tuple
        :return: :class:`pyspark.sql.types.StructType`
        :raises ValueError: if ``data`` is empty, or if some field types are still
            undetermined after merging the per-row schemas.
        """
        if not data:
            raise ValueError("can not infer schema from empty dataset")
        first = data[0]
        if type(first) is dict:
            warnings.warn("inferring schema from dict is deprecated, "
                          "please use pyspark.sql.Row instead")
        # Infer each row's schema and merge them pairwise into one.
        schema = reduce(_merge_type, map(_infer_schema, data))
        if _has_nulltype(schema):
            raise ValueError("Some of types cannot be determined after inferring")
        return schema
def _test():
    """Run the ``pyspark.sql.session`` doctests locally; exit with -1 on any failure."""
    import doctest
    import sys
    from pyspark.context import SparkContext
    from pyspark.sql import Row
    import pyspark.sql.session

    globs = pyspark.sql.session.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['spark'] = SparkSession(sc)
    globs['rdd'] = rdd = sc.parallelize(
        [Row(field1=1, field2="row1"),
         Row(field1=2, field2="row2"),
         Row(field1=3, field2="row3")])
    globs['df'] = rdd.toDF()
    try:
        (failure_count, test_count) = doctest.testmod(
            pyspark.sql.session, globs=globs,
            optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    finally:
        # Stop the local context even if doctest itself raises.
        sc.stop()
    if failure_count:
        sys.exit(-1)
def process(time, rdd):
        """Word-count one micro-batch of words with Spark SQL and print the result.

        :param time: batch time, printed in the header line.
        :param rdd: RDD of word strings for this batch.
        """
        print("========= %s =========" % str(time))

        # Schema inference needs at least one row, so empty batches are skipped.
        if rdd.isEmpty():
            return

        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())

            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = spark.createDataFrame(rowRdd)

            # Creates a temporary view using the DataFrame.
            wordsDataFrame.createOrReplaceTempView("words")

            # Do word count on table using SQL and print it
            wordCountsDataFrame = \
                spark.sql("select word, count(*) as total from words group by word")
            wordCountsDataFrame.show()
        except Exception as exc:
            # Best effort per batch: report the failure instead of hiding it,
            # without killing the streaming job.
            print("Failed to process batch at %s: %s" % (time, exc))
def schema_inference_example(spark):
    """Infer a DataFrame schema from an RDD of Rows and query it with SQL."""
    # $example on:schema_inferring$
    sc = spark.sparkContext

    # Load a text file and convert each line to a Row.
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a table.
    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The results of SQL queries are Dataframe objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
# Project: spark-deep-learning    Author: databricks    | project source | file source
def test_map_rows_sql_1(self):
        # Input: five rows, column 'x' holding 0.0 through 4.0.
        rows = [Row(x=float(val)) for val in range(5)]
        frame = self.sql.createDataFrame(rows)
        with IsolatedSession() as session:
            # Scalar placeholder (shape []) named "x", fed from column 'x'.
            x_in = tf.placeholder(tf.double, shape=[], name="x")
            # Output node "z" = x + 3.
            z_out = tf.add(x_in, 3, name='z')
            # Expose the graph to SQL as the function map_rows_sql_1.
            makeGraphUDF(session.graph, "map_rows_sql_1", [z_out])

        # Call the registered function from a SQL expression.
        result_df = frame.selectExpr("map_rows_sql_1(x) AS z")
        print("df2 = %s" % result_df)
        collected = result_df.collect()
        # The first input row is x = 0.0, so its output must be 3.0.
        assert collected[0].z == 3.0, collected
# Project: spark-deep-learning    Author: databricks    | project source | file source
def test_map_blocks_sql_1(self):
        # Input: five rows, column 'x' holding 0.0 through 4.0.
        rows = [Row(x=float(val)) for val in range(5)]
        frame = self.sql.createDataFrame(rows)
        with IsolatedSession() as session:
            # 1-D placeholder (shape [None]) named "x": a whole block of column 'x'.
            x_in = tf.placeholder(tf.double, shape=[None], name="x")
            # Output node "z" = x + 3, applied element-wise to the block.
            z_out = tf.add(x_in, 3, name='z')
            # Expose the graph to SQL as the blocked function map_blocks_sql_1.
            makeGraphUDF(session.graph, "map_blocks_sql_1", [z_out], blocked=True)

        # Call the registered function from a SQL expression.
        result_df = frame.selectExpr("map_blocks_sql_1(x) AS z")
        print("df2 = %s" % result_df)
        collected = result_df.collect()
        # One output row per input row, and the first (x = 0.0) maps to 3.0.
        assert len(collected) == 5, collected
        assert collected[0].z == 3.0, collected
def df_to_rdd(line):
    """Converts a pyspark ``Row`` element to numpy array

    Parameters
    ----------
    line : pyspark.sql.types.Row
        A line of ``dataframe.rdd`` with ``label`` and ``features`` fields.

        .. note::

           The dataframe should contain only numbers. Also the method can be invoked on
           ``dataframe.rdd`` as ``dataFrame`` objects have no attribute 'map'

    Returns
    -------
    numpy.ndarray
        1-D array ``[label, *features]`` built from ``line``.
    """
    # np.append flattens both arguments, so the label becomes element 0.
    return np.append(line.label, np.array(line.features))
def schema_inference_example(spark, path="/Users/xieenze/Desktop/spark/sparksql/people.txt"):
    """Infer a DataFrame schema from ``name,age`` text lines and query it with SQL.

    :param spark: the active SparkSession.
    :param path: text file of ``name,age`` lines (defaults to the previously
        hard-coded location).
    """
    # $example on:schema_inferring$
    sc = spark.sparkContext

    lines = sc.textFile(path)
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # Infer the schema and register the DataFrame as the table "people".
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")

    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

    # The SQL result is a DataFrame; its .rdd yields Row objects whose
    # fields are accessed as attributes (e.g. p.name).
    teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
    for name in teenNames:
        print(name)
    # Name: Justin
    # $example off:schema_inferring$
def pivot(self, pivot_col, values=None):
        """
        Pivots a column of the current [[DataFrame]] and performs the specified aggregation.
        There are two versions of pivot function: one that requires the caller to specify the list
        of distinct values to pivot on, and one that does not. The latter is more concise but less
        efficient, because Spark needs to first compute the list of distinct values internally.

        :param pivot_col: Name of the column to pivot.
        :param values: List of values that will be translated to columns in the output DataFrame.

        # Compute the sum of earnings for each year by course with each course as a separate column
        >>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
        [Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]

        # Or without specifying column values (less efficient)
        >>> df4.groupBy("year").pivot("course").sum("earnings").collect()
        [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
        """
        # NOTE(review): other GroupedData code in this file keeps the Java handle in
        # ``self._jgd``; confirm this class really stores it as ``self._jdf``.
        if values is None:
            jgd = self._jdf.pivot(pivot_col)
        else:
            jgd = self._jdf.pivot(pivot_col, values)
        return GroupedData(jgd, self.sql_ctx)
# Project: pyspark    Author: v-v-vishnevskiy    | project source | file source
def sort(self, *cols, **kwargs):
        """Returns a new :class:`DataFrame` sorted by the specified column(s).

        :param cols: list of :class:`Column` or column names to sort by.
        :param ascending: boolean or list of boolean (default True).
            Sort ascending vs. descending. Specify list for multiple sort orders.
            If a list is specified, length of the list must equal length of the `cols`.

        >>> df.sort(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.sort("age", ascending=False).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(df.age.desc()).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> from pyspark.sql.functions import *
        >>> df.sort(asc("age")).collect()
        [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
        >>> df.orderBy(desc("age"), "name").collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
        [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
        """
        # _sort_cols turns cols plus the optional ``ascending`` kwarg into Java sort orders.
        jdf = self._jdf.sort(self._sort_cols(cols, kwargs))
        return DataFrame(jdf, self.sql_ctx)
def head(self, n=None):
        """Returns the first ``n`` rows.

        :param n: int, optional. Number of rows to return.
        :return: If ``n`` is omitted, a single :class:`Row` (or None when there are
            no rows). Otherwise a list of at most ``n`` :class:`Row` objects, even
            when ``n`` is 1.

        >>> df.head()
        Row(age=2, name=u'Alice')
        >>> df.head(1)
        [Row(age=2, name=u'Alice')]
        """
        if n is None:
            rs = self.head(1)
            return rs[0] if rs else None
        return self.take(n)
# Project: Frank-Kanes-Taming-Big-Data-with-Apache-Spark-and-Python    Author: PacktPublishing    | project source | file source
def mapper(line):
    """Parse one ``ID,name,age,numFriends`` CSV line into a :class:`Row`.

    :param line: comma-separated text with at least four fields.
    :return: ``Row(ID=int, name=..., age=int, numFriends=int)``.
    """
    fields = line.split(',')
    name = fields[1]
    if not isinstance(name, str):
        # e.g. ``unicode`` under Python 2: encode to a UTF-8 byte string as before.
        # On Python 3 the name is already ``str``; the old
        # ``str(name.encode("utf-8"))`` turned it into the literal text "b'...'".
        name = name.encode("utf-8")
    return Row(ID=int(fields[0]), name=name, age=int(fields[2]), numFriends=int(fields[3]))
# Project: pyspark_dist_explore    Author: Bergvca    | project source | file source
def create_test_df(self):
        """Return a 3-row test DataFrame with integer columns ``value`` and ``value2``."""
        test_list = [(1, 2), (2, 3), (3, 4)]
        rdd = self.sc.parallelize(test_list)
        rdd_f = rdd.map(lambda x: Row(value=x[0], value2=x[1]))
        spark_df = self.sqlCtx.createDataFrame(rdd_f)
        return spark_df
# Project: pyspark_dist_explore    Author: Bergvca    | project source | file source
def test_add_column_non_numeric(self):
        """Should raise a ValueError if a non-numeric column is added"""
        test_list = ['a', 'b']
        rdd = self.sc.parallelize(test_list)
        rdd_f = rdd.map(lambda x: Row(value=x))
        spark_df = self.sqlCtx.createDataFrame(rdd_f)
        hist = Histogram()
        with self.assertRaises(ValueError):
            hist.add_column(spark_df)
def test_java_params(self):
        """
        This tests a bug fixed by SPARK-18274 which causes multiple copies
        of a Params instance in Python to be linked to the same Java instance.
        """
        evaluator = RegressionEvaluator(metricName="r2")
        df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)])
        # The evaluate() calls exercise the Python -> Java param transfer; without
        # them ``df`` is unused and the shared-Java-instance bug is never triggered.
        evaluator.evaluate(df)
        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
        evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"})
        evaluator.evaluate(df)
        evaluatorCopy.evaluate(df)
        self.assertEqual(evaluator._java_obj.getMetricName(), "r2")
        self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae")
def test_ngram(self):
        rows = [Row(input=["a", "b", "c", "d", "e"])]
        dataset = self.spark.createDataFrame(rows)
        four_grams = NGram(n=4, inputCol="input", outputCol="output")
        # Constructor keyword arguments must be visible through the getters.
        for getter, expected in ((four_grams.getN, 4),
                                 (four_grams.getInputCol, "input"),
                                 (four_grams.getOutputCol, "output")):
            self.assertEqual(getter(), expected)
        # Five tokens produce two overlapping, space-joined 4-grams.
        first_row = four_grams.transform(dataset).head()
        self.assertEqual(first_row.output, ["a b c d", "b c d e"])
def agg(self, *exprs):
        """Compute aggregates and returns the result as a :class:`DataFrame`.

        The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.

        If ``exprs`` is a single :class:`dict` mapping from string to string, then the key
        is the column to perform aggregation on, and the value is the aggregate function.

        Alternatively, ``exprs`` can also be a list of aggregate :class:`Column` expressions.

        :param exprs: a dict mapping from column name (string) to aggregate functions (string),
            or a list of :class:`Column`.

        >>> gdf = df.groupBy(df.name)
        >>> sorted(gdf.agg({"*": "count"}).collect())
        [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]

        >>> from pyspark.sql import functions as F
        >>> sorted(gdf.agg(F.min(df.age)).collect())
        [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
        """
        assert exprs, "exprs should not be empty"
        if len(exprs) == 1 and isinstance(exprs[0], dict):
            # Dict form: {column name: aggregate function name}.
            jdf = self._jgd.agg(exprs[0])
        else:
            # Columns
            assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
            # The Java API takes the first column separately from the rest.
            jdf = self._jgd.agg(exprs[0]._jc,
                                _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
        return DataFrame(jdf, self.sql_ctx)
# Project: MIT-Thesis    Author: alec-heif    | project source | file source
def mean(self, *cols):
        """Computes average values for each numeric columns for each group.

        :func:`mean` is an alias for :func:`avg`.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().mean('age').collect()
        [Row(avg(age)=3.5)]
        >>> df3.groupBy().mean('age', 'height').collect()
        [Row(avg(age)=3.5, avg(height)=82.5)]
        """
        # Without a body this method returned None; delegate to the Java
        # GroupedData the same way ``agg`` does.
        jdf = self._jgd.mean(_to_seq(self.sql_ctx._sc, cols))
        return DataFrame(jdf, self.sql_ctx)
def avg(self, *cols):
        """Computes average values for each numeric columns for each group.

        :func:`mean` is an alias for :func:`avg`.

        :param cols: list of column names (string). Non-numeric columns are ignored.

        >>> df.groupBy().avg('age').collect()
        [Row(avg(age)=3.5)]
        >>> df3.groupBy().avg('age', 'height').collect()
        [Row(avg(age)=3.5, avg(height)=82.5)]
        """
        # Without a body this method returned None; delegate to the Java
        # GroupedData the same way ``agg`` does.
        jdf = self._jgd.avg(_to_seq(self.sql_ctx._sc, cols))
        return DataFrame(jdf, self.sql_ctx)