Skip to content

Commit

Permalink
Create TaskRun from PipelineRun that runs a Task
Browse files Browse the repository at this point in the history
Added the Task reference to the TaskRun so that when a PipelineRun
creates a TaskRun, it actually executes! (For #61)

While running the integration test, noticed that the PipelineRuns
weren't getting reconciled quickly enough, but adding a tracker which
will invoke reconcile when the created TaskRuns are updated fixed this -
however it did still take > 1 minute to create 3 helloworld TaskRuns and
wait for them to complete, so since 3 was arbitrary, reduced to 2.

Also cleaned up the TaskRun controller a bit: using the Logger object on
the controller/reconciler itself, made the log messages a bit more
descriptive.
  • Loading branch information
bobcatfish committed Oct 11, 2018
1 parent 2509665 commit 3283605
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, logging.ControllerLogKey)
defer logger.Sync()

logger.Info("Starting the Build Controller")
logger.Info("Starting the Pipeline Controller")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
Expand Down
25 changes: 25 additions & 0 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ import (
"context"
"fmt"
"reflect"
"time"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler"
"github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources"
"github.com/knative/pkg/controller"

"github.com/knative/pkg/tracker"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -61,6 +64,7 @@ type Reconciler struct {
pipelineLister listers.PipelineLister
taskRunLister listers.TaskRunLister
taskLister listers.TaskLister
tracker tracker.Interface
}

// Check that our Reconciler implements controller.Reconciler
Expand Down Expand Up @@ -91,6 +95,11 @@ func NewController(
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: impl.Enqueue,
})

r.tracker = tracker.New(impl.EnqueueKey, 30*time.Second)
taskRunInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: controller.PassNew(r.tracker.OnChanged),
})
return impl
}

Expand Down Expand Up @@ -118,6 +127,17 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// Don't modify the informer's copy.
pr := original.DeepCopy()

taskRunRef := corev1.ObjectReference{
APIVersion: "build-pipeline.knative.dev/v1alpha1",
Kind: "TaskRun",
Namespace: pr.Namespace,
Name: pr.Name,
}
if err := c.tracker.Track(taskRunRef, pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
return err
}

// Reconcile this copy of the task run and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, pr)
Expand Down Expand Up @@ -177,6 +197,11 @@ func (c *Reconciler) createTaskRun(t *v1alpha1.Task, trName string, pr *v1alpha1
*metav1.NewControllerRef(pr, groupVersionKind),
},
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: v1alpha1.TaskRef{
Name: t.Name,
},
},
}
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(t.Namespace).Create(tr)
}
Expand Down
21 changes: 20 additions & 1 deletion pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
informers "github.com/knative/build-pipeline/pkg/client/informers/externalversions"
informersv1alpha1 "github.com/knative/build-pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1"
"github.com/knative/build-pipeline/pkg/reconciler"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/controller"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
ktesting "k8s.io/client-go/testing"
)
Expand Down Expand Up @@ -78,6 +81,18 @@ func TestReconcile(t *testing.T) {
if len(client.Actions()) == 0 {
t.Fatalf("Expected client to have been used to create a TaskRun but it wasn't")
}

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := client.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-success", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
condition := reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if condition == nil || condition.Status != corev1.ConditionUnknown {
t.Errorf("Expected PipelineRun status to be in progress, but was %s", condition)
}

// Check that the expected TaskRun was created
actual := client.Actions()[0].(ktesting.CreateAction).GetObject()
trueB := true
expectedTaskRun := &v1alpha1.TaskRun{
Expand All @@ -92,7 +107,11 @@ func TestReconcile(t *testing.T) {
BlockOwnerDeletion: &trueB,
}},
},
Spec: v1alpha1.TaskRunSpec{},
Spec: v1alpha1.TaskRunSpec{
TaskRef: v1alpha1.TaskRef{
Name: "unit-test-task",
},
},
}
if d := cmp.Diff(actual, expectedTaskRun); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, d)
Expand Down
21 changes: 7 additions & 14 deletions pkg/reconciler/v1alpha1/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
buildinformers "github.com/knative/build/pkg/client/informers/externalversions/build/v1alpha1"
buildlisters "github.com/knative/build/pkg/client/listers/build/v1alpha1"
"github.com/knative/pkg/controller"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/tracker"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -120,13 +119,12 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Errorf("invalid resource key: %s", key)
return nil
}
logger := logging.FromContext(ctx)

// Get the Task Run resource with this namespace/name
original, err := c.taskRunLister.TaskRuns(namespace).Get(name)
if errors.IsNotFound(err) {
// The resource no longer exists, in which case we stop processing.
logger.Errorf("task run %q in work queue no longer exists", key)
c.Logger.Errorf("task run %q in work queue no longer exists", key)
return nil
} else if err != nil {
return err
Expand All @@ -143,7 +141,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
Name: tr.Name,
}
if err := c.tracker.Track(buildRef, tr); err != nil {
logger.Errorf("failed to create tracker for build %s for taskrun %s: %v", buildRef, tr.Name, err)
c.Logger.Errorf("failed to create tracker for build %s for taskrun %s: %v", buildRef, tr.Name, err)
return err
}

Expand All @@ -156,15 +154,13 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(tr); err != nil {
logger.Warn("Failed to update taskRun status", zap.Error(err))
c.Logger.Warn("Failed to update taskRun status", zap.Error(err))
return err
}
return err
}

