diff --git a/flytepropeller/.github/workflows/checks.yml b/flytepropeller/.github/workflows/checks.yml index faf1eedf48..6d8cd32638 100644 --- a/flytepropeller/.github/workflows/checks.yml +++ b/flytepropeller/.github/workflows/checks.yml @@ -13,12 +13,16 @@ jobs: lint: name: Lint uses: flyteorg/flytetools/.github/workflows/lint.yml@master + with: + go-version: 1.17 tests: name: Unit Tests uses: flyteorg/flytetools/.github/workflows/tests.yml@master secrets: FLYTE_BOT_PAT: ${{ secrets.FLYTE_BOT_PAT }} + with: + go-version: 1.17 docker-build: name: Docker Build Images @@ -26,7 +30,7 @@ jobs: endtoend: name: End2End Test - needs: [docker-build] + needs: [ docker-build ] uses: flyteorg/flytetools/.github/workflows/end2end.yml@master with: cache_key: ${{ needs.docker-build.outputs.cache_key }} @@ -34,14 +38,17 @@ jobs: integration: name: Integration Test - needs: [docker-build] + needs: [ docker-build ] uses: flyteorg/flytetools/.github/workflows/integration.yml@master with: cache_key: ${{ needs.docker-build.outputs.cache_key }} + go-version: 1.17 generate: - name: Check Go Gennerate + name: Check Go Generate uses: flyteorg/flytetools/.github/workflows/go_generate.yml@master + with: + go-version: 1.17 bump_version: name: Bump Version @@ -55,6 +62,8 @@ jobs: name: Goreleaser needs: [ bump_version ] # Only to ensure it can successfully build uses: flyteorg/flytetools/.github/workflows/goreleaser.yml@master + with: + go-version: 1.17 secrets: FLYTE_BOT_PAT: ${{ secrets.FLYTE_BOT_PAT }} diff --git a/flytepropeller/.github/workflows/upgrade_automation.yml b/flytepropeller/.github/workflows/upgrade_automation.yml index b94163b1c2..0bca2cf7ba 100644 --- a/flytepropeller/.github/workflows/upgrade_automation.yml +++ b/flytepropeller/.github/workflows/upgrade_automation.yml @@ -16,5 +16,6 @@ jobs: uses: flyteorg/flytetools/.github/workflows/flyte_automation.yml@master with: component: ${{ github.event.inputs.component }} + go-version: 1.17 secrets: FLYTE_BOT_PAT: ${{ secrets.FLYTE_BOT_PAT }} diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index f41eddc980..3a75e1faec 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -6,7 +6,7 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 - github.com/flyteorg/flyteidl v0.24.0 + github.com/flyteorg/flyteidl v0.24.9 github.com/flyteorg/flyteplugins v0.10.16 github.com/flyteorg/flytestdlib v0.4.13 github.com/ghodss/yaml v1.0.0 @@ -25,6 +25,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba + golang.org/x/tools v0.1.10 // indirect google.golang.org/grpc v1.36.0 google.golang.org/protobuf v1.25.0 k8s.io/api v0.20.2 diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index fbc6b4f67e..1e856b5761 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -237,8 +237,8 @@ github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.0 h1:bEr9LGCilUX8t6gE8+K6qfW+LDXuCvBpHfllA+zDPZI= -github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.24.9 h1:wmZ/JEiCQ8cR2mkpFsvwwoUdz+g9GotoifBjLqXh7QY= +github.com/flyteorg/flyteidl v0.24.9/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw= github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= @@ -766,6 +766,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -823,8 +824,9 @@ golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -858,8 +860,9 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= +golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -909,8 +912,9 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -933,6 +937,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -998,9 +1003,11 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210309074719-68d13333faf2/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= @@ -1012,8 +1019,10 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1090,8 +1099,9 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20= +golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 27c7a6e776..a2df13d893 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -454,9 +454,10 @@ func (c *nodeExecutor) handleNotYetStartedNode(ctx context.Context, dag executor if np != nodeStatus.GetPhase() { // assert np == Queued! logger.Infof(ctx, "Change in node state detected from [%s] -> [%s]", nodeStatus.GetPhase().String(), np.String()) + nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), p, nCtx.InputReader().GetInputPath().String(), nodeStatus, nCtx.ExecutionContext().GetEventVersion(), - nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID) + nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase) if err != nil { return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event") } @@ -570,7 +571,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node logger.Infof(ctx, "Change in node state detected from [%s] -> [%s], (handler phase [%s])", nodeStatus.GetPhase().String(), np.String(), p.GetPhase().String()) nev, err := ToNodeExecutionEvent(nCtx.NodeExecutionMetadata().GetNodeExecutionID(), p, nCtx.InputReader().GetInputPath().String(), nCtx.NodeStatus(), nCtx.ExecutionContext().GetEventVersion(), - nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID) + nCtx.ExecutionContext().GetParentInfo(), nCtx.node, c.clusterID, nCtx.NodeStateReader().GetDynamicNodeState().Phase) if err != nil { return executors.NodeStatusUndefined, errors.Wrapf(errors.IllegalStateError, nCtx.NodeID(), err, "could not convert phase info to event") } diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index a6c932e4f9..1624d9083c 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -590,6 +590,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { Message: "Expected Failure", }) } + mockN2Status.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) mockNode := &mocks.ExecutableNode{} mockNode.OnGetID().Return(nodeN2) @@ -622,6 +623,8 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { n := v1.Now() mockN0Status.OnGetStoppedAt().Return(&n) + mockN0Status.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) + tk := &mocks.ExecutableTask{} tk.OnCoreTask().Return(&core.TaskTemplate{}) mockWfStatus := &mocks.ExecutableWorkflowStatus{} @@ -1271,6 +1274,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { bns := &mocks.MutableBranchNodeStatus{} parentBranchNodeStatus.OnGetBranchStatus().Return(bns) bns.OnGetPhase().Return(test.parentNodePhase) + parentBranchNodeStatus.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) tk := &mocks.ExecutableTask{} tk.OnCoreTask().Return(&core.TaskTemplate{}) @@ -1307,6 +1311,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { branchTakeNodeStatus.OnGetParentNodeID().Return(&parentBranchNodeID) branchTakeNodeStatus.OnGetParentTaskID().Return(nil) branchTakeNodeStatus.OnGetStartedAt().Return(&now) + branchTakeNodeStatus.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) if test.phaseUpdateExpected { var ee *core.ExecutionError @@ -1707,13 +1712,15 @@ func TestNodeExecutionEventStartNode(t *testing.T) { n := &mocks.ExecutableNode{} n.OnGetID().Return(id) n.OnGetName().Return("name") + n.OnGetKind().Return(v1alpha1.NodeKindStart) nl := &mocks4.NodeLookup{} ns := &mocks.ExecutableNodeStatus{} ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted) nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns) ns.OnGetParentTaskID().Return(tID) ns.OnGetOutputDirMatch(mock.Anything).Return("dummy://dummyOutUrl") - ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID) + ns.OnGetDynamicNodeStatus().Return(&v1alpha1.DynamicNodeStatus{}) + ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone) assert.NoError(t, err) assert.Equal(t, "start-node", ev.Id.NodeId) assert.Equal(t, execID, ev.Id.ExecutionId) @@ -1749,12 +1756,13 @@ func TestNodeExecutionEventV0(t *testing.T) { n := &mocks.ExecutableNode{} n.OnGetID().Return(id) n.OnGetName().Return("name") + n.OnGetKind().Return(v1alpha1.NodeKindTask) nl := &mocks4.NodeLookup{} ns := &mocks.ExecutableNodeStatus{} ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted) nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns) ns.OnGetParentTaskID().Return(tID) - ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID) + ev, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion0, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone) assert.NoError(t, err) assert.Equal(t, "n1", ev.Id.NodeId) assert.Equal(t, execID, ev.Id.ExecutionId) @@ -1789,12 +1797,13 @@ func TestNodeExecutionEventV1(t *testing.T) { n := &mocks.ExecutableNode{} n.OnGetID().Return(id) n.OnGetName().Return("name") + n.OnGetKind().Return(v1alpha1.NodeKindTask) nl := &mocks4.NodeLookup{} ns := &mocks.ExecutableNodeStatus{} ns.OnGetPhase().Return(v1alpha1.NodePhaseNotYetStarted) nl.OnGetNodeExecutionStatusMatch(mock.Anything, id).Return(ns) ns.OnGetParentTaskID().Return(tID) - eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID) + eventOpt, err := ToNodeExecutionEvent(nID, p, "reference", ns, v1alpha1.EventVersion1, parentInfo, n, testClusterID, v1alpha1.DynamicNodePhaseNone) assert.NoError(t, err) assert.Equal(t, "np1-2-n1", eventOpt.Id.NodeId) assert.Equal(t, execID, eventOpt.Id.ExecutionId) diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index e94fe6ccac..e5839984e0 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -19,6 +19,9 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" ) +// This is used by flyteadmin to indicate that the events will now contain populated IsParent and IsDynamic bits. +var nodeExecutionEventVersion = int32(1) + func ToNodeExecOutput(info *handler.OutputInfo) *event.NodeExecutionEvent_OutputUri { if info == nil || info.OutputURI == "" { return nil @@ -76,7 +79,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, status v1alpha1.ExecutableNodeStatus, eventVersion v1alpha1.EventVersion, parentInfo executors.ImmutableParentInfo, - node v1alpha1.ExecutableNode, clusterID string) (*event.NodeExecutionEvent, error) { + node v1alpha1.ExecutableNode, clusterID string, dynamicNodePhase v1alpha1.DynamicNodePhase) (*event.NodeExecutionEvent, error) { if info.GetPhase() == handler.EPhaseNotReady { return nil, nil } @@ -105,16 +108,18 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, OutputResult: ToNodeExecOutput(&handler.OutputInfo{ OutputURI: outputsFile, }), - OccurredAt: occurredTime, - ProducerId: clusterID, + OccurredAt: occurredTime, + ProducerId: clusterID, + EventVersion: nodeExecutionEventVersion, } } else { nev = &event.NodeExecutionEvent{ - Id: nodeExecID, - Phase: phase, - InputUri: inputPath, - OccurredAt: occurredTime, - ProducerId: clusterID, + Id: nodeExecID, + Phase: phase, + InputUri: inputPath, + OccurredAt: occurredTime, + ProducerId: clusterID, + EventVersion: nodeExecutionEventVersion, } } @@ -161,6 +166,14 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, Error: info.GetErr(), } } + if node.GetKind() == v1alpha1.NodeKindWorkflow && node.GetWorkflowNode() != nil && node.GetWorkflowNode().GetSubWorkflowRef() != nil { + nev.IsParent = true + } else if dynamicNodePhase != v1alpha1.DynamicNodePhaseNone { + nev.IsDynamic = true + if nev.GetTaskNodeMetadata() != nil && nev.GetTaskNodeMetadata().DynamicWorkflow != nil { + nev.IsParent = true + } + } return nev, nil } diff --git a/flytepropeller/pkg/controller/nodes/transformers_test.go b/flytepropeller/pkg/controller/nodes/transformers_test.go new file mode 100644 index 0000000000..b040b1385f --- /dev/null +++ b/flytepropeller/pkg/controller/nodes/transformers_test.go @@ -0,0 +1,88 @@ +package nodes + +import ( + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" + "github.com/flyteorg/flytestdlib/storage" + "github.com/stretchr/testify/assert" +) + +func TestToNodeExecutionEvent(t *testing.T) { + project := "project" + domain := "domain" + t.Run("dynamic node", func(t *testing.T) { + info := handler.PhaseInfoDynamicRunning(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{ + TaskNodeMetadata: &event.TaskNodeMetadata{ + DynamicWorkflow: &event.DynamicWorkflowNodeMetadata{ + Id: &core.Identifier{ + Project: project, + Domain: domain, + Name: "wfname", + Version: "wfversion", + }, + }, + }, + }}) + status := mocks.ExecutableNodeStatus{} + status.OnGetOutputDir().Return(storage.DataReference("s3://foo/bar")) + status.OnGetParentNodeID().Return(nil) + parentInfo := mocks2.ImmutableParentInfo{} + parentInfo.OnCurrentAttempt().Return(0) + parentInfo.OnGetUniqueID().Return("u") + node := mocks.ExecutableNode{} + node.OnGetID().Return("n") + node.OnGetName().Return("nodey") + node.OnGetKind().Return(v1alpha1.NodeKindTask) + + nev, err := ToNodeExecutionEvent(&core.NodeExecutionIdentifier{ + NodeId: "nodey", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "exec", + }, + }, info, "inputPath", &status, v1alpha1.EventVersion2, &parentInfo, &node, "clusterID", v1alpha1.DynamicNodePhaseParentFinalized) + assert.NoError(t, err) + assert.True(t, nev.IsDynamic) + assert.True(t, nev.IsParent) + assert.Equal(t, nodeExecutionEventVersion, nev.EventVersion) + }) + t.Run("is parent", func(t *testing.T) { + info := handler.PhaseInfoDynamicRunning(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{ + TaskNodeMetadata: &event.TaskNodeMetadata{}, + }}) + status := mocks.ExecutableNodeStatus{} + status.OnGetOutputDir().Return(storage.DataReference("s3://foo/bar")) + status.OnGetParentNodeID().Return(nil) + parentInfo := mocks2.ImmutableParentInfo{} + parentInfo.OnCurrentAttempt().Return(0) + parentInfo.OnGetUniqueID().Return("u") + node := mocks.ExecutableNode{} + node.OnGetID().Return("n") + node.OnGetName().Return("nodey") + node.OnGetKind().Return(v1alpha1.NodeKindWorkflow) + executableWorkflowNode := mocks.ExecutableWorkflowNode{} + subworkflowRef := "ref" + executableWorkflowNode.OnGetSubWorkflowRef().Return(&subworkflowRef) + node.OnGetWorkflowNode().Return(&executableWorkflowNode) + + nev, err := ToNodeExecutionEvent(&core.NodeExecutionIdentifier{ + NodeId: "nodey", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "exec", + }, + }, info, "inputPath", &status, v1alpha1.EventVersion2, &parentInfo, &node, "clusterID", v1alpha1.DynamicNodePhaseNone) + assert.NoError(t, err) + assert.False(t, nev.IsDynamic) + assert.True(t, nev.IsParent) + assert.Equal(t, nodeExecutionEventVersion, nev.EventVersion) + }) +}