我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pyspark.RDD。
def test_esk609(self):
    """Test Esk-609: Map data-frame groups.

    Runs the ``esk609_map_df_groups.py`` macro, then checks that the data
    store holds ``map_rdd`` and ``flat_map_rdd`` as Spark RDDs and that
    their sorted contents match the expected rows.
    """
    # run Eskapade
    self.run_eskapade('esk609_map_df_groups.py')
    store = ProcessManager().service(DataStore)

    # check input data: both keys must be present and hold RDDs
    for rdd_key in ('map_rdd', 'flat_map_rdd'):
        self.assertIn(rdd_key, store, 'no data found with key "{}"'.format(rdd_key))
        self.assertIsInstance(
            store[rdd_key], pyspark.RDD,
            'object "{0:s}" is not an RDD (type "{1:s}")'.format(rdd_key, str(type(store[rdd_key]))))

    # sums of "bar" variable: (group, sum) with sums 27.5, 77.5, ..., 477.5
    expected_sums = [(group, 27.5 + 50. * group) for group in range(10)]
    # every flat-mapped row also carries the sum of the group it belongs to
    expected_flat = [(idx, 'foo{:d}'.format(idx), (idx + 1) / 2., expected_sums[idx // 10][1])
                     for idx in range(100)]

    # check mapped data frames
    self.assertListEqual(sorted(store['map_rdd'].collect()), expected_sums,
                         'unexpected values in "map_rdd"')
    self.assertListEqual(sorted(store['flat_map_rdd'].collect()), expected_flat,
                         'unexpected values in "flat_map_rdd"')
def create_python_rdd(jrdd, serializer):
    """Creates a Python RDD from a RDD from Scala.

    Args:
        jrdd (org.apache.spark.api.java.JavaRDD): The RDD that came from Scala.
        serializer (:class:`~geopyspark.AvroSerializer` or
            pyspark.serializers.AutoBatchedSerializer(AvroSerializer)):
            An instance of ``AvroSerializer`` that is either alone, or wrapped
            by ``AutoBatchedSerializer``. A bare serializer is wrapped in an
            ``AutoBatchedSerializer`` before the RDD is built.

    Returns:
        RDD
    """
    pysc = get_spark_context()

    # Normalise to a batched serializer so both cases share one constructor call.
    if not isinstance(serializer, AutoBatchedSerializer):
        serializer = AutoBatchedSerializer(serializer)
    return RDD(jrdd, pysc, serializer)
def pprint(self, num=10):
    """
    Print the first num elements of each RDD generated in this DStream.

    One element more than requested is taken from each RDD so that a
    trailing "..." line can signal that further elements exist.

    @param num: the number of elements from the first will be printed.
    """
    separator = "-------------------------------------------"

    def _print_batch(time, rdd):
        fetched = rdd.take(num + 1)
        print(separator)
        print("Time: %s" % time)
        print(separator)
        shown = fetched[:num]
        for record in shown:
            print(record)
        has_more = len(fetched) > num
        if has_more:
            print("...")
        print("")

    self.foreachRDD(_print_batch)
def window(self, windowDuration, slideDuration=None):
    """
    Return a new DStream in which each RDD contains all the elements in seen in a
    sliding window of time over this DStream.

    The slide duration is only forwarded to the Java DStream when one is given.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                          batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    """
    self._validate_window_param(windowDuration, slideDuration)

    # Collect the Java durations in call order; the slide is optional.
    jdurations = [self._ssc._jduration(windowDuration)]
    if slideDuration is not None:
        jdurations.append(self._ssc._jduration(slideDuration))

    jwindowed = self._jdstream.window(*jdurations)
    return DStream(jwindowed, self._ssc, self._jrdd_deserializer)
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream of (value, count) pairs computed over a sliding
    window of this DStream.

    Every element is paired with 1, the pairs are reduced by key over the
    window with addition (subtraction is the inverse function), and pairs
    whose count is not positive are dropped.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                          batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    @param numPartitions:  number of partitions of each RDD in the new DStream.
    """
    return (
        self.map(lambda value: (value, 1))
        .reduceByKeyAndWindow(operator.add, operator.sub,
                              windowDuration, slideDuration, numPartitions)
        # values that slid out of the window entirely end up with count 0
        .filter(lambda pair: pair[1] > 0)
    )
def _py2java(sc, obj):
    """ Convert Python object into Java

    RDDs, DataFrames and SparkContexts are replaced by their Java
    counterparts, lists are converted element by element, Java objects and
    primitive values are returned unchanged, and anything else is pickled
    and loaded on the JVM side through ``MLSerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        return [_py2java(sc, item) for item in obj]
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj

    # Fallback: ship the pickled bytes to the JVM and unpickle them there.
    pickled = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(pickled)
def train(cls, data, lambda_=1.0):
    """
    Train a Naive Bayes model given an RDD of (label, features)
    vectors.

    This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
    can handle all kinds of discrete data.  For example, by
    converting documents into TF-IDF vectors, it can be used for
    document classification. By making every vector a 0-1 vector,
    it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
    The input feature values must be nonnegative.

    Only the first element of `data` is type-checked before training.

    :param data:
      RDD of LabeledPoint.
    :param lambda_:
      The smoothing parameter.
      (default: 1.0)
    :raises ValueError:
      If the first element of `data` is not a LabeledPoint.
    """
    sample = data.first()
    if isinstance(sample, LabeledPoint):
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
    raise ValueError("`data` should be an RDD of LabeledPoint")
def predict(self, x):
    """
    Predict the label of one or more examples.

    The input is converted to vector form (element-wise for an RDD) and
    handed to the wrapped model's "predict" call.

    .. note:: In Python, predict cannot currently be used within an RDD
        transformation or action.
        Call predict directly on the RDD instead.

    :param x:
      Data point (feature vector), or an RDD of data points (feature
      vectors).
    """
    vectors = x.map(_convert_to_vector) if isinstance(x, RDD) else _convert_to_vector(x)
    return self.call("predict", vectors)
def _py2java(sc, obj):
    """ Convert Python object into Java

    Spark wrappers (RDD, DataFrame, SparkContext) map to their Java handles,
    lists are converted recursively, Java objects and primitives pass
    through untouched, and every other object is pickled and loaded on the
    JVM through the mllib ``SerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        return [_py2java(sc, element) for element in obj]
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj

    # Anything else travels to the JVM as pickled bytes.
    payload = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(payload)
def predict(self, x):
    """
    Predict labels for provided features.
    Using a piecewise linear function.
    1) If x exactly matches a boundary then associated prediction
    is returned. In case there are multiple predictions with the
    same boundary then one of them is returned. Which one is
    undefined (same as java.util.Arrays.binarySearch).
    2) If x is lower or higher than all boundaries then first or
    last prediction is returned respectively. In case there are
    multiple predictions with the same boundary then the lowest
    or highest is returned respectively.
    3) If x falls between two values in boundary array then
    prediction is treated as piecewise linear function and
    interpolated value is returned. In case there are multiple
    values with the same boundary then the same rules as in 2)
    are used.

    An RDD input is handled by mapping this method over its elements.

    :param x:
      Feature or RDD of Features to be labeled.
    """
    if not isinstance(x, RDD):
        # Single feature: interpolate directly between the stored boundaries.
        return np.interp(x, self.boundaries, self.predictions)
    return x.map(lambda feature: self.predict(feature))
def rows(self):
    """
    Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.

    >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
    ...                                        IndexedRow(1, [4, 5, 6])]))
    >>> rows = mat.rows
    >>> rows.first()
    IndexedRow(0, [1.0,2.0,3.0])
    """
    # IndexedRows are serialized from Java through a DataFrame: the
    # Scala/Java side converts the RDD of rows to a DataFrame, and each
    # Row of it is rebuilt into an IndexedRow here.
    java_matrix = self._java_matrix_wrapper._java_model
    indexed_rows_df = callMLlibFunc("getIndexedRows", java_matrix)

    def _to_indexed_row(row):
        return IndexedRow(row[0], row[1])

    return indexed_rows_df.rdd.map(_to_indexed_row)
def entries(self):
    """
    Entries of the CoordinateMatrix stored as an RDD of
    MatrixEntries.

    >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
    ...                                        MatrixEntry(6, 4, 2.1)]))
    >>> entries = mat.entries
    >>> entries.first()
    MatrixEntry(0, 0, 1.2)
    """
    # MatrixEntry values are serialized from Java through a DataFrame: the
    # Scala/Java side converts the RDD of entries to a DataFrame, and each
    # Row of it is rebuilt into a MatrixEntry here.
    java_matrix = self._java_matrix_wrapper._java_model
    matrix_entries_df = callMLlibFunc("getMatrixEntries", java_matrix)

    def _to_matrix_entry(row):
        return MatrixEntry(row[0], row[1], row[2])

    return matrix_entries_df.rdd.map(_to_matrix_entry)
def blocks(self):
    """
    The RDD of sub-matrix blocks
    ((blockRowIndex, blockColIndex), sub-matrix) that form this
    distributed matrix.

    >>> mat = BlockMatrix(
    ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
    ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
    >>> blocks = mat.blocks
    >>> blocks.first()
    ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
    """
    # Sub-matrix blocks are serialized from Java through a DataFrame: the
    # Scala/Java side converts the RDD of blocks to a DataFrame, and each
    # Row of it is rebuilt into a ((row, col), sub-matrix) pair here.
    java_matrix = self._java_matrix_wrapper._java_model
    matrix_blocks_df = callMLlibFunc("getMatrixBlocks", java_matrix)

    def _to_block(row):
        block_index = (row[0][0], row[0][1])
        return (block_index, row[1])

    return matrix_blocks_df.rdd.map(_to_block)
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated by reducing all
    elements in a sliding window over this DStream.

    if `invReduceFunc` is not None, the reduction is done incrementally
    using the old window's reduced value :

    1. reduce the new values that entered the window (e.g., adding new counts)

    2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    This is more efficient than `invReduceFunc` is None.

    Implemented by keying every element with the constant 1, delegating to
    reduceByKeyAndWindow with a single partition, and stripping the key again.

    @param reduceFunc:     associative reduce function
    @param invReduceFunc:  inverse reduce function of `reduceFunc`
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                           the new DStream will generate RDDs); must be a multiple of this
                           DStream's batching interval
    """
    return (
        self.map(lambda element: (1, element))
        .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
        .map(lambda pair: pair[1])
    )
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream.

    Every element is paired with 1, the pairs are reduced by key over the
    window with addition (subtraction is the inverse function), pairs whose
    count is not positive are dropped, and count() is applied to the result.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                          batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    @param numPartitions:  number of partitions of each RDD in the new DStream.
    """
    per_value = (
        self.map(lambda value: (value, 1))
        .reduceByKeyAndWindow(operator.add, operator.sub,
                              windowDuration, slideDuration, numPartitions)
    )
    # values that slid out of the window entirely end up with count 0
    still_present = per_value.filter(lambda pair: pair[1] > 0)
    return still_present.count()
def train(cls, data, lambda_=1.0):
    """
    Train a Naive Bayes model given an RDD of (label, features)
    vectors.

    This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
    can handle all kinds of discrete data.  For example, by
    converting documents into TF-IDF vectors, it can be used for
    document classification. By making every vector a 0-1 vector,
    it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
    The input feature values must be nonnegative.

    The type check looks at the first element of `data` only.

    :param data: RDD of LabeledPoint.
    :param lambda_: The smoothing parameter (default: 1.0).
    :raises ValueError: If the first element of `data` is not a LabeledPoint.
    """
    head = data.first()
    if isinstance(head, LabeledPoint):
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
    raise ValueError("`data` should be an RDD of LabeledPoint")
def predict(self, x):
    """
    Predict the label of one or more examples.

    The input is converted to vector form (element-wise for an RDD) before
    it is passed to the wrapped model's "predict" call.

    Note: In Python, predict cannot currently be used within an RDD
          transformation or action.
          Call predict directly on the RDD instead.

    :param x:  Data point (feature vector),
               or an RDD of data points (feature vectors).
    """
    if isinstance(x, RDD):
        converted = x.map(_convert_to_vector)
    else:
        converted = _convert_to_vector(x)
    return self.call("predict", converted)
def _py2java(sc, obj):
    """ Convert Python object into Java

    Spark wrappers (RDD, DataFrame, SparkContext) map to their Java handles,
    lists are converted recursively and passed through a ``ListConverter``
    bound to the gateway client, Java objects and primitives pass through
    untouched, and every other object is pickled and loaded via ``SerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        converter = ListConverter()
        return converter.convert([_py2java(sc, element) for element in obj],
                                 sc._gateway._gateway_client)
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj

    # Anything else travels to the JVM as pickled bytes.
    payload = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.SerDe.loads(payload)
def entries(self):
    """
    Entries of the CoordinateMatrix stored as an RDD of
    MatrixEntries.

    >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
    ...                                        MatrixEntry(6, 4, 2.1)]))
    >>> entries = mat.entries
    >>> entries.first()
    MatrixEntry(0, 0, 1.2)
    """
    # We use DataFrames for serialization of MatrixEntry entries
    # from Java, so we first convert the RDD of entries to a
    # DataFrame on the Scala/Java side. Then we map each Row in
    # the DataFrame back to a MatrixEntry on this side.
    entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
    # Map over the DataFrame's underlying RDD rather than the DataFrame:
    # `DataFrame.map` is not available on newer Spark versions, while
    # `.rdd.map` works on both and yields the same RDD.
    entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
    return entries
def blocks(self):
    """
    The RDD of sub-matrix blocks
    ((blockRowIndex, blockColIndex), sub-matrix) that form this
    distributed matrix.

    >>> mat = BlockMatrix(
    ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
    ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
    >>> blocks = mat.blocks
    >>> blocks.first()
    ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
    """
    # We use DataFrames for serialization of sub-matrix blocks
    # from Java, so we first convert the RDD of blocks to a
    # DataFrame on the Scala/Java side. Then we map each Row in
    # the DataFrame back to a sub-matrix block on this side.
    blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
    # Map over the DataFrame's underlying RDD rather than the DataFrame:
    # `DataFrame.map` is not available on newer Spark versions, while
    # `.rdd.map` works on both and yields the same RDD.
    blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
    return blocks
def execute(self):
    """Execute RddGroupMapper

    Reads the RDD stored under ``read_key``, optionally applies
    ``input_map``, groups the data by key, maps (or flat-maps) the group
    values with ``group_map``, optionally applies ``result_map``, and stores
    the outcome under ``store_key``.

    :returns: ``StatusCode.Success``
    :raises KeyError: if ``read_key`` is not in the data store
    :raises TypeError: if the stored object is not a ``pyspark.RDD``
    """
    # get process manager and data store
    store = ProcessManager().service(DataStore)

    # fetch data frame from data store
    if self.read_key not in store:
        raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
    rdd = store[self.read_key]
    if not isinstance(rdd, pyspark.RDD):
        raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'
                        .format(self.read_key, str(type(rdd))))

    # apply input map
    if self.input_map:
        rdd = rdd.map(self.input_map)

    # group data by keys in the data
    grouped = rdd.groupByKey(numPartitions=self.num_group_partitions)

    # apply map on group values; flattening emits one record per mapped value
    map_groups = grouped.flatMapValues if self.flatten_output_groups else grouped.mapValues
    result = map_groups(self.group_map)

    # apply map on result
    if self.result_map:
        result = result.map(self.result_map)

    # store data in data store
    store[self.store_key] = result

    return StatusCode.Success
