Skip to content

Commit

Permalink
support adding orphaned pods to existing environments (#328)
Browse files Browse the repository at this point in the history
## Overview
This PR adds support for managing orphaned pods. This is an extreme corner case that may be impossible in practice, but handling it ensures we do not orphan pods when GCing fasttask environments.

## Test Plan
This was tested locally, and unit tests were added.

## Rollout Plan (if applicable)
May be rolled out immediately.

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed to OSS

## Issue
https://linear.app/unionai/issue/COR-993/support-adding-pods-to-an-orphaned-environment

## Checklist
* [x] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [x] Updated documentation
  • Loading branch information
hamersaw authored Jun 21, 2024
1 parent 76e5a02 commit 0f50412
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
23 changes: 20 additions & 3 deletions fasttask/plugin/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,27 @@ func (i *InMemoryEnvBuilder) detectOrphanedEnvironments(ctx context.Context, k8s
defer i.lock.Unlock()

orphanedEnvironments := make(map[string]*environment, 0)
orphanedPods := make(map[string][]string, 0)
for _, pod := range podList.Items {
// if environment exists we do not need to process
// check if environment already exists or pod is accounted for
environmentID, labelExists := pod.Labels[EXECUTION_ENV_ID]
if !labelExists {
continue
}

_, environmentExists := i.environments[environmentID]
if environmentExists {
if environment, environmentExists := i.environments[environmentID]; environmentExists {
found := false
for _, replica := range environment.replicas {
if replica == pod.Name {
found = true
break
}
}

if !found {
orphanedPods[environmentID] = append(orphanedPods[environmentID], pod.Name)
}

continue
}

Expand Down Expand Up @@ -582,6 +594,11 @@ func (i *InMemoryEnvBuilder) detectOrphanedEnvironments(ctx context.Context, k8s
i.environments[environmentID] = orphanedEnvironment
}

for environmentID, podNames := range orphanedPods {
logger.Infof(ctx, "detected orphaned pod(s) '%v' for environment '%s'", podNames, environmentID)
i.environments[environmentID].replicas = append(i.environments[environmentID].replicas, podNames...)
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions fasttask/plugin/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,17 @@ func TestDetectOrphanedEnvironments(t *testing.T) {
expectedEnvironmentCount: 1,
expectedReplicaCount: 2,
},
{
name: "ExistingOrphanedEnvironment",
environments: map[string]*environment{
"foo": &environment{
replicas: []string{"bar"},
state: ORPHANED,
},
},
expectedEnvironmentCount: 1,
expectedReplicaCount: 2,
},
}

for _, test := range tests {
Expand Down

0 comments on commit 0f50412

Please sign in to comment.