사용자별로 처음 채널과 마지막 채널 알아내기
- user_session_channel, session_timestamp 테이블 사용
- 다음 형태의 결과를 만들어보기
- 사용자 251번의 시간 순으로 봤을 때 첫번째 채널과 마지막 채널은 무엇인가?
- 노가다를 하자면 아래 쿼리를 실행해서 처음과 마지막 채널을 보면 된다.
SELECT ts, channel
FROM user_session_channel usc
JOIN session_timestamp st ON usc.sessionid = st.sessionid
WHERE userid = 251
ORDER BY 1
- ROW_NUMBER를 이용해서 해보자
- ROW_NUMBER () OVER (PARTITION BY field1 ORDER BY field2) nn
- vs. FIRST_VALUE/LAST_VALUE
- user_session_channel, session_timestamp 테이블 사용
- 다음 형태의 결과를 만들어보기
ROW_NUMBER의 동작
- 1. 사용자별로 시간순으로 일련번호를 매기고 싶다면?
- 2. 새로운 컬럼 추가!!
- 사용자별로 레코드를 모으고 그 안에서 시간 순으로 소팅한 후 사용자별로 1부터 번호 부여
- 3. ROW_NUMBER를 써서 2를 구현
ROW_NUMBER OVER (partition by userid order by ts) seq
ROWS BETWEEN AND 이해하기
SELECT
SUM(value) OVER (
order by value
rows between 2 preceding and 2 following
) AS rolling_sum
FROM rows_test;
- 앞의 두개와 뒤의 2개를 더하는 쿼리
- 만약 앞에는 다 더하고, 뒤에는 2개만 더하고 싶다면
- rows between unbounded preceding and 2 following
- 이런 식으로 unbounded로 대체
(1) Spark Session Build
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL #1") \
.getOrCreate()
rows_test = [
{ 'value': 1, 'name': 'Luka' },
{ 'value': 2, 'name': 'Luka'},
{ 'value': 3, 'name': 'Dirk' },
{ 'value': 4, 'name': 'Dirk' },
{ 'value': 5, 'name': 'Luka' },
]
df = spark.createDataFrame(rows_test)
(2) Redshift connect
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=아이디&password=비밀번호"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
df_session_transaction = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_transaction") \
.load()
(3) 임시 테이블 생성
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
(4) ROW_NUMBER를 사용해서 처음 채널과 마지막 채널 알아내기
# WITH를 활용해서 임시 테이블 만들기
# 임시 테이블 해서 ROW_NUMBER를 부르는데, 정렬을 다르게 한 후
# 채널을 구하기 위해서 SELF 조인형태처럼 진행
first_last_channel_df = spark.sql("""
WITH RECORD AS (
SELECT /*사용자의 유입에 따른, 채널 순서 매기는 쿼리*/
userid,
channel,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
FROM user_session_channel u
LEFT JOIN session_timestamp t
ON u.sessionid = t.sessionid
)
SELECT /*유저의 첫번째 유입채널, 마지막 유입 채널 구하기*/
f.userid,
f.channel first_channel,
l.channel last_channel
FROM RECORD f
INNER JOIN RECORD l ON f.userid = l.userid
WHERE f.seq_first = 1 and l.seq_last = 1
ORDER BY userid
""")
- ROW_NUMBER를 활용할 때는 WITH를 활용한 테이블 조인을 활용한다
- 첫번째 채널을 위한 ROW_NUMBER, 마지막 채널을 위한 ROW_NUMBER를 구하고
- seq_first가 1일 때, seq_last가 1일 때의 channel을 구한다.
- 다소 쿼리가 복잡해지는 측면이 있음
(5) FIRST_VALUE, LAST_VALUE를 사용해서 처음 채널과 마지막 채널 알아내기
# BOUND 한 것 중에 PARTITION 하고 같은 PARTITION 안에서 오름 차순으로 세팅을 한 다음에
# 채널의 FIRST VALUE, LAST VALUE를 뽑는다
# rows between : rolling window를 어떻게 가져갈거냐, unbounded를 양쪽에 쓰면 rolling window가 아니라 전체 partition 안에서 전부 보겠다
first_last_channel_df2 = spark.sql("""
SELECT DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid""")
- PARTITION 중 unbounded를 통해서 첫번째 채널과 마지막 채널을 알아낸다
- 앞선 예시보다 조금 더 쿼리가 단순해지는 형태
- unbounded 대신 숫자를 써서 해당 윈도우 내에서의 연산 범위를 조정할 수도 있음
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
22. Unit Test (0) | 2023.08.22 |
---|---|
21. Hive 메타스토어 사용하기 (0) | 2023.08.22 |
19. SPARK SQL(Redshift Connect, Grouping) (0) | 2023.08.22 |
18. SparkSQL(Redshift Connect, Ranking) (0) | 2023.08.21 |
17. SparkSQL(JOIN - DataFrame, SQL) (0) | 2023.08.21 |