The following 40 code examples, extracted from open-source Python projects, illustrate how to use pyspark.sql.DataFrame().
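Before the project snippets, here is a minimal sketch (not taken from any of the listed projects; the session and column names are illustrative) of the two patterns that recur throughout them: type-checking an object against pyspark.sql.DataFrame, and wrapping the underlying py4j JVM DataFrame handle back into a Python DataFrame.

from pyspark.sql import SparkSession, DataFrame

# Any existing SparkSession works here; the app name is arbitrary.
spark = SparkSession.builder.appName("dataframe-example").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
assert isinstance(df, DataFrame)

# Several snippets below pass df._jdf (the JVM-side DataFrame) into Java/Scala
# code and then rebuild a Python DataFrame from the returned handle:
wrapped = DataFrame(df._jdf, df.sql_ctx)
assert wrapped.count() == df.count()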
def execute(self):
    """Execute SparkToGeneralFuncProcessor"""
    ds = ProcessManager().service(DataStore)
    spark_df = ds[self.readKey]
    if isinstance(spark_df, DataFrame):
        if not self.columns:
            self.columns = spark_df.columns
        spark_df = spark_df.rdd
    else:
        if not self.columns:
            self.log().critical('Columns are not specified for rdd')
            raise RuntimeError('Columns are not specified for rdd')
    res = spark_df.map(lambda row: (tuple([row[c] for c in self.groupby]), row)).groupByKey()\
        .repartition(self.nb_partitions)\
        .mapValues(lambda group: self.generalfunc(group, self.columns, **self.function_args))\
        .map(self.return_map)
    ds[self.storeKey] = res
    return StatusCode.Success
def test_gaussian_mixture_summary(self):
    data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),),
            (Vectors.sparse(1, [], []),)]
    df = self.spark.createDataFrame(data, ["features"])
    gmm = GaussianMixture(k=2)
    model = gmm.fit(df)
    self.assertTrue(model.hasSummary)
    s = model.summary
    self.assertTrue(isinstance(s.predictions, DataFrame))
    self.assertEqual(s.probabilityCol, "probability")
    self.assertTrue(isinstance(s.probability, DataFrame))
    self.assertEqual(s.featuresCol, "features")
    self.assertEqual(s.predictionCol, "prediction")
    self.assertTrue(isinstance(s.cluster, DataFrame))
    self.assertEqual(len(s.clusterSizes), 2)
    self.assertEqual(s.k, 2)
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data)
    return obj
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(data)
    return obj
def _prepare(cls, ratings):
    if isinstance(ratings, RDD):
        pass
    elif isinstance(ratings, DataFrame):
        ratings = ratings.rdd
    else:
        raise TypeError("Ratings should be represented by either an RDD or a DataFrame, "
                        "but got %s." % type(ratings))
    first = ratings.first()
    if isinstance(first, Rating):
        pass
    elif isinstance(first, (tuple, list)):
        ratings = ratings.map(lambda x: Rating(*x))
    else:
        raise TypeError("Expect a Rating or a tuple/list, but got %s." % type(first))
    return ratings
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = ListConverter().convert([_py2java(sc, x) for x in obj],
                                      sc._gateway._gateway_client)
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.SerDe.loads(data)
    return obj
def _dataframe(self, hash_list, pkgformat):
    """
    Creates a DataFrame from a set of objects (identified by hashes).
    """
    enumformat = PackageFormat(pkgformat)
    if enumformat is PackageFormat.HDF5:
        return self._read_hdf5(hash_list)
    elif enumformat is PackageFormat.PARQUET:
        parqlib = self.get_parquet_lib()
        if parqlib is ParquetLib.SPARK:
            return self._read_parquet_spark(hash_list)
        elif parqlib is ParquetLib.ARROW:
            try:
                return self._read_parquet_arrow(hash_list)
            except ValueError as err:
                raise PackageException(str(err))
        else:
            assert False, "Unimplemented Parquet Library %s" % parqlib
    else:
        assert False, "Unimplemented package format: %s" % enumformat
def main():
    """
    1. first put log_2.dat to hdfs path /user/hadoop/examples/log_2.dat
    """
    log_data_file = '/user/hadoop/examples/log_2.dat'
    text_line = sc.textFile(log_data_file)
    print text_line.collect()

    # Creates a DataFrame having a single column named "line"
    # StructField(name, dataType, nullable)
    # A StructType is built from a list of StructFields; each field has a name,
    # a dataType, and a nullable flag indicating whether the field may be null.
    schema = StructType([
        StructField("line", StringType(), True),
    ])
    df = sqlContext.createDataFrame(text_line.map(lambda r: Row(r)), schema)
    print df.collect()

    # Counts all the errors
    errors_df = df.filter(col("line").like("%error%"))
    print errors_df.collect()
    print errors_df.count()
def __call__(self, func):
    # Whenever an object is evicted from the cache we want to
    # unpersist its contents too if it's a dataframe
    def eviction_callback(key, value):
        if isinstance(value, DataFrame):
            value.unpersist()

    lru_decorator = pylru.lrudecorator(self.maxsize)
    lru_decorator.cache.callback = eviction_callback

    @lru_decorator
    @functools.wraps(func)
    def func_and_persist(*args, **kwargs):
        result = func(*args, **kwargs)
        if isinstance(result, DataFrame):
            result.persist(self.storage_level)
        return result

    return func_and_persist
def test_caching(self):
    df = mock.MagicMock(spec=DataFrame)
    called = [0]

    @lru_cache(storage_level=StorageLevel.DISK_ONLY)
    def func(*args, **kwargs):
        called[0] += 1
        return df

    func()
    df.persist.assert_called_once_with(StorageLevel.DISK_ONLY)
    self.assertEqual(df.unpersist.mock_calls, [])
    self.assertEqual(called[0], 1)

    cached_df = func()
    self.assertEqual(cached_df, df)
    self.assertEqual(called[0], 1)
def test_eviction(self):
    first_df = mock.MagicMock(spec=DataFrame)
    second_df = mock.MagicMock(spec=DataFrame)

    @lru_cache(maxsize=1, storage_level=StorageLevel.DISK_ONLY)
    def func(uid):
        if uid == 'first':
            return first_df
        else:
            return second_df

    func('first')
    first_df.persist.assert_called_once_with(StorageLevel.DISK_ONLY)
    self.assertEqual(first_df.unpersist.mock_calls, [])

    func('second')
    first_df.persist.assert_called_once_with(StorageLevel.DISK_ONLY)
    first_df.unpersist.assert_called_once_with()
    second_df.persist.assert_called_once_with(StorageLevel.DISK_ONLY)
    self.assertEqual(second_df.unpersist.mock_calls, [])
def _fit_java(self, dataset):
    """
    Fits a Java model to the input dataset.

    :param dataset: input dataset, which is an instance of
                    :py:class:`pyspark.sql.DataFrame`
    :param params: additional params (overwriting embedded values)
    :return: fitted Java model
    """
    self._transfer_params_to_java()
    return self._java_obj.fit(dataset._jdf)
def _transform(self, dataset):
    self._transfer_params_to_java()
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
def evaluate(self, dataset):
    """
    Evaluates the model on a test dataset.

    :param dataset: Test dataset to evaluate model on, where dataset is an
                    instance of :py:class:`pyspark.sql.DataFrame`
    """
    if not isinstance(dataset, DataFrame):
        raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
    java_blr_summary = self._call_java("evaluate", dataset)
    return BinaryLogisticRegressionSummary(java_blr_summary)
def test_linear_regression_summary(self):
    df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
                                     (0.0, 2.0, Vectors.sparse(1, [], []))],
                                    ["label", "weight", "features"])
    lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight",
                          fitIntercept=False)
    model = lr.fit(df)
    self.assertTrue(model.hasSummary)
    s = model.summary
    # test that api is callable and returns expected types
    self.assertGreater(s.totalIterations, 0)
    self.assertTrue(isinstance(s.predictions, DataFrame))
    self.assertEqual(s.predictionCol, "prediction")
    self.assertEqual(s.labelCol, "label")
    self.assertEqual(s.featuresCol, "features")
    objHist = s.objectiveHistory
    self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float))
    self.assertAlmostEqual(s.explainedVariance, 0.25, 2)
    self.assertAlmostEqual(s.meanAbsoluteError, 0.0)
    self.assertAlmostEqual(s.meanSquaredError, 0.0)
    self.assertAlmostEqual(s.rootMeanSquaredError, 0.0)
    self.assertAlmostEqual(s.r2, 1.0, 2)
    self.assertTrue(isinstance(s.residuals, DataFrame))
    self.assertEqual(s.numInstances, 2)
    devResiduals = s.devianceResiduals
    self.assertTrue(isinstance(devResiduals, list) and isinstance(devResiduals[0], float))
    coefStdErr = s.coefficientStandardErrors
    self.assertTrue(isinstance(coefStdErr, list) and isinstance(coefStdErr[0], float))
    tValues = s.tValues
    self.assertTrue(isinstance(tValues, list) and isinstance(tValues[0], float))
    pValues = s.pValues
    self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], float))
    # test evaluation (with training dataset) produces a summary with same values
    # one check is enough to verify a summary is returned, Scala version runs full test
    sameSummary = model.evaluate(df)
    self.assertAlmostEqual(sameSummary.explainedVariance, s.explainedVariance)
def test_glr_summary(self):
    from pyspark.ml.linalg import Vectors
    df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
                                     (0.0, 2.0, Vectors.sparse(1, [], []))],
                                    ["label", "weight", "features"])
    glr = GeneralizedLinearRegression(family="gaussian", link="identity", weightCol="weight",
                                      fitIntercept=False)
    model = glr.fit(df)
    self.assertTrue(model.hasSummary)
    s = model.summary
    # test that api is callable and returns expected types
    self.assertEqual(s.numIterations, 1)  # this should default to a single iteration of WLS
    self.assertTrue(isinstance(s.predictions, DataFrame))
    self.assertEqual(s.predictionCol, "prediction")
    self.assertTrue(isinstance(s.residuals(), DataFrame))
    self.assertTrue(isinstance(s.residuals("pearson"), DataFrame))
    coefStdErr = s.coefficientStandardErrors
    self.assertTrue(isinstance(coefStdErr, list) and isinstance(coefStdErr[0], float))
    tValues = s.tValues
    self.assertTrue(isinstance(tValues, list) and isinstance(tValues[0], float))
    pValues = s.pValues
    self.assertTrue(isinstance(pValues, list) and isinstance(pValues[0], float))
    self.assertEqual(s.degreesOfFreedom, 1)
    self.assertEqual(s.residualDegreeOfFreedom, 1)
    self.assertEqual(s.residualDegreeOfFreedomNull, 2)
    self.assertEqual(s.rank, 1)
    self.assertTrue(isinstance(s.solver, basestring))
    self.assertTrue(isinstance(s.aic, float))
    self.assertTrue(isinstance(s.deviance, float))
    self.assertTrue(isinstance(s.nullDeviance, float))
    self.assertTrue(isinstance(s.dispersion, float))
    # test evaluation (with training dataset) produces a summary with same values
    # one check is enough to verify a summary is returned, Scala version runs full test
    sameSummary = model.evaluate(df)
    self.assertAlmostEqual(sameSummary.deviance, s.deviance)
def test_bisecting_kmeans_summary(self):
    data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),),
            (Vectors.sparse(1, [], []),)]
    df = self.spark.createDataFrame(data, ["features"])
    bkm = BisectingKMeans(k=2)
    model = bkm.fit(df)
    self.assertTrue(model.hasSummary)
    s = model.summary
    self.assertTrue(isinstance(s.predictions, DataFrame))
    self.assertEqual(s.featuresCol, "features")
    self.assertEqual(s.predictionCol, "prediction")
    self.assertTrue(isinstance(s.cluster, DataFrame))
    self.assertEqual(len(s.clusterSizes), 2)
    self.assertEqual(s.k, 2)
def test_kmeans_summary(self):
    data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
            (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
    df = self.spark.createDataFrame(data, ["features"])
    kmeans = KMeans(k=2, seed=1)
    model = kmeans.fit(df)
    self.assertTrue(model.hasSummary)
    s = model.summary
    self.assertTrue(isinstance(s.predictions, DataFrame))
    self.assertEqual(s.featuresCol, "features")
    self.assertEqual(s.predictionCol, "prediction")
    self.assertTrue(isinstance(s.cluster, DataFrame))
    self.assertEqual(len(s.clusterSizes), 2)
    self.assertEqual(s.k, 2)
def _java2py(sc, r, encoding="bytes"):
    if isinstance(r, JavaObject):
        clsName = r.getClass().getSimpleName()
        # convert RDD into JavaRDD
        if clsName != 'JavaRDD' and clsName.endswith("RDD"):
            r = r.toJavaRDD()
            clsName = 'JavaRDD'

        if clsName == 'JavaRDD':
            jrdd = sc._jvm.org.apache.spark.ml.python.MLSerDe.javaToPython(r)
            return RDD(jrdd, sc)

        if clsName == 'Dataset':
            return DataFrame(r, SQLContext.getOrCreate(sc))

        if clsName in _picklable_classes:
            r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r)
        elif isinstance(r, (JavaArray, JavaList)):
            try:
                r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r)
            except Py4JJavaError:
                pass  # not pickable

    if isinstance(r, (bytearray, bytes)):
        r = PickleSerializer().loads(bytes(r), encoding=encoding)
    return r
def numInstances(self):
    """
    Number of instances in DataFrame predictions
    """
    return self._call_java("numInstances")
def evaluate(self, dataset):
    """
    Evaluates the model on a test dataset.

    :param dataset: Test dataset to evaluate model on, where dataset is an
                    instance of :py:class:`pyspark.sql.DataFrame`
    """
    if not isinstance(dataset, DataFrame):
        raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
    java_glr_summary = self._call_java("evaluate", dataset)
    return GeneralizedLinearRegressionSummary(java_glr_summary)
def _java2py(sc, r, encoding="bytes"):
    if isinstance(r, JavaObject):
        clsName = r.getClass().getSimpleName()
        # convert RDD into JavaRDD
        if clsName != 'JavaRDD' and clsName.endswith("RDD"):
            r = r.toJavaRDD()
            clsName = 'JavaRDD'

        if clsName == 'JavaRDD':
            jrdd = sc._jvm.org.apache.spark.mllib.api.python.SerDe.javaToPython(r)
            return RDD(jrdd, sc)

        if clsName == 'Dataset':
            return DataFrame(r, SQLContext.getOrCreate(sc))

        if clsName in _picklable_classes:
            r = sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(r)
        elif isinstance(r, (JavaArray, JavaList)):
            try:
                r = sc._jvm.org.apache.spark.mllib.api.python.SerDe.dumps(r)
            except Py4JJavaError:
                pass  # not pickable

    if isinstance(r, (bytearray, bytes)):
        r = PickleSerializer().loads(bytes(r), encoding=encoding)
    return r
def convertVectorColumnsToML(dataset, *cols):
    """
    Converts vector columns in an input DataFrame from the
    :py:class:`pyspark.mllib.linalg.Vector` type to the new
    :py:class:`pyspark.ml.linalg.Vector` type under the `spark.ml` package.

    :param dataset: input dataset
    :param cols: a list of vector columns to be converted.
      New vector columns will be ignored. If unspecified, all old
      vector columns will be converted except nested ones.
    :return: the input dataset with old vector columns converted to the
      new vector type

    >>> import pyspark
    >>> from pyspark.mllib.linalg import Vectors
    >>> from pyspark.mllib.util import MLUtils
    >>> df = spark.createDataFrame(
    ...     [(0, Vectors.sparse(2, [1], [1.0]), Vectors.dense(2.0, 3.0))],
    ...     ["id", "x", "y"])
    >>> r1 = MLUtils.convertVectorColumnsToML(df).first()
    >>> isinstance(r1.x, pyspark.ml.linalg.SparseVector)
    True
    >>> isinstance(r1.y, pyspark.ml.linalg.DenseVector)
    True
    >>> r2 = MLUtils.convertVectorColumnsToML(df, "x").first()
    >>> isinstance(r2.x, pyspark.ml.linalg.SparseVector)
    True
    >>> isinstance(r2.y, pyspark.mllib.linalg.DenseVector)
    True
    """
    if not isinstance(dataset, DataFrame):
        raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset)))
    return callMLlibFunc("convertVectorColumnsToML", dataset, list(cols))
def convertMatrixColumnsToML(dataset, *cols):
    """
    Converts matrix columns in an input DataFrame from the
    :py:class:`pyspark.mllib.linalg.Matrix` type to the new
    :py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml` package.

    :param dataset: input dataset
    :param cols: a list of matrix columns to be converted.
      New matrix columns will be ignored. If unspecified, all old
      matrix columns will be converted except nested ones.
    :return: the input dataset with old matrix columns converted to the
      new matrix type

    >>> import pyspark
    >>> from pyspark.mllib.linalg import Matrices
    >>> from pyspark.mllib.util import MLUtils
    >>> df = spark.createDataFrame(
    ...     [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]),
    ...     Matrices.dense(2, 2, range(4)))], ["id", "x", "y"])
    >>> r1 = MLUtils.convertMatrixColumnsToML(df).first()
    >>> isinstance(r1.x, pyspark.ml.linalg.SparseMatrix)
    True
    >>> isinstance(r1.y, pyspark.ml.linalg.DenseMatrix)
    True
    >>> r2 = MLUtils.convertMatrixColumnsToML(df, "x").first()
    >>> isinstance(r2.x, pyspark.ml.linalg.SparseMatrix)
    True
    >>> isinstance(r2.y, pyspark.mllib.linalg.DenseMatrix)
    True
    """
    if not isinstance(dataset, DataFrame):
        raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset)))
    return callMLlibFunc("convertMatrixColumnsToML", dataset, list(cols))
def convertMatrixColumnsFromML(dataset, *cols):
    """
    Converts matrix columns in an input DataFrame to the
    :py:class:`pyspark.mllib.linalg.Matrix` type from the new
    :py:class:`pyspark.ml.linalg.Matrix` type under the `spark.ml` package.

    :param dataset: input dataset
    :param cols: a list of matrix columns to be converted.
      Old matrix columns will be ignored. If unspecified, all new
      matrix columns will be converted except nested ones.
    :return: the input dataset with new matrix columns converted to the
      old matrix type

    >>> import pyspark
    >>> from pyspark.ml.linalg import Matrices
    >>> from pyspark.mllib.util import MLUtils
    >>> df = spark.createDataFrame(
    ...     [(0, Matrices.sparse(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4]),
    ...     Matrices.dense(2, 2, range(4)))], ["id", "x", "y"])
    >>> r1 = MLUtils.convertMatrixColumnsFromML(df).first()
    >>> isinstance(r1.x, pyspark.mllib.linalg.SparseMatrix)
    True
    >>> isinstance(r1.y, pyspark.mllib.linalg.DenseMatrix)
    True
    >>> r2 = MLUtils.convertMatrixColumnsFromML(df, "x").first()
    >>> isinstance(r2.x, pyspark.mllib.linalg.SparseMatrix)
    True
    >>> isinstance(r2.y, pyspark.ml.linalg.DenseMatrix)
    True
    """
    if not isinstance(dataset, DataFrame):
        raise TypeError("Input dataset must be a DataFrame but got {}.".format(type(dataset)))
    return callMLlibFunc("convertMatrixColumnsFromML", dataset, list(cols))
def __init__(self, rdd, sqlCtx):
    if not isinstance(rdd, _pssql.DataFrame):
        raise Exception("PivotTable requires a DataFrame, "
                        "please create one using DataFrameFactory")
    self.rdd = rdd
    self.sqlCtx = sqlCtx
def create(self, index, columns, values, aggregates):
    """
    Creates a PivotTable from a SchemaRDD or DataFrame

    Parameters:
    -----------
    :param index: a column or list of columns. Keys to group by on the pivot table index.
    :param columns: a column or list of them. Keys to group by on the pivot table column.
    :param values: columns to aggregate.
    :param aggregates: function with which to aggregate. Options: 'AVG', 'MAX', 'SUM',
                       'COUNT', 'MIN'. Note only one type of aggregation can be performed
                       at one time.

    Example usage:
    --------------
    >>> import pandas as pd
    >>> pandas_df = pd.read_csv('./examples/iris.csv')
    >>> spark_df = sqlContext.createDataFrame(pandas_df)
    >>> from PySparkPivot import PivotTableFactory
    >>> piv = PivotTableFactory(spark_df, sqlContext)
    >>> pivot_table = piv.create(index=['Target'], columns=['Colour'],
    ...                          values=['SepLength'], aggregates=['AVG'])
    >>> pivot_table.show()

    Returns:
    --------
    :return table: PySpark DataFrame
    """
    table = PivotTable(index, columns, values, aggregates, self.rdd, self.sqlCtx)
    return self.sqlCtx.createDataFrame(table.transform(), table.schema())
def create(self, rdd):
    '''
    Create the PySpark DataFrame from an RDD

    :param rdd: RDD for conversion
    :return table: DataFrame
    '''
    return self.sqlCtx.createDataFrame(self.parser.parse(rdd))
def toImage(self, array, origin=""):
    """
    Converts an array with metadata to a two-dimensional image.

    :param `numpy.ndarray` array: The array to convert to image.
    :param str origin: Path to the image, optional.
    :return: a :class:`Row` that is a two dimensional image.

    .. versionadded:: 2.3.0
    """
    if not isinstance(array, np.ndarray):
        raise TypeError(
            "array argument should be numpy.ndarray; however, it got [%s]." % type(array))

    if array.ndim != 3:
        raise ValueError("Invalid array shape")

    height, width, nChannels = array.shape
    ocvTypes = ImageSchema.ocvTypes
    if nChannels == 1:
        mode = ocvTypes["CV_8UC1"]
    elif nChannels == 3:
        mode = ocvTypes["CV_8UC3"]
    elif nChannels == 4:
        mode = ocvTypes["CV_8UC4"]
    else:
        raise ValueError("Invalid number of channels")

    # Running `bytearray(numpy.array([1]))` fails in specific Python versions
    # with a specific Numpy version, for example in Python 3.6.0 and NumPy 1.13.3.
    # Here, it avoids it by converting it to bytes.
    data = bytearray(array.astype(dtype=np.uint8).ravel().tobytes())

    # Creating new Row with _create_row(), because Row(name = value, ... )
    # orders fields by name, which conflicts with expected schema order
    # when the new DataFrame is created by UDF
    return _create_row(self.imageFields,
                       [origin, height, width, nChannels, mode, data])
def readImages(self, path, recursive=False, numPartitions=-1,
               dropImageFailures=False, sampleRatio=1.0, seed=0):
    """
    Reads the directory of images from the local or remote source.

    .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive
        flag, there may be a race condition where one job overwrites the hadoop configs of
        another.

    .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient
        but potentially non-deterministic.

    :param str path: Path to the image directory.
    :param bool recursive: Recursive search flag.
    :param int numPartitions: Number of DataFrame partitions.
    :param bool dropImageFailures: Drop the files that are not valid images.
    :param float sampleRatio: Fraction of the images loaded.
    :param int seed: Random number seed.
    :return: a :class:`DataFrame` with a single column of "images",
        see ImageSchema for details.

    >>> df = ImageSchema.readImages('python/test_support/image/kittens', recursive=True)
    >>> df.count()
    4

    .. versionadded:: 2.3.0
    """
    ctx = SparkContext._active_spark_context
    spark = SparkSession(ctx)
    image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema
    jsession = spark._jsparkSession
    jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
                                      dropImageFailures, float(sampleRatio), seed)
    return DataFrame(jresult, spark._wrapped)
def generate_code(self):
    code = u"""
        from pyspark.sql import DataFrame
        try:
            ext_pkg = spark_session._jvm.br.ufmg.dcc.lemonade.ext.fpm
            associative_impl = ext_pkg.LemonadeAssociativeRules()
        except TypeError as te:
            if 'JavaPackage' in te.message:
                raise ValueError('{required_pkg}')
            else:
                raise

        items = {input}
        java_df = associative_impl.run(spark_session._jsparkSession,
            items._jdf, {confidence}, '{attr}', '{freq}')
        {output} = DataFrame(java_df, spark_session)
        """.format(input=self.named_inputs['input data'],
                   confidence=self.confidence,
                   output=self.output,
                   rules_count=self.rules_count,
                   attr=self.attribute[0],
                   freq=self.freq[0],
                   task_id=self.parameters['task']['id'],
                   model_trained=_('Model trained'),
                   required_pkg=_('Required Lemonade Spark Extensions '
                                  'not found in CLASSPATH.'))
    return dedent(code)
def generate_code(self):
    code = dedent(u"""
        from pyspark.sql import DataFrame
        try:
            ext_pkg = spark_session._jvm.br.ufmg.dcc.lemonade.ext.fpm
            prefix_span_impl = ext_pkg.LemonadePrefixSpan()
        except TypeError as te:
            if 'JavaPackage' in te.message:
                raise ValueError('{required_pkg}')
            else:
                raise

        sequences = {input}
        java_df = prefix_span_impl.run(spark_session._jsparkSession,
            sequences._jdf, {min_support}, {max_length}, '{attr}', '{freq}')
        {output} = DataFrame(java_df, spark_session)
        """.format(input=self.named_inputs['input data'],
                   min_support=self.min_support,
                   max_length=self.max_length,
                   output=self.output,
                   attr=self.attribute[0],
                   freq=self.freq[0],
                   required_pkg=_('Required Lemonade Spark Extensions '
                                  'not found in CLASSPATH.')))
    return code
def cache(f):
    """
    Decorator that caches the function's return value, so cached RDDs, DataFrames,
    and other objects can be shared between calls to tasks.
    """
    @wraps(f)
    def wrapper(self, *args, **kwargs):
        with _data_lock:
            try:
                self._cache
            except AttributeError:
                self._cache = {}

            # function call key adapted from http://stackoverflow.com/a/10220908/1236542
            key = (f,) + args + (_kwd_mark,) + tuple(sorted(kwargs.items()))

            if key in self._cache:
                return self._cache[key]
            else:
                from pyspark.rdd import RDD
                from pyspark.sql import DataFrame
                result = f(self, *args, **kwargs)
                self._cache[key] = result

                if isinstance(result, RDD):
                    st = result.getStorageLevel()
                    if not st.useDisk and not st.useMemory and not st.useOffHeap:
                        raise ValueError('An RDD returned by a @cache function should be '
                                         'persisted with .cache() or .persist().')
                elif isinstance(result, DataFrame):
                    st = result.storageLevel
                    if not st.useDisk and not st.useMemory and not st.useOffHeap:
                        raise ValueError('A DataFrame returned by a @cache function should be '
                                         'persisted with .cache() or .persist().')

                return result

    return wrapper
def _java2py(sc, r, encoding="bytes"):
    if isinstance(r, JavaObject):
        clsName = r.getClass().getSimpleName()
        # convert RDD into JavaRDD
        if clsName != 'JavaRDD' and clsName.endswith("RDD"):
            r = r.toJavaRDD()
            clsName = 'JavaRDD'

        if clsName == 'JavaRDD':
            jrdd = sc._jvm.SerDe.javaToPython(r)
            return RDD(jrdd, sc)

        if clsName == 'DataFrame':
            return DataFrame(r, SQLContext.getOrCreate(sc))

        if clsName in _picklable_classes:
            r = sc._jvm.SerDe.dumps(r)
        elif isinstance(r, (JavaArray, JavaList)):
            try:
                r = sc._jvm.SerDe.dumps(r)
            except Py4JJavaError:
                pass  # not pickable

    if isinstance(r, (bytearray, bytes)):
        r = PickleSerializer().loads(bytes(r), encoding=encoding)
    return r
def save_cached_df(self, hashes, name, path, ext, target, fmt):
    """
    Save a DataFrame to the store.
    """
    buildfile = name.lstrip('/').replace('/', '.')
    self._add_to_contents(buildfile, hashes, ext, path, target, fmt)
def setUp(self):
    self.df = mock.Mock(spec=DataFrame)
    self.df.sql_ctx = mock.Mock(spec=SQLContext)
    self.df.sql_ctx.sparkSession = mock.Mock(spec=SparklySession)
    self.write_ext = SparklyWriter(self.df)
def save_df(self, dataframe, name, path, ext, target, fmt):
    """
    Save a DataFrame to the store.
    """
    enumformat = PackageFormat(fmt)
    buildfile = name.lstrip('/').replace('/', '.')
    storepath = self._store.temporary_object_path(buildfile)

    # Serialize DataFrame to chosen format
    if enumformat is PackageFormat.PARQUET:
        # switch parquet lib
        parqlib = self.get_parquet_lib()
        if isinstance(dataframe, pd.DataFrame):
            # parqlib is ParquetLib.ARROW: other parquet libs are deprecated, remove?
            import pyarrow as pa
            from pyarrow import parquet
            table = pa.Table.from_pandas(dataframe)
            parquet.write_table(table, storepath)
        elif parqlib is ParquetLib.SPARK:
            from pyspark import sql as sparksql
            assert isinstance(dataframe, sparksql.DataFrame)
            dataframe.write.parquet(storepath)
        else:
            assert False, "Unimplemented ParquetLib %s" % parqlib
    else:
        assert False, "Unimplemented PackageFormat %s" % enumformat

    # Move serialized DataFrame to object store
    if os.path.isdir(storepath):  # Pyspark
        hashes = []
        files = [ofile for ofile in os.listdir(storepath) if ofile.endswith(".parquet")]
        for obj in files:
            path = os.path.join(storepath, obj)
            objhash = digest_file(path)
            move(path, self._store.object_path(objhash))
            hashes.append(objhash)
        self._add_to_contents(buildfile, hashes, ext, path, target, fmt)
        rmtree(storepath)
        return hashes
    else:
        filehash = digest_file(storepath)
        self._add_to_contents(buildfile, [filehash], ext, path, target, fmt)
        move(storepath, self._store.object_path(filehash))
        return [filehash]