데이터 엔지니어링/Spark

데이터 엔지니어링/Spark

32. Spark 마무리, 다음 스텝

1. 요약 Spark은 2세대 빅데이터 처리 기술 자체 분산 파일 시스템을 갖고 있지 않음 DataFrame, SQL, ML, Streaming, Graph와 같은 다양한 기능 지원 빅데이터 처리 종합 선물세트 단순히 ETL만 할거라면, Hive 같은 것과 차이가 나지 않는다. 구조화된 데이터 처리라면 SQL을 사용 굳이 데이터프레임으로 readability를 떨어뜨릴 필요가 없다. spark SQL 엔진의 최적화 기능을 사용하기 위해서라도, 웬만하면 SQL로 작성 하지만 어떤 기능들은 SQL만으로도 불가능하니, 데이터프레임과 적절히 조합해서 사용 어떤 경우에는 UDF가 굉장히 유용하게 쓰일 수 있다. 주로 기능적인 부분에 대해 학습 Spark 고급 강의에서는 최적화와 관계된 부분의 심화학습 예정 SPA..

데이터 엔지니어링/Spark

31. Spark EMR, Zepplin

AWS EMR EMR(Elastic MapReduce) 위에서 실행하는 것이 일반적 EMR이란? AWS의 Hadoop 서비스 (On-demand Hadoop) Hadoop(YARN), Spark, Hive, Notebook 등등이 설치되어 제공되는 서비스 기본적으로는 YARN 위에서 돌림 Spark은 Zepplin과 같은 Spark 전용 Notebook과 같이 실행되는 경우가 많다.. EC2 서버들을 worker node로 사용하고(이중 하나가 마스터 노드) S3를 HDFS로 사용 AWS 내의 다른 서비스들과의 연동이 쉬움(Kinesis, DynamoDB, Redshift, ...) Spark on EMR 실행 및 사용 과정 AWS의 EMR(Elastic MapReduce - 하둡) 클러스터 생성 EMR..

데이터 엔지니어링/Spark

30. SparkML Pipeline

모델 빌딩과 관련된 흔한 문제들 트레이닝 셋의 관리가 안됨 모델 훈련 방법이 기록이 안됨 어떤 트레이닝 셋을 사용했는지? 어떤 피쳐들을 사용했는지? 하이퍼 파라미터는 무엇을 사용했는지? 모델 훈련에 많은 시간 소요 모델 훈련이 자동화가 안된 경우 매번 각 스텝들을 노트북 등에서 일일히 수행 에러가 발생할 여지가 많음 (특정 스텝을 까먹거나 조금 다른 방식 적용) ML Pipeline의 등장 모델 훈련 방법 기록, 모델 훈련 시간 소요등의 문제를 해결하기 위해 등장 자동화를 통해 에러 소지를 줄이고 반복을 빠르게 가능하게 해줌 Spark ML 관련 개념 정리 ML 파이프라인이란? 데이터 과학자가 머신러닝 개발과 테스트를 쉽게 해주는 기능(데이터 프레임 기반) 머신러닝 알고리즘에 관계없이 일관된 형태의 AP..

데이터 엔지니어링/Spark

29. SparkML(Classification, 타이타닉 생존 예측 모델)

1. Spark Session Build from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Titanic Binary Classification example") \ .getOrCreate() 2. Spark read csv data = spark.read.csv('./data/titanic.csv', header=True, inferSchema=True) 3. 데이터프레임 검정통계량 data.select(['*']).describe().show() 4. 데이터 클린업 PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미가 없음). Cabin도 비어있는 값이 너무 많..

데이터 엔지니어링/Spark

28. Spark ML(Regression, 보스턴 주택값 예측 모델)

1. Spark Session Build from pyspark.sql import SparkSession spark = SparkSession / .builder / .appName("Boston Housing Linear Regression example") / .getOrCreate() 2. Spark read csv data = spark.read.csv('./data/boston_housing.csv', header=True, inferSchema=True) 3. 피처벡터 생성 # VectorAssembler를 import해서 # input이 되는 컬럼 이름은 feature_columns이고 # outputCol을 만들어라, 그것의 이름은 features로 줘라 from pyspark.ml.fe..

데이터 엔지니어링/Spark

27. SPARK ML 소개

1. SPARK ML 소개 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리 Classification, Regression, Clustering, Collaborative Filtering, Dimensionality Reduction 아직 딥러닝 지원은 미약 https://spark.apache.org/docs/latest/ml-classification-regression.html 여기에는 RDD 기반과 데이터프레임 기반의 두 버전이 존재 spark.mlib vs. spark.ml spark.mlib가 RDD 기반이고 spark.ml은 데이터프레임 기반 spark.mlib는 RDD 위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨 항상 spark.ml을 사용할 것! import ..

