Skip to content

Commit

Permalink
Implement queueing
Browse files Browse the repository at this point in the history
Pipeline runs belonging to one repository now run sequentially. If a
pipeline run cannot start immediately, it is created as "pending", and a
process is started to periodically check if it can start.

Since the pipeline manager service may be restarted, it check on boot if
there are any pending runs for the repositories under its control and
starts the periodic check for those.

We can improve on the design by adding a signal in the finish task of a
pipeline run that the run will soon finish, reducing the up to 30s wait
time for the next run.

Closes #394.
  • Loading branch information
michaelsauter committed Mar 28, 2022
1 parent 8e69e9e commit b675529
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ listed in the changelog.
- Gradle build fails when it contains more than one test class ([#414](https://github.com/opendevstack/ods-pipeline/issues/414))
- Gradle proxy settings are set during prepare-local-env ([#291](https://github.com/opendevstack/ods-pipeline/issues/291))
- Pipeline creation fails when branch names contain slashes ([#466](https://github.com/opendevstack/ods-pipeline/issues/466))
- Race conditions between pipelines of the same repository ([#394](https://github.com/opendevstack/ods-pipeline/issues/394))

## [0.2.0] - 2021-12-22
### Added
Expand Down
1 change: 1 addition & 0 deletions cmd/pipeline-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func serve() error {
TektonClient: tClient,
BitbucketClient: bitbucketClient,
PipelineRunPruner: pruner,
Logger: logger,
})
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions docs/design/software-design-specification.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ A pipeline is created or updated corresponding to the Git branch received in the

A PVC is created per repository unless it exists already. The name is equal to `ods-workspace-<component>` (shortened to 63 characters if longer). This PVC is then used in the pipeline as a shared workspace.

When no other pipeline run for the same repository is running or pending, the created/updated pipeline is started immediately. Otherwise a pending pipeline run is created, and a periodic polling is kicked off to allow the run to start once possible. Since the pipeline manager does not persist state about pending pipeline runs, polling is also started for all repositories in the related Bitbucket project when the server boots.

Pipelines and pipeline runs are pruned when a webhook trigger is received. Pipeline runs that are newer than the configured time window are protected from pruning. Older pipeline runs are cleaned up to not grow beyond the configured maximum amount. If all pipeline runs of one pipeline can be pruned, the whole pipeline is pruned. The pruning strategy is applied per repository and stage (DEV, QA, PROD) to avoid aggressive pruning of QA and PROD pipeline runs.
|===

Expand Down
3 changes: 3 additions & 0 deletions docs/design/software-requirements-specification.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ The tasks shall create artifacts of their work. Those artifacts shall be stored

| SRS-PIPELINE-MANAGER-5
| The pipeline manager shall prune pipelines and pipeline runs per repository and stage.

| SRS-PIPELINE-MANAGER-6
| The pipeline manager shall prevent concurrent pipeline runs for one repository.
|===

=== Tasks Requirements
Expand Down
14 changes: 14 additions & 0 deletions internal/manager/bitbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,17 @@ func shouldSkip(bitbucketClient bitbucket.CommitClientInterface, projectKey, rep
}
return isCiSkipInCommitMessage(c.Message)
}

// getRepoNames retrieves the name of all repositories within the project
// identified by projectKey.
func getRepoNames(bitbucketClient bitbucket.RepoClientInterface, projectKey string) ([]string, error) {
repos := []string{}
rl, err := bitbucketClient.RepoList(projectKey)
if err != nil {
return repos, err
}
for _, n := range rl.Values {
repos = append(repos, n.Name)
}
return repos, nil
}
12 changes: 6 additions & 6 deletions internal/manager/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ const (
)

// createPipelineRun creates a PipelineRun resource
func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData) (*tekton.PipelineRun, error) {
pr, err := tektonClient.CreatePipelineRun(ctxt, &tekton.PipelineRun{
func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctxt context.Context, pData PipelineData, needQueueing bool) (*tekton.PipelineRun, error) {
pr := &tekton.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", pData.Name),
Labels: pipelineLabels(pData),
Expand All @@ -55,11 +55,11 @@ func createPipelineRun(tektonClient tektonClient.ClientPipelineRunInterface, ctx
},
},
},
}, metav1.CreateOptions{})
if err != nil {
return nil, err
}
return pr, nil
if needQueueing {
pr.Spec.Status = tekton.PipelineRunSpecStatusPending
}
return tektonClient.CreatePipelineRun(ctxt, pr, metav1.CreateOptions{})
}

// listPipelineRuns lists pipeline runs associated with repository.
Expand Down
64 changes: 64 additions & 0 deletions internal/manager/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package manager

import (
"context"
"testing"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
"github.com/opendevstack/pipeline/pkg/config"
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
)

func TestCreatePipelineRun(t *testing.T) {
tc := &tektonClient.TestClient{}
ctxt := context.TODO()
pData := PipelineData{
Name: "foo",
Repository: "repo",
GitRef: "branch",
Stage: config.DevStage,
PVC: "pvc",
}
pr, err := createPipelineRun(tc, ctxt, pData, false)
if err != nil {
t.Fatal(err)
}
if pr.GenerateName != "foo-" {
t.Fatalf("Expected generated name to be foo-, got: %s", pr.GenerateName)
}
if pr.Spec.PipelineRef.Name != "foo" {
t.Fatalf("Expected pipeline ref to be foo, got: %s", pr.Spec.PipelineRef.Name)
}
if pr.Spec.Status != "" {
t.Fatalf("Expected status to be empty, got: %s", pr.Spec.Status)
}
if pr.Labels[repositoryLabel] != pData.Repository {
t.Fatalf("Expected label %s to be %s, got: %s", repositoryLabel, pData.Repository, pr.Labels[repositoryLabel])
}
if pr.Labels[gitRefLabel] != pData.GitRef {
t.Fatalf("Expected label %s to be %s, got: %s", gitRefLabel, pData.GitRef, pr.Labels[gitRefLabel])
}
if pr.Labels[stageLabel] != pData.Stage {
t.Fatalf("Expected label %s to be %s, got: %s", stageLabel, pData.Stage, pr.Labels[stageLabel])
}
workspaceCfg := pr.Spec.Workspaces[0]
if workspaceCfg.Name != sharedWorkspaceName {
t.Fatalf("Expected generated name to be %s, got: %s", sharedWorkspaceName, workspaceCfg.Name)
}
if workspaceCfg.PersistentVolumeClaim.ClaimName != "pvc" {
t.Fatalf("Expected generated name to be pvc, got: %s", workspaceCfg.Name)
}
if len(tc.CreatedPipelineRuns) != 1 {
t.Fatal("No pipeline run created")
}
pr, err = createPipelineRun(tc, ctxt, pData, true)
if err != nil {
t.Fatal(err)
}
if pr.Spec.Status != tekton.PipelineRunSpecStatusPending {
t.Fatalf("Expected status to be pending, got: %s", pr.Spec.Status)
}
if len(tc.CreatedPipelineRuns) != 2 {
t.Fatal("No pipeline run created")
}
}
143 changes: 143 additions & 0 deletions internal/manager/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package manager

import (
"context"
"fmt"
"math/rand"
"time"

tektonClient "github.com/opendevstack/pipeline/internal/tekton"
"github.com/opendevstack/pipeline/pkg/logging"
tekton "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// pipelineRunQueue manages multiple queues. These queues
// can be polled in vertain intervals.
type pipelineRunQueue struct {
queues map[string]bool
pollInterval time.Duration
// logger is the logger to send logging messages to.
logger logging.LeveledLoggerInterface
}

// StartPolling periodically checks status for given identifier.
// The time until the first time is not more than maxInitialWait.
func (q *pipelineRunQueue) StartPolling(pt QueueAdvancer, identifier string, maxInitialWait time.Duration) chan bool {
quit := make(chan bool)
if q.queues[identifier] {
close(quit)
return quit
}
q.queues[identifier] = true

maxInitialWaitSeconds := int(maxInitialWait.Seconds())
var ticker *time.Ticker
if maxInitialWaitSeconds > 1 {
initialWaitSeconds := rand.Intn(maxInitialWaitSeconds-1) + 1
ticker = time.NewTicker(time.Duration(initialWaitSeconds) * time.Second)
} else {
ticker = time.NewTicker(time.Second)
}
go func() {
for {
select {
case <-quit:
q.queues[identifier] = false
ticker.Stop()
return
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(q.pollInterval)
q.logger.Debugf("Advancing queue for %s ...", identifier)
queueLength, err := pt.AdvanceQueue(identifier)
if err != nil {
q.logger.Warnf("error during poll tick: %s", err)
}
if queueLength == 0 {
q.logger.Debugf("Stopping to poll for %s ...", identifier)
close(quit)
}
}
}
}()

return quit
}

// QueueAdvancer is the interface passed to
// *pipelineRunQueue#StartPolling.
type QueueAdvancer interface {
// AdvanceQueue is called for each poll step.
AdvanceQueue(repository string) (int, error)
}

// Queue represents a pipeline run Queue. Pipelines of one repository must
// not run in parallel.
type Queue struct {
TektonClient tektonClient.ClientPipelineRunInterface
}

// needsQueueing checks if any run has either:
// - pending status set OR
// - is progressing
func needsQueueing(pipelineRuns *tekton.PipelineRunList) bool {
for _, pr := range pipelineRuns.Items {
if pr.Spec.Status == tekton.PipelineRunSpecStatusPending || pipelineRunIsProgressing(pr) {
return true
}
}
return false
}

// AdvanceQueue starts the oldest pending pipeline run if there is no
// progressing pipeline run at the moment.
// It returns the queue length.
func (s *Server) AdvanceQueue(repository string) (int, error) {
s.Mutex.Lock()
defer s.Mutex.Unlock()
ctxt, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
pipelineRuns, err := listPipelineRuns(s.TektonClient, ctxt, repository)
if err != nil {
return 0, fmt.Errorf("could not retrieve existing pipeline runs: %w", err)
}
s.Logger.Debugf("Found %d pipeline runs related to repository %s.", len(pipelineRuns.Items), repository)
if len(pipelineRuns.Items) == 0 {
return 0, nil
}

var foundRunning bool
pendingPrs := []tekton.PipelineRun{}
for _, pr := range pipelineRuns.Items {
if pr.IsPending() {
pendingPrs = append(pendingPrs, pr)
continue
}
if pipelineRunIsProgressing(pr) {
foundRunning = true
continue
}
}
s.Logger.Debugf("Found runs for repo %s in state running=%v, pending=%d.", repository, foundRunning, len(pendingPrs))

if !foundRunning && len(pendingPrs) > 0 {
// update oldest pending PR
sortPipelineRunsDescending(pendingPrs)
oldestPR := pendingPrs[len(pendingPrs)-1]
pendingPrs = pendingPrs[:len(pendingPrs)-1]
s.Logger.Infof("Starting pending pipeline run %s ...", oldestPR.Name)
oldestPR.Spec.Status = "" // remove pending status -> starts pipeline run
_, err := s.TektonClient.UpdatePipelineRun(ctxt, &oldestPR, metav1.UpdateOptions{})
if err != nil {
return len(pendingPrs), fmt.Errorf("could not update pipeline run %s: %w", oldestPR.Name, err)
}
}
return len(pendingPrs), nil
}

// pipelineRunIsProgressing returns true if the PR is not done, not pending,
// not cancelled, and not timed out.
func pipelineRunIsProgressing(pr tekton.PipelineRun) bool {
return !(pr.IsDone() || pr.IsPending() || pr.IsCancelled() || pr.IsTimedOut())
}
Loading

0 comments on commit b675529

Please sign in to comment.