Skip to content

Commit

Permalink
Resolve all PipelineResources first before continuing
Browse files Browse the repository at this point in the history
As part of #1184 I need to call `GetSetup` on all PipelineResources
early on in PipelineRun execution. Since PipelineRuns declare all their
resource up front, I wanted to be able to resolve all of them at once,
then call `GetSetup` on all of them. Also, as Pipelines got more complex
(we added Conditions) it turned out we were retrieving the resources in
a few different places. Also in #1324 @pritidesai is making it so that
these Resources can be provided by spec. By resolving all of this up
front at once, we can simplify the logic later on. And you can see in
this commit that we are able to reduce the responsibilities of
ResolvePipelineRun a bit too!
  • Loading branch information
bobcatfish committed Sep 24, 2019
1 parent 3873c3c commit cae91a6
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 295 deletions.
24 changes: 13 additions & 11 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
})
return nil
}
providedResources, err := resources.GetResourcesFromBindings(p, pr)
if err != nil {
if err := resources.ValidateResourceBindings(p, pr); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand All @@ -254,6 +253,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
})
return nil
}
providedResources, err := resources.GetResourcesFromBindings(pr, c.resourceLister.PipelineResources(pr.Namespace).Get)
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCouldntGetResource,
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
return nil
}

// Ensure that the parameters from the PipelineRun are overriding Pipeline parameters with the same type.
// Weird substitution issues can occur if this is not validated (ApplyParameters() does not verify type).
Expand Down Expand Up @@ -284,7 +295,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
func(name string) (v1alpha1.TaskInterface, error) {
return c.clusterTaskLister.Get(name)
},
c.resourceLister.PipelineResources(pr.Namespace).Get,
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
Expand All @@ -301,14 +311,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
Message: fmt.Sprintf("Pipeline %s can't be Run; it contains Tasks that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, p.Name), err),
})
case *resources.ResourceNotFoundError:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCouldntGetResource,
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
case *resources.ConditionNotFoundError:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func TestReconcile(t *testing.T) {
)

// ignore IgnoreUnexported ignore both after and before steps fields
if d := cmp.Diff(actual, expectedTaskRun, cmpopts.SortSlices(func(x, y v1alpha1.TaskResourceBinding) bool { return x.Name < y.Name })); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, d)
if d := cmp.Diff(expectedTaskRun, actual, cmpopts.SortSlices(func(x, y v1alpha1.TaskResourceBinding) bool { return x.Name < y.Name })); d != "" {
t.Errorf("expected to see TaskRun %v created. Diff (-want, +got): %s", expectedTaskRun, d)
}
// test taskrun is able to recreate correct pipeline-pvc-name
if expectedTaskRun.GetPipelineRunPVCName() != "test-pipeline-run-success-pvc" {
Expand Down
155 changes: 60 additions & 95 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (t ResolvedPipelineRunTask) IsSuccessful() bool {
return false
}

// IsFailed returns true only if the taskrun itself has failed
// IsFailure returns true only if the taskrun itself has failed
func (t ResolvedPipelineRunTask) IsFailure() bool {
if t.TaskRun == nil {
return false
Expand Down Expand Up @@ -169,12 +169,24 @@ func (state PipelineRunState) SuccessfulPipelineTaskNames() []string {
// GetTaskRun is a function that will retrieve the TaskRun name.
type GetTaskRun func(name string) (*v1alpha1.TaskRun, error)

// GetResourcesFromBindings will validate that all PipelineResources declared in Pipeline p are bound in PipelineRun pr
// and if so, will return a map from the declared name of the PipelineResource (which is how the PipelineResource will
// be referred to in the PipelineRun) to the ResourceRef.
func GetResourcesFromBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) (map[string]v1alpha1.PipelineResourceRef, error) {
resources := map[string]v1alpha1.PipelineResourceRef{}
// GetResourcesFromBindings will retreive all Resources bound in PipelineRun pr and return a map
// from the declared name of the PipelineResource (which is how the PipelineResource will
// be referred to in the PipelineRun) to the PipelineResource, obtained via getResource.
func GetResourcesFromBindings(pr *v1alpha1.PipelineRun, getResource resources.GetResource) (map[string]*v1alpha1.PipelineResource, error) {
resources := map[string]*v1alpha1.PipelineResource{}

for _, resource := range pr.Spec.Resources {
r, err := getResource(resource.ResourceRef.Name)
if err != nil {
return resources, xerrors.Errorf("Error following resource reference for %s: %w", resource.Name, err)
}
resources[resource.Name] = r
}
return resources, nil
}

// ValidateResourceBindings validate that the PipelineResources declared in Pipeline p are bound in PipelineRun.
func ValidateResourceBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) error {
required := make([]string, 0, len(p.Spec.Resources))
for _, resource := range p.Spec.Resources {
required = append(required, resource.Name)
Expand All @@ -183,42 +195,10 @@ func GetResourcesFromBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) (m
for _, resource := range pr.Spec.Resources {
provided = append(provided, resource.Name)
}
err := list.IsSame(required, provided)
if err != nil {
return resources, xerrors.Errorf("PipelineRun bound resources didn't match Pipeline: %w", err)
if err := list.IsSame(required, provided); err != nil {
return xerrors.Errorf("PipelineRun bound resources didn't match Pipeline: %w", err)
}

for _, resource := range pr.Spec.Resources {
resources[resource.Name] = resource.ResourceRef
}
return resources, nil
}

func getPipelineRunTaskResources(pt v1alpha1.PipelineTask, providedResources map[string]v1alpha1.PipelineResourceRef) ([]v1alpha1.TaskResourceBinding, []v1alpha1.TaskResourceBinding, error) {
inputs, outputs := []v1alpha1.TaskResourceBinding{}, []v1alpha1.TaskResourceBinding{}
if pt.Resources != nil {
for _, taskInput := range pt.Resources.Inputs {
resource, ok := providedResources[taskInput.Resource]
if !ok {
return inputs, outputs, xerrors.Errorf("pipelineTask tried to use input resource %s not present in declared resources", taskInput.Resource)
}
inputs = append(inputs, v1alpha1.TaskResourceBinding{
Name: taskInput.Name,
ResourceRef: resource,
})
}
for _, taskOutput := range pt.Resources.Outputs {
resource, ok := providedResources[taskOutput.Resource]
if !ok {
return outputs, outputs, xerrors.Errorf("pipelineTask tried to use output resource %s not present in declared resources", taskOutput.Resource)
}
outputs = append(outputs, v1alpha1.TaskResourceBinding{
Name: taskOutput.Name,
ResourceRef: resource,
})
}
}
return inputs, outputs, nil
return nil
}

// TaskNotFoundError indicates that the resolution failed because a referenced Task couldn't be retrieved
Expand All @@ -231,15 +211,6 @@ func (e *TaskNotFoundError) Error() string {
return fmt.Sprintf("Couldn't retrieve Task %q: %s", e.Name, e.Msg)
}

// ResourceNotFoundError indicates that the resolution failed because a referenced PipelineResource couldn't be retrieved
type ResourceNotFoundError struct {
Msg string
}

func (e *ResourceNotFoundError) Error() string {
return fmt.Sprintf("Couldn't retrieve PipelineResource: %s", e.Msg)
}

type ConditionNotFoundError struct {
Name string
Msg string
Expand All @@ -252,17 +223,15 @@ func (e *ConditionNotFoundError) Error() string {
// ResolvePipelineRun retrieves all Tasks instances which are reference by tasks, getting
// instances from getTask. If it is unable to retrieve an instance of a referenced Task, it
// will return an error, otherwise it returns a list of all of the Tasks retrieved.
// It will retrieve the Resources needed for the TaskRun as well using getResource and the mapping
// of providedResources.
// It will retrieve the Resources needed for the TaskRun using the mapping of providedResources.
func ResolvePipelineRun(
pipelineRun v1alpha1.PipelineRun,
getTask resources.GetTask,
getTaskRun resources.GetTaskRun,
getClusterTask resources.GetClusterTask,
getResource resources.GetResource,
getCondition GetCondition,
tasks []v1alpha1.PipelineTask,
providedResources map[string]v1alpha1.PipelineResourceRef,
providedResources map[string]*v1alpha1.PipelineResource,
) (PipelineRunState, error) {

state := []*ResolvedPipelineRunTask{}
Expand All @@ -289,16 +258,10 @@ func ResolvePipelineRun(
}
}

// Get all the resources that this task will be using, if any
inputs, outputs, err := getPipelineRunTaskResources(pt, providedResources)
if err != nil {
return nil, xerrors.Errorf("unexpected error which should have been caught by Pipeline webhook: %w", err)
}

spec := t.TaskSpec()
rtr, err := resources.ResolveTaskResources(&spec, t.TaskMetadata().Name, pt.TaskRef.Kind, inputs, outputs, getResource)
rtr, err := ResolvePipelineTaskResources(pt, &spec, t.TaskMetadata().Name, pt.TaskRef.Kind, providedResources)
if err != nil {
return nil, &ResourceNotFoundError{Msg: err.Error()}
return nil, xerrors.Errorf("couldn't match referenced resources with declared resources: %w", err)
}
rprt.ResolvedTaskResources = rtr

Expand All @@ -314,7 +277,7 @@ func ResolvePipelineRun(

// Get all conditions that this pipelineTask will be using, if any
if len(pt.Conditions) > 0 {
rcc, err := resolveConditionChecks(&pt, pipelineRun.Status.TaskRuns, rprt.TaskRunName, getTaskRun, getCondition, getResource, providedResources)
rcc, err := resolveConditionChecks(&pt, pipelineRun.Status.TaskRuns, rprt.TaskRunName, getTaskRun, getCondition, providedResources)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -360,7 +323,6 @@ func GetPipelineConditionStatus(pr *v1alpha1.PipelineRun, state PipelineRunState
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
// 4. A Task or Condition is running right now or there are things left to run -> Running

if pr.IsTimedOut() {
return &apis.Condition{
Type: apis.ConditionSucceeded,
Expand Down Expand Up @@ -489,10 +451,7 @@ func ValidateFrom(state PipelineRunState) error {
return nil
}

func resolveConditionChecks(pt *v1alpha1.PipelineTask,
taskRunStatus map[string]*v1alpha1.PipelineRunTaskRunStatus,
taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition,
getResource resources.GetResource, providedResources map[string]v1alpha1.PipelineResourceRef) ([]*ResolvedConditionCheck, error) {
func resolveConditionChecks(pt *v1alpha1.PipelineTask, taskRunStatus map[string]*v1alpha1.PipelineRunTaskRunStatus, taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition, providedResources map[string]*v1alpha1.PipelineResource) ([]*ResolvedConditionCheck, error) {
rccs := []*ResolvedConditionCheck{}
for _, ptc := range pt.Conditions {
cName := ptc.ConditionRef
Expand All @@ -510,47 +469,53 @@ func resolveConditionChecks(pt *v1alpha1.PipelineTask,
return nil, xerrors.Errorf("error retrieving ConditionCheck %s for taskRun name %s : %w", conditionCheckName, taskRunName, err)
}
}
conditionResources := map[string]*v1alpha1.PipelineResource{}
for _, declared := range ptc.Resources {
r, ok := providedResources[declared.Resource]
if !ok {
return nil, xerrors.Errorf("resources %s missing for condition %s in pipeline task %s", declared.Resource, cName, pt.Name)
}
conditionResources[declared.Name] = r
}

rcc := ResolvedConditionCheck{
Condition: c,
ConditionCheckName: conditionCheckName,
ConditionCheck: v1alpha1.NewConditionCheck(cctr),
PipelineTaskCondition: &ptc,
}

if len(ptc.Resources) > 0 {
r, err := resolveConditionResources(ptc.Resources, getResource, providedResources)
if err != nil {
return nil, xerrors.Errorf("cloud not resolve resources for condition %s in pipeline task %s: %w", cName, pt.Name, err)
}
rcc.ResolvedResources = r
ResolvedResources: conditionResources,
}

rccs = append(rccs, &rcc)
}
return rccs, nil
}

func resolveConditionResources(prc []v1alpha1.PipelineConditionResource,
getResource resources.GetResource,
providedResources map[string]v1alpha1.PipelineResourceRef,
) (map[string]*v1alpha1.PipelineResource, error) {
rr := make(map[string]*v1alpha1.PipelineResource)
for _, r := range prc {
// First get a ref to actual resource name from its bound name
resourceRef, ok := providedResources[r.Resource]
if !ok {
return nil, xerrors.Errorf("resource %s not present in declared resources", r.Resource)
// ResolvePipelineTaskResources matches PipelineResources referenced by pt inputs and outputs with the
// providedResources and returns an instance of ResolvedTaskResources.
func ResolvePipelineTaskResources(pt v1alpha1.PipelineTask, ts *v1alpha1.TaskSpec, taskName string, kind v1alpha1.TaskKind, providedResources map[string]*v1alpha1.PipelineResource) (*resources.ResolvedTaskResources, error) {
rtr := resources.ResolvedTaskResources{
TaskName: taskName,
TaskSpec: ts,
Kind: kind,
Inputs: map[string]*v1alpha1.PipelineResource{},
Outputs: map[string]*v1alpha1.PipelineResource{},
}
if pt.Resources != nil {
for _, taskInput := range pt.Resources.Inputs {
resource, ok := providedResources[taskInput.Resource]
if !ok {
return nil, xerrors.Errorf("pipelineTask tried to use input resource %s not present in declared resources", taskInput.Resource)
}
rtr.Inputs[taskInput.Name] = resource
}

// Next, fetch the actual resource definition
gotResource, err := getResource(resourceRef.Name)
if err != nil {
return nil, xerrors.Errorf("could not retrieve resource %s: %w", r.Name, err)
for _, taskOutput := range pt.Resources.Outputs {
resource, ok := providedResources[taskOutput.Resource]
if !ok {
return nil, xerrors.Errorf("pipelineTask tried to use output resource %s not present in declared resources", taskOutput.Resource)
}
rtr.Outputs[taskOutput.Name] = resource
}

// Finally add it to the resolved resources map
rr[r.Name] = gotResource
}
return rr, nil
return &rtr, nil
}
Loading

0 comments on commit cae91a6

Please sign in to comment.