Added example showing parallel writes to parquet (x num_cores) #436
Conversation
cc @ritchie46 @sundy-li @houqp

I tried to summarize in this example how we can go about writing parquet files in parallel, by moving the CPU-bounded work (encoding and compression) away from the IO-bounded write step. IMO we should not offer a dedicated API for this in the crate; it is better delegated to implementations. For an iterator of batches, the same idea applies batch by batch.

Regardless, the principle is the same: split IO-bounded from CPU-bounded work and leverage all kinds of parallelism tricks to saturate IO. When the order of the batches does not matter, there is a further optimization in which we parallelize over both columns and records (in which case the records are written as they are processed).
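A minimal sketch of that split, assuming a hypothetical `encode_and_compress` stand-in for arrow2's actual encoding machinery (the real API differs): columns fan out to rayon's thread pool for the CPU-bounded part, while the IO-bounded writes stay sequential, since parquet lays out a row group's column chunks in order.

```rust
use rayon::prelude::*;
use std::io::Write;

// Hypothetical stand-in for arrow2's encode + compress step (CPU-bounded).
fn encode_and_compress(column: &[i32]) -> Vec<u8> {
    // In reality: a parquet encoding (e.g. PLAIN) followed by e.g. snappy.
    column.iter().flat_map(|v| v.to_le_bytes()).collect()
}

// Encode all columns in parallel, then write them sequentially: the column
// chunks of a parquet row group are laid out in order, so only the
// CPU-bounded part can fan out.
fn write_batch<W: Write>(writer: &mut W, columns: &[Vec<i32>]) -> std::io::Result<()> {
    let encoded: Vec<Vec<u8>> = columns
        .par_iter() // CPU-bounded, embarrassingly parallel across columns
        .map(|c| encode_and_compress(c))
        .collect();
    for chunk in &encoded {
        writer.write_all(chunk)?; // IO-bounded, sequential
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let columns = vec![vec![1_i32; 1024], vec![2_i32; 1024]];
    let mut file = std::fs::File::create("example.bin")?;
    write_batch(&mut file, &columns)
}
```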
Codecov Report

@@           Coverage Diff           @@
##             main     #436   +/-   ##
=======================================
  Coverage   80.78%   80.79%
=======================================
  Files         372      372
  Lines       22645    22645
=======================================
+ Hits        18294    18295       +1
+ Misses       4351     4350       -1

Continue to review the full report at Codecov.
Thanks for this great example and discussion starter. I am also interested in what kind of overhead we might expect when parallelizing across columns, like in this classic paper. The "traditional" approach of e.g. Spark is to map one thread per (input or output) file. That is great when the number of target files is bigger than the number of threads, but not ideal when having one file as output, where more parallelism inside e.g. a parquet writer could be beneficial. Also, at greater concurrency (lots of users using a system on shared resources), resource efficiency and scalability are other angles that are important.
This is really cool @jorgecarleitao. The abstraction looks solid 👍
I feel like even if we have more files than threads, this is still not optimal, right? For example, encoding+compression for a single file might not incur enough compute to saturate a single core. Perhaps this is where @andygrove's idea of introducing a scheduler to a compute engine would help. The scheduler can first decide on the unit of parallelism (column or records) based on requirements, then schedule as many parallel tasks as it can until we saturate either CPU or IO.
@alamb might be interested in doing this for iox too :)
For this reason I agree with @jorgecarleitao that this behavior should be delegated to the implementations. Anyhow, very nice! 🚀
Looks neat 👍 Do I read the code correctly that for this approach the resulting parquet pages must all be buffered in memory before writing out the file? Such buffering is probably fine for most use cases (including IOx), though for larger data the ability to do streaming writes is likely desirable. Very cool.
Yes, but usually output bandwidth (e.g. to S3) is higher than the encoding/compression speed, so writing usually has very high CPU utilization, depending on the size of the partitions/files of course. Encoding + compression is actually very expensive, and it is hard to reach the full 10-25 Gbps (or more) per node. Even where possible, there is no use in trying to be faster than that! One nice strategy by Databricks/Spark that you might have heard of is optimized writes: https://docs.databricks.com/delta/optimizations/auto-optimize.html#when-to-opt-in-to-optimized-writes. I personally think there are more directions/techniques to try beyond what Spark has to offer, so I am happy about this example and looking forward to more experiments in this area.
To me it looks like a task per batch and column, so that should work in a streaming manner too? The 10M batch size in this example is pretty large, I would say (>1000x the DataFusion default).
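A sketch of that streaming variant under the same caveat (the `encode` helper is a hypothetical stand-in, not the arrow2 API): one rayon task per column per batch, and only the current batch's encoded buffers are alive at any time, so memory is bounded by a single batch rather than the whole file.

```rust
use rayon::prelude::*;
use std::io::Write;

// Hypothetical encode + compress step, as before (CPU-bounded).
fn encode(column: &[i32]) -> Vec<u8> {
    column.iter().flat_map(|v| v.to_le_bytes()).collect()
}

// Stream over batches: encode each batch's columns in parallel, write them,
// and drop the buffers before pulling the next batch.
fn write_stream<W: Write>(
    writer: &mut W,
    batches: impl Iterator<Item = Vec<Vec<i32>>>,
) -> std::io::Result<()> {
    for batch in batches {
        // CPU-bounded, parallel across this batch's columns.
        let encoded: Vec<Vec<u8>> = batch.par_iter().map(|c| encode(c)).collect();
        // IO-bounded, sequential within the batch (row group).
        for chunk in &encoded {
            writer.write_all(chunk)?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let batches = (0..4).map(|b| vec![vec![b; 1024], vec![b + 1; 1024]]);
    let mut sink = std::io::sink();
    write_stream(&mut sink, batches)
}
```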
@ritchie46 Yes agreed. |
That is a great observation. Note that this buffering is for a single batch (row group): within a row group, parquet lays out the column chunks in order.

I believe that we can't have both "parallelism" and "order" without buffering: once we introduce two workers, either we write the results as they arrive, or we need to potentially buffer up to all pages to keep them in order.

Alternatively, we could parallelize over the other dimension (records), writing each row group as it becomes ready.

Finally, we may not parallelize at all, but then we likely do not saturate IO and thus do not maximize our capacity (at least not without writing to other files, à la Spark, which has other consequences, like the default 200 partitions).

A reason why this op is usually CPU-bounded is that high compression requires long pages (so that all values belong to the same "compression unit"), which requires going through a lot of data; the higher the number of groups, the less compressed the file is.

cc'ing @xhochy here because I learnt most of this from him, and he will likely correct me here (see https://github.com/elastacloud/parquet-dotnet/issues/392 for some context).
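A self-contained sketch of that trade-off using only std threads and channels (no arrow2 types; all names are illustrative): workers emit numbered "pages", and the consumer holds out-of-order results in a reordering buffer until the next expected index arrives. Dropping the buffer and writing results as they come gives the unordered variant.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();

    // Two CPU-bounded workers racing on even/odd "pages".
    for worker in 0..2 {
        let tx = tx.clone();
        thread::spawn(move || {
            for i in (worker..8).step_by(2) {
                tx.send((i, format!("page {i}").into_bytes())).unwrap();
            }
        });
    }
    drop(tx); // so the receiver loop below terminates

    // Ordered consumption: buffer out-of-order results until their turn.
    // Worst case (one slow worker), this buffers up to all pages.
    let mut next = 0;
    let mut pending = BTreeMap::new();
    for (i, page) in rx {
        pending.insert(i, page);
        while let Some(page) = pending.remove(&next) {
            // IO-bounded step; stand-in for the actual file write.
            println!("writing {}", String::from_utf8_lossy(&page));
            next += 1;
        }
    }
}
```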
Encoding + compression is embarrassingly parallel across columns, and thus results in a speedup factor equal to the number of available cores, up to the number of columns to be written.
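A back-of-the-envelope model of that claim (ignoring IO, scheduling overhead, and skew in per-column work):

```latex
% C = number of available cores, K = number of columns,
% assuming equal encoding work per column:
T_{\mathrm{par}} \approx \frac{T_{\mathrm{seq}}}{\min(C, K)}
\qquad\Longrightarrow\qquad
\mathrm{speedup} = \frac{T_{\mathrm{seq}}}{T_{\mathrm{par}}} \approx \min(C, K)
```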