이번 포스팅은 AIRFLOW의 FULL REPRESH를 연습해보기 위한 예제로 OPENWEATHER API를 활용합니다!
https://openweathermap.org/api/one-call-api
One Call API: weather data for any geographical coordinate - OpenWeatherMap
Make just one API call and get all your essential weather data for a specific location with our new OpenWeather One Call API 2.5. Easy migration from the Dark Sky API. The One Call API 2.5 provides the following weather data for any geographical coordinate
openweathermap.org
순서는 다음과 같습니다.
- OPENWEATHER API를 활용한 EXTRACT
- JSON DATA TRANSFORM, LOAD
- DAG 구성 및 스케줄링
1. EXTRACT
def extract(**context):
# Variables 활용
api_key = Variable.get("open_weather_api_key")
lat = 37.5665
lon = 126.9780
# https://openweathermap.org/api/one-call-api
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
logging.info('############ extract success')
return data
서울 지역의 8일간의 날씨를 가져옵니다. 맨 앞의 코드에서 API_KEY는 웹 ui의 variable을 활용했습니다. 전 포스팅에서도 언급한 것과 같이, 앱 ui에 varialbe을 등록하면 이는 airflow의 metadata DB에 등록됩니다.
API 키의 경우에는 자동으로 암호화 되어 표시됩니다. 해당 텍스트를 가져와서 api를 호출해서 8일간의 날씨를 가져옵니다.
2. TRANSFORM_LOAD
def transform_load(**context):
schema = context["params"]["schema"]
table = context["params"]["table"]
cur = get_Redshift_connection()
data = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
sql = "BEGIN; DELETE FROM {schema}.{table};".format(schema=schema, table=table)
for d in data['daily']:
if d != "":
sql += f"""INSERT INTO {schema}.{table} VALUES ('{datetime.fromtimestamp(d['dt']).strftime('%Y-%m-%d')}', '{d['temp']['day']}', '{d['temp']['min']}', '{d['temp']['max']}');"""
sql += "END;"
logging.info(sql)
cur.execute(sql)
logging.info('############ transform_load success')
SCHEMA와 TABLE의 경우엔, PythonOperator에 적어둔 params를 참조합니다. 또한 extract에서 return된 데이터는 transform_load 함수에서 xcom_pull을 통해 가져옵니다. 이는 데이터가 작기 때문에 사용하는 방식이며, 반드시 데이터가 커졌을 때는 s3 우회 방식 등 다른 방식을 사용해야합니다!!
가져온 데이터를 통해서 sql 문을 작성합니다. 여기선 Begin을 사용했지만, autocommit=False일 때는 Begin을 사용할 필요가 없습니다! begin을 써봤자 autocommit이 False여서 자동으로 커밋되지 않기 때문입니다. 문자열로 데이터를 반복문을 통해 집어넣고, sql을 한꺼번에 실행시키는 방법입니다.
또한 트랜잭션의 맨 앞에 Delete를 통해 테이블의 모든 내용을 삭제하고, insert 합니다. 이는 full refresh를 구현하는 방법입니다!
마지막엔 cur.execute를 통해 마무리합니다.
3. DAGS
import pendulum
# timezone 한국시간으로 변경
local_tz = pendulum.timezone("Asia/Seoul")
weather_dag = DAG(
dag_id = 'OpenWeatherDag',
start_date = datetime(2023,4,6, tzinfo = local_tz), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 6 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
# default_args = {
# 'retries': 1,
# 'retry_delay': timedelta(minutes=1),
# # 'on_failure_callback': slack.on_failure_callback,
# }
)
한국시간으로 매일 06시에 실행되는 DAG입니다. max_active_runs는 한번에 동시에 돌수 있는 TASK 수인데, FULL REPRESH를 하고싶을 땐 1로 작성합니다. 태스크가 여러 개가 한번에 돌아봐야 아무 의미가 없기 때문입니다. catchup은 전 포스팅에도 언급했듯, 사고를 방지하고자 False를 받습니다.
아래에 있는 default_args는 주석으로 처리해놓았는데, retries는 dag가 실패했을 때 재시도 횟수입니다. retry_delay는 실패한 후에, 어느 정도 시간 뒤에 재시도 할지 설정합니다. 저는 즉시 실행을 통해 먼저 dag가 돌아가는지 확인할 예정이기 때문에 주석처리 해놨습니다.
4. Operators
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
dag = weather_dag)
transform_load = PythonOperator(
task_id = 'transform_load',
python_callable = transform_load,
params = {
'schema': 'tkddnr961224', ## 자신의 스키마로 변경
'table': 'weather_forecast'
},
dag = weather_dag)
extract >> transform_load
PythonOperator를 두 개 작성해놨습니다. redshift의 schema와 table은 params로 입력해두고, PythonOperator를 통해서 dag를 받아 task를 정해놓습니다. 그리고 task의 순서를 명시하기 위해 extract >> transform_load를 입력해둡니다.
5. 확인
내가 원하는 dag가 웹 ui에 잘표시되었는지 확인합니다. 보통 refresh는 5분 간격으로 진행되지만, 성격이 급하신 분들은 bash에 이렇게 입력합니다.
airflow dags list
원하는대로 Dag가 올라왔는지 확인하고, 테스트를 위해서 우측 끝에 있는 재생버튼을 클릭해봅니다. success가 모든 Task가 success 됐다면 잘 실행된 것입니다. 다만 retry를 할 때 첫번째 시도가 실패했다면,
노란색으로 up_for_retry 가 뜹니다. 이 때는 다음 시도를 기다려보고, 로그를 확인해보면 됩니다. 저 같은 경우에는 로그를 확인해보니, logging 라이브러리를 임포트 하지 않았습니다..;;; 굉장히 사소한 실수일 수 있으니, 추측하기보단 로그 정보를 활용해서 디버깅하는 습관을 길러야할 것 같습니다!! 로그 정보는 보통 failed 된 빨간색 동그라미를 클릭하고, log_url 창으로 넘어가면 됩니다.
저의 경우 success하고 나서, redshift Database를 확인해보니,
아주 잘 들어온 것을 확인할 수 있습니다. 사실 날씨 데이터의 경우에는 full refresh 방식의 ETL도 가능하겠으나, incremental update를 통해서 이전에 데이터는 샇으면서, 미래 데이터는 업데이트하는 방식을 쓰는 방법도 있다고 합니다.
+ Incremental Update(upsert 방식)
이렇게 데이터를 계속해서 update하고 insert해야할 때, 이러한 방식을 upsert라고 부릅니다. 하지만 이 방식에는 Primary Key를 유지하는 방식 때문에 조금 더 다른 방법을 씁니다. 대부분의 데이터웨어하우스들은 Primary Key를 보장하지 않기 때문에 보통 Upsert를 할 때는 이러한 방식을 자주 사용합니다.
- 임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사합니다.
- 임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사(이 때 중복 존재 가능)
- 중복을 걸러주는 SQL 작성을 작성합니다.
- ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering을 수행해 primary key별로 하나의 레코드를 잡아냅니다.
- 위의 SQL을 바탕으로 최종 원본 테이블로 복사합니다 (Swap)
- 이때 원래 원본 테이블을 DROP하고 임시 temp 테이블을 원본 테이블로 바꿔주어야 합니다 (ALTER TABLE)
한 테이블을 복사하는 과정에서 데이터가 굉장히 큰 경우가 많을텐데, 이 방식이 사용하기 적합한게 맞는가?
대부분의 데이터 웨어하우스는 이런 작업에 최적화 되어있습니다. 데이터가 아무리 크다고 하더라도, 이런 작업을 위해 생겨난 데이터베이스이기 때문에, 걱정하지 않고 사용하셔도 좋습니다. 다음 포스팅은 이 INCREMENTAL UPDATE를 활용한 DAG를 생성하는 예제를 작성하겠습니다. 감사합니다:)
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[5주차] OLTP/OLAP, MYSQL 데이터 복사하기 (0) | 2023.05.07 |
---|---|
[4주차] AIRFLOW Incremental Update 구현하기 (0) | 2023.05.07 |
[4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH) (0) | 2023.05.06 |
[4주차] AIRFLOW ubuntu에 설치하기 (0) | 2023.04.30 |
[4주차] 트랜잭션 (0) | 2023.04.16 |