-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Pipeline runs belonging to one repository now run sequentially. If a pipeline run cannot start immediately, it is created as "pending", and a process is started to periodically check if it can start. Since the pipeline manager service may be restarted, it check on boot if there are any pending runs for the repositories under its control and starts the periodic check for those. We can improve on the design by adding a signal in the finish task of a pipeline run that the run will soon finish, reducing the up to 30s wait time for the next run. Closes #394.
- Loading branch information
1 parent
77663e6
commit e1d6372
Showing
5 changed files
with
424 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
package manager | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"sort" | ||
"time" | ||
|
||
tektonClient "github.com/opendevstack/pipeline/internal/tekton" | ||
"github.com/opendevstack/pipeline/pkg/logging" | ||
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
// pipelineRunQueue manages multiple queues. These queues | ||
// can be polled in vertain intervals. | ||
type pipelineRunQueue struct { | ||
queues map[string]bool | ||
pollInterval time.Duration | ||
// logger is the logger to send logging messages to. | ||
logger logging.LeveledLoggerInterface | ||
} | ||
|
||
// StartPolling periodically checks status for given identifier. | ||
// The time until the first time is not more than maxInitialWait. | ||
func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool { | ||
quit := make(chan bool) | ||
if q.queues[identifier] { | ||
close(quit) | ||
return quit | ||
} | ||
q.queues[identifier] = true | ||
|
||
maxInitialWaitSeconds := int(maxInitialWait.Seconds()) | ||
var ticker *time.Ticker | ||
if maxInitialWaitSeconds > 1 { | ||
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1 | ||
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second) | ||
} else { | ||
ticker = time.NewTicker(time.Second) | ||
} | ||
go func() { | ||
for { | ||
select { | ||
case <-quit: | ||
q.queues[identifier] = false | ||
ticker.Stop() | ||
return | ||
case <-ticker.C: | ||
ticker.Stop() | ||
ticker = time.NewTicker(q.pollInterval) | ||
queueLength, err := pt.AdvanceQueue(identifier) | ||
if err != nil { | ||
q.logger.Warnf("error during poll tick: %s", err) | ||
} | ||
if queueLength == 0 { | ||
close(quit) | ||
} | ||
} | ||
} | ||
}() | ||
|
||
return quit | ||
} | ||
|
||
// QueueAdvancer is the interface passed to | ||
// *pipelineRunQueue#StartPolling. | ||
type QueueAdvancer interface { | ||
// AdvanceQueue is called for each poll step. | ||
AdvanceQueue(repository string) (int, error) | ||
} | ||
|
||
// Queue represents a pipeline run Queue. Pipelines of one repository must | ||
// not run in parallel. | ||
type Queue struct { | ||
TektonClient tektonClient.ClientPipelineRunInterface | ||
} | ||
|
||
// needsQueueing checks if any run has either: | ||
// - pending status set OR | ||
// - is progressing | ||
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool { | ||
for _, pr := range pipelineRuns.Items { | ||
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// AdvanceQueue starts the oldest pending pipeline run if there is no | ||
// progressing pipeline run at the moment. | ||
// It returns the queue length. | ||
func (s *Server) AdvanceQueue(repository string) (int, error) { | ||
s.Mutex.Lock() | ||
defer s.Mutex.Unlock() | ||
ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | ||
defer cancel() | ||
pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository) | ||
if err != nil { | ||
return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err) | ||
} | ||
|
||
var foundRunning bool | ||
pendingPrs := []tekton.PipelineRun{} | ||
for _, pr := range pipelineRuns.Items { | ||
if pr.IsPending() { | ||
pendingPrs = append(pendingPrs, pr) | ||
continue | ||
} | ||
if pipelineRunIsProgressing(pr) { | ||
foundRunning = true | ||
continue | ||
} | ||
} | ||
|
||
if !foundRunning && len(pendingPrs) > 0 { | ||
// update oldest pending PR | ||
sortPipelineRunsDescending(pendingPrs) | ||
oldestPR := pendingPrs[len(pendingPrs)-1] | ||
pendingPrs = pendingPrs[:len(pendingPrs)-1] | ||
oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run | ||
_, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{}) | ||
if err != nil { | ||
return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err) | ||
} | ||
} | ||
return len(pendingPrs), nil | ||
} | ||
|
||
// pipelineRunIsProgressing returns true if the PR is not done, not pending, | ||
// not cancelled, and not timed out. | ||
func pipelineRunIsProgressing(pr tekton.PipelineRun) bool { | ||
return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut()) | ||
} | ||
|
||
// sortPipelineRunsDescending sorts pipeline runs by time (descending) | ||
func sortPipelineRunsDescending(pipelineRuns []tekton.PipelineRun) { | ||
sort.Slice(pipelineRuns, func(i, j int) bool { | ||
return pipelineRuns[j].CreationTimestamp.Time.Before(pipelineRuns[i].CreationTimestamp.Time) | ||
}) | ||
} |
Oops, something went wrong.