실습 3. 텍스트를 파싱해서 구조화된 데이터로 변환하기
- Regex를 이용해서 아래와 같이 변환하는 것이 목표
- 입력
- "On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling"
- regex 패턴 : "On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)"
- \S (non-whitespace character), \d(numeric character)
- 출력
(1) Spark Session build
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
(2) Spark read text
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([ StructField("text", StringType(), True)])
# 텍스트를 로드
transfer_cost_df = spark.read.schema(schema).text("./data/transfer_cost.txt")
- StructFiled를 text로 schema를 지정한 후, 텍스트를 로드합니다.
# truncate = False면 기본적으로 필드의 크기가 어느 선을 넘어가면 잘려서 표시되는데, 그냥 다 보여달라는 뜻
transfer_cost_df.show(truncate=False)
(3) Spark Text Processing(with Regex)
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
- DataFrame 지원 function 중 withColumn이 있습니다.
- 해당 방식을 사용하면, 추가하려는 컬럼 혹은 존재하는 컬럼을 지정한 후
- 컬럼 이름, Value를 지정할 수 있습니다.
- 이 때 Value에 함수를 작성해주면, User Define Function(UDF)를 사용할 수 있습니다.
- 다만, 이번 예시에서는 sql.functions 내에 있는 regex_extract 라는 함수를 사용해서,
- regex가 정의 된 곳에서 순서대로 값을 추출합니다.(0이 시작이 아닌 1이 시작 값)
# text라는 컬럼 Drop
final_df = df_with_new_columns.drop("text")
- 추출이 된 후, 기존에 있던 text 컬럼은 Drop 합니다.
(4) Save DataFrame
final_df.write.csv("extracted.csv")
- 데이터프레임을 csv 형태로 저장합니다.
final_df.write.format("json").save("extracted.json")
- 데이터프레임을 json 형태로 저장합니다.
- 데이터를 저장하면 저장한 이름으로 폴더가 구성이 되고 그 안에 데이터가 저장됩니다. 이유는 다음과 같습니다.
- 큰 데이터를 프로세싱해서 저장하려고하면, 굉장히 큰 크기를 가질 수 있는데, HDFS에 저장 될 때는 블락 단위로 나눠서 저장됩니다.
- 보통 128MB 단위로 여러개의 파일로 저장될 수 있지만, 해당 예제의 경우엔 데이터가 작아 파일 하나로 저장되었습니다.
* 윈도우 사용시 에러 해결방법(write.csv)
저의 경우엔 spark 3.4.1, hadoop3 버전 활용 중입니다. write.csv 사용 시에 에러가 발생했습니다.
{
"name": "Py4JJavaError",
"message": "An error occurred while calling o152.csv.\n: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'\r\n\tat org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)\r\n\tat org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)\r\n\tat org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)\r\n\tat org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)\r\n\tat org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)\r\n\tat org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)\r\n\tat org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)\r\n\tat org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)\r\n\tat org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)\r\n\tat org.apache.hadoop.fs.
이런 에러였고, 해결하기 위해서 stackoverflow에서 찾은 방법입니다.
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/Str
trying to run MR program version(2.7) in windows 7 64 bit in eclipse while running the above exception occurring . I verified that using 64 bit 1.8 java version and observed that all the hadoop d...
stackoverflow.com
- 일단 윈도우에서 Hadoop 자체에 대한 인식 문제였고, java 프로그램에서 dll 파일에 제대로 접근할 수 없었습니다.
- dll 파일을 다운로드(제 경우엔 3.0.0 버전으로 넣었습니다)
- https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/hadoop.dll
- HADOOP/bin <- (환경변수에 설정된 폴더) dll 파일 삽입
- C:\Windows\System32 <- dll 파일 삽입
이렇게 하면 윈도우에서도 성공적으로 write.csv를 진행할 수 있습니다.
'데이터 엔지니어링 > Spark' 카테고리의 다른 글
12. Spark DataFrame 실습(Redshift 연결, jdbc, join) (0) | 2023.08.17 |
---|---|
11. Spark DataFrame 실습(trim, split, explode, overwrite, sort) (0) | 2023.08.17 |
9. Spark DataFrame 실습(Alias, catalog) (0) | 2023.08.17 |
8. Spark DataFrame 실습(SparkSession, conf, Schema, Filter, select, SparkSQL, sql.types, 컬럼지칭 방식) (0) | 2023.08.17 |
7. 윈도우에 Local Standalone Spark 클러스터 설치, Spark-submit 오류 해결, findspark (0) | 2023.07.28 |