UDF 실습
- 앞선 포스팅의 UDF를 실습
- 하나의 레코드로부터 다수의 레코드 만들어내기
- Order 데이터의 items 필드에서 다수의 Order item 레코드를 만들기
(1) Spark Session 생성
from pyspark.sql import SparkSession
import findspark
findspark.init()
spark = SparkSession \
.builder \
.appName("Python Spark UDF") \
.getOrCreate()
(2) Dataframe/SQL에 UDF 사용해보기 #1
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
- createDataFrame로 데이터 프레임 생성
(2.1) 람다 활용 컬럼 생성(DataFrame)
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
- lambda 활용 udf 생성
- udf 적용하여 Curated Name 컬럼 생성
- show
(2.2) 파이썬 함수 활용 컬럼 생성(DataFrame)
def upper_udf(s):
return s.upper()
- 파이썬 function 생성
# default가 stringType
upperUDF = F.udf(upper_udf, StringType())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
- withColumn 활용 컬럼 생성
df.select("Name", upperUDF("Name").alias("Curated Name")).show()
- select 활용 컬럼 생성
(2.3) 파이썬 함수(pandas_udf) 활용 컬럼 생성
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
- pands_udf 어노테이션 활용 타입, 리턴값 정해주고 함수 정의
# 위에서 정의한 파이썬 upper 함수를 그대로 사용
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
spark.sql("SELECT upper_udf('aBcD')").show()
- spark sql 활용 udf 적용(테스트)
df.select("name", upperUDF("name")).show()
- select 메소드 활용 정의
df.createOrReplaceTempView("test")
spark.sql("""
SELECT name, upper_udf(name) `Curated Name` FROM test
""").show()
- spark sql 활용 udf 적용
(3) Dataframe/SQL에 UDF 사용해보기 #2
(3.1) 람다 활용 컬럼 생성(DataFrame)
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b")).show()
(3.2) 파이썬 함수 활용 컬럼 생성(DataFrame)
def plus(x, y):
return x + y
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2) sum").show()
(3.3) 파이썬 함수 활용 컬럼 생성(Spark SQL)
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) p FROM test").show()
(4) Dataframe에 UDAF 사용해보기
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(FloatType())
def average_udf_f(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average_udf', average_udf_f)
spark.sql('SELECT average_udf(a) FROM test').show()
- average_udf_f 함수 정의
- spark.udf.register로 등록
- spark sql 활용
df.agg(averageUDF("b").alias("count")).show()
- agg 함수로 적용하기
- count라는 컬럼 이름 생성
(5) DataFrame에 explode 사용해보기
arrayData = [
('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
('Robert',['CSharp',''],{'hair':'red','eye':''}),
('Washington',None,None),
('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.show()
# knownLanguages 필드를 언어별로 짤라서 새로운 레코드로 생성
from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages))
df2.printSchema()
df2.show()
(5) 하나의 레코드에서 다수의 레코드를 만들어내는 예제 (Order to 1+ Items)
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
order = spark.read.options(delimiter='\t').option("header","true").csv("./data/orders.csv")
- spark로 read
# 데이터프레임을 이용해서 해보기
struct = ArrayType(
StructType([
StructField("name", StringType()),
StructField("id", StringType()),
StructField("quantity", LongType())
])
)
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)
- struct 타입 정의 후
- json으로 부터, struct 타입으로 하나씩 빼낸 후 explode
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
- item 컬럼 생성
- items로부터 json 데이터 가져오고 explode
- items 컬럼 드랍
order_items.createOrReplaceTempView("order_items")
- order_items 데이터프레임을 order_items라는 임시테이블명으로 생성
spark.sql("""
SELECT order_id, CAST(average_udf(item.quantity) as decimal) avg_count
FROM order_items
GROUP BY 1
ORDER BY 2 DESC""").show(5)
- average_udf를 활용해서 item의 quantity 부분만 평균값을 구한다. CAST로 decimal로 깔끔하게
- GROUP BY는 order_id
spark.sql("""SELECT item.quantity FROM order_items WHERE order_id = '1816674631892'""").show()
- order_id가 해당 번호인 아이템의 quantity
spark.catalog.listTables()
- 만들어진 테이블의 카탈로그 보기
# udf function 뿐만 아니라 native function도 출력된다.
for f in spark.catalog.listFunctions():
print(f[0])
- 카탈로그 안에 있는 function들을 출력
- 이 때 등록한 udf도 출력이 되지만 native function도 출력됨
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
18. SparkSQL(Redshift Connect, Ranking) (0) | 2023.08.21 |
---|---|
17. SparkSQL(JOIN - DataFrame, SQL) (0) | 2023.08.21 |
15. Spark SQL(UDF, Pandas UDF Scalar) (0) | 2023.08.19 |
14. Spark SQL(Aggregation-JOIN, Shuffle JOIN, Bucket JOIN, BroadCast JOIN) (3) | 2023.08.17 |
13. Spark SQL 소개(사용법, 외부 데이터베이스 연결) (0) | 2023.08.17 |