5주차에 배운 내용들을 포스팅합니다. 먼저 OLTP와 OLAP의 차이에 대해 간단히 알아보도록 하겠습니다.
OLTP VS OLAP
- OLTP(ONLINE TRASACTION PROCESSING)
- 온라인 트랜잭션 프로세싱, 즉 온라인에서 일어나는 데이터 교환에 중점을 두는 작업입니다.
- 데이터 처리 속도가 보다 중요하며, 큰 데이터를 다루기에는 적합하지 않습니다.
- 우리가 흔히 알고 있는 MYSQL, POSTGRESQL 같은 데이터베이스들이 이 작업에 중점을 둔 DB입니다.
- OLAP(ONLINE ANALYTIC PROCESSING)
- 온라인 애널리틱 프로세싱, 즉 분석을 중점을 두는 작업입니다.
- 데이터 처리 속도는 조금 떨어지더라도, 큰 과거 데이터를 활용해서 분석처리에 사용합니다.
- AWS REDSHIFT, GOOGLE BIGQUERY, SNOWFLAKE 같은 웨어하우스들이 이 처리에 중점을 뒀습니다.
간혹 이제 막 시작한 스타트업들은 모든 데이터를 OLTP 기반의 데이터베이스에 모든 데이터를 집어넣기도 합니다. 하지만 이런 방식은 결국 운영 DB에 과부하를 일으키고, 확장에 한계를 맞게 됩니다.
초기 넷플릭스의 경우에는 영상을 시청하는 시청자의 15초마다 기록을 남겨서, 그대로 OLTP 기반의 데이터베이스에 넣었습니다. 이러한 방식은 결국 한계를 갖게 됐고, 결국 운영에 필수적이지 않은 데이터들은 로그 기반의 파일로 저장해서 클라우드 스토리지를 거쳐서, 데이터웨어하우스로 적재되는 방식으로 바뀌기 시작했습니다.
여기서 알 수 있는 점은 프로덕션 데이터베이스는 항상 운영에 필수적인 데이터만 가지고 있을 것, 그리고 추가적인 분석을 위한 데이터웨어하우스 혹은 파일 기반의 데이터 관리를 해야할 것. 이 정도로 정리하면 좋을 것 같습니다. 따라서 프로덕션 데이터베이스에 있는 필수적인 운영정보들이 분석에 활용되어야하기도 합니다. 따라서 데이터웨어하우스에 적재하는 방법을 알아보고, OLAP 분석을 위한 데이터파이프라인을 만드는 방법을 포스팅해보겠습니다.
MYSQL 데이터 복사해서 REDSHIFT에 적재
- #1 데이터의 크기가 큰 경우
- MYSQL 일부 테이블의 데이터를 S3에 파일 형태로 저장한 뒤
- REDSHIFT의 COPY를 통해서 BULK UPDATE합니다
- #2 데이터의 크기가 작은 경우
- MYSQL의 일부 테이블 데이터를 REDSHIFT에 INSERT 방식으로 적재합니다.
AIRFLOW는 MYSQL, REDSHIFT 뿐만 아니라 수많은 데이터베이스를 지원하며, 이는 WEB UI의 CONNECTION 설정을 통해서 쉽게 가능합니다. 일단 예제를 통해서 CONNECTION 설정을 진행해보겠습니다.
MYSQL, S3 AIRFLOW랑 연결하기
일단 WEB UI를 통해서 MYSQL에 연결합니다. DB 정보를 입력하신 뒤에, TEST를 통해서 성공적으로 연결되었다는 표시가 뜨면 SAVE 합니다.
S3도 같은 방식으로 연결합니다. 2.5.1 버전에서는 간혹 S3가 있기도하고, AMAZONE WEBSERVICE, GENERIC 세 개 중에 하나 골라서 하시면 됩니다. aws access key id, aws Secret Access Key 이렇게 두 개 입력하고, 지역 적어두고 사용하시면 됩니다!
MYSQL 데이터 복사해서 REDSHIFT에 적재(FULL REFRESH)
일단 Dags의 경우엔 MYSQL -> S3, S3 -> REDSHIFT 이 두 개의 TASK를 만들건데, 이미 AIRFLOW에선 이 OPERATOR들을 제공합니다. PythonOperator에 비하면 굉장히 쉽습니다.
schema = "tkddnr961224" # 데이터베이스 이름
table = "nps" # 테이블 이름
s3_bucket = "grepp-data-engineering" # 버킷명(s3에서 폴더를 담는 최상위 객체)
s3_key = schema + "-" + table # 버킷 내 객체의 고유한 식별자
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id", # 연결할 아이디(앞서 웹 ui 설정)
aws_conn_id = "aws_conn_id", # 연결할 아이디(앞서 웹 ui 설정)
verify = False,
replace = True, # 덮어쓰기
pd_kwargs={"index": False, "header": False},
dag = dag
)
- SqlToS3Operator 설정
- mysql에 있는 prod.nps 데이터를 전부 가져와서
- s3_bucket, key 값에 replace하며 업데이트
- 기본적으로 SqlToS3Operator는 pandas를 사용해서, pd_kwargs를 통해서 index와 header를 빼놓고 s3 적재 가능
s3_to_redshift_nps = S3ToRedshiftOperator( # S3에서 REDSHIFT로
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'], # COPY는 CSV로
method = 'REPLACE', # FULL REFRESH
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps # TASK 순서
- S3ToRedshiftOperator
- S3에 있는 데이터를 그대로 redshift 테이블로 COPY
- method는 REPLACE로 기존 데이터를 대체(FULL REFRESH)\
- mysql_to_s3_nps 태스크와 s3_to_redshift_nps 태스크를 연결
MYSQL 데이터 복사해서 REDSHIFT에 적재(Incremental Update)
먼저 MYSQL/POSTGRESQL 테이블이라면 INCREMENTAL UPDATE할 때 다음을 만족해야합니다.
- created (timestamp): Optional
- modified (timestamp)
- deleted (boolean): 레코드를 삭제하지 않고 deleted를 True로 설정
created 컬럼이 있고, 만약에 수정이 가능한 데이터라면 modified 컬럼이 있어야합니다. 그리고 삭제가 가능한 데이터라면 물리적으로 데이터를 삭제하지 않고, deleted 컬럼을 추가해서 True로 설정해서 플래그를 만드는 식으로 구현되어있어야합니다. 이 때 Incremental update가 가능합니다. 이 조건이 만족됐을 때 다음 방법으로 Incremental Update를 구현합니다.
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어온다면
|
이런 식으로 읽어올 데이터의 양을 줄이고, 업데이트 된 값을 incremental update합니다. 하지만 Operator를 잘 사용한다면, 파라미터를 설정함으로써, 훨씬 간단히 수행할 수 있습니다.
schema = "tkddnr961224"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')", # 바뀐 부분
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
코드를 보시면 query가 달라졌을겁니다. 먼저 pord database(mysql)에서 nps 테이블의 모든 데이터를 가져오는데, 이 때 airflow task의 execution_date(airflow)와 created_at의 date가 같은 데이터만 가지고 옵니다.
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
method = "UPSERT",
upsert_keys = ["id", "created_at"],
dag = dag
)
이 코드에선 method 부분이 달라졌는데, method를 upsert로 구현하면 위에 논리적인 부분들이 알아서 해결됩니다. upsert_keys 값들을 기존 데이터와 비교해서 같은 데이터는 update하고, 없는 데이터는 insert하는 방식으로 구현됩니다.
+ Redshift COPY 권한 허가(AIRFLOW)
VARIABLES에 iam_role_for_copy_access_token을 추가합니다. Val은 redshift에서 받은 액세스 토큰 그대로 사용하시면 됩니다. s3와 연동되어있지 않다면 부가적인 작업이 더 필요한 것 같습니다. 일단 저의 경우엔 이대로 진행하면, ec2 서버에선 진행되고 wsl에서는 되지 않습니다.
'데이터 엔지니어링 > 실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 다른 글
[5주차] Airflow + Redshift로 ELT 구현하기 (0) | 2023.05.07 |
---|---|
[5주차] Airflow BackFill(Incremental Update, execution_date) (0) | 2023.05.07 |
[4주차] AIRFLOW Incremental Update 구현하기 (0) | 2023.05.07 |
[4주차] AIRFLOW FULL REPRESH 예제(+ Incremental Update 맛보기) (0) | 2023.05.06 |
[4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH) (0) | 2023.05.06 |