UDF 실습
앞선 포스팅의 UDF를 실습
하나의 레코드로부터 다수의 레코드 만들어내기
Order 데이터의 items 필드에서 다수의 Order item 레코드를 만들기
(1) Spark Session 생성
from pyspark.sql import SparkSession
import findspark
findspark.init()
spark = SparkSession \
.builder \
.appName("Python Spark UDF") \
.getOrCreate()
(2) Dataframe/SQL에 UDF 사용해보기 #1
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
createDataFrame로 데이터 프레임 생성
(2.1) 람다 활용 컬럼 생성(DataFrame)
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
lambda 활용 udf 생성
udf 적용하여 Curated Name 컬럼 생성
show
(2.2) 파이썬 함수 활용 컬럼 생성(DataFrame)
def upper_udf(s):
return s.upper()
# default가 stringType
upperUDF = F.udf(upper_udf, StringType())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
df.select("Name", upperUDF("Name").alias("Curated Name")).show()
(2.3) 파이썬 함수(pandas_udf) 활용 컬럼 생성
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
pands_udf 어노테이션 활용 타입, 리턴값 정해주고 함수 정의
# 위에서 정의한 파이썬 upper 함수를 그대로 사용
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
spark.sql("SELECT upper_udf('aBcD')").show()
df.select("name", upperUDF("name")).show()
df.createOrReplaceTempView("test")
spark.sql("""
SELECT name, upper_udf(name) `Curated Name` FROM test
""").show()
(3) Dataframe/SQL에 UDF 사용해보기 #2
(3.1) 람다 활용 컬럼 생성(DataFrame)
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b")).show()
(3.2) 파이썬 함수 활용 컬럼 생성(DataFrame)
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2) sum").show()
(3.3) 파이썬 함수 활용 컬럼 생성(Spark SQL)
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) p FROM test").show()
(4) Dataframe에 UDAF 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(FloatType())
def average_udf_f(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average_udf', average_udf_f)
spark.sql('SELECT average_udf(a) FROM test').show()
average_udf_f 함수 정의
spark.udf.register로 등록
spark sql 활용
df.agg(averageUDF("b").alias("count")).show()
agg 함수로 적용하기
count라는 컬럼 이름 생성
(5) DataFrame에 explode 사용해보기
arrayData = [
('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
('Robert',['CSharp',''],{'hair':'red','eye':''}),
('Washington',None,None),
('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.show()
# knownLanguages 필드를 언어별로 짤라서 새로운 레코드로 생성
from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()
(5) 하나의 레코드에서 다수의 레코드를 만들어내는 예제 (Order to 1+ Items)
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
order = spark.read.options(delimiter='\t').option("header","true").csv("./data/orders.csv")
# 데이터프레임을 이용해서 해보기
struct = ArrayType(
StructType([
StructField("name", StringType()),
StructField("id", StringType()),
StructField("quantity", LongType())
])
)
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)
struct 타입 정의 후
json으로 부터, struct 타입으로 하나씩 빼낸 후 explode
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
item 컬럼 생성
items로부터 json 데이터 가져오고 explode
items 컬럼 드랍
order_items.createOrReplaceTempView("order_items")
order_items 데이터프레임을 order_items라는 임시테이블명으로 생성
spark.sql("""
SELECT order_id, CAST(average_udf(item.quantity) as decimal) avg_count
FROM order_items
GROUP BY 1
ORDER BY 2 DESC""").show(5)
average_udf를 활용해서 item의 quantity 부분만 평균값을 구한다. CAST로 decimal로 깔끔하게
GROUP BY는 order_id
spark.sql("""SELECT item.quantity FROM order_items WHERE order_id = '1816674631892'""").show()
order_id가 해당 번호인 아이템의 quantity
spark.catalog.listTables()
# udf function 뿐만 아니라 native function도 출력된다.
for f in spark.catalog.listFunctions():
print(f[0])
카탈로그 안에 있는 function들을 출력
이 때 등록한 udf도 출력이 되지만 native function도 출력됨