월별 채널별 매출과 방문자 정보 계산하기
- user_session_channel, session_timestamp, session_transaction 테이블 사용
- 다음 형태의 결과를 만들어보기
1. Spark Session Build
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL #1") \
.config("spark.jars", "/usr/local/lib/python3.8/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
.getOrCreate()
2. Redshift Connect
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
df_session_transaction = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_transaction") \
.load()
3. 임시 테이블 생성
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
spark.table("session_timestamp").show(5)
df_user_session_channel.show(5)
df_session_timestamp.show(5)
df_session_transaction.show(5)
4. JOIN KEY의 중복 여부 판단 쿼리
spark.sql("""SELECT sessionid, COUNT(1) count
FROM user_session_channel
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1""").show()
- 데이터엔지니어로서 중요한 행동 양식 : 데이터를 의심하는 버릇
- 정말로 session_id가 각 데이터셋에서 unique한 것인지 확인
- 세션 id 별로 카운트하는데, ORDER BY를 통해서 가장 큰 count 값을 확인
- count 값이 1보다 크다면 중복 발생
5. 월별 채널별 총 방문자 계산
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2""")
6. 월별 채널별 총 방문자와 구매 방문자 계산
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors,
COUNT(DISTINCT CASE WHEN amount is not NULL THEN userid END) paid_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2""")
7. 월별 채널별 총 매출액 (리펀드 포함), 총 방문자, 매출 발생 방문자, 전환율 계산
mon_channel_rev_df = spark.sql("""
SELECT LEFT(ts, 7) month,
usc.channel,
COUNT(DISTINCT userid) uniqueUsers,
COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
SUM(amount) grossRevenue,
SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100
/ COUNT(DISTINCT userid), 2) conversionRate
FROM user_session_channel usc
LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
GROUP BY 1, 2
ORDER BY 1, 2;
""")
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
21. Hive 메타스토어 사용하기 (0) | 2023.08.22 |
---|---|
20. SPARK SQL(Windowing) (0) | 2023.08.22 |
18. SparkSQL(Redshift Connect, Ranking) (0) | 2023.08.21 |
17. SparkSQL(JOIN - DataFrame, SQL) (0) | 2023.08.21 |
16. SparkSQL(udf 활용 dataframe 가공 및 spark sql 활용 집계) (1) | 2023.08.19 |