Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a ProducerConsumer helper for processing a sequence of items in parallel. #73323

Merged
merged 12 commits into from
May 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,20 @@ private static async Task FindResultsInUnreferencedProjectSourceSymbolsAsync(

// Defer to the ProducerConsumer. We're search the unreferenced projects in parallel. As we get results, we'll
// add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

view with whitespace off.

await ProducerConsumer<ImmutableArray<SymbolReference>>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync(
args.viableUnreferencedProjects,
args.linkedTokenSource.Token,
async (project, cancellationToken) =>
{
// Search in this unreferenced project. But don't search in any of its' direct references. i.e. we
// don't want to search in its metadata references or in the projects it references itself. We'll be
// searching those entities individually.
var references = await args.finder.FindInSourceSymbolsInProjectAsync(
args.projectToAssembly, project, args.exact, cancellationToken).ConfigureAwait(false);
onItemsFound(references);
}),
consumeItems: static (symbolReferencesEnumerable, args) =>
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunParallelAsync(
source: viableUnreferencedProjects,
produceItems: static async (project, onItemsFound, args, cancellationToken) =>
{
// Search in this unreferenced project. But don't search in any of its' direct references. i.e. we
// don't want to search in its metadata references or in the projects it references itself. We'll be
// searching those entities individually.
var references = await args.finder.FindInSourceSymbolsInProjectAsync(
args.projectToAssembly, project, args.exact, cancellationToken).ConfigureAwait(false);
onItemsFound(references);
},
consumeItems: static (symbolReferencesEnumerable, args, cancellationToken) =>
ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource),
args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, viableUnreferencedProjects, linkedTokenSource),
args: (projectToAssembly, allSymbolReferences, maxResults, finder, exact, linkedTokenSource),
linkedTokenSource.Token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -266,27 +263,24 @@ private async Task FindResultsInUnreferencedMetadataSymbolsAsync(

// Defer to the ProducerConsumer. We're search the metadata references in parallel. As we get results, we'll
// add them to the 'allSymbolReferences' queue. If we get enough results, we'll cancel all the other work.
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemsFound, args) => RoslynParallel.ForEachAsync(
args.newReferences,
args.linkedTokenSource.Token,
async (tuple, cancellationToken) =>
{
var (referenceProject, reference) = tuple;
var compilation = args.referenceToCompilation.GetOrAdd(
reference, r => CreateCompilation(args.project, r));

// Ignore netmodules. First, they're incredibly esoteric and barely used.
// Second, the SymbolFinder API doesn't even support searching them.
if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly)
return;

var references = await args.finder.FindInMetadataSymbolsAsync(
assembly, referenceProject, reference, args.exact, args.linkedTokenSource.Token).ConfigureAwait(false);
onItemsFound(references);
}),
consumeItems: static (symbolReferencesEnumerable, args) =>
await ProducerConsumer<ImmutableArray<SymbolReference>>.RunParallelAsync(
source: newReferences,
produceItems: static async (tuple, onItemsFound, args, cancellationToken) =>
{
var (referenceProject, reference) = tuple;
var compilation = args.referenceToCompilation.GetOrAdd(
reference, r => CreateCompilation(args.project, r));

// Ignore netmodules. First, they're incredibly esoteric and barely used.
// Second, the SymbolFinder API doesn't even support searching them.
if (compilation.GetAssemblyOrModuleSymbol(reference) is not IAssemblySymbol assembly)
return;

var references = await args.finder.FindInMetadataSymbolsAsync(
assembly, referenceProject, reference, args.exact, cancellationToken).ConfigureAwait(false);
onItemsFound(references);
},
consumeItems: static (symbolReferencesEnumerable, args, cancellationToken) =>
ProcessReferencesAsync(args.allSymbolReferences, args.maxResults, symbolReferencesEnumerable, args.linkedTokenSource),
args: (referenceToCompilation, project, allSymbolReferences, maxResults, finder, exact, newReferences, linkedTokenSource),
linkedTokenSource.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,45 +121,41 @@ public async Task<ImmutableArray<CodeRefactoring>> GetRefactoringsAsync(
foreach (var provider in orderedProviders)
providerToIndex.Add(provider, providerToIndex.Count);

await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (callback, args) =>
await ProducerConsumer<(CodeRefactoringProvider provider, CodeRefactoring codeRefactoring)>.RunParallelAsync(
source: orderedProviders,
produceItems: static async (provider, callback, args, cancellationToken) =>
{
// Run all providers in parallel to get the set of refactorings for this document.
RoslynParallel.ForEachAsync(
args.orderedProviders,
args.cancellationToken,
async (provider, cancellationToken) =>
{
// Log an individual telemetry event for slow code refactoring computations to
// allow targeted trace notifications for further investigation. 500 ms seemed like
// a good value so as to not be too noisy, but if fired, indicates a potential
// area requiring investigation.
const int CodeRefactoringTelemetryDelay = 500;

var providerName = provider.GetType().Name;

var logMessage = KeyValueLogMessage.Create(m =>
{
m[TelemetryLogging.KeyName] = providerName;
m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language;
});

using (args.addOperationScope(providerName))
using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, cancellationToken))
using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay))
{
var refactoring = await [email protected](
args.document, args.state, provider, args.options, cancellationToken).ConfigureAwait(false);
if (refactoring != null)
callback((provider, refactoring));
}
}),
consumeItems: static async (reader, args) =>
// Log an individual telemetry event for slow code refactoring computations to
// allow targeted trace notifications for further investigation. 500 ms seemed like
// a good value so as to not be too noisy, but if fired, indicates a potential
// area requiring investigation.
const int CodeRefactoringTelemetryDelay = 500;

