1. 요약 Spark은 2세대 빅데이터 처리 기술 자체 분산 파일 시스템을 갖고 있지 않음 DataFrame, SQL, ML, Streaming, Graph와 같은 다양한 기능 지원 빅데이터 처리 종합 선물세트 단순히 ETL만 할거라면, Hive 같은 것과 차이가 나지 않는다. 구조화된 데이터 처리라면 SQL을 사용 굳이 데이터프레임으로 readability를 떨어뜨릴 필요가 없다. spark SQL 엔진의 최적화 기능을 사용하기 위해서라도, 웬만하면 SQL로 작성 하지만 어떤 기능들은 SQL만으로도 불가능하니, 데이터프레임과 적절히 조합해서 사용 어떤 경우에는 UDF가 굉장히 유용하게 쓰일 수 있다. 주로 기능적인 부분에 대해 학습 Spark 고급 강의에서는 최적화와 관계된 부분의 심화학습 예정 SPA..
AWS EMR EMR(Elastic MapReduce) 위에서 실행하는 것이 일반적 EMR이란? AWS의 Hadoop 서비스 (On-demand Hadoop) Hadoop(YARN), Spark, Hive, Notebook 등등이 설치되어 제공되는 서비스 기본적으로는 YARN 위에서 돌림 Spark은 Zepplin과 같은 Spark 전용 Notebook과 같이 실행되는 경우가 많다.. EC2 서버들을 worker node로 사용하고(이중 하나가 마스터 노드) S3를 HDFS로 사용 AWS 내의 다른 서비스들과의 연동이 쉬움(Kinesis, DynamoDB, Redshift, ...) Spark on EMR 실행 및 사용 과정 AWS의 EMR(Elastic MapReduce - 하둡) 클러스터 생성 EMR..
1. SPARK ML 소개 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리 Classification, Regression, Clustering, Collaborative Filtering, Dimensionality Reduction 아직 딥러닝 지원은 미약 https://spark.apache.org/docs/latest/ml-classification-regression.html 여기에는 RDD 기반과 데이터프레임 기반의 두 버전이 존재 spark.mlib vs. spark.ml spark.mlib가 RDD 기반이고 spark.ml은 데이터프레임 기반 spark.mlib는 RDD 위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨 항상 spark.ml을 사용할 것! import ..
HDFS 데이터나 입력 파티션을 처리형태에 맞춰 최적화할 수 있다면 시간을 단축하고 리소스를 덜 사용할 수 있다. Bucketing과 File System Partitioning 소개 둘다 Hive 메타스토어의 사용이 필요 : saveAsTable 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것 Bucketing 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지? 셔플링을 최소화하려는 목적 나중에 이 데이터들이 Partition으로 메모리에 로딩이 됐을 때, 셔플링을 최소화하는 상태로 Operation이 진행될 수 있도록 하는 것 특히 조인을 할 때, 조인 대상의 Partition 수가 맞지 않는다고 할 때, 또 셔플링이 발생하기 때문에 Bucketing..
WordCount 코드 spark = SparkSession \ .builder \ .master("local[3]") \ .appName("SparkSchemaDemo") .config("spark.sql.adaptive.enabled", False) \ .config("spark.sql.shuffle.partitions", 3) \ .getOrCreate() df = spark.read.text("shakespeare.txt") df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count() df_count.show() 이 코드는 몇개의 job을 만들어낼까? 1개 show()가 없다면 이 코드는 몇 개의 ..
Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가? 다음 데이터 프레임 연산을 자세히 보자 spark.read.option("header", True).csv("test.csv"). \ where("gender" "F"). \ select("name", "gender"). \ groupby("gender"). \ count(). \ show() WHERE, SELECT는 셔플링 없이 파티션에서 독립적으로 작업을 수행할 수 있다. 그런데 GROUPBY가 호출되는 순간 GROUPBY 키에 맞게, 같은 값을 갖는 레코드들이 같은 파티션으로 재정렬이 되어야하기 때문에, 셔플링이 발생한다. COUNT 자체는 해당 파티션에서 병렬적으로 작업이 가능하다. SHOW는 일부 레코드를 DRIVER로 가져온다. ..
1. 파일의 종류 데이터는 디스크에 파일로 저장됨 : 일에 맞게 최적화 필요 입력이 되는 데이터는 대부분 HDFS에서 오게 된다. 이는 물론 클라우드 스토리지를 포함 이 HDFS에서 내가 처리하려는 데이터가 어떤 포맷으로 저장되어있느냐가, 성능에 굉장한 영향을 미친다. Unstructured : Text Semi-structured : JSON, XML, CSV 사람의 눈으로 읽을 수 있다. Structued : PARQUET, AVRO, ORC, SequenceFile PARQUET이 SPARK에서 제일 많이 쓰인다. 2. Spark의 주요 파일 타입 컬럼 스토리지 : 열별로 저장 압축 가능 : 전부 압축 가능 Splitable HDFS에서 데이터가 저장이 될 때는 데이터 블록 단위로 나뉜다. 그 때 ..
유닛 테스트란? 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트 CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해짐 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적 JUnit for Java NUnit for .NET unittest for Python unittest를 사용해볼 예정 좋은 개발자가 되기 위해선, 내 코드에 어떤 문제가 있을지 테스트를 작성하는 습관을 가져야한다. 유닛 테스트 실습 1 (1) Spark Session Build # 코드를 작성하기 전에 test 코드를 먼저 만들어두고, 코드를 만들어라 # TDD(TEST DRIVEN DEVELOPMENT) from pyspa..
Spark 데이터베이스와 테이블(1) 카탈로그 : 테이블과 뷰에 관한 메타 데이터 관리 기본으로 메모리 기반 카탈로그 제공 - 세션이 끝나면 사라짐 Hive와 호환되는 카탈로그 제공 - Persistent 테이블 관리 방식 데이터들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계) Spark 데이터베이스와 테이블(2) 메모리 기반 테이블/뷰: 임시 테이블로 앞서 사용해봤음 스토리지 기반 테이블 기본적으로 HDFS와 Parquet 포맷을 사용 Hive와 호환되는 메타스토어 사용 두 종류의 테이블이 존재(Hive와 동일한 개념) Managed Table Spark이 실제 데이터와 메타 데이터 모두 관리 Drop 테이블 하면 실제 HDFS에서 삭제 Unmanaged (External) Table Spa..
사용자별로 처음 채널과 마지막 채널 알아내기 user_session_channel, session_timestamp 테이블 사용 다음 형태의 결과를 만들어보기 사용자 251번의 시간 순으로 봤을 때 첫번째 채널과 마지막 채널은 무엇인가? 노가다를 하자면 아래 쿼리를 실행해서 처음과 마지막 채널을 보면 된다. SELECT ts, channel FROM user_session_channel usc JOIN session_timestamp st ON usc.sessionid = st.sessionid WHERE userid = 251 ORDER BY 1 ROW_NUMBER를 이용해서 해보자 ROW_NUMBER () OVER (PARTITION BY field1 ORDER BY field2) nn vs. FIR..