1. Spark Session Build
from pyspark.sql import SparkSession
spark = SparkSession /
.builder /
.appName("Boston Housing Linear Regression example") /
.getOrCreate()
2. Spark read csv
data = spark.read.csv('./data/boston_housing.csv', header=True, inferSchema=True)
3. 피처벡터 생성
# VectorAssembler를 import해서
# input이 되는 컬럼 이름은 feature_columns이고
# outputCol을 만들어라, 그것의 이름은 features로 줘라
from pyspark.ml.feature import VectorAssembler
feature_columns = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
4. 피처벡터 생성
data_2 = assembler.transform(data)
- assembler로 dataframe을 주면, 출력으로 또다른 데이터프레임이 만들어지고
- 새로운 컬럼이 하나 붙는다
- 지정된 인풋컬럼들이 하나의 벡터로 묶여서 이 이름의 피처로 들어간다.
- transform이라는 함수는 하나의 데이터프레임을 입력을 받아서, 거기에 새로운 컬럼을 추가해주는 기능을 한다
- 앞의 transform의 앞에 붙은 변수의 클래스에 따라서 변형이 달라진다
5. 훈련용 / 테스트 데이터 나눈 후, 모델 fit
train, test = data_2.randomSplit([0.7, 0.3])
from pyspark.ml.regression import LinearRegression
# featureCol에 이미 하나를 묶어놓은게 있기 때문에 features라는 벡터가 들어간 걸 input 컬럼으로 주고
# medv가 예측해야하는 레이블 값
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train)
- LinearRegression에 feature 컬럼과, label 컬럼 나눠서 저장후 model fit
6. 모델 성능 측정
evaluation_summary = model.evaluate(test)
# MAE
evaluation_summary.meanAbsoluteError
# RMSE
evaluation_summary.rootMeanSquaredError
# R2
evaluation_summary.r2
7. 모델 예측값 살펴보기
# test 데이터를 model에 넘기면 predictions라는 컬럼을 추가한다.
predictions = model.transform(test)
predictions.show()
predictions.select(predictions.columns[13:]).show()
8. 모델 저장 및 불러오기
model.save("./data/boston_housing_model")
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load(path) # "boston_housing_model")
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
30. SparkML Pipeline (0) | 2023.08.25 |
---|---|
29. SparkML(Classification, 타이타닉 생존 예측 모델) (0) | 2023.08.25 |
27. SPARK ML 소개 (0) | 2023.08.23 |
26. Spark 내부 동작(Bucketing과 Partitioning) (0) | 2023.08.23 |
25. Spark 내부 동작(Execution Plan 실습, Spark Web UI) (0) | 2023.08.22 |