Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Remove BarrierTick #545

Merged
merged 9 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.47
github.com/flyteorg/flyteplugins v1.0.48
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.47 h1:+SnRM7Z257xiIg5B5i3gLJxEUtZJlEyrzCPCAMolsug=
github.com/flyteorg/flyteplugins v1.0.47/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flyteplugins v1.0.48 h1:2r1dxp6WMnANQNgAGVQwYnRu6YDKJ0R+DbCeoZAng5Q=
github.com/flyteorg/flyteplugins v1.0.48/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type TaskNodeState struct {
PluginPhaseVersion uint32
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type TransitionType int

const (
TransitionTypeEphemeral TransitionType = iota
// @deprecated support for Barrier type transitions has been deprecated
TransitionTypeBarrier
)

Expand Down
1 change: 0 additions & 1 deletion pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginPhaseVersion: tn.GetPhaseVersion(),
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
}
Expand Down
61 changes: 0 additions & 61 deletions pkg/controller/nodes/task/barrier.go

This file was deleted.

12 changes: 0 additions & 12 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
MaxPluginPhaseVersions: 100000,
BarrierConfig: BarrierConfig{
Enabled: true,
CacheSize: 10000,
CacheTTL: config.Duration{Duration: time.Minute * 30},
},
BackOffConfig: BackOffConfig{
BaseSecond: 2,
MaxDuration: config.Duration{Duration: time.Second * 20},
Expand All @@ -37,17 +32,10 @@ var (
type Config struct {
TaskPlugins TaskPluginConfig `json:"task-plugins" pflag:",Task plugin configuration"`
MaxPluginPhaseVersions int32 `json:"max-plugin-phase-versions" pflag:",Maximum number of plugin phase versions allowed for one phase."`
BarrierConfig BarrierConfig `json:"barrier" pflag:",Config for Barrier implementation"`
BackOffConfig BackOffConfig `json:"backoff" pflag:",Config for Exponential BackOff implementation"`
MaxErrorMessageLength int `json:"maxLogMessageLength" pflag:",Deprecated!!! Max length of error message."`
}

type BarrierConfig struct {
Enabled bool `json:"enabled" pflag:",Enable Barrier transitions using inmemory context"`
CacheSize int `json:"cache-size" pflag:",Max number of barrier to preserve in memory"`
CacheTTL config.Duration `json:"cache-ttl" pflag:", Max duration that a barrier would be respected if the process is not restarted. This should account for time required to store the record into persistent storage (across multiple rounds."`
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/nodes/task/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 0 additions & 42 deletions pkg/controller/nodes/task/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 8 additions & 40 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ type Handler struct {
kubeClient pluginCore.KubeClient
secretManager pluginCore.SecretManager
resourceManager resourcemanager.BaseResourceManager
barrierCache *barrier
cfg *config.Config
pluginScope promutils.Scope
eventConfig *controllerConfig.EventConfig
Expand Down Expand Up @@ -658,48 +657,19 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}
}

barrierTick := uint32(0)
occurredAt := time.Now()
// STEP 2: If no cache-hit and not transitioning to PhaseWaitingForCache, then lets invoke the plugin and wait for a transition out of undefined
if pluginTrns.execInfo.TaskNodeInfo == nil || (pluginTrns.pInfo.Phase() != pluginCore.PhaseWaitingForCache &&
pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.CacheStatus != core.CatalogCacheStatus_CACHE_HIT) {
prevBarrier := t.barrierCache.GetPreviousBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
// Lets start with the current barrierTick (the value to be stored) same as the barrierTick in the cache
barrierTick = prevBarrier.BarrierClockTick
// Lets check if this value in cache is less than or equal to one in the store
if barrierTick <= ts.BarrierClockTick {
var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
// Now no matter what we should update the barrierTick (stored in state)
// This is because the state is ahead of the inmemory representation
// This can happen in the case where the process restarted or the barrier cache got reset
barrierTick = ts.BarrierClockTick
// Now if the transition is of type barrier, lets tick the clock by one from the prev known value
// store that in the cache
if pluginTrns.ttype == handler.TransitionTypeBarrier {
logger.Infof(ctx, "Barrier transition observed for Plugin [%s], TaskExecID [%s]. recording: [%s]", p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), pluginTrns.pInfo.String())
barrierTick = barrierTick + 1
t.barrierCache.RecordBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), BarrierTransition{
BarrierClockTick: barrierTick,
CallLog: PluginCallLog{
PluginTransition: pluginTrns,
},
})

}
} else {
// Barrier tick will remain to be the one in cache.
// Now it may happen that the cache may get reset before we store the barrier tick
// this will cause us to lose that information and potentially replaying.
logger.Infof(ctx, "Replaying Barrier transition for cache tick [%d] < stored tick [%d], Plugin [%s], TaskExecID [%s]. recording: [%s]", barrierTick, ts.BarrierClockTick, p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), prevBarrier.CallLog.PluginTransition.pInfo.String())
pluginTrns = prevBarrier.CallLog.PluginTransition
var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
}

Expand Down Expand Up @@ -775,7 +745,6 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginStateVersion: pluginTrns.pluginStateVersion,
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
})
Expand Down Expand Up @@ -932,7 +901,6 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
asyncCatalog: async,
resourceManager: nil,
secretManager: secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()),
barrierCache: newLRUBarrier(ctx, cfg.BarrierConfig),
cfg: cfg,
eventConfig: eventConfig,
clusterID: clusterID,
Expand Down
Loading