diff --git a/config.yaml b/config.yaml index eca2d0923f..64d2a90fed 100644 --- a/config.yaml +++ b/config.yaml @@ -32,6 +32,9 @@ tasks: - container - K8S-ARRAY - qubole-hive-executor + default-for-task-type: + - container-array: k8s-array + - presto: my-presto # Uncomment to enable sagemaker plugin # - sagemaker_training # - sagemaker_hyperparameter_tuning diff --git a/go.mod b/go.mod index c57043f106..4f8970c93a 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/Azure/go-autorest/autorest v0.10.0 // indirect github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 - github.com/coreos/etcd v3.3.15+incompatible // indirect github.com/coreos/go-oidc v2.2.1+incompatible // indirect github.com/fatih/color v1.9.0 github.com/ghodss/yaml v1.0.0 @@ -23,11 +22,10 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/lyft/datacatalog v0.2.1 github.com/lyft/flyteidl v0.18.9 - github.com/lyft/flyteplugins v0.5.12 + github.com/lyft/flyteplugins v0.5.14 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 github.com/mattn/go-colorable v0.1.6 // indirect - github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/mapstructure v1.1.2 github.com/ncw/swift v1.0.50 // indirect github.com/pkg/errors v0.9.1 @@ -49,7 +47,6 @@ require ( k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab // indirect sigs.k8s.io/controller-runtime v0.5.1 - sigs.k8s.io/testing_frameworks v0.1.2 // indirect sigs.k8s.io/yaml v1.2.0 // indirect ) diff --git a/go.sum b/go.sum index 4206c546bf..935702f776 100644 --- a/go.sum +++ b/go.sum @@ -25,7 +25,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v32.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v38.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v39.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go v40.3.0+incompatible h1:NthZg3psrLxvQLN6rVm07pZ9mv2wvGNaBNGQ3fnPvLE= github.com/Azure/azure-sdk-for-go v40.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= @@ -91,7 +90,6 @@ github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20200410212604-780c48e github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20200410212604-780c48ecb21a/go.mod h1:kw+Gl0uvPAMADPoubX+kLx0P7e7zWOr6rc+R7D24pbc= github.com/aws/aws-sdk-go v1.23.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.28.9/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= -github.com/aws/aws-sdk-go v1.28.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.29.23 h1:wtiGLOzxAP755OfuVTDIy/NbUIYEDxbIbBEDfNhUpeU= github.com/aws/aws-sdk-go v1.29.23/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/aws/aws-sdk-go-v2 v0.20.0 h1:/yefUjgMrda9PNFwWctBU63nL10CJMdBwkAmaQ4w4Hs= @@ -119,10 +117,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/coocood/freecache v1.1.0 h1:ENiHOsWdj1BrrlPwblhbn4GdAsMymK3pZORJ+bJGAjA= github.com/coocood/freecache v1.1.0/go.mod h1:ePwxCDzOYvARfHdr1pByNct1at3CoKnsipOHwKlNbzI= -github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/etcd v3.3.15+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-oidc v2.2.1+incompatible h1:mh48q/BqXqgjVHpy2ZY7WnWAbenxRjsz9N1i1YxjHAk= @@ -269,7 +265,6 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.0.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -317,7 +312,6 @@ github.com/graymeta/stow v0.2.4/go.mod h1:+0vRL9oMECKjPMP7OeVWl8EIqRCpFwDlth3mrA github.com/graymeta/stow v0.2.5 h1:YFSo4nsAU4Fbi4r/neLIgVYlrMzA1ReDUkdLYTQm/RM= github.com/graymeta/stow v0.2.5/go.mod h1:+0vRL9oMECKjPMP7OeVWl8EIqRCpFwDlth3mrAeV2Kw= github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= -github.com/grpc-ecosystem/go-grpc-middleware v0.0.0-20190222133341-cfaf5686ec79/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE= @@ -325,7 +319,6 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQ github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.3.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.12.2/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c= @@ -396,20 +389,14 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz github.com/lyft/datacatalog v0.2.1 h1:W7LsAjaS297iLCtSH9ZaAAG3YPofwkbbgIaqkfdeM0o= github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4= github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteidl v0.18.7 h1:R8gSt2Tze9BlHbFHZPDPWl630272US+MbSjqoeVkflg= -github.com/lyft/flyteidl v0.18.7/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.18.9 h1:p9gLp92whTSSOeMGPtZ4tkgsVHNGuBuXXMQ447s0J9E= github.com/lyft/flyteidl v0.18.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= -github.com/lyft/flyteplugins v0.5.1 h1:76FpQFohLCy4Eo490sES2empRAi31DJiERfbOSV9pCg= -github.com/lyft/flyteplugins v0.5.1/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c= -github.com/lyft/flyteplugins v0.5.6 h1:4r9aT8XGRcTQk6VCUlJkz9tsSFFPXLVL6C/mVcgCvyQ= -github.com/lyft/flyteplugins v0.5.6/go.mod h1:BRUqCc6HycsFVnhF5Z5MMBHYoqOemdCyfG59lg0B1bY= -github.com/lyft/flyteplugins v0.5.10 h1:mMAtx9PZ1ZdEQ57HiAQsZmF6Rhwo9SWFNnZxYHZY394= -github.com/lyft/flyteplugins v0.5.10/go.mod h1:X17xeh3Sc9ZaAEoZp1Lw6e1lysiToY7aPqYMcstAREI= github.com/lyft/flyteplugins v0.5.12 h1:obz52m9dJ/ununeQJ2OcLZ1z38TE5KMDo8gyH/g8NAg= github.com/lyft/flyteplugins v0.5.12/go.mod h1:UOoiW+rwQdrDDig3bJSxTWoyW8hW5bcqnuRsp+g2zhQ= -github.com/lyft/flytepropeller v0.4.2/go.mod h1:TIiWv/ZP1KOI0mqeUbiMqSn2XuY8O8kn8fQc5tWcaLA= +github.com/lyft/flyteplugins v0.5.14-0.20201019203656-dbcbf808fc35 h1:JWOQEs0sJN/vyW2EhpQfK2VhuGwMcd4MU5JGB9OsWls= +github.com/lyft/flyteplugins v0.5.14-0.20201019203656-dbcbf808fc35/go.mod h1:UOoiW+rwQdrDDig3bJSxTWoyW8hW5bcqnuRsp+g2zhQ= +github.com/lyft/flyteplugins v0.5.14 h1:YDJG7d7ZecT6xoe3Jrj99Q4HiCOg6btm9yjKz1LDuOk= +github.com/lyft/flyteplugins v0.5.14/go.mod h1:UOoiW+rwQdrDDig3bJSxTWoyW8hW5bcqnuRsp+g2zhQ= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI= github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= @@ -439,7 +426,6 @@ github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -461,7 +447,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.4.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -469,7 +454,6 @@ github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= -github.com/onsi/gomega v1.3.0/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= @@ -499,12 +483,10 @@ github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35/go.mod h1:prY github.com/pquerna/ffjson v0.0.0-20190813045741-dac163c6c0a9/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v0.9.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= -github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.5.0 h1:Ctq0iGpCmr3jeP77kbF2UxgvRwzWWz+4Bh9/vJTyg1A= github.com/prometheus/client_golang v1.5.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= @@ -516,7 +498,6 @@ github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2 github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= @@ -525,7 +506,6 @@ github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -552,7 +532,6 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykE github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= -github.com/soheilhy/cmux v0.1.3/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -599,7 +578,6 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= -github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -614,14 +592,11 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.uber.org/atomic v0.0.0-20181018215023-8dc6146f7569/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/multierr v0.0.0-20180122172545-ddea229ff1df/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/zap v0.0.0-20180814183419-67bc79d13d15/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= @@ -675,13 +650,11 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20180112015858-5ccada7d0a7b/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181005035420-146acd28ed58/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -722,7 +695,6 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180117170059-2c42eef0765b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -765,7 +737,6 @@ golang.org/x/sys v0.0.0-20200327173247-9dae0f8f5775/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.1-0.20171227012246-e19ae1496984/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -905,7 +876,6 @@ gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= -gopkg.in/yaml.v2 v2.0.0/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -927,16 +897,12 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8/go.mod h1:IxkesAMoaCRoLrPJdZNZUQp9NfZnzqaVzLhb2VEQzXE= -k8s.io/apiextensions-apiserver v0.0.0-20190918161926-8f644eb6e783/go.mod h1:xvae1SZB3E17UpV59AWc271W/Ph25N+bjPyR63X6tPY= k8s.io/apiextensions-apiserver v0.17.2 h1:cP579D2hSZNuO/rZj9XFRzwJNYb41DbNANJb6Kolpss= k8s.io/apiextensions-apiserver v0.17.2/go.mod h1:4KdMpjkEjjDI2pPfBA15OscyNldHWdBCfsWMDWAmSTs= -k8s.io/apiserver v0.0.0-20190918160949-bfa5e2e684ad/go.mod h1:XPCXEwhjaFN29a8NldXA901ElnKeKLrLtREO9ZhFyhg= k8s.io/apiserver v0.17.2/go.mod h1:lBmw/TtQdtxvrTk0e2cgtOxHizXI+d0mmGQURIHQZlo= k8s.io/client-go v0.0.0-20191016111102-bec269661e48 h1:C2XVy2z0dV94q9hSSoCuTPp1KOG7IegvbdXuz9VGxoU= k8s.io/client-go v0.0.0-20191016111102-bec269661e48/go.mod h1:hrwktSwYGI4JK+TJA3dMaFyyvHVi/aLarVHpbs8bgCU= -k8s.io/code-generator v0.0.0-20190912054826-cd179ad6a269/go.mod h1:V5BD6M4CyaN5m+VthcclXWsVcT1Hu+glwa1bi3MIsyE= k8s.io/code-generator v0.17.2/go.mod h1:DVmfPQgxQENqDIzVR2ddLXMH34qeszkKSdH/N+s+38s= -k8s.io/component-base v0.0.0-20190918160511-547f6c5d7090/go.mod h1:933PBGtQFJky3TEwYx4aEPZ4IxqhWh3R6DCmzqIn1hA= k8s.io/component-base v0.17.2/go.mod h1:zMPW3g5aH7cHJpKYQ/ZsGMcgbsA/VyhEugF3QT1awLs= k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= k8s.io/gengo v0.0.0-20190822140433-26a664648505/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= @@ -965,15 +931,12 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/controller-runtime v0.2.0/go.mod h1:ZHqrRDZi3f6BzONcvlUxkqCKgwasGk5FZrnSv9TVZF4= -sigs.k8s.io/controller-runtime v0.4.0/go.mod h1:ApC79lpY3PHW9xj/w9pj+lYkLgwAAUZwfXkME1Lajns= sigs.k8s.io/controller-runtime v0.5.1 h1:TNidCfVoU/cs2i+9xoTcL/l7yhl0bDhYXU0NCG6wmiE= sigs.k8s.io/controller-runtime v0.5.1/go.mod h1:Uojny7gvg55YLQnEGnPzRE3dC4ik2tRlZJgOUCWXAV4= sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI= -sigs.k8s.io/structured-merge-diff v0.0.0-20190817042607-6149e4549fca/go.mod h1:IIgPezJWb76P0hotTxzDbWsMYB8APh18qZnxkomBpxA= sigs.k8s.io/structured-merge-diff v1.0.1-0.20191108220359-b1b620dd3f06/go.mod h1:/ULNhyfzRopfcjskuui0cTITekDduZ7ycKN3oUT9R18= sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= sigs.k8s.io/testing_frameworks v0.1.1/go.mod h1:VVBKrHmJ6Ekkfz284YKhQePcdycOzNH9qL6ht1zEr/U= -sigs.k8s.io/testing_frameworks v0.1.2/go.mod h1:ToQrwSC3s8Xf/lADdZp3Mktcql9CG0UAmdJG9th5i0w= sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= diff --git a/pkg/controller/nodes/task/config/config.go b/pkg/controller/nodes/task/config/config.go index b4f45d473c..cc7d219beb 100644 --- a/pkg/controller/nodes/task/config/config.go +++ b/pkg/controller/nodes/task/config/config.go @@ -1,11 +1,15 @@ package config import ( + "context" + "fmt" "strings" "time" - "github.com/lyft/flytestdlib/config" + "github.com/lyft/flytestdlib/logger" "k8s.io/apimachinery/pkg/util/sets" + + "github.com/lyft/flytestdlib/config" ) //go:generate pflags Config --default-var defaultConfig @@ -14,7 +18,7 @@ const SectionKey = "tasks" var ( defaultConfig = &Config{ - TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}}, + TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}}, MaxPluginPhaseVersions: 100000, BarrierConfig: BarrierConfig{ Enabled: true, @@ -46,7 +50,9 @@ type BarrierConfig struct { } type TaskPluginConfig struct { - EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"` + EnabledPlugins []string `json:"enabled-plugins" pflag:",deprecated"` + // Maps task types to their plugin handler (by ID). + DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"` } type BackOffConfig struct { @@ -54,14 +60,61 @@ type BackOffConfig struct { MaxDuration config.Duration `json:"max-duration" pflag:",The cap of the backoff duration"` } -func (p TaskPluginConfig) GetEnabledPluginsSet() sets.String { - s := sets.NewString() - for _, e := range p.EnabledPlugins { - cleanedPluginName := strings.Trim(e, " ") - cleanedPluginName = strings.ToLower(cleanedPluginName) - s.Insert(cleanedPluginName) +type PluginID = string +type TaskType = string + +// Contains the set of enabled plugins for this flytepropeller deployment along with default plugin handlers +// for specific task types. +type PluginsConfigMeta struct { + EnabledPlugins sets.String + AllDefaultForTaskTypes map[PluginID][]TaskType +} + +func cleanString(source string) string { + cleaned := strings.Trim(source, " ") + cleaned = strings.ToLower(cleaned) + return cleaned +} + +func (p TaskPluginConfig) GetEnabledPlugins() (PluginsConfigMeta, error) { + enabledPluginsNames := sets.NewString() + for _, pluginName := range p.EnabledPlugins { + cleanedPluginName := cleanString(pluginName) + enabledPluginsNames.Insert(cleanedPluginName) + } + + pluginDefaultForTaskType := make(map[PluginID][]TaskType) + // Reverse the DefaultForTaskTypes map. Having the config use task type as a key guarantees only one default plugin can be specified per + // task type but now we need to sort for which tasks a plugin needs to be the default. + for taskName, pluginName := range p.DefaultForTaskTypes { + existing, found := pluginDefaultForTaskType[pluginName] + if !found { + existing = make([]string, 0, 1) + } + pluginDefaultForTaskType[cleanString(pluginName)] = append(existing, cleanString(taskName)) + } + + // All plugins are enabled, nothing further to validate here. + if enabledPluginsNames.Len() == 0 { + return PluginsConfigMeta{ + EnabledPlugins: enabledPluginsNames, + AllDefaultForTaskTypes: pluginDefaultForTaskType, + }, nil + } + + // Finally, validate that default plugins for task types only reference enabled plugins + for pluginName, taskTypes := range pluginDefaultForTaskType { + if !enabledPluginsNames.Has(pluginName) { + logger.Errorf(context.TODO(), "Cannot set default plugin [%s] for task types [%+v] when it is not "+ + "configured to be an enabled plugin. Please double check the flytepropeller config.", pluginName, taskTypes) + return PluginsConfigMeta{}, fmt.Errorf("cannot set default plugin [%s] for task types [%+v] when it is not "+ + "configured to be an enabled plugin", pluginName, taskTypes) + } } - return s + return PluginsConfigMeta{ + EnabledPlugins: enabledPluginsNames, + AllDefaultForTaskTypes: pluginDefaultForTaskType, + }, nil } func GetConfig() *Config { diff --git a/pkg/controller/nodes/task/config/config_flags.go b/pkg/controller/nodes/task/config/config_flags.go index 76707946d4..b7e8ad8489 100755 --- a/pkg/controller/nodes/task/config/config_flags.go +++ b/pkg/controller/nodes/task/config/config_flags.go @@ -41,7 +41,7 @@ func (Config) mustMarshalJSON(v json.Marshaler) string { // flags is json-name.json-sub-name... etc. func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError) - cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "task-plugins.enabled-plugins"), []string{}, "Plugins enabled currently") + cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "task-plugins.enabled-plugins"), []string{}, "deprecated") cmdFlags.Int32(fmt.Sprintf("%v%v", prefix, "max-plugin-phase-versions"), defaultConfig.MaxPluginPhaseVersions, "Maximum number of plugin phase versions allowed for one phase.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "barrier.enabled"), defaultConfig.BarrierConfig.Enabled, "Enable Barrier transitions using inmemory context") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "barrier.cache-size"), defaultConfig.BarrierConfig.CacheSize, "Max number of barrier to preserve in memory") diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index b92dc30396..66353ac76a 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -153,6 +153,7 @@ type PluginRegistryIface interface { GetK8sPlugins() []pluginK8s.PluginEntry } +type taskType = string type pluginID = string type Handler struct { @@ -204,21 +205,35 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { return err } + // Not every task type will have a default plugin specified in the flytepropeller config. + // That's fine, we resort to using the plugins' static RegisteredTaskTypes as a fallback further below. + fallbackTaskHandlerMap := make(map[taskType]map[pluginID]pluginCore.Plugin) + for _, p := range enabledPlugins { // create a new resource registrar proxy for each plugin, and pass it into the plugin's LoadPlugin() via a setup context pluginResourceNamespacePrefix := pluginCore.ResourceNamespace(newResourceManagerBuilder.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(p.ID)) sCtxFinal := newNameSpacedSetupCtx( tSCtx, newResourceManagerBuilder.GetResourceRegistrar(pluginResourceNamespacePrefix)) logger.Infof(ctx, "Loading Plugin [%s] ENABLED", p.ID) - // cp, err := p.LoadPlugin(ctx, tSCtx) cp, err := p.LoadPlugin(ctx, sCtxFinal) if err != nil { return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID) } + // For every default plugin for a task type specified in flytepropeller config we validate that the plugin's + // static definition includes that task type as something it is registered to handle. for _, tt := range p.RegisteredTaskTypes { - logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt) - // TODO(katrogan): Make the default task plugin assignment more explicit (https://github.com/lyft/flyte/issues/516) - t.defaultPlugins[tt] = cp + for _, defaultTaskType := range p.DefaultForTaskTypes { + if defaultTaskType == tt { + if existingHandler, alreadyDefaulted := t.defaultPlugins[tt]; alreadyDefaulted && existingHandler.GetID() != cp.GetID() { + logger.Errorf(ctx, "TaskType [%s] has multiple default handlers specified: [%s] and [%s]", + tt, existingHandler.GetID(), cp.GetID()) + return regErrors.New(fmt.Sprintf("TaskType [%s] has multiple default handlers specified: [%s] and [%s]", + tt, existingHandler.GetID(), cp.GetID())) + } + logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt) + t.defaultPlugins[tt] = cp + } + } pluginsForTaskType, ok := t.pluginsForType[tt] if !ok { @@ -226,6 +241,13 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { } pluginsForTaskType[cp.GetID()] = cp t.pluginsForType[tt] = pluginsForTaskType + + fallbackMap, ok := fallbackTaskHandlerMap[tt] + if !ok { + fallbackMap = make(map[pluginID]pluginCore.Plugin) + } + fallbackMap[cp.GetID()] = cp + fallbackTaskHandlerMap[tt] = fallbackMap } if p.IsDefault { if err := t.setDefault(ctx, cp); err != nil { @@ -234,6 +256,20 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error { } } + // Read from the fallback task handler map for any remaining tasks without a defaultPlugins registered handler. + for taskType, registeredPlugins := range fallbackTaskHandlerMap { + if _, ok := t.defaultPlugins[taskType]; ok { + break + } + if len(registeredPlugins) != 1 { + logger.Errorf(ctx, "Multiple plugins registered to handle task type: %s. ([%+v])", taskType, registeredPlugins) + return regErrors.New(fmt.Sprintf("Multiple plugins registered to handle task type: %s. ([%+v]). Use default-for-task-type config option to choose the desired plugin.", taskType, registeredPlugins)) + } + for _, plugin := range registeredPlugins { + t.defaultPlugins[taskType] = plugin + } + } + rm, err := newResourceManagerBuilder.BuildResourceManager(ctx) if err != nil { logger.Errorf(ctx, "Failed to build a resource manager") diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 648893b390..85ab2fb367 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -28,7 +30,6 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks" pluginK8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" - pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/storage" "github.com/stretchr/testify/assert" @@ -150,41 +151,60 @@ func Test_task_Setup(t *testing.T) { defaultPluginID string } tests := []struct { - name string - registry PluginRegistryIface - fields wantFields - wantErr bool + name string + registry PluginRegistryIface + enabledPlugins []string + defaultForTaskTypes map[string]string + fields wantFields + wantErr bool }{ - {"no-plugins", testPluginRegistry{}, wantFields{}, false}, + {"no-plugins", testPluginRegistry{}, []string{}, map[string]string{}, wantFields{}, false}, {"no-default-only-core", testPluginRegistry{ core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{}, - }, wantFields{ - pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType}, - }, false}, + }, []string{corePluginType}, map[string]string{ + corePluginType: corePluginType}, + wantFields{ + pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType}, + }, false}, {"no-default-only-k8s", testPluginRegistry{ core: []pluginCore.PluginEntry{}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry}, + }, []string{k8sPluginType}, map[string]string{ + k8sPluginType: k8sPluginType}, + wantFields{ + pluginIDs: map[pluginCore.TaskType]string{k8sPluginType: k8sPluginType}, + }, false}, + {"no-default", testPluginRegistry{}, []string{corePluginType, k8sPluginType}, map[string]string{ + corePluginType: corePluginType, + k8sPluginType: k8sPluginType, }, wantFields{ - pluginIDs: map[pluginCore.TaskType]string{k8sPluginType: k8sPluginType}, - }, false}, - {"no-default", testPluginRegistry{ - core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry}, - }, wantFields{ - pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginType: k8sPluginType}, + pluginIDs: map[pluginCore.TaskType]string{}, }, false}, {"only-default-core", testPluginRegistry{ core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry}, + }, []string{corePluginType, corePluginDefaultType, k8sPluginType}, map[string]string{ + corePluginType: corePluginType, + corePluginDefaultType: corePluginDefaultType, + k8sPluginType: k8sPluginType, }, wantFields{ pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType}, defaultPluginID: corePluginDefaultType, }, false}, {"only-default-k8s", testPluginRegistry{ core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntryDefault}, + }, []string{corePluginType, k8sPluginDefaultType}, map[string]string{ + corePluginType: corePluginType, + k8sPluginDefaultType: k8sPluginDefaultType, }, wantFields{ pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginDefaultType: k8sPluginDefaultType}, defaultPluginID: k8sPluginDefaultType, }, false}, {"default-both", testPluginRegistry{ core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry, k8sPluginEntryDefault}, + }, []string{corePluginType, corePluginDefaultType, k8sPluginType, k8sPluginDefaultType}, map[string]string{ + corePluginType: corePluginType, + corePluginDefaultType: corePluginDefaultType, + k8sPluginType: k8sPluginType, + k8sPluginDefaultType: k8sPluginDefaultType, }, wantFields{ pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType, k8sPluginDefaultType: k8sPluginDefaultType}, defaultPluginID: corePluginDefaultType, @@ -200,6 +220,8 @@ func Test_task_Setup(t *testing.T) { sCtx.On("MetricsScope").Return(promutils.NewTestScope()) tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, promutils.NewTestScope()) + tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins + tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes assert.NoError(t, err) tk.pluginRegistry = tt.registry if err := tk.Setup(context.TODO(), sCtx); err != nil { diff --git a/pkg/controller/nodes/task/plugin_config.go b/pkg/controller/nodes/task/plugin_config.go index 2f1372010b..d6ab6b4b35 100644 --- a/pkg/controller/nodes/task/plugin_config.go +++ b/pkg/controller/nodes/task/plugin_config.go @@ -8,7 +8,6 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" "github.com/lyft/flytestdlib/logger" - "k8s.io/apimachinery/pkg/util/sets" "github.com/lyft/flytepropeller/pkg/controller/nodes/task/config" "github.com/lyft/flytepropeller/pkg/controller/nodes/task/k8s" @@ -16,23 +15,32 @@ import ( func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) { allPluginsEnabled := false - enabledPlugins := sets.NewString() + pluginsConfigMeta := config.PluginsConfigMeta{ + AllDefaultForTaskTypes: map[pluginID][]taskType{}, + } + var err error if cfg != nil { - enabledPlugins = cfg.GetEnabledPluginsSet() + pluginsConfigMeta, err = cfg.GetEnabledPlugins() + if err != nil { + return nil, err + } } - if enabledPlugins.Len() == 0 { + if pluginsConfigMeta.EnabledPlugins.Len() == 0 { allPluginsEnabled = true } var finalizedPlugins []core.PluginEntry - logger.Infof(ctx, "Enabled plugins: %v", enabledPlugins.List()) + logger.Infof(ctx, "Enabled plugins: %v", pluginsConfigMeta.EnabledPlugins.List()) logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", allPluginsEnabled) for _, cpe := range pr.GetCorePlugins() { id := strings.ToLower(cpe.ID) - if !allPluginsEnabled && !enabledPlugins.Has(id) { + if !allPluginsEnabled && !pluginsConfigMeta.EnabledPlugins.Has(id) { logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", id) } else { logger.Infof(ctx, "Plugin [%s] ENABLED", id) + if defaults, ok := pluginsConfigMeta.AllDefaultForTaskTypes[id]; ok { + cpe.DefaultForTaskTypes = defaults + } finalizedPlugins = append(finalizedPlugins, cpe) } } @@ -47,18 +55,20 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu for i := range k8sPlugins { kpe := k8sPlugins[i] id := strings.ToLower(kpe.ID) - if !allPluginsEnabled && !enabledPlugins.Has(id) { + if !allPluginsEnabled && !pluginsConfigMeta.EnabledPlugins.Has(id) { logger.Infof(ctx, "K8s Plugin [%s] is DISABLED (not found in enabled plugins list).", id) } else { logger.Infof(ctx, "K8s Plugin [%s] is ENABLED.", id) - finalizedPlugins = append(finalizedPlugins, core.PluginEntry{ + plugin := core.PluginEntry{ ID: id, RegisteredTaskTypes: kpe.RegisteredTaskTypes, LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) { return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex) }, - IsDefault: kpe.IsDefault, - }) + IsDefault: kpe.IsDefault, + DefaultForTaskTypes: pluginsConfigMeta.AllDefaultForTaskTypes[id], + } + finalizedPlugins = append(finalizedPlugins, plugin) } } return finalizedPlugins, nil diff --git a/pkg/controller/nodes/task/plugin_config_test.go b/pkg/controller/nodes/task/plugin_config_test.go index fd28892ce7..41bc0757aa 100644 --- a/pkg/controller/nodes/task/plugin_config_test.go +++ b/pkg/controller/nodes/task/plugin_config_test.go @@ -52,8 +52,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { {"no-config-no-plugins", args{}, want{}}, {"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, {"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, - {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, - {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {