실습 4. stackoverflow 서베이 기반 인기 언어 찾기
- stackoverflow CSV 파일에서 다음 두 필드는 ;를 구분자로 프로그래밍 언어를 구분
- LanguageHaveWorkedWith
- LanguageWantToWorkWith
- 이를 별개 레코드로 분리하여 가장 많이 사용되는 언어 top50와 가장 많이 쓰고 싶은 언어 top 50 계산해보기
(1) Spark Session Build
from pyspark.sql import SparkSession
# .config("spark.jars", "/usr/local/lib/python3.7/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
(2) Read CSV
df = spark.read.csv("./data/survey_results_public.csv", header=True).select('ResponseId', 'LanguageHaveWorkedWith', 'LanguageWantToWorkWith')
- header가 존재하고, 특정 컬럼 3개만 불러옵니다.
(3) Split(문자열 나눠 리스트 반환)
import pyspark.sql.functions as F
# LanguageHaveWorkedWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_have 필드로 설정
df2 = df.withColumn(
"language_have",
F.split(F.trim(F.col("LanguageHaveWorkedWith")), ";")
)
- language_have라는 필드를 추가하면서
- 해당 필드에는 value를 채워줍니다.
- 이 때 value는 기존의 LanguageHaveWorkwith 컬럼을 ;로 분할한 리스트를 반환합니다(split)
- trim은 양옆 공백을 제거하는 메소드
# LanguageWantToWorkWith 값을 트림하고 ;를 가지고 나눠서 리스트의 형태로 language_want 필드로 설정
df3 = df2.withColumn(
"language_want",
F.split(F.trim(F.col("LanguageWantToWorkWith")), ";")
)
df3.printSchema()
df3.show(5)
- 같은 방식으로 다른 컬럼 또한 만듭니다.
(4) Explode(리스트인 컬럼을 하나의 값으로만 갖는 컬럼으로 변환)
df_language_have = df3.select(
df3.ResponseId,
F.explode(df3.language_have).alias("language_have")
)
(5) Sort
# sort의 인자로 desc라는 게 있는데 count Filed를 기준으로 내림차순
df_language_have.groupby("language_have").count().sort(F.desc("count")).collect()
# sort 말고 orderBy 활용하기, ascedning = False면 desc
df_language_have.groupby("language_have").count().orderBy('count', ascending=False).collect()
- Sort를 활용하는 방식에는 sort 메소드를 활용하는 방법
- 이 때는 desc라는 Function을 활용합니다.
- orderBy 메소드를 활용하는 방식이 있습니다.
(6) 특정 부분만 데이터프레임으로 가져오기
# 위에서 50개만 데이터프레임으로
df_language50_have = df_language_have.groupby("language_have")\
.count()\
.orderBy('count', ascending=False)\
.limit(50)
- limit을 활용합니다.
(7) HDFS에 쓰기
# 결과를 HDFS에 쓰기
# mode('overwrite')은 덮어쓰기
df_language50_have.write.mode('overwrite').csv("./data/language50_have")
- mode('overwrite')을 사용하면, 이미 있는 파일의 경우에 덮어쓰기 합니다.
해당 문제는 SQL로 해결하기 어려운 문제 유형입니다. 따라서 이런 방식으로 데이터프레임을 활용해서, 문제를 해결합니다.
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
13. Spark SQL 소개(사용법, 외부 데이터베이스 연결) (0) | 2023.08.17 |
---|---|
12. Spark DataFrame 실습(Redshift 연결, jdbc, join) (0) | 2023.08.17 |
10. Spark DataFrame 실습(텍스트 파싱 -> 구조화 데이터 변환, 윈도우 Hadoop.dll 관련 에러해결법) (0) | 2023.08.17 |
9. Spark DataFrame 실습(Alias, catalog) (0) | 2023.08.17 |
8. Spark DataFrame 실습(SparkSession, conf, Schema, Filter, select, SparkSQL, sql.types, 컬럼지칭 방식) (0) | 2023.08.17 |