SPARK
버클리 대학의 AMPLab에서 아파치 오픈소스 프로젝트로 2013년 시작한 프레임워크입니다. 하둡의 뒤를 잇는 2세대 빅데이터 기술로, Yarn(Yarn은 Hadoop 2.0, 3.0을 말한다고 보시면 됩니다.)을 분산환경으로 사용하고 Scala로 작성되었습니다. 일단 Spark의 기본적인 개념은 분산, 병렬 처리입니다. 데이터를 나누고 이 각각, Block(Partition)을 따로 가공 및 정제 후에 합치는 방식입니다.
Spark은 하둡의 Mapreduce 방식과 다르게 디스크에 저장하는 방식이 아닌, 메모리에 저장함으로써 속도를 월등히 높였습니다. 특히 MapReduce는 프로그래밍에서 사용할 수 있는 Operator가 딱 두 개였고, SQL로 치면 GROUP BY, CASE WHEN 정도만 가지고 있었습니다. 이 과정에서 생산성에 큰 문제가 있었습니다.
이를 해결하기 위해서 하둡의 맵리듀스 기반 프레임워크가 나오기 시작했는데, MAP REDUCE를 SQL 방식으로 사용하기 위한 인터프리터였습니다. 대표적으로는 HIVE, PRESTO가 있습니다. Hive는 최근 인기가 굉장히 떨어지고 있는 추세고, PRESTO는 비교적 많이 사용되고 있습니다. AWS의 ATHENA가 이 PRESTO를 사용한 것입니다.
MAP REDUCE 방식과 SPARK의 차이는 다음과 같습니다.
- SPARK은 기본적으로 메모리 기반입니다.
- 메모리가 부족해지면 그 때 디스크를 사용하기 때문에 속도가 상당히 빠릅니다.
- 반면 MAPREDUCE는 애초부터 디스크 기반입니다.
- 또한 MAPREDUCE는 하둡(YARN) 위에서만 동작하지만, SPARK은 하둡 외에도 K8S, MESOS 같은 환경에서도 동작합니다.
- MAPREDUCE는 키와 밸류 기반 데이터 구조만 지원합니다.
- 하지만 SPARK은 판다스 데이터프레임과 개념적으로 동일한 데이터 구조를 지원합니다.
- SPARK은 배치 데이터처리, 스트림 데이터 처리, SQL, 머신러닝, 그래프 분석으로 다양한 컴퓨팅을 지원합니다.
SPARK의 활용
구조화된 빅데이터를 다룰 때는 보통 SPARK SQL을 사용하는 것이 좋습니다. SPARK이 데이터프레임을 지원하긴하지만, 다른 사람들과의 협업과 한눈에 데이터를 처리하는 과정을 볼 수 있다는 점에서 SPARK SQL을 사용하는 것이 좋습니다. 하지만, ML 피처 엔지니어링이나, SPARK ML, 그리고 SQL만으로 하기 어려운 작업을 할 때는 SPARK 데이터프레임을 사용합니다.
SPARK 데이터 시스템은 이렇게 사용하고 있습니다.
(1) 대용량 비구조화된 데이터 처리(HIVE의 대체 기술)(ELT 혹은 ETL)
로그 데이터(대용량 비구조화된 데이터)를 데이터레이크에 적재 한 뒤에, SPARK을 통해서 가공하고, 정제합니다. 이 SPARK으로 처리된 데이터는 데이터웨어하우스나, 데이터레이크에 저장됩니다.
(2) ML 모델에 사용되는 대용량 피쳐 처리(배치/스트림) & SPARK ML
데이터레이크, 데이터웨어하우스, 혹은 실시간 분석처리시스템에서 데이터를 SPARK로 처리해서 CASSANDRA나 HBASE 같은 NOSQL에 적재합니다. 이 데이터는 ML 모델의 INPUT DATA로 사용됩니다.
SPARK 시스템 아키텍처
- DRIVER는 실행되는 코드의 마스터 역할을 수행합니다.(YARN의 APPLICATION MASTER)
- EXECUTOR는 실제 태스크를 실행해주는 역할을 수행합니다.(YARN의 컨테이너)
먼저 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록 (128MB)입니다. 이 블록의 단위는 hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정합니다. 그런데 SPARK에서는 이를 Spark에서는 이를 파티션 (Partition)이라 부릅니다. 파티션의 기본크기도 128MB로 같습니다. spark.sql.files.maxPartitionBytes 옵션은 HDFS등에 있는 파일을 읽어올 때만 적용됩니다. 이렇게 각각 블록과 파티션은 이런 역할을 수행합니다.
- 맵리듀스에서 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
- Spark에서는 파티션 단위로 메모리로 로드되어 Executor가 배정됩니다. Executor는 각 서버라고 보면 되고, 각 Executor의 CPU 수는 파티션의 수에 영향을 미칩니다.
만약 Spark Cluster가 한 서버에서 돌아가고 있고, 서버의 CPU 가 2대라면 위의 4개의 태스크는 (P1,P2), (P3,P4) 이렇게 두 번 돌아가는게 적절한 파티션의 수입니다.
SPARK의 단점
SPARK은 하둡이 가지고 있는 문제점을 그대로 가지고 있습니다. 데이터를 분산 처리할 때, 각 BLOCK(PARTITION) 간의 불균형이 일어나면 문제가 생깁니다.즉 JOIN 혹은 GROUP BY 등의 연산을 통해서, 불균형이 생기게 되고 이 과정에서 데이터를 다른 노드로 옮기는 과정이 생깁니다. 이 과정에서의 지연, 그리고 데이터의 불균형으로 인한 Bottle Lack(병목) 현상이 생기게 됩니다.
Groupby를 통해서 데이터가 다른 파티션에 지정되면, 데이터가 다른 파티션으로 이동해야하기 때문에 이 과정에서 네트워크를 타고 이동합니다. 이 과정을 셔플링이라고 합니다. 이 셔플링 과정에서는 데이터가 한 파티션에 쏠리는 현상이 발생하는데, 이를 data Skew라고 합니다. 데이터가 한 파티션에 쏠리게 되면, 한 파티션이 감당해야하는 데이터 크기가 커지고, 작업에 있어서 Bottle Lack이 발생하며, 심하게는 한 파티션의 뻑나는 상황이 발생하는 것입니다.
이 과정을 해결하기 위해서 OPTIMIZING을 위한 작업을 주로 하는데, PARTITION 최적화를 하는 것이 도움이 되며, 또한 SPARK SQL을 사용하는 것이 OPTIMIZING에서도 유리합니다.
SPARK SQL 사용해보기
(1) SPARK SESSION 생성
SPARK는 프로그램을 하나 만들어서, SPARK CLUSTER와 통신합니다. 이는 SINGLETON 객체입니다.
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf() # 세션 오브젝트
conf.set("spark.app.name", "PySpark DataFrame #1") # 앱 네임 지정
conf.set("spark.master", "local[*]") # *를 사용하면 sparkcluster에 있는 모든 cpu를 다 쓰겠다, 숫자를 넣으면 그만큼 CPU 사용
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
(2) CSV 혹은 PARQUET 파일 임포트
df = spark.read.format("csv")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
(3) SPARK SQL 사용
df.createOrReplaceTempView("station1800") # df라는 데이터프레임을 station1800이라는 sql 테이블처럼 활용
results = spark.sql("""SELECT stationID, MIN(temperature)
FROM station1800
WHERE measure_type = 'TMIN'
GROUP BY 1""") # spark sql
results.show()
SPARK DATAFRAME 플로우
- (1) SPARK 세션 생성
- (2) 입력 데이터 로딩
- (3) 데이터 조작 작업(판다스와 매우 흡사)
- (4) 계속 가공 후, 저장
오늘은 SPARK에 대해서 알아봤고, 다음 번에는 SPARK로 직접 데이터를 가공해보면서 감을 익히도록 하겠습니다. 강사님께서는 SPARK SQL을 주로 쓴다고 하셨고, 옵티마이징 면이나 협업면이나 모든 면에서 유리하다고 하셨습니다. 또 웬만해서는 데이터가 메모리에 올라가는 경우에는 PANDAS를 활용하는게 좋다고 합니다.(+ PANDAS SQL) 빅데이터 분산 처리 환경에서 SPARK를 활용하는 것이 좋을 것 같습니다. 이번 포스팅은 여기까지고, 실리콘밸리에서 날아온 데이터엔지니어링 키트에 대한 수강은 전부 완료됐습니다. 다음에 후기글로 찾아뵙겠습니다. 감사합니다:)
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[6주차] Dag Dependencies(Explicit Trigger, Reactive trigger, BranchPythonOperator) (1) | 2023.05.07 |
---|---|
[6주차] API & Airflow 모니터링 (0) | 2023.05.07 |
[6주차] AIRFLOW SLACK 연동(파이프라인 에러메시지 받기) (0) | 2023.05.07 |
[6주차] AIRFLOW 주요 고려사항 정리 (0) | 2023.05.07 |
[6주차] DOCKER & K8S & DBT (0) | 2023.05.07 |