From 41a756f52e585a7e596fca8cf0ecffb5e6a4c8f9 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 8 Apr 2020 14:32:22 -0700 Subject: [PATCH] Retrieve launch plan interfaces from admin (#103) # TL;DR When Propeller comes across a launch plan node in a `DynamicJobSpec` it will hit Admin to retrieve the interface for the LP to do the interface check. ## Type - [ ] Bug Fix - [x] Feature - [ ] Plugin ## Are all requirements met? - [x] Code completed - [x] Smoke tested - [x] Unit tests added - [x] Code documentation added - [x] Any pending items have an associated Issue ## Complete description Please see the issue linked below and also the SDK PR https://github.com/lyft/flytekit/pull/92 for more information. A sample dynamic job spec object has been uploaded here as well. Please see the text file for the type of dynamic job spec this PR is meant to support. [dynamic_job_spec.txt](https://github.com/lyft/flytepropeller/files/4430252/dynamic_job_spec.txt) * Added a `GetLaunchPlan` function to a `launchplan/Reader` interface which sits alongside the `launchplan/Executor` interface. Admin client wrapper now satisfies both interfaces. * Added a call to that function in the dynamic job handler `buildContextualDynamicWorkflow` function. ## Tracking Issue https://github.com/lyft/flyte/issues/139 ## Follow-up issue https://github.com/lyft/flyte/issues/246 This PR will be deprecated upon completion of this issue. --- flytepropeller/go.mod | 18 +- flytepropeller/go.sum | 55 ++-- flytepropeller/pkg/compiler/admin.go | 36 +++ flytepropeller/pkg/compiler/admin_test.go | 72 +++++ .../pkg/compiler/workflow_compiler.go | 6 +- .../pkg/compiler/workflow_compiler_test.go | 2 +- flytepropeller/pkg/controller/controller.go | 11 +- .../pkg/controller/nodes/dynamic/handler.go | 41 ++- .../controller/nodes/dynamic/handler_test.go | 285 +++++++++++++++++- .../pkg/controller/nodes/executor.go | 6 +- .../pkg/controller/nodes/executor_test.go | 51 +++- .../pkg/controller/nodes/handler_factory.go | 5 +- .../nodes/subworkflow/launchplan/admin.go | 22 +- .../subworkflow/launchplan/admin_test.go | 37 +++ .../subworkflow/launchplan/launchplan.go | 10 + .../subworkflow/launchplan/mocks/Executor.go | 41 +++ .../nodes/subworkflow/launchplan/noop.go | 8 +- .../pkg/controller/workflow/executor_test.go | 24 +- 18 files changed, 647 insertions(+), 83 deletions(-) create mode 100644 flytepropeller/pkg/compiler/admin.go create mode 100644 flytepropeller/pkg/compiler/admin_test.go diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 7078ede06..84edce2a9 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -4,22 +4,24 @@ go 1.13 require ( cloud.google.com/go v0.54.0 // indirect - github.com/Azure/azure-sdk-for-go v40.2.0+incompatible // indirect + github.com/Azure/azure-sdk-for-go v40.3.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.10.0 // indirect github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 - github.com/aws/aws-sdk-go v1.29.20 // indirect + github.com/aws/aws-sdk-go v1.29.23 // indirect github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/fatih/color v1.9.0 + github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible github.com/gogo/protobuf v1.3.1 - github.com/golang/protobuf v1.3.4 + github.com/golang/protobuf v1.3.5 github.com/google/uuid v1.1.1 github.com/graymeta/stow v0.2.5 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 - github.com/grpc-ecosystem/grpc-gateway v1.14.2 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.14.3 // indirect github.com/imdario/mergo v0.3.8 // indirect + github.com/jmespath/go-jmespath v0.3.0 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.17.24 github.com/lyft/flyteplugins v0.3.20 @@ -34,11 +36,11 @@ require ( github.com/spf13/cobra v0.0.6 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.5.1 - golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 // indirect + golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 // indirect golang.org/x/time v0.0.0-20191024005414-555d28b269f0 gomodules.xyz/jsonpatch/v2 v2.1.0 // indirect - google.golang.org/genproto v0.0.0-20200309141739-5b75447e413d // indirect - google.golang.org/grpc v1.27.1 + google.golang.org/genproto v0.0.0-20200312145019-da6875a35672 // indirect + google.golang.org/grpc v1.28.0 gopkg.in/ini.v1 v1.54.0 // indirect k8s.io/api v0.17.3 k8s.io/apimachinery v0.17.3 @@ -46,7 +48,7 @@ require ( k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab // indirect - sigs.k8s.io/controller-runtime v0.5.0 + sigs.k8s.io/controller-runtime v0.5.1 ) // Pin the version of client-go to something that's compatible with katrogan's fork of api and apimachinery diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index 9ad866060..f1acc0b1d 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -27,8 +27,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/Azure/azure-sdk-for-go v32.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v38.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v39.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v40.2.0+incompatible h1:JVGI3ws5ouCNwtQtwProxSff9uf2XiH/K8ydhuNhL1I= -github.com/Azure/azure-sdk-for-go v40.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go v40.3.0+incompatible h1:NthZg3psrLxvQLN6rVm07pZ9mv2wvGNaBNGQ3fnPvLE= +github.com/Azure/azure-sdk-for-go v40.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= github.com/Azure/go-autorest/autorest v0.9.4/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0= @@ -85,8 +85,8 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l github.com/aws/aws-sdk-go v1.23.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.28.9/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.28.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.29.20 h1:vAHJhARpdbdeJstTVaugeHgvVj5lBnfz3blbbD24gfo= -github.com/aws/aws-sdk-go v1.29.20/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= +github.com/aws/aws-sdk-go v1.29.23 h1:wtiGLOzxAP755OfuVTDIy/NbUIYEDxbIbBEDfNhUpeU= +github.com/aws/aws-sdk-go v1.29.23/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 h1:VRtJdDi2lqc3MFwmouppm2jlm6icF+7H3WYKpLENMTo= github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1/go.mod h1:jvdWlw8vowVGnZqSDC7yhPd7AifQeQbRDkZcQXV2nRg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -106,6 +106,7 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coocood/freecache v1.1.0 h1:ENiHOsWdj1BrrlPwblhbn4GdAsMymK3pZORJ+bJGAjA= github.com/coocood/freecache v1.1.0/go.mod h1:ePwxCDzOYvARfHdr1pByNct1at3CoKnsipOHwKlNbzI= @@ -151,7 +152,9 @@ github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkg github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible h1:spTtZBk5DYEvbxMVutUuTyh1Ao2r4iyvLdACqsl/Ljk= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4= @@ -166,6 +169,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.8-0.20191012010759-4bf2d1fec783 h1:SmsgwFZy9pdTk/k8BZz40D3P5umP5+Ejt3hAi0paBNQ= github.com/fsnotify/fsnotify v1.4.8-0.20191012010759-4bf2d1fec783/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -265,6 +270,8 @@ github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.4 h1:87PNWwrRvUSnqS4dlcBU/ftvOIBep4sYuBLlh6rX2wk= github.com/golang/protobuf v1.3.4/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -317,8 +324,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.3.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.12.2/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= -github.com/grpc-ecosystem/grpc-gateway v1.14.2 h1:SG3eXGmMVahaP4UtKsO/gPQpkovjXOmxXNd7sJlhxNs= -github.com/grpc-ecosystem/grpc-gateway v1.14.2/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0= +github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY= +github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -343,6 +350,8 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -378,30 +387,13 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0o= github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4= github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.17.6 h1:O0qpT6ya45e/92+E84uGOYa0ZsaFoE5ZfPoyJ6e1bEQ= -github.com/lyft/flyteidl v0.17.6/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.17.8 h1:/bZS1K3FO45EMamNrs4Eo6WYQf1TO5bNyNTIUO6cXM0= -github.com/lyft/flyteidl v0.17.8/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0= github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteplugins v0.3.10 h1:AwGv0TKl/RTOESY0+P5v9gyjO67LeHXHZqiMVaa+X4w= -github.com/lyft/flyteplugins v0.3.10/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= -github.com/lyft/flyteplugins v0.3.11 h1:E6BX5BU283BLMP48QJQsecqdxeLKLaiA+2+VVS8VYoc= -github.com/lyft/flyteplugins v0.3.11/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= -github.com/lyft/flyteplugins v0.3.12-0.20200318014325-ea4280769ab8/go.mod h1:FOSo04q4EheU6lm0oZFvfYAWgjrum/BDUK+mUT7qDFA= -github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43 h1:hpyBrWo2HrEdNG5TNdU5+90D/T8wu6FmmLgpeyJRN30= -github.com/lyft/flyteplugins v0.3.14-0.20200324043344-d4df89dada43/go.mod h1:heTJLryE8EE4Vcd+W3EkQ3fyF41YyquavCLQv1YfnEA= -github.com/lyft/flyteplugins v0.3.15 h1:chDrm8maK3dCSy7UM8ElfmzTUBn1fiF7UnmP4px4sVI= -github.com/lyft/flyteplugins v0.3.15/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= -github.com/lyft/flyteplugins v0.3.16 h1:RzfShN2HRFua9JHa5Z37dyXM3jorg3az8tEOItCXmO4= -github.com/lyft/flyteplugins v0.3.16/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flyteplugins v0.3.20 h1:PG63Z9kaLG67TyY/YyY4gu9vmQrsXxMNAtxNrxjmh5c= github.com/lyft/flyteplugins v0.3.20/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= -github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= -github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.3 h1:MkWXPkwQinh6MR3Yf5siZhmRSt9r4YmsF+5kvVVVedE= github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= @@ -624,8 +616,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM= -golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4 h1:QmwruyY+bKbDDL0BaglrbZABEali68eoMFhTZpCjYVA= +golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -728,6 +720,7 @@ golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -852,8 +845,8 @@ google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce h1:1mbrb1tUU+Zmt5C google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20200309141739-5b75447e413d h1:VQ0pz4dAUaMWcQLM7tGY8Nk691kSrlGPyF5nSgAIw2g= -google.golang.org/genproto v0.0.0-20200309141739-5b75447e413d/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200312145019-da6875a35672 h1:jiDSspVssiikoRPFHT6pYrL+CL6/yIc3b9AuHO/4xik= +google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -862,10 +855,13 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4= +google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -895,6 +891,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= @@ -943,8 +940,8 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/controller-runtime v0.4.0/go.mod h1:ApC79lpY3PHW9xj/w9pj+lYkLgwAAUZwfXkME1Lajns= -sigs.k8s.io/controller-runtime v0.5.0 h1:CbqIy5fbUX+4E9bpnBFd204YAzRYlM9SWW77BbrcDQo= -sigs.k8s.io/controller-runtime v0.5.0/go.mod h1:REiJzC7Y00U+2YkMbT8wxgrsX5USpXKGhb2sCtAXiT8= +sigs.k8s.io/controller-runtime v0.5.1 h1:TNidCfVoU/cs2i+9xoTcL/l7yhl0bDhYXU0NCG6wmiE= +sigs.k8s.io/controller-runtime v0.5.1/go.mod h1:Uojny7gvg55YLQnEGnPzRE3dC4ik2tRlZJgOUCWXAV4= sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA= sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06/go.mod h1:/ULNhyfzRopfcjskuui0cTITekDduZ7ycKN3oUT9R18= diff --git a/flytepropeller/pkg/compiler/admin.go b/flytepropeller/pkg/compiler/admin.go new file mode 100644 index 000000000..cd50a9524 --- /dev/null +++ b/flytepropeller/pkg/compiler/admin.go @@ -0,0 +1,36 @@ +package compiler + +import ( + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +) + +// This object is meant to satisfy github.com/lyft/flytepropeller/pkg/compiler/common.InterfaceProvider +// This file is pretty much copied from Admin, (sorry for the link, a real link made go mod import admin) +// github-dot-com/lyft/flyteadmin/blob/1acce744b8c7839ab77a0eb1ed922905af15baa5/pkg/workflowengine/impl/interface_provider.go +// but that implementation relies on the internal Admin Gorm model. We should consider deprecating that one in favor +// of this one as Admin already has a dependency on the Propeller compiler. +type LaunchPlanInterfaceProvider struct { + expectedInputs core.ParameterMap + expectedOutputs core.VariableMap + identifier *core.Identifier +} + +func (p *LaunchPlanInterfaceProvider) GetID() *core.Identifier { + return p.identifier +} +func (p *LaunchPlanInterfaceProvider) GetExpectedInputs() *core.ParameterMap { + return &p.expectedInputs + +} +func (p *LaunchPlanInterfaceProvider) GetExpectedOutputs() *core.VariableMap { + return &p.expectedOutputs +} + +func NewLaunchPlanInterfaceProvider(launchPlan admin.LaunchPlan) *LaunchPlanInterfaceProvider { + return &LaunchPlanInterfaceProvider{ + expectedInputs: *launchPlan.Closure.ExpectedInputs, + expectedOutputs: *launchPlan.Closure.ExpectedOutputs, + identifier: launchPlan.Id, + } +} diff --git a/flytepropeller/pkg/compiler/admin_test.go b/flytepropeller/pkg/compiler/admin_test.go new file mode 100644 index 000000000..140b24254 --- /dev/null +++ b/flytepropeller/pkg/compiler/admin_test.go @@ -0,0 +1,72 @@ +package compiler + +import ( + "testing" + + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytepropeller/pkg/utils" + "github.com/stretchr/testify/assert" +) + +var launchPlanIdentifier = core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Project: "project", + Domain: "domain", + Name: "name", + Version: "version", +} + +var inputs = core.ParameterMap{ + Parameters: map[string]*core.Parameter{ + "foo": { + Var: &core.Variable{ + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, + }, + Behavior: &core.Parameter_Default{ + Default: utils.MustMakeLiteral("foo-value"), + }, + }, + }, +} +var outputs = core.VariableMap{ + Variables: map[string]*core.Variable{ + "foo": { + Type: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}, + }, + }, +} + +func getDummyLaunchPlan() admin.LaunchPlan { + launchPlanClosure := admin.LaunchPlanClosure{ + ExpectedInputs: &inputs, + ExpectedOutputs: &outputs, + } + return admin.LaunchPlan{ + Id: &launchPlanIdentifier, + Spec: nil, + Closure: &launchPlanClosure, + } +} + +func TestGetId(t *testing.T) { + launchPlan := getDummyLaunchPlan() + provider := NewLaunchPlanInterfaceProvider(launchPlan) + assert.Equal(t, &core.Identifier{ResourceType: 3, Project: "project", Domain: "domain", Name: "name", Version: "version"}, provider.GetID()) +} + +func TestGetExpectedInputs(t *testing.T) { + launchPlan := getDummyLaunchPlan() + provider := NewLaunchPlanInterfaceProvider(launchPlan) + assert.Contains(t, (*provider.GetExpectedInputs()).Parameters, "foo") + assert.NotNil(t, (*provider.GetExpectedInputs()).Parameters["foo"].Var.Type.GetSimple()) + assert.EqualValues(t, "STRING", (*provider.GetExpectedInputs()).Parameters["foo"].Var.Type.GetSimple().String()) + assert.NotNil(t, (*provider.GetExpectedInputs()).Parameters["foo"].GetDefault()) +} + +func TestGetExpectedOutputs(t *testing.T) { + launchPlan := getDummyLaunchPlan() + provider := NewLaunchPlanInterfaceProvider(launchPlan) + assert.EqualValues(t, outputs.Variables["foo"].GetType().GetType(), + provider.GetExpectedOutputs().Variables["foo"].GetType().GetType()) +} diff --git a/flytepropeller/pkg/compiler/workflow_compiler.go b/flytepropeller/pkg/compiler/workflow_compiler.go index e6add4b34..c8c8ddfb4 100755 --- a/flytepropeller/pkg/compiler/workflow_compiler.go +++ b/flytepropeller/pkg/compiler/workflow_compiler.go @@ -288,7 +288,7 @@ func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.Work compiledWf := &core.CompiledWorkflow{Template: wf} - gb := newWorfklowBuilder(compiledWf, wfIndex, c.NewTaskIndex(taskBuilders...), toInterfaceProviderMap(launchPlans)) + gb := newWorkflowBuilder(compiledWf, wfIndex, c.NewTaskIndex(taskBuilders...), toInterfaceProviderMap(launchPlans)) // Terminate early if there are some required component not present. if !gb.validateAllRequirements(errs.NewScope()) { return nil, errs @@ -312,10 +312,10 @@ func CompileWorkflow(primaryWf *core.WorkflowTemplate, subworkflows []*core.Work } func (w workflowBuilder) newWorkflowBuilder(fg *flyteWorkflow) workflowBuilder { - return newWorfklowBuilder(fg, w.allSubWorkflows, w.allTasks, w.allLaunchPlans) + return newWorkflowBuilder(fg, w.allSubWorkflows, w.allTasks, w.allLaunchPlans) } -func newWorfklowBuilder(fg *flyteWorkflow, wfIndex c.WorkflowIndex, tasks c.TaskIndex, +func newWorkflowBuilder(fg *flyteWorkflow, wfIndex c.WorkflowIndex, tasks c.TaskIndex, workflows map[string]c.InterfaceProvider) workflowBuilder { return workflowBuilder{ diff --git a/flytepropeller/pkg/compiler/workflow_compiler_test.go b/flytepropeller/pkg/compiler/workflow_compiler_test.go index 1cb54511c..259f8ec1b 100755 --- a/flytepropeller/pkg/compiler/workflow_compiler_test.go +++ b/flytepropeller/pkg/compiler/workflow_compiler_test.go @@ -471,7 +471,7 @@ func TestValidateUnderlyingInterface(parentT *testing.T) { } } - g := newWorfklowBuilder( + g := newWorkflowBuilder( &core.CompiledWorkflow{Template: inputWorkflow}, mustBuildWorkflowIndex(inputWorkflow), common.NewTaskIndex(compiledTasks...), diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index c3ec4d604..61996bded 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -199,26 +199,26 @@ func newK8sEventRecorder(ctx context.Context, kubeclientset kubernetes.Interface func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) { - var wfLauncher launchplan.Executor + var launchPlanActor launchplan.FlyteAdmin if cfg.EnableAdminLauncher { adminClient, err := admin.InitializeAdminClientFromConfig(ctx) if err != nil { logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error()) return nil, err } - wfLauncher, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration, + launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration, launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher")) if err != nil { logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error()) return nil, err } - if err := wfLauncher.Initialize(ctx); err != nil { + if err := launchPlanActor.Initialize(ctx); err != nil { logger.Errorf(ctx, "failed to initialize Admin workflow Launcher, err: %v", err.Error()) return nil, err } } else { - wfLauncher = launchplan.NewFailFastLaunchPlanExecutor() + launchPlanActor = launchplan.NewFailFastLaunchPlanExecutor() } logger.Info(ctx, "Setting up event sink and recorder") @@ -298,7 +298,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store") } - nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, wfLauncher, cfg.MaxDatasetSizeBytes, + nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, + launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope) if err != nil { return nil, errors.Wrapf(err, "Failed to create Controller.") diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index 853e0c974..9ac9e4059 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" @@ -60,6 +62,7 @@ type dynamicNodeTaskNodeHandler struct { TaskNodeHandler metrics metrics nodeExecutor executors.Node + lpReader launchplan.Reader } func (d dynamicNodeTaskNodeHandler) handleParentNode(ctx context.Context, prevState handler.DynamicNodeState, nCtx handler.NodeExecutionContext) (handler.Transition, handler.DynamicNodeState, error) { @@ -377,8 +380,23 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return nil, true, err } - // TODO: This will currently fail if the WF references any launch plans - closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, []common2.InterfaceProvider{}) + // Get the requirements, that is, a list of all the task IDs and the launch plan IDs that will be called as part of this dynamic task. + // The definition of these will need to be fetched from Admin (in order to get the interface). + requirements, err := compiler.GetRequirements(wf, djSpec.Subworkflows) + if err != nil { + return nil, true, err + } + + launchPlanInterfaces, err := d.getLaunchPlanInterfaces(ctx, requirements.GetRequiredLaunchPlanIds()) + if err != nil { + return nil, true, err + } + + // TODO: In addition to querying Admin for launch plans, we also need to get all the tasks that are missing from the dynamic job spec. + // The reason they might be missing is because if a user yields a task that is SdkTask.fetch'ed, it should not be included + // See https://github.com/lyft/flyte/issues/219 for more information. + + closure, err = compiler.CompileWorkflow(wf, djSpec.Subworkflows, compiledTasks, launchPlanInterfaces) if err != nil { return nil, true, err } @@ -395,6 +413,22 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C return newContextualWorkflow(nCtx.Workflow(), subwf, nStatus, subwf.Tasks, subwf.SubWorkflows, nCtx.DataStore()), true, nil } +func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, launchPlanIDs []compiler.LaunchPlanRefIdentifier) ( + []common2.InterfaceProvider, error) { + + var launchPlanInterfaces = make([]common2.InterfaceProvider, len(launchPlanIDs)) + for idx, id := range launchPlanIDs { + lp, err := d.lpReader.GetLaunchPlan(ctx, &id) + if err != nil { + logger.Debugf(ctx, "Error fetching launch plan definition from admin") + return nil, err + } + launchPlanInterfaces[idx] = compiler.NewLaunchPlanInterfaceProvider(*lp) + } + + return launchPlanInterfaces, nil +} + func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, dynamicWorkflow v1alpha1.ExecutableWorkflow, nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) { @@ -470,11 +504,12 @@ func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), prevState, nil } -func New(underlying TaskNodeHandler, nodeExecutor executors.Node, scope promutils.Scope) handler.Node { +func New(underlying TaskNodeHandler, nodeExecutor executors.Node, launchPlanReader launchplan.Reader, scope promutils.Scope) handler.Node { return &dynamicNodeTaskNodeHandler{ TaskNodeHandler: underlying, metrics: newMetrics(scope), nodeExecutor: nodeExecutor, + lpReader: launchPlanReader, } } diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go index b7b2ca453..28a8bbb00 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go @@ -5,6 +5,8 @@ import ( "fmt" "testing" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -19,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/types" ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" + lpMocks "github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks" "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" flyteMocks "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" @@ -168,13 +171,14 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) } h := &mocks.TaskNodeHandler{} + mockLPLauncher := &lpMocks.Executor{} n := &executorMocks.Node{} if tt.args.isErr { h.OnHandleMatch(mock.Anything, mock.Anything).Return(handler.UnknownTransition, fmt.Errorf("error")) } else { h.OnHandleMatch(mock.Anything, mock.Anything).Return(tt.args.trns, nil) } - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) if (err != nil) != tt.want.isErr { t.Errorf("Handle() error = %v, wantErr %v", err, tt.want.isErr) @@ -281,11 +285,12 @@ func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) { f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb") assert.NoError(t, err) dj := &core.DynamicJobSpec{} + mockLPLauncher := &lpMocks.Executor{} n := &executorMocks.Node{} assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) h := &mocks.TaskNodeHandler{} h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(nil) - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning.String(), got.Info().GetPhase().String()) @@ -300,11 +305,12 @@ func Test_dynamicNodeHandler_Handle_ParentFinalize(t *testing.T) { f, err := nCtx.DataStore().ConstructReference(context.TODO(), nCtx.NodeStatus().GetDataDir(), "futures.pb") assert.NoError(t, err) dj := &core.DynamicJobSpec{} + mockLPLauncher := &lpMocks.Executor{} n := &executorMocks.Node{} assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) h := &mocks.TaskNodeHandler{} h.OnFinalizeMatch(mock.Anything, mock.Anything).Return(fmt.Errorf("err")) - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) _, err = d.Handle(context.TODO(), nCtx) assert.Error(t, err) }) @@ -533,6 +539,7 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { if tt.args.dj != nil { assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, tt.args.dj)) } + mockLPLauncher := &lpMocks.Executor{} h := &mocks.TaskNodeHandler{} if tt.args.validErr != nil { h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.args.validErr, nil) @@ -549,7 +556,7 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { endF := v1alpha1.GetOutputsFile("end-node") assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), endF, storage.Options{}, &core.LiteralMap{})) } - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) got, err := d.Handle(context.TODO(), nCtx) if tt.want.isErr { assert.Error(t, err) @@ -564,6 +571,264 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { } } +func createDynamicJobSpecWithLaunchPlans() *core.DynamicJobSpec { + return &core.DynamicJobSpec{ + MinSuccesses: 1, + Tasks: []*core.TaskTemplate{}, + Nodes: []*core.Node{ + { + Id: "Node_1", + Target: &core.Node_WorkflowNode{ + WorkflowNode: &core.WorkflowNode{ + Reference: &core.WorkflowNode_LaunchplanRef{ + LaunchplanRef: &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Name: "my_plan", + Project: "p", + Domain: "d", + }, + }, + }, + }, + }, + }, + Outputs: []*core.Binding{ + { + Var: "x", + Binding: &core.BindingData{ + Value: &core.BindingData_Promise{ + Promise: &core.OutputReference{ + Var: "x", + NodeId: "Node_1", + }, + }, + }, + }, + }, + } +} + +func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *testing.T) { + createNodeContext := func(ttype string, finalOutput storage.DataReference) *nodeMocks.NodeExecutionContext { + ctx := context.TODO() + + wfExecID := &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", + } + + nm := &nodeMocks.NodeExecutionMetadata{} + nm.On("GetAnnotations").Return(map[string]string{}) + nm.On("GetExecutionID").Return(v1alpha1.WorkflowExecutionIdentifier{ + WorkflowExecutionIdentifier: wfExecID, + }) + nm.On("GetK8sServiceAccount").Return("service-account") + nm.On("GetLabels").Return(map[string]string{}) + nm.On("GetNamespace").Return("namespace") + nm.On("GetOwnerID").Return(types.NamespacedName{Namespace: "namespace", Name: "name"}) + nm.On("GetOwnerReference").Return(v1.OwnerReference{ + Kind: "sample", + Name: "name", + }) + + taskID := &core.Identifier{} + tk := &core.TaskTemplate{ + Id: taskID, + Type: "test", + Metadata: &core.TaskMetadata{ + Discoverable: true, + }, + Interface: &core.TypedInterface{ + Outputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "x": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + }, + }, + } + tr := &nodeMocks.TaskReader{} + tr.On("GetTaskID").Return(taskID) + tr.On("GetTaskType").Return(ttype) + tr.On("Read", mock.Anything).Return(tk, nil) + + n := &flyteMocks.ExecutableNode{} + tID := "dyn-task-1" + n.On("GetTaskID").Return(&tID) + + dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) + assert.NoError(t, err) + + ir := &ioMocks.InputReader{} + nCtx := &nodeMocks.NodeExecutionContext{} + nCtx.On("NodeExecutionMetadata").Return(nm) + nCtx.On("Node").Return(n) + nCtx.On("InputReader").Return(ir) + nCtx.On("DataReferenceConstructor").Return(storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())) + nCtx.On("CurrentAttempt").Return(uint32(1)) + nCtx.On("TaskReader").Return(tr) + nCtx.On("MaxDatasetSizeBytes").Return(int64(1)) + nCtx.On("NodeID").Return("n1") + nCtx.On("EnqueueOwnerFunc").Return(func() error { return nil }) + nCtx.OnDataStore().Return(dataStore) + + endNodeStatus := &flyteMocks.ExecutableNodeStatus{} + endNodeStatus.On("GetDataDir").Return(storage.DataReference("end-node")) + endNodeStatus.On("GetOutputDir").Return(storage.DataReference("end-node")) + + subNs := &flyteMocks.ExecutableNodeStatus{} + subNs.On("SetDataDir", mock.Anything).Return() + subNs.On("SetOutputDir", mock.Anything).Return() + subNs.On("ResetDirty").Return() + subNs.On("GetOutputDir").Return(finalOutput) + subNs.On("SetParentTaskID", mock.Anything).Return() + subNs.OnGetAttempts().Return(0) + + dynamicNS := &flyteMocks.ExecutableNodeStatus{} + dynamicNS.On("SetDataDir", mock.Anything).Return() + dynamicNS.On("SetOutputDir", mock.Anything).Return() + dynamicNS.On("SetParentTaskID", mock.Anything).Return() + dynamicNS.OnGetNodeExecutionStatus(ctx, "n1-1-Node_1").Return(subNs) + dynamicNS.OnGetNodeExecutionStatus(ctx, v1alpha1.EndNodeID).Return(endNodeStatus) + + ns := &flyteMocks.ExecutableNodeStatus{} + ns.On("GetDataDir").Return(storage.DataReference("data-dir")) + ns.On("GetOutputDir").Return(storage.DataReference("output-dir")) + ns.On("GetNodeExecutionStatus", dynamicNodeID).Return(dynamicNS) + ns.OnGetNodeExecutionStatus(ctx, dynamicNodeID).Return(dynamicNS) + nCtx.On("NodeStatus").Return(ns) + + w := &flyteMocks.ExecutableWorkflow{} + ws := &flyteMocks.ExecutableWorkflowStatus{} + ws.OnGetNodeExecutionStatus(ctx, "n1").Return(ns) + w.On("GetExecutionStatus").Return(ws) + nCtx.On("Workflow").Return(w) + + r := &nodeMocks.NodeStateReader{} + r.On("GetDynamicNodeState").Return(handler.DynamicNodeState{ + Phase: v1alpha1.DynamicNodePhaseExecuting, + }) + nCtx.On("NodeStateReader").Return(r) + return nCtx + } + + t.Run("launch plan interfaces match parent task interface", func(t *testing.T) { + ctx := context.Background() + lpID := &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Name: "my_plan", + Project: "p", + Domain: "d", + } + djSpec := createDynamicJobSpecWithLaunchPlans() + finalOutput := storage.DataReference("/subnode") + nCtx := createNodeContext("test", finalOutput) + s := &dynamicNodeStateHolder{} + nCtx.On("NodeStateWriter").Return(s) + f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb") + assert.NoError(t, err) + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, djSpec)) + + mockLPLauncher := &lpMocks.Executor{} + var callsAdmin = false + mockLPLauncher.OnGetLaunchPlanMatch(ctx, lpID).Run(func(args mock.Arguments) { + // When a launch plan node is detected, a call should be made to Admin to fetch the interface for the LP + callsAdmin = true + }).Return(&admin.LaunchPlan{ + Id: lpID, + Closure: &admin.LaunchPlanClosure{ + ExpectedInputs: &core.ParameterMap{}, + ExpectedOutputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "x": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + Description: "output of the launch plan", + }, + }, + }, + }, + }, nil) + h := &mocks.TaskNodeHandler{} + n := &executorMocks.Node{} + d := dynamicNodeTaskNodeHandler{ + TaskNodeHandler: h, + nodeExecutor: n, + lpReader: mockLPLauncher, + metrics: newMetrics(promutils.NewTestScope()), + } + executableWorkflow, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx) + assert.True(t, callsAdmin) + assert.True(t, isDynamic) + assert.NoError(t, err) + assert.NotNil(t, executableWorkflow) + }) + + t.Run("launch plan interfaces do not parent task interface", func(t *testing.T) { + ctx := context.Background() + lpID := &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Name: "my_plan", + Project: "p", + Domain: "d", + } + djSpec := createDynamicJobSpecWithLaunchPlans() + finalOutput := storage.DataReference("/subnode") + nCtx := createNodeContext("test", finalOutput) + s := &dynamicNodeStateHolder{} + nCtx.On("NodeStateWriter").Return(s) + f, err := nCtx.DataStore().ConstructReference(ctx, nCtx.NodeStatus().GetOutputDir(), "futures.pb") + assert.NoError(t, err) + assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, djSpec)) + + mockLPLauncher := &lpMocks.Executor{} + var callsAdmin = false + mockLPLauncher.OnGetLaunchPlanMatch(ctx, lpID).Run(func(args mock.Arguments) { + // When a launch plan node is detected, a call should be made to Admin to fetch the interface for the LP + callsAdmin = true + }).Return(&admin.LaunchPlan{ + Id: lpID, + Closure: &admin.LaunchPlanClosure{ + ExpectedInputs: &core.ParameterMap{}, + ExpectedOutputs: &core.VariableMap{ + Variables: map[string]*core.Variable{ + "d": { + Type: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_STRING, + }, + }, + Description: "output of the launch plan", + }, + }, + }, + }, + }, nil) + h := &mocks.TaskNodeHandler{} + n := &executorMocks.Node{} + d := dynamicNodeTaskNodeHandler{ + TaskNodeHandler: h, + nodeExecutor: n, + lpReader: mockLPLauncher, + metrics: newMetrics(promutils.NewTestScope()), + } + executableWorkflow, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx) + assert.True(t, callsAdmin) + assert.True(t, isDynamic) + assert.Error(t, err) + assert.Nil(t, executableWorkflow) + }) +} + func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { ctx := context.TODO() @@ -578,10 +843,11 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { nCtx.OnNodeStateReader().Return(sr) nCtx.OnCurrentAttempt().Return(0) + mockLPLauncher := &lpMocks.Executor{} h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) assert.NoError(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -706,11 +972,12 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { dj := createDynamicJobSpec() assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + mockLPLauncher := &lpMocks.Executor{} h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(nil) - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) assert.NoError(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -726,11 +993,12 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { dj := createDynamicJobSpec() assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + mockLPLauncher := &lpMocks.Executor{} h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(fmt.Errorf("err")) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(nil) - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) assert.Error(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) @@ -746,11 +1014,12 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { dj := createDynamicJobSpec() assert.NoError(t, nCtx.DataStore().WriteProtobuf(context.TODO(), f, storage.Options{}, dj)) + mockLPLauncher := &lpMocks.Executor{} h := &mocks.TaskNodeHandler{} h.OnFinalize(ctx, nCtx).Return(nil) n := &executorMocks.Node{} n.OnFinalizeHandlerMatch(ctx, mock.Anything, mock.Anything).Return(fmt.Errorf("err")) - d := New(h, n, promutils.NewTestScope()) + d := New(h, n, mockLPLauncher, promutils.NewTestScope()) assert.Error(t, d.Finalize(ctx, nCtx)) assert.NotZero(t, len(h.ExpectedCalls)) assert.Equal(t, "Finalize", h.ExpectedCalls[0].Method) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index a1d7d1a7b..317381ad1 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -740,7 +740,9 @@ func (c *nodeExecutor) Initialize(ctx context.Context) error { } func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, - workflowLauncher launchplan.Executor, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { + workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64, + defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, + catalogClient catalog.Client, scope promutils.Scope) (executors.Node, error) { // TODO we may want to make this configurable. shardSelector, err := ioutils.NewBase36PrefixShardSelector(ctx) @@ -779,7 +781,7 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora defaultDataSandbox: defaultRawOutputPrefix, shardSelector: shardSelector, } - nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, kubeClient, catalogClient, nodeScope) + nodeHandlerFactory, err := NewHandlerFactory(ctx, exec, workflowLauncher, launchPlanReader, kubeClient, catalogClient, nodeScope) exec.nodeHandlerFactory = nodeHandlerFactory return exec, err } diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 7c4043107..574c958f6 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -45,7 +45,9 @@ func TestSetInputsForStartNode(t *testing.T) { mockStorage := createInmemoryDataStore(t, testScope.NewSubScope("f")) enQWf := func(workflowID v1alpha1.WorkflowID) {} - exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket/", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, events.NewMockEventSink(), adminClient, + adminClient, 10, "s3://bucket/", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) inputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -91,7 +93,8 @@ func TestSetInputsForStartNode(t *testing.T) { }) failStorage := createFailingDatastore(t, testScope.NewSubScope("failing")) - execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, events.NewMockEventSink(), adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("StorageFailure", func(t *testing.T) { w := createDummyBaseWorkflow(mockStorage) @@ -113,9 +116,11 @@ func TestNodeExecutor_Initialize(t *testing.T) { mockEventSink := events.NewMockEventSink().(*events.MockEventSink) memStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() t.Run("happy", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -128,7 +133,8 @@ func TestNodeExecutor_Initialize(t *testing.T) { }) t.Run("error", func(t *testing.T) { - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -149,7 +155,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, + 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -247,7 +255,9 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, + 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -603,7 +613,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { startNode := mockWf.StartNode() store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, + adminClient, adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -675,7 +687,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -773,7 +787,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run(test.name, func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -823,7 +839,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-exhausted", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -850,7 +868,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { t.Run("retries-remaining", func(t *testing.T) { hf := &mocks2.HandlerFactory{} store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) exec.nodeHandlerFactory = hf @@ -881,8 +901,9 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { mockEventSink := events.NewMockEventSink().(*events.MockEventSink) store := createInmemoryDataStore(t, promutils.NewTestScope()) - - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, + adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) @@ -990,7 +1011,9 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, launchplan.NewFailFastLaunchPlanExecutor(), 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, + 10, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*nodeExecutor) diff --git a/flytepropeller/pkg/controller/nodes/handler_factory.go b/flytepropeller/pkg/controller/nodes/handler_factory.go index 32168ef2e..852d10b29 100644 --- a/flytepropeller/pkg/controller/nodes/handler_factory.go +++ b/flytepropeller/pkg/controller/nodes/handler_factory.go @@ -50,7 +50,8 @@ func (f handlerFactory) Setup(ctx context.Context, setup handler.SetupContext) e return nil } -func NewHandlerFactory(ctx context.Context, executor executors.Node, workflowLauncher launchplan.Executor, kubeClient executors.Client, client catalog.Client, scope promutils.Scope) (HandlerFactory, error) { +func NewHandlerFactory(ctx context.Context, executor executors.Node, workflowLauncher launchplan.Executor, + launchPlanReader launchplan.Reader, kubeClient executors.Client, client catalog.Client, scope promutils.Scope) (HandlerFactory, error) { t, err := task.New(ctx, kubeClient, client, scope) if err != nil { @@ -60,7 +61,7 @@ func NewHandlerFactory(ctx context.Context, executor executors.Node, workflowLau f := &handlerFactory{ handlers: map[v1alpha1.NodeKind]handler.Node{ v1alpha1.NodeKindBranch: branch.New(executor, scope), - v1alpha1.NodeKindTask: dynamic.New(t, executor, scope), + v1alpha1.NodeKindTask: dynamic.New(t, executor, launchPlanReader, scope), v1alpha1.NodeKindWorkflow: subworkflow.New(executor, workflowLauncher, scope), v1alpha1.NodeKindStart: start.New(), v1alpha1.NodeKindEnd: end.New(), diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 75067d1b5..f54d13f4e 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -97,6 +97,26 @@ func (a *adminLaunchPlanExecutor) GetStatus(ctx context.Context, executionID *co return item.ExecutionClosure, item.SyncError } +func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) { + if launchPlanRef == nil { + return nil, fmt.Errorf("launch plan reference is nil") + } + logger.Debugf(ctx, "Retrieving launch plan %s", *launchPlanRef) + getObjectRequest := admin.ObjectGetRequest{ + Id: launchPlanRef, + } + + lp, err := a.adminClient.GetLaunchPlan(ctx, &getObjectRequest) + if err != nil { + return nil, errors.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin") + } + if lp == nil { + return nil, errors.Wrapf(RemoteErrorSystem, err, "No launch plan retrieved from Admin") + } + + return lp, nil +} + func (a *adminLaunchPlanExecutor) Kill(ctx context.Context, executionID *core.WorkflowExecutionIdentifier, reason string) error { req := &admin.ExecutionTerminateRequest{ Id: executionID, @@ -161,7 +181,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc } func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient, - syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (Executor, error) { + syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (FlyteAdmin, error) { exec := &adminLaunchPlanExecutor{ adminClient: client, } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index 3384cedcb..4f303ce98 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -275,3 +275,40 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) { assert.False(t, IsNotFound(err)) }) } + +func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { + ctx := context.TODO() + id := &core.Identifier{ + ResourceType: core.ResourceType_LAUNCH_PLAN, + Name: "n", + Domain: "d", + Project: "p", + Version: "v", + } + + t.Run("launch plan found", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + assert.NoError(t, err) + mockClient.OnGetLaunchPlanMatch( + ctx, + mock.MatchedBy(func(o *admin.ObjectGetRequest) bool { return true }), + ).Return(&admin.LaunchPlan{Id: id}, nil) + lp, err := exec.GetLaunchPlan(ctx, id) + assert.NoError(t, err) + assert.Equal(t, lp.Id, id) + }) + + t.Run("launch plan not found", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope()) + assert.NoError(t, err) + mockClient.OnGetLaunchPlanMatch( + ctx, + mock.MatchedBy(func(o *admin.ObjectGetRequest) bool { return true }), + ).Return(nil, status.Error(codes.NotFound, "")) + lp, err := exec.GetLaunchPlan(ctx, id) + assert.Nil(t, lp) + assert.Error(t, err) + }) +} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/launchplan.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/launchplan.go index 413f4a2b8..68da07a52 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/launchplan.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/launchplan.go @@ -34,3 +34,13 @@ type Executor interface { // Initializes Executor. Initialize(ctx context.Context) error } + +type Reader interface { + // Get the definition of a launch plan. This is primarily used to ensure all the TypedInterfaces match up before actually executing. + GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) +} + +type FlyteAdmin interface { + Executor + Reader +} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks/Executor.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks/Executor.go index c85036471..104d9f86d 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks/Executor.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks/Executor.go @@ -19,6 +19,47 @@ type Executor struct { mock.Mock } +type Executor_GetLaunchPlan struct { + *mock.Call +} + +func (_m Executor_GetLaunchPlan) Return(_a0 *admin.LaunchPlan, _a1 error) *Executor_GetLaunchPlan { + return &Executor_GetLaunchPlan{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Executor) OnGetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) *Executor_GetLaunchPlan { + c := _m.On("GetLaunchPlan", ctx, launchPlanRef) + return &Executor_GetLaunchPlan{Call: c} +} + +func (_m *Executor) OnGetLaunchPlanMatch(matchers ...interface{}) *Executor_GetLaunchPlan { + c := _m.On("GetLaunchPlan", matchers...) + return &Executor_GetLaunchPlan{Call: c} +} + +// GetLaunchPlan provides a mock function with given fields: ctx, launchPlanRef +func (_m *Executor) GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) { + ret := _m.Called(ctx, launchPlanRef) + + var r0 *admin.LaunchPlan + if rf, ok := ret.Get(0).(func(context.Context, *core.Identifier) *admin.LaunchPlan); ok { + r0 = rf(ctx, launchPlanRef) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*admin.LaunchPlan) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *core.Identifier) error); ok { + r1 = rf(ctx, launchPlanRef) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type Executor_GetStatus struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/noop.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/noop.go index 0ba2b0cc1..08150ccde 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/noop.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/noop.go @@ -12,6 +12,8 @@ import ( ) type failFastWorkflowLauncher struct { + Executor + Reader } func (failFastWorkflowLauncher) Launch(ctx context.Context, launchCtx LaunchContext, executionID *core.WorkflowExecutionIdentifier, launchPlanRef *core.Identifier, inputs *core.LiteralMap) error { @@ -28,12 +30,16 @@ func (failFastWorkflowLauncher) Kill(ctx context.Context, executionID *core.Work return nil } +func (failFastWorkflowLauncher) GetLaunchPlan(ctx context.Context, launchPlanRef *core.Identifier) (*admin.LaunchPlan, error) { + return nil, nil +} + // Initializes Executor. func (failFastWorkflowLauncher) Initialize(ctx context.Context) error { return nil } -func NewFailFastLaunchPlanExecutor() Executor { +func NewFailFastLaunchPlanExecutor() FlyteAdmin { logger.Infof(context.TODO(), "created failFast workflow launcher, will not launch subworkflows.") return &failFastWorkflowLauncher{} } diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index 90bdff4a1..f19194194 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -226,7 +226,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -301,7 +303,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -359,7 +363,9 @@ func BenchmarkWorkflowExecutor(b *testing.B) { eventSink := events.NewMockEventSink() catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(b, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, scope) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, scope) assert.NoError(b, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) @@ -444,7 +450,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -532,7 +540,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { } catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, promutils.NewTestScope()) assert.NoError(t, err) @@ -584,7 +594,9 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { catalogClient, err := catalog.NewCatalogClient(ctx) assert.NoError(t, err) - nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, launchplan.NewFailFastLaunchPlanExecutor(), maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) + adminClient := launchplan.NewFailFastLaunchPlanExecutor() + nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, adminClient, + adminClient, maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) {