HDFS 데이터나 입력 파티션을 처리형태에 맞춰 최적화할 수 있다면 시간을 단축하고 리소스를 덜 사용할 수 있다.
Bucketing과 File System Partitioning 소개
- 둘다 Hive 메타스토어의 사용이 필요 : saveAsTable
- 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것
- Bucketing
- 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
- 셔플링을 최소화하려는 목적
- 나중에 이 데이터들이 Partition으로 메모리에 로딩이 됐을 때, 셔플링을 최소화하는 상태로 Operation이 진행될 수 있도록 하는 것
- 특히 조인을 할 때, 조인 대상의 Partition 수가 맞지 않는다고 할 때, 또 셔플링이 발생하기 때문에 Bucketing을 해놔도 별 의미가 없다
- 있다면 데이터를 이 특정 컬럼(들)을 기준으로 테이블로 저장
- 이 때의 버킷의 수도 지정
- 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지?
- File System Partitioning
- 원래 Hive에서 많이 사용
- 데이터의 특정 컬럼(들)을 기준으로 폴더 구조를 만들어 데이터 저장 최적화
- 위의 컬럼들을 Partition Key라고 부름
Bucketing
- DataFrame을 특정 ID(컬럼)를 기준으로 나눠서 테이블로 저장
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
- DataFrameWriter의 bucketBy 함수 사용
- Bucket의 수와 기준 ID(컬럼) 지정
- 컬럼의 값을 가지고 해싱을 한 다음, 버킷의 수로 나눠서 특정 레코드가 어느 버킷으로 갈지 정한다.
- 데이터의 특성을 잘 알고 있는 경우 사용 가능
- DataFrameWriter의 bucketBy 함수 사용
- 다음부터는 이를 로딩하여 사용함으로써 반복 처리시 시간 단축
File System Partitioning
- 데이터를 Partition Key 기반 폴더 ("Partition") 구조로 물리적으로 나눠 저장
DataFrame에서 이야기하는 Partition- Hive에서 사용하는 Partitioning을 말함
- Partitioning의 예와 잇점
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- 1. 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 필요하다면 시간 기준으로 폴더를 하위에 또 만들어도 됨
- 관리하는 것도 직관적, 많은 경우 이미 로그 파일을 이 형태로 저장
- Spark에서는 데이터를 재정리할 필요도 없고, External 테이블 형태로 schema를 매핑해서 처리할 수도 있다.
- 2. 보통 위의 구조로 이미 저장되는 경우가 많음
- 1. 데이터 자체를 연도-월-일의 폴더 구조로 저장
- 이를 통해 데이터를 읽기 과정을 최적화 (스캐닝 과정이 줄어들거나 없어짐)
- 데이터 관리도 쉬워짐(Retention Policy 적용시)
- 이런 로그 파일은 1년 동안 저장을 한다(일년이 지난 날짜의 데이터들을 날리면 된다)
- 굉장히 큰 로그 파일을 데이터 생성시간 기반으로 데이터 읽기를 많이 한다면?
- DataFrameWriter의 partitionBy 사용
- Partition key를 잘못 선택하면 엄청나게 많은 파일들이 생성됨!
- 데이터프레임을 write할 때만 쓸 수 있는 operating이고, 키에 맞춰서 디렉토리가 생성된다.
- cardinality가 낮은 걸 써야한다(가능한 값의 경우의 수가 낮은 것)
실습(Bucket Demo)
(1) bucketDemo.py
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
# 최적화를 막기 위해서, 교육적인 측면에서 일단 disabled(execution plan을 알아보기 힘듬)
spark = SparkSession \
.builder \
.appName("Python Spark SQL 저장하기") \
.config("spark.sql.autoBroadcastJoinThreshold", -1) \
.config("spark.sql.adaptive.enabled", False) \
.enableHiveSupport() \
.getOrCreate()
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=아이디&password=비밀번호"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
df_join = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
df_join.show(10)
spark.sql("DROP TABLE IF EXISTS bk_usc")
spark.sql("DROP TABLE IF EXISTS bk_st")
# 3개의 버킷을 기준으로 sessionid로 테이블을 만드는 걸로(Action 2개)
df_user_session_channel.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_usc")
df_session_timestamp.write.mode("overwrite").bucketBy(3, "sessionid").saveAsTable("bk_st")
df_bk_usc = spark.read.table("bk_usc")
df_bk_st = spark.read.table("bk_st")
join_expr2 = df_bk_usc.sessionid == df_bk_st.sessionid
df_join2 = df_bk_usc.join(df_bk_st, join_expr2, "inner")
df_join2.show(10)
input("Waiting ...")
- Spark session Build(Hive metastore 사용)
- redshift 연결해서, 테이블 로드
- 두 테이블 조인(1차)
- 처리 이후 테이블 드랍
- 버킷을 3개로 나눠서 ,session id 기준으로 테이블 생성(spark-warehouse에 저장됨)
- 다시 조인(2차)
- input은 세션 완료를 잠시 보류하기 위해서 사용(web ui 보기 위함)
- spark-submit
spark-submit --master "local[4]" bucketDemo.py
(2) Spark WebUI(JOB간 비교)
첫번째 JOB
- 두 개의 데이터프레임을 JDBC를 통해 읽은 후
- SHUFFLE JOIN 발생(EXCHANGE)
- 조인을 한 후 결과를 받아서 다시 show
- stage 3개 : 데이터 read, 데이터 read, Exchange
두번째 JOB
- 버켓팅을 통해 테이블을 저장
세번째 JOB
- 버켓팅을 통해 테이블을 저장
네번째 JOB
- 첫번째 job과 비교
- 조인 키가 session_id 였고, session_id를 기준으로 bucketing 진행했기 때문에
- join 간 셔플링이 발생하지 않았음
- 최적화할 수 있다면 시간을 단축하고 리소스를 덜 사용
(3) Spark-warehouse 저장 파일 확인
- 버킷을 3개로 저장했기 때문에, 3개의 파일로 저장되는 걸 볼 수 있음
실습(Partitioning Demo)
(1) Spark Session Build
from pyspark.sql import *
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark FS Partition Demo") \
.master("local[3]") \
.enableHiveSupport() \
.getOrCreate()
(2) Spark read csv
df = spark.read.csv("./data/appl_stock.csv", header=True, inferSchema=True)
df.printSchema()
(3) year, month 컬럼 생성
df = df.withColumn("year", year(df.Date)) \
.withColumn("month", month(df.Date))
(4) 기존 테이블 삭제
# 뒤에서 기존 테이블이 있을 경우, 에러가 나기 때문에 drop 테이블 한것
# 없는 테이블을 삭제하는 경우 에러가 나기 때문에 Drop TABLE 한것
spark.sql("DROP TABLE IF EXISTS appl_stock")
(5) year, month 기준 partition 후 저장, Spark-warehouse 확인
df.write.partitionBy("year", "month").saveAsTable("appl_stock")
(6) 어떻게 partition 된 데이터를 Read 하는가?
# DATAFRAME API를 활용해서 이 조건에 맞는 데이터들만 로딩할 수 있다.
# 처음부터 원하는 테이블만 읽어들이기 때문에 SCANNING에 OVERHEAD가 없다
df = spark.read.table("appl_stock").where("year = 2016 and month = 12")
- spark read table
- where 사용 후 조건 지정
- 원하는 테이블을 읽어들이기 때문에 scanning에서 overhead를 줄일 수 있음
spark.sql("SELECT * FROM appl_stock WHERE year = 2010 and month = 12").show(10)
- spark sql로도 사용할 수 있음
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
28. Spark ML(Regression, 보스턴 주택값 예측 모델) (0) | 2023.08.23 |
---|---|
27. SPARK ML 소개 (0) | 2023.08.23 |
25. Spark 내부 동작(Execution Plan 실습, Spark Web UI) (0) | 2023.08.22 |
24. Spark 내부 동작(Execution Plan) (0) | 2023.08.22 |
23. Spark 내부 동작(Spark 파일 포맷, Partition 저장 형태, Schema가 있는 데이터의 Merge) (0) | 2023.08.22 |