func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error {
logger := logging.FromContext(ctx)

haveBuild := false
// get build the same as the taskrun, this is the value we use for 1:1 mapping and retrieval
b, err := c.getBuild(tr.Namespace, tr.Name)
Expand All @@ -186,7 +182,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error

// taskrun has finished (as child build has finished and status is synced)
if len(tr.Status.Conditions) > 0 && tr.Status.Conditions[0].Status != corev1.ConditionUnknown {
logger.Infof("finished %s", tr.Name)
c.Logger.Infof("finished %s", tr.Name)
return nil
}

Expand All @@ -197,18 +193,17 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error
return err
}

// make build obj from task buildspec
logger.Infof("make build: %s", tr.Name)
c.Logger.Infof("Creating Build %s for TaskRun %s", tr.Name, tr.Name)
if b, err = c.makeBuild(t, tr); err != nil {
return fmt.Errorf("failed to create a build for taskrun: %v", err)
}
}

// sync build status with taskrun status
if len(b.Status.Conditions) > 0 {
logger.Infof("syncing taskrun conditions with build conditions %s", b.Status.Conditions[0])
c.Logger.Infof("Syncing TaskRun %s conditions with Build %s conditions %s", tr.Name, b.Name, b.Status.Conditions[0])
} else {
logger.Infof("syncing taskrun conditions with build conditions []")
c.Logger.Infof("Build %s has no conditions so nothing to update for TaskRun %s", b.Name, tr.Name)
}
tr.Status.Conditions = b.Status.Conditions
return nil
Expand All @@ -221,9 +216,7 @@ func (c *Reconciler) updateStatus(taskrun *v1alpha1.TaskRun) (*v1alpha1.TaskRun,
}
if !reflect.DeepEqual(taskrun.Status, newtaskrun.Status) {
newtaskrun.Status = taskrun.Status
// TODO: for CRD there's no updatestatus, so use normal update
return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(taskrun.Namespace).Update(newtaskrun)
// return configClient.UpdateStatus(newtaskrun)
}
return newtaskrun, nil
}
Expand Down
12 changes: 4 additions & 8 deletions test/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
hwPipelineName = "helloworld-pipeline"
hwPipelineRunName = "helloworld-pipelinerun"
hwPipelineParamsName = "helloworld-pipelineparams"
hwPipelineTaskName1 = "helloworld-task-1"
hwPipelineTaskName2 = "helloworld-task-2"

hwContainerName = "helloworld-busybox"
taskOutput = "do you want to build a snowman"
Expand Down Expand Up @@ -87,19 +89,13 @@ func getHelloWorldPipeline(namespace string) *v1alpha1.Pipeline {
Spec: v1alpha1.PipelineSpec{
Tasks: []v1alpha1.PipelineTask{
v1alpha1.PipelineTask{
Name: "helloworld-task-1",
Name: hwPipelineTaskName1,
TaskRef: v1alpha1.TaskRef{
Name: hwTaskName,
},
},
v1alpha1.PipelineTask{
Name: "helloworld-task-2",
TaskRef: v1alpha1.TaskRef{
Name: hwTaskName,
},
},
v1alpha1.PipelineTask{
Name: "helloworld-task-3",
Name: hwPipelineTaskName2,
TaskRef: v1alpha1.TaskRef{
Name: hwTaskName,
},
Expand Down
2 changes: 1 addition & 1 deletion test/crd_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// we can get to that failure faster - knative/serving is currently using `6 * time.Minute`
// which we could use, or we could use timeouts more specific to what each `Task` is
// actually expected to do.
timeout = 60 * time.Second
timeout = 2 * time.Minute
)

// WaitForTaskRunState polls the status of the TaskRun called name from client every
Expand Down
14 changes: 8 additions & 6 deletions test/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package test

import (
"strings"
"testing"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -64,17 +65,18 @@ func TestPipelineRun(t *testing.T) {

logger.Infof("Making sure the expected TaskRuns were created")
expectedTaskRuns := []string{
hwPipelineName + hwPipelineTaskName1,
hwPipelineName + hwPipelineTaskName2,
strings.Join([]string{hwPipelineRunName, hwPipelineTaskName1}, "-"),
strings.Join([]string{hwPipelineRunName, hwPipelineTaskName2}, "-"),
}
for _, runName := range expectedTaskRuns {
r, err := c.TaskRunClient.Get(runName, metav1.GetOptions{})
if err != nil {
t.Errorf("Couldn't get expected TaskRun %s: %s", runName, err)
}
c := r.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c.Status != corev1.ConditionTrue {
t.Errorf("Expected TaskRun %s to have succeeded but Status is %s", runName, c.Status)
} else {
c := r.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c.Status != corev1.ConditionTrue {
t.Errorf("Expected TaskRun %s to have succeeded but Status is %s", runName, c.Status)
}
}
}
}
8 changes: 5 additions & 3 deletions test/taskrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"testing"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
knativetest "github.com/knative/pkg/test"
"github.com/knative/pkg/test/logging"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -53,13 +54,14 @@ func TestTaskRun(t *testing.T) {
t.Fatalf("Failed to create TaskRun `%s`: %s", hwTaskRunName, err)
}

// Verify status of TaskRun (wait for it)
logger.Infof("Waiting for TaskRun %s in namespace %s to complete", hwTaskRunName, namespace)
if err := WaitForTaskRunState(c, hwTaskRunName, func(tr *v1alpha1.TaskRun) (bool, error) {
if len(tr.Status.Conditions) > 0 && tr.Status.Conditions[0].Status == corev1.ConditionTrue {
c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c != nil && c.Status == corev1.ConditionTrue {
return true, nil
}
return false, nil
}, "TaskRunCompleted"); err != nil {
}, "TaskRunSuccess"); err != nil {
t.Errorf("Error waiting for TaskRun %s to finish: %s", hwTaskRunName, err)
}

Expand Down

0 comments on commit 3283605

Please sign in to comment.