Incremental Update의 경우에는 backfill 이슈가 발생하면서, 유지보수 비용이 full refresh에 비해 기하급수적으로 올라갑니다. 하지만, 데이터가 커질수록 full refresh의 경우에도 비용이 증가하면서, 오히려 Incremental Update가 나은 상황이 있을 수 있습니다. 따라서 해당 포스팅에서는 Incremental Update를 기초적으로 어떻게하는지 알아보겠습니다.
먼저 해당 포스팅에 대한 기본적인 정보는 앞선 포스팅을 참고해주세요.
https://dataengineerstudy.tistory.com/161
[4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH)
FULL REFRESH 상황 가정 데이터 적재 FULL REFRESH란 테이블에 있는 모든 정보를 삭제하고, 다시 INSERT하는 방식으로 보통 데이터의 크기가 크지 않거나, 스케줄의 실행 시간을 보고 판단합니다. 보통 FUL
dataengineerstudy.tistory.com
INCREMENTAL UPDATE 구현하기
DAG를 작성하는 방법보단, 논리적으로 어떻게 구현을 하는지에 중점을 맞춰서 보여드리겠습니다.
create_sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};
CREATE TABLE {schema}.temp_{table} (LIKE {schema}.{table} INCLUDING DEFAULTS);INSERT INTO {schema}.temp_{table} SELECT * FROM {schema}.{table};"""
logging.info(create_sql)
try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
- 임시 테이블을 생성합니다.
- 임시 테이블에는 Incremental update를 하려고 하는 테이블을 스키마를 복사합니다. 그리고 여기서 쓰인 INCLUDING DEFAULTS는 GET_DATE를 그대로 가져오기 위해 사용합니다.
- 임시테이블에 원본 테이블에 있는 모든 값을 INSERT 합니다.
- 해당 sql문이 작동하면 COMMIT, 에러가 발생했을 때는 ROLLBACK 합니다.(autocommit= False일 때 상황)
# 임시 테이블 데이터 입력
insert_sql = f"INSERT INTO {schema}.temp_{table} VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
임시테이블에 데이터를 INSERT하고 COMMIT 합니다.
# 기존 테이블 대체
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM {schema}.temp_{table}
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
- 기존 테이블의 데이터를 삭제합니다.
- 임시 테이블에 있던 값들을 윈도 함수를 통해, 중복을 제거합니다
- ROW_NUMBER를 통해서 DATE를 기준으로 PARTITION 한 값들에 번호를 매깁니다. 해당 번호는 CREATED_DATE로 인해서 내림차순 정렬됩니다.
- 내림 차순된 데이터 중에서 seq = 1;인 데이터 즉, 가장 최신 데이터만 남겨놓습니다.
- 중복을 제거한 데이터를 그대로 원본 테이블에 넣습니다.
이렇게하면 날짜가 늘어날 때마다, 기존 데이터를 삭제하지 않고, 늘어나는 데이터를 업데이트 할 수 있습니다.
이 방법 외에도 많은 방법들이 있습니다. 해당 코드에선 임시테이블을 만들어서 중복을 제거했지만, 기존의 RDBMS에서 가져오는 데이터들은 이미 PRIMARY KEY를 보장하고, 보통 CREATED_DATE가 있는 데이터들이 있기 때문에 조금 더 쉽게 구현할 수도 있습니다.
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[5주차] Airflow BackFill(Incremental Update, execution_date) (0) | 2023.05.07 |
---|---|
[5주차] OLTP/OLAP, MYSQL 데이터 복사하기 (0) | 2023.05.07 |
[4주차] AIRFLOW FULL REPRESH 예제(+ Incremental Update 맛보기) (0) | 2023.05.06 |
[4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH) (0) | 2023.05.06 |
[4주차] AIRFLOW ubuntu에 설치하기 (0) | 2023.04.30 |