in src/main/scala/com/aliyun/emr/example/spark/LinearRegression.scala [84:123]
/**
 * Trains a linear regression model with SGD and prints its RMSE on a held-out split.
 *
 * The data at `params.input` is read in LIBSVM format and split randomly into
 * 80% training / 20% test. The optimizer is configured from `params`
 * (`numIterations`, `stepSize`, `regType`, `regParam`). The sample counts and the
 * test RMSE are printed to standard output.
 *
 * The Spark context is stopped when this method returns, whether it completes
 * normally or fails with an exception.
 *
 * @param params run configuration: input path, optimizer settings and regularization choice
 */
def run(params: Params): Unit = {
  // Resolve the context once so the instance that gets stopped is the one that was used.
  // NOTE(review): getSparkContext is defined outside this block; this assumes that holding
  // on to a single result is equivalent to calling it again - confirm.
  val sc = getSparkContext
  try {
    val examples = MLUtils.loadLibSVMFile(sc, params.input).cache()
    val splits = examples.randomSplit(Array(0.8, 0.2))
    val training = splits(0).cache()
    val test = splits(1).cache()
    val numTraining = training.count()
    val numTest = test.count()
    println(s"Training: $numTraining, test: $numTest.")
    // Both splits have been counted (and are themselves cached), so the cached
    // parent data set is released here without waiting for the removal to finish.
    examples.unpersist(blocking = false)

    val updater = params.regType match {
      case NONE => new SimpleUpdater()
      case L1 => new L1Updater()
      case L2 => new SquaredL2Updater()
    }

    val algorithm = new LinearRegressionWithSGD()
    algorithm.optimizer
      .setNumIterations(params.numIterations)
      .setStepSize(params.stepSize)
      .setUpdater(updater)
      .setRegParam(params.regParam)

    val model = algorithm.run(training)

    val prediction = model.predict(test.map(_.features))
    val predictionAndLabel = prediction.zip(test.map(_.label))
    // fold with a zero element instead of reduce: reduce throws on an empty RDD,
    // which can happen when the random split leaves no test samples.
    val loss = predictionAndLabel.map { case (p, l) =>
      val err = p - l
      err * err
    }.fold(0.0)(_ + _)
    // With no test samples the RMSE is undefined; report NaN rather than fail.
    val rmse = if (numTest > 0) math.sqrt(loss / numTest) else Double.NaN
    println(s"Test RMSE = $rmse.")
  } finally {
    // Always release the context, including when loading, training or evaluation fails.
    sc.stop()
  }
}