FULL REFRESH 상황 가정 데이터 적재
FULL REFRESH란 테이블에 있는 모든 정보를 삭제하고, 다시 INSERT하는 방식으로 보통 데이터의 크기가 크지 않거나, 스케줄의 실행 시간을 보고 판단합니다. 보통 FULL REFRESH가 가지는 장점은, BACKFILL(AIRFLOW의 TASK 중 일부 혹은 전체 DAG의 오류를 복구시키는 작업) 과정이 굉장히 단순하고 유지보수에 들이는 자원이 굉장히 작다는 점입니다.
보통은 FULL REPRESH를 쓰는 것이 좋지만, 아래의 경우에는 INCREMENTAL UPDATE로 전환합니다.
- 1시간 주기로 돌아야하는 DAG의 RUNNING TIME이 30분 이상 소요될 경우
- 하루 주기로 돌아야하는 DAG의 RUNNING TIME이 반나절 이상 소요될 경우
- 즉 주기의 반을 RUNNING TIME(UPDATE TIME)에 쓸 때 INCREMENTAL UPDATE로의 전환을 고려합니다.
이 때 트랜잭션을 활용하므로, DAG의 실행시간 동안에 있을 잠재적 위험은 크지 않습니다.
(1) 일단 구현해보기
def get_Redshift_connection():
host = "..." # 데이터웨어하우스(redshift) host 주소
user = "..." # 본인 ID 사용
password = "..." # 본인 Password 사용
port = ... # 포트번호
dbname = "..." # 데이터베이스 이름
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
먼저 레드쉬프트에 연결하는 방법입니다. psycopg2를 활용합니다. 일단은 하드코딩으로 연결했지만 앞으로는 airflow의 web ui를 활용합니다. ui를 활용하는 이유는 코드를 바꿀 때 설명드리겠습니다!
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("transform started")
# ignore the first line - header
lines = text.split("\n")[1:]
logging.info("transform done")
return lines
def load(lines):
logging.info("load started")
cur = get_Redshift_connection()
sql = "BEGIN;DELETE FROM tkddnr961224.name_gender;"
for l in lines:
if l != '':
(name, gender) = l.split(",")
sql += f"INSERT INTO tkddnr961224.name_gender VALUES ('{name}', '{gender}');"
sql += "END;"
cur.execute(sql)
logging.info(sql)
logging.info("load done")
def etl():
link = "..." # s3 혹은 데이터 저장소 링크
data = extract(link)
lines = transform(data)
load(lines)
etl을 단계별로 끊어서, etl 함수에 통합합니다.
- extract
- extract를 통해서 데이터를 읽어옵니다. 상황별로 다르겠지만, 여기선 s3에 있는 데이터를 읽어오는 과정입니다.
- transform
- transform을 통해서 줄간격에 띄워져있는 데이터를 split한 후 return 합니다
- load
- load에선 본격적으로 redshift에 연동한 후,
- 앞서 split된 text 데이터를 적재합니다. 여기선 BEGIN을 통해서 트랜잭션을 시작하고, 먼저 해당 테이블의 모든 데이터를 삭제한 후(FULL REPRESH), TEXT 데이터를 모두 INSERT 한 SQL 문을 작성해서 넣어준 뒤, END로 COMMIT 합니다.
dag_second_assignment = DAG(
dag_id = 'NameGenderCSVtoRedshift',
catchup = False,
start_date = datetime(2023,4,6, tzinfo = local_tz), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *') # 적당히 조절
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
DAG를 통해서, 스케줄링을 합니다.
- 기본적으로 dag_id는 반드시 파일명과 일치해야합니다.
- ex) NameGenderCSVtoRedshift -> NameGenderCSVtoRedshift.py
- catchup 옵션은 startdate로부터 실행되지 않은 dag를 실행할지 안할지 설정하는 옵션입니다. 디폴트 값은 True이지만, 일반적으로 False로 둡니다. 만약 catchup을 True로 둬서, 만일 이전의 dag를 전부 실행하면 불필요한 작업을 하게되어, 서버비용 등의 문제가 생길 수 있습니다.
DAG를 생성했다면, Operator를 통해서 task를 만듭니다. task의 종류는 다양하지만, 대표적인 예시로 PythonOperator를 사용했습니다.
- task_id는 웹 ui에 표시되니, backfill, 정보 조회등을 위해 id를 명확히 짓도록 합니다.
- python_callable은 실행하고자 하는 파이썬 함수를 지정합니다.
- dag에는 아까 만들어두었던 dag의 변수명을 입력합니다.
일단 여기까지 진행하고, Airlfow의 dag 폴더에 넣어두셨다면, UI에 해당 DAG가 잡히기 시작합니다.
이 액션 버튼을 통해서, 모든 TASK의 일정을 무시하고 즉각 실행할 수 있습니다. 오류가 없었다면 REDSHIFT에 데이터가 잘 적재된걸 보실 수 있습니다. 이제 앞서 이 TASK들을 재구성해서, 좀 더 좋은 코드로 만들어보겠습니다. 여러 변경 사항이 있겠지만, 주목해야하는 부분만 작성하겠습니다.
(2) PARAMS를 활용해보기(Variables 활용하기)
앞선 코드에선 모든 변수를 하드코딩을 통해서 DAG를 구성했습니다. 다만, 이 코드는 재사용성면에서 좋지 않은 방식입니다. 따라서, 에어플로우에서 제공하는 PARAMS 방식을 사용해서 변수를 넘겨보겠습니다.
dag_second_assignment = DAG(
dag_id = 'NameGenderCSVtoRedshift_v2',
start_date = datetime(2023,4,6, tzinfo = local_tz), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
catchup = False, # catchup이 False일 때 이전에 실행되지 않은 것들을 다시 실행하지 않는다.
max_active_runs = 1, # 한번만 돌아갈 수 있게
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
} # 재시도 최대 한번, 재시도 딜레이는 3분 후
)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "..." # S3 데이터 주소
},
dag = dag_second_assignment)
일단 PythonOperator에 주목해봅니다. params가 추가 됐고, params에는 딕셔너리 구조로 데이터를 저장해놓을 수 있습니다. 이 방식으로 params에 들어가서 변수를 가져오는 방식이 가능해집니다.
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
**context를 활용하면 task에 대한 자체정보를 가져올 수 있습니다. context["params"]["url"]을 통해서 저희가 저장해놓은 데이터의 링크를 가져옵니다. 하지만 이 방식도 결국 코드를 변경해야하는 구조입니다. 더 나은 방식은 웹 ui의 variable을 활용하는 방법입니다.
- Airflow 웹 ui에서 Admin -> Variables에 들어갑니다.
- Variables에서 + 버튼을 클릭하여, key 값과 Val 값, 그리고 설명, 암호화 여부를 등록합니다.
이렇게 하면 코드를 바꿀 필요 없이, 웹 ui의 값만 변경해주면 같은 코드를 사용하더라도, 쉬운 수정이 가능합니다. 이 데이터를 가져오는 방법은 다음과 같습니다.
from airflow.models import Variable
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url") # 이 부분
},
dag = dag_second_assignment)
아까와 같이 params에 url을 지정해두고, Variable.get(키값의 이름)을 넣어주면 됩니다. 그러면 웹 ui에 등록한 값(airflow의 meta DB에 저장되어 있는 값)을 가져올 수 있습니다.
(3) xcom_pull을 활용해서 task 나눠서 관리하기
방금 전에 사용한 코드들은 extract, transform, load를 기껏 나누고, 다시 한 함수에서 합친 뒤에 그 함수를 한 task로 관리했습니다. 하지만 이 방식을 사용할 경우, task별 backfill이 불가능해지면서, backfill을 할 때 한번에 다해야하는 불리함이 생깁니다. 따라서, 저희는 task를 나누기 위해 xcom_pull을 활용합니다.
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.split("\n")[1:]
return lines
task에 대한 정보는 context 안에 task_instance에 있습니다. transform 함수에서 보시면, context 안에 있는 'task instance'를 가져오고 그 안에서 extract라는 task id를 갖는 값의 return value를 가져오라는 방식으로 구현되어 있습니다. 그런데 굳이 이렇게 복잡하게 return 값을 가져오는 이유는 backfill 때문입니다.
- task의 return 값을 airflow는 metaDB에 저장합니다. 지금 이 방식은 metaDB에 있는 데이터를 끌어오는 과정이라고 보시면 됩니다. 이렇게 해야, 한 task가 오류가 생겼을 때, backfill 과정에서 그대로 다음 task를 이어서 진행할 수 있습니다.
- 하지만 데이터 크기가 커졌을 경우에 이 방식은 선호되지 않습니다. DB에 데이터를 저장해놓기 때문에, 계속해서 큰 데이터를 주고 받는 과정은 낭비에 가깝기 때문입니다.
- 따라서 큰 데이터일 때는 보통 Extract 과정에서 뽑아온 데이터를 S3 혹은 다른 저장소에 저장해놓고, 그 링크를 통해서 transform을 하는 방식을 주로 사용합니다.
dag_second_assignment = DAG(
dag_id = 'NameGenderCSVtoRedshift_v3',
start_date = datetime(2023,4,6, tzinfo = local_tz), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag_second_assignment)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag_second_assignment)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'tkddnr961224',
'table': 'name_gender'
},
dag = dag_second_assignment)
extract >> transform >> load
위 방식을 통해서 task 별로 데이터를 주고 받는 과정을 구현했다면, 뒤에 operator와 dag를 작성하는 과정은 간단합니다. 각 operator를 작성하고, operator 간의 실행 순서를 정해줍니다(extract >> transform >> load )
(4) Redshift Connection 설정(Data Warehouse)
웹 UI를 통해서 AIRFLOW와 연결할 CONNECTIONS를 추가합니다. 해당 페이지를 통해서, 연결하고 싶은 데이터베이스의 연결 정보를 입력해줍니다.
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_Redshift_connection(autocommit=False):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
앞선 redshift 연결 함수를 이렇게 바꿔써줍니다. 데이터베이스 정보가 코드를 통해 유출 될 일도, 데이터베이스 변경 시에도 쉽게 db를 전환할 수 있습니다! redshift는 PostgresHook을 사용합니다.
여기까지 따라오셨다면, 4개의 DAG를 생성하셨거나, 1개의 DAG로 계속 업데이트하셨을 수도 있습니다. FAIL의 경우엔,, S3 관련 삽질이라 시간이 나는대로 포스팅하겠습니다! VARIABLES 관련 오류였습니다. 이 외에도 이번 챕터에서
- KST에 맞춰서 스케줄링 하는 법
- AIRFLOW의 각종 기능
들이 많지만, 하면서 쉽게 터득할 수 있는거라 포스팅하진 않겠습니다! 다음 포스팅은 Open Weather API를 활용해서 full refresh ETL 작업, 그리고 여유가 된다면 Incremental Update까지 진행해보겠습니다~ 감사합니다:)
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[4주차] AIRFLOW Incremental Update 구현하기 (0) | 2023.05.07 |
---|---|
[4주차] AIRFLOW FULL REPRESH 예제(+ Incremental Update 맛보기) (0) | 2023.05.06 |
[4주차] AIRFLOW ubuntu에 설치하기 (0) | 2023.04.30 |
[4주차] 트랜잭션 (0) | 2023.04.16 |
[3주차] AIRFLOW (0) | 2023.04.08 |