Author: 云朵君 (reprinted from 数据STUDIO)

Apache Spark has become one of the most widely used and best-supported open-source tools for machine learning and data science.

In this article, I'll help you get started with Apache Spark's spark.ml linear regression[1] to predict Boston housing prices. The data comes from the Kaggle competition: Housing Values in Suburbs of Boston[2]. For each house observation, we have the following information.

  • CRIM -- per-capita crime rate by town.
  • ZN -- proportion of residential land zoned for lots over 25,000 sq. ft.
  • INDUS -- proportion of non-retail business acres per town.
  • CHAS -- Charles River dummy variable (= 1 if the tract bounds the river; 0 otherwise).
  • NOX -- nitric oxide concentration (parts per 10 million).
  • RM -- average number of rooms per dwelling.
  • AGE -- proportion of owner-occupied units built prior to 1940.
  • DIS -- weighted mean of distances to five Boston employment centres.
  • RAD -- index of accessibility to radial highways.
  • TAX -- full-value property-tax rate per $10,000.
  • PTRATIO -- pupil-teacher ratio by town.
  • BLACK -- 1000(Bk - 0.63)², where Bk is the proportion of Black residents by town.
  • LSTAT -- percentage of the population of lower socioeconomic status.
  • MEDV -- median value of owner-occupied homes in $1000s. This is the target variable.

The input dataset contains details about a variety of houses. Using the information provided, the goal is to build a model that predicts the median home value for a given house in the area.

Loading the Data
 

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.pandas as ps
import pandas as pd
import numpy as np

sc = SparkContext()

sqlContext = SQLContext(sc)
# .load() is required to actually read the file; adjust the path to wherever you saved the Kaggle CSV.
house_df = sqlContext.read\
          .format('com.databricks.spark.csv')\
          .options(header='true', inferschema='true')\
          .load('boston_housing.csv')
house_df.take(1)
[Row(_c0=1, crim=0.00632, zn=18.0, 
 indus=2.31, chas=0, nox=0.538, 
 rm=6.575, age=65.2, 
dis=4.09, rad=1, tax=296, ptratio=15.3, 
black=396.9, lstat=4.98, medv=24.0)]

Data Exploration
Print the schema in a tree format.

house_df.cache()
house_df.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)

Convert to the pandas-on-Spark format

house_ps = house_df.pandas_api()
house_ps.head()


Run descriptive statistics
 

# pandas on pyspark api
house_ps.describe().T 
# pyspark api
# house_df.describe().toPandas().transpose()


Computing Correlations
A scatter matrix is a good way to get a rough sense of whether there are linear correlations among multiple independent variables.

numeric_ps = house_ps.select_dtypes(include=['int32', 'float64'])
sampled_data = numeric_ps.sample(frac=0.8).to_pandas()
# pyspark api
# numeric_features = [t[0] for t in house_df.dtypes if t[1] == 'int' or t[1] == 'double']
# sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())


It is hard to read exact correlation values off the plot. Let's look at the correlation between each independent variable and the target variable.

# pandas on pyspark api
for i in house_ps.columns:
    if not(isinstance(house_ps[i][0], str)):
        print("Correlation to medv for ", i, house_df.stat.corr('medv',i))
# pyspark api
# for i in house_df.columns:
#     if not( isinstance(house_df.select(i).take(1)[0][0], str)):
#         print( "Correlation to medv for ", i, house_df.stat.corr('medv',i))


The correlation coefficient ranges from -1 to 1. A value close to 1 means a strong positive correlation; for example, the median value tends to go up when the number of rooms goes up. A value close to -1 means a strong negative correlation; the median value tends to go down when the percentage of the population with lower socioeconomic status goes up. Finally, a coefficient close to zero means there is no linear correlation.
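As a complement to the column-by-column loop above, the full pairwise correlation matrix can also be computed in a single call through the pandas-on-Spark API. A minimal sketch, assuming house_ps from the earlier cells:

# Pairwise Pearson correlations between all numeric columns (pandas-on-Spark API).
corr_matrix = house_ps.select_dtypes(include=['int32', 'float64']).corr()
# Correlation of every feature with the target, from most negative to most positive.
print(corr_matrix['medv'].sort_values())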

We will keep all of the variables.

Data Preprocessing
Get the data ready for machine learning. We only need two columns: features and label ("medv").

from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'], outputCol = 'features')
vhouse_df = vectorAssembler.transform(house_ps.to_spark())
vhouse_df.take(1)
[Row(_c0=1, crim=0.00632, zn=18.0, indus=2.31,
     chas=0, nox=0.538, rm=6.575, age=65.2,
     dis=4.09, rad=1, tax=296, ptratio=15.3, 
     black=396.9, lstat=4.98, medv=24.0, 
features=DenseVector([0.0063, 18.0, 2.31, 0.0,
    0.538, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 
    396.9, 4.98]))]

 

vhouse_df = vhouse_df.select(['features', 'medv'])
vhouse_df.show(3)
+--------------------+----+
|            features|medv|
+--------------------+----+
|[0.00632,18.0,2.3...|24.0|
|[0.02731,0.0,7.07...|21.6|
|[0.02729,0.0,7.07...|34.7|
+--------------------+----+
only showing top 3 rows

Splitting the Dataset
 

splits = vhouse_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
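Note that randomSplit shuffles the rows, so the exact counts and metrics shown below will vary from run to run. A minimal sketch of making the split reproducible by passing a seed (the seed value itself is an arbitrary choice):

# Reproducible 70/30 split; any fixed seed value works.
train_df, test_df = vhouse_df.randomSplit([0.7, 0.3], seed=42)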


Building Models
Linear Regression

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='medv', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
Coefficients: [-0.033270320810993494,0.0,0.0,
2.8968866736947967,-5.472405297743957,
4.696287531281046,-0.0012064102264590804,
-0.5230323761054688,0.0, -0.0003539128579220387,
-0.7878237086072251,0.009554888917947318,
-0.48680223168746073]
Intercept: 15.408275888118018


Summarize the model over the training set and print out some metrics.

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
RMSE: 4.761246
r2: 0.719844


RMSE measures the differences between the model's predicted values and the actual values. However, RMSE alone is meaningless until we compare it with the scale of the actual "medv" values, such as the mean, minimum, and maximum. With that comparison in mind, the RMSE looks reasonably good.
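For reference, RMSE is just the square root of the mean squared residual. A minimal sketch that recomputes the training RMSE by hand, assuming lr_model and train_df from above:

from pyspark.sql import functions as F

# RMSE = sqrt(mean((medv - prediction)^2)) over the training set.
train_pred = lr_model.transform(train_df)
train_pred.select(
    F.sqrt(F.avg(F.pow(F.col("medv") - F.col("prediction"), 2))).alias("rmse")
).show()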

train_df.describe().show()
+-------+-----------------+
|summary|             medv|
+-------+-----------------+
|  count|              351|
|   mean|22.28176638176639|
| stddev| 9.00824436753725|
|    min|              5.0|
|    max|             50.0|
+-------+-----------------+


An R-squared of about 0.72 indicates that roughly 72% of the variability in "medv" can be explained by the model. This is in line with the result obtained from the equivalent Scikit-Learn model. However, keep in mind that performance on the training set may differ from performance on the test set.
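R-squared is defined as 1 - SS_res / SS_tot, i.e. the fraction of the target's variance that the predictions capture. A minimal sketch that reproduces the training r2 by hand, again assuming lr_model and train_df from above:

from pyspark.sql import functions as F

# R^2 = 1 - SS_res / SS_tot on the training set.
train_pred = lr_model.transform(train_df)
mean_medv = train_pred.agg(F.avg("medv")).first()[0]
ss = train_pred.agg(
    F.sum(F.pow(F.col("medv") - F.col("prediction"), 2)).alias("ss_res"),
    F.sum(F.pow(F.col("medv") - F.lit(mean_medv), 2)).alias("ss_tot")
).first()
print("r2 =", 1 - ss["ss_res"] / ss["ss_tot"])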

lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","medv","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="medv",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.32909684363379|22.0|[0.01096,55.0,2.2...|
|37.974936545033536|50.0|[0.01381,80.0,0.4...|
| 30.66096931803053|32.9|[0.01778,95.0,1.4...|
|25.590640678505917|23.1|[0.0187,85.0,4.15...|
|38.814535925105446|50.0|[0.02009,95.0,2.6...|
+------------------+----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.699399

 

test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)


Root Mean Squared Error (RMSE) on test data = 5.25553
As expected, we got a worse RMSE and R-squared on the test set than on the training set.

print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
numIterations: 10
objectiveHistory: [0.49999999999999956, 
    0.43511159432936203, 0.23381429881321927,
    0.2079210300117359, 0.18059986620530444, 
    0.17872115598582575, 0.17801227910946057,
    0.17711600414335174, 0.17687578211116042,
    0.1767267266979467, 0.17665674267796524]
+--------------------+
|           residuals|
+--------------------+
|  -6.333741072734888|
|  1.7452091242446315|
|  0.6582938058650996|
|   5.469655307614115|
|   2.171389985714903|
|  0.9899554967126676|
| -0.9352089006808697|
|   -2.61447969541733|
|   8.332672316987711|
|   9.532751766721113|
|   3.701782081252727|
|  6.2940307524651615|
|-0.21258231477351686|
|    6.16341528227079|
|  0.2909720065068946|
| -10.276082002964081|
|   3.105304916403604|
| -3.9365856362334135|
|  2.4432113249928697|
| -1.4032802979313246|
+--------------------+
only showing top 20 rows


Use our linear regression model to make some predictions.

predictions = lr_model.transform(test_df)
predictions.select("prediction","medv","features").show()
+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
| 27.32909684363379|22.0|[0.01096,55.0,2.2...|
|37.974936545033536|50.0|[0.01381,80.0,0.4...|
| 30.66096931803053|32.9|[0.01778,95.0,1.4...|
|25.590640678505917|23.1|[0.0187,85.0,4.15...|
|38.814535925105446|50.0|[0.02009,95.0,2.6...|
|  25.5351898705881|24.7|[0.02055,85.0,0.7...|
|27.439461908462157|23.9|[0.02543,55.0,3.7...|
| 26.41441481969886|25.0|[0.02875,28.0,15....|
|26.226035014631403|26.6|[0.02899,40.0,1.2...|
|26.275168005928407|28.7|[0.02985,0.0,2.18...|
| 20.60508392425436|17.5|[0.03113,0.0,4.39...|
| 29.97426122478751|34.9|[0.0315,95.0,1.47...|
|  31.5175578694329|34.9|[0.03359,75.0,2.9...|
| 37.53102148795149|48.5|[0.0351,95.0,2.68...|
|28.004950734280353|23.5|[0.03584,80.0,3.3...|
|27.881312230075878|27.9|[0.03615,80.0,4.9...|
|23.239362032730973|20.7|[0.03738,0.0,5.19...|
| 33.96969747719193|34.6|[0.03768,80.0,1.5...|
| 35.31712093757065|33.3|[0.04011,80.0,1.5...|
| 26.50714027180863|22.9|[0.04203,28.0,15....|
+------------------+----+--------------------+
only showing top 20 rows


Decision Tree Regression

from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'medv')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 3.62349


Feature Importances

dt_model.featureImportances
SparseVector(13, {0: 0.0578, 4: 0.0041, 5: 0.5576, 6: 0.0065, 7: 0.0743, 8: 0.0037, 9: 0.019, 10: 0.0256, 12: 0.2515})
house_df.take(1)
[Row(_c0=1, crim=0.00632, zn=18.0, indus=2.31,
     chas=0, nox=0.538, rm=6.575, age=65.2, 
     dis=4.09, rad=1, tax=296, ptratio=15.3, 
     black=396.9, lstat=4.98, medv=24.0)]


Clearly, in our data the number of rooms is the most important feature for predicting the median house value.
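The indices in the SparseVector refer to positions in the VectorAssembler input list, so the importances are easier to read when paired with the column names. A minimal sketch, assuming dt_model and the same input column order used above:

# Pair each input column with its importance score and rank them (descending).
feature_cols = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
                'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat']
ranked = sorted(zip(feature_cols, dt_model.featureImportances.toArray()),
                key=lambda pair: pair[1], reverse=True)
for name, score in ranked:
    print("%-8s %.4f" % (name, score))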

Gradient-Boosted Tree Regression

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'medv', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'medv', 'features').show(5)
+------------------+----+--------------------+
|        prediction|medv|            features|
+------------------+----+--------------------+
|22.313941290346456|22.0|[0.01096,55.0,2.2...|
|  44.5722191710249|50.0|[0.01381,80.0,0.4...|
|  34.9935613795878|32.9|[0.01778,95.0,1.4...|
| 22.34459784132282|23.1|[0.0187,85.0,4.15...|
|  44.6573190791291|50.0|[0.02009,95.0,2.6...|
+------------------+----+--------------------+
only showing top 5 rows
gbt_evaluator = RegressionEvaluator(
    labelCol="medv", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
Root Mean Squared Error (RMSE) on test data = 3.50861

Gradient-boosted tree regression performed the best on our data.
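To put the three models side by side, the same evaluator pattern can be reused for both metrics on the shared test split. A minimal sketch, assuming lr_predictions, dt_predictions, and gbt_predictions from the cells above:

from pyspark.ml.evaluation import RegressionEvaluator

# RMSE and R^2 for each model on the same test split.
for name, preds in [("Linear Regression", lr_predictions),
                    ("Decision Tree", dt_predictions),
                    ("Gradient-Boosted Trees", gbt_predictions)]:
    rmse = RegressionEvaluator(labelCol="medv", predictionCol="prediction",
                               metricName="rmse").evaluate(preds)
    r2 = RegressionEvaluator(labelCol="medv", predictionCol="prediction",
                             metricName="r2").evaluate(preds)
    print("%s: RMSE = %.3f, R2 = %.3f" % (name, rmse, r2))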

References
[1] Apache Spark spark.ml linear regression: https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#linear-regression

[2] Kaggle competition: Housing Values in Suburbs of Boston: https://www.kaggle.com/c/boston-housing/data