Skip to content

Commit

Permalink
Use checksums to track applied state of cluster resource templates (f…
Browse files Browse the repository at this point in the history
…lyteorg#496)

* Use checksums to track applied state of cluster resource templates

Signed-off-by: Andrew Dye <[email protected]>

* Fix lints

Signed-off-by: Andrew Dye <[email protected]>

Signed-off-by: Andrew Dye <[email protected]>
  • Loading branch information
andrewwdye authored Nov 29, 2022
1 parent 495ef5c commit fa0d835
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 96 deletions.
102 changes: 50 additions & 52 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clusterresource

import (
"context"
"crypto/md5" // #nosec
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -10,56 +11,46 @@ import (
"path/filepath"
"runtime/debug"
"strings"
"time"

impl2 "github.com/flyteorg/flyteadmin/pkg/clusterresource/impl"
"github.com/flyteorg/flyteadmin/pkg/config"
"github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyteadmin/pkg/repositories"
errors2 "github.com/flyteorg/flyteadmin/pkg/repositories/errors"
admin2 "github.com/flyteorg/flyteidl/clients/go/admin"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
"github.com/flyteorg/flyteadmin/pkg/executioncluster"
executionclusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/runtime"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"

"k8s.io/apimachinery/pkg/api/meta"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/restmapper"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

impl2 "github.com/flyteorg/flyteadmin/pkg/clusterresource/impl"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/logger"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyteadmin/pkg/config"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
executionclusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyteadmin/pkg/repositories"
errors2 "github.com/flyteorg/flyteadmin/pkg/repositories/errors"
"github.com/flyteorg/flyteadmin/pkg/runtime"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
admin2 "github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

const namespaceVariable = "namespace"
Expand Down Expand Up @@ -92,8 +83,8 @@ type controllerMetrics struct {

type FileName = string
type NamespaceName = string
type LastModTimeCache = map[FileName]time.Time
type NamespaceCache = map[NamespaceName]LastModTimeCache
type TemplateChecksums = map[FileName][16]byte
type NamespaceCache = map[NamespaceName]TemplateChecksums

type templateValuesType = map[string]string

Expand All @@ -108,19 +99,28 @@ type controller struct {
listTargets executionclusterIfaces.ListTargetsInterface
}

func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFile os.FileInfo) bool {
// templateAlreadyApplied checks if there is an applied template with the same checksum
func (c *controller) templateAlreadyApplied(namespace NamespaceName, templateFilename string, checksum [16]byte) bool {
namespacedAppliedTemplates, ok := c.appliedTemplates[namespace]
if !ok {
// There is no record of this namespace altogether.
return false
}
timestamp, ok := namespacedAppliedTemplates[templateFile.Name()]
appliedChecksum, ok := namespacedAppliedTemplates[templateFilename]
if !ok {
// There is no record of this file having ever been applied.
return false
}
// The applied template file could have been modified, in which case we will need to apply it once more.
return timestamp.Equal(templateFile.ModTime())
// Check if the applied template matches the new one
return appliedChecksum == checksum
}

// setTemplateChecksum records the latest checksum for the template file
func (c *controller) setTemplateChecksum(namespace NamespaceName, templateFilename string, checksum [16]byte) {
if _, ok := c.appliedTemplates[namespace]; !ok {
c.appliedTemplates[namespace] = make(TemplateChecksums)
}
c.appliedTemplates[namespace][templateFilename] = checksum
}

// Given a map of templatized variable names -> data source, this function produces an output that maps the same
Expand Down Expand Up @@ -298,23 +298,21 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
continue
}

if c.templateAlreadyApplied(namespace, templateFile) {
// nothing to do.
logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name())
continue
}

// 1) create resource from template:
// 1) create resource from template and check if already applied
k8sManifest, err := c.createResourceFromTemplate(ctx, templateDir, templateFileName, project, domain, namespace, templateValues, customTemplateValues)
if err != nil {
collectedErrs = append(collectedErrs, err)
continue
}

// 2) create the resource on the kubernetes cluster and cache successful outcomes
if _, ok := c.appliedTemplates[namespace]; !ok {
c.appliedTemplates[namespace] = make(LastModTimeCache)
checksum := md5.Sum([]byte(k8sManifest)) // #nosec
if c.templateAlreadyApplied(namespace, templateFileName, checksum) {
// nothing to do.
logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name())
continue
}

// 2) create the resource on the kubernetes cluster and cache successful outcomes
for _, target := range c.listTargets.GetValidTargets() {
dynamicObj, err := prepareDynamicCreate(*target, k8sManifest)
if err != nil {
Expand Down Expand Up @@ -382,7 +380,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,

logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
dynamicObj.obj.GetKind(), namespace)
c.appliedTemplates[namespace][templateFile.Name()] = templateFile.ModTime()
c.setTemplateChecksum(namespace, templateFileName, checksum)
} else {
// Some error other than AlreadyExists was raised when we tried to Create the k8s object.
c.metrics.KubernetesResourcesCreateErrors.Inc()
Expand All @@ -397,7 +395,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes",
dynamicObj.obj.GetKind(), namespace)
c.metrics.KubernetesResourcesCreated.Inc()
c.appliedTemplates[namespace][templateFile.Name()] = templateFile.ModTime()
c.setTemplateChecksum(namespace, templateFileName, checksum)
}
}
}
Expand Down Expand Up @@ -649,7 +647,7 @@ func NewClusterResourceController(adminDataProvider interfaces.FlyteAdminDataPro
listTargets: listTargets,
poller: make(chan struct{}),
metrics: newMetrics(scope),
appliedTemplates: make(map[string]map[string]time.Time),
appliedTemplates: make(map[string]TemplateChecksums),
}
}

