diff --git a/.github/workflows/ci-build.yaml b/.github/workflows/ci-build.yaml index 013b858676a7..b8465d986425 100644 --- a/.github/workflows/ci-build.yaml +++ b/.github/workflows/ci-build.yaml @@ -108,9 +108,6 @@ jobs: - test: test-executor containerRuntimeExecutor: emissary profile: minimal - - test: test-executor - containerRuntimeExecutor: kubelet - profile: minimal - test: test-executor containerRuntimeExecutor: pns profile: minimal diff --git a/cmd/argoexec/commands/root.go b/cmd/argoexec/commands/root.go index ca1f6f19aa88..022205a5322b 100644 --- a/cmd/argoexec/commands/root.go +++ b/cmd/argoexec/commands/root.go @@ -24,7 +24,6 @@ import ( "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/executor" "github.com/argoproj/argo-workflows/v3/workflow/executor/emissary" - "github.com/argoproj/argo-workflows/v3/workflow/executor/kubelet" "github.com/argoproj/argo-workflows/v3/workflow/executor/pns" ) @@ -112,8 +111,6 @@ func initExecutor() *executor.WorkflowExecutor { var cre executor.ContainerRuntimeExecutor log.Infof("Creating a %s executor", executorType) switch executorType { - case common.ContainerRuntimeExecutorKubelet: - cre, err = kubelet.NewKubeletExecutor(namespace, podName) case common.ContainerRuntimeExecutorPNS: cre, err = pns.NewPNSExecutor(clientset, podName, namespace) default: diff --git a/config/config.go b/config/config.go index 8bdb79faf33e..e79e7666df53 100644 --- a/config/config.go +++ b/config/config.go @@ -37,12 +37,6 @@ type Config struct { ContainerRuntimeExecutors ContainerRuntimeExecutors `json:"containerRuntimeExecutors,omitempty"` - // KubeletPort is needed when using the kubelet containerRuntimeExecutor, default to 10250 - KubeletPort int `json:"kubeletPort,omitempty"` - - // KubeletInsecure disable the TLS verification of the kubelet containerRuntimeExecutor, default to false - KubeletInsecure bool `json:"kubeletInsecure,omitempty"` - // ArtifactRepository contains the default location of an artifact repository for container artifacts ArtifactRepository wfv1.ArtifactRepository `json:"artifactRepository,omitempty"` diff --git a/docs/workflow-controller-configmap.yaml b/docs/workflow-controller-configmap.yaml index b16a165f9169..01fc661f6a74 100644 --- a/docs/workflow-controller-configmap.yaml +++ b/docs/workflow-controller-configmap.yaml @@ -172,7 +172,7 @@ data: # (available v2.4-v3.3) dockerSockPath: /var/someplace/else/docker.sock - # kubelet port when using kubelet executor (default: 10250) + # kubelet port when using kubelet executor (default: 10250) (kubelet executor will be deprecated use emissary instead) kubeletPort: 10250 # disable the TLS verification of the kubelet executor (default: false) diff --git a/docs/workflow-executors.md b/docs/workflow-executors.md index a0d276a3dbfc..6b42ad4d495e 100644 --- a/docs/workflow-executors.md +++ b/docs/workflow-executors.md @@ -67,6 +67,8 @@ The emissary will exit with code 64 if it fails. This may indicate a bug in the ## Kubelet (kubelet) +⚠️Deprecated. + * Secure * No `privileged` access * Cannot escape the privileges of the pod's service account diff --git a/test/e2e/argo_server_test.go b/test/e2e/argo_server_test.go index 0ab0e1cda3ce..73bf4e2188f3 100644 --- a/test/e2e/argo_server_test.go +++ b/test/e2e/argo_server_test.go @@ -854,7 +854,6 @@ func (s *ArgoServerSuite) TestWorkflowService() { }) s.Run("Terminate", func() { - s.Need(fixtures.None(fixtures.Kubelet)) s.e().PUT("/api/v1/workflows/argo/" + name + "/terminate"). Expect(). Status(200) @@ -873,7 +872,6 @@ func (s *ArgoServerSuite) TestWorkflowService() { }) s.Run("Resubmit", func() { - s.Need(fixtures.BaseLayerArtifacts) s.e().PUT("/api/v1/workflows/argo/" + name + "/resubmit"). WithBytes([]byte(`{"memoized": true}`)). Expect(). @@ -1347,7 +1345,6 @@ spec: }) s.Run("Resubmit", func() { - s.Need(fixtures.BaseLayerArtifacts) s.e().PUT("/api/v1/archived-workflows/{uid}/resubmit", uid). WithBytes([]byte(`{"namespace": "argo", "memoized": false}`)). Expect(). diff --git a/test/e2e/artifacts_test.go b/test/e2e/artifacts_test.go index dcfab31cd512..72cc7887d06f 100644 --- a/test/e2e/artifacts_test.go +++ b/test/e2e/artifacts_test.go @@ -36,7 +36,6 @@ func (s *ArtifactsSuite) TestOutputOnMount() { } func (s *ArtifactsSuite) TestOutputOnInput() { - s.Need(fixtures.BaseLayerArtifacts) // I believe this would work on both K8S and Kubelet, but validation does not allow it s.Given(). Workflow("@testdata/output-on-input-workflow.yaml"). When(). @@ -45,7 +44,6 @@ func (s *ArtifactsSuite) TestOutputOnInput() { } func (s *ArtifactsSuite) TestArtifactPassing() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow("@smoke/artifact-passing.yaml"). When(). @@ -54,7 +52,6 @@ func (s *ArtifactsSuite) TestArtifactPassing() { } func (s *ArtifactsSuite) TestDefaultParameterOutputs() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow(` apiVersion: argoproj.io/v1alpha1 @@ -107,7 +104,6 @@ spec: } func (s *ArtifactsSuite) TestSameInputOutputPathOptionalArtifact() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow("@testdata/same-input-output-path-optional.yaml"). When(). @@ -146,7 +142,6 @@ func (s *ArtifactsSuite) TestMainLog() { } }) }) - s.Need(fixtures.None(fixtures.Kubelet)) s.Run("ActiveDeadlineSeconds", func() { s.Given(). Workflow("@expectedfailures/timeouts-step.yaml"). diff --git a/test/e2e/fixtures/needs.go b/test/e2e/fixtures/needs.go index 8a3bdf7b36f6..772914c26542 100644 --- a/test/e2e/fixtures/needs.go +++ b/test/e2e/fixtures/needs.go @@ -10,12 +10,7 @@ var ( CI Need = func(s *E2ESuite) (bool, string) { return os.Getenv("CI") != "", "CI" } - BaseLayerArtifacts Need = func(s *E2ESuite) (bool, string) { - met, _ := None(Kubelet)(s) - return met, "base layer artifact support" - } Emissary = Executor("emissary") - Kubelet = Executor("kubelet") PNS = Executor("pns") ) diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index 6811874fb620..d9f311dcb4ab 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -24,7 +24,6 @@ type FunctionalSuite struct { } func (s *FunctionalSuite) TestArchiveStrategies() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow(`@testdata/archive-strategies.yaml`). When(). @@ -388,7 +387,6 @@ func (s *FunctionalSuite) TestEventOnPVCFail() { } func (s *FunctionalSuite) TestArtifactRepositoryRef() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow("@testdata/artifact-repository-ref.yaml"). When(). @@ -536,7 +534,6 @@ spec: } func (s *FunctionalSuite) TestParameterAggregation() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow("@functional/param-aggregation.yaml"). When(). @@ -860,7 +857,6 @@ spec: } func (s *FunctionalSuite) TestOutputArtifactS3BucketCreationEnabled() { - s.Need(fixtures.BaseLayerArtifacts) s.Given(). Workflow("@testdata/output-artifact-with-s3-bucket-creation-enabled.yaml"). When(). diff --git a/test/e2e/run_as_not_root_test.go b/test/e2e/run_as_not_root_test.go index 989fa0fa8588..10a60d0f3893 100644 --- a/test/e2e/run_as_not_root_test.go +++ b/test/e2e/run_as_not_root_test.go @@ -24,7 +24,6 @@ func (s *RunAsNonRootSuite) TestRunAsNonRootWorkflow() { } func (s *RunAsNonRootSuite) TestRunAsNonRootWithOutputParams() { - s.Need(fixtures.None(fixtures.Kubelet)) s.Given(). Workflow("@smoke/runasnonroot-output-params-pipeline.yaml"). When(). diff --git a/test/e2e/signals_test.go b/test/e2e/signals_test.go index 255ec2859d6c..ec241a799403 100644 --- a/test/e2e/signals_test.go +++ b/test/e2e/signals_test.go @@ -24,12 +24,6 @@ type SignalsSuite struct { fixtures.E2ESuite } -func (s *SignalsSuite) SetupSuite() { - s.E2ESuite.SetupSuite() - // Because k8ssapi and kubelet execute `sh -c 'kill 15 1'` to they do not work. - s.Need(fixtures.None(fixtures.Kubelet)) -} - func (s *SignalsSuite) TestStopBehavior() { s.Given(). Workflow("@functional/stop-terminate.yaml"). diff --git a/workflow/common/common.go b/workflow/common/common.go index e386f684f7d0..594720c584b2 100644 --- a/workflow/common/common.go +++ b/workflow/common/common.go @@ -150,9 +150,6 @@ const ( // EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet EnvAgentPatchRate = "ARGO_AGENT_PATCH_RATE" - // ContainerRuntimeExecutorKubelet to use the kubelet as container runtime executor - ContainerRuntimeExecutorKubelet = "kubelet" - // ContainerRuntimeExecutorPNS indicates to use process namespace sharing as the container runtime executor ContainerRuntimeExecutorPNS = "pns" diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 5372f0e13c94..c091421af595 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -7512,11 +7512,6 @@ func TestGetContainerRuntimeExecutor(t *testing.T) { executor := controller.GetContainerRuntimeExecutor(labels.Set{}) assert.Equal(t, common.ContainerRuntimeExecutorEmissary, executor) }) - t.Run("CLIParameter", func(t *testing.T) { - controller.containerRuntimeExecutor = common.ContainerRuntimeExecutorKubelet - executor := controller.GetContainerRuntimeExecutor(labels.Set{}) - assert.Equal(t, common.ContainerRuntimeExecutorKubelet, executor) - }) controller.Config.ContainerRuntimeExecutor = "pns" controller.Config.ContainerRuntimeExecutors = config.ContainerRuntimeExecutors{ { diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index f7a3f183bade..696e1af5a515 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -601,27 +601,6 @@ func (woc *wfOperationCtx) createEnvVars() []apiv1.EnvVar { if woc.controller.Config.Executor != nil { execEnvVars = append(execEnvVars, woc.controller.Config.Executor.Env...) } - switch woc.getContainerRuntimeExecutor() { - case common.ContainerRuntimeExecutorKubelet: - execEnvVars = append(execEnvVars, - apiv1.EnvVar{ - Name: common.EnvVarDownwardAPINodeIP, - ValueFrom: &apiv1.EnvVarSource{ - FieldRef: &apiv1.ObjectFieldSelector{ - FieldPath: "status.hostIP", - }, - }, - }, - apiv1.EnvVar{ - Name: common.EnvVarKubeletPort, - Value: strconv.Itoa(woc.controller.Config.KubeletPort), - }, - apiv1.EnvVar{ - Name: common.EnvVarKubeletInsecure, - Value: strconv.FormatBool(woc.controller.Config.KubeletInsecure), - }, - ) - } return execEnvVars } diff --git a/workflow/controller/workflowpod_test.go b/workflow/controller/workflowpod_test.go index c4aba5725dab..28338e350e18 100644 --- a/workflow/controller/workflowpod_test.go +++ b/workflow/controller/workflowpod_test.go @@ -715,30 +715,6 @@ func TestVolumeAndVolumeMounts(t *testing.T) { }, } - // For Kubelet executor - t.Run("Kubelet", func(t *testing.T) { - ctx := context.Background() - woc := newWoc() - woc.volumes = volumes - woc.execWf.Spec.Templates[0].Container.VolumeMounts = volumeMounts - woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorKubelet - - tmplCtx, err := woc.createTemplateContext(wfv1.ResourceScopeLocal, "") - assert.NoError(t, err) - _, err = woc.executeContainer(ctx, woc.execWf.Spec.Entrypoint, tmplCtx.GetTemplateScope(), &woc.execWf.Spec.Templates[0], &wfv1.WorkflowStep{}, &executeTemplateOpts{}) - assert.NoError(t, err) - pods, err := listPods(woc) - assert.NoError(t, err) - assert.Len(t, pods.Items, 1) - pod := pods.Items[0] - assert.Equal(t, 2, len(pod.Spec.Volumes)) - assert.Equal(t, "var-run-argo", pod.Spec.Volumes[0].Name) - assert.Equal(t, "volume-name", pod.Spec.Volumes[1].Name) - assert.Equal(t, 2, len(pod.Spec.Containers[1].VolumeMounts)) - assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name) - assert.Equal(t, "var-run-argo", pod.Spec.Containers[1].VolumeMounts[1].Name) - }) - // For emissary executor t.Run("Emissary", func(t *testing.T) { ctx := context.Background() diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 839710eb7dc5..5eeb465982b3 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -507,12 +507,6 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error { var output *wfv1.AnyString if we.isBaseImagePath(param.ValueFrom.Path) { - executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor) - if executorType == common.ContainerRuntimeExecutorKubelet { - log.Infof("Copying output parameter %s from base image layer %s is not supported for kubelet executor. "+ - "Consider using an emptyDir volume: https://argoproj.github.io/argo-workflows/empty-dir/.", param.Name, param.ValueFrom.Path) - continue - } log.Infof("Copying %s from base image layer", param.ValueFrom.Path) fileContents, err := we.RuntimeExecutor.GetFileContents(common.MainContainerName, param.ValueFrom.Path) if err != nil { diff --git a/workflow/executor/kubelet/client.go b/workflow/executor/kubelet/client.go deleted file mode 100644 index 4ec97754265b..000000000000 --- a/workflow/executor/kubelet/client.go +++ /dev/null @@ -1,305 +0,0 @@ -package kubelet - -import ( - "bytes" - "context" - "crypto/tls" - "crypto/x509" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "net/http" - "net/url" - "os" - "strconv" - "syscall" - "time" - - "github.com/gorilla/websocket" - log "github.com/sirupsen/logrus" - corev1 "k8s.io/api/core/v1" - - "github.com/argoproj/argo-workflows/v3/errors" - "github.com/argoproj/argo-workflows/v3/workflow/common" - execcommon "github.com/argoproj/argo-workflows/v3/workflow/executor/common" -) - -const ( - readWSResponseTimeout = time.Minute * 1 -) - -type kubeletClient struct { - httpClient *http.Client - httpHeader http.Header - websocketDialer *websocket.Dialer - - // kubeletEndpoint is host:port without any scheme like: - // - 127.0.0.1:10250 - // - my-host.com:10250 - kubeletEndpoint string - namespace string - podName string -} - -var _ execcommon.KubernetesClientInterface = &kubeletClient{} - -func newKubeletClient(namespace, podName string) (*kubeletClient, error) { - kubeletHost := os.Getenv(common.EnvVarDownwardAPINodeIP) - if kubeletHost == "" { - return nil, fmt.Errorf("empty envvar %s", common.EnvVarDownwardAPINodeIP) - } - kubeletPort, _ := strconv.Atoi(os.Getenv(common.EnvVarKubeletPort)) - if kubeletPort == 0 { - kubeletPort = 10250 - log.Infof("Non configured envvar %s, defaulting the kubelet port to %d", common.EnvVarKubeletPort, kubeletPort) - } - b, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") - if err != nil { - return nil, errors.InternalWrapError(err) - } - bearerToken := string(b) - - // G402: TLS MinVersion too low. (gosec) - tlsConfig := &tls.Config{} //nolint:gosec - if os.Getenv(common.EnvVarKubeletInsecure) == "true" { - log.Warningf("Using a kubelet client with insecure options") - tlsConfig.InsecureSkipVerify = true - } else { - log.Warningf("Loading service account ca.crt as certificate authority to reach the kubelet api") - caCert, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") - if err != nil { - return nil, errors.InternalWrapError(err) - } - caCertPool := x509.NewCertPool() - if !caCertPool.AppendCertsFromPEM(caCert) { - return nil, errors.InternalWrapError(fmt.Errorf("fail to load certificate authority: %s", string(caCert))) - } - tlsConfig.RootCAs = caCertPool - } - return &kubeletClient{ - httpClient: &http.Client{ - Transport: &http.Transport{TLSClientConfig: tlsConfig}, - Timeout: time.Second * 60, - }, - httpHeader: http.Header{ - "Authorization": {"bearer " + bearerToken}, - }, - websocketDialer: &websocket.Dialer{ - TLSClientConfig: tlsConfig, - HandshakeTimeout: time.Second * 5, - }, - kubeletEndpoint: fmt.Sprintf("%s:%d", kubeletHost, kubeletPort), - namespace: namespace, - podName: podName, - }, nil -} - -func checkHTTPErr(resp *http.Response) error { - if resp.StatusCode != http.StatusOK { - b, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - return errors.InternalWrapError(fmt.Errorf("unexpected non 200 status code: %d, body: %s", resp.StatusCode, string(b))) - } - return nil -} - -func (k *kubeletClient) getPod() (*corev1.Pod, error) { - u, err := url.ParseRequestURI(fmt.Sprintf("https://%s/pods", k.kubeletEndpoint)) - if err != nil { - return nil, errors.InternalWrapError(err) - } - resp, err := k.httpClient.Do(&http.Request{ - Method: http.MethodGet, - Header: k.httpHeader, - URL: u, - }) - if err != nil { - return nil, errors.InternalWrapError(err) - } - log.Infof("List pod %d (kubelet)", resp.StatusCode) // log that we are listing pods from Kubelet - err = checkHTTPErr(resp) - if err != nil { - return nil, err - } - podListDecoder := json.NewDecoder(resp.Body) - podList := &corev1.PodList{} - err = podListDecoder.Decode(podList) - if err != nil { - _ = resp.Body.Close() - return nil, errors.InternalWrapError(err) - } - if err := resp.Body.Close(); err != nil { - return nil, err - } - for _, item := range podList.Items { - if item.Namespace == k.namespace && item.Name == k.podName { - return &item, nil - } - } - return nil, fmt.Errorf("pod %q is not found in the pod list", k.podName) -} - -func (k *kubeletClient) GetLogStream(containerName string) (io.ReadCloser, error) { - resp, err := k.doRequestLogs(k.namespace, k.podName, containerName) - if err != nil { - return nil, err - } - return resp.Body, nil -} - -func (k *kubeletClient) doRequestLogs(namespace, podName, containerName string) (*http.Response, error) { - u, err := url.ParseRequestURI(fmt.Sprintf("https://%s/containerLogs/%s/%s/%s", k.kubeletEndpoint, namespace, podName, containerName)) - if err != nil { - return nil, errors.InternalWrapError(err) - } - resp, err := k.httpClient.Do(&http.Request{ - Method: http.MethodGet, - Header: k.httpHeader, - URL: u, - }) - if err != nil { - return nil, err - } - err = checkHTTPErr(resp) - if err != nil { - return nil, err - } - return resp, nil -} - -func (k *kubeletClient) GetContainerStatus(ctx context.Context, containerName string) (*corev1.Pod, *corev1.ContainerStatus, error) { - pod, containerStatus, err := k.GetContainerStatuses(ctx) - if err != nil { - return nil, nil, err - } - for _, s := range containerStatus { - if containerName == s.Name { - return pod, &s, nil - } - } - return nil, nil, fmt.Errorf("container %q is not found in the pod", containerName) -} - -func (k *kubeletClient) GetContainerStatuses(ctx context.Context) (*corev1.Pod, []corev1.ContainerStatus, error) { - pod, err := k.getPod() - if err != nil { - return nil, nil, errors.InternalWrapError(err) - } - return pod, pod.Status.ContainerStatuses, nil -} - -func (k *kubeletClient) exec(u *url.URL) (*url.URL, error) { - _, resp, err := k.websocketDialer.Dial(u.String(), k.httpHeader) - if resp == nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusFound { - return nil, err - } - log.Infof("Exec for %s with status code: %d", u.String(), resp.StatusCode) - redirect, err := url.Parse(resp.Header.Get("Location")) - if err != nil { - return nil, err - } - redirect.Scheme = "ws" - log.Infof("Exec for %s returns URL: %s", u.String(), redirect.String()) - return redirect, nil -} - -func (k *kubeletClient) readFileContents(u *url.URL) (*bytes.Buffer, error) { - conn, resp, err := k.websocketDialer.Dial(u.String(), nil) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - timeout := time.NewTimer(readWSResponseTimeout) - defer timeout.Stop() - - buf := &bytes.Buffer{} - for { - select { - case <-timeout.C: - return nil, fmt.Errorf("timeout of %s reached while reading file contents", readWSResponseTimeout) - - default: - _, b, err := conn.ReadMessage() - if err != nil { - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - return buf, conn.Close() - } - log.Errorf("Unexpected error while reading messages on WS: %v", err) - _ = conn.Close() - return buf, err - } - if len(b) < 1 { - continue - } - i := 0 - // skip SOH (start of heading) - if int(b[0]) == 1 { - i = 1 - } - _, err = buf.Write(b[i:]) - if err != nil { - log.Errorf("Unexpected error while reading messages on WS: %v", err) - _ = conn.Close() - return nil, err - } - } - } -} - -// createArchive exec in the given containerName and create a tarball of the given sourcePath. Works with directory -func (k *kubeletClient) CreateArchive(ctx context.Context, containerName, sourcePath string) (*bytes.Buffer, error) { - return k.getCommandOutput(containerName, fmt.Sprintf("command=tar&command=-cf&command=-&command=%s&output=1", sourcePath)) -} - -// GetFileContents exec in the given containerName and cat the given sourcePath. -func (k *kubeletClient) GetFileContents(containerName, sourcePath string) (*bytes.Buffer, error) { - return k.getCommandOutput(containerName, fmt.Sprintf("command=cat&command=%s&output=1", sourcePath)) -} - -func (k *kubeletClient) getCommandOutput(containerName, command string) (*bytes.Buffer, error) { - pod, container, err := k.GetContainerStatus(context.Background(), containerName) - if err != nil { - return nil, errors.InternalWrapError(err) - } - if container.State.Terminated != nil { - return nil, fmt.Errorf("container %q is terminated: %v", containerName, container.State.Terminated.String()) - } - u, err := url.ParseRequestURI(fmt.Sprintf("wss://%s/exec/%s/%s/%s?%s", k.kubeletEndpoint, pod.Namespace, pod.Name, containerName, command)) - if err != nil { - return nil, err - } - u, err = k.exec(u) - if err != nil { - return nil, err - } - return k.readFileContents(u) -} - -// WaitForTermination of the given container, set the timeout to 0 to discard it -func (k *kubeletClient) WaitForTermination(ctx context.Context, containerNames []string, timeout time.Duration) error { - return execcommon.WaitForTermination(ctx, k, containerNames, timeout) -} - -func (k *kubeletClient) KillContainer(pod *corev1.Pod, container *corev1.ContainerStatus, sig syscall.Signal) error { - u, err := url.ParseRequestURI(fmt.Sprintf("wss://%s/exec/%s/%s/%s?command=/bin/sh&&command=-c&command=kill+-%d+1&output=1&error=1", k.kubeletEndpoint, pod.Namespace, pod.Name, container.Name, sig)) - if err != nil { - return errors.InternalWrapError(err) - } - _, err = k.exec(u) - return err -} - -func (k *kubeletClient) KillGracefully(ctx context.Context, containerNames []string, terminationGracePeriodDuration time.Duration) error { - return execcommon.KillGracefully(ctx, k, containerNames, terminationGracePeriodDuration) -} - -func (k *kubeletClient) CopyArchive(ctx context.Context, containerName, sourcePath, destPath string) error { - return execcommon.CopyArchive(ctx, k, containerName, sourcePath, destPath) -} diff --git a/workflow/executor/kubelet/kubelet.go b/workflow/executor/kubelet/kubelet.go deleted file mode 100644 index 818da7097ef6..000000000000 --- a/workflow/executor/kubelet/kubelet.go +++ /dev/null @@ -1,62 +0,0 @@ -package kubelet - -import ( - "context" - "io" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/argoproj/argo-workflows/v3/errors" -) - -type KubeletExecutor struct { - cli *kubeletClient -} - -func NewKubeletExecutor(namespace, podName string) (*KubeletExecutor, error) { - cli, err := newKubeletClient(namespace, podName) - if err != nil { - return nil, errors.InternalWrapError(err) - } - return &KubeletExecutor{ - cli: cli, - }, nil -} - -func (k *KubeletExecutor) GetFileContents(containerName string, sourcePath string) (string, error) { - return "", errors.Errorf(errors.CodeNotImplemented, "GetFileContents() is not implemented in the kubelet executor.") -} - -func (k *KubeletExecutor) CopyFile(containerName string, sourcePath string, destPath string, compressionLevel int) error { - return errors.Errorf(errors.CodeNotImplemented, "CopyFile() is not implemented in the kubelet executor.") -} - -func (k *KubeletExecutor) GetOutputStream(ctx context.Context, containerName string, combinedOutput bool) (io.ReadCloser, error) { - if !combinedOutput { - log.Warn("non combined output unsupported") - } - return k.cli.GetLogStream(containerName) -} - -// Wait for the container to complete -func (k *KubeletExecutor) Wait(ctx context.Context, containerNames []string) error { - return k.cli.WaitForTermination(ctx, containerNames, 0) -} - -// Kill kills a list of containers first with a SIGTERM then with a SIGKILL after a grace period -func (k *KubeletExecutor) Kill(ctx context.Context, containerNames []string, terminationGracePeriodDuration time.Duration) error { - return k.cli.KillGracefully(ctx, containerNames, terminationGracePeriodDuration) -} - -func (k *KubeletExecutor) ListContainerNames(ctx context.Context) ([]string, error) { - pod, err := k.cli.getPod() - if err != nil { - return nil, err - } - var containerNames []string - for _, c := range pod.Status.ContainerStatuses { - containerNames = append(containerNames, c.Name) - } - return containerNames, nil -} diff --git a/workflow/validate/validate.go b/workflow/validate/validate.go index ad4524d56783..e92776aaeb08 100644 --- a/workflow/validate/validate.go +++ b/workflow/validate/validate.go @@ -35,8 +35,7 @@ type ValidateOpts struct { Lint bool // ContainerRuntimeExecutor will trigger additional validation checks specific to different - // types of executors. For example, the inability of kubelet/k8s executors to copy artifacts - // out of the base image layer. + // types of executors. ContainerRuntimeExecutor string // IgnoreEntrypoint indicates to skip/ignore the EntryPoint validation on workflow spec. @@ -1011,24 +1010,6 @@ func (ctx *templateValidationCtx) validateBaseImageOutputs(tmpl *wfv1.Template) } } } - case common.ContainerRuntimeExecutorKubelet: - // for kubelet/k8s fail validation if we detect artifact is copied from base image layer - errMsg := fmt.Sprintf("%s executor does not support outputs from base image layer. Use an emptyDir: https://argoproj.github.io/argo-workflows/empty-dir/", ctx.ContainerRuntimeExecutor) - for _, out := range tmpl.Outputs.Artifacts { - if common.FindOverlappingVolume(tmpl, out.Path) == nil { - return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.artifacts.%s: %s", tmpl.Name, out.Name, errMsg) - } - } - for _, out := range tmpl.Outputs.Parameters { - if out.ValueFrom == nil { - continue - } - if out.ValueFrom.Path != "" { - if common.FindOverlappingVolume(tmpl, out.ValueFrom.Path) == nil { - return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs.parameters.%s: %s", tmpl.Name, out.Name, errMsg) - } - } - } } return nil } diff --git a/workflow/validate/validate_test.go b/workflow/validate/validate_test.go index 47d8a53e7dc1..f6f68e73b418 100644 --- a/workflow/validate/validate_test.go +++ b/workflow/validate/validate_test.go @@ -1581,41 +1581,6 @@ spec: path: /mnt ` -var nonPathOutputParameter = ` -apiVersion: argoproj.io/v1alpha1 -kind: Workflow -metadata: - generateName: non-path-out-param- -spec: - entrypoint: non-path-out-param - templates: - - name: non-path-out-param - steps: - - - name: non-path-resource-out-param - template: non-path-resource-out-param - outputs: - parameters: - - name: param - valueFrom: - parameter: "{{steps.non-path-resource-out-param.outputs.parameters.json}}" - - name: non-path-resource-out-param - resource: - action: create - manifest: | - apiVersion: v1 - kind: ConfigMap - metadata: - name: whalesay-cm - outputs: - parameters: - - name: json - valueFrom: - jsonPath: '{.metadata.name}' - - name: jqfliter - valueFrom: - jqFilter: . -` - // TestBaseImageOutputVerify verifies we error when we detect the condition when the container // runtime executor doesn't support output artifacts from a base image layer, and fails validation func TestBaseImageOutputVerify(t *testing.T) { @@ -1623,20 +1588,10 @@ func TestBaseImageOutputVerify(t *testing.T) { wfBaseOutParam := unmarshalWf(baseImageOutputParameter) wfEmptyDirOutArt := unmarshalWf(volumeMountOutputArtifact) wfBaseWithEmptyDirOutArt := unmarshalWf(baseImageDirWithEmptyDirOutputArtifact) - wfNonPathOutputParam := unmarshalWf(nonPathOutputParameter) var err error - for _, executor := range []string{common.ContainerRuntimeExecutorKubelet, common.ContainerRuntimeExecutorPNS, common.ContainerRuntimeExecutorEmissary, ""} { + for _, executor := range []string{common.ContainerRuntimeExecutorPNS, common.ContainerRuntimeExecutorEmissary, ""} { switch executor { - case common.ContainerRuntimeExecutorKubelet: - _, err = ValidateWorkflow(wftmplGetter, cwftmplGetter, wfBaseOutArt, ValidateOpts{ContainerRuntimeExecutor: executor}) - assert.Error(t, err) - _, err = ValidateWorkflow(wftmplGetter, cwftmplGetter, wfBaseOutParam, ValidateOpts{ContainerRuntimeExecutor: executor}) - assert.Error(t, err) - _, err = ValidateWorkflow(wftmplGetter, cwftmplGetter, wfBaseWithEmptyDirOutArt, ValidateOpts{ContainerRuntimeExecutor: executor}) - assert.Error(t, err) - _, err = ValidateWorkflow(wftmplGetter, cwftmplGetter, wfNonPathOutputParam, ValidateOpts{ContainerRuntimeExecutor: executor}) - assert.NoError(t, err) case common.ContainerRuntimeExecutorPNS: _, err = ValidateWorkflow(wftmplGetter, cwftmplGetter, wfBaseOutArt, ValidateOpts{ContainerRuntimeExecutor: executor}) assert.NoError(t, err) @@ -2520,10 +2475,6 @@ func TestDagAndStepLevelOutputArtifactsForDiffExecutor(t *testing.T) { _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorPNS}) assert.NoError(t, err) }) - t.Run("KubeletExecutor", func(t *testing.T) { - _, err := validateWithOptions(dagAndStepLevelOutputArtifacts, ValidateOpts{ContainerRuntimeExecutor: common.ContainerRuntimeExecutorKubelet}) - assert.NoError(t, err) - }) } var testWorkflowTemplateLabels = `