WordCount 코드
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo")
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df = spark.read.text("shakespeare.txt")
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
- 이 코드는 몇개의 job을 만들어낼까?
- 1개
- show()가 없다면 이 코드는 몇 개의 Job을 만들어낼까?
- show가 없다면 action이 없으므로, 의미 없는 동작이다.
- Spark은 Lazy Execution을 통해서 Action이 주어지지 않으면 Transformation을 실행하지 않는다.
- GroupBy 이전의 stage0과 stage1으로 나뉨
- 조금 더 깊이 들어가보면, text 데이터를 스캔하고, transformation을 진행한 뒤
- Groupby 때문에 Shuffling 발생(Exchange), 이후 Count
- CollectLimit(show)
Join 코드(Shuffle Join)
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enalbed", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id = df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.show()
- 지금 사용한 join은 셔플 조인이다.
- Job 0 : df_large 만들기
- Job 1 : df_small 만들기
- Job 2 : df_large와 df_small 각각이 join키가 되는 ID를 기준으로 각자 Shuffling
- 그걸 데이터프레임 레코드들을 하나로 묶어주는 역할
- 쿼리 레벨에서 다시 시각화
- 데이터 불러오기
- Sort Merge Join
- 만약 한쪽이 굉장히 작을 경우엔 이렇게 하는게 커다란 오버헤드가 된다.
Join 코드(BroadCast Join)
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.config("spark.sql.shuffle.partitions", 3) \
.getOrCreate()
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(broadcast(df_small), join_expr, "inner")
join_df.show()
- broadcast join은 작은 데이터프레임을 큰 단위의 데이터프레임의 분산 서버로 뿌려서 셔플링을 최소화하면서 join하는 방법
- spark.sql.adaptive.autoBroadcastJoinThreshold
- 명시적으로 호출하지 않아도, 옵션에 따라서 Spark이 알아서 작은 데이터프레임을 broadcast하는 형태로 최적화를 하게 되는데, 이 때 사용하는 configuration은 이렇다(바이트 크기)
- 이것보다 작으면 셔플 조인이 아니라 브로드캐스트 조인
Spark Web UI
spark-submit --master "local[3]" wordcount.py
- 웹 UI로 확인 가능하고
- 스테이지별로 보고 싶다면, Jobs로 확인해서 링크를 눌러보면 Stage 확인 가능
- 특정 Stage도 자세히 볼 수 있고
- 특정 Stage가 너무 오래 걸린다면, WebUI를 통해서 찾아본다.
- 하나의 파일은 3개의 Partition으로 나뉜다면 Task가 3개로 된다(3개 파일을 읽어야하기 때문)
- BroadCast Join은 해당하는 데이터가 드라이버로 잠깐 왔다가 가게 되어있는데, 이 때 Job을 하나 더 늘린다.
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
27. SPARK ML 소개 (0) | 2023.08.23 |
---|---|
26. Spark 내부 동작(Bucketing과 Partitioning) (0) | 2023.08.23 |
24. Spark 내부 동작(Execution Plan) (0) | 2023.08.22 |
23. Spark 내부 동작(Spark 파일 포맷, Partition 저장 형태, Schema가 있는 데이터의 Merge) (0) | 2023.08.22 |
22. Unit Test (0) | 2023.08.22 |