1. Spark 데이터 시스템 아키텍처
- adhoc 형태의 인터랙티브 쿼리를 날리거나 할 때, presto나 hive를 써도 큰 상관이 없다.
- 다만 Spark으로 옮겨가는 이유는 하나의 시스템으로 다양한 기능을 할 수 있기 때문이다.
- 외부 데이터(RDB, NOSQL)는 로딩해서 프로세싱해서 데이터엔지니어들이 HDFS로 주기적으로 읽어오게 한다.
- 보통 ETL JOB 스케줄을 위해서는 AIRFLOW를 사용한다.
- 다른 방법으로는 SPARK에서 로딩을 해서 보내는 거다.
- SPARK STREAMING
- 배치로 SPARK SQL 활용
- LOAD 하는 곳은 NOSQL, RDB, 데이터 레이크가 될 수도 있다.
- DATA STRUCTURE들이 나뉘어서 SPARK에 LOAD 된다.
- 데이터 병렬처리가 가능하려면?
- 데이터가 먼저 분산되어야함
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록(128MB)
- hdfs-site.xml에 있는 dfs.block.size 프로퍼티가 결정
- Spark에서는 이를 파티션(Partition)이라 부름. 파티션의 기본 크기도 128MB
- spark.sql.files.maxPartitionBytes : HDFS 등에 있는 파일을 읽어올 때만 적용됨
- 하둡 맵의 데이터 처리 단위는 디스크에 있는 데이터 블록(128MB)
- 다음으로 나눠진 데이터를 각각 따로 동시 처리
- 맵리듀스에서는 N개의 데이터 블록으로 구성된 파일 처리시 N개의 Map 태스크가 실행
- Spark에서는 파티션 단위로 메모리가 로드 되어 Executor가 배정됨
- 데이터가 먼저 분산되어야함
- 처리 데이터를 나누기 -> 파티션 -> 병렬처리
- JDBC 소스는 기본적으로 파라미터를 설정하지 않으면 하나의 파티션만 만드는데,
- HDFS에서 가져올 경우 알아서 파티셔닝 해준다. 대부분은 HDFS에 저장하고, HDFS에서 SPARK로 읽어들이는게 가장 좋다.
- 4개의 태스크가 실행되어야한다면 EXECUTOR1에 EXECUTOR2에 P1, P2를 배정하고, 그 다음에 P3, P4를 배정한다.
- 병렬성의 최대화하려면 PARTITION의 수를 EXECUTOR의 수, 그리고 EXECUTOR의 CPU 수로 해주면, 병렬성을 최대화 해줄 수 있다. 항상 가능한 건 아니지만, 동시에 모든 걸 처리하니까 처리 속도가 빨라진다.
- Spark 데이터 처리 흐름
- 데이터프레임은 작은 파티션들로 구성됨
- 데이터프레임은 한번 만들어지면 수정 불가(Immutable)
- 입력 데이터프레임을 원하는 결과 도출까지 다른 데이터프레임으로 계속 변환
- sort, group by, filter, map, join, ...
- 데이터프레임은 작은 파티션들로 구성됨
- 셔플링 : 파티션 간에 데이터 이동이 필요한 경우 발생
- 셔플링이 발생하는 경우는?
- 개발자가 명시적으로 파티션을 새롭게 하는 경우 (예 : 파티션 수를 줄이기)
- 시스템에 의해 이뤄지는 셔플링
- 예를 들면 그룹핑 등의 aggregation이나 sorting
- 셔플링이 발생할 때 네트워크를 타고 데이터가 이동하게 됨
- 몇 개의 파티션이 결과로 만들어질까?
- spark.sql.shuffle.partitions이 결정
- 기본값은 200이며 이는 최대 파티션수(무조건 200이라고 할 수 없음)
- 오퍼레이션에 따라 파티션 수가 결정됨
- random. hashing partition, range partition 등
- sorting의 경우 range partition을 사용함
- spark.sql.shuffle.partitions이 결정
- 또한 이 때 Data skew 발생 가능!
- 몇 개의 파티션이 결과로 만들어질까?
- 셔플링이 발생하는 경우는?
- 셔플링 : hashing partition
- Aggregation 오퍼레이션
- 같은 값을 같는 레코드들을 모으고 싶으면 hashing Partion을 하는데, 필드의 값을 Hashing Function으로 넘기고, 그 값을 만들어지는 Partion의 수로 나눠서 나머지를 가지고 어느 Partion으로 보낼지 결정된다.
- ex ) Tom / John <- Hashing Function 나머지 0, 1로 나뉨
- 나머지에 따라 Partion 정해서 들어감
- Data Skewness
- Data partitioning은 데이터 처리에 병렬성은 주지만 단점도 존재
- 이는 데이터가 균등하게 분포하지 않는 경우
- 주로 데이터 셔플링 후에 발생
- 셔플링을 최소화하는 것이 중요하고 파티션 최적화를 하는 것이 중요
- 이는 데이터가 균등하게 분포하지 않는 경우
- Data partitioning은 데이터 처리에 병렬성은 주지만 단점도 존재
2. Spark 데이터 구조 : RDD, DataFrame, Dataset
- Spark 데이터 구조
- RDD, DataFrame, Dataset (Immutable Distributed Data)
- 2016년에 DataFrame과 Dataset은 하나의 API로 통합됨
- 모두 파티션으로 나뉘어 Spark에서 처리됨
- RDD는 할 수 있는건 많지만, 생산성이 떨어져 대부분의 파이썬 사용자의 경우 DataFrame 사용
- Scala, Java 코딩은 Dataset, Dataset의 특수한 형태가 DataFrame이라고 보면 된다.
- RDD, DataFrame, Dataset (Immutable Distributed Data)
- RDD
- 로우레벨 데이터로 클러스터 내에 서버에 분산된 데이터를 지칭
- 레코드별로 존재하지만 스키마가 존재하지 않음
- 구조화된 데이터나 비구조화된 데이터 모두 지원
- DataFrame과 Dataset
- RDD 위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음(테이블)
- Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용 가능
- 컴파일 언어 : Scala/Java에서 사용 가능
- PySpark에서는 DataFrame을 사용
Spark DataFrame 동작구조
- Code Analysis
- Logical Optimization
- 코드를 실행할 수 있는 여러가지 방안을 만들고, 방안마다의 비용을 계산
- 그 중 가장 값싼 비용의 플랜을 결정 후 RDD OPERATION 코드로 바뀜
- CODE GENERATION
- JAVA 코드로 변환, TUNGSTEN 기술, MPV에 쓰이는 최적화 방법을 활용하여 코드 최적화
- RDD로 코딩할 경우, 위에처럼 Optimization의 적용을 받을 수 없음
- Spark 데이터 구조 - RDD
- 변경이 불가능한 분산 저장된 데이터
- RDD는 다수의 파티션으로 구성
- 로우레벨의 함수형 변환 지원 (map, filter, flatMap 등등)
- 일반 파이썬 데이터는 paralleize 함수로 RDD로 변환
- 반대는 collect로 파이썬 데이터로 변환 가능
- 변경이 불가능한 분산 저장된 데이터
- Spark 데이터 구조 - 데이터 프레임
- 변경이 불가능한 분산 저장된 데이터
- rdd와는 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- 판다스의 데이터 프레임 혹은 관계형 데이터베이스의 테이블과 거의 흡사
- 다양한 데이터소스 지원 : HDFS, Hive, 외부 데이터베이스, RDD 등
- 스칼라, 자바, 파이썬과 같은 언어에서 지원
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
6. Spark(개발환경 옵션, Local Standalone, 활용 Demo) (0) | 2023.07.28 |
---|---|
5. Spark 프로그램 구조(Spark Session 생성, 환경변수) (1) | 2023.07.11 |
3. 빅데이터 처리와 Spark 소개(Spark 소개, Spark 프로그램 실행 옵션) (0) | 2023.07.10 |
2. 빅데이터 처리와 Spark 소개(맵리듀스 프로그래밍) (0) | 2023.07.10 |
1. 빅데이터 처리와 Spark 소개(빅데이터 정의, 하둡 이론 등) (2) | 2023.07.10 |