Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/initsteps race condition #8145

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
172 changes: 68 additions & 104 deletions src/Nethermind/Nethermind.Init/Steps/EthereumStepsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ public class EthereumStepsManager
{
private readonly ILogger _logger;

private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(true);
private readonly INethermindApi _api;
private readonly List<StepInfo> _allSteps;
private readonly Dictionary<Type, StepInfo> _allStepsByBaseType;

public EthereumStepsManager(
IEthereumStepsLoader loader,
Expand All @@ -36,153 +34,96 @@ public EthereumStepsManager(
?? throw new ArgumentNullException(nameof(logManager));

_allSteps = loader.LoadSteps(_api.GetType()).ToList();
_allStepsByBaseType = _allSteps.ToDictionary(static s => s.StepBaseType, static s => s);
}

private async Task ReviewDependencies(CancellationToken cancellationToken)
public async Task InitializeAll(CancellationToken cancellationToken)
{
bool changedAnything;
List<Task> allRequiredSteps = CreateAndExecuteSteps(cancellationToken);
if (allRequiredSteps.Count == 0)
return;
do
{
foreach (StepInfo stepInfo in _allSteps)
{
_logger.Debug($"{stepInfo} is {stepInfo.Stage}");
}

await _autoResetEvent.WaitOneAsync(cancellationToken);

if (_logger.IsDebug) _logger.Debug("Reviewing steps manager dependencies");

changedAnything = false;
foreach (StepInfo stepInfo in _allSteps)
{
cancellationToken.ThrowIfCancellationRequested();

if (stepInfo.Stage == StepInitializationStage.WaitingForDependencies)
{
bool allDependenciesFinished = true;
foreach (Type dependency in stepInfo.Dependencies)
{
StepInfo dependencyInfo = _allStepsByBaseType[dependency];
if (dependencyInfo.Stage != StepInitializationStage.Complete)
{
if (_logger.IsDebug) _logger.Debug($"{stepInfo} is waiting for {dependencyInfo}");
allDependenciesFinished = false;
break;
}
}

if (allDependenciesFinished)
{
stepInfo.Stage = StepInitializationStage.WaitingForExecution;
changedAnything = true;
if (_logger.IsDebug) _logger.Debug($"{stepInfo} stage changed to {stepInfo.Stage}");
_autoResetEvent.Set();
}
}
}
} while (changedAnything);
Task current = await Task.WhenAny(allRequiredSteps);
ReviewFailedAndThrow(current);
allRequiredSteps.Remove(current);
} while (allRequiredSteps.Any(s => !s.IsCompleted));
}

public async Task InitializeAll(CancellationToken cancellationToken)
{
while (_allSteps.Any(static s => s.Stage != StepInitializationStage.Complete))
{
cancellationToken.ThrowIfCancellationRequested();

RunOneRoundOfInitialization(cancellationToken);
await ReviewDependencies(cancellationToken);
ReviewFailedAndThrow();
}

await Task.WhenAll(_allPending);
}

private readonly ConcurrentQueue<Task> _allPending = new();

private void RunOneRoundOfInitialization(CancellationToken cancellationToken)
private List<Task> CreateAndExecuteSteps(CancellationToken cancellationToken)
{
int startedThisRound = 0;
Dictionary<Type, StepWrapper> createdSteps = [];

foreach (StepInfo stepInfo in _allSteps)
{
cancellationToken.ThrowIfCancellationRequested();

if (stepInfo.Stage != StepInitializationStage.WaitingForExecution)
{
continue;
}

IStep? step = CreateStepInstance(stepInfo);
if (step is null)
{
if (_logger.IsError) _logger.Error($"Unable to create instance of Ethereum runner step {stepInfo}");
continue;
}

if (_logger.IsDebug) _logger.Debug($"Executing step: {stepInfo}");

stepInfo.Stage = StepInitializationStage.Executing;
startedThisRound++;
Task task = ExecuteStep(step, stepInfo, cancellationToken);

if (step.MustInitialize)
{
_allPending.Enqueue(task);
}
else
createdSteps.Add(step.GetType(), new StepWrapper(step));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dependency is mapped by StepBaseType

}
List<Task> allRequiredSteps = new();
foreach (StepInfo stepInfo in _allSteps)
{
if (!createdSteps.ContainsKey(stepInfo.StepType))
{
stepInfo.Stage = StepInitializationStage.Complete;
throw new StepDependencyException($"A step {stepInfo} could not be created and initialization cannot proceed.");
}
}
StepWrapper stepWrapper = createdSteps[stepInfo.StepType];

if (startedThisRound == 0 && _allPending.All(static t => t.IsCompleted))
{
Interlocked.Increment(ref _foreverLoop);
if (_foreverLoop > 100)
Task task = ExecuteStep(stepWrapper, stepInfo, createdSteps, cancellationToken);
if (_logger.IsDebug) _logger.Debug($"Executing step: {stepInfo}");

if (stepWrapper.Step.MustInitialize)
{
if (_logger.IsWarn) _logger.Warn($"Didn't start any initialization steps during initialization round and all previous steps are already completed.");
allRequiredSteps.Add(task);
}
}
return allRequiredSteps;
}

private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken cancellationToken)
private async Task ExecuteStep(StepWrapper stepWrapper, StepInfo stepInfo, Dictionary<Type, StepWrapper> steps, CancellationToken cancellationToken)
{
long startTime = Stopwatch.GetTimestamp();
try
{
await step.Execute(cancellationToken);
IEnumerable<StepWrapper> dependencies = [];
foreach (Type type in stepInfo.Dependencies)
{
if (!steps.ContainsKey(type))
throw new StepDependencyException($"The dependent step {type.Name} for {stepInfo.StepType.Name} was not created.");
dependencies = stepInfo.Dependencies.Select(t => steps[t]);
}
await stepWrapper.StartExecute(dependencies, cancellationToken);

if (_logger.IsDebug)
_logger.Debug(
$"Step {step.GetType().Name,-24} executed in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms");

stepInfo.Stage = StepInitializationStage.Complete;
$"Step {stepWrapper.GetType().Name,-24} executed in {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms");
}
catch (Exception exception)
catch (Exception exception) when (exception is not TaskCanceledException)
{
if (step.MustInitialize)
if (stepWrapper.Step.MustInitialize)
{
if (_logger.IsError)
_logger.Error(
$"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms",
$"Step {stepWrapper.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms",
exception);

stepInfo.Stage = StepInitializationStage.Failed;
throw;
}

if (_logger.IsWarn)
{
_logger.Warn(
$"Step {step.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms {exception}");
$"Step {stepWrapper.GetType().Name,-24} failed after {Stopwatch.GetElapsedTime(startTime).TotalMilliseconds:N0}ms {exception}");
}
stepInfo.Stage = StepInitializationStage.Complete;
}
finally
{
_autoResetEvent.Set();

if (_logger.IsDebug) _logger.Debug($"{step.GetType().Name,-24} complete");
if (_logger.IsDebug) _logger.Debug($"{stepWrapper.GetType().Name,-24} complete");
}
}

