
[2.x] Replace ParallelGzipOutputStream with Ichoran's implementation #1456

Merged — 5 commits merged into sbt:develop from Ichoran-fixed-Gzip-stream on Oct 15, 2024

Conversation

@Friendseeker (Member) commented Oct 12, 2024

@Ichoran thank you for your contribution!


Overview

This PR adopts Ichoran's implementation of ParallelGzipOutputStream, which produces binary-identical output and similar performance compared to Stefan's ParallelGzipOutputStream, but manages raw threads directly to avoid the concurrency issues seen with Stefan's implementation.

Validation

New unit tests are added to stress test ParallelGzipOutputStream across a variety of data sizes and parallelism settings. Tests in which multiple threads each create and use their own ParallelGzipOutputStream are added to simulate Zinc's multithreaded usage.
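As an illustration, here is a minimal roundtrip sketch; the ParallelGzipOutputStream constructor shape shown (sink plus parallelism) is an assumption for illustration, not the exact test code:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.GZIPInputStream

// Compress `data` with the parallel stream, then decompress with the stock
// GZIPInputStream and compare byte-for-byte.
def roundTrip(data: Array[Byte], parallelism: Int): Boolean = {
  val sink = new ByteArrayOutputStream()
  val out = new ParallelGzipOutputStream(sink, parallelism) // hypothetical signature
  out.write(data)
  out.close()
  val in = new GZIPInputStream(new ByteArrayInputStream(sink.toByteArray))
  try in.readAllBytes().sameElements(data)
  finally in.close()
}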

Flow diagram of Ichoran's ParallelGzipOutputStream algorithm:

[image: flow diagram]

@Friendseeker (Member, Author) commented Oct 12, 2024

TODO: add a bunch of unit tests for ParallelGzipOutputStream

Non-concurrent tests:

  • Lots of roundtrip tests for data correctness

Concurrent tests (a sketch of the multi-stream case follows this list):

  • setting parallelism to be 1
  • setting parallelism to be 4
  • setting parallelism to be Runtime.getRuntime.availableProcessors()
  • setting parallelism to be 10000
  • Construct 10 ParallelGzipOutputStreams, each with parallelism 4, and let each compress data concurrently.
  • Construct 100 ParallelGzipOutputStreams, each with parallelism 100, and let each compress data concurrently.
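A hedged sketch of the "10 streams, each with parallelism 4" case, reusing the hypothetical roundTrip helper from above:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Ten concurrent streams, each compressing its own copy of the data.
val data = Array.tabulate[Byte](1 << 20)(i => (i % 251).toByte)
val runs = Future.traverse((1 to 10).toList)(_ => Future(roundTrip(data, parallelism = 4)))
assert(Await.result(runs, 1.minute).forall(identity))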

@Friendseeker force-pushed the Ichoran-fixed-Gzip-stream branch from 2290dee to 90bbcba on October 12, 2024 08:18
val compressedSize = d.length()
val compressionRatio = compressedSize.toDouble / uncompressedSize.toDouble
assert(compressionRatio < 0.85)
// compression rate for each data: 0.8185090254676337, 0.7247774786370688, 0.8346021341469837
@Friendseeker (Member, Author) commented:

The compression ratio looks somewhat underwhelming... but we only have 3 data points. Maybe Stefan's internal testing at Databricks showed better compression ratios, which motivated the introduction of GZIP compression.

A member replied:

A potentially related comment on this: #1326 (comment)

The gzip compression is very slow relative to everything else, despite gzip with native zlib being fast and using the best performance/size tradeoff in the compression settings as determined by benchmarks. Or maybe I should say: the new serializer and deserializer are very fast compared to gzip.

I reran a quick benchmark. Writing without compression takes 153ms, writing with Java's standard GZIPOutputStream is 402ms, writing with ParallelGzipOutputStream is 157ms. The latter is on a 12-core M2 Max (but with only 230% CPU usage; that's enough to offload all the compression to background threads, making it almost free in terms of wall clock time).

The compression level currently set is

  private val compression = Deflater.DEFAULT_COMPRESSION

So my read is that parallel gzip does not buy us amazing compression; rather, it gives us the compression almost without additional time penalty.
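For context, the level is applied at Deflater construction; if size mattered more than speed, it could in principle be raised there. This is a sketch of the standard java.util.zip API, not a proposed change:

import java.util.zip.Deflater

// DEFAULT_COMPRESSION (-1) lets zlib pick its default level (6);
// BEST_COMPRESSION (9) trades speed for size. nowrap = true emits raw
// deflate data, since the gzip framing supplies its own header and trailer.
val deflater = new Deflater(Deflater.BEST_COMPRESSION, true)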

@Friendseeker (Member, Author) commented:

A transient CI failure was observed:

[info] - should handle multiple ParallelGzipOutputStream concurrently *** FAILED ***
[info]   sbt.internal.io.TranslatedIOException: Error wrapping InputStream in GZIPInputStream: java.util.zip.ZipException: Not in GZIP format
[info]   at sbt.internal.io.ErrorHandling$.translate(ErrorHandling.scala:21)
[info]   at sbt.io.WrapUsing.open(Using.scala:44)
[info]   at sbt.io.Using.apply(Using.scala:26)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.decompress(ParallelGzipOutputStreamSpecification.scala:39)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.verifyRoundTrip(ParallelGzipOutputStreamSpecification.scala:55)

@Ichoran commented Oct 14, 2024

Well, that error isn't very informative :/ Is there any way to get your hands on the file so we can inspect what went wrong? "Not in GZIP format" could be anything. Is that an sbt message, or is it wrapping the Java error without the full stack trace?

@Friendseeker (Member, Author) commented Oct 14, 2024

@Ichoran

Here's the full stack trace for documentation. It does not tell us much beyond the fact that the GZIP stream is not properly formed.

[info] - should handle multiple ParallelGzipOutputStream concurrently *** FAILED ***
[info]   sbt.internal.io.TranslatedIOException: Error wrapping InputStream in GZIPInputStream: java.util.zip.ZipException: Not in GZIP format
[info]   at sbt.internal.io.ErrorHandling$.translate(ErrorHandling.scala:21)
[info]   at sbt.io.WrapUsing.open(Using.scala:44)
[info]   at sbt.io.Using.apply(Using.scala:26)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.decompress(ParallelGzipOutputStreamSpecification.scala:39)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.verifyRoundTrip(ParallelGzipOutputStreamSpecification.scala:55)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.$anonfun$2$$anonfun$1(ParallelGzipOutputStreamSpecification.scala:111)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.$anonfun$2$$anonfun$adapted$1(ParallelGzipOutputStreamSpecification.scala:113)
[info]   at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:687)
[info]   at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
[info]   at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
[info]   ...
[info]   Cause: java.util.zip.ZipException: Not in GZIP format
[info]   at java.util.zip.GZIPInputStream.readHeader(GZIPInputStream.java:165)
[info]   at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:79)
[info]   at sbt.io.Using$.$init$$$anonfun$10(Using.scala:124)
[info]   at sbt.io.Using$$anon$1.openImpl(Using.scala:70)
[info]   at sbt.io.WrapUsing.open$$anonfun$2(Using.scala:44)
[info]   at sbt.internal.io.ErrorHandling$.translate(ErrorHandling.scala:19)
[info]   at sbt.io.WrapUsing.open(Using.scala:44)
[info]   at sbt.io.Using.apply(Using.scala:26)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.decompress(ParallelGzipOutputStreamSpecification.scala:39)
[info]   at sbt.inc.consistent.ParallelGzipOutputStreamSpecification.verifyRoundTrip(ParallelGzipOutputStreamSpecification.scala:55)
[info]   ...

I made a test branch to gather more information and managed to reproduce another failure, this time involving small data (which may be helpful for debugging):

[info] - should handle highly redundant data correctly *** FAILED ***
[info]   Decompression failed. See compressed_parallelism = 4, size = 0, redundant.gz and data_parallelism = 4, size = 0, redundant.bin (ParallelGzipOutputStreamSpecification.scala:80)

(size refers to the size of the uncompressed data, i.e. the failure happened when an empty byte array was compressed)

The uncompressed data and the improperly compressed gzip for the above failure are available at https://github.com/Friendseeker/zinc/actions/runs/11334286709/artifacts/2054937941

The improperly compressed GZIP output is only 10 bytes:

03 00 00 00 00 00 00 00 00 00

@Friendseeker (Member, Author) commented Oct 14, 2024

I think I might know what is happening with the size = 0 failure. If the stream is already closed when scribe.start() is called, we never write the GZIP header and only write the trailer.

Hopefully that is the only failure.
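For comparison, a quick sketch of what a valid empty gzip stream looks like, using only the stock java.util.zip classes:

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

// Closing a GZIPOutputStream with no data written produces 20 bytes:
// a 10-byte header (starting 1f 8b), the empty final deflate block (03 00),
// and the 8-byte CRC32 + ISIZE trailer.
val bos = new ByteArrayOutputStream()
new GZIPOutputStream(bos).close()
println(bos.toByteArray.map(b => f"${b & 0xff}%02x").mkString(" "))

The failing 10-byte output above matches only the tail of such a stream, consistent with the missing-header diagnosis.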

@Ichoran commented Oct 14, 2024

Oh, it's possible to have this try to write a zero-byte file? I didn't even consider that as a use case. "Cache nothing by writing it into a file" didn't seem like a desirable way to run things. I can look over my code again to make sure it handles that anyway, if you'd like.

@Friendseeker (Member, Author) replied:

Yeah, I don't think it happens in practice. I was just doing smoke testing with some boundary inputs.

It's fine to leave this alone, then. I can add a comment saying it does not support 0-byte input.

@Friendseeker force-pushed the Ichoran-fixed-Gzip-stream branch 3 times, most recently from 9c7766a to 86e8122 on October 15, 2024 00:24
@Friendseeker marked this pull request as ready for review on October 15, 2024 02:29
@eed3si9n (Member) left a review comment:

Thanks both!

@Friendseeker force-pushed the Ichoran-fixed-Gzip-stream branch from aa66f40 to d35e028 on October 15, 2024 16:41
@eed3si9n merged commit 41dd74e into sbt:develop on Oct 15, 2024
8 checks passed
@Friendseeker deleted the Ichoran-fixed-Gzip-stream branch on October 15, 2024 18:09
Friendseeker added 4 commits to Friendseeker/zinc that referenced this pull request on Oct 18, 2024
@szeiger (Contributor) commented Oct 21, 2024

Were you able to identify the situation where my original implementation causes a deadlock? We're building around 10 million targets per day with it and haven't encountered any problems.

@eed3si9n (Member) replied:

I think the Zinc benchmark got stuck. Also, I wonder if it might have more to do with the ExecutionContext implementation, e.g. thread pool vs. fork/join.

@Ichoran commented Oct 21, 2024

I read through the code carefully and didn't see any conceptual problems with it. Since it was JVM 8 specific, behavior varied with the execution context, and you used ThreadLocal, I assumed there was some flaw in scheduling and/or ThreadLocal usage with Future.

Friendseeker tested with a small fixed-pool execution context and it blocked every time. There's really no reason that should happen at all: the futures themselves do nothing that ought to block, and the calling thread blocks until the lead future is done, which should, even in the worst case, complete once all work is done (e.g. if there is only one thread to work with and everything happens in the worst order).

So I could only assume that, behind the scenes, something about the ThreadLocal access was using blocking operations that required more threads to unwind the problem (ForkJoinPool had intermittent failures).
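For illustration, the classic pool-starvation shape of that failure mode, sketched generically (this is not the zinc code, just the pattern of a thread blocking on a future queued behind itself):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A single-thread pool: the outer task occupies the only worker, then blocks
// on an inner task that is queued behind it and can never start.
implicit val pool: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

val outer = Future {
  val inner = Future(42)            // queued on the same, now-saturated pool
  Await.result(inner, 5.seconds)    // blocks the only worker; times out
}
// Awaiting `outer` fails with a TimeoutException instead of completing.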