def test_esk606(self):
    """Test Esk-606: Convert Spark data frame.

    Runs the ``esk606_convert_spark_df.py`` macro and verifies, for every
    output format (Spark data frame, RDD, list, Pandas data frame), the
    stored type, the sorted row content and the stored schema.
    """
    # run Eskapade
    self.run_eskapade('esk606_convert_spark_df.py')
    store = ProcessManager().service(DataStore)

    # define types of stored data sets
    expected_types = {'df': pyspark.sql.DataFrame, 'rdd': pyspark.RDD,
                      'list': list, 'pd': pd.DataFrame}

    # define functions to obtain data-frame content
    row_getters = {
        'df': lambda d: sorted(d.rdd.map(tuple).collect()),
        'rdd': lambda d: sorted(d.collect()),
        'list': lambda d: sorted(d),
        'pd': lambda d: sorted(map(tuple, d.values)),
    }

    # check input data
    self.assertIn('df', store, 'no data found with key "df"')
    self.assertIsInstance(store['df'], pyspark.sql.DataFrame,
                          'unexpected type for input data frame')

    # check created data sets
    expected_rows = [(idx, 'foo{:d}'.format(idx), (idx + 1) / 2.) for idx in range(20, 100)]
    for name, expected_type in expected_types.items():
        # check content
        output_key = '{}_output'.format(name)
        self.assertIn(output_key, store, 'no data found with key "{}"'.format(output_key))
        output = store[output_key]
        self.assertIsInstance(output, expected_type,
                              'unexpected type for "{}" data'.format(name))
        self.assertListEqual(row_getters[name](output), expected_rows,
                             'unexpected content for "{}" data'.format(name))

        # check schema
        schema_key = '{}_schema'.format(name)
        self.assertIn(schema_key, store, 'no schema found with key {}'.format(schema_key))
        self.assertListEqual(list(store[schema_key]), list(store['df'].schema),
                             'unexpected schema for "{}" data'.format(name))
