Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

scala - Achieve concurrency when saving to a partitioned parquet file

When writing a DataFrame to Parquet using partitionBy:

df.write.partitionBy("col1","col2","col3").parquet(path)

I would expect each partition to be written independently by a separate task, in parallel up to the number of workers assigned to the current Spark job.

However, only one worker/task actually runs at a time when writing the Parquet output. That one worker cycles through the partitions and writes out the .parquet files serially. Why is this the case, and is there a way to force concurrency in this df.write.parquet operation?

The following is not what I want to see (CPU usage should be 700%+):

[screenshot of CPU usage]

Following this other post, "Spark parquet partitioning : Large number of files", I also tried adding repartition in front:

df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)

Unfortunately this had no effect: still only one worker.

Note: I am running in local mode with local[8], and I have seen other Spark operations run with as many as eight concurrent workers, using up to 750% CPU.

1 Reply


In short, writing multiple output files from a single task is not parallelized. However, assuming you have multiple tasks (multiple input splits), each of them gets its own core on a worker.

The goal of writing out partitioned data isn't to parallelize the write operation. Spark already does that by running multiple write tasks at once. The goal is to optimize future read operations that need only one partition of the saved data.
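To make the distinction concrete, here is a rough Python model of the behaviour described above. It is not Spark code: the thread pool stands in for the cores of local[8], and write_task, write_partitioned, the col1=<key> directory layout and the part-NNNNN.txt file names are all made up for illustration. The point it sketches is that parallelism follows the number of input splits, while each task writes its own partition files one after another.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def write_task(task_id, rows, out_dir):
    """One task: write its rows serially, one output file per partition key."""
    by_key = {}
    for key, value in rows:
        by_key.setdefault(key, []).append(value)

    files_written = []
    for key, values in sorted(by_key.items()):
        # Hypothetical layout: <out_dir>/col1=<key>/part-<task_id>.txt
        part_dir = os.path.join(out_dir, f"col1={key}")
        os.makedirs(part_dir, exist_ok=True)
        path = os.path.join(part_dir, f"part-{task_id:05d}.txt")
        with open(path, "w") as fh:
            fh.write("\n".join(values) + "\n")
        files_written.append(path)
    return files_written


def write_partitioned(input_splits, out_dir, cores=8):
    """Run one task per input split; at most `cores` tasks run concurrently."""
    with ThreadPoolExecutor(max_workers=cores) as pool:
        futures = [
            pool.submit(write_task, task_id, split, out_dir)
            for task_id, split in enumerate(input_splits)
        ]
        return [future.result() for future in futures]


# Two input splits give two tasks; a single split would give a single task,
# no matter how many distinct partition keys it contains.
with tempfile.TemporaryDirectory() as tmp:
    written = write_partitioned(
        [[("a", "r1"), ("b", "r2")], [("a", "r3")]], tmp, cores=2
    )
```

In this model, a job whose data sits in a single split keeps only one worker busy, which matches what the question describes.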

The logic to write partitions in Spark is designed to read all of the records from the previous stage only once when writing them out to their destination. I believe part of the design choice is also to protect against the case where a partition key has a very large number of distinct values.

EDIT: Spark 2.x method

In Spark 2.x, each task sorts its records by their partition keys, then iterates through them, writing to one file handle at a time. I assume this is done so that a task never opens a huge number of file handles when the partition keys have many distinct values.

For reference, here is the sorting:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

Scroll down a little and you will see it calling write(iter.next()) while looping through each row.

And here is the actual writing (one file/partition key at a time):

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121

There you can see it only holds one file handle open at a time.
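As a rough illustration of the sort-then-write idea (not the actual FileFormatWriter code), the Spark 2.x strategy for a single task might be sketched in Python as follows. The function name write_sorted, the (key, value) row shape and the output layout are assumptions made for the example.

```python
import os
import tempfile


def write_sorted(rows, out_dir, task_id=0):
    """Sort one task's rows by key, then write them with one open handle at a time.

    Returns the number of files written.
    """
    files_written = 0
    current_key = object()  # sentinel that compares unequal to every real key
    handle = None
    try:
        for key, value in sorted(rows, key=lambda r: r[0]):
            if key != current_key:
                # New partition key: close the previous file before opening the next.
                if handle is not None:
                    handle.close()
                part_dir = os.path.join(out_dir, f"col1={key}")
                os.makedirs(part_dir, exist_ok=True)
                handle = open(os.path.join(part_dir, f"part-{task_id:05d}.txt"), "w")
                files_written += 1
                current_key = key
            handle.write(value + "\n")
    finally:
        if handle is not None:
            handle.close()
    return files_written


with tempfile.TemporaryDirectory() as tmp:
    n_files = write_sorted([("b", "r1"), ("a", "r2"), ("b", "r3")], tmp)
```

Because the rows arrive grouped by key after the sort, the task never needs more than one open handle, at the cost of sorting its input first.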

EDIT: Spark 1.x method

For a given task, Spark 1.x loops through all the records, opening a new file handle whenever it encounters an output partition it hasn't seen before in this task. It then immediately writes the record to that file handle and goes on to the next one. This means that at any given time while processing a single task, it can have up to N file handles open just for that one task, where N is the maximum number of output partitions. To make it clearer, here is some Python pseudo-code to show the general idea:

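# Write out the records of a single InputSplit/Task.
# input_split, partition_keys and determine_output_path() are placeholders.
handles = {}
for row in input_split:
    partition_path = determine_output_path(row, partition_keys)
    # Open a new file handle the first time this task sees an output partition.
    if partition_path not in handles:
        handles[partition_path] = open(partition_path, 'w')

    handles[partition_path].write(row)

# Every handle stays open until the task has consumed all of its records.
for handle in handles.values():
    handle.close()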

There is a caveat to the above strategy for writing out records. In Spark 1.x, the setting spark.sql.sources.maxConcurrentWrites put an upper limit on the maximum number of file handles that could be open per task. Once that limit was reached, Spark would instead sort the data by the partition key, so it could iterate through the records writing out one file at a time.
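The caveat can be modelled along the same lines. The sketch below is a simplified guess at the behaviour, not Spark's implementation: max_open plays the role of spark.sql.sources.maxConcurrentWrites, the function names and file layout are invented, and the fallback appends to files already written in the first phase, which real Spark may well handle differently.

```python
import os
import tempfile


def _open_for(out_dir, key, task_id):
    # Hypothetical layout: <out_dir>/col1=<key>/part-<task_id>.txt
    part_dir = os.path.join(out_dir, f"col1={key}")
    os.makedirs(part_dir, exist_ok=True)
    return open(os.path.join(part_dir, f"part-{task_id:05d}.txt"), "a")


def write_with_handle_cap(rows, out_dir, max_open=5, task_id=0):
    """Write one task's rows; return True if the sort-based fallback was needed."""
    it = iter(rows)
    handles, remaining = {}, []
    try:
        for key, value in it:
            if key not in handles:
                if len(handles) >= max_open:
                    # Cap reached: stop opening handles and collect the rest.
                    remaining = [(key, value), *it]
                    break
                handles[key] = _open_for(out_dir, key, task_id)
            handles[key].write(value + "\n")
    finally:
        for fh in handles.values():
            fh.close()

    # Fallback: sort what is left so only one handle is open at a time.
    remaining.sort(key=lambda r: r[0])
    current_key, handle = object(), None
    try:
        for key, value in remaining:
            if key != current_key:
                if handle is not None:
                    handle.close()
                handle = _open_for(out_dir, key, task_id)
                current_key = key
            handle.write(value + "\n")
    finally:
        if handle is not None:
            handle.close()
    return bool(remaining)


with tempfile.TemporaryDirectory() as tmp:
    rows = [("a", "r1"), ("b", "r2"), ("c", "r3"), ("a", "r4")]
    fell_back = write_with_handle_cap(rows, tmp, max_open=2)
```

With max_open=2 the third distinct key triggers the fallback, so the rest of the task's rows are sorted and written one file at a time.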

