Skip to content

Commit

Permalink
Introduce InternalTektonResultType as a ResultType
Browse files Browse the repository at this point in the history
In light of tektoncd#3087 the need for a ResultType that is not exposed
as a TaskRunResult or PipelineResourceResult arises.
In tektoncd#3087, a Step can emit a result indicating a Step timeout
has occurred. This is a result that should not be exposed hence
the need for a new ResultType called InternalTektonResultType.
This commit ensures results of this type are filtered out.

Introducing an InternalTektonResultType ensures a future proof
solution to internal results that should not be exposed.
Aside from the example in tektoncd#3087, a present candidate is the
result written out by a Step containing a "StartedAt" key.
Currently this result is filtered out with a specific function.
Marking it as an InternalTektonResultTypes now allows for
this result to automatically be filtered out.

Additionally this commit brings about refactoring (and sometimes
renaming) of functions related to converting pod statuses to
taskrun statuses from pkg/reconciler/taskrun/taskrun.go to
pkg/pod/status/status.go.
This is accompanied with moving unit test cases from taskrun_test.go
to status_test.go.
These unit tests now solely operate on exported functions.
  • Loading branch information
Peaorl committed Aug 26, 2020
1 parent fb296e6 commit ad5e5d3
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 398 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/pipeline/v1beta1/task_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
TaskRunResultType ResultType = "TaskRunResult"
// PipelineResourceResultType default pipeline result value
PipelineResourceResultType ResultType = "PipelineResourceResult"
// InternalTektonResultType default internal tekton result value
InternalTektonResultType ResultType = "InternalTektonResult"
// UnknownResultType default unknown result type value
UnknownResultType ResultType = ""
)
Expand Down
10 changes: 6 additions & 4 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ func (e Entrypointer) Go() error {
// *but* we write postfile to make next steps bail too.
e.WritePostFile(e.PostFile, err)
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

return err
Expand All @@ -114,8 +115,9 @@ func (e Entrypointer) Go() error {
e.Args = append([]string{e.Entrypoint}, e.Args...)
}
output = append(output, v1beta1.PipelineResourceResult{
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
Key: "StartedAt",
Value: time.Now().Format(timeFormat),
ResultType: v1beta1.InternalTektonResultType,
})

err := e.Runner.Run(e.Args...)
Expand Down
120 changes: 81 additions & 39 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package pod

import (
"encoding/json"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -97,27 +96,51 @@ func SidecarsReady(podStatus corev1.PodStatus) bool {
}

// MakeTaskRunStatus returns a TaskRunStatus based on the Pod's status.
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) v1beta1.TaskRunStatus {
func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev1.Pod, taskSpec v1beta1.TaskSpec) (v1beta1.TaskRunStatus, error) {
trs := &tr.Status
if trs.GetCondition(apis.ConditionSucceeded) == nil || trs.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionUnknown {
// If the taskRunStatus doesn't exist yet, it's because we just started running
MarkStatusRunning(trs, v1beta1.TaskRunReasonRunning.String(), "Not all Steps in the Task have finished executing")
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}

trs.PodName = pod.Name
trs.Steps = []v1beta1.StepState{}
trs.Sidecars = []v1beta1.SidecarState{}

var err error

for _, s := range pod.Status.ContainerStatuses {
if IsContainerStep(s.Name) {
if s.State.Terminated != nil && len(s.State.Terminated.Message) != 0 {
message, time, err := removeStartInfoFromTerminationMessage(s)

var results []v1beta1.PipelineResourceResult
results, err = termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %w", s.Name, tr.Name, err)
}
if time != nil {
s.State.Terminated.StartedAt = *time
s.State.Terminated.Message = message
logger.Errorf("termination message could not be parsed as JSON: %v", err)
} else {
//Further processing if the termination message is JSON formatted
var time *metav1.Time
time, err = extractStartedAtTimeFromResults(results)
if err != nil {
logger.Errorf("error setting the start time of step %q in taskrun %q: %v", s.Name, tr.Name, err)
}
if time != nil {
s.State.Terminated.StartedAt = *time
}
if tr.IsSuccessful() {
taskResults, pipelineResourceResults := filterResultsAndResources(results)
trs.TaskRunResults = append(trs.TaskRunResults, taskResults...)
trs.ResourcesResult = append(trs.ResourcesResult, pipelineResourceResults...)
}
}
}
trs.Steps = append(trs.Steps, v1beta1.StepState{
Expand All @@ -135,51 +158,70 @@ func MakeTaskRunStatus(logger *zap.SugaredLogger, tr v1beta1.TaskRun, pod *corev
})
}
}

// Complete if we did not find a step that is not complete, or the pod is in a definitely complete phase
complete := areStepsComplete(pod) || pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed

if complete {
updateCompletedTaskRun(trs, pod)
} else {
updateIncompleteTaskRun(trs, pod)
}
trs.TaskRunResults = removeDuplicateResults(trs.TaskRunResults)

// Sort step states according to the order specified in the TaskRun spec's steps.
trs.Steps = sortTaskRunStepOrder(trs.Steps, taskSpec.Steps)

return *trs
return *trs, err
}

func filterResultsAndResources(results []v1beta1.PipelineResourceResult) ([]v1beta1.TaskRunResult, []v1beta1.PipelineResourceResult) {
var taskResults []v1beta1.TaskRunResult
var pipelineResourceResults []v1beta1.PipelineResourceResult
for _, r := range results {
switch r.ResultType {
case v1beta1.TaskRunResultType:
taskRunResult := v1beta1.TaskRunResult{
Name: r.Key,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
case v1beta1.InternalTektonResultType:
// Internal messages are ignored because they're not used as external result
continue
case v1beta1.PipelineResourceResultType:
fallthrough
default:
pipelineResourceResults = append(pipelineResourceResults, r)
}
}

return taskResults, pipelineResourceResults
}

// removeStartInfoFromTerminationMessage searches for a result called "StartedAt" in the JSON-formatted
// termination message of a step and returns the values to use for sets State.Terminated if it's
// found. The "StartedAt" result is also removed from the list of results in the container status.
func removeStartInfoFromTerminationMessage(s corev1.ContainerStatus) (string, *metav1.Time, error) {
r, err := termination.ParseMessage(s.State.Terminated.Message)
if err != nil {
return "", nil, fmt.Errorf("termination message could not be parsed as JSON: %w", err)
func removeDuplicateResults(taskRunResult []v1beta1.TaskRunResult) []v1beta1.TaskRunResult {
if len(taskRunResult) == 0 {
return nil
}

uniq := make([]v1beta1.TaskRunResult, 0)
latest := make(map[string]v1beta1.TaskRunResult, 0)
for _, res := range taskRunResult {
if _, seen := latest[res.Name]; !seen {
uniq = append(uniq, res)
}
latest[res.Name] = res
}
for i, res := range uniq {
uniq[i] = latest[res.Name]
}
for index, result := range r {
return uniq
}

func extractStartedAtTimeFromResults(results []v1beta1.PipelineResourceResult) (*metav1.Time, error) {

for _, result := range results {
if result.Key == "StartedAt" {
t, err := time.Parse(timeFormat, result.Value)
if err != nil {
return "", nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
return nil, fmt.Errorf("could not parse time value %q in StartedAt field: %w", result.Value, err)
}
message := ""
startedAt := metav1.NewTime(t)
// remove the entry for the starting time
r = append(r[:index], r[index+1:]...)
if len(r) == 0 {
message = ""
} else if bytes, err := json.Marshal(r); err != nil {
return "", nil, fmt.Errorf("error marshalling remaining results back into termination message: %w", err)
} else {
message = string(bytes)
}
return message, &startedAt, nil
return &startedAt, nil
}
}
return "", nil, nil
return nil, nil
}

func updateCompletedTaskRun(trs *v1beta1.TaskRunStatus, pod *corev1.Pod) {
Expand Down
Loading

0 comments on commit ad5e5d3

Please sign in to comment.