def queueStream(self, rdds, oneAtATime=True, default=None):
    """
    Create an input stream from an queue of RDDs or list. In each batch,
    it will process either one or all of the RDDs returned by the queue.

    Entries of `rdds` (and `default`) that are not RDDs are turned into
    RDDs with parallelize().

    .. note:: Changes to the queue after the stream is created will not be recognized.

    @param rdds:       Queue of RDDs
    @param oneAtATime: pick one rdd each time or pick all of them once.
    @param default:    The default rdd if no more in rdds
    """
    if default and not isinstance(default, RDD):
        default = self._sc.parallelize(default)

    # With a default but an empty queue, wrap the empty queue so that one
    # (empty) RDD exists to supply the deserializer used below.
    if not rdds and default:
        rdds = [rdds]

    if rdds and not isinstance(rdds[0], RDD):
        rdds = [self._sc.parallelize(batch) for batch in rdds]
    self._check_serializers(rdds)

    queue = self._jvm.PythonDStream.toRDDQueue([rdd._jrdd for rdd in rdds])
    stream_args = [queue, oneAtATime]
    if default:
        # the default RDD must use the same serialization as the queued RDDs
        default = default._reserialize(rdds[0]._jrdd_deserializer)
        stream_args.append(default._jrdd)
    jdstream = self._jssc.queueStream(*stream_args)
    return DStream(jdstream, self, rdds[0]._jrdd_deserializer)
