Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update taskrun/pipelinerun timeout logic to not rely on resync behavior #604

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ const (
)

var (
masterURL string
kubeconfig string
masterURL string
kubeconfig string
resyncPeriod = 10 * time.Hour
)

func main() {
Expand Down Expand Up @@ -99,8 +100,8 @@ func main() {
Logger: logger,
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, opt.ResyncPeriod)
pipelineInformerFactory := pipelineinformers.NewSharedInformerFactory(pipelineClient, opt.ResyncPeriod)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
pipelineInformerFactory := pipelineinformers.NewSharedInformerFactory(pipelineClient, resyncPeriod)

taskInformer := pipelineInformerFactory.Tekton().V1alpha1().Tasks()
clusterTaskInformer := pipelineInformerFactory.Tekton().V1alpha1().ClusterTasks()
Expand All @@ -110,26 +111,35 @@ func main() {

pipelineInformer := pipelineInformerFactory.Tekton().V1alpha1().Pipelines()
pipelineRunInformer := pipelineInformerFactory.Tekton().V1alpha1().PipelineRuns()
timeoutHandler := reconciler.NewTimeoutHandler(logger, kubeClient, pipelineClient, stopCh)

trc := taskrun.NewController(opt,
taskRunInformer,
taskInformer,
clusterTaskInformer,
resourceInformer,
podInformer,
nil, //entrypoint cache will be initialized by controller if not provided
timeoutHandler,
)
prc := pipelinerun.NewController(opt,
pipelineRunInformer,
pipelineInformer,
taskInformer,
clusterTaskInformer,
taskRunInformer,
resourceInformer,
timeoutHandler,
)
// Build all of our controllers, with the clients constructed above.
controllers := []*controller.Impl{
// Pipeline Controllers
taskrun.NewController(opt,
taskRunInformer,
taskInformer,
clusterTaskInformer,
resourceInformer,
podInformer,
nil, //entrypoint cache will be initialized by controller if not provided
),
pipelinerun.NewController(opt,
pipelineRunInformer,
pipelineInformer,
taskInformer,
clusterTaskInformer,
taskRunInformer,
resourceInformer,
),
trc,
prc,
}
timeoutHandler.AddtrCallBackFunc(trc.Enqueue)
timeoutHandler.AddPrCallBackFunc(prc.Enqueue)
timeoutHandler.CheckTimeouts()

// Watch the logging config map and dynamically update logging levels.
configMapWatcher.Watch(logging.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logging.ControllerLogKey))
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,13 @@ func (pr *PipelineRun) GetOwnerReference() []metav1.OwnerReference {
*metav1.NewControllerRef(pr, groupVersionKind),
}
}

// IsDone returns true if the PipelineRun's status indicates that it is done.
func (pr *PipelineRunStatus) IsDone() bool {
return !pr.GetCondition(duckv1alpha1.ConditionSucceeded).IsUnknown()
}

// IsCancelled returns true if the PipelineRun's spec status is set to Cancelled state
func (sp PipelineRunSpec) IsCancelled() bool {
return sp.Status == PipelineRunSpecStatusCancelled
}
21 changes: 21 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,24 @@ func TestInitializeConditions(t *testing.T) {
t.Fatalf("PipelineRun status getting reset")
}
}
func TestPipelineRunIsDone(t *testing.T) {
p := &PipelineRun{}
foo := &duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
}
p.Status.SetCondition(foo)
if !p.Status.IsDone() {
t.Fatal("Expected pipelinerun status to be done")
}
}

func TestPipelineRunIsCancelled(t *testing.T) {
ps := PipelineRunSpec{
Status: PipelineRunSpecStatusCancelled,
}

if !ps.IsCancelled() {
t.Fatal("Expected pipelinerun status to be cancelled")
}
}
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,13 @@ func (tr *TaskRun) HasPipelineRunOwnerReference() bool {
}
return false
}

// IsDone returns true if the TaskRun's status indicates that it is done.
func (st *TaskRunStatus) IsDone() bool {
return !st.GetCondition(duckv1alpha1.ConditionSucceeded).IsUnknown()
}

