Spark MLlib 源代码解读之线性回归

xiaoxiao2021-02-28  113

Spark MLlib线性回归算法

原理分析:

什么是线性回归:

回归分析是一种统计工具,它利用两个或两个以上变量之间的关系,由一个或几个变量来预测另一个变量。

当自变量只有一个的时候,叫做一元线性回归。

h(x)=b0+b1(x)

当自变量有多个的时候,叫做多元线性回归。

h(x1,x2,..xn)=b0+b1(x1)+b2(x2)... 基本的应用场景就不介绍了,这个应该已经很熟悉了。 有一些参考的博客可以供学习。

线性回归理论篇

一般情况下,通过对于已知数据的拟合,我们可以得到一个线性方程。

hΘ(x)=Θ0+Θ1x

我们统一可以将方程写成如下的格式:

hΘ(x)=i=0nΘixi

对于theta参数的求解,我们需要去求解 theta是否是最优。可以定义一个损失函数,这个损失函数定义为:

J(Θ)=12i=1m(hΘ(xi)yi)2

可以通过最小二乘法 和梯度下降法来调节,使得J(theta)有最小值。

梯度下降法:

批量梯度下降算法:

如上面的公式描述的那样,问题转换为求解 J(Θ) 的极小值问题。由于求解的是极小值,因此梯度方向是偏导数的反方向。

Θj:=ΘjαJ(Θ)Θj

其中 α 表示的是学习速率。当 α 过大时,有可能无法收敛。过小的时候,收敛的速度可能很慢。

当数据集中只有一个样本的时候,即m=1的时候。

J(Θ)Θj=12(hΘ(x)y)2Θj

=212(hΘ(x)y)(hΘ(x)y)Θj

=(hΘ(x)y)xj

所以梯度下降算法的公式变可以改变为:

Θj:=Θj+α(yihΘ(xi))xij 当样本数量不为1的时候,上述公式变为

Θj:=Θj+αi=1m(yihΘ(xi))xij

这个公式计算每次迭代计算theta时候,需要使用到整个样本数据集,复杂度很高。我们称这个算法为批量梯度下降算法。

随机梯度下降算法:

Loop{ for i=1 to m

Θj:=Θj+α(yihΘ(xi))xij (对于每个参数j)

}

随机梯度下降算法每次读取一条样本,就迭代对theta进行更新。然后判断其是否收敛。没有收敛的话,则继续读取样本进行处理。当所有样本都读取完了之后,则从头开始循环。与批量梯度下降算法相比,随机梯度下降算法耗时更少,尤其是当数据量很大的时候。

源码分析


首先是线性回归的伴生对象类,LinearRegressionWithSGD

这个类包含有静态的train方法,用于训练线性回归模型。 其中训练样本格式为(label, features).

input表示为训练样本,格式RDD(label, features)

numIterations表示迭代次数, stepSize表示迭代步长。

miniBatchFraction表示每次迭代的参与的样本的比例

initialWeights表示的是初始的权重。返回值是一个线性回归的模型类。