def transform(self, dstreams, transformFunc):
    """
    Create a new DStream in which each RDD is generated by applying
    a function on RDDs of the DStreams. The order of the JavaRDDs in
    the transform function parameter will be the same as the order
    of corresponding DStreams in the list.

    `transformFunc` receives the RDDs as a single tuple argument.
    """
    jdstreams = [stream._jdstream for stream in dstreams]
    deserializers = [stream._jrdd_deserializer for stream in dstreams]

    def _apply(t, *rdds):
        # change the final serializer to sc.serializer
        return transformFunc(rdds).map(lambda x: x)

    func = TransformFunction(self._sc, _apply, *deserializers)
    jfunc = self._jvm.TransformFunction(func)
    jdstream = self._jssc.transform(jdstreams, jfunc)
    return DStream(jdstream, self, self._sc.serializer)
def count(self):
    """
    Return a new DStream in which each RDD has a single element
    generated by counting each RDD of this DStream.

    Each partition is reduced to its element count first; the partition
    counts are then summed.
    """
    def _partition_size(iterator):
        return [sum(1 for _ in iterator)]

    per_partition = self.mapPartitions(_partition_size)
    return per_partition.reduce(operator.add)
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new DStream in which each RDD is generated by applying
    mapPartitions() to each RDDs of this DStream.

    Delegates to mapPartitionsWithIndex() with a wrapper that discards the
    partition index.
    """
    return self.mapPartitionsWithIndex(lambda _index, iterator: f(iterator),
                                       preservesPartitioning)
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new DStream in which each RDD is generated by applying
    mapPartitionsWithIndex() to each RDDs of this DStream.
    """
    def _per_rdd(rdd):
        return rdd.mapPartitionsWithIndex(f, preservesPartitioning)

    return self.transform(_per_rdd)
