From 3853b16fbfaec32c4b04639c44f4f93ccbe5901d Mon Sep 17 00:00:00 2001 From: Vincent Demeester Date: Mon, 4 Mar 2019 13:15:20 +0100 Subject: [PATCH] =?UTF-8?q?dag:=20validate=20the=20DAG=20when=20validating?= =?UTF-8?q?=20the=20pipeline=20spec=20=F0=9F=9A=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Migrate the `DAG` struct and how it's build to the API - Update `dag` package to take `v1alpha1.DAG` as argument for Schedulable, … - Validate the DAG after `validateFrom`, and update `validateFrom` with no linear assumption Signed-off-by: Vincent Demeester --- pkg/apis/pipeline/v1alpha1/dag.go | 135 +++++++ pkg/apis/pipeline/v1alpha1/dag_test.go | 327 ++++++++++++++++ .../pipeline/v1alpha1/pipeline_validation.go | 51 ++- .../v1alpha1/pipeline_validation_test.go | 17 +- .../v1alpha1/zz_generated.deepcopy.go | 69 ++++ pkg/reconciler/v1alpha1/pipeline/dag/dag.go | 126 +----- .../v1alpha1/pipeline/dag/dag_test.go | 370 ++---------------- .../v1alpha1/pipelinerun/pipelinerun.go | 4 +- 8 files changed, 612 insertions(+), 487 deletions(-) create mode 100644 pkg/apis/pipeline/v1alpha1/dag.go create mode 100644 pkg/apis/pipeline/v1alpha1/dag_test.go diff --git a/pkg/apis/pipeline/v1alpha1/dag.go b/pkg/apis/pipeline/v1alpha1/dag.go new file mode 100644 index 00000000000..2e08c0b52eb --- /dev/null +++ b/pkg/apis/pipeline/v1alpha1/dag.go @@ -0,0 +1,135 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "fmt" + "strings" +) + +// Node represents a Task in a pipeline. +type Node struct { + // Task represent the PipelineTask in Pipeline + Task PipelineTask + // Prev represent all the Previous task Nodes for the current Task + Prev []*Node + // Next represent all the Next task Nodes for the current Task + Next []*Node +} + +// DAG represents the Pipeline DAG +type DAG struct { + //Nodes represent map of PipelineTask name to Node in Pipeline DAG + Nodes map[string]*Node +} + +// Returns an empty Pipeline DAG +func newDAG() *DAG { + return &DAG{Nodes: map[string]*Node{}} +} + +func (g *DAG) addPipelineTask(t PipelineTask) (*Node, error) { + if _, ok := g.Nodes[t.Name]; ok { + return nil, fmt.Errorf("duplicate pipeline taks") + } + newNode := &Node{ + Task: t, + } + g.Nodes[t.Name] = newNode + return newNode, nil +} + +func linkPipelineTasks(prev *Node, next *Node) error { + // Check for self cycle + if prev.Task.Name == next.Task.Name { + return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.Name) + } + // Check if we are adding cycles. + visited := map[string]bool{prev.Task.Name: true, next.Task.Name: true} + path := []string{next.Task.Name, prev.Task.Name} + if err := visit(next.Task.Name, prev.Prev, path, visited); err != nil { + return fmt.Errorf("cycle detected: %v", err) + } + next.Prev = append(next.Prev, prev) + prev.Next = append(prev.Next, next) + return nil +} + +func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error { + for _, n := range nodes { + path = append(path, n.Task.Name) + if _, ok := visited[n.Task.Name]; ok { + return fmt.Errorf(getVisitedPath(path)) + } + visited[currentName+"."+n.Task.Name] = true + if err := visit(n.Task.Name, n.Prev, path, visited); err != nil { + return err + } + } + return nil +} + +func getVisitedPath(path []string) string { + // Reverse the path since we traversed the DAG using prev pointers. + for i := len(path)/2 - 1; i >= 0; i-- { + opp := len(path) - 1 - i + path[i], path[opp] = path[opp], path[i] + } + return strings.Join(path, " -> ") +} + +func addLink(pt PipelineTask, previousTask string, nodes map[string]*Node) error { + prev, ok := nodes[previousTask] + if !ok { + return fmt.Errorf("Task %s depends on %s but %s wasn't present in Pipeline", pt.Name, previousTask, previousTask) + } + next, _ := nodes[pt.Name] + if err := linkPipelineTasks(prev, next); err != nil { + return fmt.Errorf("Couldn't create link from %s to %s: %v", prev.Task.Name, next.Task.Name, err) + } + return nil +} + +// BuildDAG returns a valid pipeline DAG. Returns error if the pipeline is invalid +func BuildDAG(tasks []PipelineTask) (*DAG, error) { + d := newDAG() + + // Add all Tasks mentioned in the `PipelineSpec` + for _, pt := range tasks { + if _, err := d.addPipelineTask(pt); err != nil { + return nil, fmt.Errorf("task %s is already present in DAG, can't add it again: %v", pt.Name, err) + } + } + // Process all from and runAfter constraints to add task dependency + for _, pt := range tasks { + for _, previousTask := range pt.RunAfter { + if err := addLink(pt, previousTask, d.Nodes); err != nil { + return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err) + } + } + if pt.Resources != nil { + for _, rd := range pt.Resources.Inputs { + for _, previousTask := range rd.From { + if err := addLink(pt, previousTask, d.Nodes); err != nil { + return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err) + } + } + } + } + } + return d, nil +} diff --git a/pkg/apis/pipeline/v1alpha1/dag_test.go b/pkg/apis/pipeline/v1alpha1/dag_test.go new file mode 100644 index 00000000000..d490b7c270b --- /dev/null +++ b/pkg/apis/pipeline/v1alpha1/dag_test.go @@ -0,0 +1,327 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/knative/build-pipeline/pkg/list" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func sameNodes(l, r []*Node) error { + lNames, rNames := []string{}, []string{} + for _, n := range l { + lNames = append(lNames, n.Task.Name) + } + for _, n := range r { + rNames = append(rNames, n.Task.Name) + } + + return list.IsSame(lNames, rNames) +} + +func assertSameDAG(t *testing.T, l, r *DAG) { + t.Helper() + lKeys, rKeys := []string{}, []string{} + + for k := range l.Nodes { + lKeys = append(lKeys, k) + } + for k := range r.Nodes { + rKeys = append(rKeys, k) + } + + // For the DAGs to be the same, they must contain the same nodes + err := list.IsSame(lKeys, rKeys) + if err != nil { + t.Fatalf("DAGS contain different nodes: %v", err) + } + + // If they contain the same nodes, the DAGs will be the same if all + // of the nodes have the same linkages + for k, rn := range r.Nodes { + ln := l.Nodes[k] + + err := sameNodes(rn.Prev, ln.Prev) + if err != nil { + t.Errorf("The %s nodes in the DAG have different previous nodes: %v", k, err) + } + err = sameNodes(rn.Next, ln.Next) + if err != nil { + t.Errorf("The %s nodes in the DAG have different next nodes: %v", k, err) + } + } +} + +func TestBuild_Parallel(t *testing.T) { + a := PipelineTask{Name: "a"} + b := PipelineTask{Name: "b"} + c := PipelineTask{Name: "c"} + + // This test make sure we can create a Pipeline with no links between any Tasks + // (all tasks run in parallel) + // a b c + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, b, c}, + }, + } + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": {Task: a}, + "b": {Task: b}, + "c": {Task: c}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_JoinMultipleRoots(t *testing.T) { + a := PipelineTask{Name: "a"} + b := PipelineTask{Name: "b"} + c := PipelineTask{Name: "c"} + xDependsOnA := PipelineTask{ + Name: "x", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + yDependsOnARunsAfterB := PipelineTask{ + Name: "y", + RunAfter: []string{"b"}, + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + zDependsOnX := PipelineTask{ + Name: "z", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"x"}}}, + }, + } + + // a b c + // | \ / + // x y + // | + // z + + nodeA := &Node{Task: a} + nodeB := &Node{Task: b} + nodeC := &Node{Task: c} + nodeX := &Node{Task: xDependsOnA} + nodeY := &Node{Task: yDependsOnARunsAfterB} + nodeZ := &Node{Task: zDependsOnX} + + nodeA.Next = []*Node{nodeX, nodeY} + nodeB.Next = []*Node{nodeY} + nodeX.Prev = []*Node{nodeA} + nodeX.Next = []*Node{nodeZ} + nodeY.Prev = []*Node{nodeA, nodeB} + nodeZ.Prev = []*Node{nodeX} + + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": nodeA, + "b": nodeB, + "c": nodeC, + "x": nodeX, + "y": nodeY, + "z": nodeZ}, + } + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, xDependsOnA, yDependsOnARunsAfterB, zDependsOnX, b, c}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_FanInFanOut(t *testing.T) { + a := PipelineTask{Name: "a"} + dDependsOnA := PipelineTask{ + Name: "d", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + eRunsAfterA := PipelineTask{ + Name: "e", + RunAfter: []string{"a"}, + } + fDependsOnDAndE := PipelineTask{ + Name: "f", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"d", "e"}}}, + }, + } + gRunsAfterF := PipelineTask{ + Name: "g", + RunAfter: []string{"f"}, + } + + // This test make sure we don't detect cycle (A -> B -> B -> …) when there is not. + // This means we "visit" a twice, from two different path ; but there is no cycle. + // a + // / \ + // d e + // \ / + // f + // | + // g + nodeA := &Node{Task: a} + nodeD := &Node{Task: dDependsOnA} + nodeE := &Node{Task: eRunsAfterA} + nodeF := &Node{Task: fDependsOnDAndE} + nodeG := &Node{Task: gRunsAfterF} + + nodeA.Next = []*Node{nodeD, nodeE} + nodeD.Prev = []*Node{nodeA} + nodeD.Next = []*Node{nodeF} + nodeE.Prev = []*Node{nodeA} + nodeE.Next = []*Node{nodeF} + nodeF.Prev = []*Node{nodeD, nodeE} + nodeF.Next = []*Node{nodeG} + nodeG.Prev = []*Node{nodeF} + + expectedDAG := &DAG{ + Nodes: map[string]*Node{ + "a": nodeA, + "d": nodeD, + "e": nodeE, + "f": nodeF, + "g": nodeG, + }, + } + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, + Spec: PipelineSpec{ + Tasks: []PipelineTask{a, dDependsOnA, eRunsAfterA, fDependsOnDAndE, gRunsAfterF}, + }, + } + g, err := BuildDAG(p.Spec.Tasks) + if err != nil { + t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) + } + assertSameDAG(t, expectedDAG, g) +} + +func TestBuild_Invalid(t *testing.T) { + a := PipelineTask{Name: "a"} + xDependsOnA := PipelineTask{ + Name: "x", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + zDependsOnX := PipelineTask{ + Name: "z", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"x"}}}, + }, + } + aDependsOnZ := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"z"}}}, + }, + } + xAfterA := PipelineTask{ + Name: "x", + RunAfter: []string{"a"}, + } + zAfterX := PipelineTask{ + Name: "z", + RunAfter: []string{"x"}, + } + aAfterZ := PipelineTask{ + Name: "a", + RunAfter: []string{"z"}, + } + selfLinkFrom := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"a"}}}, + }, + } + selfLinkAfter := PipelineTask{ + Name: "a", + RunAfter: []string{"a"}, + } + invalidTaskFrom := PipelineTask{ + Name: "a", + Resources: &PipelineTaskResources{ + Inputs: []PipelineTaskInputResource{{From: []string{"none"}}}, + }, + } + invalidTaskAfter := PipelineTask{ + Name: "a", + RunAfter: []string{"none"}, + } + + tcs := []struct { + name string + spec PipelineSpec + }{{ + name: "self-link-from", + spec: PipelineSpec{Tasks: []PipelineTask{selfLinkFrom}}, + }, { + name: "self-link-after", + spec: PipelineSpec{Tasks: []PipelineTask{selfLinkAfter}}, + }, { + name: "cycle-from", + spec: PipelineSpec{Tasks: []PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, + }, { + name: "cycle-runAfter", + spec: PipelineSpec{Tasks: []PipelineTask{xAfterA, zAfterX, aAfterZ}}, + }, { + name: "cycle-both", + spec: PipelineSpec{Tasks: []PipelineTask{xDependsOnA, zAfterX, aDependsOnZ}}, + }, { + name: "duplicate-tasks", + spec: PipelineSpec{Tasks: []PipelineTask{a, a}}, + }, { + name: "invalid-task-name-from", + spec: PipelineSpec{Tasks: []PipelineTask{invalidTaskFrom}}, + }, { + name: "invalid-task-name-after", + spec: PipelineSpec{Tasks: []PipelineTask{invalidTaskAfter}}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + p := &Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: tc.name}, + Spec: tc.spec, + } + if _, err := BuildDAG(p.Spec.Tasks); err == nil { + t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec) + } + }) + } +} diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go index 2b5005d3b2f..4f15c937029 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go @@ -58,8 +58,8 @@ func validateDeclaredResources(ps *PipelineSpec) error { return nil } -func isOutput(task PipelineTask, resource string) bool { - for _, output := range task.Resources.Outputs { +func isOutput(outputs []PipelineTaskOutputResource, resource string) bool { + for _, output := range outputs { if output.Resource == resource { return true } @@ -69,29 +69,29 @@ func isOutput(task PipelineTask, resource string) bool { // validateFrom ensures that the `from` values make sense: that they rely on values from Tasks // that ran previously, and that the PipelineResource is actually an output of the Task it should come from. -// TODO(#168) when pipelines don't just execute linearly this will need to be more sophisticated func validateFrom(tasks []PipelineTask) error { - for i, t := range tasks { + taskOutputs := map[string][]PipelineTaskOutputResource{} + for _, task := range tasks { + var to []PipelineTaskOutputResource + if task.Resources != nil { + to = make([]PipelineTaskOutputResource, len(task.Resources.Outputs)) + for i, o := range task.Resources.Outputs { + to[i] = o + } + } + taskOutputs[task.Name] = to + } + for _, t := range tasks { if t.Resources != nil { for _, rd := range t.Resources.Inputs { for _, pb := range rd.From { - if i == 0 { - return fmt.Errorf("first Task in Pipeline can't depend on anything before it (b/c there is nothing)") - } - found := false - // Look for previous Task that satisfies constraint - for j := i - 1; j >= 0; j-- { - if tasks[j].Name == pb { - // The input resource must actually be an output of the from tasks - if !isOutput(tasks[j], rd.Resource) { - return fmt.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pb) - } - found = true - } - } + outputs, found := taskOutputs[pb] if !found { return fmt.Errorf("expected resource %s to be from task %s, but task %s doesn't exist", rd.Resource, pb, pb) } + if !isOutput(outputs, rd.Resource) { + return fmt.Errorf("the resource %s from %s must be an output but is an input", rd.Resource, pb) + } } } } @@ -99,6 +99,16 @@ func validateFrom(tasks []PipelineTask) error { return nil } +// validateGraph ensures the Pipeline's dependency Graph (DAG) make sense: that there is no dependency +// cycle or that they rely on values from Tasks that ran previously, and that the PipelineResource +// is actually an output of the Task it should come from. +func validateGraph(tasks []PipelineTask) error { + if _, err := BuildDAG(tasks); err != nil { + return err + } + return nil +} + // Validate checks that taskNames in the Pipeline are valid and that the graph // of Tasks expressed in the Pipeline makes sense. func (ps *PipelineSpec) Validate() *apis.FieldError { @@ -126,6 +136,11 @@ func (ps *PipelineSpec) Validate() *apis.FieldError { return apis.ErrInvalidValue(err.Error(), "spec.tasks.resources.inputs.from") } + // Validate the pipeline task graph + if err := validateGraph(ps.Tasks); err != nil { + return apis.ErrInvalidValue(err.Error(), "spec.tasks") + } + // The parameter variables should be valid if err := validatePipelineParameterVariables(ps.Tasks, ps.Params); err != nil { return err diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go index 0da44288e1e..c51695e2385 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go @@ -52,16 +52,6 @@ func TestPipelineSpec_Validate_Error(t *testing.T) { tb.PipelineTaskInputResource("the-resource", "great-resource", tb.From("bar"))), )), }, - { - name: "from task is afterward", - p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( - tb.PipelineDeclaredResource("great-resource", v1alpha1.PipelineResourceTypeGit), - tb.PipelineTask("foo", "foo-task", - tb.PipelineTaskInputResource("the-resource", "great-resource", tb.From("bar"))), - tb.PipelineTask("bar", "bar-task", - tb.PipelineTaskOutputResource("the-resource", "great-resource")), - )), - }, { name: "unused resources declared", p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( @@ -113,6 +103,13 @@ func TestPipelineSpec_Validate_Error(t *testing.T) { tb.PipelineTask("foo", "foo-task", tb.PipelineTaskParam("a-param", "${params.foo} and ${params.does-not-exist}")))), }, + { + name: "invalid dependency graph between the tasks", + p: tb.Pipeline("foo", "namespace", tb.PipelineSpec( + tb.PipelineTask("foo", "foo", tb.RunAfter("bar")), + tb.PipelineTask("bar", "bar", tb.RunAfter("foo")), + )), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go index bf2fa76e781..bbc7d10363d 100644 --- a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go @@ -166,6 +166,34 @@ func (in *ClusterTaskList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DAG) DeepCopyInto(out *DAG) { + *out = *in + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make(map[string]*Node, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(Node) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DAG. +func (in *DAG) DeepCopy() *DAG { + if in == nil { + return nil + } + out := new(DAG) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GCSResource) DeepCopyInto(out *GCSResource) { *out = *in @@ -245,6 +273,47 @@ func (in *Inputs) DeepCopy() *Inputs { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Node) DeepCopyInto(out *Node) { + *out = *in + in.Task.DeepCopyInto(&out.Task) + if in.Prev != nil { + in, out := &in.Prev, &out.Prev + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(Node) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + if in.Next != nil { + in, out := &in.Next, &out.Next + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(Node) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Node. +func (in *Node) DeepCopy() *Node { + if in == nil { + return nil + } + out := new(Node) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Outputs) DeepCopyInto(out *Outputs) { *out = *in diff --git a/pkg/reconciler/v1alpha1/pipeline/dag/dag.go b/pkg/reconciler/v1alpha1/pipeline/dag/dag.go index cf06adc0759..79ee92ec04d 100644 --- a/pkg/reconciler/v1alpha1/pipeline/dag/dag.go +++ b/pkg/reconciler/v1alpha1/pipeline/dag/dag.go @@ -18,90 +18,18 @@ package dag import ( "fmt" - "strings" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" "github.com/knative/build-pipeline/pkg/list" ) -// Node represents a Task in a pipeline. -type Node struct { - // Task represent the PipelineTask in Pipeline - Task v1alpha1.PipelineTask - // Prev represent all the Previous task Nodes for the current Task - Prev []*Node - // Next represent all the Next task Nodes for the current Task - Next []*Node -} - -// DAG represents the Pipeline DAG -type DAG struct { - //Nodes represent map of PipelineTask name to Node in Pipeline DAG - Nodes map[string]*Node -} - -// Returns an empty Pipeline DAG -func new() *DAG { - return &DAG{Nodes: map[string]*Node{}} -} - -func (g *DAG) addPipelineTask(t v1alpha1.PipelineTask) (*Node, error) { - if _, ok := g.Nodes[t.Name]; ok { - return nil, fmt.Errorf("duplicate pipeline taks") - } - newNode := &Node{ - Task: t, - } - g.Nodes[t.Name] = newNode - return newNode, nil -} - -func linkPipelineTasks(prev *Node, next *Node) error { - // Check for self cycle - if prev.Task.Name == next.Task.Name { - return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.Name) - } - // Check if we are adding cycles. - visited := map[string]bool{prev.Task.Name: true, next.Task.Name: true} - path := []string{next.Task.Name, prev.Task.Name} - if err := visit(next.Task.Name, prev.Prev, path, visited); err != nil { - return fmt.Errorf("cycle detected: %v", err) - } - next.Prev = append(next.Prev, prev) - prev.Next = append(prev.Next, next) - return nil -} - -func visit(currentName string, nodes []*Node, path []string, visited map[string]bool) error { - for _, n := range nodes { - path = append(path, n.Task.Name) - if _, ok := visited[n.Task.Name]; ok { - return fmt.Errorf(getVisitedPath(path)) - } - visited[currentName+"."+n.Task.Name] = true - if err := visit(n.Task.Name, n.Prev, path, visited); err != nil { - return err - } - } - return nil -} - -func getVisitedPath(path []string) string { - // Reverse the path since we traversed the graph using prev pointers. - for i := len(path)/2 - 1; i >= 0; i-- { - opp := len(path) - 1 - i - path[i], path[opp] = path[opp], path[i] - } - return strings.Join(path, " -> ") -} - // GetSchedulable returns a map of PipelineTask that can be scheduled (keyed // by the name of the PipelineTask) given a list of successfully finished doneTasks. // It returns tasks which have all dependecies marked as done, and thus can be scheduled. If the // specified doneTasks are invalid (i.e. if it is indicated that a Task is // done, but the previous Tasks are not done), an error is returned. -func (g *DAG) GetSchedulable(doneTasks ...string) (map[string]v1alpha1.PipelineTask, error) { - roots := g.getRoots() +func GetSchedulable(g *v1alpha1.DAG, doneTasks ...string) (map[string]v1alpha1.PipelineTask, error) { + roots := getRoots(g) tm := toMap(doneTasks...) d := map[string]v1alpha1.PipelineTask{} @@ -126,8 +54,8 @@ func (g *DAG) GetSchedulable(doneTasks ...string) (map[string]v1alpha1.PipelineT return d, nil } -func (g *DAG) getRoots() []*Node { - n := []*Node{} +func getRoots(g *v1alpha1.DAG) []*v1alpha1.Node { + n := []*v1alpha1.Node{} for _, node := range g.Nodes { if len(node.Prev) == 0 { n = append(n, node) @@ -136,7 +64,7 @@ func (g *DAG) getRoots() []*Node { return n } -func findSchedulable(n *Node, visited map[string]struct{}, doneTasks map[string]struct{}) []v1alpha1.PipelineTask { +func findSchedulable(n *v1alpha1.Node, visited map[string]struct{}, doneTasks map[string]struct{}) []v1alpha1.PipelineTask { if _, ok := visited[n.Task.Name]; ok { return []v1alpha1.PipelineTask{} } @@ -159,7 +87,7 @@ func findSchedulable(n *Node, visited map[string]struct{}, doneTasks map[string] return []v1alpha1.PipelineTask{} } -func isSchedulable(doneTasks map[string]struct{}, prevs []*Node) bool { +func isSchedulable(doneTasks map[string]struct{}, prevs []*v1alpha1.Node) bool { if len(prevs) == 0 { return true } @@ -179,45 +107,3 @@ func toMap(t ...string) map[string]struct{} { } return m } - -func addLink(pt v1alpha1.PipelineTask, previousTask string, nodes map[string]*Node) error { - prev, ok := nodes[previousTask] - if !ok { - return fmt.Errorf("Task %s depends on %s but %s wasn't present in Pipeline", pt.Name, previousTask, previousTask) - } - next, _ := nodes[pt.Name] - if err := linkPipelineTasks(prev, next); err != nil { - return fmt.Errorf("Couldn't create link from %s to %s: %v", prev.Task.Name, next.Task.Name, err) - } - return nil -} - -// Build returns a valid pipeline DAG. Returns error if the pipeline is invalid -func Build(tasks []v1alpha1.PipelineTask) (*DAG, error) { - d := new() - - // Add all Tasks mentioned in the `PipelineSpec` - for _, pt := range tasks { - if _, err := d.addPipelineTask(pt); err != nil { - return nil, fmt.Errorf("task %s is already present in graph, can't add it again: %v", pt.Name, err) - } - } - // Process all from and runAfter constraints to add task dependency - for _, pt := range tasks { - for _, previousTask := range pt.RunAfter { - if err := addLink(pt, previousTask, d.Nodes); err != nil { - return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err) - } - } - if pt.Resources != nil { - for _, rd := range pt.Resources.Inputs { - for _, previousTask := range rd.From { - if err := addLink(pt, previousTask, d.Nodes); err != nil { - return nil, fmt.Errorf("couldn't add link between %s and %s: %v", pt.Name, previousTask, err) - } - } - } - } - } - return d, nil -} diff --git a/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go b/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go index b517bccf504..70127683b37 100644 --- a/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go +++ b/pkg/reconciler/v1alpha1/pipeline/dag/dag_test.go @@ -20,355 +20,51 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" - "github.com/knative/build-pipeline/pkg/list" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func sameNodes(l, r []*Node) error { - lNames, rNames := []string{}, []string{} - for _, n := range l { - lNames = append(lNames, n.Task.Name) - } - for _, n := range r { - rNames = append(rNames, n.Task.Name) - } - - return list.IsSame(lNames, rNames) -} - -func assertSameDAG(t *testing.T, l, r *DAG) { +func testGraph(t *testing.T) *v1alpha1.DAG { + // b a + // | / \ + // | | x + // | | / | + // | y | + // \ / z + // w t.Helper() - lKeys, rKeys := []string{}, []string{} - - for k := range l.Nodes { - lKeys = append(lKeys, k) - } - for k := range r.Nodes { - rKeys = append(rKeys, k) - } - - // For the DAGs to be the same, they must contain the same nodes - err := list.IsSame(lKeys, rKeys) - if err != nil { - t.Fatalf("DAGS contain different nodes: %v", err) - } - - // If they contain the same nodes, the DAGs will be the same if all - // of the nodes have the same linkages - for k, rn := range r.Nodes { - ln := l.Nodes[k] - - err := sameNodes(rn.Prev, ln.Prev) - if err != nil { - t.Errorf("The %s nodes in the DAG have different previous nodes: %v", k, err) - } - err = sameNodes(rn.Next, ln.Next) - if err != nil { - t.Errorf("The %s nodes in the DAG have different next nodes: %v", k, err) - } - } -} - -func TestBuild_Parallel(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - b := v1alpha1.PipelineTask{Name: "b"} - c := v1alpha1.PipelineTask{Name: "c"} - - // This test make sure we can create a Pipeline with no links between any Tasks - // (all tasks run in parallel) - // a b c - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, b, c}, - }, - } - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": {Task: a}, - "b": {Task: b}, - "c": {Task: c}, - }, - } - g, err := Build(p.Spec.Tasks) - if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_JoinMultipleRoots(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - b := v1alpha1.PipelineTask{Name: "b"} - c := v1alpha1.PipelineTask{Name: "c"} - xDependsOnA := v1alpha1.PipelineTask{ - Name: "x", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, + g, err := v1alpha1.BuildDAG([]v1alpha1.PipelineTask{ + { + Name: "a", }, - } - yDependsOnARunsAfterB := v1alpha1.PipelineTask{ - Name: "y", - RunAfter: []string{"b"}, - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, + { + Name: "b", }, - } - zDependsOnX := v1alpha1.PipelineTask{ - Name: "z", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}}, + { + Name: "w", + RunAfter: []string{"b", "y"}, }, - } - - // a b c - // | \ / - // x y - // | - // z - - nodeA := &Node{Task: a} - nodeB := &Node{Task: b} - nodeC := &Node{Task: c} - nodeX := &Node{Task: xDependsOnA} - nodeY := &Node{Task: yDependsOnARunsAfterB} - nodeZ := &Node{Task: zDependsOnX} - - nodeA.Next = []*Node{nodeX, nodeY} - nodeB.Next = []*Node{nodeY} - nodeX.Prev = []*Node{nodeA} - nodeX.Next = []*Node{nodeZ} - nodeY.Prev = []*Node{nodeA, nodeB} - nodeZ.Prev = []*Node{nodeX} - - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": nodeA, - "b": nodeB, - "c": nodeC, - "x": nodeX, - "y": nodeY, - "z": nodeZ}, - } - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, xDependsOnA, yDependsOnARunsAfterB, zDependsOnX, b, c}, + { + Name: "x", + RunAfter: []string{"a"}, }, - } - g, err := Build(p.Spec.Tasks) - if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_FanInFanOut(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - dDependsOnA := v1alpha1.PipelineTask{ - Name: "d", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - eRunsAfterA := v1alpha1.PipelineTask{ - Name: "e", - RunAfter: []string{"a"}, - } - fDependsOnDAndE := v1alpha1.PipelineTask{ - Name: "f", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"d", "e"}}}, - }, - } - gRunsAfterF := v1alpha1.PipelineTask{ - Name: "g", - RunAfter: []string{"f"}, - } - - // This test make sure we don't detect cycle (A -> B -> B -> …) when there is not. - // This means we "visit" a twice, from two different path ; but there is no cycle. - // a - // / \ - // d e - // \ / - // f - // | - // g - nodeA := &Node{Task: a} - nodeD := &Node{Task: dDependsOnA} - nodeE := &Node{Task: eRunsAfterA} - nodeF := &Node{Task: fDependsOnDAndE} - nodeG := &Node{Task: gRunsAfterF} - - nodeA.Next = []*Node{nodeD, nodeE} - nodeD.Prev = []*Node{nodeA} - nodeD.Next = []*Node{nodeF} - nodeE.Prev = []*Node{nodeA} - nodeE.Next = []*Node{nodeF} - nodeF.Prev = []*Node{nodeD, nodeE} - nodeF.Next = []*Node{nodeG} - nodeG.Prev = []*Node{nodeF} - - expectedDAG := &DAG{ - Nodes: map[string]*Node{ - "a": nodeA, - "d": nodeD, - "e": nodeE, - "f": nodeF, - "g": nodeG, + { + Name: "y", + RunAfter: []string{"a", "x"}, }, - } - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: "pipeline"}, - Spec: v1alpha1.PipelineSpec{ - Tasks: []v1alpha1.PipelineTask{a, dDependsOnA, eRunsAfterA, fDependsOnDAndE, gRunsAfterF}, + { + Name: "z", + RunAfter: []string{"x"}, }, - } - g, err := Build(p.Spec.Tasks) + }) if err != nil { - t.Fatalf("didn't expect error creating valid Pipeline %v but got %v", p, err) - } - assertSameDAG(t, expectedDAG, g) -} - -func TestBuild_Invalid(t *testing.T) { - a := v1alpha1.PipelineTask{Name: "a"} - xDependsOnA := v1alpha1.PipelineTask{ - Name: "x", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - zDependsOnX := v1alpha1.PipelineTask{ - Name: "z", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"x"}}}, - }, - } - aDependsOnZ := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"z"}}}, - }, - } - xAfterA := v1alpha1.PipelineTask{ - Name: "x", - RunAfter: []string{"a"}, - } - zAfterX := v1alpha1.PipelineTask{ - Name: "z", - RunAfter: []string{"x"}, - } - aAfterZ := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"z"}, - } - selfLinkFrom := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"a"}}}, - }, - } - selfLinkAfter := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"a"}, - } - invalidTaskFrom := v1alpha1.PipelineTask{ - Name: "a", - Resources: &v1alpha1.PipelineTaskResources{ - Inputs: []v1alpha1.PipelineTaskInputResource{{From: []string{"none"}}}, - }, - } - invalidTaskAfter := v1alpha1.PipelineTask{ - Name: "a", - RunAfter: []string{"none"}, - } - - tcs := []struct { - name string - spec v1alpha1.PipelineSpec - }{{ - name: "self-link-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLinkFrom}}, - }, { - name: "self-link-after", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{selfLinkAfter}}, - }, { - name: "cycle-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zDependsOnX, aDependsOnZ}}, - }, { - name: "cycle-runAfter", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xAfterA, zAfterX, aAfterZ}}, - }, { - name: "cycle-both", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{xDependsOnA, zAfterX, aDependsOnZ}}, - }, { - name: "duplicate-tasks", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{a, a}}, - }, { - name: "invalid-task-name-from", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTaskFrom}}, - }, { - name: "invalid-task-name-after", - spec: v1alpha1.PipelineSpec{Tasks: []v1alpha1.PipelineTask{invalidTaskAfter}}, - }, - } - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - p := &v1alpha1.Pipeline{ - ObjectMeta: metav1.ObjectMeta{Name: tc.name}, - Spec: tc.spec, - } - if _, err := Build(p.Spec.Tasks); err == nil { - t.Errorf("expected to see an error for invalid DAG in pipeline %v but had none", tc.spec) - } - }) - } -} - -func testGraph() *DAG { - // b a - // | / \ - // | | x - // | | / | - // | y | - // \ / z - // w - g := new() - g.Nodes["a"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "a"}, - } - g.Nodes["b"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "b"}, - } - g.Nodes["x"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "x"}, - } - linkPipelineTasks(g.Nodes["a"], g.Nodes["x"]) - - g.Nodes["y"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "y"}, - } - linkPipelineTasks(g.Nodes["a"], g.Nodes["y"]) - linkPipelineTasks(g.Nodes["x"], g.Nodes["y"]) - - g.Nodes["z"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "z"}, - } - linkPipelineTasks(g.Nodes["x"], g.Nodes["z"]) - - g.Nodes["w"] = &Node{ - Task: v1alpha1.PipelineTask{Name: "w"}, + t.Fatal(err) } - linkPipelineTasks(g.Nodes["y"], g.Nodes["w"]) - linkPipelineTasks(g.Nodes["b"], g.Nodes["w"]) return g } func TestGetSchedulable(t *testing.T) { - g := testGraph() + g := testGraph(t) tcs := []struct { name string finished []string @@ -438,11 +134,11 @@ func TestGetSchedulable(t *testing.T) { }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - tasks, err := g.GetSchedulable(tc.finished...) + tasks, err := GetSchedulable(g, tc.finished...) if err != nil { t.Fatalf("Didn't expect error when getting next tasks for %v but got %v", tc.finished, err) } - if d := cmp.Diff(tasks, tc.expectedTasks); d != "" { + if d := cmp.Diff(tasks, tc.expectedTasks, cmpopts.IgnoreFields(v1alpha1.PipelineTask{}, "RunAfter")); d != "" { t.Errorf("expected that with %v done, %v would be ready to schedule but was different: %s", tc.finished, tc.expectedTasks, d) } }) @@ -450,7 +146,7 @@ func TestGetSchedulable(t *testing.T) { } func TestGetSchedulable_Invalid(t *testing.T) { - g := testGraph() + g := testGraph(t) tcs := []struct { name string finished []string @@ -478,7 +174,7 @@ func TestGetSchedulable_Invalid(t *testing.T) { }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - _, err := g.GetSchedulable(tc.finished...) + _, err := GetSchedulable(g, tc.finished...) if err == nil { t.Fatalf("Expected error for invalid done tasks %v but got none", tc.finished) } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 78553d1ac3f..ef895bad197 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -222,7 +222,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er p = p.DeepCopy() - d, err := dag.Build(p.Spec.Tasks) + d, err := v1alpha1.BuildDAG(p.Spec.Tasks) if err != nil { // This Run has failed, so we need to mark it as failed and stop reconciling it pr.Status.SetCondition(&duckv1alpha1.Condition{ @@ -336,7 +336,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet) } - candidateTasks, err := d.GetSchedulable(pipelineState.SuccessfulPipelineTaskNames()...) + candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulPipelineTaskNames()...) if err != nil { c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err) }