Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a ProducerConsumer helper for processing a sequence of items in parallel. #73323

Merged
merged 12 commits into from
May 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,20 @@ private static async Task FindResultsInUnreferencedProjectSourceSymbolsAsync(

// Defer to the ProducerConsumer. We're searching the unreferenced projects in parallel. As we get results, we'll
// add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

view with whitespace off.

await ProducerConsumer<ImmutableArray<SymbolReference>>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync(
args.viableUnreferencedProjects,
args.linkedTokenSource.Token,
async (project, cancellationToken) =>
{
// Search in this unreferenced project. But don't search in any of its' direct references. i.e. we
// don't want to search in its metadata references or in the projects it references itself. We'll be
// searching those entities individually.
var references = await args.finder.FindInSourceSymbolsInProjectAsync(
args.projectToAssembly, project, args.exact, cancellationToken).ConfigureAwait(false);
onItemsFound(references);
}),
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunParallelAsync(
viableUnreferencedProjects,
produceItems: static async (project, onItemsFound, args) =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This form is much easier to understand (for me at least). Instead of the nested lambdas, you just pass the single lambda. It will be called in parallel, passing a single item from "viableUnreferencedProjects", and the callback to call when items are produced based on that item.

{
// Search in this unreferenced project. But don't search in any of its direct references. i.e. we
// don't want to search in its metadata references or in the projects it references itself. We'll be
// searching those entities individually.
var references = await args.finder.FindInSourceSymbolsInProjectAsync(
args.projectToAssembly, project, args.exact, args.linkedTokenSource.Token).ConfigureAwait(false);
onItemsFound(references);
},
consumeItems: static (symbolReferencesEnumerable, args) =>
ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource),
args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, viableUnreferencedProjects, linkedTokenSource),
args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, linkedTokenSource),
linkedTokenSource.Token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -266,26 +263,23 @@ private async Task FindResultsInUnreferencedMetadataSymbolsAsync(

// Defer to the ProducerConsumer. We're searching the metadata references in parallel. As we get results, we'll
// add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work.
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync(
args.newReferences,
args.linkedTokenSource.Token,
async (tuple, cancellationToken) =>
{
var (referenceProject, reference) = tuple;
var compilation = args.referenceToCompilation.GetOrAdd(
reference, r => CreateCompilation(args.project, r));

// Ignore netmodules. First, they're incredibly esoteric and barely used.
// Second, the SymbolFinder API doesn't even support searching them.
if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly)
return;

var references = await args.finder.FindInMetadataSymbolsAsync(
assembly, referenceProject, reference, args.exact, args.linkedTokenSource.Token).ConfigureAwait(false);
onItemsFound(references);
}),
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunParallelAsync(
newReferences,
produceItems: static async (tuple, onItemsFound, args) =>
{
var (referenceProject, reference) = tuple;
var compilation = args.referenceToCompilation.GetOrAdd(
reference, r => CreateCompilation(args.project, r));

// Ignore netmodules. First, they're incredibly esoteric and barely used.
// Second, the SymbolFinder API doesn't even support searching them.
if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly)
return;

var references = await args.finder.FindInMetadataSymbolsAsync(
assembly, referenceProject, reference, args.exact, args.linkedTokenSource.Token).ConfigureAwait(false);
onItemsFound(references);
},
consumeItems: static (symbolReferencesEnumerable, args) =>
ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource),
args: (referenceToCompilation, project, allSymbolReferences, maxResults, finder, exact, newReferences, linkedTokenSource),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,45 +121,41 @@ public async Task<ImmutableArray<CodeRefactoring>> GetRefactoringsAsync(
foreach (var provider in orderedProviders)
providerToIndex.Add(provider, providerToIndex.Count);

await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (callback, args) =>
await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunParallelAsync(
orderedProviders,
produceItems: static async (provider, callback, args) =>
{
// Run all providers in parallel to get the set of refactorings for this document.
RoslynParallel.ForEachAsync(
args.orderedProviders,
args.cancellationToken,
async (provider, cancellationToken) =>
{
// Log an individual telemetry event for slow code refactoring computations to
// allow targeted trace notifications for further investigation. 500 ms seemed like
// a good value so as to not be too noisy, but if fired, indicates a potential
// area requiring investigation.
const int CodeRefactoringTelemetryDelay = 500;

var providerName = provider.GetType().Name;

var logMessage = KeyValueLogMessage.Create(m =>
{
m[TelemetryLogging.KeyName] = providerName;
m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language;
});

using (args.addOperationScope(providerName))
using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, cancellationToken))
using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay))
{
var refactoring = await args.@this.GetRefactoringFromProviderAsync(
args.document, args.state, provider, args.options, cancellationToken).ConfigureAwait(false);
if (refactoring != null)
callback((provider, refactoring));
}
}),
// Log an individual telemetry event for slow code refactoring computations to
// allow targeted trace notifications for further investigation. 500 ms seemed like
// a good value so as to not be too noisy, but if fired, indicates a potential
// area requiring investigation.
const int CodeRefactoringTelemetryDelay = 500;