Expand Down
54 changes: 10 additions & 44 deletions flyteadmin/pkg/clusterresource/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package clusterresource

import (
"context"
"crypto/md5" // #nosec
"io/ioutil"
"os"
"testing"
"time"

"github.com/flyteorg/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"
Expand All @@ -27,57 +27,23 @@ const domain = "domain-bar"

var testScope = mockScope.NewTestScope()

type mockFileInfo struct {
name string
modTime time.Time
}

func (i *mockFileInfo) Name() string {
return i.name
}

func (i *mockFileInfo) Size() int64 {
return 0
}

func (i *mockFileInfo) Mode() os.FileMode {
return os.ModeExclusive
}

func (i *mockFileInfo) ModTime() time.Time {
return i.modTime
}

func (i *mockFileInfo) IsDir() bool {
return false
}

func (i *mockFileInfo) Sys() interface{} {
return nil
}

func TestTemplateAlreadyApplied(t *testing.T) {
const namespace = "namespace"
const fileName = "fileName"
var lastModifiedTime = time.Now()
testController := controller{
metrics: newMetrics(testScope),
}
mockFile := mockFileInfo{
name: fileName,
modTime: lastModifiedTime,
}
assert.False(t, testController.templateAlreadyApplied(namespace, &mockFile))

testController.appliedTemplates = make(map[string]map[string]time.Time)
testController.appliedTemplates[namespace] = make(map[string]time.Time)
assert.False(t, testController.templateAlreadyApplied(namespace, &mockFile))
checksum1 := md5.Sum([]byte("template1")) // #nosec
checksum2 := md5.Sum([]byte("template2")) // #nosec
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))

testController.appliedTemplates[namespace][fileName] = lastModifiedTime.Add(-10 * time.Minute)
assert.False(t, testController.templateAlreadyApplied(namespace, &mockFile))
testController.appliedTemplates = make(map[string]TemplateChecksums)
testController.setTemplateChecksum(namespace, fileName, checksum1)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum1))
assert.False(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))

testController.appliedTemplates[namespace][fileName] = lastModifiedTime
assert.True(t, testController.templateAlreadyApplied(namespace, &mockFile))
testController.setTemplateChecksum(namespace, fileName, checksum2)
assert.True(t, testController.templateAlreadyApplied(namespace, fileName, checksum2))
}

func TestPopulateTemplateValues(t *testing.T) {
Expand Down

0 comments on commit fa0d835

Please sign in to comment.