데이터엔지니어

데이터 엔지니어링/Spark

27. SPARK ML 소개

1. SPARK ML 소개 머신러닝 관련 다양한 알고리즘, 유틸리티로 구성된 라이브러리 Classification, Regression, Clustering, Collaborative Filtering, Dimensionality Reduction 아직 딥러닝 지원은 미약 https://spark.apache.org/docs/latest/ml-classification-regression.html 여기에는 RDD 기반과 데이터프레임 기반의 두 버전이 존재 spark.mlib vs. spark.ml spark.mlib가 RDD 기반이고 spark.ml은 데이터프레임 기반 spark.mlib는 RDD 위에서 동작하는 이전 라이브러리로 더 이상 업데이트가 안됨 항상 spark.ml을 사용할 것! import ..

데이터 엔지니어링/Spark

26. Spark 내부 동작(Bucketing과 Partitioning)

HDFS 데이터나 입력 파티션을 처리형태에 맞춰 최적화할 수 있다면 시간을 단축하고 리소스를 덜 사용할 수 있다. Bucketing과 File System Partitioning 소개 둘다 Hive 메타스토어의 사용이 필요 : saveAsTable 데이터 저장을 이후 반복처리에 최적화된 방법으로 하는 것 Bucketing 먼저 Aggregation이나 Window 함수나 JOIN에서 많이 사용되는 컬럼이 있는지? 셔플링을 최소화하려는 목적 나중에 이 데이터들이 Partition으로 메모리에 로딩이 됐을 때, 셔플링을 최소화하는 상태로 Operation이 진행될 수 있도록 하는 것 특히 조인을 할 때, 조인 대상의 Partition 수가 맞지 않는다고 할 때, 또 셔플링이 발생하기 때문에 Bucketing..

데이터 엔지니어링/Spark

25. Spark 내부 동작(Execution Plan 실습, Spark Web UI)

WordCount 코드 spark = SparkSession \ .builder \ .master("local[3]") \ .appName("SparkSchemaDemo") .config("spark.sql.adaptive.enabled", False) \ .config("spark.sql.shuffle.partitions", 3) \ .getOrCreate() df = spark.read.text("shakespeare.txt") df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count() df_count.show() 이 코드는 몇개의 job을 만들어낼까? 1개 show()가 없다면 이 코드는 몇 개의 ..

데이터 엔지니어링/Spark

24. Spark 내부 동작(Execution Plan)

Spark은 개발자가 만든 코드를 어떻게 변환하여 실행하는가? 다음 데이터 프레임 연산을 자세히 보자 spark.read.option("header", True).csv("test.csv"). \ where("gender" "F"). \ select("name", "gender"). \ groupby("gender"). \ count(). \ show() WHERE, SELECT는 셔플링 없이 파티션에서 독립적으로 작업을 수행할 수 있다. 그런데 GROUPBY가 호출되는 순간 GROUPBY 키에 맞게, 같은 값을 갖는 레코드들이 같은 파티션으로 재정렬이 되어야하기 때문에, 셔플링이 발생한다. COUNT 자체는 해당 파티션에서 병렬적으로 작업이 가능하다. SHOW는 일부 레코드를 DRIVER로 가져온다. ..

데이터 엔지니어링/Spark

23. Spark 내부 동작(Spark 파일 포맷, Partition 저장 형태, Schema가 있는 데이터의 Merge)

1. 파일의 종류 데이터는 디스크에 파일로 저장됨 : 일에 맞게 최적화 필요 입력이 되는 데이터는 대부분 HDFS에서 오게 된다. 이는 물론 클라우드 스토리지를 포함 이 HDFS에서 내가 처리하려는 데이터가 어떤 포맷으로 저장되어있느냐가, 성능에 굉장한 영향을 미친다. Unstructured : Text Semi-structured : JSON, XML, CSV 사람의 눈으로 읽을 수 있다. Structued : PARQUET, AVRO, ORC, SequenceFile PARQUET이 SPARK에서 제일 많이 쓰인다. 2. Spark의 주요 파일 타입 컬럼 스토리지 : 열별로 저장 압축 가능 : 전부 압축 가능 Splitable HDFS에서 데이터가 저장이 될 때는 데이터 블록 단위로 나뉜다. 그 때 ..

데이터 엔지니어링/Spark

22. Unit Test

유닛 테스트란? 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트 CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해짐 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적 JUnit for Java NUnit for .NET unittest for Python unittest를 사용해볼 예정 좋은 개발자가 되기 위해선, 내 코드에 어떤 문제가 있을지 테스트를 작성하는 습관을 가져야한다. 유닛 테스트 실습 1 (1) Spark Session Build # 코드를 작성하기 전에 test 코드를 먼저 만들어두고, 코드를 만들어라 # TDD(TEST DRIVEN DEVELOPMENT) from pyspa..

