이번 과제는 NPS를 구해서, 해당 NPS를 단 한번만 업데이트하는 ELT 구현입니다. NPS는 사용자의 서비스 추천정도를 물은 뒤에 0~10점으로 환산하여, 10점 혹은 9점을 준 고객의 비율에서 0~6점을 준 고객의 비율을 빼는 방법입니다. 간단하게 ELT를 구현하고 테스트 코드도 동시에 삽입해서 ELT 과정을 구현해보겠습니다.
SQL 구현하기
저번 포스팅에서 만들어둔 nps 테이블을 활용하여 다음 SQL을 준비합니다.
SELECT DATE(created_at) AS date,
CAST(CAST(COUNT(CASE WHEN score IN (9,10) THEN 1 END)
- COUNT(CASE WHEN score IN (0,1,2,3,4,5,6) THEN 1 END) as FLOAT)
/ COUNT(*) AS FLOAT) as nps
FROM tkddnr961224.nps
GROUP BY DATE(created_at);
- created_at을 날짜 기준으로 그룹화합니다.
- 스코어가 9점,10점인 count 값에서, 0점부터 6점까지의 count 값을 빼고 전체 개수로 나눕니다.
- 각각을 float로 치환합니다.
그럼 이제 이 쿼리문을 활용해서 ELT를 구현합니다.
dag = DAG(
dag_id = "Build_Summary_nps",
start_date = datetime(2021,12,10),
schedule = '@once', # 한번만 실행
catchup = False
)
execsql = PythonOperator(
task_id = 'execsql',
python_callable = execSQL,
params = {
'schema' : 'tkddnr961224',
'table': 'nps_summary',
'sql' : """SELECT DATE(created_at) AS date,
CAST(CAST(COUNT(CASE WHEN score IN (9,10) THEN 1 END)
- COUNT(CASE WHEN score IN (0,1,2,3,4,5,6) THEN 1 END) as FLOAT)
/ COUNT(*) AS FLOAT) as nps
FROM tkddnr961224.nps
GROUP BY DATE(created_at);"""
},
dag = dag
)
쿼리문을 PythonOperator의 params로 넣어줍니다.
def execSQL(**context):
schema = context['params']['schema']
table = context['params']['table']
select_sql = context['params']['sql']
logging.info(schema)
logging.info(table)
logging.info(select_sql)
cur = get_Redshift_connection()
sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """
sql += select_sql
cur.execute(sql)
cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""")
count = cur.fetchone()[0]
if count == 0:
raise ValueError(f"{schema}.{table} didn't have any record")
try:
sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};"""
sql += "COMMIT;"
logging.info(sql)
cur.execute(sql)
except Exception as e:
cur.execute("ROLLBACK")
logging.error('Failed to sql. Completed ROLLBACK!')
raise AirflowException("")
해당 코드의 논리는 다음과 같습니다.
- 만약 임시 테이블이 있다면 임시 테이블을 삭제합니다.
- 임시테이블을 생성합니다.
- 앞서 만든 쿼리를 실행합니다.
- 쿼리 실행 후 임시 테이블에 행이 하나도 없다면 에러를 발생시킵니다.
- 임시테이블에 행이 하나라도 있다면 기존에 있던 원본 테이블을 삭제합니다.
- 임시테이블의 이름을 원본 테이블명으로 바꾸고, 대체한 뒤 커밋합니다.
- 에러가 발생했다면 ROLLBACK을 수행합니다.
보통 ELT는 데이터 애널리스트들이 수행하는 경우도 많고, ANALYTIC ENGINEER들이 수행하는 경우도 많습니다. 다만 데이터 애널리스트는 SQL문을 짜고, 해당 SQL문에 대해서 엔지니어는 돌려서 몇몇 DAG에 의한 트리거로 테이블을 업데이트하는 경우가 많습니다.
또한 해당 테스트, ELT 과정을 범용적으로구현하기 위해 DBT라는 프레임워크가 존재하기도 합니다. 추후 포스팅에 간략히 소개해드리도록 하겠습니다. 감사합니다:)
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[6주차] AIRFLOW 주요 고려사항 정리 (0) | 2023.05.07 |
---|---|
[6주차] DOCKER & K8S & DBT (0) | 2023.05.07 |
[5주차] Airflow BackFill(Incremental Update, execution_date) (0) | 2023.05.07 |
[5주차] OLTP/OLAP, MYSQL 데이터 복사하기 (0) | 2023.05.07 |
[4주차] AIRFLOW Incremental Update 구현하기 (0) | 2023.05.07 |