슬랙 메시지를 활용해서 데이터파이프라인을 효율적으로 관리할 수 있습니다. 보통 현업에 계신 분들은 데이터파이프라인 에러를 슬랙으로 받고, 해당 메시지를 받으면 데이터 파이프라인을 backfill 혹은 복구하는 작업을 합니다. 이번엔 airflow와 슬랙을 연동해서 에러메세지를 받아보는 작업을 해보겠습니다.
1.Incoming Webhooks App 생성하기
https://api.slack.com/messaging/webhooks
해당 링크를 통해서 webhooks app을 를 생성합니다.
채널명은 미리 생성해둔 채널로 해두시고, 마지막에 webhook URL을 카피한 뒤, "https://hooks.slack.com/services/~" 뒤에 있는 코드들을 가져옵니다. 예시로는 T04FA~~~~/BE~~~~/BM~~~ 이런 식입니다. 채널명은 마음대로 하셔도 됩니다.
2. AIRFLOW VARIABLE 등록하기
웹 UI를 통해서 VARIABLE을 등록합니다.
3. slack.py 파일 생성하기
from airflow.models import Variable
import logging
import requests
def on_failure_callback(context):
"""
https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Define the callback to post on Slack if a failure is detected in the Workflow
:return: operator.execute
"""
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# def send_message_to_a_slack_channel(message, emoji, channel, access_token):
def send_message_to_a_slack_channel(message, emoji):
# url = "https://slack.com/api/chat.postMessage"
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
보시면 slack_url Variable을 가져와서 url을 통해서 전달하는 코드입니다. 이 파일을 /var/lib/airflow/dags/plugins 경로에 넣어두시고, DAG 중 하나를 골라서 일부로 에러를 내보겠습니다.
4. DAG에 on_failer_callback 파라미터 쓰기
from plugins import slack
dag_second_assignment = DAG(
dag_id = 'NameGenderCSVtoRedshift_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback, # 실패시 SLACK 함수 요청
}
)
기존 DAG에 파라미터 on_failure_callback에 아까 만들었던 slack.py의 on_failur_callback 함수를 작성해둡니다. 저는 해당 DAG에 리스트에 숫자를 더하는 식으로 의도적으로 오류를 냈고, 웹 UI를 통한 MANUAL TRIGGER를 통해서 에러메세지를 테스트해봤습니다. 일단은 retiries와 retries_delay를 설정해뒀습니다.
5. 에러메세지 확인
슬랙에 메세지가 잘 온걸 확인할 수 있습니다. 기다려보면서 본건데, 일단 retry가 발생했고, retry 딜레이가 모두 끝나면 그 때 fail 메시지를 발송합니다. 그리고 저는 retry가 1회였기 때문에 총 2회의 fail 메시지가 온걸 확인할 수 있습니다. 또한 아까 말씀드렸듯이 리스트에 숫자를 더했기 때문에 해당 오류메세지가 잘 온것을 확인할 수 있습니다.
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[6주차] Dag Dependencies(Explicit Trigger, Reactive trigger, BranchPythonOperator) (1) | 2023.05.07 |
---|---|
[6주차] API & Airflow 모니터링 (0) | 2023.05.07 |
[6주차] AIRFLOW 주요 고려사항 정리 (0) | 2023.05.07 |
[6주차] DOCKER & K8S & DBT (0) | 2023.05.07 |
[5주차] Airflow + Redshift로 ELT 구현하기 (0) | 2023.05.07 |