Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into terminate-workflow-error
Browse files Browse the repository at this point in the history
  • Loading branch information
EngHabu authored Apr 16, 2023
2 parents 4f9aa91 + 0c982ea commit 4226801
Show file tree
Hide file tree
Showing 12 changed files with 14 additions and 474 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.47
github.com/flyteorg/flyteplugins v1.0.48
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.47 h1:+SnRM7Z257xiIg5B5i3gLJxEUtZJlEyrzCPCAMolsug=
github.com/flyteorg/flyteplugins v1.0.47/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flyteplugins v1.0.48 h1:2r1dxp6WMnANQNgAGVQwYnRu6YDKJ0R+DbCeoZAng5Q=
github.com/flyteorg/flyteplugins v1.0.48/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type TaskNodeState struct {
PluginPhaseVersion uint32
PluginState []byte
PluginStateVersion uint32
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type TransitionType int

const (
TransitionTypeEphemeral TransitionType = iota
// @deprecated support for Barrier type transitions has been deprecated
TransitionTypeBarrier
)

Expand Down
1 change: 0 additions & 1 deletion pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginPhaseVersion: tn.GetPhaseVersion(),
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
}
Expand Down
61 changes: 0 additions & 61 deletions pkg/controller/nodes/task/barrier.go

This file was deleted.

12 changes: 0 additions & 12 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
MaxPluginPhaseVersions: 100000,
BarrierConfig: BarrierConfig{
Enabled: true,
CacheSize: 10000,
CacheTTL: config.Duration{Duration: time.Minute * 30},
},
BackOffConfig: BackOffConfig{
BaseSecond: 2,
MaxDuration: config.Duration{Duration: time.Second * 20},
Expand All @@ -37,17 +32,10 @@ var (
type Config struct {
TaskPlugins TaskPluginConfig `json:"task-plugins" pflag:",Task plugin configuration"`
MaxPluginPhaseVersions int32 `json:"max-plugin-phase-versions" pflag:",Maximum number of plugin phase versions allowed for one phase."`
BarrierConfig BarrierConfig `json:"barrier" pflag:",Config for Barrier implementation"`
BackOffConfig BackOffConfig `json:"backoff" pflag:",Config for Exponential BackOff implementation"`
MaxErrorMessageLength int `json:"maxLogMessageLength" pflag:",Deprecated!!! Max length of error message."`
}

type BarrierConfig struct {
Enabled bool `json:"enabled" pflag:",Enable Barrier transitions using inmemory context"`
CacheSize int `json:"cache-size" pflag:",Max number of barrier to preserve in memory"`
CacheTTL config.Duration `json:"cache-ttl" pflag:", Max duration that a barrier would be respected if the process is not restarted. This should account for time required to store the record into persistent storage (across multiple rounds."`
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/nodes/task/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 0 additions & 42 deletions pkg/controller/nodes/task/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 8 additions & 40 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ type Handler struct {
kubeClient pluginCore.KubeClient
secretManager pluginCore.SecretManager
resourceManager resourcemanager.BaseResourceManager
barrierCache *barrier
cfg *config.Config
pluginScope promutils.Scope
eventConfig *controllerConfig.EventConfig
Expand Down Expand Up @@ -658,48 +657,19 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}
}

barrierTick := uint32(0)
occurredAt := time.Now()
// STEP 2: If no cache-hit and not transitioning to PhaseWaitingForCache, then lets invoke the plugin and wait for a transition out of undefined
if pluginTrns.execInfo.TaskNodeInfo == nil || (pluginTrns.pInfo.Phase() != pluginCore.PhaseWaitingForCache &&
pluginTrns.execInfo.TaskNodeInfo.TaskNodeMetadata.CacheStatus != core.CatalogCacheStatus_CACHE_HIT) {
prevBarrier := t.barrierCache.GetPreviousBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
// Lets start with the current barrierTick (the value to be stored) same as the barrierTick in the cache
barrierTick = prevBarrier.BarrierClockTick
// Lets check if this value in cache is less than or equal to one in the store
if barrierTick <= ts.BarrierClockTick {
var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
// Now no matter what we should update the barrierTick (stored in state)
// This is because the state is ahead of the inmemory representation
// This can happen in the case where the process restarted or the barrier cache got reset
barrierTick = ts.BarrierClockTick
// Now if the transition is of type barrier, lets tick the clock by one from the prev known value
// store that in the cache
if pluginTrns.ttype == handler.TransitionTypeBarrier {
logger.Infof(ctx, "Barrier transition observed for Plugin [%s], TaskExecID [%s]. recording: [%s]", p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), pluginTrns.pInfo.String())
barrierTick = barrierTick + 1
t.barrierCache.RecordBarrierTransition(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), BarrierTransition{
BarrierClockTick: barrierTick,
CallLog: PluginCallLog{
PluginTransition: pluginTrns,
},
})

}
} else {
// Barrier tick will remain to be the one in cache.
// Now it may happen that the cache may get reset before we store the barrier tick
// this will cause us to lose that information and potentially replaying.
logger.Infof(ctx, "Replaying Barrier transition for cache tick [%d] < stored tick [%d], Plugin [%s], TaskExecID [%s]. recording: [%s]", barrierTick, ts.BarrierClockTick, p.GetID(), tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), prevBarrier.CallLog.PluginTransition.pInfo.String())
pluginTrns = prevBarrier.CallLog.PluginTransition
var err error
pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
}
if pluginTrns.IsPreviouslyObserved() {
logger.Debugf(ctx, "No state change for Task, previously observed same transition. Short circuiting.")
return pluginTrns.FinalTransition(ctx)
}
}

Expand Down Expand Up @@ -775,7 +745,6 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginStateVersion: pluginTrns.pluginStateVersion,
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
})
Expand Down Expand Up @@ -932,7 +901,6 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client
asyncCatalog: async,
resourceManager: nil,
secretManager: secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()),
barrierCache: newLRUBarrier(ctx, cfg.BarrierConfig),
cfg: cfg,
eventConfig: eventConfig,
clusterID: clusterID,
Expand Down
Loading

0 comments on commit 4226801

Please sign in to comment.