// IsCancelled returns true if the TaskRun's spec status is set to Cancelled state
func (sp TaskRunSpec) IsCancelled() bool {
return sp.Status == TaskRunSpecStatusCancelled
}
22 changes: 22 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -125,3 +126,24 @@ func TestTaskRun_HasPipelineRun(t *testing.T) {
})
}
}
func TestTaskRunIsDone(t *testing.T) {
tr := &TaskRun{}
foo := &duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
}
tr.Status.SetCondition(foo)
if !tr.Status.IsDone() {
t.Fatal("Expected pipelinerun status to be done")
}
}

func TestTaskRunIsCancelled(t *testing.T) {
ts := TaskRunSpec{
Status: TaskRunSpecStatusCancelled,
}

if !ts.IsCancelled() {
t.Fatal("Expected pipelinerun status to be cancelled")
}
}
228 changes: 228 additions & 0 deletions pkg/reconciler/timeout_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package reconciler

import (
"sync"

"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

var (
defaultFunc = func(i interface{}, l *zap.SugaredLogger) {}
)

const (
defaultTimeout = 10 * time.Minute
)

// TimeoutSet contains required k8s interfaces to handle build timeouts
type TimeoutSet struct {
logger *zap.SugaredLogger
kubeclientset kubernetes.Interface
pipelineclientset clientset.Interface
taskRuncallbackFunc func(interface{})
pipelineruncallbackFunc func(interface{})
stopCh <-chan struct{}
statusMap *sync.Map
done map[string]chan bool
doneMut sync.Mutex
}

// NewTimeoutHandler returns TimeoutSet filled structure
func NewTimeoutHandler(
logger *zap.SugaredLogger,
kubeclientset kubernetes.Interface,
pipelineclientset clientset.Interface,
stopCh <-chan struct{},
) *TimeoutSet {
return &TimeoutSet{
logger: logger,
kubeclientset: kubeclientset,
pipelineclientset: pipelineclientset,
stopCh: stopCh,
statusMap: &sync.Map{},
done: make(map[string]chan bool),
doneMut: sync.Mutex{},
}
}

func (t *TimeoutSet) AddtrCallBackFunc(f func(interface{})) {
t.taskRuncallbackFunc = f
}

func (t *TimeoutSet) AddPrCallBackFunc(f func(interface{})) {
t.pipelineruncallbackFunc = f
}

func (t *TimeoutSet) Release(key string) {
t.doneMut.Lock()
defer t.doneMut.Unlock()
if finished, ok := t.done[key]; ok {
delete(t.done, key)
close(finished)
}
}

func (t *TimeoutSet) StatusLock(key string) {
m, _ := t.statusMap.LoadOrStore(key, &sync.Mutex{})
mut := m.(*sync.Mutex)
mut.Lock()
}

func (t *TimeoutSet) StatusUnlock(key string) {
m, ok := t.statusMap.Load(key)
if !ok {
return
}
mut := m.(*sync.Mutex)
mut.Unlock()
}

func (t *TimeoutSet) ReleaseKey(key string) {
t.statusMap.Delete(key)
}

func getTimeout(d *metav1.Duration) time.Duration {
timeout := defaultTimeout
if d != nil {
timeout = d.Duration
}
return timeout
}

// checkPipelineRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkPipelineRunTimeouts(namespace string) {
pipelineRuns, err := t.pipelineclientset.TektonV1alpha1().PipelineRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get taskruns list in namespace %s: %s", namespace, err)
}
for _, pipelineRun := range pipelineRuns.Items {
pipelineRun := pipelineRun
if pipelineRun.Status.IsDone() {
continue
}
if pipelineRun.Spec.IsCancelled() {
continue
}
go t.WaitPipelineRun(&pipelineRun)
}
}

// CheckTimeouts function iterates through all namespaces and calls corresponding
// taskrun/pipelinerun timeout functions
func (t *TimeoutSet) CheckTimeouts() {
namespaces, err := t.kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get namespaces list: %s", err)
}
for _, namespace := range namespaces.Items {
t.checkTaskRunTimeouts(namespace.GetName())
t.checkPipelineRunTimeouts(namespace.GetName())
}
}

// checkTaskRunTimeouts function creates goroutines to wait for pipelinerun to
// finish/timeout in a given namespace
func (t *TimeoutSet) checkTaskRunTimeouts(namespace string) {
taskruns, err := t.pipelineclientset.TektonV1alpha1().TaskRuns(namespace).List(metav1.ListOptions{})
if err != nil {
t.logger.Errorf("Can't get taskruns list in namespace %s: %s", namespace, err)
}
for _, taskrun := range taskruns.Items {
taskrun := taskrun
if taskrun.Status.IsDone() {
continue
}
if taskrun.Spec.IsCancelled() {
continue
}
go t.WaitTaskRun(&taskrun)
}
}

// WaitTaskRun function creates a blocking function for taskrun to wait for
// 1. Stop signal, 2. TaskRun to complete or 3. Taskrun to time out
func (t *TimeoutSet) WaitTaskRun(tr *v1alpha1.TaskRun) {
key := fmt.Sprintf("%s/%s/%s", "TaskRun", tr.Namespace, tr.Name)

timeout := getTimeout(tr.Spec.Timeout)
runtime := time.Duration(0)

t.StatusLock(key)
if tr.Status.StartTime != nil && !tr.Status.StartTime.Time.IsZero() {
runtime = time.Since(tr.Status.StartTime.Time)
}
t.StatusUnlock(key)
timeout -= runtime

var finished chan bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this can be extracted (as it's common for TaskRun and PipelineRun) to also hide the use of doneMut.

finished := getOrCreateFinishedChan(key)
// […]
func getOrCreateFinishedChan(key string) chan bool {
	var finished chan bool
	doneMut.Lock()
	if existingfinishedChan, ok := done[key]; ok {
		finished = existingfinishedChan
	} else {
		finished = make(chan bool)
	}
	done[key] = finished
	doneMut.Unlock()
        return finished
}

t.doneMut.Lock()
if existingfinishedChan, ok := t.done[key]; ok {
finished = existingfinishedChan
} else {
finished = make(chan bool)
}
t.done[key] = finished
t.doneMut.Unlock()

defer t.Release(key)

select {
case <-t.stopCh:
case <-finished:
case <-time.After(timeout):
t.StatusLock(key)
if t.taskRuncallbackFunc == nil {
defaultFunc(tr, t.logger)
} else {
t.taskRuncallbackFunc(tr)
}
t.StatusUnlock(key)
}
}

// WaitPipelineRun function creates a blocking function for pipelinerun to wait for
// 1. Stop signal, 2. pipelinerun to complete or 3. pipelinerun to time out
func (t *TimeoutSet) WaitPipelineRun(pr *v1alpha1.PipelineRun) {
key := fmt.Sprintf("%s/%s/%s", "PipelineRun", pr.Namespace, pr.Name)
timeout := getTimeout(pr.Spec.Timeout)

runtime := time.Duration(0)
t.StatusLock(key)
if pr.Status.StartTime != nil && !pr.Status.StartTime.Time.IsZero() {
runtime = time.Since(pr.Status.StartTime.Time)
}
t.StatusUnlock(key)
timeout -= runtime

var finished chan bool
t.doneMut.Lock()
if existingfinishedChan, ok := t.done[key]; ok {
finished = existingfinishedChan
} else {
finished = make(chan bool)
}
t.done[key] = finished
t.doneMut.Unlock()
defer t.Release(key)

select {
case <-t.stopCh:
case <-finished:
case <-time.After(timeout):
t.StatusLock(key)
if t.pipelineruncallbackFunc == nil {
defaultFunc(pr, t.logger)
} else {
t.pipelineruncallbackFunc(pr)
}
t.StatusUnlock(key)
}
}
Loading