Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Revert "Remove BarrierTick (#545)"
Browse files Browse the repository at this point in the history
This reverts commit 0c982ea.

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Sep 26, 2023
1 parent 51eda24 commit c115614
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type TaskNodeState struct {
PluginPhaseVersion uint32
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
CleanupOnFailure bool
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/nodes/handler/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ type TransitionType int

const (
TransitionTypeEphemeral TransitionType = iota
// @deprecated support for Barrier type transitions has been deprecated
TransitionTypeBarrier
)

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginPhaseVersion: tn.GetPhaseVersion(),
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
CleanupOnFailure: tn.GetCleanupOnFailure(),
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/nodes/task/barrier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package task

import (
"context"
"time"

"github.com/flyteorg/flytestdlib/logger"
"k8s.io/apimachinery/pkg/util/cache"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config"
)

type BarrierKey = string

type PluginCallLog struct {
PluginTransition *pluginRequestedTransition
}

type BarrierTransition struct {
BarrierClockTick uint32
CallLog PluginCallLog
}

var NoBarrierTransition = BarrierTransition{BarrierClockTick: 0}

type barrier struct {
barrierCacheExpiration time.Duration
barrierTransitions *cache.LRUExpireCache
barrierEnabled bool
}

func (b *barrier) RecordBarrierTransition(ctx context.Context, k BarrierKey, bt BarrierTransition) {
if b.barrierEnabled {
b.barrierTransitions.Add(k, bt, b.barrierCacheExpiration)
}
}

func (b *barrier) GetPreviousBarrierTransition(ctx context.Context, k BarrierKey) BarrierTransition {
if b.barrierEnabled {
if v, ok := b.barrierTransitions.Get(k); ok {
f, casted := v.(BarrierTransition)
if !casted {
logger.Errorf(ctx, "Failed to cast recorded value to BarrierTransition")
return NoBarrierTransition
}
return f
}
}
return NoBarrierTransition
}

func newLRUBarrier(_ context.Context, cfg config.BarrierConfig) *barrier {
b := &barrier{
barrierEnabled: cfg.Enabled,
}
if cfg.Enabled {
b.barrierCacheExpiration = cfg.CacheTTL.Duration
b.barrierTransitions = cache.NewLRUExpireCache(cfg.CacheSize)
}
return b
}
12 changes: 12 additions & 0 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
MaxPluginPhaseVersions: 100000,
BarrierConfig: BarrierConfig{
Enabled: true,
CacheSize: 10000,
CacheTTL: config.Duration{Duration: time.Minute * 30},
},
BackOffConfig: BackOffConfig{
BaseSecond: 2,
MaxDuration: config.Duration{Duration: time.Second * 20},
Expand All @@ -32,10 +37,17 @@ var (
type Config struct {
TaskPlugins TaskPluginConfig `json:"task-plugins" pflag:",Task plugin configuration"`
MaxPluginPhaseVersions int32 `json:"max-plugin-phase-versions" pflag:",Maximum number of plugin phase versions allowed for one phase."`
BarrierConfig BarrierConfig `json:"barrier" pflag:",Config for Barrier implementation"`
BackOffConfig BackOffConfig `json:"backoff" pflag:",Config for Exponential BackOff implementation"`
MaxErrorMessageLength int `json:"maxLogMessageLength" pflag:",Deprecated!!! Max length of error message."`
}

type BarrierConfig struct {
Enabled bool `json:"enabled" pflag:",Enable Barrier transitions using inmemory context"`
CacheSize int `json:"cache-size" pflag:",Max number of barrier to preserve in memory"`
CacheTTL config.Duration `json:"cache-ttl" pflag:", Max duration that a barrier would be respected if the process is not restarted. This should account for time required to store the record into persistent storage (across multiple rounds."`
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/task/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions pkg/controller/nodes/task/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 40 additions & 8 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ type Handler struct {
kubeClient pluginCore.KubeClient
secretManager pluginCore.SecretManager
resourceManager resourcemanager.BaseResourceManager
barrierCache *barrier
cfg *config.Config
pluginScope promutils.Scope
eventConfig *controllerConfig.EventConfig
Expand Down Expand Up @@ -567,19 +568,48 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex
}
}

barrierTick := uint32(0)
occurredAt := time.Now()
// STEP 2: If no cache-hit and not transitioning to PhaseWaitingForCache, then lets invoke the plugin and wait for a transition out of undefined
if pluginTrns.execInfo.TaskNodeInfo == nil || (pluginTrns.pInfo.Phase() != pluginCore.PhaseWaitingForCache &&
pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.CacheStatus != core.CatalogCacheStatus_CACHE_HIT) {
prevBarrier := t.barrierCache.GetPreviousBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
// Lets start with the current barrierTick (the value to be stored) same as the barrierTick in the cache
barrierTick = prevBarrier.BarrierClockTick
// Lets check if this value in cache is less than or equal to one in the store
if barrierTick <= ts.BarrierClockTick {
var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
// Now no matter what we should update the barrierTick (stored in state)
// This is because the state is ahead of the inmemory representation
// This can happen in the case where the process restarted or the barrier cache got reset
barrierTick = ts.BarrierClockTick
// Now if the transition is of type barrier, lets tick the clock by one from the prev known value
// store that in the cache
if pluginTrns.ttype == handler.TransitionTypeBarrier {
logger.Infof(ctx, "Barrier transition observed for Plugin [%s], TaskExecID [%s]. recording: [%s]", p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), pluginTrns.pInfo.String())
barrierTick = barrierTick + 1
t.barrierCache.RecordBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), BarrierTransition{
BarrierClockTick: barrierTick,
CallLog: PluginCallLog{
PluginTransition: pluginTrns,
},
})

var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
} else {
// Barrier tick will remain to be the one in cache.
// Now it may happen that the cache may get reset before we store the barrier tick
// this will cause us to lose that information and potentially replaying.
logger.Infof(ctx, "Replaying Barrier transition for cache tick [%d] < stored tick [%d], Plugin [%s], TaskExecID [%s]. recording: [%s]", barrierTick, ts.BarrierClockTick, p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), prevBarrier.CallLog.PluginTransition.pInfo.String())
pluginTrns = prevBarrier.CallLog.PluginTransition
}
}

Expand Down Expand Up @@ -655,6 +685,7 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex
PluginStateVersion: pluginTrns.pluginStateVersion,
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
Expand Down Expand Up @@ -870,6 +901,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
asyncCatalog: async,
resourceManager: nil,
secretManager: secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()),
barrierCache: newLRUBarrier(ctx, cfg.BarrierConfig),
cfg: cfg,
eventConfig: eventConfig,
clusterID: clusterID,
Expand Down
Loading

0 comments on commit c115614

Please sign in to comment.