var providerName = provider.GetType().Name;

var logMessage = KeyValueLogMessage.Create(m =>
{
m[TelemetryLogging.KeyName] = providerName;
m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language;
});

using (args.addOperationScope(providerName))
using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, args.cancellationToken))
using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay))
{
var refactoring = await args.@this.GetRefactoringFromProviderAsync(
args.document, args.state, provider, args.options, args.cancellationToken).ConfigureAwait(false);
if (refactoring != null)
callback((provider, refactoring));
}
},
consumeItems: static async (reader, args) =>
{
await foreach (var pair in reader)
args.pairs.Add(pair);
},
args: (@this: this, document, state, orderedProviders, options, addOperationScope, pairs, cancellationToken),
args: (@this: this, document, state, options, addOperationScope, pairs, cancellationToken),
cancellationToken).ConfigureAwait(false);

return pairs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ private static Task PerformParallelSearchAsync<T>(
Func<T, Action<RoslynNavigateToItem>, ValueTask> callback,
Func<ImmutableArray<RoslynNavigateToItem>, Task> onItemsFound,
CancellationToken cancellationToken)
=> ProducerConsumer<RoslynNavigateToItem>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemFound, args) => RoslynParallel.ForEachAsync(args.items, args.cancellationToken, (item, cancellationToken) => args.callback(item, onItemFound)),
=> ProducerConsumer<RoslynNavigateToItem>.RunParallelAsync(
items,
produceItems: static async (item, onItemFound, args) => await args.callback(item, onItemFound).ConfigureAwait(false),
consumeItems: static (items, args) => args.onItemsFound(items),
args: (items, callback, onItemsFound, cancellationToken),
cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,54 @@ public static Task RunAsync<TArgs>(
cancellationToken);
}

/// <summary>
/// Version of RunAsync that will process <paramref name="source"/> in parallel.
/// </summary>
public static Task RunParallelAsync<TSource, TArgs>(
CyrusNajmabadi marked this conversation as resolved.
Show resolved Hide resolved
IEnumerable<TSource> source,
Func<TSource, Action<TItem>, TArgs, Task> produceItems,
Func<IAsyncEnumerable<TItem>, TArgs, Task> consumeItems,
TArgs args,
CancellationToken cancellationToken)
{
return RunAsync(
// We're running in parallel, so we def have multiple writers
ProducerConsumerOptions.SingleReaderOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerConsumerOptions.SingleReaderOptions

Why not let the caller specify?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, I can see clearly now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-D

produceItems: static (callback, args) =>
RoslynParallel.ForEachAsync(
args.source,
args.cancellationToken,
async (source, cancellationToken) =>
await args.produceItems(source, callback, args.args).ConfigureAwait(false)),
consumeItems: static (enumerable, args) => args.consumeItems(enumerable, args.args),
args: (source, produceItems, consumeItems, args, cancellationToken),
cancellationToken);
}

/// <summary>
/// Variant of <c>RunParallelAsync</c> whose <paramref name="consumeItems"/> callback accepts an <see
/// cref="ImmutableArray{T}"/> of items. Every element of <paramref name="source"/> is passed to <paramref
/// name="produceItems"/> via <c>RoslynParallel.ForEachAsync</c>, and the whole operation is delegated to
/// <c>RunAsync</c>.
/// </summary>
/// <param name="source">The elements that <paramref name="produceItems"/> is run over.</param>
/// <param name="produceItems">Called once per element with that element, the callback used to report a
/// produced item, and <paramref name="args"/>.</param>
/// <param name="consumeItems">Called by <c>RunAsync</c> with an <see cref="ImmutableArray{T}"/> (presumably the
/// items that were produced — confirm against <c>RunAsync</c>) and <paramref name="args"/>.</param>
/// <param name="args">Caller-supplied state forwarded unchanged to both callbacks.</param>
/// <param name="cancellationToken">Token handed to both the parallel loop and <c>RunAsync</c>.</param>
public static Task RunParallelAsync<TSource, TArgs>(
    IEnumerable<TSource> source,
    Func<TSource, Action<TItem>, TArgs, Task> produceItems,
    Func<ImmutableArray<TItem>, TArgs, Task> consumeItems,
    TArgs args,
    CancellationToken cancellationToken)
    => RunAsync(
        // Items are reported from a parallel loop, so several writers can be active at the same time.
        ProducerConsumerOptions.SingleReaderOptions,
        produceItems: static (onItemProduced, state) =>
            RoslynParallel.ForEachAsync(
                state.source,
                state.cancellationToken,
                async (element, loopCancellationToken) =>
                    await state.produceItems(element, onItemProduced, state.args).ConfigureAwait(false)),
        consumeItems: static (producedItems, state) => state.consumeItems(producedItems, state.args),
        // Bundle everything the static lambdas need so they do not capture any locals.
        args: (source, produceItems, consumeItems, args, cancellationToken),
        cancellationToken);

/// <summary>
/// Helper utility for the pattern of a pair of a production routine and consumption routine using a channel to
/// coordinate data transfer. The provided <paramref name="options"/> are used to create a <see
Expand Down
Loading