Skip to content

Commit

Permalink
Support GeneratedNameMaxLength property (flyteorg#245)
Browse files Browse the repository at this point in the history
* Support GeneratedNameMaxLength property

Signed-off-by: Filipe Regadas <[email protected]>

* Use pluginCore LoadPlugin

Signed-off-by: Filipe Regadas <[email protected]>

* Update flyteplugins to v0.5.40

Signed-off-by: Filipe Regadas <[email protected]>

* Fix lint

Signed-off-by: Filipe Regadas <[email protected]>
  • Loading branch information
regadas authored Mar 24, 2021
1 parent 45bd4d7 commit f2e4a2f
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 34 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.39
github.com/flyteorg/flyteplugins v0.5.40
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
6 changes: 2 additions & 4 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 h1:xJ0dAkuxJXfwdH7IaSzBEbSQxEDz36YUmt7+CB4zoNA=
Expand Down Expand Up @@ -233,8 +232,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+NajuHs8=
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.39 h1:nN8lK4SBtK3FvxSKHDiH/caNwTlb0V+DWAOIMCeFcu0=
github.com/flyteorg/flyteplugins v0.5.39/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flyteplugins v0.5.40 h1:3Vaat/CzMv87hIuloVRKsPussO0271TUmtbCzBMTAN8=
github.com/flyteorg/flyteplugins v0.5.40/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down Expand Up @@ -1229,7 +1228,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
8 changes: 4 additions & 4 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
sCtxFinal := newNameSpacedSetupCtx(
tSCtx, newResourceManagerBuilder.GetResourceRegistrar(pluginResourceNamespacePrefix))
logger.Infof(ctx, "Loading Plugin [%s] ENABLED", p.ID)
cp, err := p.LoadPlugin(ctx, sCtxFinal)
cp, err := pluginCore.LoadPlugin(ctx, sCtxFinal, p)
if err != nil {
return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID)
}
Expand Down Expand Up @@ -476,7 +476,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
logger.Infof(ctx, "Node level caching is disabled. Skipping catalog read.")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return handler.UnknownTransition, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down Expand Up @@ -662,7 +662,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r
return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down Expand Up @@ -718,7 +718,7 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext
return errors.Wrapf(errors.UnsupportedTaskTypeError, nCtx.NodeID(), err, "unable to resolve plugin")
}

tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p.GetID())
tCtx, err := t.newTaskExecutionContext(ctx, nCtx, p)
if err != nil {
return errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "unable to create Handler execution context")
}
Expand Down
15 changes: 14 additions & 1 deletion flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,20 @@ func Test_task_Setup(t *testing.T) {
corePluginType := "core"
corePlugin := &pluginCoreMocks.Plugin{}
corePlugin.On("GetID").Return(corePluginType)
corePlugin.OnGetProperties().Return(pluginCore.PluginProperties{})

corePluginDefaultType := "coredefault"
corePluginDefault := &pluginCoreMocks.Plugin{}
corePluginDefault.On("GetID").Return(corePluginDefaultType)
corePluginDefault.OnGetProperties().Return(pluginCore.PluginProperties{})

k8sPluginType := "k8s"
k8sPlugin := &pluginK8sMocks.Plugin{}
k8sPlugin.OnGetProperties().Return(pluginK8s.PluginProperties{})

k8sPluginDefaultType := "k8sdefault"
k8sPluginDefault := &pluginK8sMocks.Plugin{}
k8sPluginDefault.OnGetProperties().Return(pluginK8s.PluginProperties{})

corePluginEntry := pluginCore.PluginEntry{
ID: corePluginType,
Expand Down Expand Up @@ -1173,7 +1177,10 @@ func Test_task_Handle_Barrier(t *testing.T) {
assert.NoError(t, err)
tk.resourceManager = noopRm

tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, "plugin1")
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("plugin1")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
tctx, err := tk.newTaskExecutionContext(context.TODO(), nCtx, p)
assert.NoError(t, err)
id := tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()

Expand Down Expand Up @@ -1322,12 +1329,14 @@ func Test_task_Abort(t *testing.T) {
{"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nil}, true, true},
{"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true},
Expand Down Expand Up @@ -1464,12 +1473,14 @@ func Test_task_Abort_v1(t *testing.T) {
{"abort-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nil}, true, true},
{"abort-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Abort", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{ev: &fakeBufferedTaskEventRecorder{}}, false, true},
Expand Down Expand Up @@ -1603,12 +1614,14 @@ func Test_task_Finalize(t *testing.T) {
{"finalize-fails", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Finalize", mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
return p
}}, args{nCtx: nCtx}, true, true},
{"finalize-success", fields{defaultPluginCallback: func() pluginCore.Plugin {
p := &pluginCoreMocks.Plugin{}
p.On("GetID").Return("id")
p.OnGetProperties().Return(pluginCore.PluginProperties{})
p.On("Finalize", mock.Anything, mock.Anything).Return(nil)
return p
}}, args{nCtx: nCtx}, false, true},
Expand Down
31 changes: 15 additions & 16 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,8 @@ type PluginManager struct {
kubeClient pluginsCore.KubeClient
metrics PluginMetrics
// Per namespace-resource
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
disableInjectOwnerReferences bool
disableInjectFinalizer bool
backOffController *backoff.Controller
resourceLevelMonitor *ResourceLevelMonitor
}

func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
Expand All @@ -109,18 +107,21 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad
o.SetLabels(utils.UnionMaps(o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()), cfg.DefaultLabels))
o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName())

if !e.disableInjectOwnerReferences {
if !e.plugin.GetProperties().DisableInjectOwnerReferences {
o.SetOwnerReferences([]metav1.OwnerReference{taskCtx.GetOwnerReference()})
}

if cfg.InjectFinalizer && !e.disableInjectFinalizer {
if cfg.InjectFinalizer && !e.plugin.GetProperties().DisableInjectFinalizer {
f := append(o.GetFinalizers(), finalizer)
o.SetFinalizers(f)
}
}

func (e *PluginManager) GetProperties() pluginsCore.PluginProperties {
return pluginsCore.PluginProperties{}
props := e.plugin.GetProperties()
return pluginsCore.PluginProperties{
GeneratedNameMaxLength: props.GeneratedNameMaxLength,
}
}

func (e *PluginManager) GetID() string {
Expand Down Expand Up @@ -432,7 +433,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
}

workflowParentPredicate := func(o metav1.Object) bool {
if entry.DisableInjectOwnerReferences {
if entry.Plugin.GetProperties().DisableInjectOwnerReferences {
return true
}

Expand Down Expand Up @@ -526,14 +527,12 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
rm.RunCollectorOnce(ctx)

return &PluginManager{
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
disableInjectOwnerReferences: entry.DisableInjectOwnerReferences,
disableInjectFinalizer: entry.DisableInjectFinalizer,
id: entry.ID,
plugin: entry.Plugin,
resourceToWatch: entry.ResourceToWatch,
metrics: newPluginMetrics(metricsScope),
kubeClient: kubeClient,
resourceLevelMonitor: rm,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj client.Object, opts
type k8sSampleHandler struct {
}

func (k8sSampleHandler) GetProperties() k8s.PluginProperties {
panic("implement me")
}

func (k8sSampleHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
panic("implement me")
}
Expand Down Expand Up @@ -208,6 +212,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tCtx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build()
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
Expand Down Expand Up @@ -236,6 +241,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build()
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{
Expand Down Expand Up @@ -266,6 +272,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := extendedFakeClient{
Client: fake.NewClientBuilder().WithRuntimeObjects().Build(),
Expand Down Expand Up @@ -299,6 +306,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil)
fakeClient := extendedFakeClient{
Client: fake.NewClientBuilder().WithRuntimeObjects().Build(),
Expand Down Expand Up @@ -330,6 +338,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
// Creating a mock k8s plugin
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: flytek8s.PodKind,
Expand Down Expand Up @@ -397,6 +406,7 @@ func TestPluginManager_Abort(t *testing.T) {

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand All @@ -417,6 +427,7 @@ func TestPluginManager_Abort(t *testing.T) {
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand Down Expand Up @@ -538,6 +549,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
fc := tt.args.fakeClient()
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.On("BuildIdentityResource", mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.On("GetTaskPhase", mock.Anything, mock.Anything, mock.Anything).Return(tt.args.getTaskPhaseCB())
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
Expand Down Expand Up @@ -573,6 +585,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseStarted)
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{})
mockResourceHandler.On("BuildResource", mock.Anything, tctx).Return(&v1.Pod{}, nil)
fakeClient := fake.NewClientBuilder().Build()
newFakeClient := &pluginsCoreMock.KubeClient{}
Expand Down Expand Up @@ -601,7 +614,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {

t.Run("default", func(t *testing.T) {
o := &v1.Pod{}
pluginManager := PluginManager{}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{})
pluginManager := PluginManager{plugin: &p}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences())
Expand All @@ -615,7 +630,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable OwnerReferences injection", func(t *testing.T) {
pluginManager := PluginManager{disableInjectOwnerReferences: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectOwnerReferences: true})
pluginManager := PluginManager{plugin: &p}
o := &v1.Pod{}
pluginManager.AddObjectMetadata(tm, o, cfg)
assert.Equal(t, genName, o.GetName())
Expand All @@ -631,7 +648,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable enabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
pluginManager := PluginManager{plugin: &p}
// enable finalizer injection
cfg.InjectFinalizer = true
o := &v1.Pod{}
Expand All @@ -649,7 +668,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) {
})

t.Run("Disable disabled InjectFinalizer", func(t *testing.T) {
pluginManager := PluginManager{disableInjectFinalizer: true}
p := pluginsk8sMock.Plugin{}
p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectFinalizer: true})
pluginManager := PluginManager{plugin: &p}
// disable finalizer injection
cfg.InjectFinalizer = false
o := &v1.Pod{}
Expand Down
11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/task/taskexec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (t taskExecutionContext) SecretManager() pluginCore.SecretManager {
return t.sm
}

func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, pluginID string) (*taskExecutionContext, error) {
func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) {
id := GetTaskExecutionIdentifier(nCtx)

currentNodeUniqueID := nCtx.NodeID()
Expand All @@ -136,7 +136,12 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
}
}

uniqueID, err := utils.FixedLengthUniqueIDForParts(IDMaxLength, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt)))
length := IDMaxLength
if l := plugin.GetProperties().GeneratedNameMaxLength; l != nil {
length = *l
}

uniqueID, err := utils.FixedLengthUniqueIDForParts(length, nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(id.RetryAttempt)))
if err != nil {
// SHOULD never really happen
return nil, err
Expand All @@ -157,7 +162,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node
return nil, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "unable to initialize plugin state manager")
}

resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(pluginID))
resourceNamespacePrefix := pluginCore.ResourceNamespace(t.resourceManager.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(plugin.GetID()))
maxAttempts := uint32(DefaultMaxAttempts)
if nCtx.Node().GetRetryStrategy() != nil && nCtx.Node().GetRetryStrategy().MinAttempts != nil {
maxAttempts = uint32(*nCtx.Node().GetRetryStrategy().MinAttempts)
Expand Down
Loading

0 comments on commit f2e4a2f

Please sign in to comment.