안녕하세요. 이번에는 저번 포스팅에 이어서, SPOTIFY API를 활용해서, AIRFLOW로 S3에 업로드하는 과정을 자동화한 것에 대해서 작성하겠습니다. 확실히 S3에 업로드 하는 과정에서 조금 고민거리들이 많았는데, 일단은 진행하면서 배우는 중입니다. 그럼 이 과정들을 소개해보도록 하겠습니다.
데이터 파이프라인 설계
일단 AIRFLOW로 S3에 업로드하는 것은, EXTRACT 과정으로 삼았습니다. 원본 데이터를 JSON에 그대로 저장하고 S3에 업로드해서 AIRFLOW 스케줄러에 문제가 생겼을 때도, 어느 정도 복구할 수 있도록 S3를 거쳐가도록 파이프라인을 설계했습니다.
일단은 Spotify_Extract_toS3의 DAG의 파이프라인입니다. 일단 순서는 다음과 같습니다.
- globaltop50 데이터 추출 및 json 파일 저장
- globaltop50 데이터 활용, artist 데이터 추출 및 json 파일 저장
- artist 데이터 활용, hot 10 tracks 추출 및 활용, track, audio 세부 정보 추출 및 json 파일 저장
- 저장 된 파일 한꺼번에 s3로 업로드
- 다음 dag(s3 데이터 읽어서 transform하고 mysql 적재하는 dag) trigger
tirgger Dag를 제외하면 전부다 PythonOperator로 작성했습니다. 그런데 이 과정에서 json 파일을 로컬에 저장할까 말까하다가, 아무래도 곧바로 S3에 쏘는 것보다는 로컬에 저장하고, S3에 업로드하는게 조금 더 안정적일 것 같아서 일단은 이렇게 진행했습니다. 그런데, 여기서 비효율이 발생한다고 하면 언제든지 고칠 생각입니다. 일단은 아직 구현은 안됐지만, 로컬에 쌓인 데이터는 BashOperator를 통해서 주기적으로 삭제해줄 예정입니다. 대표적으로 track_audio_extract_task 코드로 구조를 한번 리뷰해보겠습니다.
EXTRACT 코드
# Global Top 50 아티스트의 TopTracks 10곡, 10곡에 대한 Audio_feature 추출
def track_audio_extract():
try:
headers = get_headers(client_id, client_secret)
# open artists data
with open(artist_save_path, 'r') as f:
artists = json.load(f)
# vacant tracks json file load
with open(track_save_path, 'w') as newf:
json.dump([], newf)
with open(track_save_path, 'r') as newf:
tracks = json.load(newf)
# vacant audio json file load
with open(audio_save_path, 'w') as newf2:
json.dump([], newf2)
with open(audio_save_path, 'r') as newf2:
audio = json.load(newf2)
## 전 artist를 조회하기 위한 반복문
for artist in artists:
artist_id = artist['id']
artist_name = artist['name']
logging.info(f'######## {artist_name} extract start')
params = {'country' : 'US'}
result = extract('Artist_toptrack', artist_id, headers, params = params)
for track in result['tracks']:
track_id = track['id']
# track json Update
track_info = extract('Track', track_id, headers)
tracks.append(track_info)
logging.info(f'#### {artist_name} {track_id} extract track success')
# audio json Update
audio_info = extract('Audio', track_id, headers)
audio.append(audio_info)
logging.info(f'#### {artist_name} {track_id} extract audio success')
# save json file
with open(track_save_path, 'w') as newf:
json.dump(tracks, newf, indent=4)
logging.info('######## Track file save success')
# save json file
with open(audio_save_path, 'w') as newf2:
json.dump(audio, newf2, indent=4)
logging.info('######## Audio file save success')
except Exception as e:
logging.error('An error occurred: %s', e)
raise
일단은 저번에 올린 api 오류 제어 함수를 최대한 활용했고, 스포티파이 api를 최대한 활용하기 위해서 한 아티스트 당 호출이 꽤 많습니다. 그 이유는, artist의 hot track 10곡을 뽑았을 때, track에 대한 세밀한 정보가 나오지 않기 때문에, 다시 한번 그 데이터 안에 있는 track_id를 사용해야했습니다. 그래서 이 track_id로 다른 api를 이용해서 track 세부 정보와, audio의 세부 정보를 뽑는 코드입니다.
보통 hot track이 10곡이니까 한 아티스트에 대해서 21번 정도 API 콜을 합니다. 처음엔 걱정이 꽤 많았는데, 아티스트 50명정도를 한꺼번에 돌려도 429에러나, 401에러 없이 잘 돌아갑니다. 간혹 뜨긴 하는데, 금방 제어됩니다. 그래서 이렇게 뽑은 데이터를 빈 json 파일을 만들어서 각각에 넣고, 마지막에 save하는 구조입니다.
dag = DAG(
dag_id = 'Spotify_Extract_toS3',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 17 * * *', # 한국 기준 새벽 2시
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=1),
'on_failure_callback': slack.on_failure_callback,
}
)
track_audio_extract_task = PythonOperator(
task_id = 'track_audio_extract_task',
python_callable = track_audio_extract,
dag = dag)
DAG와 TASK 중에서 일부만 보여드리면, Dag가 실패했을 때, 한번 재시도를 하게 해놨고, 또 슬랙으로 메세지가 오게끔 해놨습니다.
그래서 이런 방식으로 데이터를 추출하고 로컬에 데이터를 저장합니다. 이제 모든 데이터들을 s3로 올립니다.
S3 업로드
def s3_upload():
try:
# s3 connect, bucket name
s3 = s3_client_connect()
bucket = 'rotton-melon'
# s3 upload
s3.upload_file(globaltop50_save_path, bucket, 'globaltop50/' + os.path.split(globaltop50_save_path)[1])
s3.upload_file(artist_save_path, bucket, 'artists/' + os.path.split(artist_save_path)[1])
s3.upload_file(track_save_path, bucket, 'tracks/' + os.path.split(track_save_path)[1])
s3.upload_file(audio_save_path, bucket, 'audio/' + os.path.split(audio_save_path)[1])
logging.info('######## All files Upload to S3 success')
except Exception as e:
logging.error('An error occurred: %s', e)
raise
이 데이터들을 s3로 올리는데에는 boto3 파이썬 라이브러리를 활용했는데, AIRFLOW connection을 통해서 업로드하는 걸로 바꿀 예정입니다. 일단 처음 s3로 업로드해보는 거라 굉장히 실패가 많았습니다.., 일단은 저는 모듈화해놓고 plugins에 따로 코드를 넣어두고 쓰고 있어서, boto3로 s3 업로드하는 방법은 제 블로그 포스팅에 있습니다.
https://dataengineerstudy.tistory.com/181
그래서 결과는...
폴더별로 데이터가 잘 들어오고 있습니다.
어떻게 더 나은 파이프라인으로 만들 수 있을까
일단 이 부분에서의 지금 발전 시킬 것은,
- json을 로컬에 저장하는데, 이 데이터들을 어떻게 처리할 것인가 -> BashOperator를 통한 주기적 삭제 자동화
- s3 업로드를 airflow CONNECTION을 통해서 조금 더 유지보수에 쉬운 CONNECTION 방법 사용
이 정도가 있습니다. 일단은 AIRFLOW는 잘 돌아가고 있고, 다음 포스팅에서는 S3 데이터를 읽어서 가공한 뒤에, MYSQL로 적재하는 것에 대해서 포스팅 하도록 하겠습니다. 이 부분에서 고민이 상당히 많았어서, 내용이 조금 길어질수도 있을 것 같습니다. 감사합니다:)
'프로젝트 회고록 > 음악 평론 웹 제작 프로젝트' 카테고리의 다른 글
4. 참조 무결성 제약 조건으로 인한 DAG 수정 (0) | 2023.05.18 |
---|---|
3. S3 데이터 가공해서 MYSQL로 적재하기 (2) | 2023.05.14 |
1. SPOTIFY API로 데이터 추출하기 (2) | 2023.05.14 |