diff --git a/Gopkg.lock b/Gopkg.lock index ebc7b1a46f..2bf63a6c1a 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,23 +2,26 @@ [[projects]] - digest = "1:9facd370d4b5aaf82bb6549c358eefc71c771458fc91d1afe724434aece9d886" + digest = "1:975b4c23997bb1cb942934f3144732b2d6d3c7d250c2d8e53fe96de6c5a1ecea" name = "cloud.google.com/go" packages = ["compute/metadata"] pruneopts = "" - revision = "cfe8f6d1fe6976d03af790d7a8b9bf6aa73287bd" - version = "v0.47.0" + revision = "b4cdc8d6eb508c4e74df26094d1adb678c87f818" + version = "v0.51.0" [[projects]] - digest = "1:9a11be778d5fcb8e4873e64a097dfd2862d8665d9e2d969b90810d5272e51acb" + digest = "1:e4d91a70242d1adc40f296d397be3bfd6f3728d2abc2445189b60d9e8aded5c3" name = "github.com/Azure/azure-sdk-for-go" - packages = ["storage"] + packages = [ + "storage", + "version", + ] pruneopts = "" - revision = "2d49bb8f2cee530cc16f1f1a9f0aae763dee257d" - version = "v10.2.1-beta" + revision = "2e62f976f4de2713ef1b8bb3254bebb81b95b638" + version = "v38.0.0" [[projects]] - digest = "1:e4a02906493a47ee87ef61aeea130ce6624da07349a6dc62494a4e72b550ca8e" + digest = "1:b82e05494fb7e33ff9e4421171be18159a8afb614e3794572ddfe2db28a05d5f" name = "github.com/Azure/go-autorest" packages = [ "autorest", @@ -29,8 +32,8 @@ "tracing", ] pruneopts = "" - revision = "3492b2aff5036c67228ab3c7dba3577c871db200" - version = "v13.3.0" + revision = "21d4b01533b1005be0d020da67a6d3f8ebdf0141" + version = "v13.3.1" [[projects]] digest = "1:e1549ae10031ac55dd7d26ac4d480130ddbdf97f9a26ebbedff089aa0335798f" @@ -45,10 +48,11 @@ version = "v0.1.3" [[projects]] - digest = "1:5cd3c3d202b439040a051b6fa21f538af6469c94f9758dfb4f97a24ceb144149" + digest = "1:e08d1378855f8ac73a1d60f3ea4ac230fab251c50c753bf3eae22766ef480060" name = "github.com/aws/aws-sdk-go" packages = [ "aws", + "aws/arn", "aws/awserr", "aws/awsutil", "aws/client", @@ -73,6 +77,7 @@ "internal/sdkrand", "internal/sdkuri", "internal/shareddefaults", + "internal/strings", "private/protocol", "private/protocol/eventstream", "private/protocol/eventstream/eventstreamapi", @@ -86,14 +91,15 @@ "private/protocol/xml/xmlutil", "service/batch", "service/s3", + "service/s3/internal/arn", "service/s3/s3iface", "service/s3/s3manager", "service/sts", "service/sts/stsiface", ] pruneopts = "" - revision = "2f232d11486e77d344da0723340b566d3ff7865a" - version = "v1.25.24" + revision = "e92e1b8eb5fa589199a2bbcdf84a0342fbc40dc6" + version = "v1.27.4" [[projects]] digest = "1:ac2a05be7167c495fe8aaf8aaf62ecf81e78d2180ecb04e16778dc6c185c96a5" @@ -104,12 +110,12 @@ version = "v1.0.1" [[projects]] - digest = "1:545ae40d6dde46043a71bdfd7f9a17f2353ce16277c83ac685af231b4b7c4beb" + digest = "1:d25acc7560ed91f825cb9b01a1e945bb1117cdcaba19077137e2d9ffc9cf6d05" name = "github.com/cespare/xxhash" packages = ["."] pruneopts = "" - revision = "de209a9ffae3256185a6bb135d1a0ada7b2b5f09" - version = "v2.1.0" + revision = "d7df74196a9e781ede915320c11c378c1b2f3a1f" + version = "v2.1.1" [[projects]] digest = "1:193f6d32d751f26540aa8eeedc114ce0a51f9e77b6c22dda3a4db4e5f65aec66" @@ -136,28 +142,28 @@ version = "v3.2.0" [[projects]] - digest = "1:46ddeb9dd35d875ac7568c4dc1fc96ce424e034bdbb984239d8ffc151398ec01" + digest = "1:cdcdaf690213dd7daa324a427928c6e7b062085c5fd6d4272db8fb0afba8dac3" name = "github.com/evanphx/json-patch" packages = ["."] pruneopts = "" - revision = "026c730a0dcc5d11f93f1cf1cc65b01247ea7b6f" - version = "v4.5.0" + revision = "bf22ed9311622d93e213ba31e4ae7a5771e5d379" + version = "v4.6.0" [[projects]] - digest = "1:e988ed0ca0d81f4d28772760c02ee95084961311291bdfefc1b04617c178b722" + digest = "1:38c537dca3034658912a9ed95ae5d0d300a6a37cd04ca53143e056a5170a3c83" name = "github.com/fatih/color" packages = ["."] pruneopts = "" - revision = "5b77d2a35fb0ede96d138fc9a99f5c9b6aef11b4" - version = "v1.7.0" + revision = "daf2830f2741ebb735b21709a520c5f37d642d85" + version = "v1.9.0" [[projects]] - branch = "master" - digest = "1:c47fa36ae2c154748003f5d39c5ca94e86ec144b37e4975fad91a1c7ff5047fb" + digest = "1:eb53021a8aa3f599d29c7102e65026242bdedce998a54837dc67f14b6a97c5fd" name = "github.com/fsnotify/fsnotify" packages = ["."] pruneopts = "" - revision = "4bf2d1fec78374803a39307bfb8d340688f4f28e" + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" [[projects]] digest = "1:65587005c6fa4293c0b8a2e457e689df7fda48cc5e1f5449ea2c1e7784551558" @@ -188,11 +194,11 @@ [[projects]] branch = "master" - digest = "1:156e8134a4ab3efcbcbc6e3dd5069bfb70ed8cbb1cdd7db557bd9035c363f335" + digest = "1:bd738ea0dd7d1ce14f6410e1a966c946bcb1855db767a16836d5c56ca3f2f2a0" name = "github.com/golang/groupcache" packages = ["lru"] pruneopts = "" - revision = "611e8accdfc92c4187d399e95ce826046d4c8d73" + revision = "215e87163ea771ffa998a96c611387313bb5a403" [[projects]] digest = "1:b852d2b62be24e445fcdbad9ce3015b44c207815d631230dfce3f14e7803f5bf" @@ -211,7 +217,7 @@ version = "v1.3.2" [[projects]] - digest = "1:9fcb267c272bc5054564b392e3ff7e65e35400fd9914afb1d169f92b95e7dbc9" + digest = "1:c35d25413dce110e4ff2ca871c648bf5365ab8c954021394cabd39f5e0566fd8" name = "github.com/google/go-cmp" packages = [ "cmp", @@ -221,8 +227,8 @@ "cmp/internal/value", ] pruneopts = "" - revision = "2d0692c2e9617365a95b295612ac0d4415ba4627" - version = "v0.3.1" + revision = "5a6f75716e1203a923a78c9efb94089d857df0f6" + version = "v0.4.0" [[projects]] digest = "1:8d4a577a9643f713c25a32151c0f26af7228b4b97a219b5ddb7fd38d16f6e673" @@ -253,7 +259,7 @@ version = "v0.3.1" [[projects]] - digest = "1:1ea91d049b6a609f628ecdfda32e85f445a0d3671980dcbf7cbe1bbd7ee6aabc" + digest = "1:3c582fec3bc9ac9fd1809e58ed6139341afb16e83241d0a2edd7a293cc0e2792" name = "github.com/graymeta/stow" packages = [ ".", @@ -265,7 +271,8 @@ "swift", ] pruneopts = "" - revision = "903027f87de7054953efcdb8ba70d5dc02df38c7" + revision = "926777b8f7aaf9b57d475c612baebb0968532c48" + version = "v0.2.4" [[projects]] digest = "1:7f6f07500a0b7d3766b00fa466040b97f2f5b5f3eef2ecabfe516e703b05119a" @@ -313,12 +320,12 @@ revision = "c2b33e84" [[projects]] - digest = "1:64bdeae058b988b2b198326b1ca6155497e904e697348d838add8a6e4c25842e" + digest = "1:fb8bce9822eac1e2aeee6c2621cf25c6dec8f8f5f50a09a4a894d7932bfb2106" name = "github.com/json-iterator/go" packages = ["."] pruneopts = "" - revision = "03217c3e97663914aec3faafde50d081f197a0a2" - version = "v1.1.8" + revision = "acfec88f7a0d5140ace3dcdbee10184e3684a9e1" + version = "v1.1.9" [[projects]] digest = "1:0f51cee70b0d254dbc93c22666ea2abf211af81c1701a96d04e2284b408621db" @@ -329,7 +336,7 @@ version = "v1.0.2" [[projects]] - digest = "1:24508da9d4741637264e1f5deeceb58150415974ce08efe1f0764ffa881d833d" + digest = "1:7e7dbc278fab0a772ac3eb0f97c75677e151887fc9b8f065639090e0af0747a0" name = "github.com/lyft/flyteidl" packages = [ "clients/go/coreutils", @@ -339,11 +346,11 @@ "gen/pb-go/flyteidl/plugins", ] pruneopts = "" - revision = "7f8cdc2cb0f613bb62b8b44167828326b733f8a5" - version = "v0.16.3" + revision = "28c0dfb6608b70262aac9cb1ff83a750521ded8e" + version = "v0.16.5" [[projects]] - digest = "1:7fee6ae151d081a92d58a53099adcfccf89e424aef03eb20531ae0285da8613d" + digest = "1:6c8d5d4b7189d903f7a2bcca20d5e2d721493f51406a16dcf27caf09315f522c" name = "github.com/lyft/flytestdlib" packages = [ "atomic", @@ -363,8 +370,8 @@ "utils", ] pruneopts = "" - revision = "f9214671dc3b5e86ff029bd514fc18be38c1c3f0" - version = "v0.2.28" + revision = "ee0dc8209867ca344a70003924bfb6be12fd29ce" + version = "v0.2.31" [[projects]] digest = "1:ae39921edb7f801f7ce1b6b5484f9715a1dd2b52cb645daef095cd10fd6ee774" @@ -378,20 +385,20 @@ version = "v1.8.1" [[projects]] - digest = "1:9ea83adf8e96d6304f394d40436f2eb44c1dc3250d223b74088cc253a6cd0a1c" + digest = "1:9b58ad18b38cf0d44b1d006fce319e45f200eafa406fe7f65fe0d0194b9f7a9f" name = "github.com/mattn/go-colorable" packages = ["."] pruneopts = "" - revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072" - version = "v0.0.9" + revision = "98ec13f34aabf44cc914c65a1cfb7b9bc815aef1" + version = "v0.1.4" [[projects]] - digest = "1:5794dadc5456af270fc564d98efbfa1f3627cd6c733ca772d14409793772064f" + digest = "1:f438921468fbc770ac4c97628f188913a099d4f004226e4aeb6e0f62f5b14fd1" name = "github.com/mattn/go-isatty" packages = ["."] pruneopts = "" - revision = "88ba11cfdc67c7588b30042edf244b2875f892b6" - version = "v0.0.10" + revision = "31745d66dd679ac0ac4f8d3ecff168fce6170c6a" + version = "v0.0.11" [[projects]] digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" @@ -426,12 +433,12 @@ version = "1.0.1" [[projects]] - branch = "master" - digest = "1:b6c101f6c8ab09c631e969c30d3a4b42aeca82580499253bad77cb2426d4fc27" + digest = "1:f05d92116f66e924481201d313512e58cd85c135ba6d9818ab5f6b19501f8e76" name = "github.com/ncw/swift" packages = ["."] pruneopts = "" - revision = "a24ef33bc9b7e59ae4bed9e87a51d7bc76122731" + revision = "f737f4e00462f79ff2e0ddbcfb09331ce7ec4fa9" + version = "v1.0.49" [[projects]] digest = "1:5f347ea0b4656f17f0ddffae1419dafaac4bc6ed57c92d58cadc7452f8a9ac3f" @@ -469,12 +476,12 @@ version = "v0.9.4" [[projects]] - branch = "master" - digest = "1:0a565f69553dd41b3de790fde3532e9237142f2637899e20cd3e7396f0c4f2f7" + digest = "1:ff7a5f44653e65cf1a0577bbe3f2cdaf514930348f6df581bbd687bbe35ead5b" name = "github.com/prometheus/client_model" packages = ["go"] pruneopts = "" - revision = "14fe0d1b01d4d5fc031dd4bec1823bd3ebbe8016" + revision = "d1d2010b5beead3fa1c5f271a5cf626e40b3ad6e" + version = "v0.1.0" [[projects]] digest = "1:8904acfa3ef080005c1fc0670ed0471739d1e211be5638cfa6af536b701942ae" @@ -489,7 +496,7 @@ version = "v0.7.0" [[projects]] - digest = "1:af5cd8219fd15c06eadaab455c0beb72f2f7bb32d298acb401d30c452a8dbd7e" + digest = "1:4c64aa254bc24990bc0216de9dd955ff83f061e9baac7ed2ffc293442ab7514a" name = "github.com/prometheus/procfs" packages = [ ".", @@ -497,12 +504,12 @@ "internal/util", ] pruneopts = "" - revision = "499c85531f756d1129edd26485a5f73871eeb308" - version = "v0.0.5" + revision = "6d489fc7f1d9cd890a250f3ea3431b1744b9623f" + version = "v0.0.8" [[projects]] digest = "1:7f569d906bdd20d906b606415b7d794f798f91a62fcfb6a4daa6d50690fb7a3f" - name = "github.com/satori/uuid" + name = "github.com/satori/go.uuid" packages = ["."] pruneopts = "" revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3" @@ -528,12 +535,12 @@ version = "v1.2.2" [[projects]] - digest = "1:ae3493c780092be9d576a1f746ab967293ec165e8473425631f06658b6212afc" + digest = "1:0a3d8f7dc17afdc30a5a5becb45fb91ff28aaa64645c69836ccd600d88e9ed9e" name = "github.com/spf13/cast" packages = ["."] pruneopts = "" - revision = "8c9545af88b134710ab1cd196795e7f2388358d7" - version = "v1.3.0" + revision = "1ffadf551085444af981432dd0f6d1160c11ec64" + version = "v1.3.1" [[projects]] digest = "1:0c63b3c7ad6d825a898f28cb854252a3b29d37700c68a117a977263f5ec94efe" @@ -560,11 +567,12 @@ version = "v1.0.5" [[projects]] - digest = "1:c25a789c738f7cc8ec7f34026badd4e117853f329334a5aa45cf5d0727d7d442" + digest = "1:ede5f300103cb012aafde77c692dc853c4b590bb412d3c7965a11748a5c37635" name = "github.com/spf13/viper" packages = ["."] pruneopts = "" - revision = "ae103d7e593e371c69e832d5eb3347e2b80cbbc9" + revision = "eabbc68a3ecd5cf8c11a2f84dbda5e7a38493b2f" + version = "v1.6.1" [[projects]] digest = "1:711eebe744c0151a9d09af2315f0bb729b2ec7637ef4c410fa90a18ef74b65b6" @@ -585,6 +593,14 @@ revision = "221dbe5ed46703ee255b1da0dec05086f5035f62" version = "v1.4.0" +[[projects]] + digest = "1:e818a738c880c5571b3817e9173add5c598a90f8615fa7a9bafc23fea774a603" + name = "github.com/subosito/gotenv" + packages = ["."] + pruneopts = "" + revision = "2ef7124db659d49edac6aa459693a15ae36c671a" + version = "v1.2.0" + [[projects]] branch = "master" digest = "1:f05401b06311e9ab82c76f89022a3503139a9f608233fdbf14386ed1e4fc9520" @@ -595,7 +611,7 @@ source = "github.com/enghabu/mockery" [[projects]] - digest = "1:1967fb934ef747bf690fcc56487a06c46bf674bd91cb3381a78a7e4d5c2e1a82" + digest = "1:61ff2a14b0f9a396d92bb6967ecd1ae4c4767ff3e008acdfd8291af134336d9f" name = "go.opencensus.io" packages = [ ".", @@ -616,20 +632,20 @@ "trace/tracestate", ] pruneopts = "" - revision = "59d1ce35d30f3c25ba762169da2a37eab6ffa041" - version = "v0.22.1" + revision = "aad2c527c5defcf89b5afab7f37274304195a6b2" + version = "v0.22.2" [[projects]] branch = "master" - digest = "1:47cd7b63835b62da6a8c3817fafde2e07aaf952ce170a6ecafedb06946be6ddb" + digest = "1:47ec6ad813222524cbcd6c5c23b9a6a1a3f45b502fe5088fd59e6ba6a1436db3" name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "" - revision = "8986dd9e96cf0a6f74da406c005ba3df38527c04" + revision = "61a87790db17894570dfb32dbaa0a4af9ce60cb4" [[projects]] branch = "master" - digest = "1:196c20449a0cc94113029193d7ce9e36e46e676c27a2fe06522243f57ced19c5" + digest = "1:cab37ea831bb9be343b41a7673e64b6bb8399bc0f051180c04ce561573ed2c89" name = "golang.org/x/net" packages = [ "context", @@ -642,11 +658,11 @@ "trace", ] pruneopts = "" - revision = "fe3aa8a4527195a6057b3fad46619d7d090e99b5" + revision = "c0dbc17a35534bf2e581d7a942408dc936316da4" [[projects]] branch = "master" - digest = "1:01bdbbc604dcd5afb6f66a717f69ad45e9643c72d5bc11678d44ffa5c50f9e42" + digest = "1:571c7f844acf3c916ac5997f82f227c49a38490f1fca65afd47d64d188ced96e" name = "golang.org/x/oauth2" packages = [ ".", @@ -656,18 +672,18 @@ "jwt", ] pruneopts = "" - revision = "0f29369cfe4552d0e4bcddc57cc75f4d7e672a33" + revision = "bf48bf16ab8d622ce64ec6ce98d2c98f916b6303" [[projects]] branch = "master" - digest = "1:a155b7400cb9270dda3f63651160c0349a9e16855975956c79e618e9f30c160c" + digest = "1:ca984d839d46cda7860ac472476ef53423d8be07f19822f59a7c575127f995be" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "f43be2a4598cf3a47be9f94f0c28197ed9eae611" + revision = "548cf772de5052aa878ccb47cdeb7d262b75c8ec" [[projects]] digest = "1:740b51a55815493a8d0f2b1e0d0ae48fe48953bf7eaf3fcc4198823bf67768c0" @@ -704,18 +720,17 @@ [[projects]] branch = "master" - digest = "1:61bfb54376228f1111f1f0f2d609971251f49ba1e3d8dd801249b521010ae0ca" + digest = "1:9d4ac09a835404ae9306c6e1493cf800ecbb0f3f828f4333b3e055de4c962eea" name = "golang.org/x/xerrors" packages = [ ".", "internal", ] pruneopts = "" - revision = "1b5146add8981d58be77b16229c0ff0f8bebd8c1" + revision = "9bdfabe68543c54f90421aeb9a60ef8061b5b544" [[projects]] - branch = "master" - digest = "1:6cad05171b0cb6abda71bcea1abfc392b32816264a0cfd15ea3ed629a25219df" + digest = "1:1e9e6f4d4ca4c890f3de6dbaf071a9a428991c571fbaeae210981729cd19cd1a" name = "google.golang.org/api" packages = [ "googleapi", @@ -729,7 +744,8 @@ "transport/http/internal/propagation", ] pruneopts = "" - revision = "4b5ab5e994c422e7c6359a085062f721716b4c22" + revision = "aa5d4e47691e7ae1aebb5221ff8e4beea23fad72" + version = "v0.15.0" [[projects]] digest = "1:c4404231035fad619a12f82ae3f0f8f9edc1cc7f34e7edad7a28ccac5336cc96" @@ -752,17 +768,19 @@ [[projects]] branch = "master" - digest = "1:b29336aebd5fb26d3f6b8e776969db4e4e29765ff7313eb43d4593961a64e438" + digest = "1:a58dce82b2e7f7174b861598654aa2d8c7f1a545eb0c961f87cc569029ce1941" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "" - revision = "919d9bdd9fe6f1a5dd95ce5d5e4cdb8fd3c516d0" + revision = "bd8f9a0ef82f9870cb10caef4f23c348069600cb" [[projects]] - digest = "1:30d215704e78c21ffff90aa8e86ca1a438fec2837bad082116510fe5a862af5e" + digest = "1:7af390490e636a6adc9c76b37a3c823195fbf375a02c4d9506b4dd49d5d2409a" name = "google.golang.org/grpc" packages = [ ".", + "attributes", + "backoff", "balancer", "balancer/base", "balancer/roundrobin", @@ -778,10 +796,13 @@ "internal/backoff", "internal/balancerload", "internal/binarylog", + "internal/buffer", "internal/channelz", "internal/envconfig", "internal/grpcrand", "internal/grpcsync", + "internal/resolver/dns", + "internal/resolver/passthrough", "internal/syscall", "internal/transport", "keepalive", @@ -789,16 +810,14 @@ "naming", "peer", "resolver", - "resolver/dns", - "resolver/passthrough", "serviceconfig", "stats", "status", "tap", ] pruneopts = "" - revision = "f6d0f9ee430895e87ef1ceb5ac8f39725bafceef" - version = "v1.24.0" + revision = "f5b0812e6fe574d90da76b205e9eb51f6ddb1919" + version = "v1.26.0" [[projects]] digest = "1:75fb3fcfc73a8c723efde7777b40e8e8ff9babf30d8c56160d01beffea8a95a6" @@ -809,12 +828,20 @@ version = "v0.9.1" [[projects]] - digest = "1:ab9547706f32a7535bb4f25d6b58ad00436630593cd3e3ed4602f1613ed84783" + digest = "1:d68a26e5d4ba1d9213041aad76d2d6b6d68dba853ceb585235311bd86748e473" + name = "gopkg.in/ini.v1" + packages = ["."] + pruneopts = "" + revision = "94291fffe2b14f4632ec0e67c1bfecfc1287a168" + version = "v1.51.1" + +[[projects]] + digest = "1:5a53f6ef09fb1ac261a97f8a72e8837ff53cbaa969022a6679da210e4cbe9b0f" name = "gopkg.in/yaml.v2" packages = ["."] pruneopts = "" - revision = "f221b8435cfb71e54062f6c6e99e9ade30b124d5" - version = "v2.2.4" + revision = "1f64d6156d11335c3f22d9330b0ad14fc1e789ce" + version = "v2.2.7" [[projects]] digest = "1:4a456d15ea81830e9aaf685ad36100c8be2adfb646996db18432073e07949fe9" @@ -959,15 +986,15 @@ [[projects]] branch = "master" - digest = "1:ad13d36fb31a3e590b143439610f1a35b4033437ebf565dbc14a72ed4bd61dfb" + digest = "1:16a343bd9d820ae320de4d1eaa8acc7a214aac4b38fb21d03255d3a457d861df" name = "k8s.io/kube-openapi" packages = ["pkg/util/proto"] pruneopts = "" - revision = "0270cf2f1c1d995d34b36019a6f65d58e6e33ad4" + revision = "30be4d16710ac61bce31eb28a01054596fe6a9f1" [[projects]] branch = "master" - digest = "1:eea15d4ad177b9185f26550b314024a374838830b33a0954b179cad16f2e39a7" + digest = "1:fa34b9e9fa4395fc2a63a64265dda7bff9a9ab4b5bd94020be3ff866dc87ca0f" name = "k8s.io/utils" packages = [ "buffer", @@ -975,7 +1002,7 @@ "trace", ] pruneopts = "" - revision = "2b95a09bc58df43d4032504619706b6a38293a47" + revision = "94aeca20bf0991bf33922a5938174b9147ab8ca7" [[projects]] digest = "1:d65cadd0126bf9385147e2853e53fda65fc419c04da329b3b077fb4765e3737c" diff --git a/Makefile b/Makefile index f2e2a3125f..3c4f908bc3 100755 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ update_boilerplate: generate: which pflags || (go get github.com/lyft/flytestdlib/cli/pflags) - which mockery || (go install github.com/lyft/flyteidl/vendor/github.com/vektra/mockery/cmd/mockery) + which mockery || (go install github.com/lyft/flyteplugins/vendor/github.com/vektra/mockery/cmd/mockery) which enumer || (go get github.com/alvaroloes/enumer) @go generate ./... diff --git a/go/tasks/logs/config.go b/go/tasks/logs/config.go index e47202c71a..d52c9e6eec 100755 --- a/go/tasks/logs/config.go +++ b/go/tasks/logs/config.go @@ -30,4 +30,3 @@ func GetLogConfig() *LogConfig { func SetLogConfig(logConfig *LogConfig) error { return logConfigSection.SetConfig(logConfig) } - diff --git a/go/tasks/pluginmachinery/catalog/mocks/async_client.go b/go/tasks/pluginmachinery/catalog/mocks/async_client.go index 46fc8c5eca..7ec246a001 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/async_client.go +++ b/go/tasks/pluginmachinery/catalog/mocks/async_client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import context "context" +import mock "github.com/stretchr/testify/mock" // AsyncClient is an autogenerated mock type for the AsyncClient type type AsyncClient struct { diff --git a/go/tasks/pluginmachinery/catalog/mocks/client.go b/go/tasks/pluginmachinery/catalog/mocks/client.go index d3bedd110d..04dcf7a3e5 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/client.go +++ b/go/tasks/pluginmachinery/catalog/mocks/client.go @@ -2,15 +2,10 @@ package mocks -import ( - context "context" - - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import context "context" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" // Client is an autogenerated mock type for the Client type type Client struct { diff --git a/go/tasks/pluginmachinery/catalog/mocks/download_future.go b/go/tasks/pluginmachinery/catalog/mocks/download_future.go index ba08ced6f0..6c555a243d 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/download_future.go +++ b/go/tasks/pluginmachinery/catalog/mocks/download_future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // DownloadFuture is an autogenerated mock type for the DownloadFuture type type DownloadFuture struct { diff --git a/go/tasks/pluginmachinery/catalog/mocks/download_response.go b/go/tasks/pluginmachinery/catalog/mocks/download_response.go index cff414ea80..a58f93fa01 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/download_response.go +++ b/go/tasks/pluginmachinery/catalog/mocks/download_response.go @@ -2,11 +2,9 @@ package mocks -import ( - bitarray "github.com/lyft/flytestdlib/bitarray" +import bitarray "github.com/lyft/flytestdlib/bitarray" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // DownloadResponse is an autogenerated mock type for the DownloadResponse type type DownloadResponse struct { diff --git a/go/tasks/pluginmachinery/catalog/mocks/future.go b/go/tasks/pluginmachinery/catalog/mocks/future.go index 710d46b753..c08d0b949a 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/future.go +++ b/go/tasks/pluginmachinery/catalog/mocks/future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // Future is an autogenerated mock type for the Future type type Future struct { diff --git a/go/tasks/pluginmachinery/catalog/mocks/upload_future.go b/go/tasks/pluginmachinery/catalog/mocks/upload_future.go index c6bd559186..42686bd74c 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/upload_future.go +++ b/go/tasks/pluginmachinery/catalog/mocks/upload_future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // UploadFuture is an autogenerated mock type for the UploadFuture type type UploadFuture struct { diff --git a/go/tasks/pluginmachinery/core/mocks/events_recorder.go b/go/tasks/pluginmachinery/core/mocks/events_recorder.go index ae493a1ec2..a2ba4a287d 100644 --- a/go/tasks/pluginmachinery/core/mocks/events_recorder.go +++ b/go/tasks/pluginmachinery/core/mocks/events_recorder.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // EventsRecorder is an autogenerated mock type for the EventsRecorder type type EventsRecorder struct { diff --git a/go/tasks/pluginmachinery/core/mocks/kube_client.go b/go/tasks/pluginmachinery/core/mocks/kube_client.go index 1404b317b5..ca48835335 100644 --- a/go/tasks/pluginmachinery/core/mocks/kube_client.go +++ b/go/tasks/pluginmachinery/core/mocks/kube_client.go @@ -2,12 +2,10 @@ package mocks -import ( - cache "sigs.k8s.io/controller-runtime/pkg/cache" - client "sigs.k8s.io/controller-runtime/pkg/client" +import cache "sigs.k8s.io/controller-runtime/pkg/cache" +import client "sigs.k8s.io/controller-runtime/pkg/client" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // KubeClient is an autogenerated mock type for the KubeClient type type KubeClient struct { diff --git a/go/tasks/pluginmachinery/core/mocks/plugin.go b/go/tasks/pluginmachinery/core/mocks/plugin.go index 1d925a798b..74b6ae5b5a 100644 --- a/go/tasks/pluginmachinery/core/mocks/plugin.go +++ b/go/tasks/pluginmachinery/core/mocks/plugin.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // Plugin is an autogenerated mock type for the Plugin type type Plugin struct { diff --git a/go/tasks/pluginmachinery/core/mocks/resource_manager.go b/go/tasks/pluginmachinery/core/mocks/resource_manager.go index 1f01a632e5..3657a69580 100644 --- a/go/tasks/pluginmachinery/core/mocks/resource_manager.go +++ b/go/tasks/pluginmachinery/core/mocks/resource_manager.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // ResourceManager is an autogenerated mock type for the ResourceManager type type ResourceManager struct { @@ -53,6 +50,38 @@ func (_m *ResourceManager) AllocateResource(ctx context.Context, namespace core. return r0, r1 } +type ResourceManager_GetID struct { + *mock.Call +} + +func (_m ResourceManager_GetID) Return(_a0 string) *ResourceManager_GetID { + return &ResourceManager_GetID{Call: _m.Call.Return(_a0)} +} + +func (_m *ResourceManager) OnGetID() *ResourceManager_GetID { + c := _m.On("GetID") + return &ResourceManager_GetID{Call: c} +} + +func (_m *ResourceManager) OnGetIDMatch(matchers ...interface{}) *ResourceManager_GetID { + c := _m.On("GetID", matchers...) + return &ResourceManager_GetID{Call: c} +} + +// GetID provides a mock function with given fields: +func (_m *ResourceManager) GetID() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type ResourceManager_ReleaseResource struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go index c2707c49d5..28e934801f 100644 --- a/go/tasks/pluginmachinery/core/mocks/resource_registrar.go +++ b/go/tasks/pluginmachinery/core/mocks/resource_registrar.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // ResourceRegistrar is an autogenerated mock type for the ResourceRegistrar type type ResourceRegistrar struct { diff --git a/go/tasks/pluginmachinery/core/mocks/secret_manager.go b/go/tasks/pluginmachinery/core/mocks/secret_manager.go index 84953489b6..3f2f123913 100644 --- a/go/tasks/pluginmachinery/core/mocks/secret_manager.go +++ b/go/tasks/pluginmachinery/core/mocks/secret_manager.go @@ -2,11 +2,9 @@ package mocks -import ( - context "context" +import context "context" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // SecretManager is an autogenerated mock type for the SecretManager type type SecretManager struct { diff --git a/go/tasks/pluginmachinery/core/mocks/setup_context.go b/go/tasks/pluginmachinery/core/mocks/setup_context.go index 46be413f34..49f6ad851c 100644 --- a/go/tasks/pluginmachinery/core/mocks/setup_context.go +++ b/go/tasks/pluginmachinery/core/mocks/setup_context.go @@ -2,12 +2,9 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" - - promutils "github.com/lyft/flytestdlib/promutils" -) +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" +import promutils "github.com/lyft/flytestdlib/promutils" // SetupContext is an autogenerated mock type for the SetupContext type type SetupContext struct { diff --git a/go/tasks/pluginmachinery/core/mocks/task_execution_context.go b/go/tasks/pluginmachinery/core/mocks/task_execution_context.go index c53bcd0c81..4a2bf2b477 100644 --- a/go/tasks/pluginmachinery/core/mocks/task_execution_context.go +++ b/go/tasks/pluginmachinery/core/mocks/task_execution_context.go @@ -2,16 +2,11 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // TaskExecutionContext is an autogenerated mock type for the TaskExecutionContext type type TaskExecutionContext struct { diff --git a/go/tasks/pluginmachinery/core/mocks/task_execution_id.go b/go/tasks/pluginmachinery/core/mocks/task_execution_id.go index 1b1332936e..e07210e439 100644 --- a/go/tasks/pluginmachinery/core/mocks/task_execution_id.go +++ b/go/tasks/pluginmachinery/core/mocks/task_execution_id.go @@ -2,10 +2,8 @@ package mocks -import ( - flyteidlcore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - mock "github.com/stretchr/testify/mock" -) +import flyteidlcore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import mock "github.com/stretchr/testify/mock" // TaskExecutionID is an autogenerated mock type for the TaskExecutionID type type TaskExecutionID struct { diff --git a/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go b/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go index c7d7a0b886..3c844d9079 100644 --- a/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go +++ b/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go @@ -2,14 +2,10 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" - - types "k8s.io/apimachinery/pkg/types" - - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" +import types "k8s.io/apimachinery/pkg/types" +import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" // TaskExecutionMetadata is an autogenerated mock type for the TaskExecutionMetadata type type TaskExecutionMetadata struct { diff --git a/go/tasks/pluginmachinery/core/mocks/task_overrides.go b/go/tasks/pluginmachinery/core/mocks/task_overrides.go index dc35666c77..9f0167004c 100644 --- a/go/tasks/pluginmachinery/core/mocks/task_overrides.go +++ b/go/tasks/pluginmachinery/core/mocks/task_overrides.go @@ -2,10 +2,8 @@ package mocks -import ( - mock "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" -) +import mock "github.com/stretchr/testify/mock" +import v1 "k8s.io/api/core/v1" // TaskOverrides is an autogenerated mock type for the TaskOverrides type type TaskOverrides struct { diff --git a/go/tasks/pluginmachinery/core/mocks/task_reader.go b/go/tasks/pluginmachinery/core/mocks/task_reader.go index c29cc26ab5..62ea96ae91 100644 --- a/go/tasks/pluginmachinery/core/mocks/task_reader.go +++ b/go/tasks/pluginmachinery/core/mocks/task_reader.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import mock "github.com/stretchr/testify/mock" // TaskReader is an autogenerated mock type for the TaskReader type type TaskReader struct { diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go index 4f463cb30f..ed8b233b42 100644 --- a/go/tasks/pluginmachinery/core/resource_manager.go +++ b/go/tasks/pluginmachinery/core/resource_manager.go @@ -35,6 +35,7 @@ type ResourceRegistrar interface { // Resource Manager manages a single resource type, and each allocation is of size one type ResourceManager interface { + GetID() string AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error) ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error } diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go index 5889db653a..6883983ebc 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -1,7 +1,7 @@ package config import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "github.com/lyft/flyteplugins/go/tasks/config" ) @@ -35,7 +35,7 @@ type K8sPluginConfig struct { // Provide additional environment variable pairs that plugin authors will provide to containers DefaultEnvVars map[string]string `json:"default-env-vars" pflag:"-,Additional environment variable that should be injected into every resource"` // Provide additional environment variable pairs whose values resolve from the plugin's execution environment. - DefaultEnvVarsFromEnv map[string]string `json:"default-env-vars-from-env" pflag:",Additional environment variable that should be injected into every resource"` + DefaultEnvVarsFromEnv map[string]string `json:"default-env-vars-from-env" pflag:"-,Additional environment variable that should be injected into every resource"` // Tolerations in the cluster that should be applied for a specific resource // Currently we support simple resource based tolerations only ResourceTolerations map[v1.ResourceName][]v1.Toleration `json:"resource-tolerations" pflag:"-,Default tolerations to be applied for resource of type 'key'"` diff --git a/go/tasks/pluginmachinery/io/mocks/input_file_paths.go b/go/tasks/pluginmachinery/io/mocks/input_file_paths.go index 91ea4938a4..23ace1dd90 100644 --- a/go/tasks/pluginmachinery/io/mocks/input_file_paths.go +++ b/go/tasks/pluginmachinery/io/mocks/input_file_paths.go @@ -2,10 +2,8 @@ package mocks -import ( - storage "github.com/lyft/flytestdlib/storage" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // InputFilePaths is an autogenerated mock type for the InputFilePaths type type InputFilePaths struct { diff --git a/go/tasks/pluginmachinery/io/mocks/input_reader.go b/go/tasks/pluginmachinery/io/mocks/input_reader.go index 83806da423..44b83e6a7b 100644 --- a/go/tasks/pluginmachinery/io/mocks/input_reader.go +++ b/go/tasks/pluginmachinery/io/mocks/input_reader.go @@ -2,15 +2,11 @@ package mocks -import ( - context "context" +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // InputReader is an autogenerated mock type for the InputReader type type InputReader struct { diff --git a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go index bd65a3305f..e8438c59b1 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_file_paths.go +++ b/go/tasks/pluginmachinery/io/mocks/output_file_paths.go @@ -2,10 +2,8 @@ package mocks -import ( - storage "github.com/lyft/flytestdlib/storage" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // OutputFilePaths is an autogenerated mock type for the OutputFilePaths type type OutputFilePaths struct { diff --git a/go/tasks/pluginmachinery/io/mocks/output_reader.go b/go/tasks/pluginmachinery/io/mocks/output_reader.go index c2fbb8f2b9..4eab95124b 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_reader.go +++ b/go/tasks/pluginmachinery/io/mocks/output_reader.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" // OutputReader is an autogenerated mock type for the OutputReader type type OutputReader struct { diff --git a/go/tasks/pluginmachinery/io/mocks/output_writer.go b/go/tasks/pluginmachinery/io/mocks/output_writer.go index 0414271d01..8f0770e4b0 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_writer.go +++ b/go/tasks/pluginmachinery/io/mocks/output_writer.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import context "context" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // OutputWriter is an autogenerated mock type for the OutputWriter type type OutputWriter struct { diff --git a/go/tasks/pluginmachinery/k8s/mocks/plugin.go b/go/tasks/pluginmachinery/k8s/mocks/plugin.go index 5439f7f4fc..2ad3481c39 100644 --- a/go/tasks/pluginmachinery/k8s/mocks/plugin.go +++ b/go/tasks/pluginmachinery/k8s/mocks/plugin.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - k8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import k8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" +import mock "github.com/stretchr/testify/mock" // Plugin is an autogenerated mock type for the Plugin type type Plugin struct { diff --git a/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go b/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go index a9011ddb0b..1009c8c430 100644 --- a/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go +++ b/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go @@ -2,12 +2,10 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // PluginContext is an autogenerated mock type for the PluginContext type type PluginContext struct { diff --git a/go/tasks/pluginmachinery/k8s/mocks/resource.go b/go/tasks/pluginmachinery/k8s/mocks/resource.go index 85bfa9d21f..084e608889 100644 --- a/go/tasks/pluginmachinery/k8s/mocks/resource.go +++ b/go/tasks/pluginmachinery/k8s/mocks/resource.go @@ -2,16 +2,11 @@ package mocks -import ( - mock "github.com/stretchr/testify/mock" - runtime "k8s.io/apimachinery/pkg/runtime" - - schema "k8s.io/apimachinery/pkg/runtime/schema" - - types "k8s.io/apimachinery/pkg/types" - - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) +import mock "github.com/stretchr/testify/mock" +import runtime "k8s.io/apimachinery/pkg/runtime" +import schema "k8s.io/apimachinery/pkg/runtime/schema" +import types "k8s.io/apimachinery/pkg/types" +import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" // Resource is an autogenerated mock type for the Resource type type Resource struct { diff --git a/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go b/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go index b6bd1f12ef..cec17ebf17 100644 --- a/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go +++ b/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // IndexedWorkQueue is an autogenerated mock type for the IndexedWorkQueue type type IndexedWorkQueue struct { diff --git a/go/tasks/pluginmachinery/workqueue/mocks/processor.go b/go/tasks/pluginmachinery/workqueue/mocks/processor.go index 84b8bb9a47..1cb9bce5c0 100644 --- a/go/tasks/pluginmachinery/workqueue/mocks/processor.go +++ b/go/tasks/pluginmachinery/workqueue/mocks/processor.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // Processor is an autogenerated mock type for the Processor type type Processor struct { diff --git a/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go b/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go index 261ca9cc24..2aaad131a8 100644 --- a/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go +++ b/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go @@ -2,10 +2,8 @@ package mocks -import ( - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // WorkItemInfo is an autogenerated mock type for the WorkItemInfo type type WorkItemInfo struct { diff --git a/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go b/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go index 810ded0a61..3f6a3d6c61 100644 --- a/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go +++ b/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go @@ -2,15 +2,10 @@ package mocks -import ( - context "context" - - batch "github.com/aws/aws-sdk-go/service/batch" - - mock "github.com/stretchr/testify/mock" - - request "github.com/aws/aws-sdk-go/aws/request" -) +import batch "github.com/aws/aws-sdk-go/service/batch" +import context "context" +import mock "github.com/stretchr/testify/mock" +import request "github.com/aws/aws-sdk-go/aws/request" // BatchServiceClient is an autogenerated mock type for the BatchServiceClient type type BatchServiceClient struct { diff --git a/go/tasks/plugins/array/awsbatch/mocks/cache.go b/go/tasks/plugins/array/awsbatch/mocks/cache.go index c50fb1be35..57e86864f3 100644 --- a/go/tasks/plugins/array/awsbatch/mocks/cache.go +++ b/go/tasks/plugins/array/awsbatch/mocks/cache.go @@ -2,10 +2,8 @@ package mocks -import ( - definition "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/definition" - mock "github.com/stretchr/testify/mock" -) +import definition "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/definition" +import mock "github.com/stretchr/testify/mock" // Cache is an autogenerated mock type for the Cache type type Cache struct { diff --git a/go/tasks/plugins/array/awsbatch/mocks/client.go b/go/tasks/plugins/array/awsbatch/mocks/client.go index 70faae703c..39f2983d37 100644 --- a/go/tasks/plugins/array/awsbatch/mocks/client.go +++ b/go/tasks/plugins/array/awsbatch/mocks/client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - batch "github.com/aws/aws-sdk-go/service/batch" - - mock "github.com/stretchr/testify/mock" -) +import batch "github.com/aws/aws-sdk-go/service/batch" +import context "context" +import mock "github.com/stretchr/testify/mock" // Client is an autogenerated mock type for the Client type type Client struct { diff --git a/go/tasks/plugins/array/k8s/transformer.go b/go/tasks/plugins/array/k8s/transformer.go index 9eb7e83f2f..3a7695f3d9 100644 --- a/go/tasks/plugins/array/k8s/transformer.go +++ b/go/tasks/plugins/array/k8s/transformer.go @@ -2,6 +2,7 @@ package k8s import ( "context" + core2 "github.com/lyft/flyteplugins/go/tasks/plugins/array/core" "github.com/lyft/flytestdlib/storage" diff --git a/go/tasks/plugins/hive/client/mocks/qubole_client.go b/go/tasks/plugins/hive/client/mocks/qubole_client.go index 67464e7d1f..c22c833071 100644 --- a/go/tasks/plugins/hive/client/mocks/qubole_client.go +++ b/go/tasks/plugins/hive/client/mocks/qubole_client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - client "github.com/lyft/flyteplugins/go/tasks/plugins/hive/client" - - mock "github.com/stretchr/testify/mock" -) +import client "github.com/lyft/flyteplugins/go/tasks/plugins/hive/client" +import context "context" +import mock "github.com/stretchr/testify/mock" // QuboleClient is an autogenerated mock type for the QuboleClient type type QuboleClient struct { @@ -23,8 +19,8 @@ func (_m QuboleClient_ExecuteHiveCommand) Return(_a0 *client.QuboleCommandDetail return &QuboleClient_ExecuteHiveCommand{Call: _m.Call.Return(_a0, _a1)} } -func (_m *QuboleClient) OnExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, accountKey string, tags []string) *QuboleClient_ExecuteHiveCommand { - c := _m.On("ExecuteHiveCommand", ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) +func (_m *QuboleClient) OnExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) *QuboleClient_ExecuteHiveCommand { + c := _m.On("ExecuteHiveCommand", ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) return &QuboleClient_ExecuteHiveCommand{Call: c} } @@ -33,13 +29,13 @@ func (_m *QuboleClient) OnExecuteHiveCommandMatch(matchers ...interface{}) *Qubo return &QuboleClient_ExecuteHiveCommand{Call: c} } -// ExecuteHiveCommand provides a mock function with given fields: ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags -func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, accountKey string, tags []string) (*client.QuboleCommandDetails, error) { - ret := _m.Called(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) +// ExecuteHiveCommand provides a mock function with given fields: ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags +func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) (*client.QuboleCommandDetails, error) { + ret := _m.Called(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) var r0 *client.QuboleCommandDetails if rf, ok := ret.Get(0).(func(context.Context, string, uint32, string, string, []string) *client.QuboleCommandDetails); ok { - r0 = rf(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) + r0 = rf(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*client.QuboleCommandDetails) @@ -48,7 +44,7 @@ func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr strin var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, uint32, string, string, []string) error); ok { - r1 = rf(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) + r1 = rf(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) } else { r1 = ret.Error(1) } diff --git a/go/tasks/plugins/hive/client/qubole_client.go b/go/tasks/plugins/hive/client/qubole_client.go index a30763ccdc..2549e39d02 100644 --- a/go/tasks/plugins/hive/client/qubole_client.go +++ b/go/tasks/plugins/hive/client/qubole_client.go @@ -62,7 +62,7 @@ type RequestBody struct { // Interface to interact with QuboleClient for hive tasks type QuboleClient interface { - ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, + ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) (*QuboleCommandDetails, error) KillCommand(ctx context.Context, commandID string, accountKey string) error GetCommandStatus(ctx context.Context, commandID string, accountKey string) (QuboleStatus, error) @@ -167,7 +167,7 @@ func (q *quboleClient) ExecuteHiveCommand( ctx context.Context, commandStr string, timeoutVal uint32, - clusterLabel string, + clusterPrimaryLabel string, accountKey string, tags []string) (*QuboleCommandDetails, error) { @@ -175,7 +175,7 @@ func (q *quboleClient) ExecuteHiveCommand( CommandType: hiveCommandType, Query: commandStr, Timeout: timeoutVal, - ClusterLabel: clusterLabel, + ClusterLabel: clusterPrimaryLabel, Tags: tags, } diff --git a/go/tasks/plugins/hive/config/config.go b/go/tasks/plugins/hive/config/config.go index a1e05a57d6..d1dd51cd1a 100644 --- a/go/tasks/plugins/hive/config/config.go +++ b/go/tasks/plugins/hive/config/config.go @@ -25,16 +25,28 @@ func MustParse(s string) config.URL { return config.URL{URL: *r} } +type ClusterConfig struct { + PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"` + Labels []string `json:"labels" pflag:",Labels of a given service cluster"` + Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` +} + +type DestinationClusterConfig struct { + Project string `json:"project" pflag:",Project of the task which the query belongs to"` + Domain string `json:"domain" pflag:",Domain of the task which the query belongs to"` + ClusterLabel string `json:"clusterLabel" pflag:",The label of the destination cluster this query to be submitted to"` +} + var ( defaultConfig = Config{ - Endpoint: MustParse("https://wellness.qubole.com"), - CommandAPIPath: MustParse("/api/v1.2/commands/"), - AnalyzeLinkPath: MustParse("/v2/analyze"), - TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", - Limit: 200, - LruCacheSize: 2000, - Workers: 15, - ClusterLabels: []string{"default"}, + Endpoint: MustParse("https://wellness.qubole.com"), + CommandAPIPath: MustParse("/api/v1.2/commands/"), + AnalyzeLinkPath: MustParse("/v2/analyze"), + TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", + LruCacheSize: 2000, + Workers: 15, + ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}}, + DestinationClusterConfigs: []DestinationClusterConfig{}, } quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig) @@ -42,14 +54,14 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - Limit int `json:"quboleLimit" pflag:",Global limit for concurrent Qubole queries"` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - ClusterLabels []string `json:"clusterLabels" pflag:",List of labels of service clusters"` + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` + DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` } // Retrieves the current config value or default. diff --git a/go/tasks/plugins/hive/config/config_flags.go b/go/tasks/plugins/hive/config/config_flags.go index 3513172f9f..295bd5e2b7 100755 --- a/go/tasks/plugins/hive/config/config_flags.go +++ b/go/tasks/plugins/hive/config/config_flags.go @@ -45,7 +45,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "commandApiPath"), defaultConfig.CommandAPIPath.String(), "API Path where commands can be launched on Qubole. Should be a valid url.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "analyzeLinkPath"), defaultConfig.AnalyzeLinkPath.String(), "URL path where queries can be visualized on qubole website. Should be a valid url.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "quboleTokenKey"), defaultConfig.TokenKey, "Name of the key where to find Qubole token in the secret manager.") - cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "quboleLimit"), defaultConfig.Limit, "Global limit for concurrent Qubole queries") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "lruCacheSize"), defaultConfig.LruCacheSize, "Size of the AutoRefreshCache") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), defaultConfig.Workers, "Number of parallel workers to refresh the cache") return cmdFlags diff --git a/go/tasks/plugins/hive/config/config_flags_test.go b/go/tasks/plugins/hive/config/config_flags_test.go index 90834522ec..2ec62b1d29 100755 --- a/go/tasks/plugins/hive/config/config_flags_test.go +++ b/go/tasks/plugins/hive/config/config_flags_test.go @@ -187,28 +187,6 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_quboleLimit", func(t *testing.T) { - t.Run("DefaultValue", func(t *testing.T) { - // Test that default value is set properly - if vInt, err := cmdFlags.GetInt("quboleLimit"); err == nil { - assert.Equal(t, int(defaultConfig.Limit), vInt) - } else { - assert.FailNow(t, err.Error()) - } - }) - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("quboleLimit", testValue) - if vInt, err := cmdFlags.GetInt("quboleLimit"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.Limit) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) t.Run("Test_lruCacheSize", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index 787c714347..5ff0f2b76c 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -65,14 +65,14 @@ type ExecutionState struct { // This is the main state iteration func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, - executionsCache cache.AutoRefresh, resourceNamespace core.ResourceNamespace, cfg *config.Config) (ExecutionState, error) { + executionsCache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) { var transformError error var newState ExecutionState switch currentState.Phase { case PhaseNotStarted: - newState, transformError = GetAllocationToken(ctx, resourceNamespace, tCtx) + newState, transformError = GetAllocationToken(ctx, tCtx) case PhaseQueued: newState, transformError = KickOffQuery(ctx, tCtx, currentState, quboleClient, executionsCache, cfg) @@ -141,24 +141,25 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo { return nil } -func composeResourceNamespaceWithClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace) (core.ResourceNamespace, error) { - _, clusterLabel, _, _, err := GetQueryInfo(ctx, tCtx) +func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext) (core.ResourceNamespace, error) { + _, clusterLabelOverride, _, _, err := GetQueryInfo(ctx, tCtx) if err != nil { - return resourceNamespace, err + return "", err } - return resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterLabel)), nil + clusterPrimaryLabel := getClusterPrimaryLabel(ctx, tCtx, clusterLabelOverride) + return core.ResourceNamespace(clusterPrimaryLabel), nil } -func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceNamespace, tCtx core.TaskExecutionContext) (ExecutionState, error) { +func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (ExecutionState, error) { newState := ExecutionState{} uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace) + clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx) if err != nil { return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId) } - allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespaceWithClusterLabel, uniqueId) + allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err) @@ -216,10 +217,73 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( for k, v := range tCtx.TaskExecutionMetadata().GetLabels() { tags = append(tags, fmt.Sprintf("%s:%s", k, v)) } - + logger.Debugf(ctx, "QueryInfo: query: [%v], cluster: [%v], timeoutSec: [%v], tags: [%v]", query, cluster, timeoutSec, tags) return } +func mapLabelToPrimaryLabel(ctx context.Context, quboleCfg *config.Config, label string) (string, bool) { + primaryLabel := DefaultClusterPrimaryLabel + found := false + + if label == "" { + logger.Debugf(ctx, "Input cluster label is an empty string; falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) + return primaryLabel, found + } + + // Using a linear search because N is small and because of ClusterConfig's struct definition + // which is determined specifically for the readability of the corresponding configmap yaml file + for _, clusterCfg := range quboleCfg.ClusterConfigs { + for _, l := range clusterCfg.Labels { + if label != "" && l == label { + logger.Debugf(ctx, "Found the primary label [%v] for label [%v]", clusterCfg.PrimaryLabel, label) + primaryLabel, found = clusterCfg.PrimaryLabel, true + break + } + } + } + + logger.Debugf(ctx, "Cannot find the primary cluster label for label [%v] in configmap; "+ + "falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) + return primaryLabel, found +} + +func mapProjectDomainToDestinationClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, quboleCfg *config.Config) (string, bool) { + tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() + project := tExecId.NodeExecutionId.GetExecutionId().GetProject() + domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() + logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) + // Using a linear search because N is small + for _, m := range quboleCfg.DestinationClusterConfigs { + if project == m.Project && domain == m.Domain { + logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", m.ClusterLabel, project, domain) + return m.ClusterLabel, true + } + } + + // This function finds the label, not primary label, so in the case where no mapping is found, this function should return an empty string + return "", false +} + +func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, clusterLabelOverride string) string { + cfg := config.GetQuboleConfig() + + // If override is not empty and if it has a mapping, we return the mapped primary label + if clusterLabelOverride != "" { + if primaryLabel, found := mapLabelToPrimaryLabel(ctx, cfg, clusterLabelOverride); found { + return primaryLabel + } + } + + // If override is empty or if the override does not have a mapping, we return the primary label mapped using (project, domain) + if clusterLabel, found := mapProjectDomainToDestinationClusterLabel(ctx, tCtx, cfg); found { + primaryLabel, _ := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) + return primaryLabel + } + + // Else we return the default primary label + return DefaultClusterPrimaryLabel +} + func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, cache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) { @@ -229,13 +293,15 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt return currentState, errors.Wrapf(errors.RuntimeFailure, err, "Failed to read token from secrets manager") } - query, cluster, tags, timeoutSec, err := GetQueryInfo(ctx, tCtx) + query, clusterLabelOverride, tags, timeoutSec, err := GetQueryInfo(ctx, tCtx) if err != nil { return currentState, err } + clusterPrimaryLabel := getClusterPrimaryLabel(ctx, tCtx, clusterLabelOverride) + cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec, - cluster, apiKey, tags) + clusterPrimaryLabel, apiKey, tags) if err != nil { // If we failed, we'll keep the NotStarted state currentState.CreationFailureCount = currentState.CreationFailureCount + 1 @@ -310,15 +376,15 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe return nil } -func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace, _ ExecutionState) error { +func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error { // Release allocation token uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace) + clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx) if err != nil { return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueId) } - err = tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespaceWithClusterLabel, uniqueId) + err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueId, err) diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go index a9abb756da..ef68153459 100644 --- a/go/tasks/plugins/hive/execution_state_test.go +++ b/go/tasks/plugins/hive/execution_state_test.go @@ -5,6 +5,7 @@ import ( "net/url" "testing" + idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" mocks2 "github.com/lyft/flytestdlib/cache/mocks" @@ -168,7 +169,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseQueued, state.Phase) }) @@ -180,7 +181,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusExhausted, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -192,7 +193,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusNamespaceQuotaExceeded, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -243,7 +244,7 @@ func TestFinalize(t *testing.T) { called = true }).Return(nil) - err := Finalize(ctx, tCtx, quboleResourceNamespace, state) + err := Finalize(ctx, tCtx, state) assert.NoError(t, err) assert.True(t, called) } @@ -298,3 +299,101 @@ func TestKickOffQuery(t *testing.T) { assert.True(t, getOrCreateCalled) assert.True(t, quboleCalled) } + +func createMockQuboleCfg() *config.Config { + return &config.Config{ + ClusterConfigs: []config.ClusterConfig{ + {PrimaryLabel: "primary A", Labels: []string{"primary A", "A", "label A", "A-prod"}, Limit: 10}, + {PrimaryLabel: "primary B", Labels: []string{"B"}, Limit: 10}, + {PrimaryLabel: "primary C", Labels: []string{"C-prod"}, Limit: 1}, + }, + DestinationClusterConfigs: []config.DestinationClusterConfig{ + {Project: "project A", Domain: "domain X", ClusterLabel: "A-prod"}, + {Project: "project A", Domain: "domain Y", ClusterLabel: "A"}, + {Project: "project A", Domain: "domain Z", ClusterLabel: "B"}, + {Project: "project C", Domain: "domain X", ClusterLabel: "C-prod"}, + }, + } +} + +func Test_mapLabelToPrimaryLabel(t *testing.T) { + ctx := context.TODO() + mockQuboleCfg := createMockQuboleCfg() + + type args struct { + ctx context.Context + quboleCfg *config.Config + label string + } + tests := []struct { + name string + args args + want string + wantFound bool + }{ + {name: "Label has a mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "A-prod"}, want: "primary A", wantFound: true}, + {name: "Label has a typo", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "a"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label has a mapping 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C-prod"}, want: "primary C", wantFound: true}, + {name: "Label has a typo 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C_prod"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label has a mapping 3", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "primary A"}, want: "primary A", wantFound: true}, + {name: "Label has no mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "D"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label is an empty string", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: ""}, want: DefaultClusterPrimaryLabel, wantFound: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, found := mapLabelToPrimaryLabel(tt.args.ctx, tt.args.quboleCfg, tt.args.label); got != tt.want || found != tt.wantFound { + t.Errorf("mapLabelToPrimaryLabel() = (%v, %v), want (%v, %v)", got, found, tt.want, tt.wantFound) + } + }) + } +} + +func createMockTaskExecutionContextWithProjectDomain(project string, domain string) *mocks.TaskExecutionContext { + mockTaskExecutionContext := mocks.TaskExecutionContext{} + taskExecId := &pluginsCoreMocks.TaskExecutionID{} + taskExecId.OnGetID().Return(idlCore.TaskExecutionIdentifier{ + NodeExecutionId: &idlCore.NodeExecutionIdentifier{ExecutionId: &idlCore.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: "random name", + }}, + }) + + taskMetadata := &pluginsCoreMocks.TaskExecutionMetadata{} + taskMetadata.OnGetTaskExecutionID().Return(taskExecId) + mockTaskExecutionContext.On("TaskExecutionMetadata").Return(taskMetadata) + return &mockTaskExecutionContext +} + +func Test_getClusterPrimaryLabel(t *testing.T) { + ctx := context.TODO() + err := config.SetQuboleConfig(createMockQuboleCfg()) + assert.Nil(t, err) + + type args struct { + ctx context.Context + tCtx core.TaskExecutionContext + clusterLabelOverride string + } + tests := []struct { + name string + args args + want string + }{ + {name: "Override is not empty + override has NO existing mapping + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: "AAAA"}, want: "primary B"}, + {name: "Override is not empty + override has NO existing mapping + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain blah"), clusterLabelOverride: "blh"}, want: DefaultClusterPrimaryLabel}, + {name: "Override is not empty + override has an existing mapping + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project blah", "domain blah"), clusterLabelOverride: "C-prod"}, want: "primary C"}, + {name: "Override is not empty + override has an existing mapping + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain A"), clusterLabelOverride: "C-prod"}, want: "primary C"}, + {name: "Override is empty + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain X"), clusterLabelOverride: ""}, want: "primary A"}, + {name: "Override is empty + project-domain has an existing mapping2", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: ""}, want: "primary B"}, + {name: "Override is empty + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain blah"), clusterLabelOverride: ""}, want: DefaultClusterPrimaryLabel}, + {name: "Override is empty + project-domain has NO existing mapping2", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project blah", "domain X"), clusterLabelOverride: ""}, want: DefaultClusterPrimaryLabel}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getClusterPrimaryLabel(tt.args.ctx, tt.args.tCtx, tt.args.clusterLabelOverride); got != tt.want { + t.Errorf("getClusterPrimaryLabel() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go/tasks/plugins/hive/executor.go b/go/tasks/plugins/hive/executor.go index e533e20e19..b696069a72 100644 --- a/go/tasks/plugins/hive/executor.go +++ b/go/tasks/plugins/hive/executor.go @@ -22,15 +22,15 @@ const quboleHiveExecutorId = "qubole-hive-executor" const pluginStateVersion = 0 const hiveTaskType = "hive" // This needs to match the type defined in Flytekit constants.py -const quboleResourceNamespace core.ResourceNamespace = "qubole" + +const DefaultClusterPrimaryLabel = "default" type QuboleHiveExecutor struct { - id string - metrics QuboleHiveExecutorMetrics - quboleClient client.QuboleClient - executionsCache cache.AutoRefresh - cfg *config.Config - resourceNamespace core.ResourceNamespace + id string + metrics QuboleHiveExecutorMetrics + quboleClient client.QuboleClient + executionsCache cache.AutoRefresh + cfg *config.Config } func (q QuboleHiveExecutor) GetID() string { @@ -51,8 +51,7 @@ func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionC // Do what needs to be done, and give this function everything it needs to do its job properly // TODO: Play around with making this return a transition directly. How will that pattern affect the multi-Qubole plugin - outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, - q.resourceNamespace, q.cfg) + outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg) // Return if there was an error if transformError != nil { @@ -94,7 +93,7 @@ func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutio return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize") } - return Finalize(ctx, tCtx, q.resourceNamespace, incomingState) + return Finalize(ctx, tCtx, incomingState) } func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { @@ -103,14 +102,31 @@ func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { cfg := config.GetQuboleConfig() - q, err := NewQuboleHiveExecutor(ctx, cfg, client.NewQuboleClient(cfg), quboleResourceNamespace, iCtx.SecretManager(), iCtx.MetricsScope()) + return InitializeHiveExecutor(ctx, iCtx, cfg, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) +} + +func BuildResourceConfig(cfg *config.Config) map[string]int { + resourceConfig := make(map[string]int, len(cfg.ClusterConfigs)) + + for _, clusterCfg := range cfg.ClusterConfigs { + resourceConfig[clusterCfg.PrimaryLabel] = clusterCfg.Limit + } + return resourceConfig +} + +func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceConfig map[string]int, + quboleClient client.QuboleClient) (core.Plugin, error) { + logger.Infof(ctx, "Initializing a Hive executor with a resource config [%v]", resourceConfig) + q, err := NewQuboleHiveExecutor(ctx, cfg, quboleClient, iCtx.SecretManager(), iCtx.MetricsScope()) if err != nil { + logger.Errorf(ctx, "Failed to create a new QuboleHiveExecutor due to error: [%v]", err) return nil, err } - for _, cluster := range cfg.ClusterLabels { - clusteredResourceNamespacePrefix := quboleResourceNamespace.CreateSubNamespace(core.ResourceNamespace(cluster)) - if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, clusteredResourceNamespacePrefix, cfg.Limit); err != nil { + for clusterPrimaryLabel, clusterLimit := range resourceConfig { + logger.Infof(ctx, "Registering resource quota for cluster [%v]", clusterPrimaryLabel) + if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(clusterPrimaryLabel), clusterLimit); err != nil { + logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err) return nil, err } } @@ -119,8 +135,7 @@ func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core } // type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error) -func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, resourceNamespace core.ResourceNamespace, - secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error) { +func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error) { executionsAutoRefreshCache, err := NewQuboleHiveExecutionsCache(ctx, quboleClient, secretManager, cfg, scope.NewSubScope(hiveTaskType)) if err != nil { logger.Errorf(ctx, "Failed to create AutoRefreshCache in QuboleHiveExecutor Setup. Error: %v", err) @@ -133,12 +148,11 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient } return QuboleHiveExecutor{ - id: quboleHiveExecutorId, - cfg: cfg, - resourceNamespace: resourceNamespace, - metrics: getQuboleHiveExecutorMetrics(scope), - quboleClient: quboleClient, - executionsCache: executionsAutoRefreshCache, + id: quboleHiveExecutorId, + cfg: cfg, + metrics: getQuboleHiveExecutorMetrics(scope), + quboleClient: quboleClient, + executionsCache: executionsAutoRefreshCache, }, nil }