def reduceByKey(self, func, numPartitions=None):
    """
    Return a new DStream by applying reduceByKey to each RDD.

    Implemented through combineByKey() with the identity as combiner factory
    and `func` for both merge steps. When `numPartitions` is None the Spark
    context's default parallelism is used.
    """
    partitions = self._sc.defaultParallelism if numPartitions is None else numPartitions
    return self.combineByKey(lambda value: value, func, func, partitions)
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None):
    """
    Return a new DStream by applying combineByKey to each RDD.

    When `numPartitions` is None the Spark context's default parallelism is
    used.
    """
    partitions = numPartitions if numPartitions is not None else self._sc.defaultParallelism
    return self.transform(
        lambda rdd: rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, partitions))
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
    """
    Return a copy of the DStream in which each RDD are partitioned
    using the specified partitioner.
    """
    def _repartition(rdd):
        return rdd.partitionBy(numPartitions, partitionFunc)

    return self.transform(_repartition)
def foreachRDD(self, func):
    """
    Apply a function to each RDD in this DStream.

    A one-argument `func` is wrapped so that it is called with the RDD only;
    otherwise it is passed through and receives (time, rdd).
    """
    if func.__code__.co_argcount == 1:
        rdd_only_func = func

        def func(t, rdd):
            return rdd_only_func(rdd)

    jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
    self._ssc._jvm.PythonDStream.callForeachRDD(self._jdstream, jfunc)
