Speed up streamed-proto query output by distributing work to multiple threads #24305
@@ -16,15 +16,31 @@
import com.google.devtools.build.lib.packages.LabelPrinter;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.proto.proto2api.Build;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.stream.StreamSupport;

/**
 * An output formatter that outputs a protocol buffer representation of a query result and outputs
 * the proto bytes to the output print stream. By taking the bytes and calling {@code mergeFrom()}
 * on a {@code Build.QueryResult} object the full result can be reconstructed.
 */
public class StreamedProtoOutputFormatter extends ProtoOutputFormatter {

  /**
   * The most bytes that protobuf delimited proto format will prepend to a proto message. See <a
   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/AbstractMessageLite.java#L72">
   * <code>writeDelimitedTo</code></a> and <a
   * href="https://github.com/protocolbuffers/protobuf/blob/c11033dc27c3e9c1913e45b62fb5d4c5b5644b3e/java/core/src/main/java/com/google/protobuf/WireFormat.java#L28">
   * <code>MAX_VARINT32_SIZE</code></a>.
   *
   * <p>The value for int32 (used by {@code writeDelimitedTo}) is actually 5, but we pick 10 just
   * to be safe.
   */
  private static final int MAX_BYTES_FOR_VARINT32_ENCODING = 10;

  @Override
  public String getName() {
    return "streamed_proto";
@@ -34,13 +50,76 @@ public String getName() {
  public OutputFormatterCallback<Target> createPostFactoStreamCallback(
      final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
    return new OutputFormatterCallback<Target>() {
      private final LabelPrinter ourLabelPrinter = labelPrinter;

      @Override
      public void processOutput(Iterable<Target> partialResult)
          throws IOException, InterruptedException {
-       for (Target target : partialResult) {
-         toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
        try {
          StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
              .map(this::toProto)
              .map(StreamedProtoOutputFormatter::writeDelimited)
              // I imagine forEachOrdered hurts performance somewhat in some cases. While we may
              // not need to actually produce output in order, this code does not know whether
              // ordering was requested. So we just always write it in order, and hope performance
              // is OK.
              .forEachOrdered(this::writeToOutputStreamThreadSafe);
        } catch (WrappedIOException e) {
          throw e.getCause();
        } catch (WrappedInterruptedException e) {
          throw e.getCause();
        }
      }

Review comment (on the ordering comment above): I think it actually does know, since we have access to the QueryOptions.

Reply: Yes, but there are a few options that together can influence output order AFAICT.
      private Build.Target toProto(Target target) {
        try {
          return toTargetProtoBuffer(target, ourLabelPrinter);
        } catch (InterruptedException e) {
          throw new WrappedInterruptedException(e);
        }
      }

      private synchronized void writeToOutputStreamThreadSafe(ByteArrayOutputStream bout) {
        try {
          bout.writeTo(out);
        } catch (IOException e) {
          throw new WrappedIOException(e);
        }
      }
    };
  }
  private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
    try {
      var bout =
          new ByteArrayOutputStream(
              targetProtoBuffer.getSerializedSize() + MAX_BYTES_FOR_VARINT32_ENCODING);
      targetProtoBuffer.writeDelimitedTo(bout);
      return bout;
    } catch (IOException e) {
      throw new WrappedIOException(e);
    }
  }

Review comment (on the ByteArrayOutputStream sizing above): Rather than always having a 10 byte margin, you can use CodedOutputStream.computeUInt32SizeNoTag. However, maybe you should just bypass going through the overhead of ByteArrayOutputStream:

    var serializedSize = targetProtoBuffer.getSerializedSize();
    var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    var output = new byte[headerSize + serializedSize];
    targetProtoBuffer.writeTo(
        CodedOutputStream.newInstance(output, headerSize, output.length - headerSize));
    return output;

Reply: I am up for this, but I'd argue this makes the code harder to read and maintain, and may not have any noticeable performance benefit. It also might have subtle bugs and could require more rigorous testing compared with the PR in its current form.

Review comment: Instead of doing the length-delimiting here, you can just serialize to byte array (…).

Reply: I went with your first suggestion, so I didn't need a big comment explaining why writing a byte array without a tag was equivalent to ….

Reply: Unfortunately it crashes when run on our 700k-target workspace. This highlights a bigger problem with my approach: there is a risk of building up a huge backlog of objects to be written and OOMing like this. I know other parts of the codebase use RxJava - would it be acceptable to do so here? Otherwise I could try to come up with some kind of throttling or batching system.

Review comment: How about using a …?

Reply: good idea!

Review comment: Based on my experience with the internal code I referenced elsewhere, I think some batching would go a long way - right now we're encountering a lot of overhead by having a separate task for each record plus a call to write each record to the stream individually. Putting it all together, I think the simplest approach is to (1) parallelize the formatting and produce batches of …. WRT a bounded …. If we could avoid RxJava here for now I think that would be preferred.

Review comment: I haven't reviewed this pull request in detail, but I nevertheless do have an opinion about RxJava, which is "don't". We added that dependency before virtual threads were available and it did not live up to our expectations, and now we'd much like to cut that dependency if we could. I do realize RxJava does way more than virtual threads, but it's also a whole lot of new concepts to grasp for marginal benefit.

Reply: Thanks for your feedback everyone! I updated the PR with a solution which uses ~minimal memory and is just as performant as my original PR. I don't think it's too complex, but you can be the judge of that :)
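For completeness, a hedged sketch (not code from this PR) of serializing a length-delimited record straight into a byte[]: the suggestion quoted above sizes the array exactly but leaves the varint length header to be written into the first headerSize bytes, which CodedOutputStream can also do explicitly. Only the protobuf CodedOutputStream/MessageLite APIs are assumed; the class and method names are illustrative.

```java
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import java.io.IOException;

final class DelimitedEncoding {
  /**
   * Produces the same bytes that {@code msg.writeDelimitedTo(out)} would append to a stream:
   * a varint length header followed by the serialized message, in one exact-size array.
   */
  static byte[] encodeDelimited(MessageLite msg) throws IOException {
    int serializedSize = msg.getSerializedSize();
    int headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
    byte[] output = new byte[headerSize + serializedSize];
    CodedOutputStream codedOut = CodedOutputStream.newInstance(output);
    codedOut.writeUInt32NoTag(serializedSize); // the length header writeDelimitedTo prepends
    msg.writeTo(codedOut);
    codedOut.checkNoSpaceLeft(); // sanity check: the array was sized exactly
    return output;
  }
}
```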
  private static class WrappedIOException extends RuntimeException {
    private WrappedIOException(IOException cause) {
      super(cause);
    }

    @Override
    public IOException getCause() {
      return (IOException) super.getCause();
    }
  }

  private static class WrappedInterruptedException extends RuntimeException {
    private WrappedInterruptedException(InterruptedException cause) {
      super(cause);
    }

    @Override
    public InterruptedException getCause() {
      return (InterruptedException) super.getCause();
    }
  }
}
Review comment: I'm not familiar with parallel streams, but I think if this is interrupted then only one of the parallel items sees it and exits, while the rest will carry on. I don't think we want this, since it will leave lingering threads doing formatting and writing to the output, possibly beyond the output being closed or the command itself having exited. I think encountering an IOException has a similar issue - every thread will keep going and hitting the IO exception even after this has exited.

We have a similar parallel formatting implementation for some internal code [1] - IIRC we use close() as a synchronization point to make sure that nothing is left behind from processOutput(), as well as to reconcile any concurrent or cascading exceptions. What you're trying to do might be different enough that you can avoid this, but I fear you also might not get so lucky.

[1] A little too tightly coupled with some internal-only code to easily open source, coming from having tried a while ago.

Reply: Ok, worth investigating. I will test ctrl+c'ing out of it. If needed I will make a little test case to determine runtime behavior of interrupts on parallel streams.
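A hedged sketch of the kind of standalone experiment mentioned above (not code from this PR; the class name, element count, and timings are made up): interrupt the thread that drives a parallel stream and observe whether work already handed to the common pool keeps running.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ParallelStreamInterruptProbe {
  public static void main(String[] args) throws Exception {
    AtomicInteger completed = new AtomicInteger();
    Thread driver =
        new Thread(
            () ->
                IntStream.range(0, 1_000)
                    .parallel()
                    .forEach(
                        i -> {
                          try {
                            Thread.sleep(10); // stand-in for per-target formatting work
                          } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                          }
                          completed.incrementAndGet();
                        }));
    driver.start();
    Thread.sleep(100); // let some elements get processed first
    driver.interrupt(); // interrupt only the thread that invoked the terminal operation
    driver.join(); // wait for the terminal operation; observe whether it was cut short
    System.out.println("elements completed: " + completed.get() + " of 1000");
  }
}
```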
Reply: Ctrl+c does not respond quickly! I will need to find a better solution.

Reply: Note: ctrl+c responds immediately with the current ForkJoinPool-based implementation!
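As a rough illustration of the direction discussed in this thread - serialize targets in parallel, bound the number of in-flight results to cap memory, and keep a single writer that preserves input order and responds promptly to interruption - here is a hedged sketch. It is not the implementation this PR ended up with; the class name, MAX_PENDING, and the executor choice are all invented for the example.

```java
import com.google.protobuf.MessageLite;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BoundedParallelDelimitedWriter {
  // Maximum number of serialized-but-unwritten results kept in memory (illustrative value).
  private static final int MAX_PENDING = 256;

  /** Serializes messages in parallel but writes them to {@code out} in input order. */
  static void writeAll(Iterable<? extends MessageLite> messages, OutputStream out)
      throws IOException, InterruptedException {
    ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ArrayDeque<Future<byte[]>> pending = new ArrayDeque<>();
    try {
      for (MessageLite message : messages) {
        pending.addLast(pool.submit(() -> delimit(message)));
        if (pending.size() >= MAX_PENDING) {
          // Block on the oldest result: this bounds memory and keeps output in input order.
          out.write(getChecked(pending.removeFirst()));
        }
      }
      while (!pending.isEmpty()) {
        out.write(getChecked(pending.removeFirst()));
      }
    } finally {
      // On interruption or I/O failure, cancel whatever is still queued or running.
      pool.shutdownNow();
    }
  }

  private static byte[] getChecked(Future<byte[]> future)
      throws IOException, InterruptedException {
    try {
      return future.get();
    } catch (ExecutionException e) {
      if (e.getCause() instanceof IOException) {
        throw (IOException) e.getCause();
      }
      throw new IllegalStateException(e.getCause());
    }
  }

  private static byte[] delimit(MessageLite message) throws IOException {
    // Margin for the varint length header, mirroring MAX_BYTES_FOR_VARINT32_ENCODING above.
    ByteArrayOutputStream bout = new ByteArrayOutputStream(message.getSerializedSize() + 10);
    message.writeDelimitedTo(bout);
    return bout.toByteArray();
  }
}
```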