AWS EMR
- EMR(Elastic MapReduce) 위에서 실행하는 것이 일반적
- EMR이란?
- AWS의 Hadoop 서비스 (On-demand Hadoop)
- Hadoop(YARN), Spark, Hive, Notebook 등등이 설치되어 제공되는 서비스
- 기본적으로는 YARN 위에서 돌림
- Spark은 Zepplin과 같은 Spark 전용 Notebook과 같이 실행되는 경우가 많다..
- EC2 서버들을 worker node로 사용하고(이중 하나가 마스터 노드) S3를 HDFS로 사용
- AWS 내의 다른 서비스들과의 연동이 쉬움(Kinesis, DynamoDB, Redshift, ...)
- AWS의 Hadoop 서비스 (On-demand Hadoop)
Spark on EMR 실행 및 사용 과정
- AWS의 EMR(Elastic MapReduce - 하둡) 클러스터 생성
- EMR 생성시 Spark을 실행(옵션으로 선택)
- S3를 기본 파일 시스템으로 사용
- EMR의 마스터 노드를 드라이버 노드로 사용
- 마스터 노드를 SSH로 로그인
- spark-submit을 사용
- Spark Cluster 모드에 해당
- 클러스터 모드 : 클러스터 안에 들어가서 실행하기 때문에 프로덕션 환경에 적합
- 클라이언트 모드 : 노트북, 쉘 같은 인터랙티브 환경에서 사용
- 마스터 노드를 SSH로 로그인
- Spark 클러스터 매니저와 실행 모델 요약
Spark on EMR 클러스터 생성 스텝
스텝 1. EMR 서비스 페이지로 이동
스텝 2. EMR 클러스터 생성하기 - 이름과 기술 스택 선택
- Zepplin이란?
- https://zeppelin.apache.org/ : 노트북(Spark, SQL, Python)
- Python Notebook 같은 Notebook인데, Spark 전용 Notebook
- SQL, Python, R들을 실행할 수 있는데 기본 백엔드가 Spark
스텝 3. EMR 클러스터 생성하기 - 클러스터 사양 선택 후 생성
스텝 4. EMR 클러스터 생성까지 대기
- 마스터 노드의 포트번호 22번 열기 (1)
- 마스터 노드의 포트번호 22번 열기 (2)
스텝 5. Spark History Server 보기
PySpark 잡 EMR 클러스터 위에서 실행
- Spark 마스터노드에 SSH로 로그인
- 이를 위해 마스터노드의 TCP 포트번호 22번을 오픈해야함
- spark-submit을 이용해서 실행하면서 디버깅
- 두 개의 잡을 AWS EMR 상에서 실행해 볼 예정
1. 입력 데이터를 S3로 로딩
- Stackoverflow 2022년 개발자 서베이 CSV 파일을 S3 버킷으로 업로드
- 익명화된 83,339개의 서베이 응답
- 입력 CSV 파일을 분석하여 그 결과를 S3에 다시 저장(stackoverflow.py)
- 미국 개발자만 대상으로 코딩을 처음 어떻게 배웠는지 카운트해서 S3로 저장
- DataFrame 버전
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
S3_DATA_INPUT_PATH = 's3://spark-tutorial-dataset/survey_result_public.csv'
S3_DATA_OUTPUT_PATH = 's3://spark-tutorial-dataset/data-output'
spark = SparkSession.builder.appName('Tutorial').getOrCreate()
df = spark.read.csv(S3_DATA_INPUT_PATH, header = True)
print('# of records {}'.format(df.count()))
learnCodeUS = df.where((col('Country') == 'United States of America')).groupby('LearnCode').count()
learnCodeUs.write.mode('overwrite').csv(S3_DATA_OUTPUT_PATH) # parquet
learnCodeUs.show()
print('Selected data is successfully saved to S3 : {}'.format(S3_DATA_OUTPUT_PATH)
- SparkSQL 버전
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
S3_DATA_INPUT_PATH = 's3://spark-tutorial-dataset/survey_result_public.csv'
S3_DATA_OUTPUT_PATH = 's3://spark-tutorial-dataset/data-output'
spark = SparkSession.builder.appName('Tutorial').getOrCreate()
df = spark.read.csv(S3_DATA_INPUT_PATH, header = True)
print('# of records {}'.format(df.count()))
# 임시테이블 생성
df.createOrReplaceTempView("stackoverflow")
learnCodeUS = spark.sql("""
SELECT LearnCode, COUNT(1) count
FROM stackoverflow
WHERE Country = 'United States of America'
GROUP BY 1""")
# learnCodeUS = df.where((col('Country') == 'United States of America')).groupby('LearnCode').count()
learnCodeUs.write.mode('overwrite').csv(S3_DATA_OUTPUT_PATH) # parquet
learnCodeUs.show()
print('Selected data is successfully saved to S3 : {}'.format(S3_DATA_OUTPUT_PATH)
2. Spark 마스터 노드에 ssh로 로그인하여 spark-submit을 통해 실행
- 앞서 다운로드 받은 .pem 파일을 사용하여 SSH 로그인
spark-submit --master yarn stackoverflow.py
3. EMR Spark Cluster Web UI 확인
DataFrame Version
- 7개의 Job으로 구성, 각 Job은 Stage 하나로 구성
SQL Version
- Job 7개(Dataframe과 별 차이 없었음)
EMR 클러스터의 Zeppelin
(1) Zeppelin ACCESS ON EMR
- Yarn에도 node 매니저 뿐만 아니라, 다양한 웹 UI가 있다.
- 이런 것들은 SSH CONNECTION을 ENABLE 해야 ACCESS 할 수 있게 되어있다.
- SSH 터널링
- 크롬에선 EXTENSION 받아서 Configuration을 끝내야한다.
- 링크에서 포트번호 확인 후 내 작업 브라우저에서 ACCESS
(2) Zeppelin 사용
- Python, R, Scala를 이용해서 Spark과 Interactive하게 코딩할 수 있는 Notebook
- Zepplin은 Spark과 관련된 코딩을 하기에 굉장히 편한 Notebook
- 이미 데이터브릭스 제품에는 기본으로 들어가있다.
- spark-submit 전에 개발용으로 사용
- Create New note 클릭
- Default Interpreter : Spark
- Create
- %pyspark을 통해 Pyspark 사용
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
32. Spark 마무리, 다음 스텝 (2) | 2023.08.25 |
---|---|
30. SparkML Pipeline (0) | 2023.08.25 |
29. SparkML(Classification, 타이타닉 생존 예측 모델) (0) | 2023.08.25 |
28. Spark ML(Regression, 보스턴 주택값 예측 모델) (0) | 2023.08.23 |
27. SPARK ML 소개 (0) | 2023.08.23 |