# coalesce라는 함수는 파티션의 수를 줄이는 함수
# 셔플링을 최소화하는 방향으로 줄인다.
df3 = df2.coalesce(2)
print("Num Partitions after: " + str(df3.rdd.getNumPartitions()))
df3.groupBy(spark_partition_id()).count().show()
파티션 2개로 줄인 후, partition_id별로 데이터는 분산
(6) 데이터 저장 및 비교
# df는 파일 하나만 저장(파티션이 한개)
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
# df2는 파티션이 4개였고, 파일 4개
df2.write \
.format('parquet') \
.mode('overwrite') \
.option('path', 'dataOutput/parquet/') \
.save()
# df3는 파티션이 2개였고, 파일 두 개
df3.write \
.format('json') \
.mode('overwrite') \
.option('path', 'dataOutput/json/') \
.save()
ParquetJson
6. Schema Evolution : read한 데이터를 schema에 따라 합치기
df1 = spark.read. \
parquet("./data/schema1.parquet")
df2 = spark.read. \
parquet("./data/schema2.parquet")
df3 = spark.read. \
parquet("./data/schema3.parquet")
# schema 파일을 동시에 합쳐서 데이터를 합칠 수 있다.
# mergeSchema를 트루로 지정
df = spark.read. \
option("mergeSchema", True). \
parquet("./data/*.parquet")
df.printSchema()