Expand All @@ -201,13 +142,36 @@ private async Task ExecuteStep(IStep step, StepInfo stepInfo, CancellationToken
return step;
}

private int _foreverLoop;
private void ReviewFailedAndThrow(Task task)
{
if (task?.IsFaulted == true && task?.Exception is not null)
ExceptionDispatchInfo.Capture(task.Exception.GetBaseException()).Throw();
}

private void ReviewFailedAndThrow()
private class StepWrapper(IStep step)
{
Task? anyFaulted = _allPending.FirstOrDefault(static t => t.IsFaulted);
if (anyFaulted?.IsFaulted == true && anyFaulted?.Exception is not null)
ExceptionDispatchInfo.Capture(anyFaulted.Exception.GetBaseException()).Throw();
public IStep Step => step;
public Task StepTask => _taskCompletedSource.Task;

private TaskCompletionSource _taskCompletedSource = new TaskCompletionSource();

public async Task StartExecute(IEnumerable<StepWrapper> dependentSteps, CancellationToken cancellationToken)
{
cancellationToken.Register(() => _taskCompletedSource.TrySetCanceled());

await Task.WhenAll(dependentSteps.Select(s => s.StepTask));
try
{
await step.Execute(cancellationToken);
_taskCompletedSource.TrySetResult();
}
catch
{
_taskCompletedSource.TrySetCanceled();
throw;
}
}
}
}

}
4 changes: 1 addition & 3 deletions src/Nethermind/Nethermind.Init/Steps/StepInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ public StepInfo(Type type, Type baseType)

public Type[] Dependencies { get; }

public StepInitializationStage Stage { get; set; }

public override string ToString()
{
return $"{StepType.Name} : {StepBaseType.Name} ({Stage})";
return $"{StepType.Name} : {StepBaseType.Name}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task With_steps_from_here()
LimboLogs.Instance);

using CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(1));

source.Cancel();
try
{
await stepsManager.InitializeAll(source.Token);
Expand Down Expand Up @@ -97,7 +97,7 @@ public async Task With_failing_steps()
LimboLogs.Instance);

using CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(2));

source.Cancel();
try
{
await stepsManager.InitializeAll(source.Token);
Expand Down Expand Up @@ -153,7 +153,7 @@ public StepA(NethermindApi runnerContext)
}
}

[RunnerStepDependencies(typeof(StepC))]
[RunnerStepDependencies(typeof(StepCStandard))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the previous test case is correct, the dependency is declared by base type not the subtype. Its a subtle thing that plugins rely on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I see. If that is the intended behavior there can be an issue if steps are inhering from another step like InitDatabaseSnapshot and something then uses that as a dependency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many scary issue that I don't want to bring in my sleep.

Copy link
Member

@LukaszRozmej LukaszRozmej Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my original implementation, that wasn't an issue, steps were grouped by base step for dependency resolution

that is why _allStepsByBaseType existed

public class StepB : IStep
{
public Task Execute(CancellationToken cancellationToken)
Expand Down