Dag간 실행순서를 정하기 위해서는 Dag Dependencies를 활용하면 됩니다. 즉 어떤 Dag가 실행되면, 따라오는 Dag가 있게끔 설정하는 것이 가능합니다. 보통 TriggerDagOperator를 쓰는데, ExternalTaskSensor는 성능 상의 이유로 잘 쓰지 않습니다. 이유는 후술하겠습니다!
Explicit Trigger
이 Trigger 방법은 한 Dag는 다음 Dag를 트리거 시킵니다. 한 방법으로, DAG A의 태스크를 TriggerDagRunOperator로 구현하는 방법입니다. 트리거 A는 다음과 같이 구현하면 됩니다. 이때 유의할 점은 반드시 airflow.cfg dag_run_conf_overrides_params가 True로 설정 되어 있어야합니다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
trigger_B = TriggerDagRunOperator(
task_id="trigger_B",
trigger_dag_id="트리커하려는DAG이름",
# DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja 템플릿(dag_run.conf["path"])으로 접근 가능.
# DAG B PythonOperator(**context)에서라면 kwargs['dag_run'].conf.get('conf')
conf={ 'path': '/opt/ml/conf' },
# Jinja 템플릿을 통해 DAG A의 execution_date을 패스
execution_date="{{ ds }}",
reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
wait_for_downstream=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
)
Reactive Trigger
앞서와는 반대로 DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 "지속적으로" 체크합니다.
- 먼저 동일한 schedule_interval(schedule)을 사용하며,
- 이 경우 두 태스크들의 Execution Date이 동일해야합니다. 아니면 매칭이 불가능합니다.
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id = 'waiting_for_end_of_dag_a',
external_dag_id = 'DAG이름',
external_task_id = 'end',
timeout = 5*60,
mode = 'reschedule',
execution_delta = timedelta(minutes=5)
)
만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면, execution_delta 혹은 execution_date_fn을 사용해서 두 스케쥴을 맞출 수 있습니다. 또한 두 개의 DAG가 서로다른 frequency를 갖고 잇다면 이 경우엔 ExternalTaskSensor는 사용불가합니다.
ExternalTaskSensor를 잘 사용하지 않는 이유
- ExternalTaskSensor는 양쪽의 schedule interval을 맞추기가 상당히 난해합니다.
- 센서는 항상 떠있으면서 콜링을 하는데, cpu를 하나를 지속적으로 쓰기 때문에, 리소스를 낭비시킵니다
- 이러한 이유 때문에 보통 DAG간의 의존성을 만들 때는, Explicit Trigger 방법을 주로 사용합니다.
BranchPythonOperator
해당 오퍼레이터는 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터입니다. TriggerDag Operator 앞에 이 오퍼레이터를 사용하는 경우도 있습니다.
# 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ["trigger_b"]
# "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
이 operator는 주로 개발 환경일 때와 테스트 환경일 때 실행을 약간 다르게 하고 싶을 때 자주 사용합니다.
예제 - Explicit Trigger
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
def print_hello():
print("hello!")
return "hello!"
dag = DAG(
dag_id = 'nps_trigger',
start_date = datetime(2023,4,20), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
# default_args = {
# 'retries': 1,
# 'retry_delay': timedelta(minutes=3),
# }
)
hi = PythonOperator(
task_id = 'hi',
python_callable = print_hello,
dag = dag
)
nps_trigger = TriggerDagRunOperator(
task_id="execsql",
trigger_dag_id="Build_Summary_nps",
wait_for_completion=True,
dag = dag
)
hi >> nps_trigger
해당 코드를 보시면 알겠지만, 결국 Operator를 따로 사용해서, 마지막에 Trigger 하는 방식으로 구현 됩니다. 트리거는 완료 된 이후에 지정된 dag를 Run 시키고, wait_for_completion이 True일 때, 트리거 시킨 Dag가 끝날 때까지를 Dag의 running 종료 시점으로 잡습니다.
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[7주차] SPARK 기본 개념 및 PANDAS와의 비교 (1) | 2023.05.12 |
---|---|
[6주차] API & Airflow 모니터링 (0) | 2023.05.07 |
[6주차] AIRFLOW SLACK 연동(파이프라인 에러메시지 받기) (0) | 2023.05.07 |
[6주차] AIRFLOW 주요 고려사항 정리 (0) | 2023.05.07 |
[6주차] DOCKER & K8S & DBT (0) | 2023.05.07 |