UDF(User Defined function) 사용해보기
- 데이터프레임의 경우 .withColumn 함수와 같이 사용하는 것이 일반적
- Spark SQL에서도 사용 가능함
- Aggregation용 UDAF(User Defined Aggregation Function)도 존재
- GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것
- PySpark에서 지원되지 않음. Scalar/Java를 사용해야함
- 보통 UDF는 pandas UDF 스칼라함수를 사용(하나씩 처리 -> 배치 처리 가능)
(1) UDF - DataFrame 사용해보기 #1
- 주어진 데이터를 대문자로 만드는 UDF
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
(2) UDF - SQL 사용해보기 #1
def upper(s):
return s.upper()
# 먼저 테스트
# 문자열로 된 이름, 함수명으로 등록
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
(3) UDF - DataFrame 사용해보기 #2
data = [
{"a" : 1, "b" : 2},
{"a" : 5, "b" : 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y : x + y)("a","b"))
(4) UDF - SQL 사용해보기 #2
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
# test
spark.sql("SELECT plus(1,2)").show()
# 적용
df.createOrReplaceTempView("test")
spark.sql("SELECT a,b, plus(a,b) c FROM test").show()
(5) UDF - Pandas UDF Scalar 함수 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
return s.str.upper()
# test
upperUDF = spark.udf.register("upper_udf", upper_udf2)
# 실제 사용
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `curated Name` FROM test""").show()
- 레코드 집합을 입력으로 받아서 프로세싱을 하기 때문에
- 판다스의 Series 타입으로 받아오게 되어있다.
- 판다스 시리즈로 받아서 판다스 시리즈로 리턴해준다
- 어노테이션의 타입은 받아야할 타입을 지정
- 판다스 udf로 만들 경우 하나씩 처리가 아니라 bulk로 처리하기 때문에 퍼포먼스가 더 좋다
- 여기서 apache arrow라는 기술을 내부적으로 사용
- 값들의 집합이 들어왔다가, 값들의 집합이 나가기 때문에 Scalar 함수이다.
- python object 와 jvm object 간의 변환이 훨씬 빠르고 효율적
(6) UDF - Pandas UDF Scalar 함수 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
17. SparkSQL(JOIN - DataFrame, SQL) (0) | 2023.08.21 |
---|---|
16. SparkSQL(udf 활용 dataframe 가공 및 spark sql 활용 집계) (1) | 2023.08.19 |
14. Spark SQL(Aggregation-JOIN, Shuffle JOIN, Bucket JOIN, BroadCast JOIN) (3) | 2023.08.17 |
13. Spark SQL 소개(사용법, 외부 데이터베이스 연결) (0) | 2023.08.17 |
12. Spark DataFrame 실습(Redshift 연결, jdbc, join) (0) | 2023.08.17 |