Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge 703cd32 into 3379031
Browse files Browse the repository at this point in the history
  • Loading branch information
EngHabu authored Jun 2, 2023
2 parents 3379031 + 703cd32 commit 9c1f358
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
42 changes: 33 additions & 9 deletions pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string
// a) read template file
// b) substitute templatized variables with their resolved values
// 2. create the resource on the kubernetes cluster and cache successful outcomes
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
templateValues, customTemplateValues templateValuesType) error {
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain,
namespace NamespaceName, templateValues, customTemplateValues templateValuesType) (ResourceSyncStats, error) {
templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath()
if c.lastAppliedTemplateDir != templateDir {
// Invalidate all caches
Expand All @@ -283,12 +283,13 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
}
templateFiles, err := ioutil.ReadDir(templateDir)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal,
return ResourceSyncStats{}, errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to read config template dir [%s] for namespace [%s] with err: %v",
namespace, templateDir, err)
}

collectedErrs := make([]error, 0)
stats := ResourceSyncStats{}
for _, templateFile := range templateFiles {
templateFileName := templateFile.Name()
if filepath.Ext(templateFileName) != ".yaml" {
Expand All @@ -309,6 +310,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
if c.templateAlreadyApplied(namespace, templateFileName, checksum) {
// nothing to do.
logger.Debugf(ctx, "syncing namespace [%s]: templateFile [%s] already applied, nothing to do.", namespace, templateFile.Name())
stats.AlreadyThere++
continue
}

Expand All @@ -320,6 +322,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
"into a dynamic unstructured mapping with err: %v, manifest: %v", namespace, err, k8sManifest)
collectedErrs = append(collectedErrs, err)
c.metrics.KubernetesResourcesCreateErrors.Inc()
stats.Errored++
continue
}

Expand All @@ -341,6 +344,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to get current resource from server [%+v] in namespace [%s] with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

Expand All @@ -350,6 +354,7 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to marshal resource [%+v] in namespace [%s] to json with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

Expand All @@ -359,12 +364,14 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to create patch for resource [%+v] in namespace [%s] err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

if string(patch) == noChange {
logger.Infof(ctx, "Resource [%+v] in namespace [%s] is not modified",
dynamicObj.obj.GetKind(), namespace)
stats.AlreadyThere++
continue
}

Expand All @@ -375,9 +382,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
logger.Warningf(ctx, "Failed to patch resource [%+v] in namespace [%s] with err: %v",
dynamicObj.obj.GetKind(), namespace, err)
collectedErrs = append(collectedErrs, err)
stats.Errored++
continue
}

stats.Updated++
logger.Debugf(ctx, "Successfully updated resource [%+v] in namespace [%s]",
dynamicObj.obj.GetKind(), namespace)
c.setTemplateChecksum(namespace, templateFileName, checksum)
Expand All @@ -389,9 +398,11 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
err := errors.NewFlyteAdminErrorf(codes.Internal,
"Failed to create kubernetes object from config template [%s] for namespace [%s] with err: %v",
templateFileName, namespace, err)
stats.Errored++
collectedErrs = append(collectedErrs, err)
}
} else {
stats.Created++
logger.Debugf(ctx, "Created resource [%+v] for namespace [%s] in kubernetes",
dynamicObj.obj.GetKind(), namespace)
c.metrics.KubernetesResourcesCreated.Inc()
Expand All @@ -400,9 +411,10 @@ func (c *controller) syncNamespace(ctx context.Context, project *admin.Project,
}
}
if len(collectedErrs) > 0 {
return errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs)
return stats, errors.NewCollectedFlyteAdminError(codes.Internal, collectedErrs)
}
return nil

return stats, nil
}

var metadataAccessor = meta.NewAccessor()
Expand Down Expand Up @@ -573,40 +585,52 @@ func (c *controller) Sync(ctx context.Context) error {
errs = append(errs, err)
}

stats := ResourceSyncStats{}

for _, project := range projects.Projects {
for _, domain := range project.Domains {
namespace := common.GetNamespaceName(c.config.NamespaceMappingConfiguration().GetNamespaceTemplate(), project.Id, domain.Name)
customTemplateValues, err := c.getCustomTemplateValues(
ctx, project.Id, domain.Id, domainTemplateValues[domain.Id])
if err != nil {
logger.Warningf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err)
logger.Errorf(ctx, "Failed to get custom template values for %s with err: %v", namespace, err)
errs = append(errs, err)
}
err = c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues)

newStats, err := c.syncNamespace(ctx, project, domain, namespace, templateValues, customTemplateValues)
if err != nil {
logger.Warningf(ctx, "Failed to create cluster resources for namespace [%s] with err: %v", namespace, err)
c.metrics.ResourceAddErrors.Inc()
errs = append(errs, err)
} else {
c.metrics.ResourcesAdded.Inc()
logger.Debugf(ctx, "Successfully created kubernetes resources for [%s]", namespace)
stats.Add(newStats)
}

logger.Infof(ctx, "Completed cluster resource creation loop for namespace [%s] with stats: [%+v]", namespace, newStats)
}
}

logger.Infof(ctx, "Completed cluster resource creation loop with stats: [%+v]", stats)

if len(errs) > 0 {
return errors.NewCollectedFlyteAdminError(codes.Internal, errs)
}

return nil
}

func (c *controller) Run() {
ctx := context.Background()
logger.Debugf(ctx, "Running ClusterResourceController")
logger.Info(ctx, "Running ClusterResourceController")
interval := c.config.ClusterResourceConfiguration().GetRefreshInterval()
wait.Forever(func() {
err := c.Sync(ctx)
if err != nil {
logger.Warningf(ctx, "Failed cluster resource creation loop with: %v", err)
logger.Errorf(ctx, "Failed cluster resource creation loop with: %v", err)
} else {
logger.Infof(ctx, "Successfully completed cluster resource creation loop")
}
}, interval)
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/clusterresource/sync_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package clusterresource

// ResourceSyncStats is a simple struct to track the number of resources created, updated, already there, and errored
type ResourceSyncStats struct {
Created int
Updated int
AlreadyThere int
Errored int
}

// Add adds the values of the other ResourceSyncStats to this one
func (m *ResourceSyncStats) Add(other ResourceSyncStats) {
m.Created += other.Created
m.Updated += other.Updated
m.AlreadyThere += other.AlreadyThere
m.Errored += other.Errored
}

0 comments on commit 9c1f358

Please sign in to comment.