Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: using domain-qualified finalizers #6023

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down Expand Up @@ -155,6 +156,7 @@ require (
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
github.com/ory/viper v1.7.5 // indirect
Expand Down Expand Up @@ -199,6 +201,7 @@ require (
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
Expand All @@ -212,6 +215,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/component-base v0.28.4 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
6 changes: 6 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo=
github.com/go-openapi/validate v0.19.10/go.mod h1:RKEZTUWDkxKQxN2jDT7ZnZi2bhZlbNMAuKvKB+IaGx8=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -1013,6 +1015,8 @@ github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oleiade/reflections v1.0.0/go.mod h1:RbATFBbKYkVdqmSFtx13Bb/tVhR0lgOBXunWTZKeL4w=
github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM=
Expand All @@ -1024,6 +1028,7 @@ github.com/onsi/ginkgo v1.9.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
Expand Down Expand Up @@ -1967,6 +1972,7 @@ gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76
gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
4 changes: 3 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

func addMapValues(overrides map[string]string, defaultValues map[string]string) map[string]string {
Expand Down Expand Up @@ -130,7 +132,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1
flyteWorkflow.AcceptedAt = &acceptAtWrapper

// Add finalizer
flyteWorkflow.Finalizers = append(flyteWorkflow.Finalizers, "flyte-finalizer")
_ = controllerutil.AddFinalizer(flyteWorkflow, controller.Finalizer)

// add permissions from auth and security context. Adding permissions from auth would be removed once all clients
// have migrated over to security context
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

const testRole = "role"
Expand Down Expand Up @@ -254,5 +255,5 @@ func TestPrepareFlyteWorkflow(t *testing.T) {
OutputLocationPrefix: "s3://bucket/key",
},
})
assert.Equal(t, flyteWorkflow.Finalizers, []string{"flyte-finalizer"})
assert.Equal(t, flyteWorkflow.Finalizers, []string{controller.Finalizer})
}
2 changes: 1 addition & 1 deletion flyteidl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/go-test/deep v1.0.7
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/golang/protobuf v1.5.3
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -55,7 +56,6 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
Expand Down
42 changes: 24 additions & 18 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -30,8 +31,11 @@
ErrBuildPodTemplate stdErrors.ErrorCode = "POD_TEMPLATE_FAILED"
ErrReplaceCmdTemplate stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED"
FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX"
finalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
finalizer string = "flyte.lyft.com/finalizer-array"
// Old non-domain-qualified finalizer for backwards compatibility
// This should eventually be removed
oldFinalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)

var (
Expand Down Expand Up @@ -69,8 +73,7 @@
}

if k8sPluginCfg.InjectFinalizer {
f := append(pod.GetFinalizers(), finalizer)
pod.SetFinalizers(f)
_ = controllerutil.AddFinalizer(pod, finalizer)

Check warning on line 76 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L76

Added line #L76 was not covered by tests
}

if len(cfg.DefaultScheduler) > 0 {
Expand Down Expand Up @@ -134,25 +137,28 @@
}

if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v",

Check warning on line 140 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L140

Added line #L140 was not covered by tests
resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err)
return err
}

return nil
}

// clearFinalizers removes finalizers (if they exist) from the k8s resource
func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
if len(o.GetFinalizers()) > 0 {
o.SetFinalizers([]string{})
// clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource
func clearFinalizer(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
// Checking for the old finalizer too for backwards compatibility. This should eventually be removed
// Go does short-circuiting so we have to make sure both are removed
finalizerRemoved := controllerutil.RemoveFinalizer(o, finalizer)
oldFinalizerRemoved := controllerutil.RemoveFinalizer(o, oldFinalizer)
if finalizerRemoved || oldFinalizerRemoved {

Check warning on line 154 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L149-L154

Added lines #L149 - L154 were not covered by tests
err := kubeClient.GetClient().Update(ctx, o)
if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)

Check warning on line 157 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L157

Added line #L157 was not covered by tests
return err
}
} else {
logger.Debugf(ctx, "Finalizers are already empty for Resource with name: %v/%v", o.GetNamespace(), o.GetName())
logger.Debugf(ctx, "Finalizer is already cleared for Resource with name: %v/%v", o.GetNamespace(), o.GetName())

Check warning on line 161 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L161

Added line #L161 was not covered by tests
}
return nil
}
Expand Down Expand Up @@ -211,7 +217,7 @@
}

// finalizeSubtask performs operations to complete the k8s pod defined by the SubTaskExecutionContext
// and Config. These may include removing finalizers and deleting the k8s resource.
// and Config. These may include removing finalizer and deleting the k8s resource.
func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) error {
errs := stdErrors.ErrorCollection{}
var pod *v1.Pod
Expand All @@ -231,10 +237,10 @@
nsName = k8stypes.NamespacedName{Namespace: pod.GetNamespace(), Name: pod.GetName()}
}

// In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
// clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// In InjectFinalizer is on, it means we may have added the finalizer when we launched this resource. Attempt to
// clear it to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// after the resource was created, we will not find any finalizers to clear and the object may have already been
// deleted at this point. Therefore, account for these cases and do not consider them errors.
// deleted at this point. Therefore, account for these cases and do not consider the errors.
if k8sPluginCfg.InjectFinalizer {
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := kubeClient.GetClient().Get(ctx, nsName, pod); err != nil {
Expand All @@ -250,7 +256,7 @@
// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
err := clearFinalizers(ctx, pod, kubeClient)
err := clearFinalizer(ctx, pod, kubeClient)

Check warning on line 259 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L259

Added line #L259 was not covered by tests
if err != nil {
errs.Append(err)
}
Expand Down Expand Up @@ -308,10 +314,10 @@
return pluginsCore.PhaseInfoUndefined, err
}

if !phaseInfo.Phase().IsTerminal() && o.GetDeletionTimestamp() != nil {
if !phaseInfo.Phase().IsTerminal() && !o.GetDeletionTimestamp().IsZero() {
// If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should
// mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on
// the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted.
// the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted.

Check warning on line 320 in flyteplugins/go/tasks/plugins/array/k8s/subtask.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask.go#L320

Added line #L320 was not covered by tests
// This can also happen when a user deletes a Pod directly.
failureReason := fmt.Sprintf("object [%s] terminated in the background, manually", nsName.String())
return pluginsCore.PhaseInfoSystemRetryableFailure("UnexpectedObjectDeletion", failureReason, nil), nil
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ import (
)

const (
// Finalizer is the global and domain-qualified Flyte finalizer
Finalizer = "flyte.lyft.com/finalizer"
trutx marked this conversation as resolved.
Show resolved Hide resolved
// OldFinalizer is the old non-domain-qualified finalizer, kept for backwards compatibility
// This should eventually be removed
OldFinalizer = "flyte-finalizer"
resourceLevelMonitorCycleDuration = 5 * time.Second
missing = "missing"
podDefaultNamespace = "flyte"
Expand Down
36 changes: 0 additions & 36 deletions flytepropeller/pkg/controller/finalizer.go

This file was deleted.

70 changes: 0 additions & 70 deletions flytepropeller/pkg/controller/finalizer_test.go

This file was deleted.

Loading
Loading