diff --git a/flyteplugins/go/tasks/plugins/array/arraystatus/status.go b/flyteplugins/go/tasks/plugins/array/arraystatus/status.go index 7b7f41c556..1a5d745254 100644 --- a/flyteplugins/go/tasks/plugins/array/arraystatus/status.go +++ b/flyteplugins/go/tasks/plugins/array/arraystatus/status.go @@ -5,6 +5,9 @@ package arraystatus import ( + "encoding/binary" + "hash/fnv" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flytestdlib/bitarray" ) @@ -20,6 +23,22 @@ type ArrayStatus struct { Detailed bitarray.CompactArray `json:"details"` } +// HashCode computes a hash of the phase indicies stored in the Detailed array to uniquely represent +// a collection of subtask phases. +func (a ArrayStatus) HashCode() (uint64, error) { + hash := fnv.New64() + bytes := make([]byte, 8) + for _, phaseIndex := range a.Detailed.GetItems() { + binary.LittleEndian.PutUint64(bytes, phaseIndex) + _, err := hash.Write(bytes) + if err != nil { + return 0, err + } + } + + return hash.Sum64(), nil +} + // This is a status object that is returned after we make Catalog calls to see if subtasks are Cached type ArrayCachedStatus struct { CachedJobs *bitarray.BitSet `json:"cachedJobs"` diff --git a/flyteplugins/go/tasks/plugins/array/arraystatus/status_test.go b/flyteplugins/go/tasks/plugins/array/arraystatus/status_test.go index 2282c26393..d45496d58d 100644 --- a/flyteplugins/go/tasks/plugins/array/arraystatus/status_test.go +++ b/flyteplugins/go/tasks/plugins/array/arraystatus/status_test.go @@ -8,9 +8,91 @@ import ( "testing" types "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + + "github.com/flyteorg/flytestdlib/bitarray" + "github.com/stretchr/testify/assert" ) +func TestArrayStatus_HashCode(t *testing.T) { + size := uint(10) + + t.Run("Empty Equal", func(t *testing.T) { + expected := ArrayStatus{} + expectedHashCode, err := expected.HashCode() + assert.Nil(t, err) + + actual := ArrayStatus{} + actualHashCode, err := actual.HashCode() + assert.Nil(t, err) + + assert.Equal(t, expectedHashCode, actualHashCode) + }) + + t.Run("Populated Equal", func(t *testing.T) { + expectedDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + assert.Nil(t, err) + expected := ArrayStatus{ + Detailed: expectedDetailed, + } + expectedHashCode, err := expected.HashCode() + assert.Nil(t, err) + + actualDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + assert.Nil(t, err) + actual := ArrayStatus{ + Detailed: actualDetailed, + } + actualHashCode, err := actual.HashCode() + assert.Nil(t, err) + + assert.Equal(t, expectedHashCode, actualHashCode) + }) + + t.Run("Updated Not Equal", func(t *testing.T) { + expectedDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + assert.Nil(t, err) + expectedDetailed.SetItem(0, uint64(1)) + expected := ArrayStatus{ + Detailed: expectedDetailed, + } + expectedHashCode, err := expected.HashCode() + assert.Nil(t, err) + + actualDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + assert.Nil(t, err) + actual := ArrayStatus{ + Detailed: actualDetailed, + } + actualHashCode, err := actual.HashCode() + assert.Nil(t, err) + + assert.NotEqual(t, expectedHashCode, actualHashCode) + }) + + t.Run("Updated Equal", func(t *testing.T) { + expectedDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + assert.Nil(t, err) + expectedDetailed.SetItem(0, uint64(1)) + expected := ArrayStatus{ + Detailed: expectedDetailed, + } + expectedHashCode, err := expected.HashCode() + assert.Nil(t, err) + + actualDetailed, err := bitarray.NewCompactArray(size, bitarray.Item(len(types.Phases)-1)) + actualDetailed.SetItem(0, uint64(1)) + assert.Nil(t, err) + actual := ArrayStatus{ + Detailed: actualDetailed, + } + actualHashCode, err := actual.HashCode() + assert.Nil(t, err) + + assert.Equal(t, expectedHashCode, actualHashCode) + }) +} + func TestArraySummary_MergeFrom(t *testing.T) { t.Run("Update when not equal", func(t *testing.T) { expected := ArraySummary{ diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go index 6a7d2a7d28..23cc0b3fc4 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go @@ -63,7 +63,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c var err error - p, _ := pluginState.GetPhase() + p, version := pluginState.GetPhase() logger.Infof(ctx, "Entering handle with phase [%v]", p) switch p { @@ -85,16 +85,16 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c e.jobStore, tCtx.DataStore(), pluginConfig, pluginState, e.metrics) case arrayCore.PhaseAssembleFinalOutput: - pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, pluginState.State) + pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, version, pluginState.State) case arrayCore.PhaseWriteToDiscoveryThenFail: - pluginState.State, err = array.WriteToDiscovery(ctx, tCtx, pluginState.State, arrayCore.PhaseAssembleFinalError) + pluginState.State, err = array.WriteToDiscovery(ctx, tCtx, pluginState.State, arrayCore.PhaseAssembleFinalError, version) case arrayCore.PhaseWriteToDiscovery: - pluginState.State, err = array.WriteToDiscovery(ctx, tCtx, pluginState.State, arrayCore.PhaseAssembleFinalOutput) + pluginState.State, err = array.WriteToDiscovery(ctx, tCtx, pluginState.State, arrayCore.PhaseAssembleFinalOutput, version) case arrayCore.PhaseAssembleFinalError: - pluginState.State, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhaseRetryableFailure, pluginState.State) + pluginState.State, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhaseRetryableFailure, version, pluginState.State) } if err != nil { diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go index a7d033aa15..232068fe15 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go @@ -63,6 +63,11 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata Detailed: arrayCore.NewPhasesCompactArray(uint(currentState.GetExecutionArraySize())), } + currentSubTaskPhaseHash, err := currentState.GetArrayStatus().HashCode() + if err != nil { + return currentState, err + } + queued := 0 for childIdx, subJob := range job.SubJobs { actualPhase := subJob.Status.Phase @@ -132,16 +137,20 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata errorMsg := msg.Summary(cfg.MaxErrorStringLength) parentState = parentState.SetReason(errorMsg) } + _, version := currentState.GetPhase() if phase == arrayCore.PhaseCheckingSubTaskExecutions { - newPhaseVersion := uint32(0) - // For now, the only changes to PhaseVersion and PreviousSummary occur for running array jobs. - for phase, count := range parentState.GetArrayStatus().Summary { - newPhaseVersion += uint32(phase) * uint32(count) + newSubTaskPhaseHash, err := parentState.GetArrayStatus().HashCode() + if err != nil { + return currentState, err + } + + if newSubTaskPhaseHash != currentSubTaskPhaseHash { + version++ } - parentState = parentState.SetPhase(phase, newPhaseVersion).SetReason("Task is still running.") + parentState = parentState.SetPhase(phase, version).SetReason("Task is still running") } else { - parentState = parentState.SetPhase(phase, core.DefaultPhaseVersion) + parentState = parentState.SetPhase(phase, version) } p, v := parentState.GetPhase() diff --git a/flyteplugins/go/tasks/plugins/array/catalog.go b/flyteplugins/go/tasks/plugins/array/catalog.go index 9ba3a4c539..874a267bb1 100644 --- a/flyteplugins/go/tasks/plugins/array/catalog.go +++ b/flyteplugins/go/tasks/plugins/array/catalog.go @@ -193,7 +193,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex return state, nil } -func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State, phaseOnSuccess arrayCore.Phase) (*arrayCore.State, error) { +func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state *arrayCore.State, phaseOnSuccess arrayCore.Phase, versionOnSuccess uint32) (*arrayCore.State, error) { // Check that the taskTemplate is valid taskTemplate, err := tCtx.TaskReader().Read(ctx) @@ -205,7 +205,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state if tMeta := taskTemplate.Metadata; tMeta == nil || !tMeta.Discoverable { logger.Debugf(ctx, "Task is not marked as discoverable. Moving to [%v] phase.", phaseOnSuccess) - return state.SetPhase(phaseOnSuccess, core.DefaultPhaseVersion).SetReason("Task is not discoverable."), nil + return state.SetPhase(phaseOnSuccess, versionOnSuccess).SetReason("Task is not discoverable."), nil } var inputReaders []io.InputReader @@ -263,7 +263,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state } if len(catalogWriterItems) == 0 { - state.SetPhase(phaseOnSuccess, core.DefaultPhaseVersion).SetReason("No outputs need to be cached.") + state.SetPhase(phaseOnSuccess, versionOnSuccess).SetReason("No outputs need to be cached.") return state, nil } @@ -273,7 +273,7 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state } if allWritten { - state.SetPhase(phaseOnSuccess, core.DefaultPhaseVersion).SetReason("Finished writing catalog cache.") + state.SetPhase(phaseOnSuccess, versionOnSuccess).SetReason("Finished writing catalog cache.") } return state, nil diff --git a/flyteplugins/go/tasks/plugins/array/core/state.go b/flyteplugins/go/tasks/plugins/array/core/state.go index 65e96572d5..66f4b7eba8 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state.go +++ b/flyteplugins/go/tasks/plugins/array/core/state.go @@ -164,11 +164,6 @@ func ToArrayJob(structObj *structpb.Struct, taskTypeVersion int32) (*idlPlugins. return arrayJob, err } -func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 { - // NB: Make sure this is the last/highest value of the Phase! - return uint32(length * (int64(core.PhasePermanentFailure) + 1) * int64(currentPhase)) -} - // Any state of the plugin needs to map to a core.PhaseInfo (which in turn will map to Admin events) so that the rest // of the Flyte platform can understand what's happening. That is, each possible state that our plugin state // machine returns should map to a unique (core.Phase, core.PhaseInfo.version). @@ -189,20 +184,16 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl case PhaseStart: phaseInfo = core.PhaseInfoInitializing(t, core.DefaultPhaseVersion, state.GetReason(), nowTaskInfo) + case PhaseWaitingForResources: + phaseInfo = core.PhaseInfoWaitingForResourcesInfo(t, version, state.GetReason(), nowTaskInfo) + case PhasePreLaunch: - version := GetPhaseVersionOffset(p, 1) + version - phaseInfo = core.PhaseInfoRunning(version, nowTaskInfo) + fallthrough case PhaseLaunch: - // The first time we return a Running core.Phase, we can just use the version inside the state object itself. - phaseInfo = core.PhaseInfoRunning(version, nowTaskInfo) - - case PhaseWaitingForResources: - phaseInfo = core.PhaseInfoWaitingForResourcesInfo(t, version, state.GetReason(), nowTaskInfo) + fallthrough case PhaseCheckingSubTaskExecutions: - // For future Running core.Phases, we have to make sure we don't use an earlier Admin version number, - // which means we need to offset things. fallthrough case PhaseAssembleFinalOutput: @@ -215,15 +206,11 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl fallthrough case PhaseWriteToDiscovery: - // If the array task has 0 inputs we need to ensure the phaseVersion changes so that the - // task can progess. Therefore we default to task length 1 to ensure phase updates. - length := int64(1) - if state.GetOriginalArraySize() != 0 { - length = state.GetOriginalArraySize() - } - - version := GetPhaseVersionOffset(p, length) + version - phaseInfo = core.PhaseInfoRunning(version, nowTaskInfo) + // The state version is only incremented in PhaseCheckingSubTaskExecutions when subtask + // phases are updated. Therefore by adding the phase to the state version we ensure that + // (1) all phase changes will have a new phase version and (2) all subtask phase updates + // result in monotonically increasing phase version. + phaseInfo = core.PhaseInfoRunning(version+uint32(p), nowTaskInfo) case PhaseSuccess: phaseInfo = core.PhaseInfoSuccess(nowTaskInfo) diff --git a/flyteplugins/go/tasks/plugins/array/core/state_test.go b/flyteplugins/go/tasks/plugins/array/core/state_test.go index 36c091e3cc..6549794314 100644 --- a/flyteplugins/go/tasks/plugins/array/core/state_test.go +++ b/flyteplugins/go/tasks/plugins/array/core/state_test.go @@ -14,14 +14,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestGetPhaseVersionOffset(t *testing.T) { - length := int64(100) - checkSubTasksOffset := GetPhaseVersionOffset(PhaseAssembleFinalOutput, length) - discoverWriteOffset := GetPhaseVersionOffset(PhaseWriteToDiscovery, length) - // There are 9 possible core.Phases, from PhaseUndefined to PhasePermanentFailure - assert.Equal(t, uint32(length*9), discoverWriteOffset-checkSubTasksOffset) -} - func TestInvertBitSet(t *testing.T) { input := bitarray.NewBitSet(4) input.Set(0) @@ -105,7 +97,7 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, nil) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) - assert.Equal(t, uint32(368), phaseInfo.Version()) + assert.Equal(t, uint32(12), phaseInfo.Version()) }) t.Run("write to discovery", func(t *testing.T) { @@ -124,7 +116,7 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, nil) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) - assert.Equal(t, uint32(548), phaseInfo.Version()) + assert.Equal(t, uint32(14), phaseInfo.Version()) }) t.Run("success", func(t *testing.T) { diff --git a/flyteplugins/go/tasks/plugins/array/k8s/executor.go b/flyteplugins/go/tasks/plugins/array/k8s/executor.go index 729363d026..1727b69efe 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/executor.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/executor.go @@ -117,16 +117,16 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState) case arrayCore.PhaseAssembleFinalOutput: - nextState, err = array.AssembleFinalOutputs(ctx, e.outputsAssembler, tCtx, arrayCore.PhaseSuccess, pluginState) + nextState, err = array.AssembleFinalOutputs(ctx, e.outputsAssembler, tCtx, arrayCore.PhaseSuccess, version, pluginState) case arrayCore.PhaseWriteToDiscoveryThenFail: - nextState, err = array.WriteToDiscovery(ctx, tCtx, pluginState, arrayCore.PhaseAssembleFinalError) + nextState, err = array.WriteToDiscovery(ctx, tCtx, pluginState, arrayCore.PhaseAssembleFinalError, version) case arrayCore.PhaseWriteToDiscovery: - nextState, err = array.WriteToDiscovery(ctx, tCtx, pluginState, arrayCore.PhaseAssembleFinalOutput) + nextState, err = array.WriteToDiscovery(ctx, tCtx, pluginState, arrayCore.PhaseAssembleFinalOutput, version) case arrayCore.PhaseAssembleFinalError: - nextState, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhasePermanentFailure, pluginState) + nextState, err = array.AssembleFinalOutputs(ctx, e.errorAssembler, tCtx, arrayCore.PhasePermanentFailure, version, pluginState) default: nextState = pluginState diff --git a/flyteplugins/go/tasks/plugins/array/k8s/management.go b/flyteplugins/go/tasks/plugins/array/k8s/management.go index d9a8e112b9..3e3eb042f8 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/management.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/management.go @@ -128,6 +128,11 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon currentParallelism := 0 maxParallelism := int(arrayJob.Parallelism) + currentSubTaskPhaseHash, err := currentState.GetArrayStatus().HashCode() + if err != nil { + return currentState, externalResources, err + } + for childIdx, existingPhaseIdx := range currentState.GetArrayStatus().Detailed.GetItems() { existingPhase := core.Phases[existingPhaseIdx] retryAttempt := currentState.RetryAttempts.GetItem(childIdx) @@ -255,17 +260,20 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon newState = newState.SetReason(errorMsg) } + _, version := currentState.GetPhase() if phase == arrayCore.PhaseCheckingSubTaskExecutions { - newPhaseVersion := uint32(0) + newSubTaskPhaseHash, err := newState.GetArrayStatus().HashCode() + if err != nil { + return currentState, externalResources, err + } - // For now, the only changes to PhaseVersion and PreviousSummary occur for running array jobs. - for phase, count := range newState.GetArrayStatus().Summary { - newPhaseVersion += uint32(phase) * uint32(count) + if newSubTaskPhaseHash != currentSubTaskPhaseHash { + version++ } - newState = newState.SetPhase(phase, newPhaseVersion).SetReason("Task is still running.") + newState = newState.SetPhase(phase, version).SetReason("Task is still running") } else { - newState = newState.SetPhase(phase, core.DefaultPhaseVersion) + newState = newState.SetPhase(phase, version) } return newState, externalResources, nil diff --git a/flyteplugins/go/tasks/plugins/array/outputs.go b/flyteplugins/go/tasks/plugins/array/outputs.go index 7aad40b082..531f204877 100644 --- a/flyteplugins/go/tasks/plugins/array/outputs.go +++ b/flyteplugins/go/tasks/plugins/array/outputs.go @@ -170,7 +170,7 @@ func buildFinalPhases(executedTasks bitarray.CompactArray, indexes *bitarray.Bit // Assembles a single outputs.pb that contain all the outputs of the subtasks and write them to the final OutputWriter. // This step can potentially be expensive (hence the metrics) and why it's offloaded to a background process. func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tCtx pluginCore.TaskExecutionContext, - terminalPhase arrayCore.Phase, state *arrayCore.State) (*arrayCore.State, error) { + terminalPhase arrayCore.Phase, terminalVersion uint32, state *arrayCore.State) (*arrayCore.State, error) { // Otherwise, run the data catalog steps - create and submit work items to the catalog processor, // build input readers @@ -191,7 +191,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC outputVariables := taskTemplate.GetInterface().GetOutputs() if outputVariables == nil || outputVariables.GetVariables() == nil { // If the task has no outputs, bail early. - state = state.SetPhase(terminalPhase, 0).SetReason("Task has no outputs") + state = state.SetPhase(terminalPhase, terminalVersion).SetReason("Task has no outputs") return state, nil } @@ -241,7 +241,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC } if outputExists { - state = state.SetPhase(terminalPhase, 0).SetReason("Assembled outputs") + state = state.SetPhase(terminalPhase, terminalVersion).SetReason("Assembled outputs") return state, nil } @@ -256,11 +256,11 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC return nil, err } - state = state.SetPhase(terminalPhase, 0). + state = state.SetPhase(terminalPhase, terminalVersion). SetReason("Assembled error"). SetExecutionErr(ee.ExecutionError) } else { - state = state.SetPhase(terminalPhase, 0).SetReason("No output or error assembled.") + state = state.SetPhase(terminalPhase, terminalVersion).SetReason("No output or error assembled.") } case workqueue.WorkStatusFailed: state = state.SetExecutionErr(&core.ExecutionError{ diff --git a/flyteplugins/go/tasks/plugins/array/outputs_test.go b/flyteplugins/go/tasks/plugins/array/outputs_test.go index 96f4e5e6e4..de46fd5fa7 100644 --- a/flyteplugins/go/tasks/plugins/array/outputs_test.go +++ b/flyteplugins/go/tasks/plugins/array/outputs_test.go @@ -256,9 +256,10 @@ func TestAssembleFinalOutputs(t *testing.T) { tCtx.OnMaxDatasetSizeBytes().Return(10000) tCtx.OnDataStore().Return(d) - _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, s) + _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s) assert.NoError(t, err) assert.Equal(t, arrayCore.PhaseSuccess, s.CurrentPhase) + assert.Equal(t, uint32(1), s.PhaseVersion) assert.False(t, called) }) @@ -294,9 +295,10 @@ func TestAssembleFinalOutputs(t *testing.T) { tCtx := &mocks3.TaskExecutionContext{} tCtx.OnTaskExecutionMetadata().Return(tMeta) - _, err := AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, s) + _, err := AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s) assert.NoError(t, err) assert.Equal(t, arrayCore.PhaseRetryableFailure, s.CurrentPhase) + assert.Equal(t, uint32(0), s.PhaseVersion) assert.False(t, called) }) @@ -377,9 +379,10 @@ func TestAssembleFinalOutputs(t *testing.T) { tCtx.OnDataStore().Return(ds) tCtx.OnMaxDatasetSizeBytes().Return(10000) - _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, s) + _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s) assert.NoError(t, err) assert.Equal(t, arrayCore.PhaseSuccess, s.CurrentPhase) + assert.Equal(t, uint32(1), s.PhaseVersion) assert.True(t, called) }) }