AIRFLOW
- 에어플로우는 스케줄러, 데이터 파이프라인을 위한 PYTHOON 프레임워크
- PYTHON3에서 프로그래밍 방식으로, 파이프라인 설계, 예약 및 모니터링 가능
- 웹 UI 제공
- 에어플로우에서의 데이터 파이프라인(ETL)을 "DIRECTED ACYLIC GRAPH(방향 비순환 그래프)", DAG라고 합니다.
- DAG는 TASK로 구성됩니다.
- 에어플로우는 하나 이상의 서버로 구성된 클러스터이고, WORKERS와 SCHEDULER로 구성됩니다.
- 스케줄러는 작업을 여러 WORKERS에게 분산 시킵니다.
- DAG 와 스케줄링 정보는 기본적으로 DB에 저장됩니다(SQLITE가 기본적으로 운영)
- 하지만 실제 운영환경에서는 MYSQL 또는 POSTGRESQL을 사용하는 것이 좋습니다(성능, 기능 상의 문제)
- AIRFLOW의 버전 선택에 있어서, 가장 좋은 방법은 큰 회사에서 어떤 버전을 사용하는지를 확인하는게 좋습니다(버그 문제 등)
- https://cloud.google.com/composer/docs/concepts/versioning/composer-versions
- 대부분의 큰 회사들의 경우 100이면 100, AIRFLOW를 사용하고 국내에서 가장 많은 데이터파이프라인을 가지고 있는 회사는 아마 쿠팡일 것입니다.
AIRFLOW의 구성
AIRFLOW COMPONENT
AIRFLOW는 총 5개의 컴포넌트로 구성됩니다
- WEBSERVER(파이썬 플라스크)
- SCHEDULER(CRONJOB의 문법을 그대로 사용합니다)
- WORKER(DAG의 실행 순서를 결정합니다, WORKER는 CPU의 숫자만큼 정해집니다.)
- DATABASE(기본으로 SQLITE)
- QUEUE(멀티 노드 구성인 경우에만 사용)
- 워커의 수보다 데이터파이프라인의 수가 많아질 경우, 워커는 QUEUE에 저장된 데이터파이프라인 태스크를 순서대로 수행합니다.
- 이 경우에는 EXECUTOR가 달라집니다(CELERYEXXCUTER, KUBERNETESEXCUTOR)
AIRFLOW 구조 : 서버 한대일 때
AIRLFOW 구조 : 서버 여러 대 일 때
- 서버가 여러대일 때 메인 노드에서, WORKER들은 분리됩니다.
- 스케줄러에 의해서 QUEUE로 DAG가 저장되고,
- WORKER들은 QUEUE에 저장된 DAG를 보고 작업을 수행합니다.
AIRFLOW의 스케일링 방법
- 스케일업(더 좋은 사양의 서버 사용, CPU 추가, 메모리 추가 등)
유데미의 경우에도 거액 투자 전까지 서버 한 대로 버텼습니다. - 스케일 아웃(서버 추가)
지금은 스케일링을 하는데에 있어서 AWS 환경에서 직접 지원을 해주기 때문에, 금액을 지불하고 AWS의 자동관리 시스템을 사용해도 좋습니다.
AIRFLOW 개발의 장단점
- 장점
- 데이터 파이프라인을 세밀하게 제어 할 수 있다.
- 다양한 데이터 소스와 데이터 웨어하우스를 지원합니다.
- 백필(BACKFILL)이 쉽습니다.
백필의 경우에는 데이터엔지니어들이 가지고 가야할 숙제 같은 것입니다. 항상 데이터파이프라인엔 문제가 생기고, 백필을 쉽게할 수 있는 이 기능만으로도, AIRFLOW는 강력합니다.
- 단점
- 배우기가 쉽지 않습니다.
- 상대적으로 개발환경 구성이 어렵습니다.
- 직접 운영이 쉽지 않고, 클라우드 버전 사용이 선호 됩니다.
- 구글 클라우드는 "CLOUD COMPOSER"를 제공합니다.
- AWS는 "MANAGED WORKFLOWS FOR APACHE AIRFLOW"를 제공합니다.
DAG란 무엇인가?
- DIRECTED ACYCLIC GRAPH의 줄임말입니다.
- AIRFLOW에서 ETL을 부르는 명칭입니다.
- DAG의 구성은 다음과 같습니다.
- DAG는 TASK로 구성됩니다.
- TASK는 AIRFLOW의 오퍼레이터(OPERATOR)로 만들어집니다.
- AIRFLOW에서 이미 다양한 종류의 오퍼레이터를 제공합니다.
- 경우에 맞게 오퍼레이터를 결정하거나 필요하다면 직접 개발합니다.
- 오퍼레이터 예시 : REDSHIFT WRITING, POSTGRES QUERY, S3 READ/WRITE, HIVE QUERY, SPARK JOB, SHELL SCRIPT 등
DAG의 구성 예제
- 모든 TASK에 필요한 기본 정보
이 과정에선 TASK별 필요한 정보를 파이썬의 딕셔너리 형태로 담습니다.
default_args = {
'owner': 'sangwook', # task의 owner
'start_date': datetime(2020, 8, 7, hour=0, minute=00), # task 시작 시간보다 하루 전 단위
'end_date': datetime(2020, 8, 31, hour=23, minute=00),
'email': ['sangwook@naver.com'],
'retries': 1, # task 실패시 재시도 횟수
'retry_delay': timedelta(minutes=3), #task 실패시 대기 시간
}
test_dag = DAG(
"dag_v1", # DAG 이름
schedule="0 9 * * *", # 크론잡 문법의 스케줄 작성
tags=['test'] # 태그명
default_args=default_args # 위에서 작성한 필요 정보
)
에어플로우는 위 인자값들을 바탕으로 Operator를 동작시키고, 각 operator는 태스크가 됩니다. operator의 순서를 주는 방식은 다음과 같습니다.
- t1 >> [t2,t3] : t1 태스크를 진행 후에, t2, t3를 동시에 진행시켜라
- t1 >> t2 >> t3 : t1 태스크 후에, t2 태스크를 진행하고, t3 태스크를 진행시켜라
아래는 예제입니다.
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [t2, t3]
DummyOperator를 통하는 방식도 있습니다.
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)
t1 = BashOperator(
task_id='ls1',
bash_command='ls /tmp/downloaded',
retries=3,
dag=dag)
t2 = BashOperator(
task_id='ls2',
bash_command='ls /tmp/downloaded',
dag=dag)
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)
start >> [t1, t2] >> end
데이터 파이프라인을 만들때 고려사항
- 데이터 파이프라인의 실패
- 버그
- 데이터 소스상의 이슈
- 데이터 파이프라인 간 의존도 이해 부족
파이프라인이 수십개가 넘어가는 순간부터, 관리가 어려워지고 유지보수비용이 기하급수적으로 늘어납니다. 이를 위해 데이터파이프라인을 체계적으로 관리해야하는데, 보통 데이터 카탈로그를 통해 관리합니다.
모범사례 1 : FULL REFRESH
- 데이터가 작을 경우 가능하면 매번 통채로 복사해서 테이블을 만드는 것이 좋습니다. (FullRefresh)
- Incremental update만이 가능하다면, 대상 데이터 소스가 갖춰야할 몇가지 조건이 있습니다.
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요합니다.
- created(데이터 업데이트 관점에서는 필요하지 않습니다)
- modified
- deleted
- 데이터 소스가 프로덕션 데이터베이스 테이블이라면 다음 필드가 필요합니다.
- 데이터 소스가 API라면 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드를 읽어올 수 있어야합니다.
모범사례 2 : 멱등성
- 멱등성(IDEMPOTENCY)을 보장해야합니다.
- 멱등성이란 동일한 입력데이터로 데이터파이프라인을 여러번 실행해도, 최종 테이블의 내용이 달라지지 않아야한다는 것입니다.
- 중복 데이터에 대한 통제를 통해 멱등성을 보장하는 방법이 있습니다.
모범사례 3 : BACKFILL
- 실패한 데이터 파이프라인 재실행이 쉬워야합니다.
- 과거 데이터를 다시 채우는 과정(BACKFILL)이 쉬워야합니다.
- AIRFLOW는 특히 이 BACKFILL에 강점을 가지고 있습니다.
- DAG의 CATCHUP 파라미터가 TRUE가 되어야하고, START_DATE, END_DATE를 적절히 설정해야합니다.
여기서 CATCHUP 파라미터를 TRUE로 하면, START_DATE로부터 데이터파이프라인이 실행되지 않은 모든 시간 대의 데이터 파이프라인을 재실행합니다. - 대상 테이블이 INCREMENTAL UPDATE가 되는 경우에만 의미가 있음
- EXECUTION_DATE 파라미터를 사용해서 업데이트 되는 날짜 혹은 시간을 알아내게 코드를 작성해야합니다.
- 현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티 패턴입니다.
- DAG의 CATCHUP 파라미터가 TRUE가 되어야하고, START_DATE, END_DATE를 적절히 설정해야합니다.
모범사례 4 : 데이터 디스커버리
- 데이터 파이프라인의 입력과 출력을 명확히하고 문서화합니다(데이터 디스커버리 문제)
- 주기적으로 쓸모없는 데이터 삭제
- 사용되지 않는 테이블이나 데이터파이프라인은 제거합니다.
- 필요한 데이터만 데이터웨어하우스에 보관하고, 과거 데이터는 데이터레이크 혹은 스토리지로 이동시킵니다.
모범사례 5:
- 데이터 파이프라인 사고 발생마다 사고 레포트(POST-MORTEM) 작성
- 중요한 데이터 파이프라인 입출력 체크
- 간단히 입력 레코드 수와 출력 레코드 수를 몇개인지 체크하는 것부터 시작합니다
- SUMMARY 테이블을 만들고, PRIMARY KEY가 존재한다면 PRIMARY KEY UNIQUENESS가 보장되는지 체크하는 것이 필요합니다.
BACK FILL
AIRFLOW를 사용하는 이유 중 하나는 BACKFILL입니다. 하지만 INCREMENTAL UPDATE에서 START_DATE의 개념과 CATCHUP을 통해서 실수를 저지를 수 있기 때문에 이 부분은 세밀히 알고 넘어가야합니다.
INCREMENTAL UPDATE / START_DATE
매일 업데이트 되어야하는 데이터는, 2020년 11월 8일에 2020년 11월 7일까지의 데이터를 업데이트합니다. AIRFLOW의 START_DATE는 이 방식을 그대로 적용합니다. AIRFLOW의 START_DATE는 시작 날짜가 아니라, 처음 읽어와야하는 데이터의 날짜를 의미합니다.
- AIRFLOW의 접근 방식
- 모든 DAG 실행에는 execution_date가 지정되어 있습니다.
- execution_date로 채워야하는 날짜와 시간이 넘어오고,
- 이를 바탕으로 데이터를 갱신하도록 코드를 작성하면
- backfill이 쉬워집니다.
start_date와 execution_date 이해하기
- 2020-08-10 02:00:00이 start_date로 설정된 daily job이 있습니다(catchup이 True로 설정되어있다고 가정)
- 지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다.
- 이 경우 이 job은 몇번 실행될까?
총 3번 실행됩니다. start_date는 그 다음날 혹은 그 다음 시간 단위부터 작업을 수행하기 때문이며, catchup이 True일 때는 start_date로 부터 실행되지 않은 모든 job을 실행합니다. 따라서 3번입니다.
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[4주차] AIRFLOW ubuntu에 설치하기 (0) | 2023.04.30 |
---|---|
[4주차] 트랜잭션 (0) | 2023.04.16 |
[3주차] ETL/데이터파이프라인 (0) | 2023.04.08 |
SQL 윈도우 함수 요약, 예제 (0) | 2023.04.01 |
SQL 꿀팁(IS TRUE, NULLIF, COALESCE) (0) | 2023.04.01 |