데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[7주차] SPARK 기본 개념 및 PANDAS와의 비교

SPARK 버클리 대학의 AMPLab에서 아파치 오픈소스 프로젝트로 2013년 시작한 프레임워크입니다. 하둡의 뒤를 잇는 2세대 빅데이터 기술로, Yarn(Yarn은 Hadoop 2.0, 3.0을 말한다고 보시면 됩니다.)을 분산환경으로 사용하고 Scala로 작성되었습니다. 일단 Spark의 기본적인 개념은 분산, 병렬 처리입니다. 데이터를 나누고 이 각각, Block(Partition)을 따로 가공 및 정제 후에 합치는 방식입니다. Spark은 하둡의 Mapreduce 방식과 다르게 디스크에 저장하는 방식이 아닌, 메모리에 저장함으로써 속도를 월등히 높였습니다. 특히 MapReduce는 프로그래밍에서 사용할 수 있는 Operator가 딱 두 개였고, SQL로 치면 GROUP BY, CASE WHEN 정..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] Dag Dependencies(Explicit Trigger, Reactive trigger, BranchPythonOperator)

Dag간 실행순서를 정하기 위해서는 Dag Dependencies를 활용하면 됩니다. 즉 어떤 Dag가 실행되면, 따라오는 Dag가 있게끔 설정하는 것이 가능합니다. 보통 TriggerDagOperator를 쓰는데, ExternalTaskSensor는 성능 상의 이유로 잘 쓰지 않습니다. 이유는 후술하겠습니다! Explicit Trigger 이 Trigger 방법은 한 Dag는 다음 Dag를 트리거 시킵니다. 한 방법으로, DAG A의 태스크를 TriggerDagRunOperator로 구현하는 방법입니다. 트리거 A는 다음과 같이 구현하면 됩니다. 이때 유의할 점은 반드시 airflow.cfg dag_run_conf_overrides_params가 True로 설정 되어 있어야합니다. from airflo..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] API & Airflow 모니터링

Airflow는 환경설정을 통해서, 웹 api를 외부에서 활용할 수 있습니다. Airflow API 활성화 airflow.cfg의 api 섹션에서 auth_backend의 값을 변경합니다. auth_backend = airflow.api.auth.backend.basic_auth airflow 웹서버를 재실행합니다. sudo systemctl restart airflow-webserver basic_auth 설정 재확인 sudo su airflow airflow config get-value api auth_backends Airflow Web UI 에서 새로운 사용자 추가 Security -> List Users -> + 이후 화면에서 새 사용자 정보를 추가 합니다(monitor:MonitorUser1..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] AIRFLOW SLACK 연동(파이프라인 에러메시지 받기)

슬랙 메시지를 활용해서 데이터파이프라인을 효율적으로 관리할 수 있습니다. 보통 현업에 계신 분들은 데이터파이프라인 에러를 슬랙으로 받고, 해당 메시지를 받으면 데이터 파이프라인을 backfill 혹은 복구하는 작업을 합니다. 이번엔 airflow와 슬랙을 연동해서 에러메세지를 받아보는 작업을 해보겠습니다. 1.Incoming Webhooks App 생성하기 https://api.slack.com/messaging/webhooks Sending messages using Incoming Webhooks Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] AIRFLOW 주요 고려사항 정리

에어플로우 환경 설정 관련 고려사항 airflow.cfg 에어플로우의 환경설정 파일입니다. /var/lib/airflow/airflow.cfg 해당 경로에 위치합니다. 환경설정의 변경은 웹 서버 및 스케줄러를 다시 시작할 때 반영됩니다. dag_dir_list_interval : dags_folder를 Airflow가 얼마나 자주 스캔하는지 명시합니다(초 단위, default : 300) DAGs 폴더 [core] 섹션의 dags_folder가 반드시 DAG들이 있는 디렉토리가 되어야합니다. Airflow Database 에어플로우의 메타데이터 데이터베이스는 기본적으로 sqlite을 사용합니다. 다만 추후 확장을 위해 보통은 postgresql로 사용합니다. 이 데이터베이스는 주기적으로 백업되어야하며, ..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[6주차] DOCKER & K8S & DBT

DOCKER DOCKER는 컴퓨터 안의 컴퓨터라고 생각하시면 좋습니다. 기존의 가상환경을 구축하는 것보다, 더 경량화된 컨테이너를 구축하는 작업이라고 보시면 됩니다. 다만 운영체제의 사양이 어느정도 받쳐줘야 사용할 수 있습니다. 특히 AIRFLOW의 경우에는 DOCKER CONTAINER에 8기가 정도의 메모리를 줘야 에어플로우를 쓰는데 문제가 없습니다. DOCKER IMAGE 단순히 응용 프로그램 뿐만 아니라 그 프로그램이 필요로 하는 모든 다른 환경까지 포함한 소프트웨어 패키지 Docker Registry에 가면 다양한 Docker Image들을 찾아볼 수 있습니다. Docker Container Docker Image를 Docker Engine에서 실행한 것을 지칭 Docker Engine만 실행하..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] Airflow + Redshift로 ELT 구현하기

