1. 파일의 종류
- 데이터는 디스크에 파일로 저장됨 : 일에 맞게 최적화 필요
- 입력이 되는 데이터는 대부분 HDFS에서 오게 된다. 이는 물론 클라우드 스토리지를 포함
- 이 HDFS에서 내가 처리하려는 데이터가 어떤 포맷으로 저장되어있느냐가, 성능에 굉장한 영향을 미친다.
- Unstructured : Text
- Semi-structured : JSON, XML, CSV
- 사람의 눈으로 읽을 수 있다.
- Structued : PARQUET, AVRO, ORC, SequenceFile
- PARQUET이 SPARK에서 제일 많이 쓰인다.
2. Spark의 주요 파일 타입
- 컬럼 스토리지 : 열별로 저장
- 압축 가능 : 전부 압축 가능
- Splitable
- HDFS에서 데이터가 저장이 될 때는 데이터 블록 단위로 나뉜다. 그 때 데이터 블락이 partition으로 바로 올라갈 수 있느냐 -> splitable하다.
- 압축된 경우 CSV와 JSON은 splitable하지 않다.
- Human readable : CSV와 JSON의 유일한 장점
- Nested structure support : 자료 구조 안에 자료 구조가 있느냐(CSV 불가능)
- Schema evolution : csv, json 지원 X
- 데이터가 처음에 어떤 컬럼이 없다가 나타나도 CSV, JSON은 문제가 없다
3. Parquet : Spark의 기본 파일 포맷
- 트위터와 클라우데라 공동 개발(Doug Cutting)
- 컬럼 스토리지는 읽기에 최적화가 되어있다.
- 하이브리드 스토리지는 데이터 블락 단위로 하나의 row group을 만든다.
- 같은 row group 안에서는 컬럼 별로 저장이 된다.
- parquet은 row group(블록) 안에서 컬럼 row 스토리지로 저장이 된다.
4. Schema Evolution 소개
- Test1
- partition을 통한 데이터 저장 확인
- Test2
- Parquet 파일 3개로 테스트
- schema1.parquet
- schema2.parquet
- schema3.parquet
- Parquet 파일 3개로 테스트
- 만약 csv나 json 파일이 이렇게 다른 컬럼을 갖는 경우
- spark에서는 에러를 내게 되어있다. 하지만 spark evolution을 지원해주는 binary 파일에서는 로딩할 때 그 필드 값을 null로 채워주는 방식으로 처리한다.
5. Schema Evolution : 파티션된 데이터는 어떻게 저장되는가?
(1) Spark Session Build
from pyspark.sql import *
from pyspark.sql.functions import *
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark Schema Evolution Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.4.1") \
.getOrCreate()
(2) Read Data
df = spark. read \
.format('csv') \
.load('./data/appl_stock.csv')
(3) Partition1
# partitioin의 숫자 리턴
# PARTITION ID별 COUNT
print("Num Partition before : " + str(df.rdd.getNumPartitions()))
df.groupBy(spark_partition_id()).count().show()
- 기본적으로 Local StandAlone에서 파티션은 1개
- Spark Partition id의 한 값에서는 1763개
(4) Partition2
# 해싱을 통한 repartition
df2 = df.repartition(4)
print("Num Partitions after: " + str(df2.rdd.getNumPartitions()))
df2.groupBy(spark_partition_id()).count().show()
- 파티션 4개로 재조정 후, partition_id별로 데이터는 분산
- 해싱으로 처리된 키 값의 나머지로 분산 시키기 때문에, 데이터양은 거의 균등
(5) Partition3
# coalesce라는 함수는 파티션의 수를 줄이는 함수
# 셔플링을 최소화하는 방향으로 줄인다.
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
- 파티션 2개로 줄인 후, partition_id별로 데이터는 분산
(6) 데이터 저장 및 비교
# df는 파일 하나만 저장(파티션이 한개)
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
# df2는 파티션이 4개였고, 파일 4개
df2.write \
.format('parquet') \
.mode('overwrite') \
.option('path', 'dataOutput/parquet/') \
.save()
# df3는 파티션이 2개였고, 파일 두 개
df3.write \
.format('json') \
.mode('overwrite') \
.option('path', 'dataOutput/json/') \
.save()
6. Schema Evolution : read한 데이터를 schema에 따라 합치기
df1 = spark.read. \
parquet("./data/schema1.parquet")
df2 = spark.read. \
parquet("./data/schema2.parquet")
df3 = spark.read. \
parquet("./data/schema3.parquet")
# schema 파일을 동시에 합쳐서 데이터를 합칠 수 있다.
# mergeSchema를 트루로 지정
df = spark.read. \
option("mergeSchema", True). \
parquet("./data/*.parquet")
df.printSchema()
- 동일한 스키마에 대해서는 그대로 붙고
- 스키마가 다를 경우 다른 컬럼 값을 null 처리하고, schema가 붙는다
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
25. Spark 내부 동작(Execution Plan 실습, Spark Web UI) (0) | 2023.08.22 |
---|---|
24. Spark 내부 동작(Execution Plan) (0) | 2023.08.22 |
22. Unit Test (0) | 2023.08.22 |
21. Hive 메타스토어 사용하기 (0) | 2023.08.22 |
20. SPARK SQL(Windowing) (0) | 2023.08.22 |