def train( input: RDD[LabeledPoint], //输入,为labeledPoint类型,其中labele为double类型,feature为向量, numIterations: Int, //迭代次数 stepSize: Double, //迭代步长 miniBatchFraction: Double, //每次迭代的时候参与计算的样本的比例 initialWeights: Vector): LinearRegressionModel = { //返回的是一个线性回归的模型model类 //这里初始化了一个线性回归类并调用了里面的run方法 new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction) .run(input, initialWeights) }

然后我们来看这个线性回归的类LinearRegressionWithSGD类

同样的stepSize表示迭代步长,numIterations表示迭代次数,

可以看到这个类继承了GeneralizedLinearAlgorithm。

这个类里面并没有run方法,调用伴生对象的run方法后,其运行的是父类GeneralizedLinearAlgorithm中的run方法

其中的run方法在广义线性回归的类里面。

class LinearRegressionWithSGD private[mllib] ( //线性回归的类 private var stepSize: Double, //步长, private var numIterations: Int, //迭代次数 private var miniBatchFraction: Double) //每次迭代的时候参与计算的样本的比例 extends GeneralizedLinearAlgorithm[LinearRegressionModel] with Serializable { private val gradient = new LeastSquaresGradient()// 梯度下降算法采用最小平方损失函数梯度下降法。 private val updater = new SimpleUpdater() //采用简单梯度更新方法,没有正则化方法。 @Since("0.8.0") override val optimizer = new GradientDescent(gradient, updater) //根据梯度下降方法,梯度更新方法,新建梯度优化计算方法。 .setStepSize(stepSize) //设置步长 .setNumIterations(numIterations) //设置迭代次数 .setMiniBatchFraction(miniBatchFraction) //设置数据的采样比例 @Since("0.8.0") def this() = this(1.0, 100, 1.0) //默认的线性回归对象。步长为1.0,迭代次数为100次,比例为1.0 override protected[mllib] def createModel(weights: Vector, intercept: Double) = { new LinearRegressionModel(weights, intercept) } }

可以由上面看到,这个类调用的run方法再它的父类广义线性回归里面。我们来看看这个类的run方法

可以看到,在这个方法里面进行了特征维度的检测,然后是数据是否缓存,是否降维

是否需要增加偏置项,最后初始化权重。然后调用了optimizer的optimize方法来进行计算。 最后调用createModel方法,返回结果。

其中,在optimizer是一个GradientDescent类的对象。所以我们之后可以进一步看这个方法,这个方法也是整个线性回归最核心的方法。

def run(input: RDD[LabeledPoint], initialWeights: Vector): M = { //样本训练的run方法。 if (numFeatures < 0) { //特征的维度,如果特征的维度被设置为小于0,则取出第一个特征的特征的维度 numFeatures = input.map(_.features.size).first() } //看看输入样本有没有缓存。 if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data is not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } // Check the data properties before running the optimizer //检查数据的属性。 if (validateData && !validators.forall(func => func(input))) { throw new SparkException("Input validation failed.") } /** * 数据的降维。 *在优化过程中,收敛率取决于训练数据的维度。 *通过降维,改变了收敛速度。 */ val scaler = if (useFeatureScaling) { new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features)) } else { null } //是否需要增加偏置项。即theta0的常数项。 val data = if (addIntercept) { if (useFeatureScaling) { input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache() } else { input.map(lp => (lp.label, appendBias(lp.features))).cache() } } else { if (useFeatureScaling) { input.map(lp => (lp.label, scaler.transform(lp.features))).cache() } else { input.map(lp => (lp.label, lp.features)) } } //初始的权重和偏置项。 val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) { appendBias(initialWeights) } else { /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */ initialWeights } //利用了optimizer的optimize方法进行梯度下降。返回最优权重,调用的是GradientDescent的optimize方法。 val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept) //如果有偏置项的话,获取偏置项,否则intercept置为0 val intercept = if (addIntercept && numOfLinearPredictor == 1) { weightsWithIntercept(weightsWithIntercept.size - 1) } else { 0.0 } //获取权重 var weights = if (addIntercept && numOfLinearPredictor == 1) { Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)) } else { weightsWithIntercept } if (useFeatureScaling) { if (numOfLinearPredictor == 1) { weights = scaler.transform(weights) } else { /** * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original * scale for each set of linear predictor. Note that the intercepts have to be explicitly * excluded when `addIntercept == true` since the intercepts are part of weights now. */ var i = 0 val n = weights.size / numOfLinearPredictor val weightsArray = weights.toArray while (i < numOfLinearPredictor) { val start = i * n val end = (i + 1) * n - { if (addIntercept) 1 else 0 } val partialWeightsArray = scaler.transform( Vectors.dense(weightsArray.slice(start, end))).toArray System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size) i += 1 } weights = Vectors.dense(weightsArray) } } // Warn at the end of the run as well, for increased visibility. if (input.getStorageLevel == StorageLevel.NONE) { logWarning("The input data was not directly cached, which may hurt performance if its" + " parent RDDs are also uncached.") } // Unpersist cached data if (data.getStorageLevel != StorageLevel.NONE) { data.unpersist(false) } createModel(weights, intercept) }

这个是GradientDescent类的optimize方法,其内部又调用了一个runMiniBatchSGD方法 runMiniBatchSGD返回的结果是权重

//data为RDD格式,其类型为RDD[(Double,Vector)] 训练的数据,initialWeights, 初始化的权重。 //其返回值为更新的权重,类型为Vector类型。 //这个方法里面又再一次调用了GradientDescent.runMiniBatchSGD方法。 def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = { val (weights, _) = GradientDescent.runMiniBatchSGD( data, gradient, updater, stepSize, numIterations, regParam, miniBatchFraction, initialWeights, convergenceTol) weights } }

接下来看这个runMiniBatchSGD方法,这个方法是整个线性回归最核心的方法

它大体的思路是,首先初始化好初始权重参数和历史迭代误差的可变数组,然后在每次迭代的时候,广播这个更新的权重到每个rdd。调用treeAggregate算子,每次对数据进行随机采样(无放回采样),然后先对每个分区的数据进行计算梯度值和误差值,然后接下来对每个分区的计算好的梯度值和误差值进行累加。最后更新权重值。

def runMiniBatchSGD( data: RDD[(Double, Vector)], //输入样本 gradient: Gradient, //梯度函数对象,(用于计算损失函数的梯度的一个单一的例子。) updater: Updater, //梯度更新的函数的对象。 stepSize: Double, //步长 numIterations: Int, //迭代次数 regParam: Double, //正则化参数 miniBatchFraction: Double, //每次迭代参与计算的样本的比例,默认这个比例是1.0 initialWeights: Vector, //初始化权重 convergenceTol: Double): (Vector, Array[Double]) = { //返回为两个元素 //第一个元素是一个列矩阵,表示的是每一个特征的权重。第二个元素表示的是迭代的损失值。 if (miniBatchFraction < 1.0 && convergenceTol > 0.0) { logWarning("Testing against a convergenceTol when using miniBatchFraction " + "< 1.0 can be unstable because of the stochasticity in sampling.") } //历史迭代的误差数组。存储的是每次迭代的误差值。 val stochasticLossHistory = new ArrayBuffer[Double](numIterations) // Record previous weight and current one to calculate solution vector difference var previousWeights: Option[Vector] = None //之前的权重 var currentWeights: Option[Vector] = None //当前的权重 //训练的样本数量。 val numExamples = data.count() // if no data, return initial weights to avoid NaNs //如果数据为空,则返回初始的输入参数,即初始的权重和一个误差数组。因为没有找到数据 if (numExamples == 0) { logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found") return (initialWeights, stochasticLossHistory.toArray) } //如果数据量乘以采样比例小于1的话,说明miniBatchFraction设置的太小了。弹出警告需要设置的大一点。 if (numExamples * miniBatchFraction < 1) { logWarning("The miniBatchFraction is too small") } var weights = Vectors.dense(initialWeights.toArray) //将权重初始化,转换为密集向量。 val n = weights.size //表示参数的个数 /** * For the first iteration, the regVal will be initialized as sum of weight squares * if it's L2 updater; for L1 updater, the same logic is followed. *第一次迭代,正则化值初始化为权重的加权平方和。 */ var regVal = updater.compute( weights, Vectors.zeros(weights.size), 0, 1, regParam)._2 //这个参数用于表明是否收敛 var converged = false // indicates whether converged based on convergenceTol var i = 1 //i等于1表明第一次迭代 // //接下来就是真个梯度下降法的核心代码。 //weights权重的迭代计算 while (!converged && i <= numIterations) { //首先广播权重, 注意在每次迭代的开始的时候都需要广播更新的权重值 val bcWeights = data.context.broadcast(weights) //聚合的时候利用的是treeAggregate方法进行聚合。聚合后返回值的类型为 //(gradientSum(表示的是梯度的和),lossSum(表示的是损失和),miniBatchSize(表示的是采样比例) //treeAggregate算子的执行逻辑如下: //treeAggregate的逻辑和aggregate相似,不过它是采用一种多层树结构的模式进行聚合。 //和aggregate不一样的另一个区别是它的初始值不会被应用到第二个reduce函数上面去。 //默认的这个tree的深度是2. //举个简单的例子。 //val z = sc.parallelize(List(1,2,3,4,5,6), 2) //z.treeAggregate(0)(math.max(_, _), _ + _) //res40: Int = 9 //注意,这个初始值不会作用到第二个reduce函数。s //z.treeAggregate(5)(math.max(_, _), _ + _) //res42: Int = 11 // reduce of partition 0 will be max(5, 1, 2, 3) = 5 // reduce of partition 1 will be max(4, 5, 6) = 6 // final reduce across partitions will be 5 + 6 = 11 //梯度计算采用的是随机梯度下降方法。false表示的是不放回抽样 //随机抽取样本自己,采样时采用不放回采样。每次采样比例为miniBatchFraction。最后一个参数表示为随机种子,每次的值都不一样。 //保证每次抽样是随机的 val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( //调用BDV.zeros方法初始化一个长度为n的0向量。 //初始值为一个长度为n的0向量,初始的误差值设为0, //计算每一个样本的梯度,然后对所有的样本进行累加。 seqOp = (c, v) => { // c: (grad, loss, count), v: (label, features) //第一个seqOp函数输入为(c,v)类型,返回的是一个c类型。 //通过调用gradient.compute方法来计算误差值。这个方法输入参数为features,label,权重值,以及得到的梯度值 //返回的类型为(梯度值,误差值,计数值,样本数+1) //默认调用的是LeastSquaresGradient的compute方法。 val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) (c._1, c._2 + l, c._3 + 1) }, //这个表示对于所有的处理好的样本(均为c类型)进行聚合。 combOp = (c1, c2) => { // c: (grad, loss, count) //即对应的梯度向量值相加,对应的损失和相加,对应的计数值相加。最后一个参数表示的是样本数量 (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) }) if (miniBatchSize > 0) { //当样本数量大于0的时候。 /** *保存误差,迭代误差=平均损失+正则误差。 */ stochasticLossHistory.append(lossSum / miniBatchSize + regVal) //这个表示迭代完成后将误差加入误差数组。 //其中的损失为平均损失,即总的损失除以总数量的计数和。 //调用updater的compute方法来更新梯度值。 val update = updater.compute( weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), //Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble)表示总的梯度和除以数据量表示平均梯度。 stepSize, i, regParam) //stepSize表示步长,i表示第i次迭代,regParam表示正则化参数。 weights = update._1 //将权重更新为update的第一个值 regVal = update._2 previousWeights = currentWeights currentWeights = Some(weights) if (previousWeights != None && currentWeights != None) { converged = isConverged(previousWeights.get, currentWeights.get, convergenceTol) } } else { logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero") } i += 1 } logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format( stochasticLossHistory.takeRight(10).mkString(", "))) (weights, stochasticLossHistory.toArray) //迭代完成之后,返回的是一个迭代的初始权重和每次迭代的损失数组。 }

可以看到在上述代码的计算中,使用到了Gradient.compute方法来计算梯度值和误差值。 接下来可以看看这个方法.在之前LinearRegressionWithSGD的类中,可以看到gradient使用的是LeastSquaresGradient,那么计算的时候,调用的也是这个类里面的compute方法。

首先是计算 y-h(x).这个表示的是模拟值和实际值的误差,用diff表示 接下来计算cumGradient= x*((y-h(x)))。这些计算公式和之前推导的公式都是一致的 最后返回损失值。

class LeastSquaresGradient extends Gradient { //对每个样本,计算线性回归的最小二乘损失函数的梯度 override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { val diff = dot(data, weights) - label //(y-h(x))用于计算当前标签和类别标签的值的差值。 val loss = diff * diff / 2.0 //当前的平方的值的一般为损失值。 val gradient = data.copy scal(diff, gradient) //计算梯度值是。 x*((y-h(x))) (gradient, loss) //返回的值的类型为一个元祖,其中第一个值表示的是梯度值,第二个值表示的是损失值。 } override def compute( //注意默认调用的是这个方法用来做计算 data: Vector, //传入的样本的数据的值 label: Double, //label表示样本标签。 weights: Vector, //权重值。 cumGradient: Vector): Double = { //cumGradient表示最终计算得到的梯度值, val diff = dot(data, weights) - label //(y-h(x))用于计算当前标签和类别标签的值的差值。 axpy(diff, data, cumGradient) //cumGradient= x*((y-h(x)))。其中cumGradient是已经创建好的的向量,可能使我们刚开始 //传入的初始的0向量值。 diff * diff / 2.0 //然后返回损失值。 } }

在计算完之后需要对权重进行更新。更新使用的是SimpleUpdater 首先会计算alpha值(学习速率)。alpha值在刚开始的时候更新比较快,后面更新比较慢。 thisIterStepSize表示的是alpha值,平方根的倒数作为alpha可以确保在迭代初期的时候,迭代的比较快,在迭代的后期比较慢。

class SimpleUpdater extends Updater { override def compute( weightsOld: Vector, //weightsOld表示的是上一次迭代的时候计算的特征权重向量。 gradient: Vector, // *gradient表示的是这一次迭代时候计算的特征权重向量 stepSize: Double, // iter: Int, //iter, 表示的是当前的迭代次数。 regParam: Double): (Vector, Double) = { // * regParam 表示的是正则化参数 //这个表示已当前迭代次数的平方根的倒数作为本次迭代的下降因子。 ///这个在公式中就是alpha, val thisIterStepSize = stepSize / math.sqrt(iter) val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector //将上次的特征权重的值转换为密集向量。 brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) //theta=theta-alpha*gradient //表示的是brzWeights=brzWeights- thisIterStepSize*gradient.toBreeze (Vectors.fromBreeze(brzWeights), 0) //最后返回值 } }

最后就是线性回归模型类,LinearRegressionModel类。 weigths表示的是权重值,intercept表示的是偏置项,常数项。 有一个predictPoint方法用来预测y值

class LinearRegressionModel ( @Since("1.0.0") override val weights: Vector, //模型的参数,如theta0theta1 @Since("0.8.0") override val intercept: Double) //这个表示的是偏置项 extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable with Saveable with PMMLExportable { //y=a*X+b override protected def predictPoint( dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double = { //这一步其实表示的就是theta*X+intercept(偏置项)。返回的结果就是预测的y值。 weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept } override def save(sc: SparkContext, path: String): Unit = { GLMRegressionModel.SaveLoadV1_0.save(sc, path, this.getClass.getName, weights, intercept) } override protected def formatVersion: String = "1.0" }

参考资料:

正则化深入梯度下降算法mllib线性回归源代码 线性回归及梯度下降
转载请注明原文地址: https://www.6miu.com/read-45975.html

最新回复(0)