이번 과제는 NPS를 구해서, 해당 NPS를 단 한번만 업데이트하는 ELT 구현입니다. NPS는 사용자의 서비스 추천정도를 물은 뒤에 0~10점으로 환산하여, 10점 혹은 9점을 준 고객의 비율에서 0~6점을 준 고객의 비율을 빼는 방법입니다. 간단하게 ELT를 구현하고 테스트 코드도 동시에 삽입해서 ELT 과정을 구현해보겠습니다. SQL 구현하기 저번 포스팅에서 만들어둔 nps 테이블을 활용하여 다음 SQL을 준비합니다. SELECT DATE(created_at) AS date, CAST(CAST(COUNT(CASE WHEN score IN (9,10) THEN 1 END) - COUNT(CASE WHEN score IN (0,1,2,3,4,5,6) THEN 1 END) as FLOAT) / COUNT..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] Airflow BackFill(Incremental Update, execution_date)

BACKFILL 데이터파이프라인을 설계하고 운용할 때, 가끔 에러가 나거나, 이미 지난 날짜를 기준으로 데이터를 재처리 해야할 때가 있습니다. 이 때 백필은 재처리 작업을 의미합니다. 단어 의미 그대로 '매우는 작업'이라고 보시면 됩니다. Full Refrest를 한다면 backfill은 필요 없습니다. backfill은 일별 혹은 시간별 업데이트를 의미합니다. 마지막 업데이트 시간 기준 backfill을 하는 경우라면(데이터 웨어하우스 기록 시간 기준), execution_date을 이용한 backfill은 필요하지 않습니다. 데이터의 크기가 커질수록 backfill 기능을 구현해두는 것은 필수입니다. airflow는 이 backfill 작업을 굉장히 쉽게 만들지만, 데이터소스의 도움없인 불가능합니다...

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[5주차] OLTP/OLAP, MYSQL 데이터 복사하기

5주차에 배운 내용들을 포스팅합니다. 먼저 OLTP와 OLAP의 차이에 대해 간단히 알아보도록 하겠습니다. OLTP VS OLAP OLTP(ONLINE TRASACTION PROCESSING) 온라인 트랜잭션 프로세싱, 즉 온라인에서 일어나는 데이터 교환에 중점을 두는 작업입니다. 데이터 처리 속도가 보다 중요하며, 큰 데이터를 다루기에는 적합하지 않습니다. 우리가 흔히 알고 있는 MYSQL, POSTGRESQL 같은 데이터베이스들이 이 작업에 중점을 둔 DB입니다. OLAP(ONLINE ANALYTIC PROCESSING) 온라인 애널리틱 프로세싱, 즉 분석을 중점을 두는 작업입니다. 데이터 처리 속도는 조금 떨어지더라도, 큰 과거 데이터를 활용해서 분석처리에 사용합니다. AWS REDSHIFT, GO..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[4주차] AIRFLOW Incremental Update 구현하기

Incremental Update의 경우에는 backfill 이슈가 발생하면서, 유지보수 비용이 full refresh에 비해 기하급수적으로 올라갑니다. 하지만, 데이터가 커질수록 full refresh의 경우에도 비용이 증가하면서, 오히려 Incremental Update가 나은 상황이 있을 수 있습니다. 따라서 해당 포스팅에서는 Incremental Update를 기초적으로 어떻게하는지 알아보겠습니다. 먼저 해당 포스팅에 대한 기본적인 정보는 앞선 포스팅을 참고해주세요. https://dataengineerstudy.tistory.com/161 [4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH) FULL REFRESH 상황 가정 데이터 적재 FULL REFRESH란 테이블에 있는 모든..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[4주차] AIRFLOW FULL REPRESH 예제(+ Incremental Update 맛보기)

이번 포스팅은 AIRFLOW의 FULL REPRESH를 연습해보기 위한 예제로 OPENWEATHER API를 활용합니다! https://openweathermap.org/api/one-call-api One Call API: weather data for any geographical coordinate - OpenWeatherMap Make just one API call and get all your essential weather data for a specific location with our new OpenWeather One Call API 2.5. Easy migration from the Dark Sky API. The One Call API 2.5 provides the followin..

데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트

[4주차] AIRFLOW 활용 데이터 적재(FULL REPRESH)

FULL REFRESH 상황 가정 데이터 적재 FULL REFRESH란 테이블에 있는 모든 정보를 삭제하고, 다시 INSERT하는 방식으로 보통 데이터의 크기가 크지 않거나, 스케줄의 실행 시간을 보고 판단합니다. 보통 FULL REFRESH가 가지는 장점은, BACKFILL(AIRFLOW의 TASK 중 일부 혹은 전체 DAG의 오류를 복구시키는 작업) 과정이 굉장히 단순하고 유지보수에 들이는 자원이 굉장히 작다는 점입니다. 보통은 FULL REPRESH를 쓰는 것이 좋지만, 아래의 경우에는 INCREMENTAL UPDATE로 전환합니다. 1시간 주기로 돌아야하는 DAG의 RUNNING TIME이 30분 이상 소요될 경우 하루 주기로 돌아야하는 DAG의 RUNNING TIME이 반나절 이상 소요될 경우 ..

우상욱
'데이터 엔지니어링/실리콘밸리에서 날아온 데이터엔지니어링 스타터 키트' 카테고리의 글 목록