Skip to content

Commit

Permalink
Adding Status function to ExecutionEnvClient to retrieve fasttask wor…
Browse files Browse the repository at this point in the history
…ker status' (#331)

* adding Status function to ExecutionEnvClient to check if all fasttask replicas have failed

Signed-off-by: Daniel Rammer <[email protected]>

* correctly reporting reasons

Signed-off-by: Daniel Rammer <[email protected]>

* moving DemystifyPodStatus to PodPlugin to maintain current unit testing

Signed-off-by: Daniel Rammer <[email protected]>

* cleanup and adding metric

Signed-off-by: Daniel Rammer <[email protected]>

* fixing fasttask unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* docs

Signed-off-by: Daniel Rammer <[email protected]>

* spelling is hard

Signed-off-by: Daniel Rammer <[email protected]>

* lint

Signed-off-by: Daniel Rammer <[email protected]>

* and again

Signed-off-by: Daniel Rammer <[email protected]>

* lots of things are hard - like indenting

Signed-off-by: Daniel Rammer <[email protected]>

* updated comment

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jun 21, 2024
1 parent 3090f36 commit bbc37c7
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 72 deletions.
38 changes: 38 additions & 0 deletions fasttask/plugin/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,44 @@ func (i *InMemoryEnvBuilder) Create(ctx context.Context, executionEnvID string,
return env.extant, nil
}

// Status returns the status of the environment with the given execution environment ID. This
// includes the details of each pod in the environment replica set.
func (i *InMemoryEnvBuilder) Status(ctx context.Context, executionEnvID string) (interface{}, error) {
i.lock.Lock()
defer i.lock.Unlock()

// check if environment exists
environment, exists := i.environments[executionEnvID]
if !exists {
return nil, nil
}

// retrieve pod details from kubeclient cache
statuses := make(map[string]*v1.Pod, 0)

podTemplateSpec := &v1.PodTemplateSpec{}
if err := json.Unmarshal(environment.spec.GetPodTemplateSpec(), podTemplateSpec); err != nil {
return nil, flyteerrors.Errorf(flyteerrors.BadTaskSpecification,
"unable to unmarshal PodTemplateSpec [%v], Err: [%v]", environment.spec.GetPodTemplateSpec(), err.Error())
}

for _, podName := range environment.replicas {
pod := v1.Pod{}
err := i.kubeClient.GetCache().Get(ctx, types.NamespacedName{
Name: podName,
Namespace: podTemplateSpec.Namespace,
}, &pod)

if k8serrors.IsNotFound(err) || k8serrors.IsGone(err) {
statuses[podName] = nil
} else {
statuses[podName] = &pod
}
}

return statuses, nil
}