var providerName = provider.GetType().Name;

var logMessage = KeyValueLogMessage.Create(m =>
{
m[TelemetryLogging.KeyName] = providerName;
m[TelemetryLogging.KeyLanguageName] = args.document.Project.Language;
});

using (args.addOperationScope(providerName))
using (RoslynEventSource.LogInformationalBlock(FunctionId.Refactoring_CodeRefactoringService_GetRefactoringsAsync, providerName, cancellationToken))
using (TelemetryLogging.LogBlockTime(FunctionId.CodeRefactoring_Delay, logMessage, CodeRefactoringTelemetryDelay))
{
var refactoring = await [email protected](
args.document, args.state, provider, args.options, cancellationToken).ConfigureAwait(false);
if (refactoring != null)
callback((provider, refactoring));
}
},
consumeItems: static async (reader, args, cancellationToken) =>
{
await foreach (var pair in reader)
args.pairs.Add(pair);
},
args: (@this: this, document, state, orderedProviders, options, addOperationScope, pairs, cancellationToken),
args: (@this: this, document, state, options, addOperationScope, pairs),
cancellationToken).ConfigureAwait(false);

return pairs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ await PerformParallelSearchAsync(

async ValueTask ProcessSingleProjectGroupAsync(
IGrouping<ProjectKey, DocumentKey> group,
Action<RoslynNavigateToItem> onItemFound)
Action<RoslynNavigateToItem> onItemFound,
CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static async Task SearchGeneratedDocumentsInCurrentProcessAsync(
return;

async ValueTask ProcessSingleProjectAsync(
Project project, Action<RoslynNavigateToItem> onItemFound)
Project project, Action<RoslynNavigateToItem> onItemFound, CancellationToken cancellationToken)
{
// First generate all the source-gen docs. Then handoff to the standard search routine to find matches in them.
var sourceGeneratedDocs = await project.GetSourceGeneratedDocumentsAsync(cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ await PerformParallelSearchAsync(

async ValueTask SearchSingleProjectAsync(
Project project,
Action<RoslynNavigateToItem> onItemFound)
Action<RoslynNavigateToItem> onItemFound,
CancellationToken cancellationToken)
{
using var _ = GetPooledHashSet(priorityDocuments.Where(d => project == d.Project), out var highPriDocs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ private static IEnumerable<T> Prioritize<T>(IEnumerable<T> items, Func<T, bool>
/// </summary>
private static Task PerformParallelSearchAsync<T>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PerformParallelSearchAsync

Curious if this method provides any value now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can consider removing in follow up

IEnumerable<T> items,
Func<T, Action<RoslynNavigateToItem>, ValueTask> callback,
Func<T, Action<RoslynNavigateToItem>, CancellationToken, ValueTask> callback,
Func<ImmutableArray<RoslynNavigateToItem>, Task> onItemsFound,
CancellationToken cancellationToken)
=> ProducerConsumer<RoslynNavigateToItem>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemFound, args) => RoslynParallel.ForEachAsync(args.items, args.cancellationToken, (item, cancellationToken) => args.callback(item, onItemFound)),
consumeItems: static (items, args) => args.onItemsFound(items),
args: (items, callback, onItemsFound, cancellationToken),
=> ProducerConsumer<RoslynNavigateToItem>.RunParallelAsync(
source: items,
produceItems: static async (item, onItemFound, args, cancellationToken) => await args.callback(item, onItemFound, cancellationToken).ConfigureAwait(false),
consumeItems: static (items, args, cancellationToken) => args.onItemsFound(items),
args: (items, callback, onItemsFound),
cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ public async Task FindReferencesAsync(
{
await ProducerConsumer<Reference>.RunAsync(
ProducerConsumerOptions.SingleReaderOptions,
produceItems: static (onItemFound, args) => [email protected](args.symbols, onItemFound, args.cancellationToken),
consumeItems: static async (references, args) => await args.@this._progress.OnReferencesFoundAsync(references, @args.cancellationToken).ConfigureAwait(false),
(@this: this, symbols, cancellationToken),
produceItems: static (onItemFound, args, cancellationToken) => [email protected](args.symbols, onItemFound, cancellationToken),
consumeItems: static async (references, args, cancellationToken) => await args.@this._progress.OnReferencesFoundAsync(references, cancellationToken).ConfigureAwait(false),
(@this: this, symbols),
cancellationToken).ConfigureAwait(false);
}
finally
Expand Down
Loading
Loading