실습 2. 헤더가 없는 CSV 파일 처리하기
- 입력 데이터 : 헤더 없는 CSV 파일
- 데이터에 스키마 지정하기
- cust_id, item_id, amount_spent를 데이터 컬럼으로 추가하기(모두 숫자)
- cust_id를 기준으로 amount_spent의 합을 계산하기
1) schema 생성
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, FloatType
schema = StructType([ \
StructField("cust_id", StringType(), True), \
StructField("item_id", StringType(), True), \
StructField("amount_spent", FloatType(), True)])
2) Data Load
df = spark.read.schema(schema).csv("./data/customer-orders.csv")
df.printSchema()
3) Groupby, aggregation, Alias
# groupBy, sum
df_ca = df.groupBy("cust_id").sum("amount_spent")
df_ca.show()
# 컬럼 리네임 하는 방식
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
# 컬럽 리네임하는 방식(alias 활용)
import pyspark.sql.functions as f
# agg라는 함수(aggregation), 필드의 결과를 sum으로 개명
df_ca = df.groupBy("cust_id") \
.agg(f.sum('amount_spent').alias('sum'))
# min과 avg 값도 구하고 싶다면?
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
- GroupBy는 데이터프레임에 메소드를 붙여서, 집계함수와 사용할 수 있습니다.
- 기본적인 틀에 rename 하고 싶다면 withColumnRenamed를 활용합니다.
- agg 함수와 함께 alias를 써서 컬럼명을 바꿀 수도 있습니다.
4) SparkSQL 활용해보기
# 메모리에 테이블을 하나 만듬
df.createOrReplaceTempView("customer_orders")
- 앞서 불러온 df를 활용하여 메모리 위에 테이블을 하나 만듭니다.
spark.sql("""SELECT cust_id, SUM(amount_spent) sum, MAX(amount_spent) max, AVG(amount_spent) avg
FROM customer_orders
GROUP BY 1""").head(5)
- sql function과 함께 쿼리문을 작성합니다.
spark.catalog.listTables()
- Spark은 기본적으로 in-memory 카탈로그를 테이블을 관리합니다.
- 만약 테이블들을 메모리에 올려놓고 관리하는게 아니라, persist(유지) 하고 싶다면
- 스토리지 기반의 Hive 메타 스토어를 활용합니다.
- SparkSession 설정시 enableHiveSupport()을 호출하고, Hive metastore를 카탈로그로 사용하여, Hive UDF와 Hive 파일 포맷을 사용할 수 있습니다.(뒷 포스팅에서 자세히 다루겠습니다.)
항상 정형 데이터의 가공의 경우, 데이터프레임에 비해 SparkSQL로 처리하는 것이, 훨씬 가독성 면에서 뛰어납니다.