def checkpoint(self, interval):
    """
    Enable periodic checkpointing of RDDs of this DStream

    Marks this DStream as checkpointed and returns it, so calls can be
    chained.

    @param interval: time in seconds, after each period of that, generated
                     RDD will be checkpointed
    """
    self.is_checkpointed = True
    jinterval = self._ssc._jduration(interval)
    self._jdstream.checkpoint(jinterval)
    return self
def groupByKey(self, numPartitions=None):
    """
    Return a new DStream by applying groupByKey on each RDD.

    When `numPartitions` is None the Spark context's default parallelism is
    used.
    """
    partitions = numPartitions if numPartitions is not None else self._sc.defaultParallelism

    def _group(rdd):
        return rdd.groupByKey(partitions)

    return self.transform(_group)
def countByValue(self):
    """
    Return a new DStream in which each RDD contains the counts of each
    distinct value in each RDD of this DStream.

    Every value is paired with 1 and the pairs are summed per key.
    """
    ones = self.map(lambda value: (value, 1))
    return ones.reduceByKey(lambda left, right: left + right)
def saveAsTextFiles(self, prefix, suffix=None):
    """
    Save each RDD in this DStream as at text file, using string
    representation of elements.

    The output path of each batch is built by rddToFileName() from
    `prefix`, `suffix` and the batch time.
    """
    def _save_batch(t, rdd):
        target = rddToFileName(prefix, suffix, t)
        try:
            rdd.saveAsTextFile(target)
        except Py4JJavaError as err:
            # after recovered from checkpointing, the foreachRDD may
            # be called twice
            already_written = 'FileAlreadyExistsException' in str(err)
            if not already_written:
                raise

    return self.foreachRDD(_save_batch)

# TODO: uncomment this until we have ssc.pickleFileStream()
# def saveAsPickleFiles(self, prefix, suffix=None):
#     """
#     Save each RDD in this DStream as at binary file, the elements are
#     serialized by pickle.
#     """
#     def saveAsPickleFile(t, rdd):
#         path = rddToFileName(prefix, suffix, t)
#         try:
#             rdd.saveAsPickleFile(path)
#         except Py4JJavaError as e:
#             # after recovered from checkpointing, the foreachRDD may
#             # be called twice
#             if 'FileAlreadyExistsException' not in str(e):
#                 raise
#     return self.foreachRDD(saveAsPickleFile)
def transform(self, func):
    """
    Return a new DStream in which each RDD is generated by applying a function
    on each RDD of this DStream.

    `func` can have one argument of `rdd`, or have two arguments of
    (`time`, `rdd`)

    A one-argument `func` is wrapped into a two-argument function that
    ignores the time.
    """
    takes_rdd_only = func.__code__.co_argcount == 1
    if takes_rdd_only:
        rdd_func = func
        func = lambda t, rdd: rdd_func(rdd)
    assert func.__code__.co_argcount == 2, "func should take one or two arguments"
    return TransformedDStream(self, func)
