Spark 개발환경 옵션
- Local Standalone Spark + Spark Shell
- Python IDE - Pycharm, Visual Studio
- Databricks Cloud - 커뮤니티 에디션을 무료로 사용
- 다른 노트북 - 주피터 노트북, 구글 Colab, 아나콘다 등
Local Standalone Spark
- Spark Cluster Manager로 local[n] 지정
- master를 local[n]으로 지정
- master는 클러스터 매니저를 지정하는데 사용
- 주로 개발이나 간단한 테스트 용도
- 하나의 JVM에서 모든 프로세스를 실행
- 하나의 Driver와 하나의 Executor가 실행됨
- 1 + 쓰레드가 Executor안에서 실행됨
- Executor안에 생성되는 쓰레드 수
- local : 하나의 쓰레드만 생성
- local[*] : 컴퓨터의 CPU 수만큼 쓰레드를 생성
구글 Colab에서 Spark 사용
- PySpark + Py4J를 설치
- 구글 Colab 가상서버 위에 로컬 모드 Spark을 실행
- 개발 목적으로는 충분하지만, 큰 데이터 처리는 불가
- Spark Web UI는 기본적으로는 접근 불가
- 해당 포트 넘버가 막혀있음
- ngrok을 통해 억지로 열 수는 있음
-
- Py4J
- 파이썬에서 JVM내에 있는 자바 객체를 사용 가능하게 해줌
- Py4J
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
- SPARK 오브젝트는 싱글턴 오브젝트
- SPARK이라고 입력하면 Spark UI를 통해서, 어떤 JOB들이 실행 중인지 알 수 있다.
- 각 실행된 JOB들별로 상황을 볼 수 있고, 디버깅, 최적화가 가능하다.
- 그런데 SPARK UI는 포트번호가 4040인데, 코랩에서는 접근이 불가능함
Spark 활용 DEMO
colab 환경에서 진행했습니다.
Pyspark 설치
Py4J 패키지는 파이썬 프로그램이 자바가상머신상의 오브젝트들을 접근할 수 있게 해준다. Local Standalone Spark을 사용한다.
!pip install pyspark==3.3.1 py4j==0.10.9.5
Spark Session Build
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.getOrCreate()
- Spark Session : SparkSession은 Spark 2.0부터 엔트리 포인트로 사용된다.
- SparkSession을 이용해 RDD, 데이터 프레임등을 만든다.
- SparkSession은 SparkSession.builder를 호출하여 생성하며 다양한 함수들을 통해 세부 설정이 가능하다(싱글턴 객체)
- local[*] Spark이 하나의 JVM으로 동작하고 그 안에 컴퓨터의 코어 수 만큼의 스레드가 Executor로 동작한다
RDD
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
# 파이썬 리스트를 RDD로 변환
# RDD로 변환되는 순간 Spark 클러스터의 서버들에 데이터가 나눠 저장됨 (파티션)
rdd = spark.sparkContext.parallelize(name_list_json)
rdd.count()
import json
# RDD는 lambda function을 사용하기가 쉽다.
# string들을 딕셔너리 형태로 바꾼 dictionary 형태로 바꾸기
parsed_rdd = rdd.map(lambda el:json.loads(el))
# 파이썬 드라이버 쪽에는 없고 spark 클러스터에 있다.
parsed_rdd
# collect를 통해서 파이썬 드라이버로 가져온다.
# 원래 string이였던 spark cluster에서 dictionary로 바꿨고,
# 이걸 파이썬 드라이버로 가져오는 코드(collect)
parsed_rdd.collect()
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"])
parsed_name_rdd.collect()
DataFrame
# 헤더가 있음에도 불구하고, 그냥 스키마를 지정했다.
# csv 파일을 로드할 때 헤더가 있는 것을 알려줘야한다.
# 수정 전
df = spark.read.csv("name_gender.csv")
# 수정 후
df = spark.read.option("header", True).csv("name_gender.csv")
# 스키마 출력
df.printSchema()
# show(상위 20개만 출력)
df.show()
# 데이터프레임도 결국엔 rdd 위에서 올라가고, getNumpartitions()를 해보면 데이터프레임이 몇개의 파티션으로 구성된지 알 수 있다.
df.groupby(["gender"]).count().collect()
df.rdd.getNumPartitions()
Spark SQL
# 마치 테이블처럼 만들어야함
df.createOrReplaceTempView("namegender")
# sql 실행
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")
# collect(파이썬 드라이버로 가져옴)
namegender_group_df.collect()
# spark 세션 밑에 오브젝트 밑의 list tables를 불러보면
# 지금은 temp 테이블
# 만약 spark 세션을 만들 때 hive를 쓰게 세팅 했으면, 하이브가 갖고 있는 메타데이터 스토어가 있는데
# 그것과 스파크가 연결되고, 하이브에 있는 테이블도 보인다.
spark.catalog.listTables()
# 결과 예시
# [Table(name='namegender', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
# 파티션 수 계산
namegender_group_df.rdd.getNumPartitions()
# 리파티션으로 레코드들을 둘로 나눌 수 있는데, 해싱 같은 걸 지정 안했기 때문에 랜덤으로 파티셔닝한다.
two_namegender_group_df = namegender_group_df.repartition(2)
two_namegender_group_df.rdd.getNumPartitions()
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
8. Spark DataFrame 실습(SparkSession, conf, Schema, Filter, select, SparkSQL, sql.types, 컬럼지칭 방식) (0) | 2023.08.17 |
---|---|
7. 윈도우에 Local Standalone Spark 클러스터 설치, Spark-submit 오류 해결, findspark (0) | 2023.07.28 |
5. Spark 프로그램 구조(Spark Session 생성, 환경변수) (1) | 2023.07.11 |
4. Spark 프로그래밍 : DataFrame(데이터처리, 동작구조) (0) | 2023.07.11 |
3. 빅데이터 처리와 Spark 소개(Spark 소개, Spark 프로그램 실행 옵션) (0) | 2023.07.10 |