유닛 테스트란?
- 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드
- 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트
- CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 굉장히 중요해짐
- 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적
- JUnit for Java
- NUnit for .NET
- unittest for Python
- unittest를 사용해볼 예정
- 좋은 개발자가 되기 위해선, 내 코드에 어떤 문제가 있을지 테스트를 작성하는 습관을 가져야한다.
유닛 테스트 실습 1
(1) Spark Session Build
# 코드를 작성하기 전에 test 코드를 먼저 만들어두고, 코드를 만들어라
# TDD(TEST DRIVEN DEVELOPMENT)
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Unit Test") \
.getOrCreate()
(2) 결과값 확인
df = spark.read.option("header", True).csv("./data/name_gender.csv")
df.printSchema()
df.count()
df.createOrReplaceTempView("namegender")
spark.sql("SELECT gender, COUNT(1) count FROM namegender GROUP BY 1").show()
(3) 함수 정의
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd
# Define the UDF
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
- Series를 받아서 Series로 나오는 대문자 전환 함수를 spark udf에 등록
def load_gender(spark, file_path):
return spark.read.option("header", True).csv(file_path)
def get_gender_count(spark, df, field_to_count):
df.createOrReplaceTempView("namegender_test")
return spark.sql(f"SELECT {field_to_count}, COUNT(1) count FROM namegender_test GROUP BY 1")
- file 읽기 함수
- 임시 테이블 만들어서, 쿼리날리는 함수 생성
(4) 함수 간이 테스트
df = load_gender(spark, "./data/name_gender.csv")
get_gender_count(spark, df, "gender").show()
df.select(upperUDF("name").alias("NAME")).show()
(5) 유닛 테스트
from unittest import TestCase
# 일반적으로는 아래 함수가 정의된 모듈을 임포트하고 그걸 테스트
# - upper_udf_f
# - load_gender
# - get_gender_count
# 보통 테스트 코드를 만든 다음에, 내가 테스트 하고 싶은 함수들을 import 해서 쓰는 것이 일반적
# 이 외에도 2가지 방법이 더 존재
# - from pyspark.sql.tests import SparkTestingBase
# - pytest-spark (pytest testing framework plugin)
class UtilsTestCase(TestCase):
# 전역 변수 생성
spark = None
# setUpClass는 계속해서 사용할 리소스들이 있으면, 여기서 생성
# spark 세션을 하나 만들어서 Spark이라는 변수에 저장을 해놓았다.
# self.spark으로 지칭
@classmethod
def setUpClass(cls) -> None:
cls.spark = SparkSession.builder \
.appName("Spark Unit Test") \
.getOrCreate()
# 데이터프레임 받아서 그거의 카운트 세서, 100과 같은지 다른지 체크
# assertEqual은 두 개의 인자가 같으면 성공, 실패
def test_datafile_loading(self):
sample_df = load_gender(self.spark, "./data/name_gender.csv")
result_count = sample_df.count()
self.assertEqual(result_count, 100, "Record count should be 100")
# 데이터 프레임 받은 후
# get gender count해서 콜렉트 해서
# 파이썬에서 row 타입의 리스트로 받아온 후
# 숫자가 맞으면
def test_gender_count(self):
sample_df = load_gender(self.spark, "./data/name_gender.csv")
count_list = get_gender_count(self.spark, sample_df, "gender").collect()
count_dict = dict()
for row in count_list:
count_dict[row["gender"]] = row["count"]
self.assertEqual(count_dict["F"], 65, "Count for F should be 65")
self.assertEqual(count_dict["M"], 28, "Count for M should be 28")
self.assertEqual(count_dict["Unisex"], 7, "Count for Unisex should be 7")
# udf를 테스트해보는 케이스
# spark 데이터 프레임으로 만들고
# upper_udf 함수 등록
# 대문자로 바뀐 컬럼 이름을 그 데이터프레임의 결과를 collect로 받아온다.
def test_upper_udf(self):
test_data = [
{ "name": "John Kim" },
{ "name": "Johnny Kim"},
{ "name": "1234" }
]
expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]
upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
test_df = self.spark.createDataFrame(test_data)
names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
results = []
for name in names:
results.append(name["NAME"])
# assertCountEqual은 리스트를 두개 받아서 리스트를 sorting 했을 대 동일한지 보는 것
self.assertCountEqual(results, expected_results)
# tearDownClass는 마지막에 호출
# setUpClass에서 release
@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()
- Python의 unnitTest 활용
- 테스트 케이스 클래스 만들어서 테스트 코드 생성
import unittest
unittest.main(argv=[''], verbosity=2, exit=False)
- 유닛 테스트 실행
- 명령어로 실행하는 법
- test_df.py 파일 생성 후(코드 위와 동일)
python -m unittest test_df.py
- 일반적으로 application 코드와 test 코드가 별도로 있고, 이렇게 해서 모듈화를 진행한다.
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
24. Spark 내부 동작(Execution Plan) (0) | 2023.08.22 |
---|---|
23. Spark 내부 동작(Spark 파일 포맷, Partition 저장 형태, Schema가 있는 데이터의 Merge) (0) | 2023.08.22 |
21. Hive 메타스토어 사용하기 (0) | 2023.08.22 |
20. SPARK SQL(Windowing) (0) | 2023.08.22 |
19. SPARK SQL(Redshift Connect, Grouping) (0) | 2023.08.22 |