실습 5. Redshift 연결해보기
- MAU(Monthly Active User) 계산해보기
- 두 개의 테이블을 Redshift에서 Spark로 로드
- JDBC 연결 실습
- DataFrame과 SparkSQL을 사용해서 조인
- DataFrame JOIN
- left_DF.join(right_DF, join condition, join type)
- join type : "inner", "left", "right", "outer", "semi", "anti"
- left_DF.join(right_DF, join condition, join type)
* redshift jdbc jar 파일 넣는 경로(윈도우)
- C:\Spark\spark-3.4.1-bin-hadoop3\jars
- https://docs.aws.amazon.com/ko_kr/redshift/latest/mgmt/jdbc20-download-driver.html
(1) SparkSession Build
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("PySpark DataFrame #5") \
.getOrCreate()
(2) Redshift Connect(테이블 읽어오기)
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=아이디&password=비밀번호") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=아이디&password=비밀번호") \
.option("dbtable", "raw_data.session_timestamp") \
.load()
- jdbc format으로
- driver 이름 설정
- url 통해서 redshift 특정 데이터베이스와 연결
- 특정 스키마 아래 테이블 load
# 스키마 프린트
df_user_session_channel.printSchema()
# 몇개의 파티션을 갖고 있는지
df_user_session_channel.rdd.getNumPartitions()
(3) DataFrame으로 처리하기(채널별 유입량)
# join 조건 설정
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
# inner join
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")
- df_user_session_channel과 df_session_timestamp를 join
# 세션 id가 두번 나온다
session_df.show(5)
# 이렇게 하면 sessionid가 두 개였기 때문에 반드시 에러가 발생한다.
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
"userid", "sessionid", "channel", "ts"
)
- 에러가 발생하는 이유는 sessionid가 두 개이기 때문에, 특정 테이블에서 가져온 걸 명시해줘야한다.
# 가장 간단한 방법은 데이터프레임 이름을 작성한다.
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
"userid", df_user_session_channel.sessionid, "channel", "ts"
)
- 명시하여 해결
(4) DataFrame으로 처리하기(MAU)
from pyspark.sql.functions import date_format, asc, countDistinct
session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
agg(countDistinct("userid").alias("mau")).sort(asc('month')).show()
- join된 데이터프레임에서, month 라는 컬럼을 추가하는데 이때 date_format은 ts컬럼을 'yyyy-MM'으로 바꿔 해결
- 해당 데이터프레임을 month를 기준으로 groupby하고,
- userId를 중복 제외하고 Count 이 때 집계된 컬럼 이름을 mau로 지정
- sort는 month를 오름차순으로 정렬
- show한다.
(5) SparkSQL로 처리하기(채널별 Unique User)
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
- 데이터프레임을 특정 테이블 명으로 구성
channel_count_df = spark.sql("""
SELECT channel, count(distinct userId) uniqueUsers
FROM session_timestamp st
JOIN user_session_channel usc ON st.sessionID = usc.sessionID
GROUP BY 1
ORDER BY 2 DESC
""")
- 쿼리 사용하여 집계
(6) SparkSQL로 처리하기(mau)
mau_df = spark.sql("""
SELECT
LEFT(A.ts, 7) AS month,
COUNT(DISTINCT B.userid) AS mau
FROM session_timestamp A
JOIN user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
ORDER BY 1 DESC""")
- 쿼리 사용하여 집계
mau_df.collect()
정형 데이터 집계 방식에는 SparkSQL이 훨씬 유리한 것 같습니다. 다만, 비정형 데이터 처리나 조금 난해한 데이터 처리의 경우엔, DataFrame 방식이 더 유용한 것 같습니다.
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
14. Spark SQL(Aggregation-JOIN, Shuffle JOIN, Bucket JOIN, BroadCast JOIN) (3) | 2023.08.17 |
---|---|
13. Spark SQL 소개(사용법, 외부 데이터베이스 연결) (0) | 2023.08.17 |
11. Spark DataFrame 실습(trim, split, explode, overwrite, sort) (0) | 2023.08.17 |
10. Spark DataFrame 실습(텍스트 파싱 -> 구조화 데이터 변환, 윈도우 Hadoop.dll 관련 에러해결법) (0) | 2023.08.17 |
9. Spark DataFrame 실습(Alias, catalog) (0) | 2023.08.17 |