Skip to content

Commit

Permalink
fix(scheduler): runs stuck in pending state (#722)
Browse files Browse the repository at this point in the history
A re-write of the run scheduler to resolve at least a couple of bugs:

* a run could be stuck in pending state if a workspace has only just
been created
* a run could be stuck in pending state after `otfd` is restarted
  • Loading branch information
leg100 authored Jan 18, 2025
1 parent 8df5334 commit 3d4306d
Show file tree
Hide file tree
Showing 26 changed files with 626 additions and 741 deletions.
5 changes: 2 additions & 3 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/leg100/otf/internal/resource"
"github.com/leg100/otf/internal/run"
"github.com/leg100/otf/internal/runner"
"github.com/leg100/otf/internal/scheduler"
"github.com/leg100/otf/internal/sql"
"github.com/leg100/otf/internal/state"
"github.com/leg100/otf/internal/team"
Expand Down Expand Up @@ -562,8 +561,8 @@ func (d *Daemon) Start(ctx context.Context, started chan struct{}) error {
Logger: d.Logger,
Exclusive: true,
DB: d.DB,
LockID: internal.Int64(scheduler.LockID),
System: scheduler.NewScheduler(scheduler.Options{
LockID: internal.Int64(run.SchedulerLockID),
System: run.NewScheduler(run.SchedulerOptions{
Logger: d.Logger,
WorkspaceClient: d.Workspaces,
RunClient: d.Runs,
Expand Down
4 changes: 2 additions & 2 deletions internal/integration/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestIntegration_Agents(t *testing.T) {
defer unsub()

// create a run on ws1
_ = daemon.createRun(t, ctx, ws1, nil)
_ = daemon.createRun(t, ctx, ws1, nil, nil)

// wait for job to be allocated to agent1
Wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
Expand All @@ -66,7 +66,7 @@ func TestIntegration_Agents(t *testing.T) {
})

// create a run on ws2
_ = daemon.createRun(t, ctx, ws2, nil)
_ = daemon.createRun(t, ctx, ws2, nil, nil)

// wait for job to be allocated to agent2
Wait(t, jobsSub, func(event pubsub.Event[*runner.Job]) bool {
Expand Down
41 changes: 37 additions & 4 deletions internal/integration/daemon_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/leg100/otf/internal/module"
"github.com/leg100/otf/internal/notifications"
"github.com/leg100/otf/internal/organization"
"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/releases"
"github.com/leg100/otf/internal/resource"
"github.com/leg100/otf/internal/run"
Expand All @@ -43,6 +44,8 @@ type (
*github.TestServer
// dowloader allows tests to download terraform
downloader
// run subscription for tests to check on run events
runEvents <-chan pubsub.Event[*run.Run]
}

// downloader downloads terraform versions
Expand Down Expand Up @@ -130,6 +133,10 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
// don't proceed until daemon has started.
<-started

// Subscribe to run events
runEvents, unsub := d.Runs.Watch(ctx)
t.Cleanup(unsub)

t.Cleanup(func() {
cancel() // terminates daemon
<-done // don't exit test until daemon is fully terminated
Expand All @@ -139,6 +146,7 @@ func setup(t *testing.T, cfg *config, gopts ...github.TestServerOption) (*testDa
Daemon: d,
TestServer: githubServer,
downloader: releases.NewDownloader(cfg.terraformBinDir),
runEvents: runEvents,
}

// create a dedicated user account and context for test to use.
Expand Down Expand Up @@ -191,6 +199,29 @@ func (s *testDaemon) getWorkspace(t *testing.T, ctx context.Context, workspaceID
return ws
}

func (s *testDaemon) getRun(t *testing.T, ctx context.Context, runID resource.ID) *run.Run {
t.Helper()

run, err := s.Runs.Get(ctx, runID)
require.NoError(t, err)
return run
}

func (s *testDaemon) waitRunStatus(t *testing.T, runID resource.ID, status run.Status) {
t.Helper()

for event := range s.runEvents {
if event.Payload.ID == runID {
if event.Payload.Status == status {
break
}
if event.Payload.Done() && event.Payload.Status != status {
t.Fatalf("expected run status %s but run finished with status %s", status, event.Payload.Status)
}
}
}
}

func (s *testDaemon) createVCSProvider(t *testing.T, ctx context.Context, org *organization.Organization) *vcsprovider.VCSProvider {
t.Helper()

Expand Down Expand Up @@ -306,7 +337,7 @@ func (s *testDaemon) createAndUploadConfigurationVersion(t *testing.T, ctx conte
return cv
}

func (s *testDaemon) createRun(t *testing.T, ctx context.Context, ws *workspace.Workspace, cv *configversion.ConfigurationVersion) *run.Run {
func (s *testDaemon) createRun(t *testing.T, ctx context.Context, ws *workspace.Workspace, cv *configversion.ConfigurationVersion, opts *run.CreateOptions) *run.Run {
t.Helper()

if ws == nil {
Expand All @@ -315,10 +346,12 @@ func (s *testDaemon) createRun(t *testing.T, ctx context.Context, ws *workspace.
if cv == nil {
cv = s.createConfigurationVersion(t, ctx, ws, nil)
}
if opts == nil {
opts = &run.CreateOptions{}
}
opts.ConfigurationVersionID = &cv.ID

run, err := s.Runs.Create(ctx, ws.ID, run.CreateOptions{
ConfigurationVersionID: &cv.ID,
})
run, err := s.Runs.Create(ctx, ws.ID, *opts)
require.NoError(t, err)
return run
}
Expand Down
8 changes: 4 additions & 4 deletions internal/integration/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestLogs(t *testing.T) {

t.Run("upload chunk", func(t *testing.T) {
svc, _, ctx := setup(t, nil)
run := svc.createRun(t, ctx, nil, nil)
run := svc.createRun(t, ctx, nil, nil, nil)

err := svc.Logs.PutChunk(ctx, logs.PutChunkOptions{
RunID: run.ID,
Expand All @@ -29,7 +29,7 @@ func TestLogs(t *testing.T) {

t.Run("reject empty chunk", func(t *testing.T) {
svc, _, ctx := setup(t, nil)
run := svc.createRun(t, ctx, nil, nil)
run := svc.createRun(t, ctx, nil, nil, nil)

err := svc.Logs.PutChunk(ctx, logs.PutChunkOptions{
RunID: run.ID,
Expand All @@ -40,7 +40,7 @@ func TestLogs(t *testing.T) {

t.Run("get chunk", func(t *testing.T) {
svc, _, ctx := setup(t, nil)
run := svc.createRun(t, ctx, nil, nil)
run := svc.createRun(t, ctx, nil, nil, nil)

err := svc.Logs.PutChunk(ctx, logs.PutChunkOptions{
RunID: run.ID,
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestClusterLogs(t *testing.T) {
t.Cleanup(func() { cancel() })

// create run on local node
run := local.createRun(t, ctx, nil, nil)
run := local.createRun(t, ctx, nil, nil, nil)

// follow run's plan logs on remote node
sub, err := remote.Logs.Tail(ctx, logs.GetChunkOptions{
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/notification_gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestIntegration_NotificationGCPPubSub(t *testing.T) {
require.NoError(t, err)

cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
run := daemon.createRun(t, ctx, ws, cv)
run := daemon.createRun(t, ctx, ws, cv, nil)

// gcp-pubsub messages are not necessarily received in the same order as
// they are sent, so wait til all expected messages are received and then
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/notification_slack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestIntegration_NotificationSlack(t *testing.T) {
require.NoError(t, err)

cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
_ = daemon.createRun(t, ctx, ws, cv)
_ = daemon.createRun(t, ctx, ws, cv, nil)

assert.Regexp(t, `run pending`, <-got)
assert.Regexp(t, `run planning`, <-got)
Expand Down
4 changes: 2 additions & 2 deletions internal/integration/remote_state_sharing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestRemoteStateSharing(t *testing.T) {
// listen to run events, and create run and apply
sub, unsub := daemon.Runs.Watch(ctx)
defer unsub()
_ = daemon.createRun(t, ctx, producer, producerCV)
_ = daemon.createRun(t, ctx, producer, producerCV, nil)
applied:
for event := range sub {
switch event.Payload.Status {
Expand Down Expand Up @@ -84,7 +84,7 @@ output "remote_foo" {
require.NoError(t, err)

// create run and apply
_ = daemon.createRun(t, ctx, consumer, consumerCV)
_ = daemon.createRun(t, ctx, consumer, consumerCV, nil)
for event := range sub {
switch event.Payload.Status {
case run.RunPlanned:
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/retry_run_ui_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestIntegration_RetryRunUI(t *testing.T) {
sub, unsub := daemon.Runs.Watch(ctx)
defer unsub()
// create a run and wait for it reach planned-and-finished state
r := daemon.createRun(t, ctx, ws, cv)
r := daemon.createRun(t, ctx, ws, cv, nil)
for event := range sub {
if event.Payload.Status == run.RunErrored {
t.Fatal("run unexpectedly errored")
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/run_cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestIntegration_RunCancel(t *testing.T) {
require.NoError(t, err)

cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
r := daemon.createRun(t, ctx, ws, cv)
r := daemon.createRun(t, ctx, ws, cv, nil)

// fake bin process has started
require.Equal(t, "started", <-got)
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/run_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestRunError(t *testing.T) {
defer logsUnsub()

// create run
_ = daemon.createRun(t, ctx, ws, cv)
_ = daemon.createRun(t, ctx, ws, cv, nil)

// wait for the run to report an error status and for the logs to contain
// the error message.
Expand Down
92 changes: 92 additions & 0 deletions internal/integration/run_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package integration

import (
"testing"
"time"

"github.com/leg100/otf/internal/pubsub"
"github.com/leg100/otf/internal/resource"
otfrun "github.com/leg100/otf/internal/run"
"github.com/leg100/otf/internal/workspace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRunScheduler(t *testing.T) {
integrationTest(t)

daemon, _, ctx := setup(t, &config{})
user := userFromContext(t, ctx)

// watch workspace events
workspaceEvents, unsub := daemon.Workspaces.Watch(ctx)
defer unsub()

ws := daemon.createWorkspace(t, ctx, nil)
cv := daemon.createAndUploadConfigurationVersion(t, ctx, ws, nil)
run1 := daemon.createRun(t, ctx, ws, cv, nil)
run2 := daemon.createRun(t, ctx, ws, cv, nil)

// Wait for Run#1 to lock workspace
waitWorkspaceLock(t, workspaceEvents, &run1.ID)

// Wait for Run#1 to be planned
daemon.waitRunStatus(t, run1.ID, otfrun.RunPlanned)
// Run#2 should still be pending
assert.Equal(t, otfrun.RunPending, daemon.getRun(t, ctx, run2.ID).Status)

// Apply Run#1
err := daemon.Runs.Apply(ctx, run1.ID)
require.NoError(t, err)

// Wait for Run#1 to be applied
daemon.waitRunStatus(t, run1.ID, otfrun.RunApplied)

// Wait for Run#2 to lock workspace
waitWorkspaceLock(t, workspaceEvents, &run2.ID)

// Wait for Run#2 to be planned&finished (because there are no changes)
daemon.waitRunStatus(t, run2.ID, otfrun.RunPlannedAndFinished)

// Wait for workspace to be unlocked
waitWorkspaceLock(t, workspaceEvents, nil)

// User locks workspace
_, err = daemon.Workspaces.Lock(ctx, ws.ID, nil)
require.NoError(t, err)

// Create another run, it should remain in pending status.
run3 := daemon.createRun(t, ctx, ws, cv, nil)

// Workspace should still be locked by user
waitWorkspaceLock(t, workspaceEvents, &user.ID)

// User unlocks workspace
_, err = daemon.Workspaces.Unlock(ctx, ws.ID, nil, false)
require.NoError(t, err)

// Run #3 should now proceed to planned&finished
daemon.waitRunStatus(t, run3.ID, otfrun.RunPlannedAndFinished)
}

func waitWorkspaceLock(t *testing.T, events <-chan pubsub.Event[*workspace.Workspace], lock *resource.ID) {
t.Helper()

timeout := time.After(5 * time.Second)
for {
select {
case event := <-events:
if lock != nil {
if event.Payload.Lock != nil && *lock == *event.Payload.Lock {
return
}
} else {
if event.Payload.Lock == nil {
return
}
}
case <-timeout:
t.Fatalf("timed out waiting for workspace lock condition")
}
}
}
Loading

0 comments on commit 3d4306d

Please sign in to comment.