데이터엔지니어

데이터 엔지니어링/Spark

15. Spark SQL(UDF, Pandas UDF Scalar)

UDF(User Defined function) 사용해보기 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적 Spark SQL에서도 사용 가능함 Aggregation용 UDAF(User Defined Aggregation Function)도 존재 GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것 PySpark에서 지원되지 않음. Scalar/Java를 사용해야함 보통 UDF는 pandas UDF 스칼라함수를 사용(하나씩 처리 -> 배치 처리 가능) (1) UDF - DataFrame 사용해보기 #1 주어진 데이터를 대문자로 만드는 UDF import pyspark.sql.functions as F from pyspark.sql.types import * upp..

데이터 엔지니어링/Spark

14. Spark SQL(Aggregation-JOIN, Shuffle JOIN, Bucket JOIN, BroadCast JOIN)

SQL Aggregation 함수 DataFrame이 아닌 SQL로 작성하는 것을 추천 Group By (SUM, MIN, MAX, AVG, COUNT) Window(ROW_NUMBER, FIRST_VALUE, LAST_VALUE) Rank JOIN SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 머지 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면 JOIN의 결과는 방식에 따라 양쪽의 필드를 모두 가진 새로운 테이블을 생성 조인의 방식에 따라 다음 두가지가 달라짐 어떤 레코드들이 선택되는지? 어떤 필드들이 채워지는지? 다양한 종류의 조인 (1) JOIN 실습 - 예제 데이터 준비 (2) JOI..

데이터 엔지니어링/Spark

13. Spark SQL 소개(사용법, 외부 데이터베이스 연결)

SQL의 중요성 데이터 분야에서 일하고자하면 반드시 익혀야할 기본 기술 구조화된 데이터를 다루는 한 SQL은 데이터 규모와 상관없이 쓰임 모든 대용량 데이터 웨어하우스는 SQL 기반 Redshift, Snowflake, BigQuery Hive/Presto Spark도 예외는 아님 Spark SQL이 지원됨 요즘 마케터 직종에 있는 분들도 SQL을 배우는 분들이 많다. UDEMY는 신입들은 ONBOADING에서 전부 SQL을 배웠다. 하지만 많은 사람들이 테이블을 만들 수 있으면, DATA DISCOVERY 문제가 발생한다. 현업에서 데이터 직군이 아님에도 불구하고, 데이터 분석을 하는 사람이 늘어난다. 이 사람들은 CITIZEN DATA ANALYST, CITIZEN DATA SCIENTIST라고도 하..

일기

Spark 공부하면서 쓴 일기