// createPod creates a new pod for the given execution environment ID and pod name. The pod is
// created using the given FastTaskEnvironmentSpec.
func (i *InMemoryEnvBuilder) createPod(ctx context.Context, fastTaskEnvironmentSpec *pb.FastTaskEnvironmentSpec, executionEnvID, podName string) error {
Expand Down
42 changes: 20 additions & 22 deletions fasttask/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,32 @@ import (

var (
defaultConfig = &Config{
CallbackURI: "http://host.k3d.internal:15605",
Endpoint: "0.0.0.0:15605",
EnvDetectOrphanInterval: config.Duration{Duration: time.Second * 60},
EnvGCInterval: config.Duration{Duration: time.Second * 5},
EnvRepairInterval: config.Duration{Duration: time.Second * 10},
GracePeriodStatusNotFound: config.Duration{Duration: time.Second * 90},
GracePeriodWorkersUnavailable: config.Duration{Duration: time.Second * 30},
HeartbeatBufferSize: 512,
NonceLength: 12,
TaskStatusBufferSize: 512,
AdditionalWorkerArgs: []string{},
CallbackURI: "http://host.k3d.internal:15605",
Endpoint: "0.0.0.0:15605",
EnvDetectOrphanInterval: config.Duration{Duration: time.Second * 60},
EnvGCInterval: config.Duration{Duration: time.Second * 5},
EnvRepairInterval: config.Duration{Duration: time.Second * 10},
GracePeriodStatusNotFound: config.Duration{Duration: time.Second * 90},
HeartbeatBufferSize: 512,
NonceLength: 12,
TaskStatusBufferSize: 512,
AdditionalWorkerArgs: []string{},
}

configSection = pluginsConfig.MustRegisterSubSection("fasttask", defaultConfig)
)

type Config struct {
CallbackURI string `json:"callback-uri" pflag:",Fasttask gRPC service URI that fasttask workers will connect to."`
Endpoint string `json:"endpoint" pflag:",Fasttask gRPC service endpoint."`
EnvDetectOrphanInterval config.Duration `json:"env-detect-orphan-interval" pflag:",Frequency that orphaned environments detection is performed."`
EnvGCInterval config.Duration `json:"env-gc-interval" pflag:",Frequency that environments are GCed in case of TTL expirations."`
EnvRepairInterval config.Duration `json:"env-repair-interval" pflag:",Frequency that environments are repaired in case of external modifications (ex. pod deletion)."`
GracePeriodStatusNotFound config.Duration `json:"grace-period-status-not-found" pflag:",The grace period for a task status to be reported before the task is considered failed."`
GracePeriodWorkersUnavailable config.Duration `json:"grace-period-workers-unavailable" pflag:",The grace period for a worker to become available before the task is considered failed."`
HeartbeatBufferSize int `json:"heartbeat-buffer-size" pflag:",The size of the heartbeat buffer for each worker."`
NonceLength int `json:"nonce-length" pflag:",The length of the nonce value to uniquely link a fasttask replica to the environment instance, ensuring fast turnover of environments regardless of cache freshness."`
TaskStatusBufferSize int `json:"task-status-buffer-size" pflag:",The size of the task status buffer for each task."`
AdditionalWorkerArgs []string `json:"additional-worker-args" pflag:",Additional arguments to pass to the fasttask worker binary."`
CallbackURI string `json:"callback-uri" pflag:",Fasttask gRPC service URI that fasttask workers will connect to."`
Endpoint string `json:"endpoint" pflag:",Fasttask gRPC service endpoint."`
EnvDetectOrphanInterval config.Duration `json:"env-detect-orphan-interval" pflag:",Frequency that orphaned environments detection is performed."`
EnvGCInterval config.Duration `json:"env-gc-interval" pflag:",Frequency that environments are GCed in case of TTL expirations."`
EnvRepairInterval config.Duration `json:"env-repair-interval" pflag:",Frequency that environments are repaired in case of external modifications (ex. pod deletion)."`
GracePeriodStatusNotFound config.Duration `json:"grace-period-status-not-found" pflag:",The grace period for a task status to be reported before the task is considered failed."`
HeartbeatBufferSize int `json:"heartbeat-buffer-size" pflag:",The size of the heartbeat buffer for each worker."`
NonceLength int `json:"nonce-length" pflag:",The length of the nonce value to uniquely link a fasttask replica to the environment instance, ensuring fast turnover of environments regardless of cache freshness."`
TaskStatusBufferSize int `json:"task-status-buffer-size" pflag:",The size of the task status buffer for each task."`
AdditionalWorkerArgs []string `json:"additional-worker-args" pflag:",Additional arguments to pass to the fasttask worker binary."`
}

func GetConfig() *Config {
Expand Down
1 change: 0 additions & 1 deletion fasttask/plugin/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions fasttask/plugin/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 50 additions & 7 deletions fasttask/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils/secrets"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/errorcollector"
podplugin "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/pod"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"

"github.com/unionai/flyte/fasttask/plugin/pb"
)

const fastTaskType = "fast-task"
const maxErrorMessageLength = 102400 // 100kb

var (
statusUpdateNotFoundError = errors.New("StatusUpdateNotFound")
Expand All @@ -45,14 +48,14 @@ const (

// pluginMetrics is a collection of metrics for the plugin.
type pluginMetrics struct {
workersUnavailableTimeout prometheus.Counter
allReplicasFailed prometheus.Counter
statusUpdateNotFoundTimeout prometheus.Counter
}

// newPluginMetrics creates a new pluginMetrics with the given scope.
func newPluginMetrics(scope promutils.Scope) pluginMetrics {
return pluginMetrics{
workersUnavailableTimeout: scope.MustNewCounter("workers_unavailable_timeout", "Count of tasks that timed out waiting for workers to become available"),
allReplicasFailed: scope.MustNewCounter("all_replicas_failed", "Count of tasks that failed due to all environment replicas failing"),
statusUpdateNotFoundTimeout: scope.MustNewCounter("status_update_not_found_timeout", "Count of tasks that timed out waiting for status update from worker"),
}
}
Expand Down Expand Up @@ -273,12 +276,52 @@ func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (co
pluginState.LastUpdated = time.Now()
}

// fail if no worker available within grace period
if time.Since(pluginState.LastUpdated) > GetConfig().GracePeriodWorkersUnavailable.Duration {
logger.Infof(ctx, "Timed out waiting for available worker for queue %s", queueID)
p.metrics.workersUnavailableTimeout.Inc()
// fail if all replicas for this environment are in a failed state
statuses, err := tCtx.GetExecutionEnvClient().Status(ctx, queueID)
if err != nil {
return core.UnknownTransition, err
}

statusesMap := statuses.(map[string]*v1.Pod)

allReplicasFailed := true
messageCollector := errorcollector.NewErrorMessageCollector()

now := time.Now()
index := 0
for _, pod := range statusesMap {
if pod == nil {
// pod does not exist because it has not yet been populated in the kubeclient
// cache or was deleted. to be safe, we treat both as a non-failure state.
allReplicasFailed = false
break
}

phaseInfo, err := podplugin.DemystifyPodStatus(pod, core.TaskInfo{OccurredAt: &now})
if err != nil {
return core.UnknownTransition, err
}

switch phaseInfo.Phase() {
case core.PhasePermanentFailure, core.PhaseRetryableFailure:
if phaseInfo.Err() != nil {
messageCollector.Collect(index, phaseInfo.Err().GetMessage())
} else {
messageCollector.Collect(index, phaseInfo.Reason())
}
default:
allReplicasFailed = false
}

index++
}

if allReplicasFailed {
logger.Infof(ctx, "all workers have failed for queue %s", queueID)
p.metrics.allReplicasFailed.Inc()

phaseInfo = core.PhaseInfoSystemFailure("unknown", fmt.Sprintf("timed out waiting for available worker for queue %s", queueID), nil)
phaseInfo = core.PhaseInfoSystemFailure("unknown", fmt.Sprintf("all workers have failed for queue %s\n%s",
queueID, messageCollector.Summary(maxErrorMessageLength)), nil)
} else {
phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "no workers available", nil)
}
Expand Down
36 changes: 25 additions & 11 deletions fasttask/plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,24 +367,34 @@ func TestAddObjectMetadata(t *testing.T) {
func TestHandleNotYetStarted(t *testing.T) {
ctx := context.TODO()
tests := []struct {
name string
workerID string
lastUpdated time.Time
expectedPhase core.Phase
expectedReason string
expectedError error
name string
workerID string
lastUpdated time.Time
executionEnvStatus map[string]*v1.Pod
expectedPhase core.Phase
expectedReason string
expectedError error
}{
{
name: "NoWorkersAvailable",
workerID: "",
name: "NoWorkersAvailable",
workerID: "",
executionEnvStatus: map[string]*v1.Pod{
"foo": nil,
},
expectedPhase: core.PhaseWaitingForResources,
expectedReason: "no workers available",
expectedError: nil,
},
{
name: "NoWorkersAvailableGracePeriodFailure",
workerID: "",
lastUpdated: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC),
name: "NoWorkersAllFailed",
workerID: "",
executionEnvStatus: map[string]*v1.Pod{
"foo": &v1.Pod{
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
},
},
expectedPhase: core.PhasePermanentFailure,
expectedReason: "",
expectedError: nil,
Expand Down Expand Up @@ -436,6 +446,10 @@ func TestHandleNotYetStarted(t *testing.T) {
tCtx.OnTaskExecutionMetadata().Return(taskMetadata)
tCtx.OnTaskReader().Return(taskReader)

executionEnvClient := &coremocks.ExecutionEnvClient{}
executionEnvClient.OnStatusMatch(ctx, mock.Anything).Return(test.executionEnvStatus, nil)
tCtx.OnGetExecutionEnvClient().Return(executionEnvClient)

arrayNodeStateInput := &State{
SubmissionPhase: NotSubmitted,
LastUpdated: test.lastUpdated,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ import (
type ExecutionEnvClient interface {
Get(ctx context.Context, executionEnvID string) *_struct.Struct
Create(ctx context.Context, executionEnvID string, executionEnvSpec *_struct.Struct) (*_struct.Struct, error)
Status(ctx context.Context, executionEnvID string) (interface{}, error)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 24 additions & 17 deletions flyteplugins/go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,29 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
info.Logs = taskLogs
}

phaseInfo, err := DemystifyPodStatus(pod, info)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
} else if phaseInfo.Phase() != pluginsCore.PhaseRunning && phaseInfo.Phase() == pluginState.Phase &&
phaseInfo.Version() <= pluginState.PhaseVersion && phaseInfo.Reason() != pluginState.Reason {

// if we have the same Phase as the previous evaluation and updated the Reason but not the PhaseVersion we must
// update the PhaseVersion so an event is sent to reflect the Reason update. this does not handle the Running
// Phase because the legacy used `DefaultPhaseVersion + 1` which will only increment to 1.
phaseInfo = phaseInfo.WithVersion(pluginState.PhaseVersion + 1)
}

return phaseInfo, err
}

func (plugin) GetProperties() k8s.PluginProperties {
return k8s.PluginProperties{}
}

func DemystifyPodStatus(pod *v1.Pod, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
phaseInfo := pluginsCore.PhaseInfoUndefined
var err error

switch pod.Status.Phase {
case v1.PodSucceeded:
phaseInfo, err = flytek8s.DemystifySuccess(pod.Status, info)
Expand All @@ -189,11 +211,11 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
case v1.PodPending:
phaseInfo, err = flytek8s.DemystifyPending(pod.Status)
case v1.PodReasonUnschedulable:
phaseInfo = pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable")
phaseInfo = pluginsCore.PhaseInfoQueued(*info.OccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable")
case v1.PodUnknown:
// DO NOTHING
default:
primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
primaryContainerName, exists := pod.GetAnnotations()[flytek8s.PrimaryContainerKey]
if !exists {
// if all of the containers in the Pod are complete, as an optimization, we can declare the task as
// succeeded rather than waiting for the Pod to be marked completed.
Expand Down Expand Up @@ -234,24 +256,9 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
}
}

if err != nil {
return pluginsCore.PhaseInfoUndefined, err
} else if phaseInfo.Phase() != pluginsCore.PhaseRunning && phaseInfo.Phase() == pluginState.Phase &&
phaseInfo.Version() <= pluginState.PhaseVersion && phaseInfo.Reason() != pluginState.Reason {

// if we have the same Phase as the previous evaluation and updated the Reason but not the PhaseVersion we must
// update the PhaseVersion so an event is sent to reflect the Reason update. this does not handle the Running
// Phase because the legacy used `DefaultPhaseVersion + 1` which will only increment to 1.
phaseInfo = phaseInfo.WithVersion(pluginState.PhaseVersion + 1)
}

return phaseInfo, err
}

func (plugin) GetProperties() k8s.PluginProperties {
return k8s.PluginProperties{}
}

func init() {
// Register ContainerTaskType and SidecarTaskType plugin entries. These separate task types
// still exist within the system, only now both are evaluated using the same internal pod plugin
Expand Down
Loading

0 comments on commit bbc37c7

Please sign in to comment.