From d7716b3538213a4e89024e59294d1b9dfeae21ea Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Thu, 2 Jul 2020 15:38:51 +0100 Subject: [PATCH] Add pipeline run support for cloud events Replace the pipeline run controller own config store with the shared one used by the taskrun controller too. The pipeline run config store is only useful to the artifact storage, however the artifact storage loads the config by fetching the configmap via the kube client, so it does not use the config store. Attaching the shared config store to the controller, along with the cloud events client, enables the pipeline run controller to start sending cloud events for all events where we send k8s events today (except for error ones). Add a reconciler unit test to verify that events are sent when the sink is configured. Drop reconciler/pipelinerun/config because it's not used. It was injected in the pipeline run controller before, but not used. We can add the store for artifact configs back in a different commit, but it wil have to be part of the shared store. --- docs/events.md | 19 ++- internal/builder/v1beta1/pipeline.go | 7 + pkg/reconciler/pipelinerun/config/store.go | 83 ------------ .../pipelinerun/config/store_test.go | 57 -------- .../testdata/config-artifact-bucket.yaml | 21 --- pkg/reconciler/pipelinerun/controller.go | 6 +- pkg/reconciler/pipelinerun/pipelinerun.go | 4 + .../pipelinerun/pipelinerun_test.go | 127 ++++++++++++++++++ pkg/reconciler/taskrun/controller.go | 1 - 9 files changed, 154 insertions(+), 171 deletions(-) delete mode 100644 pkg/reconciler/pipelinerun/config/store.go delete mode 100644 pkg/reconciler/pipelinerun/config/store_test.go delete mode 100644 pkg/reconciler/pipelinerun/config/testdata/config-artifact-bucket.yaml diff --git a/docs/events.md b/docs/events.md index 5280cc2bb77..0bcf333ddde 100644 --- a/docs/events.md +++ b/docs/events.md @@ -51,10 +51,15 @@ retrieving those events using the `kubectl describe` command. Tekton can also em When you [configure a sink](install.md#configuring-cloudevents-notifications), Tekton emits events as described in the table below. -Resource |Event |Event Type -:---------|:-------:|:---------------------------------------------------------- -`TaskRun` | `Started` | `dev.tekton.event.taskrun.started.v1` -`TaskRun` | `Running` | `dev.tekton.event.taskrun.runnning.v1` -`TaskRun` | `Condition Change while Running` | `dev.tekton.event.taskrun.unknown.v1` -`TaskRun` | `Succeed` | `dev.tekton.event.taskrun.successful.v1` -`TaskRun` | `Failed` | `dev.tekton.event.taskrun.failed.v1` +Resource |Event |Event Type +:-------------|:-------:|:---------------------------------------------------------- +`TaskRun` | `Started` | `dev.tekton.event.taskrun.started.v1` +`TaskRun` | `Running` | `dev.tekton.event.taskrun.runnning.v1` +`TaskRun` | `Condition Change while Running` | `dev.tekton.event.taskrun.unknown.v1` +`TaskRun` | `Succeed` | `dev.tekton.event.taskrun.successful.v1` +`TaskRun` | `Failed` | `dev.tekton.event.taskrun.failed.v1` +`PipelineRun` | `Started` | `dev.tekton.event.pipelinerun.started.v1` +`PipelineRun` | `Running` | `dev.tekton.event.pipelinerun.runnning.v1` +`PipelineRun` | `Condition Change while Running` | `dev.tekton.event.pipelinerun.unknown.v1` +`PipelineRun` | `Succeed` | `dev.tekton.event.pipelinerun.successful.v1` +`PipelineRun` | `Failed` | `dev.tekton.event.pipelinerun.failed.v1` diff --git a/internal/builder/v1beta1/pipeline.go b/internal/builder/v1beta1/pipeline.go index e2a1379c8da..54f0a1ca50c 100644 --- a/internal/builder/v1beta1/pipeline.go +++ b/internal/builder/v1beta1/pipeline.go @@ -361,6 +361,13 @@ func PipelineRunNamespace(namespace string) PipelineRunOp { } } +// PipelineRunSelfLink adds a SelfLink +func PipelineRunSelfLink(selflink string) PipelineRunOp { + return func(tr *v1beta1.PipelineRun) { + tr.ObjectMeta.SelfLink = selflink + } +} + // PipelineRunSpec sets the PipelineRunSpec, references Pipeline with specified name, to the PipelineRun. // Any number of PipelineRunSpec modifier can be passed to transform it. func PipelineRunSpec(name string, ops ...PipelineRunSpecOp) PipelineRunOp { diff --git a/pkg/reconciler/pipelinerun/config/store.go b/pkg/reconciler/pipelinerun/config/store.go deleted file mode 100644 index 7f46dd41e72..00000000000 --- a/pkg/reconciler/pipelinerun/config/store.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Copyright 2019 The Tekton Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "context" - - "github.com/tektoncd/pipeline/pkg/apis/pipeline" - "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/storage" - "github.com/tektoncd/pipeline/pkg/artifacts" - "knative.dev/pkg/configmap" -) - -type cfgKey struct{} - -// +k8s:deepcopy-gen=false -type Config struct { - ArtifactBucket *storage.ArtifactBucket -} - -func FromContext(ctx context.Context) *Config { - return ctx.Value(cfgKey{}).(*Config) -} - -func ToContext(ctx context.Context, c *Config) context.Context { - return context.WithValue(ctx, cfgKey{}, c) -} - -// +k8s:deepcopy-gen=false -type Store struct { - *configmap.UntypedStore - - images pipeline.Images -} - -func NewStore(images pipeline.Images, logger configmap.Logger) *Store { - return &Store{ - UntypedStore: configmap.NewUntypedStore( - "pipelinerun", - logger, - configmap.Constructors{ - artifacts.GetBucketConfigName(): artifacts.NewArtifactBucketConfigFromConfigMap(images), - }, - ), - images: images, - } -} - -func (s *Store) ToContext(ctx context.Context) context.Context { - return ToContext(ctx, s.Load()) -} - -func (s *Store) Load() *Config { - ep := s.UntypedLoad(artifacts.GetBucketConfigName()) - if ep == nil { - return &Config{ - ArtifactBucket: &storage.ArtifactBucket{ - Location: "", - ShellImage: s.images.ShellImage, - GsutilImage: s.images.GsutilImage, - }, - } - } - - return &Config{ - ArtifactBucket: ep.(*storage.ArtifactBucket).DeepCopy(), - } - -} diff --git a/pkg/reconciler/pipelinerun/config/store_test.go b/pkg/reconciler/pipelinerun/config/store_test.go deleted file mode 100644 index 101e3b2991b..00000000000 --- a/pkg/reconciler/pipelinerun/config/store_test.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright 2019 The Tekton Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "context" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/tektoncd/pipeline/pkg/apis/pipeline" - "github.com/tektoncd/pipeline/pkg/artifacts" - ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing" - "github.com/tektoncd/pipeline/test/diff" - - test "github.com/tektoncd/pipeline/pkg/reconciler/testing" -) - -func TestStoreLoadWithContext(t *testing.T) { - store := NewStore(pipeline.Images{}, ttesting.TestLogger(t)) - bucketConfig := test.ConfigMapFromTestFile(t, "config-artifact-bucket") - store.OnConfigChanged(bucketConfig) - - config := FromContext(store.ToContext(context.Background())) - - expected, _ := artifacts.NewArtifactBucketConfigFromConfigMap(pipeline.Images{})(bucketConfig) - if d := cmp.Diff(expected, config.ArtifactBucket); d != "" { - t.Errorf("Unexpected controller config %s", diff.PrintWantGot(d)) - } -} -func TestStoreImmutableConfig(t *testing.T) { - store := NewStore(pipeline.Images{}, ttesting.TestLogger(t)) - store.OnConfigChanged(test.ConfigMapFromTestFile(t, "config-artifact-bucket")) - - config := store.Load() - - config.ArtifactBucket.Location = "mutated" - - newConfig := store.Load() - - if newConfig.ArtifactBucket.Location == "mutated" { - t.Error("Controller config is not immutable") - } -} diff --git a/pkg/reconciler/pipelinerun/config/testdata/config-artifact-bucket.yaml b/pkg/reconciler/pipelinerun/config/testdata/config-artifact-bucket.yaml deleted file mode 100644 index 1838e28dfcb..00000000000 --- a/pkg/reconciler/pipelinerun/config/testdata/config-artifact-bucket.yaml +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2019 The Tekton Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -apiVersion: v1 -kind: ConfigMap -metadata: - name: config-artifact-bucket - namespace: tekton-pipelines -data: - location: "gs://build-pipeline-fake-bucket" diff --git a/pkg/reconciler/pipelinerun/controller.go b/pkg/reconciler/pipelinerun/controller.go index 2f157930b1e..3be00815dd5 100644 --- a/pkg/reconciler/pipelinerun/controller.go +++ b/pkg/reconciler/pipelinerun/controller.go @@ -20,6 +20,7 @@ import ( "context" "time" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client" conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition" @@ -31,7 +32,7 @@ import ( pipelinerunreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1beta1/pipelinerun" resourceinformer "github.com/tektoncd/pipeline/pkg/client/resource/injection/informers/resource/v1alpha1/pipelineresource" "github.com/tektoncd/pipeline/pkg/reconciler" - "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/config" + cloudeventclient "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "k8s.io/client-go/tools/cache" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -72,11 +73,12 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex resourceLister: resourceInformer.Lister(), conditionLister: conditionInformer.Lister(), timeoutHandler: timeoutHandler, + cloudEventClient: cloudeventclient.Get(ctx), metrics: metrics, pvcHandler: volumeclaim.NewPVCHandler(kubeclientset, logger), } impl := pipelinerunreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options { - configStore := config.NewStore(images, logger.Named("config-store")) + configStore := config.NewStore(logger.Named("config-store")) configStore.WatchConfigs(cmw) return controller.Options{ AgentName: pipeline.PipelineRunControllerName, diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 01ca1e31fe9..b30eb273955 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -40,6 +40,7 @@ import ( "github.com/tektoncd/pipeline/pkg/contexts" "github.com/tektoncd/pipeline/pkg/reconciler" "github.com/tektoncd/pipeline/pkg/reconciler/events" + "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag" "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources" "github.com/tektoncd/pipeline/pkg/reconciler/taskrun" @@ -113,6 +114,7 @@ type Reconciler struct { clusterTaskLister listers.ClusterTaskLister resourceLister resourcelisters.PipelineResourceLister conditionLister listersv1alpha1.ConditionLister + cloudEventClient cloudevent.CEClient tracker tracker.Interface timeoutHandler *reconciler.TimeoutSet metrics *Recorder @@ -129,6 +131,8 @@ var ( // resource with the current status of the resource. func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun) pkgreconciler.Event { logger := logging.FromContext(ctx) + ctx = cloudevent.ToContext(ctx, c.cloudEventClient) + // Read the initial condition before := pr.Status.GetCondition(apis.ConditionSucceeded) diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 50687bd3c6f..611b592bddb 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" tbv1alpha1 "github.com/tektoncd/pipeline/internal/builder/v1alpha1" tb "github.com/tektoncd/pipeline/internal/builder/v1beta1" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" @@ -81,12 +82,37 @@ func getRunName(pr *v1beta1.PipelineRun) string { return strings.Join([]string{pr.Namespace, pr.Name}, "/") } +func ensureConfigurationConfigMapsExist(d *test.Data) { + var defaultsExists, featureFlagsExists bool + for _, cm := range d.ConfigMaps { + if cm.Name == config.GetDefaultsConfigName() { + defaultsExists = true + } + if cm.Name == config.GetFeatureFlagsConfigName() { + featureFlagsExists = true + } + } + if !defaultsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()}, + Data: map[string]string{}, + }) + } + if !featureFlagsExists { + d.ConfigMaps = append(d.ConfigMaps, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.GetNamespace()}, + Data: map[string]string{}, + }) + } +} + // getPipelineRunController returns an instance of the PipelineRun controller/reconciler that has been seeded with // d, where d represents the state of the system (existing resources) needed for the test. func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) { //unregisterMetrics() ctx, _ := ttesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) + ensureConfigurationConfigMapsExist(&d) c, informers := test.SeedTestData(t, ctx, d) configMapWatcher := configmap.NewInformedWatcher(c.Kube, system.GetNamespace()) @@ -95,6 +121,9 @@ func getPipelineRunController(t *testing.T, d test.Data) (test.Assets, func()) { if la, ok := ctl.Reconciler.(reconciler.LeaderAware); ok { la.Promote(reconciler.UniversalBucket(), func(reconciler.Bucket, types.NamespacedName) {}) } + if err := configMapWatcher.Start(ctx.Done()); err != nil { + t.Fatalf("error starting configmap watcher: %v", err) + } return test.Assets{ Logger: logging.FromContext(ctx), @@ -116,6 +145,11 @@ func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEve return eventFromChannel(fr.Events, testName, wantEvents) } +func checkCloudEvents(t *testing.T, fce *cloudevent.FakeClient, testName string, wantEvents []string) error { + t.Helper() + return eventFromChannel(fce.Events, testName, wantEvents) +} + func eventFromChannel(c chan string, testName string, wantEvents []string) error { // We get events from a channel, so the timeout is here to avoid waiting // on the channel forever if fewer than expected events are received. @@ -3753,3 +3787,96 @@ func getTaskRunStatus(t string, status corev1.ConditionStatus) *v1beta1.Pipeline }, } } + +// TestReconcile_CloudEvents runs reconcile with a cloud event sink configured +// to ensure that events are sent in different cases +func TestReconcile_CloudEvents(t *testing.T) { + names.TestingSeed() + + prs := []*v1beta1.PipelineRun{ + tb.PipelineRun("test-pipelinerun", + tb.PipelineRunNamespace("foo"), + tb.PipelineRunSelfLink("/pipeline/1234"), + tb.PipelineRunSpec("test-pipeline"), + ), + } + ps := []*v1beta1.Pipeline{ + tb.Pipeline("test-pipeline", + tb.PipelineNamespace("foo"), + tb.PipelineSpec(tb.PipelineTask("test-1", "test-task")), + ), + } + ts := []*v1beta1.Task{ + tb.Task("test-task", tb.TaskNamespace("foo"), + tb.TaskSpec(tb.Step("foo", tb.StepName("simple-step"), + tb.StepCommand("/mycmd"), tb.StepEnvVar("foo", "bar"), + ))), + } + cms := []*corev1.ConfigMap{ + { + ObjectMeta: metav1.ObjectMeta{Name: config.GetDefaultsConfigName(), Namespace: system.GetNamespace()}, + Data: map[string]string{ + "default-cloud-events-sink": "http://synk:8080", + }, + }, + } + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + ConfigMaps: cms, + } + + names.TestingSeed() + + testAssets, cancel := getPipelineRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipelinerun"); err != nil { + t.Fatalf("Error reconciling: %s", err) + } + + if len(clients.Pipeline.Actions()) == 0 { + t.Fatalf("Expected client to have been used to create a TaskRun but it wasn't") + } + + // Check that the PipelineRun was reconciled correctly + reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get("test-pipelinerun", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err) + } + + // This PipelineRun is in progress now and the status should reflect that + condition := reconciledRun.Status.GetCondition(apis.ConditionSucceeded) + if condition == nil || condition.Status != corev1.ConditionUnknown { + t.Errorf("Expected PipelineRun status to be in progress, but was %v", condition) + } + if condition != nil && condition.Reason != v1beta1.PipelineRunReasonRunning.String() { + t.Errorf("Expected reason %q but was %s", v1beta1.PipelineRunReasonRunning.String(), condition.Reason) + } + + if len(reconciledRun.Status.TaskRuns) != 1 { + t.Errorf("Expected PipelineRun status to include the TaskRun status items that can run immediately: %v", reconciledRun.Status.TaskRuns) + } + + wantEvents := []string{ + "Normal Started", + "Normal Running Tasks Completed: 0", + } + err = checkEvents(t, testAssets.Recorder, "reconcile-cloud-events", wantEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } + wantCloudEvents := []string{ + `(?s)dev.tekton.event.pipelinerun.started.v1.*test-pipelinerun`, + `(?s)dev.tekton.event.pipelinerun.running.v1.*test-pipelinerun`, + } + ceClient := clients.CloudEvents.(cloudevent.FakeClient) + err = checkCloudEvents(t, &ceClient, "reconcile-cloud-events", wantCloudEvents) + if !(err == nil) { + t.Errorf(err.Error()) + } +} diff --git a/pkg/reconciler/taskrun/controller.go b/pkg/reconciler/taskrun/controller.go index 4dcbe6d2c7c..f05d53b9d30 100644 --- a/pkg/reconciler/taskrun/controller.go +++ b/pkg/reconciler/taskrun/controller.go @@ -67,7 +67,6 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex KubeClientSet: kubeclientset, PipelineClientSet: pipelineclientset, Images: images, - taskRunLister: taskRunInformer.Lister(), taskLister: taskInformer.Lister(), clusterTaskLister: clusterTaskInformer.Lister(),