안녕하세요! 저번에는 음악 평론 웹 제작 프로젝트를 미니 프로젝트로 진행하면서, 웹의 뼈대 정도는 만들어둔 상황입니다. 아직 프론트도 제대로 구현되지 않았고, 백 쪽도 확실하게 되진 않았습니다!! 전부 완성이 되면 기능 소개글도 한번 올려보겠습니다. 그런데 뒤에 데이터 파이프라인을 만들기가, 영 쉽지 않습니다.... 특히 SPOTIFY API 에서 주는 데이터들이 그렇게 친절하지 않아서..? 직접 오류를 제어하면서 뽑는 코드를 만들려니 고생 꽤나 했는데, 재밌습니다..하하;
그래서 SPOTIPY라는 누군가 만들어둔 라이브러리를 발견하고 쓰려고 했는데, 이건 오류제어를 하면서, 계속해서 API 호출을 할 수 없었습니다. 그래서 대표적인 에러를 제어하는 방식으로, 모듈을 조금 만들어봤습니다. 일단은 데이터엔지니어 하시는 분들은 토이프로젝트 하실 때, 이 API를 자주 쓰실 수도 있는데, 만들어둔 코드 공유 드리겠습니다., 부족할 수도 있지만 저는 나름 요기나게 쓰고 있씁니닷..
def get_headers(client_id, client_secret):
endpoint = "https://accounts.spotify.com/api/token"
encoded = base64.b64encode("{}:{}".format(client_id, client_secret).encode('utf-8')).decode('ascii')
headers = {
"Authorization": "Basic {}".format(encoded),
}
payload = {
"grant_type": "client_credentials"
}
response = requests.post(endpoint, headers=headers, data=payload)
access_token = response.json()["access_token"]
headers = {
"Authorization": "Bearer {}".format(access_token)
}
일단 SPOTIFY는 API를 제공하는데, 액세스 토큰이 일정 시간이 지나면 토큰이 만료가 됐습니다. 그래서, 이 토큰이 만료됐다고 401 에러를 뱉는데, 이 때 get_headers 함수를 이용해서 headers를 다시 받았습니다. 그래서 401에러를 받았을 때 headers를 초기화 하는 방식으로 함수를 짰습니다. 그래서 일단 에러를 제어하는 코드를 일반적인 상황에서 전부 쓸 수 있게 따로 함수를 만들었습니다.
def extract(keyword, nid, headers, params = None):
# 이 안에 있는 모든 url은 한 페이지 안에 끝납니다.
url = get_url(keyword, nid)
r = requests.get(url, params = params, headers=headers)
if r.status_code == 200:
result = json.loads(r.text)
elif r.status_code == 429: # Too many api calls in short time
retry_after = json.loads(json.dumps(dict(r.headers)))['retry-after']
logging.info(f'#### {keyword} {nid} 429 ERROR, Will retry after {retry_after}sec')
time.sleep(int(retry_after))
r = requests.get(url, params = params, headers=headers)
result = json.loads(r.text)
elif r.status_code == 401: # Authorization Problem: header expired
logging.info(f'#### {keyword} {nid} 401 ERROR, Will get headers again')
headers = get_headers(client_id, client_secret)
r = requests.get(url, params = params, headers=headers)
result = json.loads(r.text)
else :
logging.info(r.status_code)
raise Exception('Unknown error')
return result
200번 status_code는 정상 작동 상태입니다.. 그런데 429 에러 같은 경우에는, 짧은 시간에 너무 많은 api call을 했기 때문에 일정시간 대기해야했습니다. 그런데 대기시간에 대해서 에러코드에서 안내를 해주기 때문에, 시간을 받아서 그대로 time.sleep 해주는 형태의 대처 방법이 있었습니다.
이 방식은 모듈화가 됐기 때문에 잘 사용하면 하나의 api call에서 오류를 쉽게 제어할 수 있었습니다. 그래서 코드를 보시면 keyword를 파라미터로 받는걸 보실 수 있는데, 이 파라미터는 api의 종류를 제어하게끔 짜놨습니다. get_url이라는 함수로 여러 종류의 url을 받을 수 있게 해놨습니다.
def get_url(keyword, nid):
'''
글로벌 탑 차트 조회: GlobalTop50, nid
아티스트별 탑 10 트랙 조회 : Artist_toptrack, artist_id
아티스트 세부 데이터 조회 : Artist, artist_id
트랙 세부 데이터 조회 : Track, track_id
트랙 오디오 데이터 조회 : Audio, track_id
'''
if keyword == "GlobalTop50":
# GlobalChart_id = '37i9dQZEVXbMDoHDwVN2tF'
url = f'https://api.spotify.com/v1/playlists/{nid}'
elif keyword == 'Artist_toptrack':
url = f"https://api.spotify.com/v1/artists/{nid}/top-tracks/"
elif keyword == 'Artist':
url = f"https://api.spotify.com/v1/artists/{nid}"
elif keyword == 'Track':
url = f"https://api.spotify.com/v1/tracks/{nid}"
elif keyword == 'Audio':
url = f"https://api.spotify.com/v1/audio-features/{nid}"
else:
raise Exception('you need to change First input keyword, e.g. Artist_toptrack, Artist, Track, Audio')
return url
일단은 제가 쓸 것들만 해놨는데, 혹시라도 다른 걸 쓰시고 싶다면, KEYWORD에 맞춰서 URL만 잘 추가해주시면 됩니다. 저는 에어플로우로 스케줄링을 해보고 싶어서, 매일 업데이트 되는 데이터를 찾았습니다. 근데 매일 업데이트되는 데이터가 Global Top 50이 있었습니다. api키로 딱 정해져있던 건 아니고, playlist의 id를 찾아서 그 아이디를 url에 써서 제공 받는 형식입니다. 제가 글로벌 차트는 저렇게 써놨는데, 다른 플레이리스트의 아이디들도 쉽게 웹에서 제공 받을 수 있습니다. 일단 데이터를 뽑는 순서를 정리해서 말씀드리면,
- Global Top 50 데이터 추출 : 먼저 이 데이터는 따로 저장하고, 여기에 있는 아티스트의 아이디를 가지고 옵니다.
- Artist : 아티스트 아이디를 활용해서, 아티스트의 정보를 가지고 옵니다.
- Artist Toptrack : 앞선 id를 활용해서 아티스트의 탑 10 트랙을 가지고 옵니다. 저희는 음악 평론 웹을 제작 중이기 때문에, 가장 인기 많았던 top10만 가지고 왔습니다.
- Track : 앞서 뽑은 TopTrack에서 Track_id를 활용해서 Track 세부 정보를 가지고 옵니다.
- Audio : 앞서 뽑은 TopTrack에서 Track_id를 활용해서 Audio 세부 정보를 가지고 옵니다.
일단 제가 활용한 api는 이렇지만, spotify api doc에서 찾기 힘든 api라도, spotipy(라이브러리)가 오픈소스라서 내부 코드 들어가보면 api url 몇개는 쉽게 찾을 수 있습니다. 저는 top-track을 오픈소스 내부 코드에서 찾았습니다. 그래서 이 모듈을 어떻게 활용했냐면, get_url은 extract 함수 내부에서 돌고, api 호출 한번 당 에러를 제어할 수 있도록 짜놨습니다.
# global Top 50의 아티스트 추출
def globaltop50_extract():
headers = get_headers(client_id, client_secret)
with open(globaltop50_save_path, 'w') as gt:
json.dump([], gt)
with open(globaltop50_save_path, 'r') as gt:
gt50 = json.load(gt)
# 현재 Global Top 50 음악 추출
headers = get_headers(client_id, client_secret)
data = extract("GlobalTop50", '37i9dQZEVXbMDoHDwVN2tF', headers = headers) # nid는 플레이리스트의 id
# 통으로 json 파일에 집어넣기
gt50.extend(data['tracks']['items'])
# save json file
with open(globaltop50_save_path, 'w') as gt:
json.dump(gt50, gt, indent=4)
logging.info('######## Global Top 50 file save success')
단 글로벌 TOP 50 같은 경우는 데이터를 한번에 json으로 밀어주는데, 일단 이건 굉장히 간단했습니다. 그런데 aritst의 top track을 받아왔을 때 track_id로 audio 정보, track 세부 정보를 받아와야하기 때문에 api 호출량이 엄청 늘어나는 문제가 있었습니다.
그래서 가끔 401에러나 429에러를 만났는데, 이 과정을 거치면서 나온 코드가 위의 저 모듈들입니다... 지금은 raw 데이터를 json 형식으로 모아서, 매일 s3에 적재하는 과정을 AIRFLOW로 자동화하는 중입니다. 사실 지금은 S3를 이용하지 않고, 로컬에서 돌고 있는데 아마 내일 쯤? 진행할 것 같습니다. 일단 지금 RDB에 퍼올리는 데이터는 이렇게 두 가지로 나뉩니다.
- 벌크 데이터(아티스트 10,000명 분의 데이터)
- 매일 업데이트 되는 데이터(금일 포스팅한 데이터들)
일단 하루 단위로 업데이트 되는 데이터는 데이터가 그렇게 크지않아서, 사실 API 호출에 별 문제가 없습니다. 그런데 벌크 데이터 같은 경우는 지금 429에러가 계속 떠서, 긴 시간 대기를 계속하게 되는?? 그런 상황입니다. 일단은 계속 진행하면서 데이터 받고, 데이터 적재하고~ 해보겠습니다.
지금은 이 데이터들이 전부 s3에 json 파일 형태로 저장되고, 이 s3에 있는 데이터를 관계형 데이터로 transform 해서 mysql로 load하는 방식입니다. 일단 지금은 s3를 빼면 코드는 구현이 되었습니다. 차근차근히 과정 포스팅하면서 공부해보겠습니다. 나중엔 GIT에 코드 다 올려서 업데이트 해놓겠습니다. 감사합니다:)
'프로젝트 회고록 > 음악 평론 웹 제작 프로젝트' 카테고리의 다른 글
4. 참조 무결성 제약 조건으로 인한 DAG 수정 (0) | 2023.05.18 |
---|---|
3. S3 데이터 가공해서 MYSQL로 적재하기 (2) | 2023.05.14 |
2. SPOTIFY 데이터 AIRFLOW로 S3에 업로드하기 (0) | 2023.05.14 |