Aggregation용 UDAF(User Defined Aggregation Function)도 존재
GROUP BY에서 사용되는 SUM, AVG와 같은 함수를 만드는 것
PySpark에서 지원되지 않음. Scalar/Java를 사용해야함
보통 UDF는 pandas UDF 스칼라함수를 사용(하나씩 처리 -> 배치 처리 가능)
(1) UDF - DataFrame 사용해보기 #1
주어진 데이터를 대문자로 만드는 UDF
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
(2) UDF - SQL 사용해보기 #1
def upper(s):
return s.upper()
# 먼저 테스트
# 문자열로 된 이름, 함수명으로 등록
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
(3) UDF - DataFrame 사용해보기 #2
data = [
{"a" : 1, "b" : 2},
{"a" : 5, "b" : 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y : x + y)("a","b"))
(4) UDF - SQL 사용해보기 #2
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
# test
spark.sql("SELECT plus(1,2)").show()
# 적용
df.createOrReplaceTempView("test")
spark.sql("SELECT a,b, plus(a,b) c FROM test").show()
(5) UDF - Pandas UDF Scalar 함수 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def upper_udf2(s: pd.Series) -> pd.Series:
return s.str.upper()
# test
upperUDF = spark.udf.register("upper_udf", upper_udf2)
# 실제 사용
df.select("Name", upperUDF("Name")).show()
spark.sql("""SELECT name, upper_udf(name) `curated Name` FROM test""").show()
레코드 집합을 입력으로 받아서 프로세싱을 하기 때문에
판다스의 Series 타입으로 받아오게 되어있다.
판다스 시리즈로 받아서 판다스 시리즈로 리턴해준다
어노테이션의 타입은 받아야할 타입을 지정
판다스 udf로 만들 경우 하나씩 처리가 아니라 bulk로 처리하기 때문에 퍼포먼스가 더 좋다
여기서 apache arrow라는 기술을 내부적으로 사용
값들의 집합이 들어왔다가, 값들의 집합이 나가기 때문에 Scalar 함수이다.
python object 와 jvm object 간의 변환이 훨씬 빠르고 효율적
(6) UDF - Pandas UDF Scalar 함수 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(FloatType())
def average(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average', average)
spark.sql('SELECT average(b) FROM test').show()
df.agg(averageUDF("b").alias("count")).show()