I am trying to build a simple custom Estimator in PySpark MLlib. I have seen that it is possible to write a custom Transformer, but I am not sure how to do it for an Estimator. I also don't understand what @keyword_only does, or why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models (see here), but PySpark doesn't.
Pseudocode of an example model:
```python
class NormalDeviation():
    def __init__(self, threshold=3):
        self.threshold = threshold

    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}

    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']

    def decision_function(self, x):  # does ml-lib support this?
        ...
```
Generally speaking there is no documentation because, as of Spark 1.6 / 2.0, most of the related API is not intended to be public. This should change in Spark 2.1.0 (see SPARK-7146).
The API is relatively complex because it has to follow specific conventions in order to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required by features like reading and writing or grid search. Others, like keyword_only, are just simple helpers and not strictly required.
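To illustrate what keyword_only does: it is a small decorator from the pyspark package that rejects positional arguments and stores the keyword arguments that were actually passed in self._input_kwargs, so that setParams can forward them in one call. A minimal sketch (the Example class here is made up purely for demonstration):

```python
from pyspark import keyword_only

class Example(object):
    @keyword_only
    def __init__(self, alpha=1.0, beta=2.0):
        # keyword_only stored the explicitly passed kwargs on the instance
        print(self._input_kwargs)

Example(alpha=3.0)  # prints {'alpha': 3.0}; omitted defaults are absent
# Example(3.0)      # would raise TypeError: positional arguments are rejected
```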
Let's assume you have defined the following mix-in for the mean parameter:
```python
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(),
                 "mean", "mean",
                 typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)
```
one for the standard deviation parameter:
```python
class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
                              "standardDeviation", "standardDeviation",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)
```
and one for the threshold:
```python
class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
                              "centeredThreshold", "centeredThreshold",
                              typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)
```
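These mix-ins are ordinary Params subclasses, so they can be sanity-checked in isolation, without a running SparkSession. A quick hedged sketch:

```python
# _set returns self, so setter calls chain; getOrDefault reads the value back
hm = HasMean()
print(hm.setMean(1.5).getMean())  # 1.5
```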
Then you could create a basic Estimator as follows:
```python
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark import keyword_only


class NormalDeviation(Estimator, HasInputCol,
                      HasPredictionCol, HasCenteredThreshold,
                      # Available in PySpark >= 2.3.0
                      # Credits https://stackoverflow.com/a/52467470
                      # by https://stackoverflow.com/users/234944/benjamin-manns
                      DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma,
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
                           HasMean, HasStandardDeviation, HasCenteredThreshold,
                           DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                 mean=None, standardDeviation=None, centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                  mean=None, standardDeviation=None, centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()
        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)
```
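Because the parameters follow the shared Params conventions, this estimator also plugs into the tuning utilities mentioned above. A hedged sketch of building a parameter grid (a full CrossValidator run would additionally need an Evaluator suited to the boolean prediction column, which is not shown here):

```python
from pyspark.ml.tuning import ParamGridBuilder

nd = NormalDeviation(inputCol="x", predictionCol="prediction")
# A list of {Param: value} maps, consumable by CrossValidator / TrainValidationSplit
grid = (ParamGridBuilder()
        .addGrid(nd.centeredThreshold, [1.0, 2.0, 3.0])
        .build())
```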
Finally, it can be used as follows:
```python
df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+
```
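Since both classes mix in DefaultParamsWritable / DefaultParamsReadable (PySpark >= 2.3.0), the fitted pipeline model can also be persisted and reloaded. A hedged sketch, using an illustrative path:

```python
from pyspark.ml import PipelineModel

# "/tmp/normal_deviation_model" is just an illustrative path
model.write().overwrite().save("/tmp/normal_deviation_model")

reloaded = PipelineModel.load("/tmp/normal_deviation_model")
reloaded.transform(df).show()
```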