def slice(self, begin, end):
    """
    Return all the RDDs between 'begin' to 'end' (both included)

    `begin`, `end` could be datetime.datetime() or unix_timestamp

    Each Java RDD returned by the underlying stream is wrapped in a Python
    RDD that uses this DStream's deserializer.
    """
    jbegin = self._jtime(begin)
    jend = self._jtime(end)
    wrapped = []
    for jrdd in self._jdstream.slice(jbegin, jend):
        wrapped.append(RDD(jrdd, self._sc, self._jrdd_deserializer))
    return wrapped
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated by reducing all
    elements in a sliding window over this DStream.

    if `invReduceFunc` is not None, the reduction is done incrementally
    using the old window's reduced value :

    1. reduce the new values that entered the window (e.g., adding new counts)

    2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    This is more efficient than `invReduceFunc` is None.

    Implemented by keying every element with the constant 1, delegating to
    reduceByKeyAndWindow with a single partition, and stripping the key again.

    @param reduceFunc:     associative and commutative reduce function
    @param invReduceFunc:  inverse reduce function of `reduceFunc`; such that for all y,
                           and invertible x:
                           `invReduceFunc(reduceFunc(x, y), x) = y`
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                           the new DStream will generate RDDs); must be a multiple of this
                           DStream's batching interval
    """
    single_key = self.map(lambda element: (1, element))
    windowed = single_key.reduceByKeyAndWindow(
        reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
    return windowed.map(lambda pair: pair[1])
def countByWindow(self, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated
    by counting the number of elements in a window over this DStream.
    windowDuration and slideDuration are as defined in the window() operation.

    This is equivalent to window(windowDuration, slideDuration).count(),
    but will be more efficient if window is large.

    Every element is replaced by 1 and the ones are summed over the window,
    with subtraction as the inverse reduce function.
    """
    ones = self.map(lambda _element: 1)
    return ones.reduceByWindow(operator.add, operator.sub,
                               windowDuration, slideDuration)
