Skip to content

Commit

Permalink
Use StrategicMergePatch when updating existing cluster resources (fly…
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Dec 12, 2019
1 parent d63313a commit d1b921d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 3 deletions.
13 changes: 10 additions & 3 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (

"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

Expand Down Expand Up @@ -285,18 +283,27 @@ func (c *controller) syncNamespace(ctx context.Context, namespace NamespaceName,
}
for _, target := range c.executionCluster.GetAllValidTargets() {
k8sObjCopy := k8sObj.DeepCopyObject()
logger.Debugf(ctx, "Attempting to create resource [%+v](%+v) in cluster [%v] for namespace [%s]",
k8sObj.GetObjectKind().GroupVersionKind().Kind, k8sObj, target.ID, namespace)
err = target.Client.Create(ctx, k8sObjCopy)
if err != nil {
if k8serrors.IsAlreadyExists(err) {
logger.Debugf(ctx, "Resource [%+v] in namespace [%s] already exists - attempting update instead",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace)
c.metrics.AppliedTemplateExists.Inc()
err = target.Client.Patch(ctx, k8sObjCopy, client.MergeFrom(k8sObjCopy))
// Use a strategic-merge-patch to mimic `kubectl apply` behavior.
// Kubectl defaults to using the StrategicMergePatch strategy.
// However the controller-runtime only has an implementation for MergePatch which we were formerly
// using but failed to actually always merge resources in the Patch call.
err = target.Client.Patch(ctx, k8sObjCopy, StrategicMergeFrom(k8sObjCopy))
if err != nil {
c.metrics.TemplateUpdateErrors.Inc()
logger.Infof(ctx, "Failed to update resource [%+v] in namespace [%s] with err :%v",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace, err)
collectedErrs = append(collectedErrs, err)
} else {
logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
k8sObj.GetObjectKind().GroupVersionKind().Kind, namespace)
}
c.appliedTemplates[namespace][templateFile.Name()] = templateFile.ModTime()
} else {
Expand Down
38 changes: 38 additions & 0 deletions flyteadmin/pkg/clusterresource/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package clusterresource

import (
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type strategicMergeFromPatch struct {
from runtime.Object
}

// Type implements patch.
func (s *strategicMergeFromPatch) Type() types.PatchType {
return types.StrategicMergePatchType
}

// Data implements Patch.
func (s *strategicMergeFromPatch) Data(obj runtime.Object) ([]byte, error) {
originalJSON, err := json.Marshal(s.from)
if err != nil {
return nil, err
}

modifiedJSON, err := json.Marshal(obj)
if err != nil {
return nil, err
}

return jsonpatch.CreateMergePatch(originalJSON, modifiedJSON)
}

// StrategicMergeFrom creates a Patch using the strategic-merge-patch strategy with the given object as base.
func StrategicMergeFrom(obj runtime.Object) client.Patch {
return &strategicMergeFromPatch{obj}
}

0 comments on commit d1b921d

Please sign in to comment.