배운 것 Spark DataFrame 활용 데이터 가공 및 처리 정형데이터 처리 쪽은 SparkSQL이 훨씬 편하고 좋은 것 같음 그런데 정형데이터 처리할 때 조금 난해한 문제들이 있음 예를 들어 데이터 하나가 1정규화 안된 채로 있어서, 이걸 1정규화 해줘야 하는 경우 split, explode 같은 거 활용(pandas랑 크게 다르지 않음) 비정형 데이터 처리 regex 핵꿀 데이터 저장 데이터 저장 방식이 좀 흥미로웠는데, 애초에 Spark은 partition 단위로 가공하기 때문에 파일을 저장해도 애초에 폴더로 저장해서, 그 안에 part로 저장됨 일단 지금은 standalone이라 1개씩 저장되는데, 기본적으로 큰 데이터 가공하면 part가 여러 개씩 나올듯(예를 들면 data.csv로 저장하면..

데이터 엔지니어링/Spark

12. Spark DataFrame 실습(Redshift 연결, jdbc, join)

실습 5. Redshift 연결해보기 MAU(Monthly Active User) 계산해보기 두 개의 테이블을 Redshift에서 Spark로 로드 JDBC 연결 실습 DataFrame과 SparkSQL을 사용해서 조인 DataFrame JOIN left_DF.join(right_DF, join condition, join type) join type : "inner", "left", "right", "outer", "semi", "anti" * redshift jdbc jar 파일 넣는 경로(윈도우) C:\Spark\spark-3.4.1-bin-hadoop3\jars https://docs.aws.amazon.com/ko_kr/redshift/latest/mgmt/jdbc20-download-drive..

데이터 엔지니어링/Spark

11. Spark DataFrame 실습(trim, split, explode, overwrite, sort)

실습 4. stackoverflow 서베이 기반 인기 언어 찾기 stackoverflow CSV 파일에서 다음 두 필드는 ;를 구분자로 프로그래밍 언어를 구분 LanguageHaveWorkedWith LanguageWantToWorkWith 이를 별개 레코드로 분리하여 가장 많이 사용되는 언어 top50와 가장 많이 쓰고 싶은 언어 top 50 계산해보기 (1) Spark Session Build from pyspark.sql import SparkSession # .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \ spark = SparkSe..

데이터 엔지니어링/Spark

10. Spark DataFrame 실습(텍스트 파싱 -> 구조화 데이터 변환, 윈도우 Hadoop.dll 관련 에러해결법)

실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기 Regex를 이용해서 아래와 같이 변환하는 것이 목표 입력 "On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling" regex 패턴 : "On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)" \S (non-whitespace character), \d(numeric character) 출력 (1) Spark Session build from pyspark.sql import SparkSession from pyspark import SparkConf conf = SparkConf() conf.set("spar..

데이터 엔지니어링/Spark

9. Spark DataFrame 실습(Alias, catalog)

실습 2. 헤더가 없는 CSV 파일 처리하기 입력 데이터 : 헤더 없는 CSV 파일 데이터에 스키마 지정하기 cust_id, item_id, amount_spent를 데이터 컬럼으로 추가하기(모두 숫자) cust_id를 기준으로 amount_spent의 합을 계산하기 1) schema 생성 from pyspark.sql import SparkSession from pyspark.sql import functions as func from pyspark.sql.types import StructType, StructField, StringType, FloatType schema = StructType([ \ StructField("cust_id", StringType(), True), \ StructFie..

데이터 엔지니어링/Spark

8. Spark DataFrame 실습(SparkSession, conf, Schema, Filter, select, SparkSQL, sql.types, 컬럼지칭 방식)

실습 1. 헤더가 없는 CSV 파일 처리하기 입력 데이터 : 헤더가 없는 CSV 파일 데이터에 스키마 지정하기 SparkConf 사용해보기 measure_type 값이 TMIN인 레코드 대상으로 stationId별 최소 온도 찾기 1) Spark로 CSV 로드(SparkSession, conf, Schema 지정) from pyspark.sql import SparkSession from pyspark import SparkConf # SparkConf로 SparkSession의 환경 설정 conf = SparkConf() # application의 이름 conf.set("spark.app.name", "PySpark DataFrame #1") # master 설정, local 모든 스레드를 가져오겠다 ..

데이터 엔지니어링/Spark

6. Spark(개발환경 옵션, Local Standalone, 활용 Demo)

Spark 개발환경 옵션 Local Standalone Spark + Spark Shell Python IDE - Pycharm, Visual Studio Databricks Cloud - 커뮤니티 에디션을 무료로 사용 다른 노트북 - 주피터 노트북, 구글 Colab, 아나콘다 등 Local Standalone Spark Spark Cluster Manager로 local[n] 지정 master를 local[n]으로 지정 master는 클러스터 매니저를 지정하는데 사용 주로 개발이나 간단한 테스트 용도 하나의 JVM에서 모든 프로세스를 실행 하나의 Driver와 하나의 Executor가 실행됨 1 + 쓰레드가 Executor안에서 실행됨 Executor안에 생성되는 쓰레드 수 local : 하나의 쓰레드..

데이터 엔지니어링

[LV3/SQL] 카테고리별 도서 판매량 집계하기

https://school.programmers.co.kr/learn/courses/30/lessons/144855 프로그래머스 코드 중심의 개발자 채용. 스택 기반의 포지션 매칭. 프로그래머스의 개발자 맞춤형 프로필을 등록하고, 나와 기술 궁합이 잘 맞는 기업들을 매칭 받으세요. programmers.co.kr [풀이] -- 코드를 입력하세요 SELECT CATEGORY, SUM(BS.SALES) AS TOTAL_SALES FROM BOOK AS B INNER JOIN BOOK_SALES AS BS ON B.BOOK_ID = BS.BOOK_ID AND BS.SALES_DATE LIKE '2022-01%' GROUP BY B.CATEGORY ORDER BY CATEGORY 다시 한번 등장한 JOIN 전에..

코딩테스트 스터디/프로그래머스

[LV2/DFS] 타겟 넘버

https://school.programmers.co.kr/learn/courses/30/lessons/43165 프로그래머스 코드 중심의 개발자 채용. 스택 기반의 포지션 매칭. 프로그래머스의 개발자 맞춤형 프로필을 등록하고, 나와 기술 궁합이 잘 맞는 기업들을 매칭 받으세요. programmers.co.kr [풀이] def solution(numbers, target): global answer answer = 0 def DFS(L, sum): global answer if L == len(numbers): if sum == target: answer += 1 else: DFS(L + 1, sum + numbers[L]) DFS(L + 1, sum - numbers[L]) DFS(0, 0) retur..

우상욱
'데이터엔지니어' 태그의 글 목록 (4 Page)