From 6ff3be39b3a0aa912420aa328ab252c4414dfc37 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Fri, 3 Jan 2020 12:13:41 -0800 Subject: [PATCH 01/16] wip --- .../go/tasks/plugins/hive/config/config.go | 17 ++++++++------- .../go/tasks/plugins/hive/executor.go | 21 +++++++++++++++---- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index a1e05a57d6..b29180fc2f 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -42,14 +42,15 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - Limit int `json:"quboleLimit" pflag:",Global limit for concurrent Qubole queries"` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - ClusterLabels []string `json:"clusterLabels" pflag:",List of labels of service clusters"` + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` + Limit int `json:"quboleLimit" pflag:",Global limit for concurrent Qubole queries"` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + HiveConfig map[string]int `json:"clusterLabels" pflag:",List of labels of service clusters"` + PrestoConfig map[string]int } // Retrieves the current config value or default. diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index e533e20e19..81772015c0 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -103,19 +103,32 @@ func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { cfg := config.GetQuboleConfig() - q, err := NewQuboleHiveExecutor(ctx, cfg, client.NewQuboleClient(cfg), quboleResourceNamespace, iCtx.SecretManager(), iCtx.MetricsScope()) + return InitializeHiveExecutor(ctx, iCtx, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) +} + +func BuildResourceConfig() { + resourceConfig := make(map[string]int, len(cfg.ClusterLabels)) + + for _, cluster := range cfg.Presto { + resourceConfig[cluster] = cfg.Limit + } +} + +func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, resourceConfig map[string]int, + quboleClient client.QuboleClient) (core.Plugin, error) { + q, err := NewQuboleHiveExecutor(ctx, cfg, quboleClient, quboleResourceNamespace, iCtx.SecretManager(), iCtx.MetricsScope()) if err != nil { return nil, err } - for _, cluster := range cfg.ClusterLabels { - clusteredResourceNamespacePrefix := quboleResourceNamespace.CreateSubNamespace(core.ResourceNamespace(cluster)) - if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, clusteredResourceNamespacePrefix, cfg.Limit); err != nil { + for clusterName, limit := range resourceConfig { + if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, clusterName, limit); err != nil { return nil, err } } return q, nil + } // type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error) From d83e648eadae07a8b984c4dab9d20908876ef551 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 3 Jan 2020 16:10:21 -0800 Subject: [PATCH 02/16] refactor the resource namespace for hive plugin and rerun go generate --- flyteplugins/Gopkg.lock | 204 ++++++++++-------- flyteplugins/Makefile | 2 +- .../catalog/mocks/async_client.go | 10 +- .../pluginmachinery/catalog/mocks/client.go | 13 +- .../catalog/mocks/download_future.go | 6 +- .../catalog/mocks/download_response.go | 6 +- .../pluginmachinery/catalog/mocks/future.go | 6 +- .../catalog/mocks/upload_future.go | 6 +- .../core/mocks/events_recorder.go | 9 +- .../pluginmachinery/core/mocks/kube_client.go | 8 +- .../pluginmachinery/core/mocks/plugin.go | 9 +- .../core/mocks/resource_manager.go | 9 +- .../core/mocks/resource_registrar.go | 9 +- .../core/mocks/secret_manager.go | 6 +- .../core/mocks/setup_context.go | 9 +- .../core/mocks/task_execution_context.go | 15 +- .../core/mocks/task_execution_id.go | 6 +- .../core/mocks/task_execution_metadata.go | 12 +- .../core/mocks/task_overrides.go | 6 +- .../pluginmachinery/core/mocks/task_reader.go | 9 +- .../pluginmachinery/flytek8s/config/config.go | 4 +- .../io/mocks/input_file_paths.go | 6 +- .../pluginmachinery/io/mocks/input_reader.go | 12 +- .../io/mocks/output_file_paths.go | 6 +- .../pluginmachinery/io/mocks/output_reader.go | 12 +- .../pluginmachinery/io/mocks/output_writer.go | 12 +- .../tasks/pluginmachinery/k8s/mocks/plugin.go | 12 +- .../k8s/mocks/plugin_context.go | 8 +- .../pluginmachinery/k8s/mocks/resource.go | 15 +- .../workqueue/mocks/indexed_work_queue.go | 9 +- .../workqueue/mocks/processor.go | 9 +- .../workqueue/mocks/work_item_info.go | 6 +- .../awsbatch/mocks/batch_service_client.go | 13 +- .../plugins/array/awsbatch/mocks/cache.go | 6 +- .../plugins/array/awsbatch/mocks/client.go | 10 +- .../hive/client/mocks/qubole_client.go | 10 +- .../go/tasks/plugins/hive/config/config.go | 24 ++- .../tasks/plugins/hive/config/config_flags.go | 1 - .../plugins/hive/config/config_flags_test.go | 22 -- .../plugins/hive/execution_state_test.go | 8 +- .../go/tasks/plugins/hive/executor.go | 56 ++--- 41 files changed, 263 insertions(+), 358 deletions(-) diff --git a/flyteplugins/Gopkg.lock b/flyteplugins/Gopkg.lock index ebc7b1a46f..b06c57ad97 100644 --- a/flyteplugins/Gopkg.lock +++ b/flyteplugins/Gopkg.lock @@ -2,23 +2,26 @@ [[projects]] - digest = "1:9facd370d4b5aaf82bb6549c358eefc71c771458fc91d1afe724434aece9d886" + digest = "1:cb193a00b8b032b70a5bad3b57da90867fcfea7e3d23581e71dda50fca9cecd5" name = "cloud.google.com/go" packages = ["compute/metadata"] pruneopts = "" - revision = "cfe8f6d1fe6976d03af790d7a8b9bf6aa73287bd" - version = "v0.47.0" + revision = "e5804108aed715c49f731aab1a847f063fb23681" + version = "v0.50.0" [[projects]] - digest = "1:9a11be778d5fcb8e4873e64a097dfd2862d8665d9e2d969b90810d5272e51acb" + digest = "1:d7ba847f449aef80f09d6b25315ff83f37f8231fe6a83d1ed38baac5892584ce" name = "github.com/Azure/azure-sdk-for-go" - packages = ["storage"] + packages = [ + "storage", + "version", + ] pruneopts = "" - revision = "2d49bb8f2cee530cc16f1f1a9f0aae763dee257d" - version = "v10.2.1-beta" + revision = "781d9fb592a1818cb7fb8e0855ea9c00b095a9e9" + version = "v37.2.0" [[projects]] - digest = "1:e4a02906493a47ee87ef61aeea130ce6624da07349a6dc62494a4e72b550ca8e" + digest = "1:b82e05494fb7e33ff9e4421171be18159a8afb614e3794572ddfe2db28a05d5f" name = "github.com/Azure/go-autorest" packages = [ "autorest", @@ -29,8 +32,8 @@ "tracing", ] pruneopts = "" - revision = "3492b2aff5036c67228ab3c7dba3577c871db200" - version = "v13.3.0" + revision = "21d4b01533b1005be0d020da67a6d3f8ebdf0141" + version = "v13.3.1" [[projects]] digest = "1:e1549ae10031ac55dd7d26ac4d480130ddbdf97f9a26ebbedff089aa0335798f" @@ -45,10 +48,11 @@ version = "v0.1.3" [[projects]] - digest = "1:5cd3c3d202b439040a051b6fa21f538af6469c94f9758dfb4f97a24ceb144149" + digest = "1:7980af5e19c3739b07cc411ca5d59cb5851a359f6a070348aa23be387e3b8e7e" name = "github.com/aws/aws-sdk-go" packages = [ "aws", + "aws/arn", "aws/awserr", "aws/awsutil", "aws/client", @@ -86,14 +90,15 @@ "private/protocol/xml/xmlutil", "service/batch", "service/s3", + "service/s3/internal/arn", "service/s3/s3iface", "service/s3/s3manager", "service/sts", "service/sts/stsiface", ] pruneopts = "" - revision = "2f232d11486e77d344da0723340b566d3ff7865a" - version = "v1.25.24" + revision = "a1e6946e8014a793d989e64ef5566315010ce898" + version = "v1.27.0" [[projects]] digest = "1:ac2a05be7167c495fe8aaf8aaf62ecf81e78d2180ecb04e16778dc6c185c96a5" @@ -104,12 +109,12 @@ version = "v1.0.1" [[projects]] - digest = "1:545ae40d6dde46043a71bdfd7f9a17f2353ce16277c83ac685af231b4b7c4beb" + digest = "1:d25acc7560ed91f825cb9b01a1e945bb1117cdcaba19077137e2d9ffc9cf6d05" name = "github.com/cespare/xxhash" packages = ["."] pruneopts = "" - revision = "de209a9ffae3256185a6bb135d1a0ada7b2b5f09" - version = "v2.1.0" + revision = "d7df74196a9e781ede915320c11c378c1b2f3a1f" + version = "v2.1.1" [[projects]] digest = "1:193f6d32d751f26540aa8eeedc114ce0a51f9e77b6c22dda3a4db4e5f65aec66" @@ -136,28 +141,28 @@ version = "v3.2.0" [[projects]] - digest = "1:46ddeb9dd35d875ac7568c4dc1fc96ce424e034bdbb984239d8ffc151398ec01" + digest = "1:cdcdaf690213dd7daa324a427928c6e7b062085c5fd6d4272db8fb0afba8dac3" name = "github.com/evanphx/json-patch" packages = ["."] pruneopts = "" - revision = "026c730a0dcc5d11f93f1cf1cc65b01247ea7b6f" - version = "v4.5.0" + revision = "bf22ed9311622d93e213ba31e4ae7a5771e5d379" + version = "v4.6.0" [[projects]] - digest = "1:e988ed0ca0d81f4d28772760c02ee95084961311291bdfefc1b04617c178b722" + digest = "1:afb117f8c84e70a479a3c4c2d9250b29634b65d392d2339ee833626905a28ecd" name = "github.com/fatih/color" packages = ["."] pruneopts = "" - revision = "5b77d2a35fb0ede96d138fc9a99f5c9b6aef11b4" - version = "v1.7.0" + revision = "2e5e248695e7e4a0f9ff6d548b83b99199326f49" + version = "v1.8.0" [[projects]] - branch = "master" - digest = "1:c47fa36ae2c154748003f5d39c5ca94e86ec144b37e4975fad91a1c7ff5047fb" + digest = "1:eb53021a8aa3f599d29c7102e65026242bdedce998a54837dc67f14b6a97c5fd" name = "github.com/fsnotify/fsnotify" packages = ["."] pruneopts = "" - revision = "4bf2d1fec78374803a39307bfb8d340688f4f28e" + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + version = "v1.4.7" [[projects]] digest = "1:65587005c6fa4293c0b8a2e457e689df7fda48cc5e1f5449ea2c1e7784551558" @@ -188,11 +193,11 @@ [[projects]] branch = "master" - digest = "1:156e8134a4ab3efcbcbc6e3dd5069bfb70ed8cbb1cdd7db557bd9035c363f335" + digest = "1:bd738ea0dd7d1ce14f6410e1a966c946bcb1855db767a16836d5c56ca3f2f2a0" name = "github.com/golang/groupcache" packages = ["lru"] pruneopts = "" - revision = "611e8accdfc92c4187d399e95ce826046d4c8d73" + revision = "215e87163ea771ffa998a96c611387313bb5a403" [[projects]] digest = "1:b852d2b62be24e445fcdbad9ce3015b44c207815d631230dfce3f14e7803f5bf" @@ -253,7 +258,7 @@ version = "v0.3.1" [[projects]] - digest = "1:1ea91d049b6a609f628ecdfda32e85f445a0d3671980dcbf7cbe1bbd7ee6aabc" + digest = "1:3c582fec3bc9ac9fd1809e58ed6139341afb16e83241d0a2edd7a293cc0e2792" name = "github.com/graymeta/stow" packages = [ ".", @@ -265,7 +270,8 @@ "swift", ] pruneopts = "" - revision = "903027f87de7054953efcdb8ba70d5dc02df38c7" + revision = "926777b8f7aaf9b57d475c612baebb0968532c48" + version = "v0.2.4" [[projects]] digest = "1:7f6f07500a0b7d3766b00fa466040b97f2f5b5f3eef2ecabfe516e703b05119a" @@ -313,12 +319,12 @@ revision = "c2b33e84" [[projects]] - digest = "1:64bdeae058b988b2b198326b1ca6155497e904e697348d838add8a6e4c25842e" + digest = "1:fb8bce9822eac1e2aeee6c2621cf25c6dec8f8f5f50a09a4a894d7932bfb2106" name = "github.com/json-iterator/go" packages = ["."] pruneopts = "" - revision = "03217c3e97663914aec3faafde50d081f197a0a2" - version = "v1.1.8" + revision = "acfec88f7a0d5140ace3dcdbee10184e3684a9e1" + version = "v1.1.9" [[projects]] digest = "1:0f51cee70b0d254dbc93c22666ea2abf211af81c1701a96d04e2284b408621db" @@ -329,7 +335,7 @@ version = "v1.0.2" [[projects]] - digest = "1:24508da9d4741637264e1f5deeceb58150415974ce08efe1f0764ffa881d833d" + digest = "1:7e7dbc278fab0a772ac3eb0f97c75677e151887fc9b8f065639090e0af0747a0" name = "github.com/lyft/flyteidl" packages = [ "clients/go/coreutils", @@ -339,11 +345,11 @@ "gen/pb-go/flyteidl/plugins", ] pruneopts = "" - revision = "7f8cdc2cb0f613bb62b8b44167828326b733f8a5" - version = "v0.16.3" + revision = "28c0dfb6608b70262aac9cb1ff83a750521ded8e" + version = "v0.16.5" [[projects]] - digest = "1:7fee6ae151d081a92d58a53099adcfccf89e424aef03eb20531ae0285da8613d" + digest = "1:6c8d5d4b7189d903f7a2bcca20d5e2d721493f51406a16dcf27caf09315f522c" name = "github.com/lyft/flytestdlib" packages = [ "atomic", @@ -363,8 +369,8 @@ "utils", ] pruneopts = "" - revision = "f9214671dc3b5e86ff029bd514fc18be38c1c3f0" - version = "v0.2.28" + revision = "ee0dc8209867ca344a70003924bfb6be12fd29ce" + version = "v0.2.31" [[projects]] digest = "1:ae39921edb7f801f7ce1b6b5484f9715a1dd2b52cb645daef095cd10fd6ee774" @@ -378,20 +384,20 @@ version = "v1.8.1" [[projects]] - digest = "1:9ea83adf8e96d6304f394d40436f2eb44c1dc3250d223b74088cc253a6cd0a1c" + digest = "1:9b58ad18b38cf0d44b1d006fce319e45f200eafa406fe7f65fe0d0194b9f7a9f" name = "github.com/mattn/go-colorable" packages = ["."] pruneopts = "" - revision = "167de6bfdfba052fa6b2d3664c8f5272e23c9072" - version = "v0.0.9" + revision = "98ec13f34aabf44cc914c65a1cfb7b9bc815aef1" + version = "v0.1.4" [[projects]] - digest = "1:5794dadc5456af270fc564d98efbfa1f3627cd6c733ca772d14409793772064f" + digest = "1:f438921468fbc770ac4c97628f188913a099d4f004226e4aeb6e0f62f5b14fd1" name = "github.com/mattn/go-isatty" packages = ["."] pruneopts = "" - revision = "88ba11cfdc67c7588b30042edf244b2875f892b6" - version = "v0.0.10" + revision = "31745d66dd679ac0ac4f8d3ecff168fce6170c6a" + version = "v0.0.11" [[projects]] digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" @@ -426,12 +432,12 @@ version = "1.0.1" [[projects]] - branch = "master" - digest = "1:b6c101f6c8ab09c631e969c30d3a4b42aeca82580499253bad77cb2426d4fc27" + digest = "1:f05d92116f66e924481201d313512e58cd85c135ba6d9818ab5f6b19501f8e76" name = "github.com/ncw/swift" packages = ["."] pruneopts = "" - revision = "a24ef33bc9b7e59ae4bed9e87a51d7bc76122731" + revision = "f737f4e00462f79ff2e0ddbcfb09331ce7ec4fa9" + version = "v1.0.49" [[projects]] digest = "1:5f347ea0b4656f17f0ddffae1419dafaac4bc6ed57c92d58cadc7452f8a9ac3f" @@ -469,12 +475,12 @@ version = "v0.9.4" [[projects]] - branch = "master" - digest = "1:0a565f69553dd41b3de790fde3532e9237142f2637899e20cd3e7396f0c4f2f7" + digest = "1:ff7a5f44653e65cf1a0577bbe3f2cdaf514930348f6df581bbd687bbe35ead5b" name = "github.com/prometheus/client_model" packages = ["go"] pruneopts = "" - revision = "14fe0d1b01d4d5fc031dd4bec1823bd3ebbe8016" + revision = "d1d2010b5beead3fa1c5f271a5cf626e40b3ad6e" + version = "v0.1.0" [[projects]] digest = "1:8904acfa3ef080005c1fc0670ed0471739d1e211be5638cfa6af536b701942ae" @@ -489,7 +495,7 @@ version = "v0.7.0" [[projects]] - digest = "1:af5cd8219fd15c06eadaab455c0beb72f2f7bb32d298acb401d30c452a8dbd7e" + digest = "1:4c64aa254bc24990bc0216de9dd955ff83f061e9baac7ed2ffc293442ab7514a" name = "github.com/prometheus/procfs" packages = [ ".", @@ -497,12 +503,12 @@ "internal/util", ] pruneopts = "" - revision = "499c85531f756d1129edd26485a5f73871eeb308" - version = "v0.0.5" + revision = "6d489fc7f1d9cd890a250f3ea3431b1744b9623f" + version = "v0.0.8" [[projects]] digest = "1:7f569d906bdd20d906b606415b7d794f798f91a62fcfb6a4daa6d50690fb7a3f" - name = "github.com/satori/uuid" + name = "github.com/satori/go.uuid" packages = ["."] pruneopts = "" revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3" @@ -528,12 +534,12 @@ version = "v1.2.2" [[projects]] - digest = "1:ae3493c780092be9d576a1f746ab967293ec165e8473425631f06658b6212afc" + digest = "1:0a3d8f7dc17afdc30a5a5becb45fb91ff28aaa64645c69836ccd600d88e9ed9e" name = "github.com/spf13/cast" packages = ["."] pruneopts = "" - revision = "8c9545af88b134710ab1cd196795e7f2388358d7" - version = "v1.3.0" + revision = "1ffadf551085444af981432dd0f6d1160c11ec64" + version = "v1.3.1" [[projects]] digest = "1:0c63b3c7ad6d825a898f28cb854252a3b29d37700c68a117a977263f5ec94efe" @@ -560,11 +566,12 @@ version = "v1.0.5" [[projects]] - digest = "1:c25a789c738f7cc8ec7f34026badd4e117853f329334a5aa45cf5d0727d7d442" + digest = "1:ede5f300103cb012aafde77c692dc853c4b590bb412d3c7965a11748a5c37635" name = "github.com/spf13/viper" packages = ["."] pruneopts = "" - revision = "ae103d7e593e371c69e832d5eb3347e2b80cbbc9" + revision = "eabbc68a3ecd5cf8c11a2f84dbda5e7a38493b2f" + version = "v1.6.1" [[projects]] digest = "1:711eebe744c0151a9d09af2315f0bb729b2ec7637ef4c410fa90a18ef74b65b6" @@ -585,6 +592,14 @@ revision = "221dbe5ed46703ee255b1da0dec05086f5035f62" version = "v1.4.0" +[[projects]] + digest = "1:e818a738c880c5571b3817e9173add5c598a90f8615fa7a9bafc23fea774a603" + name = "github.com/subosito/gotenv" + packages = ["."] + pruneopts = "" + revision = "2ef7124db659d49edac6aa459693a15ae36c671a" + version = "v1.2.0" + [[projects]] branch = "master" digest = "1:f05401b06311e9ab82c76f89022a3503139a9f608233fdbf14386ed1e4fc9520" @@ -595,7 +610,7 @@ source = "github.com/enghabu/mockery" [[projects]] - digest = "1:1967fb934ef747bf690fcc56487a06c46bf674bd91cb3381a78a7e4d5c2e1a82" + digest = "1:61ff2a14b0f9a396d92bb6967ecd1ae4c4767ff3e008acdfd8291af134336d9f" name = "go.opencensus.io" packages = [ ".", @@ -616,20 +631,20 @@ "trace/tracestate", ] pruneopts = "" - revision = "59d1ce35d30f3c25ba762169da2a37eab6ffa041" - version = "v0.22.1" + revision = "aad2c527c5defcf89b5afab7f37274304195a6b2" + version = "v0.22.2" [[projects]] branch = "master" - digest = "1:47cd7b63835b62da6a8c3817fafde2e07aaf952ce170a6ecafedb06946be6ddb" + digest = "1:623570fddb99ef064b125c7bf3161f14aba21fdb787896b6c5070d34187c86f8" name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "" - revision = "8986dd9e96cf0a6f74da406c005ba3df38527c04" + revision = "53104e6ec876ad4e22ad27cce588b01392043c1b" [[projects]] branch = "master" - digest = "1:196c20449a0cc94113029193d7ce9e36e46e676c27a2fe06522243f57ced19c5" + digest = "1:cab37ea831bb9be343b41a7673e64b6bb8399bc0f051180c04ce561573ed2c89" name = "golang.org/x/net" packages = [ "context", @@ -642,11 +657,11 @@ "trace", ] pruneopts = "" - revision = "fe3aa8a4527195a6057b3fad46619d7d090e99b5" + revision = "c0dbc17a35534bf2e581d7a942408dc936316da4" [[projects]] branch = "master" - digest = "1:01bdbbc604dcd5afb6f66a717f69ad45e9643c72d5bc11678d44ffa5c50f9e42" + digest = "1:44d75e002fa89506cca17f921eb07aee69526d7519bb7f1f2b0e017380c5e011" name = "golang.org/x/oauth2" packages = [ ".", @@ -656,18 +671,18 @@ "jwt", ] pruneopts = "" - revision = "0f29369cfe4552d0e4bcddc57cc75f4d7e672a33" + revision = "858c2ad4c8b6c5d10852cb89079f6ca1c7309787" [[projects]] branch = "master" - digest = "1:a155b7400cb9270dda3f63651160c0349a9e16855975956c79e618e9f30c160c" + digest = "1:8c945c7d15d859d00371d42149761229fcc8dcd839ff2b998defba22fc9f7d24" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "f43be2a4598cf3a47be9f94f0c28197ed9eae611" + revision = "a1369afcdac740082c63165b07ec83b531884be2" [[projects]] digest = "1:740b51a55815493a8d0f2b1e0d0ae48fe48953bf7eaf3fcc4198823bf67768c0" @@ -704,18 +719,17 @@ [[projects]] branch = "master" - digest = "1:61bfb54376228f1111f1f0f2d609971251f49ba1e3d8dd801249b521010ae0ca" + digest = "1:9d4ac09a835404ae9306c6e1493cf800ecbb0f3f828f4333b3e055de4c962eea" name = "golang.org/x/xerrors" packages = [ ".", "internal", ] pruneopts = "" - revision = "1b5146add8981d58be77b16229c0ff0f8bebd8c1" + revision = "9bdfabe68543c54f90421aeb9a60ef8061b5b544" [[projects]] - branch = "master" - digest = "1:6cad05171b0cb6abda71bcea1abfc392b32816264a0cfd15ea3ed629a25219df" + digest = "1:1e9e6f4d4ca4c890f3de6dbaf071a9a428991c571fbaeae210981729cd19cd1a" name = "google.golang.org/api" packages = [ "googleapi", @@ -729,7 +743,8 @@ "transport/http/internal/propagation", ] pruneopts = "" - revision = "4b5ab5e994c422e7c6359a085062f721716b4c22" + revision = "aa5d4e47691e7ae1aebb5221ff8e4beea23fad72" + version = "v0.15.0" [[projects]] digest = "1:c4404231035fad619a12f82ae3f0f8f9edc1cc7f34e7edad7a28ccac5336cc96" @@ -752,17 +767,19 @@ [[projects]] branch = "master" - digest = "1:b29336aebd5fb26d3f6b8e776969db4e4e29765ff7313eb43d4593961a64e438" + digest = "1:ba64d3acd79ad4dee34b76de29a73959114a204f1f35791807171a152d41aa38" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "" - revision = "919d9bdd9fe6f1a5dd95ce5d5e4cdb8fd3c516d0" + revision = "f3c370f40bfba3cb25c5c2f823a1a8031b5ad724" [[projects]] - digest = "1:30d215704e78c21ffff90aa8e86ca1a438fec2837bad082116510fe5a862af5e" + digest = "1:7af390490e636a6adc9c76b37a3c823195fbf375a02c4d9506b4dd49d5d2409a" name = "google.golang.org/grpc" packages = [ ".", + "attributes", + "backoff", "balancer", "balancer/base", "balancer/roundrobin", @@ -778,10 +795,13 @@ "internal/backoff", "internal/balancerload", "internal/binarylog", + "internal/buffer", "internal/channelz", "internal/envconfig", "internal/grpcrand", "internal/grpcsync", + "internal/resolver/dns", + "internal/resolver/passthrough", "internal/syscall", "internal/transport", "keepalive", @@ -789,16 +809,14 @@ "naming", "peer", "resolver", - "resolver/dns", - "resolver/passthrough", "serviceconfig", "stats", "status", "tap", ] pruneopts = "" - revision = "f6d0f9ee430895e87ef1ceb5ac8f39725bafceef" - version = "v1.24.0" + revision = "f5b0812e6fe574d90da76b205e9eb51f6ddb1919" + version = "v1.26.0" [[projects]] digest = "1:75fb3fcfc73a8c723efde7777b40e8e8ff9babf30d8c56160d01beffea8a95a6" @@ -809,12 +827,20 @@ version = "v0.9.1" [[projects]] - digest = "1:ab9547706f32a7535bb4f25d6b58ad00436630593cd3e3ed4602f1613ed84783" + digest = "1:d68a26e5d4ba1d9213041aad76d2d6b6d68dba853ceb585235311bd86748e473" + name = "gopkg.in/ini.v1" + packages = ["."] + pruneopts = "" + revision = "94291fffe2b14f4632ec0e67c1bfecfc1287a168" + version = "v1.51.1" + +[[projects]] + digest = "1:5a53f6ef09fb1ac261a97f8a72e8837ff53cbaa969022a6679da210e4cbe9b0f" name = "gopkg.in/yaml.v2" packages = ["."] pruneopts = "" - revision = "f221b8435cfb71e54062f6c6e99e9ade30b124d5" - version = "v2.2.4" + revision = "1f64d6156d11335c3f22d9330b0ad14fc1e789ce" + version = "v2.2.7" [[projects]] digest = "1:4a456d15ea81830e9aaf685ad36100c8be2adfb646996db18432073e07949fe9" @@ -959,15 +985,15 @@ [[projects]] branch = "master" - digest = "1:ad13d36fb31a3e590b143439610f1a35b4033437ebf565dbc14a72ed4bd61dfb" + digest = "1:16a343bd9d820ae320de4d1eaa8acc7a214aac4b38fb21d03255d3a457d861df" name = "k8s.io/kube-openapi" packages = ["pkg/util/proto"] pruneopts = "" - revision = "0270cf2f1c1d995d34b36019a6f65d58e6e33ad4" + revision = "30be4d16710ac61bce31eb28a01054596fe6a9f1" [[projects]] branch = "master" - digest = "1:eea15d4ad177b9185f26550b314024a374838830b33a0954b179cad16f2e39a7" + digest = "1:7b3e186eef10afc7c1f868fea5bd20575e4960e3113af467275f750b628bc64e" name = "k8s.io/utils" packages = [ "buffer", @@ -975,7 +1001,7 @@ "trace", ] pruneopts = "" - revision = "2b95a09bc58df43d4032504619706b6a38293a47" + revision = "f07c713de88362aef7545072487d6118bd4a3d4a" [[projects]] digest = "1:d65cadd0126bf9385147e2853e53fda65fc419c04da329b3b077fb4765e3737c" diff --git a/flyteplugins/Makefile b/flyteplugins/Makefile index f2e2a3125f..3c4f908bc3 100755 --- a/flyteplugins/Makefile +++ b/flyteplugins/Makefile @@ -8,7 +8,7 @@ update_boilerplate: generate: which pflags || (go get github.com/lyft/flytestdlib/cli/pflags) - which mockery || (go install github.com/lyft/flyteidl/vendor/github.com/vektra/mockery/cmd/mockery) + which mockery || (go install github.com/lyft/flyteplugins/vendor/github.com/vektra/mockery/cmd/mockery) which enumer || (go get github.com/alvaroloes/enumer) @go generate ./... diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/async_client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/async_client.go index 46fc8c5eca..7ec246a001 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/async_client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/async_client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import context "context" +import mock "github.com/stretchr/testify/mock" // AsyncClient is an autogenerated mock type for the AsyncClient type type AsyncClient struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go index d3bedd110d..04dcf7a3e5 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go @@ -2,15 +2,10 @@ package mocks -import ( - context "context" - - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import context "context" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" // Client is an autogenerated mock type for the Client type type Client struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_future.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_future.go index ba08ced6f0..6c555a243d 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_future.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // DownloadFuture is an autogenerated mock type for the DownloadFuture type type DownloadFuture struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_response.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_response.go index cff414ea80..a58f93fa01 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_response.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/download_response.go @@ -2,11 +2,9 @@ package mocks -import ( - bitarray "github.com/lyft/flytestdlib/bitarray" +import bitarray "github.com/lyft/flytestdlib/bitarray" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // DownloadResponse is an autogenerated mock type for the DownloadResponse type type DownloadResponse struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/future.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/future.go index 710d46b753..c08d0b949a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/future.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // Future is an autogenerated mock type for the Future type type Future struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/upload_future.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/upload_future.go index c6bd559186..42686bd74c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/upload_future.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/upload_future.go @@ -2,10 +2,8 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - mock "github.com/stretchr/testify/mock" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import mock "github.com/stretchr/testify/mock" // UploadFuture is an autogenerated mock type for the UploadFuture type type UploadFuture struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/events_recorder.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/events_recorder.go index ae493a1ec2..a2ba4a287d 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/events_recorder.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/events_recorder.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // EventsRecorder is an autogenerated mock type for the EventsRecorder type type EventsRecorder struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/kube_client.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/kube_client.go index 1404b317b5..ca48835335 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/kube_client.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/kube_client.go @@ -2,12 +2,10 @@ package mocks -import ( - cache "sigs.k8s.io/controller-runtime/pkg/cache" - client "sigs.k8s.io/controller-runtime/pkg/client" +import cache "sigs.k8s.io/controller-runtime/pkg/cache" +import client "sigs.k8s.io/controller-runtime/pkg/client" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // KubeClient is an autogenerated mock type for the KubeClient type type KubeClient struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/plugin.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/plugin.go index 1d925a798b..74b6ae5b5a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/plugin.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // Plugin is an autogenerated mock type for the Plugin type type Plugin struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go index 1f01a632e5..eefa650c8c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // ResourceManager is an autogenerated mock type for the ResourceManager type type ResourceManager struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_registrar.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_registrar.go index c2707c49d5..28e934801f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_registrar.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_registrar.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" // ResourceRegistrar is an autogenerated mock type for the ResourceRegistrar type type ResourceRegistrar struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/secret_manager.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/secret_manager.go index 84953489b6..3f2f123913 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/secret_manager.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/secret_manager.go @@ -2,11 +2,9 @@ package mocks -import ( - context "context" +import context "context" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // SecretManager is an autogenerated mock type for the SecretManager type type SecretManager struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/setup_context.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/setup_context.go index 46be413f34..49f6ad851c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/setup_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/setup_context.go @@ -2,12 +2,9 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" - - promutils "github.com/lyft/flytestdlib/promutils" -) +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" +import promutils "github.com/lyft/flytestdlib/promutils" // SetupContext is an autogenerated mock type for the SetupContext type type SetupContext struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go index c53bcd0c81..4a2bf2b477 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go @@ -2,16 +2,11 @@ package mocks -import ( - catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import catalog "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // TaskExecutionContext is an autogenerated mock type for the TaskExecutionContext type type TaskExecutionContext struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go index 1b1332936e..e07210e439 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_id.go @@ -2,10 +2,8 @@ package mocks -import ( - flyteidlcore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - mock "github.com/stretchr/testify/mock" -) +import flyteidlcore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import mock "github.com/stretchr/testify/mock" // TaskExecutionID is an autogenerated mock type for the TaskExecutionID type type TaskExecutionID struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go index c7d7a0b886..3c844d9079 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go @@ -2,14 +2,10 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - mock "github.com/stretchr/testify/mock" - - types "k8s.io/apimachinery/pkg/types" - - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import mock "github.com/stretchr/testify/mock" +import types "k8s.io/apimachinery/pkg/types" +import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" // TaskExecutionMetadata is an autogenerated mock type for the TaskExecutionMetadata type type TaskExecutionMetadata struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go index dc35666c77..9f0167004c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go @@ -2,10 +2,8 @@ package mocks -import ( - mock "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" -) +import mock "github.com/stretchr/testify/mock" +import v1 "k8s.io/api/core/v1" // TaskOverrides is an autogenerated mock type for the TaskOverrides type type TaskOverrides struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_reader.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_reader.go index c29cc26ab5..62ea96ae91 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_reader.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import mock "github.com/stretchr/testify/mock" // TaskReader is an autogenerated mock type for the TaskReader type type TaskReader struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 5889db653a..6883983ebc 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -1,7 +1,7 @@ package config import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "github.com/lyft/flyteplugins/go/tasks/config" ) @@ -35,7 +35,7 @@ type K8sPluginConfig struct { // Provide additional environment variable pairs that plugin authors will provide to containers DefaultEnvVars map[string]string `json:"default-env-vars" pflag:"-,Additional environment variable that should be injected into every resource"` // Provide additional environment variable pairs whose values resolve from the plugin's execution environment. - DefaultEnvVarsFromEnv map[string]string `json:"default-env-vars-from-env" pflag:",Additional environment variable that should be injected into every resource"` + DefaultEnvVarsFromEnv map[string]string `json:"default-env-vars-from-env" pflag:"-,Additional environment variable that should be injected into every resource"` // Tolerations in the cluster that should be applied for a specific resource // Currently we support simple resource based tolerations only ResourceTolerations map[v1.ResourceName][]v1.Toleration `json:"resource-tolerations" pflag:"-,Default tolerations to be applied for resource of type 'key'"` diff --git a/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_file_paths.go b/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_file_paths.go index 91ea4938a4..23ace1dd90 100644 --- a/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_file_paths.go +++ b/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_file_paths.go @@ -2,10 +2,8 @@ package mocks -import ( - storage "github.com/lyft/flytestdlib/storage" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // InputFilePaths is an autogenerated mock type for the InputFilePaths type type InputFilePaths struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_reader.go b/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_reader.go index 83806da423..44b83e6a7b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/io/mocks/input_reader.go @@ -2,15 +2,11 @@ package mocks -import ( - context "context" +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // InputReader is an autogenerated mock type for the InputReader type type InputReader struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_file_paths.go b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_file_paths.go index bd65a3305f..e8438c59b1 100644 --- a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_file_paths.go +++ b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_file_paths.go @@ -2,10 +2,8 @@ package mocks -import ( - storage "github.com/lyft/flytestdlib/storage" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // OutputFilePaths is an autogenerated mock type for the OutputFilePaths type type OutputFilePaths struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_reader.go b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_reader.go index c2fbb8f2b9..4eab95124b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_reader.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" // OutputReader is an autogenerated mock type for the OutputReader type type OutputReader struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_writer.go b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_writer.go index 0414271d01..8f0770e4b0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_writer.go +++ b/flyteplugins/go/tasks/pluginmachinery/io/mocks/output_writer.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - mock "github.com/stretchr/testify/mock" - - storage "github.com/lyft/flytestdlib/storage" -) +import context "context" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import mock "github.com/stretchr/testify/mock" +import storage "github.com/lyft/flytestdlib/storage" // OutputWriter is an autogenerated mock type for the OutputWriter type type OutputWriter struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin.go b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin.go index 5439f7f4fc..2ad3481c39 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin.go @@ -2,14 +2,10 @@ package mocks -import ( - context "context" - - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - k8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" - - mock "github.com/stretchr/testify/mock" -) +import context "context" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import k8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" +import mock "github.com/stretchr/testify/mock" // Plugin is an autogenerated mock type for the Plugin type type Plugin struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go index a9011ddb0b..1009c8c430 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go @@ -2,12 +2,10 @@ package mocks -import ( - core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" - io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" +import core "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" +import io "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // PluginContext is an autogenerated mock type for the PluginContext type type PluginContext struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/resource.go b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/resource.go index 85bfa9d21f..084e608889 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/resource.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/resource.go @@ -2,16 +2,11 @@ package mocks -import ( - mock "github.com/stretchr/testify/mock" - runtime "k8s.io/apimachinery/pkg/runtime" - - schema "k8s.io/apimachinery/pkg/runtime/schema" - - types "k8s.io/apimachinery/pkg/types" - - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) +import mock "github.com/stretchr/testify/mock" +import runtime "k8s.io/apimachinery/pkg/runtime" +import schema "k8s.io/apimachinery/pkg/runtime/schema" +import types "k8s.io/apimachinery/pkg/types" +import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" // Resource is an autogenerated mock type for the Resource type type Resource struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go index b6bd1f12ef..cec17ebf17 100644 --- a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go +++ b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/indexed_work_queue.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // IndexedWorkQueue is an autogenerated mock type for the IndexedWorkQueue type type IndexedWorkQueue struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/processor.go b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/processor.go index 84b8bb9a47..1cb9bce5c0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/processor.go +++ b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/processor.go @@ -2,12 +2,9 @@ package mocks -import ( - context "context" - - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import context "context" +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // Processor is an autogenerated mock type for the Processor type type Processor struct { diff --git a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go index 261ca9cc24..2aaad131a8 100644 --- a/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go +++ b/flyteplugins/go/tasks/pluginmachinery/workqueue/mocks/work_item_info.go @@ -2,10 +2,8 @@ package mocks -import ( - workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" +import workqueue "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" // WorkItemInfo is an autogenerated mock type for the WorkItemInfo type type WorkItemInfo struct { diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go index 810ded0a61..3f6a3d6c61 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/batch_service_client.go @@ -2,15 +2,10 @@ package mocks -import ( - context "context" - - batch "github.com/aws/aws-sdk-go/service/batch" - - mock "github.com/stretchr/testify/mock" - - request "github.com/aws/aws-sdk-go/aws/request" -) +import batch "github.com/aws/aws-sdk-go/service/batch" +import context "context" +import mock "github.com/stretchr/testify/mock" +import request "github.com/aws/aws-sdk-go/aws/request" // BatchServiceClient is an autogenerated mock type for the BatchServiceClient type type BatchServiceClient struct { diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/cache.go b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/cache.go index c50fb1be35..57e86864f3 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/cache.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/cache.go @@ -2,10 +2,8 @@ package mocks -import ( - definition "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/definition" - mock "github.com/stretchr/testify/mock" -) +import definition "github.com/lyft/flyteplugins/go/tasks/plugins/array/awsbatch/definition" +import mock "github.com/stretchr/testify/mock" // Cache is an autogenerated mock type for the Cache type type Cache struct { diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/client.go b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/client.go index 70faae703c..39f2983d37 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/client.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/mocks/client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - batch "github.com/aws/aws-sdk-go/service/batch" - - mock "github.com/stretchr/testify/mock" -) +import batch "github.com/aws/aws-sdk-go/service/batch" +import context "context" +import mock "github.com/stretchr/testify/mock" // Client is an autogenerated mock type for the Client type type Client struct { diff --git a/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go b/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go index 67464e7d1f..9896311992 100644 --- a/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go +++ b/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go @@ -2,13 +2,9 @@ package mocks -import ( - context "context" - - client "github.com/lyft/flyteplugins/go/tasks/plugins/hive/client" - - mock "github.com/stretchr/testify/mock" -) +import client "github.com/lyft/flyteplugins/go/tasks/plugins/hive/client" +import context "context" +import mock "github.com/stretchr/testify/mock" // QuboleClient is an autogenerated mock type for the QuboleClient type type QuboleClient struct { diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index b29180fc2f..ac502e6a10 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -25,16 +25,20 @@ func MustParse(s string) config.URL { return config.URL{URL: *r} } +type ClusterConfig struct { + Label string `json:"label" pflag:",Label of the service cluster"` + Limit int `json:"limit" pflag:",Resource quota of the service cluster"` +} + var ( defaultConfig = Config{ Endpoint: MustParse("https://wellness.qubole.com"), CommandAPIPath: MustParse("/api/v1.2/commands/"), AnalyzeLinkPath: MustParse("/v2/analyze"), TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", - Limit: 200, LruCacheSize: 2000, Workers: 15, - ClusterLabels: []string{"default"}, + ClusterConfigs: []ClusterConfig{{Label: "default", Limit: 200}}, } quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig) @@ -42,15 +46,13 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - Limit int `json:"quboleLimit" pflag:",Global limit for concurrent Qubole queries"` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - HiveConfig map[string]int `json:"clusterLabels" pflag:",List of labels of service clusters"` - PrestoConfig map[string]int + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,List of cluster configs. Each of the configs corresponds to a service cluster"` } // Retrieves the current config value or default. diff --git a/flyteplugins/go/tasks/plugins/hive/config/config_flags.go b/flyteplugins/go/tasks/plugins/hive/config/config_flags.go index 3513172f9f..295bd5e2b7 100755 --- a/flyteplugins/go/tasks/plugins/hive/config/config_flags.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config_flags.go @@ -45,7 +45,6 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "commandApiPath"), defaultConfig.CommandAPIPath.String(), "API Path where commands can be launched on Qubole. Should be a valid url.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "analyzeLinkPath"), defaultConfig.AnalyzeLinkPath.String(), "URL path where queries can be visualized on qubole website. Should be a valid url.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "quboleTokenKey"), defaultConfig.TokenKey, "Name of the key where to find Qubole token in the secret manager.") - cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "quboleLimit"), defaultConfig.Limit, "Global limit for concurrent Qubole queries") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "lruCacheSize"), defaultConfig.LruCacheSize, "Size of the AutoRefreshCache") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), defaultConfig.Workers, "Number of parallel workers to refresh the cache") return cmdFlags diff --git a/flyteplugins/go/tasks/plugins/hive/config/config_flags_test.go b/flyteplugins/go/tasks/plugins/hive/config/config_flags_test.go index 90834522ec..2ec62b1d29 100755 --- a/flyteplugins/go/tasks/plugins/hive/config/config_flags_test.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config_flags_test.go @@ -187,28 +187,6 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_quboleLimit", func(t *testing.T) { - t.Run("DefaultValue", func(t *testing.T) { - // Test that default value is set properly - if vInt, err := cmdFlags.GetInt("quboleLimit"); err == nil { - assert.Equal(t, int(defaultConfig.Limit), vInt) - } else { - assert.FailNow(t, err.Error()) - } - }) - - t.Run("Override", func(t *testing.T) { - testValue := "1" - - cmdFlags.Set("quboleLimit", testValue) - if vInt, err := cmdFlags.GetInt("quboleLimit"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.Limit) - - } else { - assert.FailNow(t, err.Error()) - } - }) - }) t.Run("Test_lruCacheSize", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index a9abb756da..f874947907 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -168,7 +168,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseQueued, state.Phase) }) @@ -180,7 +180,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusExhausted, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -192,7 +192,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusNamespaceQuotaExceeded, nil) - state, err := GetAllocationToken(ctx, quboleResourceNamespace, tCtx) + state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -243,7 +243,7 @@ func TestFinalize(t *testing.T) { called = true }).Return(nil) - err := Finalize(ctx, tCtx, quboleResourceNamespace, state) + err := Finalize(ctx, tCtx, quboleHiveExecutorId, state) assert.NoError(t, err) assert.True(t, called) } diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index 81772015c0..91e9bb23dd 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -22,21 +22,23 @@ const quboleHiveExecutorId = "qubole-hive-executor" const pluginStateVersion = 0 const hiveTaskType = "hive" // This needs to match the type defined in Flytekit constants.py -const quboleResourceNamespace core.ResourceNamespace = "qubole" type QuboleHiveExecutor struct { - id string - metrics QuboleHiveExecutorMetrics - quboleClient client.QuboleClient - executionsCache cache.AutoRefresh - cfg *config.Config - resourceNamespace core.ResourceNamespace + id string + metrics QuboleHiveExecutorMetrics + quboleClient client.QuboleClient + executionsCache cache.AutoRefresh + cfg *config.Config } func (q QuboleHiveExecutor) GetID() string { return q.id } +func (q QuboleHiveExecutor) GetResourceNamespace() core.ResourceNamespace { + return core.ResourceNamespace(q.GetID()) +} + func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { incomingState := ExecutionState{} @@ -52,7 +54,7 @@ func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionC // Do what needs to be done, and give this function everything it needs to do its job properly // TODO: Play around with making this return a transition directly. How will that pattern affect the multi-Qubole plugin outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, - q.resourceNamespace, q.cfg) + q.GetResourceNamespace(), q.cfg) // Return if there was an error if transformError != nil { @@ -94,7 +96,7 @@ func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutio return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize") } - return Finalize(ctx, tCtx, q.resourceNamespace, incomingState) + return Finalize(ctx, tCtx, q.GetResourceNamespace(), incomingState) } func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { @@ -103,37 +105,38 @@ func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { cfg := config.GetQuboleConfig() - return InitializeHiveExecutor(ctx, iCtx, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) + return InitializeHiveExecutor(ctx, iCtx, cfg, quboleHiveExecutorId, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) } -func BuildResourceConfig() { - resourceConfig := make(map[string]int, len(cfg.ClusterLabels)) +func BuildResourceConfig(cfg *config.Config) map[string]int { + resourceConfig := make(map[string]int, len(cfg.ClusterConfigs)) - for _, cluster := range cfg.Presto { - resourceConfig[cluster] = cfg.Limit + for _, clusterCfg := range cfg.ClusterConfigs { + resourceConfig[clusterCfg.Label] = clusterCfg.Limit } + return resourceConfig } -func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, resourceConfig map[string]int, +func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceNamespace core.ResourceNamespace, resourceConfig map[string]int, quboleClient client.QuboleClient) (core.Plugin, error) { - q, err := NewQuboleHiveExecutor(ctx, cfg, quboleClient, quboleResourceNamespace, iCtx.SecretManager(), iCtx.MetricsScope()) + + q, err := NewQuboleHiveExecutor(ctx, cfg, quboleClient, iCtx.SecretManager(), iCtx.MetricsScope()) if err != nil { return nil, err } - for clusterName, limit := range resourceConfig { - if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, clusterName, limit); err != nil { + for clusterLabel, clusterLimit := range resourceConfig { + namespaceWithClusterLabel := resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterLabel)) + if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespaceWithClusterLabel, clusterLimit); err != nil { return nil, err } } return q, nil - } // type PluginLoader func(ctx context.Context, iCtx SetupContext) (Plugin, error) -func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, resourceNamespace core.ResourceNamespace, - secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error) { +func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient client.QuboleClient, secretManager core.SecretManager, scope promutils.Scope) (QuboleHiveExecutor, error) { executionsAutoRefreshCache, err := NewQuboleHiveExecutionsCache(ctx, quboleClient, secretManager, cfg, scope.NewSubScope(hiveTaskType)) if err != nil { logger.Errorf(ctx, "Failed to create AutoRefreshCache in QuboleHiveExecutor Setup. Error: %v", err) @@ -146,12 +149,11 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient } return QuboleHiveExecutor{ - id: quboleHiveExecutorId, - cfg: cfg, - resourceNamespace: resourceNamespace, - metrics: getQuboleHiveExecutorMetrics(scope), - quboleClient: quboleClient, - executionsCache: executionsAutoRefreshCache, + id: quboleHiveExecutorId, + cfg: cfg, + metrics: getQuboleHiveExecutorMetrics(scope), + quboleClient: quboleClient, + executionsCache: executionsAutoRefreshCache, }, nil } From 3e97a32da0eb12e6db69ae93ee5cd2e1ca335e01 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 6 Jan 2020 13:15:47 -0800 Subject: [PATCH 03/16] Refactor to simplify the resource namespace for resource manager --- .../core/mocks/resource_manager.go | 32 +++++++++++++++++++ .../pluginmachinery/core/resource_manager.go | 1 + 2 files changed, 33 insertions(+) diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go index eefa650c8c..3657a69580 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/resource_manager.go @@ -50,6 +50,38 @@ func (_m *ResourceManager) AllocateResource(ctx context.Context, namespace core. return r0, r1 } +type ResourceManager_GetID struct { + *mock.Call +} + +func (_m ResourceManager_GetID) Return(_a0 string) *ResourceManager_GetID { + return &ResourceManager_GetID{Call: _m.Call.Return(_a0)} +} + +func (_m *ResourceManager) OnGetID() *ResourceManager_GetID { + c := _m.On("GetID") + return &ResourceManager_GetID{Call: c} +} + +func (_m *ResourceManager) OnGetIDMatch(matchers ...interface{}) *ResourceManager_GetID { + c := _m.On("GetID", matchers...) + return &ResourceManager_GetID{Call: c} +} + +// GetID provides a mock function with given fields: +func (_m *ResourceManager) GetID() string { + ret := _m.Called() + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + type ResourceManager_ReleaseResource struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go b/flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go index 4f463cb30f..ed8b233b42 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/resource_manager.go @@ -35,6 +35,7 @@ type ResourceRegistrar interface { // Resource Manager manages a single resource type, and each allocation is of size one type ResourceManager interface { + GetID() string AllocateResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) (AllocationStatus, error) ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error } From e096fc08d662ae744846aa5beb72d7559f48f402 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Tue, 7 Jan 2020 13:47:09 -0800 Subject: [PATCH 04/16] Add unit to pflag of ClusterConfig.Limit --- flyteplugins/go/tasks/plugins/hive/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index ac502e6a10..5c988e630f 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -27,7 +27,7 @@ func MustParse(s string) config.URL { type ClusterConfig struct { Label string `json:"label" pflag:",Label of the service cluster"` - Limit int `json:"limit" pflag:",Resource quota of the service cluster"` + Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` } var ( From d5a144f3a71838f0cc428e4becac8662dea28149 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Wed, 8 Jan 2020 10:47:16 -0800 Subject: [PATCH 05/16] increase quota for default qubole cluster --- flyteplugins/go/tasks/plugins/hive/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index 5c988e630f..a8d98171d6 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -38,7 +38,7 @@ var ( TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", LruCacheSize: 2000, Workers: 15, - ClusterConfigs: []ClusterConfig{{Label: "default", Limit: 200}}, + ClusterConfigs: []ClusterConfig{{Label: "default", Limit: 250}}, } quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig) From 3fac7a1bdf2b9fba91103cc186458aaae5632dc6 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Wed, 8 Jan 2020 15:46:34 -0800 Subject: [PATCH 06/16] Introducing primary cluster label. 1. Adding Primary cluster label to ClusterConfig; making the labels field of ClusterConfig a list that mirrors the labels in Qubole. 2. Adding project destination cluster config section to qubole config. 3. Adding the logic that makes sure we always use primary cluster label to execute hive request and do resource management --- flyteplugins/go/tasks/logs/config.go | 1 - .../go/tasks/plugins/array/k8s/transformer.go | 1 + .../plugins/hive/client/qubole_client.go | 6 +- .../go/tasks/plugins/hive/config/config.go | 41 ++++++++----- .../go/tasks/plugins/hive/execution_state.go | 61 ++++++++++++++++--- .../go/tasks/plugins/hive/executor.go | 10 +-- 6 files changed, 86 insertions(+), 34 deletions(-) diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index e47202c71a..d52c9e6eec 100755 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -30,4 +30,3 @@ func GetLogConfig() *LogConfig { func SetLogConfig(logConfig *LogConfig) error { return logConfigSection.SetConfig(logConfig) } - diff --git a/flyteplugins/go/tasks/plugins/array/k8s/transformer.go b/flyteplugins/go/tasks/plugins/array/k8s/transformer.go index 9eb7e83f2f..3a7695f3d9 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/transformer.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/transformer.go @@ -2,6 +2,7 @@ package k8s import ( "context" + core2 "github.com/lyft/flyteplugins/go/tasks/plugins/array/core" "github.com/lyft/flytestdlib/storage" diff --git a/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go b/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go index a30763ccdc..2549e39d02 100644 --- a/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go +++ b/flyteplugins/go/tasks/plugins/hive/client/qubole_client.go @@ -62,7 +62,7 @@ type RequestBody struct { // Interface to interact with QuboleClient for hive tasks type QuboleClient interface { - ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, + ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) (*QuboleCommandDetails, error) KillCommand(ctx context.Context, commandID string, accountKey string) error GetCommandStatus(ctx context.Context, commandID string, accountKey string) (QuboleStatus, error) @@ -167,7 +167,7 @@ func (q *quboleClient) ExecuteHiveCommand( ctx context.Context, commandStr string, timeoutVal uint32, - clusterLabel string, + clusterPrimaryLabel string, accountKey string, tags []string) (*QuboleCommandDetails, error) { @@ -175,7 +175,7 @@ func (q *quboleClient) ExecuteHiveCommand( CommandType: hiveCommandType, Query: commandStr, Timeout: timeoutVal, - ClusterLabel: clusterLabel, + ClusterLabel: clusterPrimaryLabel, Tags: tags, } diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index a8d98171d6..b31b09dbee 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -26,19 +26,27 @@ func MustParse(s string) config.URL { } type ClusterConfig struct { - Label string `json:"label" pflag:",Label of the service cluster"` - Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` + PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"` + Labels []string `json:"labels" pflag:",Labels of a given service cluster"` + Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` +} + +type ProjectDestinationClusterConfig struct { + Project string `json:"project" pflag:",Project of the task which the query belongs to"` + Domain string `json:"domain" pflag:",Domain of the task which the query belongs to"` + ClusterLabel string `json:"clusterLabel" pflag:",The label of the destination cluster this query to be submitted to"` } var ( defaultConfig = Config{ - Endpoint: MustParse("https://wellness.qubole.com"), - CommandAPIPath: MustParse("/api/v1.2/commands/"), - AnalyzeLinkPath: MustParse("/v2/analyze"), - TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", - LruCacheSize: 2000, - Workers: 15, - ClusterConfigs: []ClusterConfig{{Label: "default", Limit: 250}}, + Endpoint: MustParse("https://wellness.qubole.com"), + CommandAPIPath: MustParse("/api/v1.2/commands/"), + AnalyzeLinkPath: MustParse("/v2/analyze"), + TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", + LruCacheSize: 2000, + Workers: 15, + ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}}, + ProjectDestinationClusterConfigs: []ProjectDestinationClusterConfig{}, } quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig) @@ -46,13 +54,14 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,List of cluster configs. Each of the configs corresponds to a service cluster"` + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",PrimaryLabel of the key where to find Qubole token in the secret manager."` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` + ProjectDestinationClusterConfigs []ProjectDestinationClusterConfig `json:"projectDestinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` } // Retrieves the current config value or default. diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 787c714347..28df2a1cc7 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -141,24 +141,25 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo { return nil } -func composeResourceNamespaceWithClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace) (core.ResourceNamespace, error) { - _, clusterLabel, _, _, err := GetQueryInfo(ctx, tCtx) +func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace) (core.ResourceNamespace, error) { + _, clusterLabelOverride, _, _, err := GetQueryInfo(ctx, tCtx) if err != nil { return resourceNamespace, err } - return resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterLabel)), nil + clusterPrimaryLabel := getClusterPrimaryLabel(ctx, tCtx, clusterLabelOverride) + return resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterPrimaryLabel)), nil } func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceNamespace, tCtx core.TaskExecutionContext) (ExecutionState, error) { newState := ExecutionState{} uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace) + resourceNamespaceWithClusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx, resourceNamespace) if err != nil { return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId) } - allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespaceWithClusterLabel, uniqueId) + allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespaceWithClusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err) @@ -216,10 +217,48 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( for k, v := range tCtx.TaskExecutionMetadata().GetLabels() { tags = append(tags, fmt.Sprintf("%s:%s", k, v)) } - + logger.Debugf(ctx, "QueryInfo: query: [%v], cluster: [%v], timeoutSec: [%v], tags: [%v]", query, cluster, timeoutSec, tags) return } +func mapLabelToPrimaryLabel(quboleCfg *config.Config, label string) string { + if label == "" { + return DefaultClusterPrimaryLabel + } + // Using a linear search because N is small and because of ClusterConfig's struct definition + // which is determined specifically for the readability of the corresponding configmap yaml file + for _, clusterCfg := range quboleCfg.ClusterConfigs { + for _, l := range clusterCfg.Labels { + if l == label { + return clusterCfg.PrimaryLabel + } + } + } + return DefaultClusterPrimaryLabel +} + +func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, labelOverride string) string { + cfg := config.GetQuboleConfig() + label := "" + // If there's no override, we look up in our mapping to find the proper primary cluster label according to the project and the domain + if labelOverride == "" { + project := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().GetTaskId().GetProject() + domain := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().GetTaskId().GetDomain() + // Using a linear search because N is small + for _, m := range cfg.ProjectDestinationClusterConfigs { + if project == m.Project && domain == m.Domain { + label = m.ClusterLabel + break + } + } + } else { + label = labelOverride + } + primaryLabel := mapLabelToPrimaryLabel(cfg, label) + logger.Debugf(ctx, "Cluster label override = [%v]. Getting the primary label of label [%v] = [%v]", labelOverride, label, primaryLabel) + return primaryLabel +} + func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, cache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) { @@ -229,13 +268,15 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt return currentState, errors.Wrapf(errors.RuntimeFailure, err, "Failed to read token from secrets manager") } - query, cluster, tags, timeoutSec, err := GetQueryInfo(ctx, tCtx) + query, clusterLabelOverride, tags, timeoutSec, err := GetQueryInfo(ctx, tCtx) if err != nil { return currentState, err } + clusterPrimaryLabel := getClusterPrimaryLabel(ctx, tCtx, clusterLabelOverride) + cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec, - cluster, apiKey, tags) + clusterPrimaryLabel, apiKey, tags) if err != nil { // If we failed, we'll keep the NotStarted state currentState.CreationFailureCount = currentState.CreationFailureCount + 1 @@ -313,12 +354,12 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace, _ ExecutionState) error { // Release allocation token uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterLabel, err := composeResourceNamespaceWithClusterLabel(ctx, tCtx, resourceNamespace) + resourceNamespaceWithClusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx, resourceNamespace) if err != nil { return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueId) } - err = tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespaceWithClusterLabel, uniqueId) + err = tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespaceWithClusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueId, err) diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index 91e9bb23dd..f6891a1f13 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -23,6 +23,8 @@ const pluginStateVersion = 0 const hiveTaskType = "hive" // This needs to match the type defined in Flytekit constants.py +const DefaultClusterPrimaryLabel = "default" + type QuboleHiveExecutor struct { id string metrics QuboleHiveExecutorMetrics @@ -112,7 +114,7 @@ func BuildResourceConfig(cfg *config.Config) map[string]int { resourceConfig := make(map[string]int, len(cfg.ClusterConfigs)) for _, clusterCfg := range cfg.ClusterConfigs { - resourceConfig[clusterCfg.Label] = clusterCfg.Limit + resourceConfig[clusterCfg.PrimaryLabel] = clusterCfg.Limit } return resourceConfig } @@ -125,9 +127,9 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co return nil, err } - for clusterLabel, clusterLimit := range resourceConfig { - namespaceWithClusterLabel := resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterLabel)) - if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespaceWithClusterLabel, clusterLimit); err != nil { + for clusterPrimaryLabel, clusterLimit := range resourceConfig { + namespaceWithClusterPrimaryLabel := resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterPrimaryLabel)) + if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespaceWithClusterPrimaryLabel, clusterLimit); err != nil { return nil, err } } From 0529c684dc22b8cc44a8d506386582ece5851338 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Wed, 8 Jan 2020 16:09:02 -0800 Subject: [PATCH 07/16] fix build error and improve logging --- .../go/tasks/plugins/hive/execution_state.go | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 28df2a1cc7..1e0d74d7e1 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -221,8 +221,9 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( return } -func mapLabelToPrimaryLabel(quboleCfg *config.Config, label string) string { +func mapLabelToPrimaryLabel(ctx context.Context, quboleCfg *config.Config, label string) string { if label == "" { + logger.Debugf(ctx, "Input cluster label is an empty string; falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) return DefaultClusterPrimaryLabel } // Using a linear search because N is small and because of ClusterConfig's struct definition @@ -230,32 +231,39 @@ func mapLabelToPrimaryLabel(quboleCfg *config.Config, label string) string { for _, clusterCfg := range quboleCfg.ClusterConfigs { for _, l := range clusterCfg.Labels { if l == label { + logger.Debugf(ctx, "Found the primary label [%v] for label [%v]", clusterCfg.PrimaryLabel, label) return clusterCfg.PrimaryLabel } } } + logger.Debugf(ctx, "Cannot find the primary cluster label for label [%v] in configmap; "+ + "falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) return DefaultClusterPrimaryLabel } -func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, labelOverride string) string { +func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, clusterLabelOverride string) string { cfg := config.GetQuboleConfig() - label := "" - // If there's no override, we look up in our mapping to find the proper primary cluster label according to the project and the domain - if labelOverride == "" { - project := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().GetTaskId().GetProject() - domain := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().GetTaskId().GetDomain() + clusterLabel := "" + // If there's no override, we look up in our mapping to find the proper primary cluster clusterLabel according to the project and the domain + if clusterLabelOverride == "" { + tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() + project := tExecId.NodeExecutionId.GetExecutionId().GetProject() + domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() + logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) // Using a linear search because N is small for _, m := range cfg.ProjectDestinationClusterConfigs { if project == m.Project && domain == m.Domain { - label = m.ClusterLabel + clusterLabel = m.ClusterLabel + logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", clusterLabel, project, domain) break } } } else { - label = labelOverride + clusterLabel = clusterLabelOverride + logger.Debugf(ctx, "clusterLabelOverride exists = [%v]. Using it as clusterLabel = [%v]", clusterLabelOverride, clusterLabel) } - primaryLabel := mapLabelToPrimaryLabel(cfg, label) - logger.Debugf(ctx, "Cluster label override = [%v]. Getting the primary label of label [%v] = [%v]", labelOverride, label, primaryLabel) + primaryLabel := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) + logger.Debugf(ctx, "Cluster clusterLabel override = [%v]. Getting the primary clusterLabel of clusterLabel [%v] = [%v]", clusterLabelOverride, clusterLabel, primaryLabel) return primaryLabel } From 277668781f5f111168c7e918cbed08bea23d7dba Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Wed, 8 Jan 2020 16:13:36 -0800 Subject: [PATCH 08/16] revert a wrong pflag and run make generate again --- .../plugins/hive/client/mocks/qubole_client.go | 14 +++++++------- .../go/tasks/plugins/hive/config/config.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go b/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go index 9896311992..c22c833071 100644 --- a/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go +++ b/flyteplugins/go/tasks/plugins/hive/client/mocks/qubole_client.go @@ -19,8 +19,8 @@ func (_m QuboleClient_ExecuteHiveCommand) Return(_a0 *client.QuboleCommandDetail return &QuboleClient_ExecuteHiveCommand{Call: _m.Call.Return(_a0, _a1)} } -func (_m *QuboleClient) OnExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, accountKey string, tags []string) *QuboleClient_ExecuteHiveCommand { - c := _m.On("ExecuteHiveCommand", ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) +func (_m *QuboleClient) OnExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) *QuboleClient_ExecuteHiveCommand { + c := _m.On("ExecuteHiveCommand", ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) return &QuboleClient_ExecuteHiveCommand{Call: c} } @@ -29,13 +29,13 @@ func (_m *QuboleClient) OnExecuteHiveCommandMatch(matchers ...interface{}) *Qubo return &QuboleClient_ExecuteHiveCommand{Call: c} } -// ExecuteHiveCommand provides a mock function with given fields: ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags -func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterLabel string, accountKey string, tags []string) (*client.QuboleCommandDetails, error) { - ret := _m.Called(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) +// ExecuteHiveCommand provides a mock function with given fields: ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags +func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr string, timeoutVal uint32, clusterPrimaryLabel string, accountKey string, tags []string) (*client.QuboleCommandDetails, error) { + ret := _m.Called(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) var r0 *client.QuboleCommandDetails if rf, ok := ret.Get(0).(func(context.Context, string, uint32, string, string, []string) *client.QuboleCommandDetails); ok { - r0 = rf(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) + r0 = rf(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*client.QuboleCommandDetails) @@ -44,7 +44,7 @@ func (_m *QuboleClient) ExecuteHiveCommand(ctx context.Context, commandStr strin var r1 error if rf, ok := ret.Get(1).(func(context.Context, string, uint32, string, string, []string) error); ok { - r1 = rf(ctx, commandStr, timeoutVal, clusterLabel, accountKey, tags) + r1 = rf(ctx, commandStr, timeoutVal, clusterPrimaryLabel, accountKey, tags) } else { r1 = ret.Error(1) } diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index b31b09dbee..1756aeadae 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -57,7 +57,7 @@ type Config struct { Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",PrimaryLabel of the key where to find Qubole token in the secret manager."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` From 9031161bf7df3f7c8813b8fa947df5869930263c Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 9 Jan 2020 11:31:52 -0800 Subject: [PATCH 09/16] Add unit tests for the label mapping logic --- .../plugins/hive/execution_state_test.go | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index f874947907..529b2c27ab 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -5,6 +5,7 @@ import ( "net/url" "testing" + idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" mocks2 "github.com/lyft/flytestdlib/cache/mocks" @@ -298,3 +299,99 @@ func TestKickOffQuery(t *testing.T) { assert.True(t, getOrCreateCalled) assert.True(t, quboleCalled) } + +func createMockQuboleCfg() *config.Config { + return &config.Config{ + ClusterConfigs: []config.ClusterConfig{ + {PrimaryLabel: "primary A", Labels: []string{"primary A", "A", "label A", "A-prod"}, Limit: 10}, + {PrimaryLabel: "primary B", Labels: []string{"B"}, Limit: 10}, + {PrimaryLabel: "primary C", Labels: []string{"C-prod"}, Limit: 1}, + }, + ProjectDestinationClusterConfigs: []config.ProjectDestinationClusterConfig{ + {Project: "project A", Domain: "domain X", ClusterLabel: "A-prod"}, + {Project: "project A", Domain: "domain Y", ClusterLabel: "A"}, + {Project: "project A", Domain: "domain Z", ClusterLabel: "B"}, + {Project: "project C", Domain: "domain X", ClusterLabel: "C-prod"}, + }, + } +} + +func Test_mapLabelToPrimaryLabel(t *testing.T) { + ctx := context.TODO() + mockQuboleCfg := createMockQuboleCfg() + + type args struct { + ctx context.Context + quboleCfg *config.Config + label string + } + tests := []struct { + name string + args args + want string + }{ + {name: "Label has a mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "A-prod"}, want: "primary A"}, + {name: "Label has a typo", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "a"}, want: DefaultClusterPrimaryLabel}, + {name: "Label has a mapping 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C-prod"}, want: "primary C"}, + {name: "Label has a typo 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C_prod"}, want: DefaultClusterPrimaryLabel}, + {name: "Label has a mapping 3", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "primary A"}, want: "primary A"}, + {name: "Label has no mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "D"}, want: DefaultClusterPrimaryLabel}, + {name: "Label is an empty string", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: ""}, want: DefaultClusterPrimaryLabel}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := mapLabelToPrimaryLabel(tt.args.ctx, tt.args.quboleCfg, tt.args.label); got != tt.want { + t.Errorf("mapLabelToPrimaryLabel() = %v, want %v", got, tt.want) + } + }) + } +} + +func createMockTaskExecutionContextWithProjectDomain(project string, domain string) *mocks.TaskExecutionContext { + mockTaskExecutionContext := mocks.TaskExecutionContext{} + taskExecId := &pluginsCoreMocks.TaskExecutionID{} + taskExecId.OnGetID().Return(idlCore.TaskExecutionIdentifier{ + NodeExecutionId: &idlCore.NodeExecutionIdentifier{ExecutionId: &idlCore.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: "random name", + }}, + }) + + taskMetadata := &pluginsCoreMocks.TaskExecutionMetadata{} + taskMetadata.OnGetTaskExecutionID().Return(taskExecId) + mockTaskExecutionContext.On("TaskExecutionMetadata").Return(taskMetadata) + return &mockTaskExecutionContext +} + +func Test_getClusterPrimaryLabel(t *testing.T) { + ctx := context.TODO() + config.SetQuboleConfig(createMockQuboleCfg()) + + type args struct { + ctx context.Context + tCtx core.TaskExecutionContext + clusterLabelOverride string + } + tests := []struct { + name string + args args + want string + }{ + {name: "Override is not empty + override has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain X"), clusterLabelOverride: "label A"}, want: "primary A"}, + {name: "Override is not empty + override has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: "blh"}, want: DefaultClusterPrimaryLabel}, + {name: "Override is not empty + override has an existing mapping + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project blah", "domain blah"), clusterLabelOverride: "C-prod"}, want: "primary C"}, + {name: "Override is not empty + override has an existing mapping + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain A"), clusterLabelOverride: "C-prod"}, want: "primary C"}, + {name: "Override is empty + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain X"), clusterLabelOverride: ""}, want: "primary A"}, + {name: "Override is empty + project-domain has an existing mapping2", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: ""}, want: "primary B"}, + {name: "Override is empty + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain blah"), clusterLabelOverride: ""}, want: DefaultClusterPrimaryLabel}, + {name: "Override is empty + project-domain has NO existing mapping2", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project blah", "domain X"), clusterLabelOverride: ""}, want: DefaultClusterPrimaryLabel}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getClusterPrimaryLabel(tt.args.ctx, tt.args.tCtx, tt.args.clusterLabelOverride); got != tt.want { + t.Errorf("getClusterPrimaryLabel() = %v, want %v", got, tt.want) + } + }) + } +} From b53feafcda9ef6a1d32bb4736219e9e3b718cde9 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 9 Jan 2020 11:36:56 -0800 Subject: [PATCH 10/16] dep ensure update --- flyteplugins/Gopkg.lock | 51 +++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/flyteplugins/Gopkg.lock b/flyteplugins/Gopkg.lock index b06c57ad97..2bf63a6c1a 100644 --- a/flyteplugins/Gopkg.lock +++ b/flyteplugins/Gopkg.lock @@ -2,23 +2,23 @@ [[projects]] - digest = "1:cb193a00b8b032b70a5bad3b57da90867fcfea7e3d23581e71dda50fca9cecd5" + digest = "1:975b4c23997bb1cb942934f3144732b2d6d3c7d250c2d8e53fe96de6c5a1ecea" name = "cloud.google.com/go" packages = ["compute/metadata"] pruneopts = "" - revision = "e5804108aed715c49f731aab1a847f063fb23681" - version = "v0.50.0" + revision = "b4cdc8d6eb508c4e74df26094d1adb678c87f818" + version = "v0.51.0" [[projects]] - digest = "1:d7ba847f449aef80f09d6b25315ff83f37f8231fe6a83d1ed38baac5892584ce" + digest = "1:e4d91a70242d1adc40f296d397be3bfd6f3728d2abc2445189b60d9e8aded5c3" name = "github.com/Azure/azure-sdk-for-go" packages = [ "storage", "version", ] pruneopts = "" - revision = "781d9fb592a1818cb7fb8e0855ea9c00b095a9e9" - version = "v37.2.0" + revision = "2e62f976f4de2713ef1b8bb3254bebb81b95b638" + version = "v38.0.0" [[projects]] digest = "1:b82e05494fb7e33ff9e4421171be18159a8afb614e3794572ddfe2db28a05d5f" @@ -48,7 +48,7 @@ version = "v0.1.3" [[projects]] - digest = "1:7980af5e19c3739b07cc411ca5d59cb5851a359f6a070348aa23be387e3b8e7e" + digest = "1:e08d1378855f8ac73a1d60f3ea4ac230fab251c50c753bf3eae22766ef480060" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -77,6 +77,7 @@ "internal/sdkrand", "internal/sdkuri", "internal/shareddefaults", + "internal/strings", "private/protocol", "private/protocol/eventstream", "private/protocol/eventstream/eventstreamapi", @@ -97,8 +98,8 @@ "service/sts/stsiface", ] pruneopts = "" - revision = "a1e6946e8014a793d989e64ef5566315010ce898" - version = "v1.27.0" + revision = "e92e1b8eb5fa589199a2bbcdf84a0342fbc40dc6" + version = "v1.27.4" [[projects]] digest = "1:ac2a05be7167c495fe8aaf8aaf62ecf81e78d2180ecb04e16778dc6c185c96a5" @@ -149,12 +150,12 @@ version = "v4.6.0" [[projects]] - digest = "1:afb117f8c84e70a479a3c4c2d9250b29634b65d392d2339ee833626905a28ecd" + digest = "1:38c537dca3034658912a9ed95ae5d0d300a6a37cd04ca53143e056a5170a3c83" name = "github.com/fatih/color" packages = ["."] pruneopts = "" - revision = "2e5e248695e7e4a0f9ff6d548b83b99199326f49" - version = "v1.8.0" + revision = "daf2830f2741ebb735b21709a520c5f37d642d85" + version = "v1.9.0" [[projects]] digest = "1:eb53021a8aa3f599d29c7102e65026242bdedce998a54837dc67f14b6a97c5fd" @@ -216,7 +217,7 @@ version = "v1.3.2" [[projects]] - digest = "1:9fcb267c272bc5054564b392e3ff7e65e35400fd9914afb1d169f92b95e7dbc9" + digest = "1:c35d25413dce110e4ff2ca871c648bf5365ab8c954021394cabd39f5e0566fd8" name = "github.com/google/go-cmp" packages = [ "cmp", @@ -226,8 +227,8 @@ "cmp/internal/value", ] pruneopts = "" - revision = "2d0692c2e9617365a95b295612ac0d4415ba4627" - version = "v0.3.1" + revision = "5a6f75716e1203a923a78c9efb94089d857df0f6" + version = "v0.4.0" [[projects]] digest = "1:8d4a577a9643f713c25a32151c0f26af7228b4b97a219b5ddb7fd38d16f6e673" @@ -636,11 +637,11 @@ [[projects]] branch = "master" - digest = "1:623570fddb99ef064b125c7bf3161f14aba21fdb787896b6c5070d34187c86f8" + digest = "1:47ec6ad813222524cbcd6c5c23b9a6a1a3f45b502fe5088fd59e6ba6a1436db3" name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "" - revision = "53104e6ec876ad4e22ad27cce588b01392043c1b" + revision = "61a87790db17894570dfb32dbaa0a4af9ce60cb4" [[projects]] branch = "master" @@ -661,7 +662,7 @@ [[projects]] branch = "master" - digest = "1:44d75e002fa89506cca17f921eb07aee69526d7519bb7f1f2b0e017380c5e011" + digest = "1:571c7f844acf3c916ac5997f82f227c49a38490f1fca65afd47d64d188ced96e" name = "golang.org/x/oauth2" packages = [ ".", @@ -671,18 +672,18 @@ "jwt", ] pruneopts = "" - revision = "858c2ad4c8b6c5d10852cb89079f6ca1c7309787" + revision = "bf48bf16ab8d622ce64ec6ce98d2c98f916b6303" [[projects]] branch = "master" - digest = "1:8c945c7d15d859d00371d42149761229fcc8dcd839ff2b998defba22fc9f7d24" + digest = "1:ca984d839d46cda7860ac472476ef53423d8be07f19822f59a7c575127f995be" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "a1369afcdac740082c63165b07ec83b531884be2" + revision = "548cf772de5052aa878ccb47cdeb7d262b75c8ec" [[projects]] digest = "1:740b51a55815493a8d0f2b1e0d0ae48fe48953bf7eaf3fcc4198823bf67768c0" @@ -767,11 +768,11 @@ [[projects]] branch = "master" - digest = "1:ba64d3acd79ad4dee34b76de29a73959114a204f1f35791807171a152d41aa38" + digest = "1:a58dce82b2e7f7174b861598654aa2d8c7f1a545eb0c961f87cc569029ce1941" name = "google.golang.org/genproto" packages = ["googleapis/rpc/status"] pruneopts = "" - revision = "f3c370f40bfba3cb25c5c2f823a1a8031b5ad724" + revision = "bd8f9a0ef82f9870cb10caef4f23c348069600cb" [[projects]] digest = "1:7af390490e636a6adc9c76b37a3c823195fbf375a02c4d9506b4dd49d5d2409a" @@ -993,7 +994,7 @@ [[projects]] branch = "master" - digest = "1:7b3e186eef10afc7c1f868fea5bd20575e4960e3113af467275f750b628bc64e" + digest = "1:fa34b9e9fa4395fc2a63a64265dda7bff9a9ab4b5bd94020be3ff866dc87ca0f" name = "k8s.io/utils" packages = [ "buffer", @@ -1001,7 +1002,7 @@ "trace", ] pruneopts = "" - revision = "f07c713de88362aef7545072487d6118bd4a3d4a" + revision = "94aeca20bf0991bf33922a5938174b9147ab8ca7" [[projects]] digest = "1:d65cadd0126bf9385147e2853e53fda65fc419c04da329b3b077fb4765e3737c" From 440ef60a6f224c730bdc5f49022bc7ff14f36362 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 9 Jan 2020 16:58:17 -0800 Subject: [PATCH 11/16] rename config item and add logs --- .../go/tasks/plugins/hive/config/config.go | 34 +++++++++---------- .../go/tasks/plugins/hive/execution_state.go | 2 +- .../plugins/hive/execution_state_test.go | 2 +- .../go/tasks/plugins/hive/executor.go | 2 ++ 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/config/config.go b/flyteplugins/go/tasks/plugins/hive/config/config.go index 1756aeadae..d1dd51cd1a 100644 --- a/flyteplugins/go/tasks/plugins/hive/config/config.go +++ b/flyteplugins/go/tasks/plugins/hive/config/config.go @@ -31,7 +31,7 @@ type ClusterConfig struct { Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"` } -type ProjectDestinationClusterConfig struct { +type DestinationClusterConfig struct { Project string `json:"project" pflag:",Project of the task which the query belongs to"` Domain string `json:"domain" pflag:",Domain of the task which the query belongs to"` ClusterLabel string `json:"clusterLabel" pflag:",The label of the destination cluster this query to be submitted to"` @@ -39,14 +39,14 @@ type ProjectDestinationClusterConfig struct { var ( defaultConfig = Config{ - Endpoint: MustParse("https://wellness.qubole.com"), - CommandAPIPath: MustParse("/api/v1.2/commands/"), - AnalyzeLinkPath: MustParse("/v2/analyze"), - TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", - LruCacheSize: 2000, - Workers: 15, - ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}}, - ProjectDestinationClusterConfigs: []ProjectDestinationClusterConfig{}, + Endpoint: MustParse("https://wellness.qubole.com"), + CommandAPIPath: MustParse("/api/v1.2/commands/"), + AnalyzeLinkPath: MustParse("/v2/analyze"), + TokenKey: "FLYTE_QUBOLE_CLIENT_TOKEN", + LruCacheSize: 2000, + Workers: 15, + ClusterConfigs: []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 250}}, + DestinationClusterConfigs: []DestinationClusterConfig{}, } quboleConfigSection = pluginsConfig.MustRegisterSubSection(quboleConfigSectionKey, &defaultConfig) @@ -54,14 +54,14 @@ var ( // Qubole plugin configs type Config struct { - Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` - CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` - AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` - TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` - LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` - Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` - ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` - ProjectDestinationClusterConfigs []ProjectDestinationClusterConfig `json:"projectDestinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` + Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"` + CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."` + AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."` + TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."` + LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"` + Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"` + ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"` + DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"` } // Retrieves the current config value or default. diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 1e0d74d7e1..48ea2603ce 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -251,7 +251,7 @@ func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) // Using a linear search because N is small - for _, m := range cfg.ProjectDestinationClusterConfigs { + for _, m := range cfg.DestinationClusterConfigs { if project == m.Project && domain == m.Domain { clusterLabel = m.ClusterLabel logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", clusterLabel, project, domain) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index 529b2c27ab..3285202f5a 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -307,7 +307,7 @@ func createMockQuboleCfg() *config.Config { {PrimaryLabel: "primary B", Labels: []string{"B"}, Limit: 10}, {PrimaryLabel: "primary C", Labels: []string{"C-prod"}, Limit: 1}, }, - ProjectDestinationClusterConfigs: []config.ProjectDestinationClusterConfig{ + DestinationClusterConfigs: []config.DestinationClusterConfig{ {Project: "project A", Domain: "domain X", ClusterLabel: "A-prod"}, {Project: "project A", Domain: "domain Y", ClusterLabel: "A"}, {Project: "project A", Domain: "domain Z", ClusterLabel: "B"}, diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index f6891a1f13..7fa833c2c8 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -129,7 +129,9 @@ func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *co for clusterPrimaryLabel, clusterLimit := range resourceConfig { namespaceWithClusterPrimaryLabel := resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterPrimaryLabel)) + logger.Infof(ctx, "Registering resource quota [%v]", clusterPrimaryLabel) if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespaceWithClusterPrimaryLabel, clusterLimit); err != nil { + logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err) return nil, err } } From 5cc3986821ff30581da6c3131c10cb98e0e867ad Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Thu, 9 Jan 2020 17:01:22 -0800 Subject: [PATCH 12/16] fixed test --- flyteplugins/go/tasks/plugins/hive/execution_state_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index 3285202f5a..e0cb22265e 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -366,7 +366,8 @@ func createMockTaskExecutionContextWithProjectDomain(project string, domain stri func Test_getClusterPrimaryLabel(t *testing.T) { ctx := context.TODO() - config.SetQuboleConfig(createMockQuboleCfg()) + err := config.SetQuboleConfig(createMockQuboleCfg()) + assert.Nil(t, err) type args struct { ctx context.Context From 3aa4edf336a2fdd6ac307ad440505377db977120 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 10 Jan 2020 15:20:41 -0800 Subject: [PATCH 13/16] Make plugin resource namespace consistent everywhere --- .../go/tasks/plugins/hive/execution_state.go | 22 +++++++++---------- .../go/tasks/plugins/hive/executor.go | 21 +++++++----------- 2 files changed, 19 insertions(+), 24 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 48ea2603ce..19962b650a 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -65,14 +65,14 @@ type ExecutionState struct { // This is the main state iteration func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, - executionsCache cache.AutoRefresh, resourceNamespace core.ResourceNamespace, cfg *config.Config) (ExecutionState, error) { + executionsCache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) { var transformError error var newState ExecutionState switch currentState.Phase { case PhaseNotStarted: - newState, transformError = GetAllocationToken(ctx, resourceNamespace, tCtx) + newState, transformError = GetAllocationToken(ctx, tCtx) case PhaseQueued: newState, transformError = KickOffQuery(ctx, tCtx, currentState, quboleClient, executionsCache, cfg) @@ -141,25 +141,25 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo { return nil } -func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace) (core.ResourceNamespace, error) { +func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext) (core.ResourceNamespace, error) { _, clusterLabelOverride, _, _, err := GetQueryInfo(ctx, tCtx) if err != nil { - return resourceNamespace, err + return "", err } clusterPrimaryLabel := getClusterPrimaryLabel(ctx, tCtx, clusterLabelOverride) - return resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterPrimaryLabel)), nil + return core.ResourceNamespace(clusterPrimaryLabel), nil } -func GetAllocationToken(ctx context.Context, resourceNamespace core.ResourceNamespace, tCtx core.TaskExecutionContext) (ExecutionState, error) { +func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (ExecutionState, error) { newState := ExecutionState{} uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx, resourceNamespace) + clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx) if err != nil { return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId) } - allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, resourceNamespaceWithClusterPrimaryLabel, uniqueId) + allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err) @@ -359,15 +359,15 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe return nil } -func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, resourceNamespace core.ResourceNamespace, _ ExecutionState) error { +func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error { // Release allocation token uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() - resourceNamespaceWithClusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx, resourceNamespace) + clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx) if err != nil { return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueId) } - err = tCtx.ResourceManager().ReleaseResource(ctx, resourceNamespaceWithClusterPrimaryLabel, uniqueId) + err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueId) if err != nil { logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueId, err) diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index 7fa833c2c8..b696069a72 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -37,10 +37,6 @@ func (q QuboleHiveExecutor) GetID() string { return q.id } -func (q QuboleHiveExecutor) GetResourceNamespace() core.ResourceNamespace { - return core.ResourceNamespace(q.GetID()) -} - func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (core.Transition, error) { incomingState := ExecutionState{} @@ -55,8 +51,7 @@ func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionC // Do what needs to be done, and give this function everything it needs to do its job properly // TODO: Play around with making this return a transition directly. How will that pattern affect the multi-Qubole plugin - outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, - q.GetResourceNamespace(), q.cfg) + outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg) // Return if there was an error if transformError != nil { @@ -98,7 +93,7 @@ func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutio return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize") } - return Finalize(ctx, tCtx, q.GetResourceNamespace(), incomingState) + return Finalize(ctx, tCtx, incomingState) } func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { @@ -107,7 +102,7 @@ func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { func QuboleHiveExecutorLoader(ctx context.Context, iCtx core.SetupContext) (core.Plugin, error) { cfg := config.GetQuboleConfig() - return InitializeHiveExecutor(ctx, iCtx, cfg, quboleHiveExecutorId, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) + return InitializeHiveExecutor(ctx, iCtx, cfg, BuildResourceConfig(cfg), client.NewQuboleClient(cfg)) } func BuildResourceConfig(cfg *config.Config) map[string]int { @@ -119,18 +114,18 @@ func BuildResourceConfig(cfg *config.Config) map[string]int { return resourceConfig } -func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceNamespace core.ResourceNamespace, resourceConfig map[string]int, +func InitializeHiveExecutor(ctx context.Context, iCtx core.SetupContext, cfg *config.Config, resourceConfig map[string]int, quboleClient client.QuboleClient) (core.Plugin, error) { - + logger.Infof(ctx, "Initializing a Hive executor with a resource config [%v]", resourceConfig) q, err := NewQuboleHiveExecutor(ctx, cfg, quboleClient, iCtx.SecretManager(), iCtx.MetricsScope()) if err != nil { + logger.Errorf(ctx, "Failed to create a new QuboleHiveExecutor due to error: [%v]", err) return nil, err } for clusterPrimaryLabel, clusterLimit := range resourceConfig { - namespaceWithClusterPrimaryLabel := resourceNamespace.CreateSubNamespace(core.ResourceNamespace(clusterPrimaryLabel)) - logger.Infof(ctx, "Registering resource quota [%v]", clusterPrimaryLabel) - if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, namespaceWithClusterPrimaryLabel, clusterLimit); err != nil { + logger.Infof(ctx, "Registering resource quota for cluster [%v]", clusterPrimaryLabel) + if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(clusterPrimaryLabel), clusterLimit); err != nil { logger.Errorf(ctx, "Resource quota registration for [%v] failed due to error [%v]", clusterPrimaryLabel, err) return nil, err } From cd0264f57be5b70d6e0d5930f0e8e21d34fd2a8d Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Fri, 10 Jan 2020 15:38:40 -0800 Subject: [PATCH 14/16] Further fixing some function calls to adapt to the changes of function signatures --- .../go/tasks/plugins/hive/execution_state_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index e0cb22265e..16708acf7a 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -169,7 +169,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) - state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseQueued, state.Phase) }) @@ -181,7 +181,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusExhausted, nil) - state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -193,7 +193,7 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusNamespaceQuotaExceeded, nil) - state, err := GetAllocationToken(ctx, quboleHiveExecutorId, tCtx) + state, err := GetAllocationToken(ctx, tCtx) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -244,7 +244,7 @@ func TestFinalize(t *testing.T) { called = true }).Return(nil) - err := Finalize(ctx, tCtx, quboleHiveExecutorId, state) + err := Finalize(ctx, tCtx, state) assert.NoError(t, err) assert.True(t, called) } From 1fae82ba34dd531d87247d1d067ae3b1cef4932a Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Sat, 11 Jan 2020 11:41:10 -0800 Subject: [PATCH 15/16] Correcting the primary-cluster label overriding and mapping logic in hive plugin; fixing and adding tests to verify --- .../go/tasks/plugins/hive/execution_state.go | 90 +++++++++++++------ .../plugins/hive/execution_state_test.go | 29 +++--- 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 19962b650a..91143688ee 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -221,50 +221,90 @@ func GetQueryInfo(ctx context.Context, tCtx core.TaskExecutionContext) ( return } -func mapLabelToPrimaryLabel(ctx context.Context, quboleCfg *config.Config, label string) string { +func mapLabelToPrimaryLabel(ctx context.Context, quboleCfg *config.Config, label string) (string, bool) { + primaryLabel := DefaultClusterPrimaryLabel + found := false + if label == "" { logger.Debugf(ctx, "Input cluster label is an empty string; falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) - return DefaultClusterPrimaryLabel + return primaryLabel, found } + // Using a linear search because N is small and because of ClusterConfig's struct definition // which is determined specifically for the readability of the corresponding configmap yaml file for _, clusterCfg := range quboleCfg.ClusterConfigs { for _, l := range clusterCfg.Labels { - if l == label { + if label != "" && l == label { logger.Debugf(ctx, "Found the primary label [%v] for label [%v]", clusterCfg.PrimaryLabel, label) - return clusterCfg.PrimaryLabel + primaryLabel, found = clusterCfg.PrimaryLabel, true + break } } } + logger.Debugf(ctx, "Cannot find the primary cluster label for label [%v] in configmap; "+ "falling back to using the default primary label [%v]", label, DefaultClusterPrimaryLabel) - return DefaultClusterPrimaryLabel + return primaryLabel, found +} + +func mapProjectDomainToDestinationClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, quboleCfg *config.Config) (string, bool) { + tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() + project := tExecId.NodeExecutionId.GetExecutionId().GetProject() + domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() + logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) + // Using a linear search because N is small + for _, m := range quboleCfg.DestinationClusterConfigs { + if project == m.Project && domain == m.Domain { + logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", m.ClusterLabel, project, domain) + return m.ClusterLabel, true + } + } + + // This function finds the label, not primary label, so in the case where no mapping is found, this function should return an empty string + return "", false } func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, clusterLabelOverride string) string { cfg := config.GetQuboleConfig() - clusterLabel := "" - // If there's no override, we look up in our mapping to find the proper primary cluster clusterLabel according to the project and the domain - if clusterLabelOverride == "" { - tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() - project := tExecId.NodeExecutionId.GetExecutionId().GetProject() - domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() - logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) - // Using a linear search because N is small - for _, m := range cfg.DestinationClusterConfigs { - if project == m.Project && domain == m.Domain { - clusterLabel = m.ClusterLabel - logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", clusterLabel, project, domain) - break - } + + // If override is not empty and if it has a mapping, we return the mapped primary label + if clusterLabelOverride != "" { + if primaryLabel, found := mapLabelToPrimaryLabel(ctx, cfg, clusterLabelOverride); found { + return primaryLabel } - } else { - clusterLabel = clusterLabelOverride - logger.Debugf(ctx, "clusterLabelOverride exists = [%v]. Using it as clusterLabel = [%v]", clusterLabelOverride, clusterLabel) } - primaryLabel := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) - logger.Debugf(ctx, "Cluster clusterLabel override = [%v]. Getting the primary clusterLabel of clusterLabel [%v] = [%v]", clusterLabelOverride, clusterLabel, primaryLabel) - return primaryLabel + + // If override is empty or if the override does not have a mapping, we return the primary label mapped using (project, domain) + if clusterLabel, found := mapProjectDomainToDestinationClusterLabel(ctx, tCtx, cfg); found { + primaryLabel, _ := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) + return primaryLabel + } + + // Else we return the default primary label + return DefaultClusterPrimaryLabel + /* + // If there's no override, we look up in our mapping to find the proper primary cluster clusterLabel according to the project and the domain + if clusterLabelOverride == "" { + tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() + project := tExecId.NodeExecutionId.GetExecutionId().GetProject() + domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() + logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) + // Using a linear search because N is small + for _, m := range cfg.DestinationClusterConfigs { + if project == m.Project && domain == m.Domain { + clusterLabel = m.ClusterLabel + logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", clusterLabel, project, domain) + break + } + } + } else { + clusterLabel = clusterLabelOverride + logger.Debugf(ctx, "clusterLabelOverride exists = [%v]. Using it as clusterLabel = [%v]", clusterLabelOverride, clusterLabel) + } + primaryLabel := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) + logger.Debugf(ctx, "Cluster clusterLabel override = [%v]. Getting the primary clusterLabel of clusterLabel [%v] = [%v]", clusterLabelOverride, clusterLabel, primaryLabel) + return primaryLabel + */ } func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index 16708acf7a..ef68153459 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -326,22 +326,23 @@ func Test_mapLabelToPrimaryLabel(t *testing.T) { label string } tests := []struct { - name string - args args - want string + name string + args args + want string + wantFound bool }{ - {name: "Label has a mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "A-prod"}, want: "primary A"}, - {name: "Label has a typo", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "a"}, want: DefaultClusterPrimaryLabel}, - {name: "Label has a mapping 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C-prod"}, want: "primary C"}, - {name: "Label has a typo 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C_prod"}, want: DefaultClusterPrimaryLabel}, - {name: "Label has a mapping 3", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "primary A"}, want: "primary A"}, - {name: "Label has no mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "D"}, want: DefaultClusterPrimaryLabel}, - {name: "Label is an empty string", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: ""}, want: DefaultClusterPrimaryLabel}, + {name: "Label has a mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "A-prod"}, want: "primary A", wantFound: true}, + {name: "Label has a typo", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "a"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label has a mapping 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C-prod"}, want: "primary C", wantFound: true}, + {name: "Label has a typo 2", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "C_prod"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label has a mapping 3", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "primary A"}, want: "primary A", wantFound: true}, + {name: "Label has no mapping", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: "D"}, want: DefaultClusterPrimaryLabel, wantFound: false}, + {name: "Label is an empty string", args: args{ctx: ctx, quboleCfg: mockQuboleCfg, label: ""}, want: DefaultClusterPrimaryLabel, wantFound: false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := mapLabelToPrimaryLabel(tt.args.ctx, tt.args.quboleCfg, tt.args.label); got != tt.want { - t.Errorf("mapLabelToPrimaryLabel() = %v, want %v", got, tt.want) + if got, found := mapLabelToPrimaryLabel(tt.args.ctx, tt.args.quboleCfg, tt.args.label); got != tt.want || found != tt.wantFound { + t.Errorf("mapLabelToPrimaryLabel() = (%v, %v), want (%v, %v)", got, found, tt.want, tt.wantFound) } }) } @@ -379,8 +380,8 @@ func Test_getClusterPrimaryLabel(t *testing.T) { args args want string }{ - {name: "Override is not empty + override has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain X"), clusterLabelOverride: "label A"}, want: "primary A"}, - {name: "Override is not empty + override has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: "blh"}, want: DefaultClusterPrimaryLabel}, + {name: "Override is not empty + override has NO existing mapping + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain Z"), clusterLabelOverride: "AAAA"}, want: "primary B"}, + {name: "Override is not empty + override has NO existing mapping + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain blah"), clusterLabelOverride: "blh"}, want: DefaultClusterPrimaryLabel}, {name: "Override is not empty + override has an existing mapping + project-domain has NO existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project blah", "domain blah"), clusterLabelOverride: "C-prod"}, want: "primary C"}, {name: "Override is not empty + override has an existing mapping + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain A"), clusterLabelOverride: "C-prod"}, want: "primary C"}, {name: "Override is empty + project-domain has an existing mapping", args: args{ctx: ctx, tCtx: createMockTaskExecutionContextWithProjectDomain("project A", "domain X"), clusterLabelOverride: ""}, want: "primary A"}, From 7bd4a4544aa7c98df5a872d257857de98eaaac33 Mon Sep 17 00:00:00 2001 From: Chang-Hong Hsu Date: Mon, 13 Jan 2020 10:58:15 -0800 Subject: [PATCH 16/16] remove old comment --- .../go/tasks/plugins/hive/execution_state.go | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 91143688ee..5ff0f2b76c 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -282,29 +282,6 @@ func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext, // Else we return the default primary label return DefaultClusterPrimaryLabel - /* - // If there's no override, we look up in our mapping to find the proper primary cluster clusterLabel according to the project and the domain - if clusterLabelOverride == "" { - tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID() - project := tExecId.NodeExecutionId.GetExecutionId().GetProject() - domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain() - logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain) - // Using a linear search because N is small - for _, m := range cfg.DestinationClusterConfigs { - if project == m.Project && domain == m.Domain { - clusterLabel = m.ClusterLabel - logger.Debugf(ctx, "Found the pre-defined cluster label [%v] for (project: %v, domain: %v)", clusterLabel, project, domain) - break - } - } - } else { - clusterLabel = clusterLabelOverride - logger.Debugf(ctx, "clusterLabelOverride exists = [%v]. Using it as clusterLabel = [%v]", clusterLabelOverride, clusterLabel) - } - primaryLabel := mapLabelToPrimaryLabel(ctx, cfg, clusterLabel) - logger.Debugf(ctx, "Cluster clusterLabel override = [%v]. Getting the primary clusterLabel of clusterLabel [%v] = [%v]", clusterLabelOverride, clusterLabel, primaryLabel) - return primaryLabel - */ } func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient,