Airflow

데이터 노하우/꿀팁

Airflow 관련 문의 기록

프로젝트 당시 S3toRedshiftOperator 사용시 생겼던 문제에 관한 기록입니다. https://dataengineerstudy.tistory.com/194 [AWS REDSHIFT] AIRFLOW S3 → Redshift UPSERT 관련 에러('syntax error at or near "#"') 먼저 이 글은 저와 같이 REDSHIFT를 AIRFLOW와 연동해서 쓸 때 생기는 문제에 관한 글입니다. 문서가 너무 없고, 저만 겪는 상황 같아서, 열심히 삽질한 결과 작성해놓습니다. 이 글을 읽어보실 분들 dataengineerstudy.tistory.com 당시 문제점들을 기록해놨고, 해당 문제를 airflow 공식 github에 문의한 상태입니다. 임시로 해결해놨지만, 정말 임시로 해결한 문..

AWS

[AWS REDSHIFT] AIRFLOW S3 → Redshift UPSERT 관련 에러('syntax error at or near "#"')

먼저 이 글은 저와 같이 REDSHIFT를 AIRFLOW와 연동해서 쓸 때 생기는 문제에 관한 글입니다. 문서가 너무 없고, 저만 겪는 상황 같아서, 열심히 삽질한 결과 작성해놓습니다. 이 글을 읽어보실 분들의 조건은 다음과 같습니다. 1. 에러 상황 (1) Redshift Cluster 사용 (2) Delimited identifiers를 사용하지 않으면, Syntax Error가 발생하는가? (3) Airflow S3ToRedshiftOperator를 사용해서, UPSERT 방식으로 데이터를 redshift에 로드하려고 하는가? (4) Schema, Table 명을 Delimited identifiers를 사용해서 구성했는가? 여기까지 오셨으면 저랑 같은 상황입니다.. 일단 웹에 관련 정보는 없어서 ..

프로젝트 회고록/음악 평론 웹 제작 프로젝트

3. S3 데이터 가공해서 MYSQL로 적재하기

이번 포스팅에서는 S3에 있는 데이터를 가공해서, MYSQL로 적재하는 TRANSFORM, LOAD 과정을 작성하겠습니다. 아무래도, 각각 데이터 특성이 다르다보니까, ETL 과정에서도 조금 다른 방식의 가공과 적재 방법을 사용했습니다. 일단 학습에 목적을 두기도 했지만, 실무에서 어떤 방식으로 응용될지도 조금 고민해볼 수 있었던 것 같습니다. 데이터 파이프라인 일단 이번 DAG의 파이프라인은 이렇습니다. 사실 지금은 태스크 별로, S3 데이터를 읽는게 아니라, 전역변수로 S3에서 읽고 있는데, 만약에 EC2 환경에서 메모리나 이런 부분들이 부족하다면, 태스크별로 S3에서 데이터를 읽고와서, TASK를 마무리하면 그 때 해당 변수를 제거하는 식으로 메모리를 컨트롤할 수 있을 것 같습니다. 지금은 매일 업..

프로젝트 회고록/음악 평론 웹 제작 프로젝트

2. SPOTIFY 데이터 AIRFLOW로 S3에 업로드하기

안녕하세요. 이번에는 저번 포스팅에 이어서, SPOTIFY API를 활용해서, AIRFLOW로 S3에 업로드하는 과정을 자동화한 것에 대해서 작성하겠습니다. 확실히 S3에 업로드 하는 과정에서 조금 고민거리들이 많았는데, 일단은 진행하면서 배우는 중입니다. 그럼 이 과정들을 소개해보도록 하겠습니다. 데이터 파이프라인 설계 일단 AIRFLOW로 S3에 업로드하는 것은, EXTRACT 과정으로 삼았습니다. 원본 데이터를 JSON에 그대로 저장하고 S3에 업로드해서 AIRFLOW 스케줄러에 문제가 생겼을 때도, 어느 정도 복구할 수 있도록 S3를 거쳐가도록 파이프라인을 설계했습니다. 일단은 Spotify_Extract_toS3의 DAG의 파이프라인입니다. 일단 순서는 다음과 같습니다. globaltop50 데..

프로젝트 회고록/음악 평론 웹 제작 프로젝트

1. SPOTIFY API로 데이터 추출하기

안녕하세요! 저번에는 음악 평론 웹 제작 프로젝트를 미니 프로젝트로 진행하면서, 웹의 뼈대 정도는 만들어둔 상황입니다. 아직 프론트도 제대로 구현되지 않았고, 백 쪽도 확실하게 되진 않았습니다!! 전부 완성이 되면 기능 소개글도 한번 올려보겠습니다. 그런데 뒤에 데이터 파이프라인을 만들기가, 영 쉽지 않습니다.... 특히 SPOTIFY API 에서 주는 데이터들이 그렇게 친절하지 않아서..? 직접 오류를 제어하면서 뽑는 코드를 만들려니 고생 꽤나 했는데, 재밌습니다..하하; 그래서 SPOTIPY라는 누군가 만들어둔 라이브러리를 발견하고 쓰려고 했는데, 이건 오류제어를 하면서, 계속해서 API 호출을 할 수 없었습니다. 그래서 대표적인 에러를 제어하는 방식으로, 모듈을 조금 만들어봤습니다. 일단은 데이터엔..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] Dag Dependencies(Explicit Trigger, Reactive trigger, BranchPythonOperator)

Dag간 실행순서를 정하기 위해서는 Dag Dependencies를 활용하면 됩니다. 즉 어떤 Dag가 실행되면, 따라오는 Dag가 있게끔 설정하는 것이 가능합니다. 보통 TriggerDagOperator를 쓰는데, ExternalTaskSensor는 성능 상의 이유로 잘 쓰지 않습니다. 이유는 후술하겠습니다! Explicit Trigger 이 Trigger 방법은 한 Dag는 다음 Dag를 트리거 시킵니다. 한 방법으로, DAG A의 태스크를 TriggerDagRunOperator로 구현하는 방법입니다. 트리거 A는 다음과 같이 구현하면 됩니다. 이때 유의할 점은 반드시 airflow.cfg dag_run_conf_overrides_params가 True로 설정 되어 있어야합니다. from airflo..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] API & Airflow 모니터링

Airflow는 환경설정을 통해서, 웹 api를 외부에서 활용할 수 있습니다. Airflow API 활성화 airflow.cfg의 api 섹션에서 auth_backend의 값을 변경합니다. auth_backend = airflow.api.auth.backend.basic_auth airflow 웹서버를 재실행합니다. sudo systemctl restart airflow-webserver basic_auth 설정 재확인 sudo su airflow airflow config get-value api auth_backends Airflow Web UI 에서 새로운 사용자 추가 Security -> List Users -> + 이후 화면에서 새 사용자 정보를 추가 합니다(monitor:MonitorUser1..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] AIRFLOW SLACK 연동(파이프라인 에러메시지 받기)

슬랙 메시지를 활용해서 데이터파이프라인을 효율적으로 관리할 수 있습니다. 보통 현업에 계신 분들은 데이터파이프라인 에러를 슬랙으로 받고, 해당 메시지를 받으면 데이터 파이프라인을 backfill 혹은 복구하는 작업을 합니다. 이번엔 airflow와 슬랙을 연동해서 에러메세지를 받아보는 작업을 해보겠습니다. 1.Incoming Webhooks App 생성하기 https://api.slack.com/messaging/webhooks Sending messages using Incoming Webhooks Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] AIRFLOW 주요 고려사항 정리

에어플로우 환경 설정 관련 고려사항 airflow.cfg 에어플로우의 환경설정 파일입니다. /var/lib/airflow/airflow.cfg 해당 경로에 위치합니다. 환경설정의 변경은 웹 서버 및 스케줄러를 다시 시작할 때 반영됩니다. dag_dir_list_interval : dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시합니다(초 단위, default : 300) DAGs 폴더 [core] 섹션의 dags_folder가 반드시 DAG들이 있는 디렉토리가 되어야합니다. Airflow Database 에어플로우의 메타데이터 데이터베이스는 기본적으로 sqlite을 사용합니다. 다만 추후 확장을 위해 보통은 postgresql로 사용합니다. 이 데이터베이스는 주기적으로 백업되어야하며, ..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] Airflow + Redshift로 ELT 구현하기

이번 과제는 NPS를 구해서, 해당 NPS를 단 한번만 업데이트하는 ELT 구현입니다. NPS는 사용자의 서비스 추천정도를 물은 뒤에 0~10점으로 환산하여, 10점 혹은 9점을 준 고객의 비율에서 0~6점을 준 고객의 비율을 빼는 방법입니다. 간단하게 ELT를 구현하고 테스트 코드도 동시에 삽입해서 ELT 과정을 구현해보겠습니다. SQL 구현하기 저번 포스팅에서 만들어둔 nps 테이블을 활용하여 다음 SQL을 준비합니다. SELECT DATE(created_at) AS date, CAST(CAST(COUNT(CASE WHEN score IN (9,10) THEN 1 END) - COUNT(CASE WHEN score IN (0,1,2,3,4,5,6) THEN 1 END) as FLOAT) / COUNT..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] Airflow BackFill(Incremental Update, execution_date)

BACKFILL 데이터파이프라인을 설계하고 운용할 때, 가끔 에러가 나거나, 이미 지난 날짜를 기준으로 데이터를 재처리 해야할 때가 있습니다. 이 때 백필은 재처리 작업을 의미합니다. 단어 의미 그대로 '매우는 작업'이라고 보시면 됩니다. Full Refrest를 한다면 backfill은 필요 없습니다. backfill은 일별 혹은 시간별 업데이트를 의미합니다. 마지막 업데이트 시간 기준 backfill을 하는 경우라면(데이터 웨어하우스 기록 시간 기준), execution_date을 이용한 backfill은 필요하지 않습니다. 데이터의 크기가 커질수록 backfill 기능을 구현해두는 것은 필수입니다. airflow는 이 backfill 작업을 굉장히 쉽게 만들지만, 데이터소스의 도움없인 불가능합니다...

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] OLTP/OLAP, MYSQL 데이터 복사하기

5주차에 배운 내용들을 포스팅합니다. 먼저 OLTP와 OLAP의 차이에 대해 간단히 알아보도록 하겠습니다. OLTP VS OLAP OLTP(ONLINE TRASACTION PROCESSING) 온라인 트랜잭션 프로세싱, 즉 온라인에서 일어나는 데이터 교환에 중점을 두는 작업입니다. 데이터 처리 속도가 보다 중요하며, 큰 데이터를 다루기에는 적합하지 않습니다. 우리가 흔히 알고 있는 MYSQL, POSTGRESQL 같은 데이터베이스들이 이 작업에 중점을 둔 DB입니다. OLAP(ONLINE ANALYTIC PROCESSING) 온라인 애널리틱 프로세싱, 즉 분석을 중점을 두는 작업입니다. 데이터 처리 속도는 조금 떨어지더라도, 큰 과거 데이터를 활용해서 분석처리에 사용합니다. AWS REDSHIFT, GO..

우상욱
'Airflow' 태그의 글 목록