Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] use deep copy of bit arrays when getting array node state #5681

Merged
merged 9 commits into from
Aug 23, 2024
13 changes: 0 additions & 13 deletions .run/single-binary.run.xml

This file was deleted.

6 changes: 5 additions & 1 deletion flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error

if s.filter.Contains(ctx, id) {
logger.Debugf(ctx, "event '%s' has already been sent", string(id))
return nil
return &errors.EventError{
Code: errors.AlreadyExists,
Cause: fmt.Errorf("event has already been sent"),
Message: "Event Already Exists",
}
}

// Validate submission with rate limiter and send admin event
Expand Down
9 changes: 6 additions & 3 deletions flytepropeller/events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,16 @@ func TestAdminFilterContains(t *testing.T) {
filter.OnContainsMatch(mock.Anything, mock.Anything).Return(true)

wfErr := adminEventSink.Sink(ctx, wfEvent)
assert.NoError(t, wfErr)
assert.Error(t, wfErr)
assert.True(t, errors.IsAlreadyExists(wfErr))

nodeErr := adminEventSink.Sink(ctx, nodeEvent)
assert.NoError(t, nodeErr)
assert.Error(t, nodeErr)
assert.True(t, errors.IsAlreadyExists(nodeErr))

taskErr := adminEventSink.Sink(ctx, taskEvent)
assert.NoError(t, taskErr)
assert.Error(t, taskErr)
assert.True(t, errors.IsAlreadyExists(taskErr))
}

func TestIDFromMessage(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion flytepropeller/events/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
}

func (r EventError) Error() string {
return fmt.Sprintf("%s: %s, caused by [%s]", r.Code, r.Message, r.Cause.Error())
var cause string
if r.Cause != nil {
cause = r.Cause.Error()

Check warning on line 38 in flytepropeller/events/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/events/errors/errors.go#L36-L38

Added lines #L36 - L38 were not covered by tests
}
return fmt.Sprintf("%s: %s, caused by [%s]", r.Code, r.Message, cause)

Check warning on line 40 in flytepropeller/events/errors/errors.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/events/errors/errors.go#L40

Added line #L40 was not covered by tests
}

func (r *EventError) Is(target error) bool {
Expand Down
21 changes: 21 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,27 @@
}
}

func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) {
*out = *in
out.MutableStruct = in.MutableStruct

Check warning on line 321 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L319-L321

Added lines #L319 - L321 were not covered by tests

if in.ExecutionError != nil {
in, out := &in.ExecutionError, &out.ExecutionError
*out = new(core.ExecutionError)
*out = *in

Check warning on line 326 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L323-L326

Added lines #L323 - L326 were not covered by tests
}
}

func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus {
if in == nil {
return nil

Check warning on line 332 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L330-L332

Added lines #L330 - L332 were not covered by tests
}

out := &ArrayNodeStatus{}
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
in.DeepCopyInto(out)
return out

Check warning on line 337 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L335-L337

Added lines #L335 - L337 were not covered by tests
}

type NodeStatus struct {
MutableStruct
Phase NodePhase `json:"phase,omitempty"`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ const (
type EventConfig struct {
RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."`
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth enabling this in single-binary by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never really be enabled except for array node. I didn't want to update the record task event function, but I guess that might be better if users were to unknowingly change this and have executions fail.

Copy link
Contributor Author

@pvditt pvditt Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I can just remove the ability to set this and keep it in the eventconfig struct

ah darn, I should've made that change here #5680

}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 46 additions & 5 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/errorcollector"
"github.com/flyteorg/flyte/flytepropeller/events"
eventsErr "github.com/flyteorg/flyte/flytepropeller/events/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/validators"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
Expand All @@ -21,6 +22,7 @@
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
"github.com/flyteorg/flyte/flytestdlib/bitarray"
stdConfig "github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
Expand Down Expand Up @@ -112,6 +114,10 @@

// update state for subNodes
if err := eventRecorder.finalize(ctx, nCtx, idlcore.TaskExecution_ABORTED, 0, a.eventConfig); err != nil {
// a task event with abort phase is already emitted when handling ArrayNodePhaseFailing
if eventsErr.IsAlreadyExists(err) {
return nil

Check warning on line 119 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return err
}
Expand Down Expand Up @@ -579,12 +585,35 @@

// increment taskPhaseVersion if we detect any changes in subNode state.
if incrementTaskPhaseVersion {
arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
arrayNodeState.TaskPhaseVersion++
}

if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
const maxRetries = 3
retries := 0
for retries <= maxRetries {
err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig)

if err == nil {
break
}

// Handle potential race condition if FlyteWorkflow CRD fails to get synced
if eventsErr.IsAlreadyExists(err) {
if !incrementTaskPhaseVersion {
break

Check warning on line 603 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L603

Added line #L603 was not covered by tests
}
logger.Warnf(ctx, "Event version already exists, bumping version and retrying (%d/%d): [%s]", retries+1, maxRetries, err.Error())
arrayNodeState.TaskPhaseVersion++
} else {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}

retries++
if retries > maxRetries {
logger.Errorf(ctx, "ArrayNode event recording failed after %d retries: [%s]", maxRetries, err.Error())
return handler.UnknownTransition, err
}
}

// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0
Expand Down Expand Up @@ -632,9 +661,21 @@
return nil, err
}

eventConfigCopy, err := stdConfig.DeepCopyConfig(eventConfig)
if err != nil {
return nil, err

Check warning on line 666 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L666

Added line #L666 was not covered by tests
}

deepCopiedEventConfig, ok := eventConfigCopy.(*config.EventConfig)
if !ok {
return nil, fmt.Errorf("deep copy error: expected *config.EventConfig, but got %T", eventConfigCopy)

Check warning on line 671 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L671

Added line #L671 was not covered by tests
}

deepCopiedEventConfig.ErrorOnAlreadyExists = true

arrayScope := scope.NewSubScope("array")
return &arrayNodeHandler{
eventConfig: eventConfig,
eventConfig: deepCopiedEventConfig,
gatherOutputsRequestChannel: make(chan *gatherOutputsRequest),
metrics: newMetrics(arrayScope),
nodeExecutionRequestChannel: make(chan *nodeExecutionRequest),
Expand Down
Loading
Loading