데이터 엔지니어링/Spark

26. Spark 내부 동작(Bucketing과 Partitioning)

HDFS 데이터나 입력 파티션을 처리형태에 맞춰 최적화할 수 있다면 시간을 단축하고 리소스를 덜 사용할 수 있다. Bucketing과 File System Partitioning 소개 둘다 Hive 메타스토어의 사용이 필요 : saveAsTable 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것 Bucketing 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지? 셔플링을 최소화하려는 목적 나중에 이 데이터들이 Partition으로 메모리에 로딩이 됐을 때, 셔플링을 최소화하는 상태로 Operation이 진행될 수 있도록 하는 것 특히 조인을 할 때, 조인 대상의 Partition 수가 맞지 않는다고 할 때, 또 셔플링이 발생하기 때문에 Bucketing..

데이터 엔지니어링/Spark

25. Spark 내부 동작(Execution Plan 실습, Spark Web UI)

WordCount 코드 spark = SparkSession \ .builder \ .master("local[3]") \ .appName("SparkSchemaDemo") .config("spark.sql.adaptive.enabled", False) \ .config("spark.sql.shuffle.partitions", 3) \ .getOrCreate() df = spark.read.text("shakespeare.txt") df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count() df_count.show() 이 코드는 몇개의 job을 만들어낼까? 1개 show()가 없다면 이 코드는 몇 개의 ..

데이터 엔지니어링/Spark

24. Spark 내부 동작(Execution Plan)

Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가? 다음 데이터 프레임 연산을 자세히 보자 spark.read.option("header", True).csv("test.csv"). \ where("gender" "F"). \ select("name", "gender"). \ groupby("gender"). \ count(). \ show() WHERE, SELECT는 셔플링 없이 파티션에서 독립적으로 작업을 수행할 수 있다. 그런데 GROUPBY가 호출되는 순간 GROUPBY 키에 맞게, 같은 값을 갖는 레코드들이 같은 파티션으로 재정렬이 되어야하기 때문에, 셔플링이 발생한다. COUNT 자체는 해당 파티션에서 병렬적으로 작업이 가능하다. SHOW는 일부 레코드를 DRIVER로 가져온다. ..

데이터 엔지니어링/Spark

23. Spark 내부 동작(Spark 파일 포맷, Partition 저장 형태, Schema가 있는 데이터의 Merge)

1. 파일의 종류 데이터는 디스크에 파일로 저장됨 : 일에 맞게 최적화 필요 입력이 되는 데이터는 대부분 HDFS에서 오게 된다. 이는 물론 클라우드 스토리지를 포함 이 HDFS에서 내가 처리하려는 데이터가 어떤 포맷으로 저장되어있느냐가, 성능에 굉장한 영향을 미친다. Unstructured : Text Semi-structured : JSON, XML, CSV 사람의 눈으로 읽을 수 있다. Structued : PARQUET, AVRO, ORC, SequenceFile PARQUET이 SPARK에서 제일 많이 쓰인다. 2. Spark의 주요 파일 타입 컬럼 스토리지 : 열별로 저장 압축 가능 : 전부 압축 가능 Splitable HDFS에서 데이터가 저장이 될 때는 데이터 블록 단위로 나뉜다. 그 때 ..

데이터 엔지니어링/Spark

22. Unit Test

유닛 테스트란? 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트 CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해짐 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적 JUnit for Java NUnit for .NET unittest for Python unittest를 사용해볼 예정 좋은 개발자가 되기 위해선, 내 코드에 어떤 문제가 있을지 테스트를 작성하는 습관을 가져야한다. 유닛 테스트 실습 1 (1) Spark Session Build # 코드를 작성하기 전에 test 코드를 먼저 만들어두고, 코드를 만들어라 # TDD(TEST DRIVEN DEVELOPMENT) from pyspa..

데이터 엔지니어링/Spark

21. Hive 메타스토어 사용하기

Spark 데이터베이스와 테이블(1) 카탈로그 : 테이블과 뷰에 관한 메타 데이터 관리 기본으로 메모리 기반 카탈로그 제공 - 세션이 끝나면 사라짐 Hive와 호환되는 카탈로그 제공 - Persistent 테이블 관리 방식 데이터들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계) Spark 데이터베이스와 테이블(2) 메모리 기반 테이블/뷰: 임시 테이블로 앞서 사용해봤음 스토리지 기반 테이블 기본적으로 HDFS와 Parquet 포맷을 사용 Hive와 호환되는 메타스토어 사용 두 종류의 테이블이 존재(Hive와 동일한 개념) Managed Table Spark이 실제 데이터와 메타 데이터 모두 관리 Drop 테이블 하면 실제 HDFS에서 삭제 Unmanaged (External) Table Spa..

우상욱
'데이터 엔지니어링/Spark' 카테고리의 글 목록