데이터 엔지니어링/Spark

21. Hive 메타스토어 사용하기

Spark 데이터베이스와 테이블(1) 카탈로그 : 테이블과 뷰에 관한 메타 데이터 관리 기본으로 메모리 기반 카탈로그 제공 - 세션이 끝나면 사라짐 Hive와 호환되는 카탈로그 제공 - Persistent 테이블 관리 방식 데이터들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계) Spark 데이터베이스와 테이블(2) 메모리 기반 테이블/뷰: 임시 테이블로 앞서 사용해봤음 스토리지 기반 테이블 기본적으로 HDFS와 Parquet 포맷을 사용 Hive와 호환되는 메타스토어 사용 두 종류의 테이블이 존재(Hive와 동일한 개념) Managed Table Spark이 실제 데이터와 메타 데이터 모두 관리 Drop 테이블 하면 실제 HDFS에서 삭제 Unmanaged (External) Table Spa..

데이터 엔지니어링/Spark

20. SPARK SQL(Windowing)

사용자별로 처음 채널과 마지막 채널 알아내기 user_session_channel, session_timestamp 테이블 사용 다음 형태의 결과를 만들어보기 사용자 251번의 시간 순으로 봤을 때 첫번째 채널과 마지막 채널은 무엇인가? 노가다를 하자면 아래 쿼리를 실행해서 처음과 마지막 채널을 보면 된다. SELECT ts, channel FROM user_session_channel usc JOIN session_timestamp st ON usc.sessionid = st.sessionid WHERE userid = 251 ORDER BY 1 ROW_NUMBER를 이용해서 해보자 ROW_NUMBER () OVER (PARTITION BY field1 ORDER BY field2) nn vs. FIR..

데이터 엔지니어링/Spark

19. SPARK SQL(Redshift Connect, Grouping)

월별 채널별 매출과 방문자 정보 계산하기 user_session_channel, session_timestamp, session_transaction 테이블 사용 다음 형태의 결과를 만들어보기 1. Spark Session Build from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL #1") \ .config("spark.jars", "/usr/local/lib/python3.8/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \ .getOrCreate() 2. Redshift Connect # Redsh..

데이터 엔지니어링/Spark

18. SparkSQL(Redshift Connect, Ranking)

매출 사용자 10명 알아내기(RANKING) user_session_channel, session_timestamp, session_transaction 테이블 사용 다음 형태의 결과를 만들어보기 매출은 refund 포함 3개의 테이블을 각기 데이터프레임으로 로딩 데이터프레임별로 테이블 이름 지정 Spark SQL로 처리 먼저 조인 방식 결정 조인키 조인방식(INNER, LEFT, RIGHT, FULL) 다음으로 간단한 지표부터 계산 1. Redshift 커넥트 # Redshift와 연결해서 DataFrame으로 로딩하기 url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=아이디&pas..

데이터 엔지니어링/Spark

17. SparkSQL(JOIN - DataFrame, SQL)

실습 문제들 1. JOIN 실습 2. 매출 사용자 10명 알아내기(RANKING) 3. 월별 채널별 매출과 방문자 정보 계산하기(Grouping) 4. 사용자별로 처음 채널과 마지막 채널 알아내기(Windowing) 실습 사용 테이블 3개 설명 - 사용자 ID, 세션 ID 사용자 ID : 보통 웹서비스에서는 등록된 사용자마다 유일한 ID를 부여 -> 사용자 ID 세션 ID : 사용자가 외부 링크(보통 광고)를 타고 오거나 직접 방문해서 올 경우 세션을 생성 즉 하나의 사용자 ID는 여러 개의 세션 ID를 가질 수 있음 보통 세션의 경우 세션을 만들어낸 소스를 채널이란 이름으로 기록해둠 마케팅 관련 기여도 분석을 위함 또한 세션이 생긴 시간도 기록 이 정보를 기반으로 다양한 데이터 분석과 지표 설정이 가능..

데이터 엔지니어링/Spark

16. SparkSQL(udf 활용 dataframe 가공 및 spark sql 활용 집계)

UDF 실습 앞선 포스팅의 UDF를 실습 하나의 레코드로부터 다수의 레코드 만들어내기 Order 데이터의 items 필드에서 다수의 Order item 레코드를 만들기 (1) Spark Session 생성 from pyspark.sql import SparkSession import findspark findspark.init() spark = SparkSession \ .builder \ .appName("Python Spark UDF") \ .getOrCreate() (2) Dataframe/SQL에 UDF 사용해보기 #1 columns = ["Seqno","Name"] data = [("1", "john jones"), ("2", "tracey smith"), ("3", "amy sanders")]..

우상욱
'데이터엔지니어' 태그의 글 목록 (3 Page)