def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
    """
    Return a new "state" DStream where the state for each key is updated by applying
    the given function on the previous state of the key and the new values of the key.

    @param updateFunc: State update function. If this function returns None, then
                       corresponding state key-value pair will be eliminated.
    @param numPartitions: number of partitions used when grouping; the Spark
                       context's default parallelism when None.
    @param initialRDD: optional initial state; a non-RDD value is turned
                       into an RDD with parallelize().
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism

    if initialRDD and not isinstance(initialRDD, RDD):
        initialRDD = self._sc.parallelize(initialRDD)

    def _values_and_state(cogrouped):
        # cogrouped is (old states, new values) for one key; keep the first
        # old state if there is one.
        old_states, new_values = cogrouped[0], cogrouped[1]
        previous = list(old_states)[0] if len(old_states) else None
        return (list(new_values), previous)

    def _update_states(t, prev_state, new_batch):
        if prev_state is None:
            # first batch: no previous state exists for any key
            paired = new_batch.groupByKey(numPartitions).mapValues(
                lambda values: (list(values), None))
        else:
            joined = prev_state.cogroup(new_batch.partitionBy(numPartitions), numPartitions)
            paired = joined.mapValues(_values_and_state)
        updated = paired.mapValues(lambda pair: updateFunc(pair[0], pair[1]))
        # a None state means the key is dropped
        return updated.filter(lambda item: item[1] is not None)

    jreduceFunc = TransformFunction(self._sc, _update_states,
                                    self._sc.serializer, self._jrdd_deserializer)
    if initialRDD:
        initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
        initial_args = (initialRDD._jrdd,)
    else:
        initial_args = ()
    dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
                                               *initial_args)

    return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
def __init__(self, ctx, func, *deserializers):
    """
    Store the context, the wrapped function and the deserializers.

    ``rdd_wrap_func`` builds an ``RDD`` from (jrdd, ctx, serializer) and
    ``failure`` starts out as None.
    """
    self.ctx, self.func, self.deserializers = ctx, func, deserializers

    def _wrap_rdd(jrdd, ctx, ser):
        return RDD(jrdd, ctx, ser)

    self.rdd_wrap_func = _wrap_rdd
    self.failure = None
def _to_java_object_rdd(rdd):
    """ Return an JavaRDD of Object by unpickling

    It will convert each Python object into Java object by Pyrolite, whenever the
    RDD is serialized in batch or not.
    """
    # Re-serialize with batched pickling first, so the JVM side always
    # receives the same format.
    pickled_rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    ml_serde = pickled_rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe
    return ml_serde.pythonToJava(pickled_rdd._jrdd, True)
def predict(self, test):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.

    This implementation is a placeholder and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.

    For two classes the logistic probability of the margin is returned, or
    a 0/1 label when a threshold is set. Otherwise the class with the
    largest positive margin is returned, with class 0 as the baseline.
    """
    if isinstance(x, RDD):
        return x.map(lambda v: self.predict(v))

    x = _convert_to_vector(x)

    if self.numClasses == 2:
        margin = self.weights.dot(x) + self._intercept
        # Pick the form of the logistic function that only exponentiates
        # a non-positive number.
        if margin > 0:
            prob = 1 / (1 + exp(-margin))
        else:
            exp_margin = exp(margin)
            prob = exp_margin / (1 + exp_margin)
        if self._threshold is None:
            return prob
        return 1 if prob > self._threshold else 0

    best_class, max_margin = 0, 0.0
    # When the stored rows are one longer than x, the extra trailing entry
    # of each row is added to the margin as a bias term.
    has_bias = x.size + 1 == self._dataWithBiasSize
    for i in range(self._numClasses - 1):
        class_weights = self._weightsMatrix[i]
        if has_bias:
            margin = x.dot(class_weights[0:x.size]) + class_weights[x.size]
        else:
            margin = x.dot(class_weights)
        if margin > max_margin:
            max_margin = margin
            best_class = i + 1
    return best_class
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.

    Returns the raw margin when no threshold is set, otherwise 1 if the
    margin exceeds the threshold and 0 if not.
    """
    if isinstance(x, RDD):
        return x.map(lambda v: self.predict(v))

    margin = self.weights.dot(_convert_to_vector(x)) + self.intercept
    if self._threshold is not None:
        return 1 if margin > self._threshold else 0
    return margin
def trainOn(self, dstream):
    """Train the model on the incoming dstream.

    For every non-empty RDD of the stream the model is retrained with
    LogisticRegressionWithSGD, starting from the current model's weights.
    """
    self._validate(dstream)

    def _retrain(rdd):
        # LogisticRegressionWithSGD.train raises an error for an empty RDD.
        if rdd.isEmpty():
            return
        self._model = LogisticRegressionWithSGD.train(
            rdd, self.numIterations, self.stepSize,
            self.miniBatchFraction, self._model.weights,
            regParam=self.regParam, convergenceTol=self.convergenceTol)

    dstream.foreachRDD(_retrain)
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points using
    the model trained.

    The input is converted to vector form (element-wise for an RDD) before
    it is passed to the wrapped model's "predict" call.

    .. note:: In Python, predict cannot currently be used within an RDD
        transformation or action.
        Call predict directly on the RDD instead.
    """
    if not isinstance(x, RDD):
        return self.call("predict", _convert_to_vector(x))
    return self.call("predict", x.map(_convert_to_vector))
def _train(cls, data, type, numClasses, features, impurity="gini", maxDepth=5, maxBins=32,
           minInstancesPerNode=1, minInfoGain=0.0):
    """
    Train a decision tree model through the "trainDecisionTreeModel" call.

    The first element of `data` is asserted to be a LabeledPoint; all
    arguments are forwarded unchanged and the result is wrapped in a
    DecisionTreeModel.
    """
    sample = data.first()
    assert isinstance(sample, LabeledPoint), "the data should be RDD of LabeledPoint"
    training_args = (data, type, numClasses, features, impurity, maxDepth, maxBins,
                     minInstancesPerNode, minInfoGain)
    java_model = callMLlibFunc("trainDecisionTreeModel", *training_args)
    return DecisionTreeModel(java_model)