이번 포스팅에서는 S3에 있는 데이터를 가공해서, MYSQL로 적재하는 TRANSFORM, LOAD 과정을 작성하겠습니다. 아무래도, 각각 데이터 특성이 다르다보니까, ETL 과정에서도 조금 다른 방식의 가공과 적재 방법을 사용했습니다. 일단 학습에 목적을 두기도 했지만, 실무에서 어떤 방식으로 응용될지도 조금 고민해볼 수 있었던 것 같습니다.
데이터 파이프라인
일단 이번 DAG의 파이프라인은 이렇습니다. 사실 지금은 태스크 별로, S3 데이터를 읽는게 아니라, 전역변수로 S3에서 읽고 있는데, 만약에 EC2 환경에서 메모리나 이런 부분들이 부족하다면, 태스크별로 S3에서 데이터를 읽고와서, TASK를 마무리하면 그 때 해당 변수를 제거하는 식으로 메모리를 컨트롤할 수 있을 것 같습니다. 지금은 매일 업데이트 되는 데이터가 크기가 그렇게 크지 않아서, 일단은 다른 사람이 봤을 때, 가장 이해하기 쉬운 형태로 작성했습니다. 여튼 이번 파이프라인은, 이런 구성입니다.
- S3 READ, GLOBAL TOP 50 곡 가공 및 정제 후 MYSQL 적재
- S3 READ, ARTIST 데이터 가공 및 정제 후 MYSQL 적재
- S3 READ, TRACK 데이터 가공 및 정제 후 MYSQL 적재
- S3 READ, AUDIO 데이터 가공 및 정제 후 MYSQL 적재
사실 데이터 별로 지금 INSERT 하는 방식이 다른데, 일단 간단히 요약하자면,
- GLOBAL TOP 50 : FULL REFRESH
- RDB에 적재할 때, 테이블의 데이터를 모두 지우고 업데이트하는 방식입니다. 이건 나중에 웨어하우스에서 데이터를 끌어와서 매일 저장하는 프로세스를 염두해두고, FULL REFRESH로 했는데 데이터웨어하우스를 MYSQL로만 쓴다면 이 부분은 조금 수정이 필요할 것 같습니다.
- ARTIST, TRACK : UPSERT
- id를 이미 가지고 있는 경우에는 UPDATE를 하고, id가 없는 경우에는 INSERT를 합니다. 해당 방식을 사용한 이유는, GLOBAL TOP 50 특성 상 날짜 단위로 겹치는 데이터가 많고, 혹여나 ARTIST 정보나, TRACK 정보가 바뀔 여지가 있기 때문에 UPSERT로 구현하였습니다.
- AUDIO : 부분 INSERT
- id가 없는 경우에만 INSERT 합니다. 이 데이터는 수정될 여지가 없기 때문에 이 방식을 선택했습니다.
코드 구현
(1) S3 데이터 읽기
def s3_resource_connect():
# s3 connect
AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
AWS_DEFAULT_REGION = "ap-northeast-2"
resource = boto3.resource('s3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_DEFAULT_REGION
)
return resource
def read_json_object(bucket, key, resource):
obj = resource.Object(bucket, key)
obj_body = obj.get()['Body'].read().decode('utf-8')
obj_json = json.loads(obj_body)
return obj_json
# s3 connect
s3 = s3_resource_connect()
bucket = 'rotton-melon'
# globalTop50 file read from S3
gt_key = 'globaltop50' + '/' + 'globaltop50_' + datetime.now().strftime('%Y%m%d') + '.json'
globaltop50 = read_json_object(bucket, gt_key, s3)
일단 먼저 S3에 있는 데이터를 읽을 때, 제가 사용한 코드입니다. boto3의 resource 메소드를 활용해서 날짜가 붙어있는 파일명으로 가져오는 형식입니다. 위에 함수에서는 AIRFLOW에서 제공하는 variable을 활용했는데, 저번 포스팅에서 언급했듯이, 이 부분에서 조금 더 유지보수가 쉬울 수 있도록 connection을 설정하는 방법으로 바꾸려고합니다. 일단 이 부분은 바뀌면 다시 포스팅하겠습니다 ㅎㅎ. 일단 이 방식으로 s3에 있는 데이터를 가지고 옵니다.
(2) FULL REFRESH JOB
def globaltop50_transformload():
# Insert(Full refresh)
try:
cur.execute("BEGIN;")
cur.execute("DELETE FROM globaltop50;")
for i in range(len(globaltop50)):
track_id = globaltop50[i]['track']['id']
disc_number = globaltop50[i]['track']['disc_number']
created_date = datetime.now()
sql = "INSERT INTO globaltop50 (track_id, disc_number, created_date) VALUES (%s, %s, %s);"
params = (track_id, i + 1, created_date)
cur.execute(sql, params)
cur.execute('COMMIT;')
logging.info(f"Today Global Top 50 Tracks INSERT SUCCESS")
except Exception as e:
cur.execute('ROLLBACK')
logging.error('An error occurred: %s', e)
raise
globaltop50 데이터의 경우는 FULL REFRESH JOB으로 일단 진행하고 있습니다. BEGIN으로 트랜잭션을 연 이후에, 삭제와 가공 그리고, INSERT를 동시에 진행합니다. 아무래도 이 부분을 조금 고민을 많이 해보고, 질문도 많이 해봐야할 것 같습니다. 지금에 있어서는 굳이 FULL REFESH JOB으로 할 필요가 있나 싶긴 한데, 일단은 백엔드에서 테이블을 가져갈 때, WHERE 쪽을 굳이 신경 쓸 필요가 없다는 점이 좋을 것 같긴한데.. 사실 이건 제 생각일 뿐이라 좀 더 고민해보겠습니다.(생각해보니까 그냥 ORDER BY해서 상위 50개만 뽑으면 되는군하..)
(3) UPSERT
def artist_transformload():
# Upsert
# count
insert_cnt = 0
update_cnt = 0
# Insert
try:
cur.execute("BEGIN;")
for i in range(len(artists)):
artist_id = artists[i]['id']
artist_name = artists[i]['name']
artist_popularity = artists[i]['popularity']
artist_followers = artists[i]['followers']['total']
artist_image_link = artists[i]['images'][0]['url']
created_date = datetime.now()
cur.execute(f"SELECT COUNT(*) FROM music_artist WHERE artist_id = '{artist_id}';")
result = cur.fetchone()[0]
# If there is no artist_id in original table -> Insert
if result == 0:
sql = "INSERT INTO music_artist (artist_id, artist_name, artist_popularity, artist_followers, artist_image_link, created_date) VALUES (%s, %s, %s, %s, %s, %s)"
params = (artist_id, artist_name, artist_popularity, artist_followers, artist_image_link, created_date)
cur.execute(sql, params)
insert_cnt += 1
logging.info(f"INSERT {params} SUCCESS")
# If there is artist_id in original table -> Update
elif result == 1:
sql = "UPDATE music_artist SET artist_name = %s, artist_popularity = %s, artist_followers = %s, artist_image_link = %s, created_date = %s WHERE artist_id = %s"
params = (artist_name, artist_popularity, artist_followers, artist_image_link, created_date, artist_id)
cur.execute(sql, params)
update_cnt += 1
logging.info(f"UPDATE {params} SUCCESS")
cur.execute('COMMIT;')
logging.info(f"{update_cnt} Artists Update to Mysql ")
logging.info(f"{insert_cnt} Artists Load to Mysql ")
except Exception as e:
cur.execute('ROLLBACK;')
logging.error('An error occurred: %s', e)
raise
이 부분은 ARTIST의 데이터를 UPSERT 하는 코드입니다. 먼저 json 데이터를 가공한 뒤에, aritst_id를 조회해보고, count가 1이라면 UPDATE를 하고, count가 0이라면 INSERT를 하는 코드입니다. UPDATE 시에 굳이 변경되지 않아도 되는 컬럼은 UPDATE를 안하려고 했는데, SPOTIFY에서 어떤 방식으로 아티스트의 정보를 업데이트하는지 몰라서 일단은 조금이라도 변경 가능성이 있는 것들은 전부 UPDATE하게 해놨습니다. 이 코드도 역시, 트랜잭션으로 구현됩니다.
(4) DAG
dag = DAG(
dag_id = 'Spotify_transformload_S3toMySQL',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '@once', # 한번만 실행
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=1),
'on_failure_callback': slack.on_failure_callback,
}
)
일단 이 DAG는 앞선 포스팅에서의 DAG가 TRIGGER 해서 발생하는 DAG입니다. 그래서 schedule을 보시면 따로 지정된 건 아니고, @once로 단 한번만 실행되게끔 설정했습니다.
더 나은 파이프라인이 되려면
제가 지금 단계에서 생각해낸 파이프라인 개선 방향은
- 태스크별로 메모리에 데이터를 올리면서 메모리 사용량을 줄일 것
- AIRFLOW에서 제공하는 S3 커넥션을 통해 유지보수에 유리하게 바꿀 것
이 정도입니다. 그런데 일단 SPOTIFY 데이터의 엔지니어링 프로젝트에서 조금 아쉬운 것은
- 날짜로 통제할 수 있는 API가 아닙니다. 그래서, BACKFILL 하는 상황이 생긴다면 어제의 GLOBAL TOP 50은 불러올 수가 없습니다. 그래서 이 파이프라인에서는 하루 단위로 EXTRACT가 에러가 났다면, BACKFILL할 방법이 없다는 점입니다.,. 이 방법을 해결하기위해서 excute_date를 한번쯤 써보고 싶었는데, 이건 나중에 포스팅할 ELT 과정에서는 created_date를 활용해서 가능할 것 같습니다.. 혹시라도 SPOTIFY API에 날짜로 통제할 수 있는 데이터가 있다면 댓글 부탁드립니다 ㅜㅜ
- to_sql로 하면 편할 것 같은데 안한 이유는, log 정보를 커스터마이징하고 싶었습니다. 그래서, INSERT 된 데이터, UPDATE된 데이터의 COUNT를 직접 로그 정보에 남기고 싶었고, 제 파이프라인 특성 상 중복 데이터를 업데이트해야되는 경우가 잦기 때문에, 해당 정보를 조금 더 잘 보고 싶었습니다.
일단 ETL 과정은 마무리가 됐습니다. 사실 매일 업데이트 되는 스케줄링 데이터는 구현이 끝난 상태고, 벌크 데이터를 업데이트를 먼저 해야하는 상황입니다. 하지만 SPOTIFY API 특성 상, 물리적으로 꽤 시간이 걸려서 이 부분은 차근 차근히 진행해서 한번에 데이터를 적재할 예정입니다. 지금은 데이터웨어하우스를 레드쉬프트로 쓸까, 말까 고민 중인데 이 부분은 팀원분들과 협의를 좀 한 이후에 다음 포스팅에서 업데이트 해보겠습니다.
또 MYSQL에 있는 데이터로, ELT를 구현해보고 분석가 분과 협업해서 가벼운 머신러닝 모델도 만들어볼 예정입니다. 프로젝트가 조금 더 개선될 때마다 업데이트 해보겠습니다. 감사합니다:)
'프로젝트 회고록 > 음악 평론 웹 제작 프로젝트' 카테고리의 다른 글
4. 참조 무결성 제약 조건으로 인한 DAG 수정 (0) | 2023.05.18 |
---|---|
2. SPOTIFY 데이터 AIRFLOW로 S3에 업로드하기 (0) | 2023.05.14 |
1. SPOTIFY API로 데이터 추출하기 (2) | 2023.05.14 |