-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PERF] Using the BroadcastHub with thousands of consumers seems painfully slow. #7253
Comments
Thank you for the reproduction! I'll see what we can do here. |
The problem in the sample code is here: return signalValueProducer
.Where(producedValue => // <-- This is very bad
{
// We only care about a specific value in this flow/stream
return valueToWatch == producedValue;
})
.RunForeach(y =>
{
// Remove from list, can be called multiple times with same value.
streamValuesToRemove.Remove(y);
}, materializer); The The higher the |
The provided sample code, for completeness: async Task Main()
{
var streamCount = 2000;
var actorSystem = ActorSystem.Create("bla");
IMaterializer materializer = actorSystem.Materializer();
//Create list of values to watch 0-signalCount
var streamValuesToWatch = Enumerable
.Range(0, streamCount)
.Select(e => e.ToString())
.ToList();
//On every tick get a value from streamValuesToWatch
var index = 0;
var streamValuesSource = Source
.Tick(TimeSpan.FromSeconds(0), TimeSpan.FromMilliseconds(50), "")
.Select(i => streamValuesToWatch[index++ % streamCount])
.Take(streamCount);
// Create a thread safe collection to monitor if a value has been seen by is observer, if removed its been seen.
var streamValuesToRemove = streamValuesToWatch
.Select(x => (x, x))
.ToHashMap()
.ToAtom();
// Create a broadcasthub so we can create a consumer for each streamValuesToWatch
Source<string, NotUsed> signalValueProducer = streamValuesSource
.ToMaterialized(BroadcastHub.Sink<string>(bufferSize: 1024), Keep.Right)
.Run(materializer);
// Create each consumer
var tasks = streamValuesToWatch
.Select(valueToWatch =>
{
return signalValueProducer
.Where(producedValue =>
{
// We only care about a specific value in this flow/stream
return valueToWatch == producedValue;
})
.RunForeach(y =>
{
// Remove from list, can be called multiple times with same value.
streamValuesToRemove.Remove(y);
}, materializer);
}).ToList();
// Used to monitor current value to be processed.
signalValueProducer
.RunWith(Sink.AsObservable<string>(), materializer)
.DumpLatest("CurrentValue");
// Dump the Count for signalsToRemove, should shrink when value has been seen.
Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(o => streamValuesToRemove.Count)
.DumpLatest("signalsToRemove");
// Wait for all tasks. they should all complete when all n-signalCount items have been produced???
await Task.WhenAll(tasks);
} |
Here is a better solution for the provided example: LinqPad |
Hi sorry for the late reply, i have had a look at your implementation and if i'm understanding it correctly its a bit different and i'm not sure on the performance. We were trying to test the throughput of a stream when 2000 consumers are looking at each value and either working on it or discarding it. In our example, working on it was removing the value from a collection and discarding it would be do nothing with it. From this image, using example provided here:
You can see it take 3 minutes to process 15 values... this to me would create back pressure and in our scenario fill our queue, as we have values being placed on the queue with a heart beat every 10 seconds. I hope i'm making sense and i have understood correctly? In the mean time we opted for a different solution using Async-enumerable and System.Threading.Channel . chris |
I think there's a misunderstanding on how a source, a broadcast hub, and how a LINQ statement works. var tasks = streamValuesToWatch
.Select(valueToWatch =>
{
return signalValueProducer
.Where(producedValue =>
{
// We only care about a specific value in this flow/stream
return valueToWatch == producedValue;
})
.RunForeach(y =>
{
// Remove from list, can be called multiple times with same value.
streamValuesToRemove.Remove(y);
}, materializer);
}).ToList(); Basically, what you're doing is
Observation:
Note: The |
So I tried this out myself and was able to get the application to process everything in about 3:30, which is still too slow IMHO: https://share.linqpad.net/qa5jrmw9.linq Trick was to make sure we didn't start producing until after all of the consumers were attached. The problem appears to be, IMHO, that this system is lossy or slow - that there's consumers who are being put on pause while the hub is distributing work to others. |
I think the problem is here: akka.net/src/core/Akka.Streams/Dsl/Hub.cs Lines 940 to 948 in 1af82a7
The code we use for determining which consumers get which message looks complicated to me - and based on my profiling data we're not doing anything particularly CPU intensive. Looks like there's a lot of timer-driven waiting going on inside the hub itself. |
@RollsChris There might be a bug in our code after all, we'll re-assess the broadcast hub internal implementation |
This may also benefit from a Box<T> wrapper for the queue itself due to
variance checks on arrays.
Probably a small opt but worth mentioning while we are here...
…On Wed, Aug 7, 2024, 1:42 PM Aaron Stannard ***@***.***> wrote:
I think the problem is here:
https://github.com/akkadotnet/akka.net/blob/1af82a75c58d943f0fe802b566dbfe8b78e34203/src/core/Akka.Streams/Dsl/Hub.cs#L940-L948
The code we use for determining which consumers get which message looks
complicated to me - and based on my profiling data we're not doing anything
particularly CPU intensive. Looks like there's a lot of timer-driven
waiting going on inside the hub itself.
—
Reply to this email directly, view it on GitHub
<#7253 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AC7UYVPVF7CAFLTAVTSCE5DZQJL7FAVCNFSM6AAAAABJHZA3RCVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDENZTHE4TKMZZGU>
.
You are receiving this because you are subscribed to this thread.Message
ID: ***@***.***>
|
Also bounds checks with calloffs to throwhelpers if needed...
…On Wed, Aug 7, 2024, 2:55 PM Andrew Laing ***@***.***> wrote:
This may also benefit from a Box<T> wrapper for the queue itself due to
variance checks on arrays.
Probably a small opt but worth mentioning while we are here...
On Wed, Aug 7, 2024, 1:42 PM Aaron Stannard ***@***.***>
wrote:
> I think the problem is here:
>
>
> https://github.com/akkadotnet/akka.net/blob/1af82a75c58d943f0fe802b566dbfe8b78e34203/src/core/Akka.Streams/Dsl/Hub.cs#L940-L948
>
> The code we use for determining which consumers get which message looks
> complicated to me - and based on my profiling data we're not doing anything
> particularly CPU intensive. Looks like there's a lot of timer-driven
> waiting going on inside the hub itself.
>
> —
> Reply to this email directly, view it on GitHub
> <#7253 (comment)>,
> or unsubscribe
> <https://github.com/notifications/unsubscribe-auth/AC7UYVPVF7CAFLTAVTSCE5DZQJL7FAVCNFSM6AAAAABJHZA3RCVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDENZTHE4TKMZZGU>
> .
> You are receiving this because you are subscribed to this thread.Message
> ID: ***@***.***>
>
|
Thanks for the help, my gut feeling is that it should be a lot faster. We are looking at having 14000 consumers, with potentially 14000 updates every 10 seconds ( and that's the best case with a 1-1 mapping). Id also like to show you are alternative method but not got time at the moment to make a demo app. |
The performance issues have nothing to do with boxing or any measurable CPU issues - it's a scheduling problem. There's long periods of time where the consumers aren't doing anything. I think the algorithm for waking up / sleeping the consumers is off. |
Version Information
Akka.Streams="1.5.20"
Describe the performance issue
I'm trying to create thousands of consumers for a BroadCastHub.
Each consumer listens to the stream and checks if they care about the current item.
As the number increases the performance slows considerably.
We think the buffer plays apart because as soon as we go past the buffer value things get really slow but we are unsure.
We also think that when the buffer gets full there is a potential for it to drop messages, as some values we never see.
Here is a LinqPad URL to demo the issue: LinqPad
Expected behavior
Should be able to process 14000 messages across 14000 consumers using a BroadCastHub within seconds
Actual behavior
Becomes un-usable in the thousands.
Environment
LinqPad demo
We are using Linux docker images in product, same performance issue.
The text was updated successfully, but these errors were encountered: