From f8d2b30d4b400f9ea0ac2612d5d4a2a196447935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Mon, 24 May 2021 14:50:49 +0200 Subject: [PATCH] feat(sdk): restart goroutines (#5821) Signed-off-by: francois samin --- cli/cdsctl/events.go | 4 +- cli/cdsctl/workflow_log.go | 2 +- cli/cdsctl/workflow_transform_as_code.go | 4 +- engine/api/api.go | 20 ++-- engine/api/api_test.go | 4 +- engine/api/application_test.go | 2 +- engine/api/environment_ascode_test.go | 2 +- engine/api/migrate/migration.go | 2 +- engine/api/pipeline_test.go | 2 +- engine/api/websocket_test.go | 8 +- engine/api/workflow_ascode_test.go | 4 +- engine/api/workflow_purge_test.go | 2 +- engine/api/workflow_run_test.go | 2 +- engine/cdn/cdn.go | 3 +- engine/cdn/cdn_gc_test.go | 14 +-- engine/cdn/cdn_item_test.go | 8 +- engine/cdn/cdn_log_store_test.go | 6 +- engine/cdn/cdn_log_tcp_test.go | 10 +- engine/cdn/cdn_sync_test.go | 10 +- engine/cdn/cdn_test.go | 8 +- engine/cdn/item_logs_handler_test.go | 4 +- engine/cdn/item_upload_test.go | 4 +- engine/cdn/storage/dao_test.go | 2 +- engine/cdn/storage/nfs/nfs_test.go | 11 +- engine/cdn/storage/storageunit_run.go | 2 +- engine/cdn/storage/storageunit_test.go | 8 +- engine/cdn/storage/webdav/webdav.go | 2 +- engine/elasticsearch/elasticsearch.go | 2 +- engine/hatchery/kubernetes/kubernetes.go | 2 +- engine/hatchery/local/local.go | 2 +- engine/hatchery/marathon/marathon.go | 2 +- engine/hatchery/openstack/openstack.go | 2 +- engine/hatchery/swarm/swarm.go | 2 +- engine/hatchery/vsphere/hatchery.go | 2 +- engine/hatchery/vsphere/hatchery_test.go | 6 +- engine/hooks/hooks.go | 2 +- engine/migrateservice/service.go | 2 +- engine/repositories/repositories.go | 2 +- engine/repositories/repositories_test.go | 2 +- engine/ui/ui.go | 2 +- engine/vcs/vcs.go | 2 +- engine/vcs/vcs_test.go | 4 +- engine/worker/internal/builtin.go | 2 +- sdk/goroutine.go | 122 ++++++++++++++++++----- sdk/goroutine_test.go | 16 +-- 45 files changed, 199 insertions(+), 127 deletions(-) diff --git a/cli/cdsctl/events.go b/cli/cdsctl/events.go index eb891377ed..41aac8a819 100644 --- a/cli/cdsctl/events.go +++ b/cli/cdsctl/events.go @@ -60,8 +60,8 @@ func eventsListenRun(v cli.Values) error { chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { - client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { + client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived) }) switch { diff --git a/cli/cdsctl/workflow_log.go b/cli/cdsctl/workflow_log.go index 047a536583..fe136ccfa6 100644 --- a/cli/cdsctl/workflow_log.go +++ b/cli/cdsctl/workflow_log.go @@ -437,7 +437,7 @@ func workflowLogStreamRun(v cli.Values) error { chanMsgReceived := make(chan json.RawMessage) chanErrorReceived := make(chan error) - goRoutines := sdk.NewGoRoutines() + goRoutines := sdk.NewGoRoutines(ctx) goRoutines.Exec(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { for ctx.Err() == nil { if err := client.RequestWebsocket(ctx, goRoutines, fmt.Sprintf("%s/item/stream", link.CDNURL), chanMessageToSend, chanMsgReceived, chanErrorReceived); err != nil { diff --git a/cli/cdsctl/workflow_transform_as_code.go b/cli/cdsctl/workflow_transform_as_code.go index edcf6a0982..2662cef13b 100644 --- a/cli/cdsctl/workflow_transform_as_code.go +++ b/cli/cdsctl/workflow_transform_as_code.go @@ -53,8 +53,8 @@ func workflowTransformAsCodeRun(v cli.Values) (interface{}, error) { chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - sdk.NewGoRoutines().Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { - client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + sdk.NewGoRoutines(ctx).Run(ctx, "WebsocketEventsListenCmd", func(ctx context.Context) { + client.WebsocketEventsListen(ctx, sdk.NewGoRoutines(ctx), chanMessageToSend, chanMessageReceived, chanErrorReceived) }) ope, err := client.WorkflowTransformAsCode(projectKey, v.GetString(_WorkflowName), branch, message) diff --git a/engine/api/api.go b/engine/api/api.go index b9e9f4d383..3c2d2e9592 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -545,7 +545,7 @@ func (a *API) Serve(ctx context.Context) error { } log.Info(ctx, "Initializing HTTP router") - a.GoRoutines = sdk.NewGoRoutines() + a.GoRoutines = sdk.NewGoRoutines(ctx) a.Router = &Router{ Mux: mux.NewRouter(), Background: ctx, @@ -652,12 +652,12 @@ func (a *API) Serve(ctx context.Context) error { log.Error(ctx, "error while initializing event system: %s", err) } - a.GoRoutines.Run(ctx, "event.dequeue", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "event.dequeue", func(ctx context.Context) { event.DequeueEvent(ctx, a.mustDB()) }) log.Info(ctx, "Initializing internal routines...") - a.GoRoutines.Run(ctx, "maintenance.Subscribe", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "maintenance.Subscribe", func(ctx context.Context) { if err := a.listenMaintenance(ctx); err != nil { log.Error(ctx, "error while initializing listen maintenance routine: %s", err) } @@ -668,7 +668,7 @@ func (a *API) Serve(ctx context.Context) error { log.Error(ctx, "error while initializing worker models routine: %s", err) } }) - a.GoRoutines.Run(ctx, "worker.Initialize", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "worker.Initialize", func(ctx context.Context) { if err := worker.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache); err != nil { log.Error(ctx, "error while initializing workers routine: %s", err) } @@ -684,25 +684,25 @@ func (a *API) Serve(ctx context.Context) error { a.GoRoutines.Run(ctx, "audit.ComputeWorkflowAudit", func(ctx context.Context) { audit.ComputeWorkflowAudit(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper)) }) - a.GoRoutines.Run(ctx, "auditCleanerRoutine(ctx", func(ctx context.Context) { + a.GoRoutines.Run(ctx, "auditCleanerRoutine", func(ctx context.Context) { auditCleanerRoutine(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper)) }) - a.GoRoutines.Run(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "repositoriesmanager.ReceiveEvents", func(ctx context.Context) { repositoriesmanager.ReceiveEvents(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper), a.Cache) }) - a.GoRoutines.Run(ctx, "services.KillDeadServices", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "services.KillDeadServices", func(ctx context.Context) { services.KillDeadServices(ctx, a.mustDB) }) a.GoRoutines.Run(ctx, "broadcast.Initialize", func(ctx context.Context) { broadcast.Initialize(ctx, a.DBConnectionFactory.GetDBMap(gorpmapping.Mapper)) }) - a.GoRoutines.Run(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "api.serviceAPIHeartbeat", func(ctx context.Context) { a.serviceAPIHeartbeat(ctx) }) - a.GoRoutines.Run(ctx, "authentication.SessionCleaner", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "authentication.SessionCleaner", func(ctx context.Context) { authentication.SessionCleaner(ctx, a.mustDB, 10*time.Second) }) - a.GoRoutines.Run(ctx, "api.WorkflowRunCraft", func(ctx context.Context) { + a.GoRoutines.RunWithRestart(ctx, "api.WorkflowRunCraft", func(ctx context.Context) { a.WorkflowRunCraft(ctx, 100*time.Millisecond) }) diff --git a/engine/api/api_test.go b/engine/api/api_test.go index 9d9d9ed139..7f57b7e524 100644 --- a/engine/api/api_test.go +++ b/engine/api/api_test.go @@ -42,7 +42,7 @@ func newTestAPI(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test.Fak api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver() api.AuthenticationDrivers[sdk.ConsumerTest] = authdrivertest.NewDriver(t) api.AuthenticationDrivers[sdk.ConsumerTest2] = authdrivertest.NewDriver(t) - api.GoRoutines = sdk.NewGoRoutines() + api.GoRoutines = sdk.NewGoRoutines(context.TODO()) api.InitRouter() @@ -90,7 +90,7 @@ func newTestServer(t *testing.T, bootstrapFunc ...test.Bootstrapf) (*API, *test. api.AuthenticationDrivers = make(map[sdk.AuthConsumerType]sdk.AuthDriver) api.AuthenticationDrivers[sdk.ConsumerLocal] = local.NewDriver(context.TODO(), false, "http://localhost:8080", "") api.AuthenticationDrivers[sdk.ConsumerBuiltin] = builtin.NewDriver() - api.GoRoutines = sdk.NewGoRoutines() + api.GoRoutines = sdk.NewGoRoutines(context.TODO()) api.InitRouter() ts := httptest.NewServer(router.Mux) diff --git a/engine/api/application_test.go b/engine/api/application_test.go index 00c2c9ccd1..897c6e286c 100644 --- a/engine/api/application_test.go +++ b/engine/api/application_test.go @@ -274,7 +274,7 @@ func TestUpdateAsCodeApplicationHandler(t *testing.T) { chanMessageReceived := make(chan sdk.WebsocketEvent) chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) chanMessageToSend <- []sdk.WebsocketFilter{{ Type: sdk.WebsocketFilterTypeAscodeEvent, ProjectKey: proj.Key, diff --git a/engine/api/environment_ascode_test.go b/engine/api/environment_ascode_test.go index a50cd73ef1..46024564b5 100644 --- a/engine/api/environment_ascode_test.go +++ b/engine/api/environment_ascode_test.go @@ -233,7 +233,7 @@ func TestUpdateAsCodeEnvironmentHandler(t *testing.T) { chanMessageReceived := make(chan sdk.WebsocketEvent) chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) chanMessageToSend <- []sdk.WebsocketFilter{{ Type: sdk.WebsocketFilterTypeAscodeEvent, ProjectKey: proj.Key, diff --git a/engine/api/migrate/migration.go b/engine/api/migrate/migration.go index 1106144985..5f2ef932c5 100644 --- a/engine/api/migrate/migration.go +++ b/engine/api/migrate/migration.go @@ -46,7 +46,7 @@ func Run(ctx context.Context, db gorp.SqlExecutor) { wg.Add(1) } - sdk.NewGoRoutines().Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) { + sdk.NewGoRoutines(ctx).Run(ctx, "migrate_"+currentMigration.Name, func(contex context.Context) { defer func() { if currentMigration.Blocker { wg.Done() diff --git a/engine/api/pipeline_test.go b/engine/api/pipeline_test.go index 5e4c127990..5100a46d7f 100644 --- a/engine/api/pipeline_test.go +++ b/engine/api/pipeline_test.go @@ -205,7 +205,7 @@ func TestUpdateAsCodePipelineHandler(t *testing.T) { chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) chanMessageToSend <- []sdk.WebsocketFilter{{ Type: sdk.WebsocketFilterTypeAscodeEvent, ProjectKey: proj.Key, diff --git a/engine/api/websocket_test.go b/engine/api/websocket_test.go index a99db96ad3..4e9434b841 100644 --- a/engine/api/websocket_test.go +++ b/engine/api/websocket_test.go @@ -45,7 +45,7 @@ func Test_websocketWrongFilters(t *testing.T) { InsecureSkipVerifyTLS: true, BuitinConsumerAuthenticationToken: jws, }) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) // Subscribe to project without project key chanMessageToSend <- []sdk.WebsocketFilter{{ @@ -126,7 +126,7 @@ func Test_websocketGetWorkflowEvent(t *testing.T) { InsecureSkipVerifyTLS: true, SessionToken: jwt, }) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) var lastResponse *sdk.WebsocketEvent go func() { for e := range chanMessageReceived { @@ -268,7 +268,7 @@ func TestWebsocketNoEventLoose(t *testing.T) { InsecureSkipVerifyTLS: true, SessionToken: jwt, }) - go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived) + go client1.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan1MessageToSend, chan1MessageReceived, chan1ErrorReceived) var client1EventCount int64 go func() { for { @@ -299,7 +299,7 @@ func TestWebsocketNoEventLoose(t *testing.T) { SessionToken: jwt, }) var client2EventCount int64 - go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived) + go client2.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(ctx), chan2MessageToSend, chan2MessageReceived, chan2ErrorReceived) go func() { for { select { diff --git a/engine/api/workflow_ascode_test.go b/engine/api/workflow_ascode_test.go index 056f185253..87be103a28 100644 --- a/engine/api/workflow_ascode_test.go +++ b/engine/api/workflow_ascode_test.go @@ -211,7 +211,7 @@ func TestPostUpdateWorkflowAsCodeHandler(t *testing.T) { chanMessageReceived := make(chan sdk.WebsocketEvent) chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) chanMessageToSend <- []sdk.WebsocketFilter{{ Type: sdk.WebsocketFilterTypeAscodeEvent, ProjectKey: proj.Key, @@ -422,7 +422,7 @@ func TestPostMigrateWorkflowAsCodeHandler(t *testing.T) { chanMessageReceived := make(chan sdk.WebsocketEvent) chanMessageToSend := make(chan []sdk.WebsocketFilter) chanErrorReceived := make(chan error) - go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(context.TODO(), sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) chanMessageToSend <- []sdk.WebsocketFilter{{ Type: sdk.WebsocketFilterTypeAscodeEvent, ProjectKey: proj.Key, diff --git a/engine/api/workflow_purge_test.go b/engine/api/workflow_purge_test.go index 90031e8663..5503d10357 100644 --- a/engine/api/workflow_purge_test.go +++ b/engine/api/workflow_purge_test.go @@ -98,7 +98,7 @@ func Test_purgeDryRunHandler(t *testing.T) { }) contextWS, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - go client.WebsocketEventsListen(contextWS, sdk.NewGoRoutines(), chanMessageToSend, chanMessageReceived, chanErrorReceived) + go client.WebsocketEventsListen(contextWS, sdk.NewGoRoutines(context.TODO()), chanMessageToSend, chanMessageReceived, chanErrorReceived) // Subscribe to workflow retention chanMessageToSend <- []sdk.WebsocketFilter{{ diff --git a/engine/api/workflow_run_test.go b/engine/api/workflow_run_test.go index 9cc0f984fa..255f3edfac 100644 --- a/engine/api/workflow_run_test.go +++ b/engine/api/workflow_run_test.go @@ -1407,7 +1407,7 @@ func Test_postWorkflowRunAsyncFailedHandler(t *testing.T) { OperationUUID: ope.UUID, } - ascode.UpdateAsCodeResult(context.TODO(), api.mustDB(), api.Cache, sdk.NewGoRoutines(), *proj, *w1, app, ed, u) + ascode.UpdateAsCodeResult(context.TODO(), api.mustDB(), api.Cache, sdk.NewGoRoutines(context.TODO()), *proj, *w1, app, ed, u) // Prepare request uri := router.GetRoute("POST", api.postWorkflowRunHandler, map[string]string{ diff --git a/engine/cdn/cdn.go b/engine/cdn/cdn.go index 984228032b..f52eaa3f72 100644 --- a/engine/cdn/cdn.go +++ b/engine/cdn/cdn.go @@ -36,8 +36,7 @@ const ( // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() - + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/cdn/cdn_gc_test.go b/engine/cdn/cdn_gc_test.go index f6258bc0f5..0bd0d60ca0 100644 --- a/engine/cdn/cdn_gc_test.go +++ b/engine/cdn/cdn_gc_test.go @@ -39,7 +39,7 @@ func TestCleanSynchronizedItem(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") require.NoError(t, err) @@ -47,7 +47,7 @@ func TestCleanSynchronizedItem(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) t.Cleanup(cancel) - cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { @@ -208,7 +208,7 @@ func TestCleanSynchronizedItemWithDisabledStorage(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") require.NoError(t, err) @@ -216,7 +216,7 @@ func TestCleanSynchronizedItemWithDisabledStorage(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) t.Cleanup(cancel) - cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { @@ -351,7 +351,7 @@ func TestCleanWaitingItem(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -402,7 +402,7 @@ func TestCleanWaitingItemWithoutItemUnit(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -446,7 +446,7 @@ func TestPurgeItem(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) diff --git a/engine/cdn/cdn_item_test.go b/engine/cdn/cdn_item_test.go index 4d728ad7c1..6c336c3ab7 100644 --- a/engine/cdn/cdn_item_test.go +++ b/engine/cdn/cdn_item_test.go @@ -41,7 +41,7 @@ func TestGetItemValue(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -202,7 +202,7 @@ func TestGetItemValue_ThousandLines(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -309,7 +309,7 @@ func TestGetItemValue_Reverse(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -419,7 +419,7 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) { Mapper: m, } s.Cfg.Log.StepMaxSize = 200000 - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) diff --git a/engine/cdn/cdn_log_store_test.go b/engine/cdn/cdn_log_store_test.go index 6d4773da87..242310c01a 100644 --- a/engine/cdn/cdn_log_store_test.go +++ b/engine/cdn/cdn_log_store_test.go @@ -40,7 +40,7 @@ func TestStoreNewStepLog(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -123,7 +123,7 @@ func TestStoreLastStepLog(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) @@ -212,7 +212,7 @@ func TestStoreNewServiceLog(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, cancel := context.WithCancel(context.TODO()) t.Cleanup(cancel) diff --git a/engine/cdn/cdn_log_tcp_test.go b/engine/cdn/cdn_log_tcp_test.go index f13d21cb1d..88aa417bc8 100644 --- a/engine/cdn/cdn_log_tcp_test.go +++ b/engine/cdn/cdn_log_tcp_test.go @@ -61,7 +61,7 @@ func TestWorkerLogCDNEnabled(t *testing.T) { } tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") require.NoError(t, err) - cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(context.TODO()), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { @@ -84,7 +84,7 @@ func TestWorkerLogCDNEnabled(t *testing.T) { s.Units = cdnUnits s.Cfg.Log.StepMaxSize = 1000 - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) signature := cdn.Signature{ Worker: &cdn.SignatureWorker{ @@ -177,10 +177,10 @@ func TestServiceLogCDNDisabled(t *testing.T) { Mapper: m, } s.Cfg.Log.StepMaxSize = 1000 - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") require.NoError(t, err) - cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(context.TODO(), m, store, db.DbMap, sdk.NewGoRoutines(context.TODO()), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { @@ -292,7 +292,7 @@ func TestStoreTruncatedLogs(t *testing.T) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) ctx, ccl := context.WithCancel(context.TODO()) t.Cleanup(ccl) diff --git a/engine/cdn/cdn_sync_test.go b/engine/cdn/cdn_sync_test.go index 857f35c6f3..34b22762f9 100644 --- a/engine/cdn/cdn_sync_test.go +++ b/engine/cdn/cdn_sync_test.go @@ -39,10 +39,10 @@ func TestSyncBuffer(t *testing.T) { Cache: cache, Mapper: m, Common: service.Common{ - GoRoutines: sdk.NewGoRoutines(), + GoRoutines: sdk.NewGoRoutines(context.TODO()), }, } - cdnUnits, err := storage.Init(context.Background(), m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(context.Background(), m, cache, db.DbMap, sdk.NewGoRoutines(context.TODO()), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { @@ -96,7 +96,7 @@ func TestSyncLog(t *testing.T) { Cache: cache, Mapper: m, Common: service.Common{ - GoRoutines: sdk.NewGoRoutines(), + GoRoutines: sdk.NewGoRoutines(context.TODO()), }, } @@ -106,7 +106,7 @@ func TestSyncLog(t *testing.T) { tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-*") require.NoError(t, err) - cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ HashLocatorSalt: "thisismysalt", SyncNbElements: 100, SyncSeconds: 1, @@ -136,7 +136,7 @@ func TestSyncLog(t *testing.T) { }) require.NoError(t, err) - cdnUnits.Start(ctx, sdk.NewGoRoutines()) + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) s.Units = cdnUnits cdsStorage, ok := s.Units.Storages[0].(*cds.CDS) diff --git a/engine/cdn/cdn_test.go b/engine/cdn/cdn_test.go index df8e1ff6e5..226491eb2e 100644 --- a/engine/cdn/cdn_test.go +++ b/engine/cdn/cdn_test.go @@ -64,11 +64,11 @@ func newTestService(t *testing.T) (*Service, *test.FakeTransaction) { Cache: cache, Mapper: m, } - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.TODO()) if fakeAPIPrivateKey.key == nil { fakeAPIPrivateKey.key, _ = jws.NewRandomRSAKey() } - s.Common.GoRoutines = sdk.NewGoRoutines() + s.Common.GoRoutines = sdk.NewGoRoutines(context.TODO()) s.ParsedAPIPublicKey = &fakeAPIPrivateKey.key.PublicKey ctx, cancel := context.WithCancel(context.Background()) @@ -123,7 +123,7 @@ func newRunningStorageUnits(t *testing.T, m *gorpmapper.Mapper, dbMap *gorp.DbMa ctx, cancel := context.WithTimeout(ctx, 5*time.Second) t.Cleanup(cancel) - cdnUnits, err := storage.Init(ctx, m, store, dbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, store, dbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ SyncSeconds: 2, SyncNbElements: 100, PurgeSeconds: 30, @@ -161,6 +161,6 @@ func newRunningStorageUnits(t *testing.T, m *gorpmapper.Mapper, dbMap *gorp.DbMa }, }) require.NoError(t, err) - cdnUnits.Start(ctx, sdk.NewGoRoutines()) + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) return cdnUnits } diff --git a/engine/cdn/item_logs_handler_test.go b/engine/cdn/item_logs_handler_test.go index 8903456ed3..6d42cb3c0d 100644 --- a/engine/cdn/item_logs_handler_test.go +++ b/engine/cdn/item_logs_handler_test.go @@ -383,7 +383,7 @@ func TestGetItemLogsStreamHandler(t *testing.T) { chanMsgReceived := make(chan json.RawMessage, 10) chanErrorReceived := make(chan error, 10) go func() { - chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived) + chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(ctx), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived) }() buf, err := json.Marshal(sdk.CDNStreamFilter{ ItemType: sdk.CDNTypeItemStepLog, @@ -441,7 +441,7 @@ func TestGetItemLogsStreamHandler(t *testing.T) { ctx, cancel = context.WithTimeout(context.TODO(), time.Second*10) t.Cleanup(func() { cancel() }) go func() { - chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived) + chanErrorReceived <- client.RequestWebsocket(ctx, sdk.NewGoRoutines(ctx), uri, chanMsgToSend, chanMsgReceived, chanErrorReceived) }() buf, err = json.Marshal(sdk.CDNStreamFilter{ ItemType: sdk.CDNTypeItemStepLog, diff --git a/engine/cdn/item_upload_test.go b/engine/cdn/item_upload_test.go index 3c0c94f13b..5ff443837d 100644 --- a/engine/cdn/item_upload_test.go +++ b/engine/cdn/item_upload_test.go @@ -53,7 +53,7 @@ func TestPostUploadHandler(t *testing.T) { tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*") require.NoError(t, err) - cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, s.Mapper, s.Cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ SyncSeconds: 1, SyncNbElements: 1000, PurgeNbElements: 1000, @@ -100,7 +100,7 @@ func TestPostUploadHandler(t *testing.T) { }) require.NoError(t, err) s.Units = cdnUnits - cdnUnits.Start(ctx, sdk.NewGoRoutines()) + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) // Mock cds client s.Client = cdsclient.New(cdsclient.Config{Host: "http://lolcat.api", InsecureSkipVerifyTLS: false}) diff --git a/engine/cdn/storage/dao_test.go b/engine/cdn/storage/dao_test.go index 95b4ee95b9..18d10cc693 100644 --- a/engine/cdn/storage/dao_test.go +++ b/engine/cdn/storage/dao_test.go @@ -48,7 +48,7 @@ func TestLoadAllItemIDUnknownByUnit(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) t.Cleanup(cancel) - cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ HashLocatorSalt: "thisismysalt", Buffers: map[string]storage.BufferConfiguration{ "redis_buffer": { diff --git a/engine/cdn/storage/nfs/nfs_test.go b/engine/cdn/storage/nfs/nfs_test.go index 898fd2b638..0ed8de2be6 100644 --- a/engine/cdn/storage/nfs/nfs_test.go +++ b/engine/cdn/storage/nfs/nfs_test.go @@ -3,16 +3,17 @@ package nfs import ( "context" "fmt" + "io/ioutil" + "os" + "testing" + "time" + "github.com/ovh/cds/engine/cdn/storage" "github.com/ovh/cds/sdk" "github.com/ovh/symmecrypt/ciphers/aesgcm" "github.com/ovh/symmecrypt/keyloader" "github.com/rockbears/log" "github.com/stretchr/testify/require" - "io/ioutil" - "os" - "testing" - "time" ) // To run export the mentionned env variables: NFS_HOST, NFS_PARTITION @@ -63,7 +64,7 @@ func TestNFSReadWrite(t *testing.T) { require.NotNil(t, d) bd, is := d.(storage.BufferUnit) require.True(t, is) - bd.New(sdk.NewGoRoutines(), storage.AbstractUnitConfig{}) + bd.New(sdk.NewGoRoutines(ctx), storage.AbstractUnitConfig{}) err := bd.Init(ctx, &storage.NFSBufferConfiguration{ Host: nfsHost, TargetPartition: nfsTargetPath, diff --git a/engine/cdn/storage/storageunit_run.go b/engine/cdn/storage/storageunit_run.go index 085cd42502..3e23cca111 100644 --- a/engine/cdn/storage/storageunit_run.go +++ b/engine/cdn/storage/storageunit_run.go @@ -160,7 +160,7 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec chanError := make(chan error) pr, pw := io.Pipe() - gr := sdk.NewGoRoutines() + gr := sdk.NewGoRoutines(ctx) gr.Exec(ctx, "runningStorageUnits.runItem.read", func(ctx context.Context) { defer pw.Close() diff --git a/engine/cdn/storage/storageunit_test.go b/engine/cdn/storage/storageunit_test.go index f842f1e66e..17fd2a0f46 100644 --- a/engine/cdn/storage/storageunit_test.go +++ b/engine/cdn/storage/storageunit_test.go @@ -47,7 +47,7 @@ func TestDeduplicationCrossType(t *testing.T) { tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*") require.NoError(t, err) - cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ SyncSeconds: 10, SyncNbElements: 100, HashLocatorSalt: "thisismysalt", @@ -83,7 +83,7 @@ func TestDeduplicationCrossType(t *testing.T) { }) require.NoError(t, err) require.NotNil(t, cdnUnits) - cdnUnits.Start(ctx, sdk.NewGoRoutines()) + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) units, err := storage.LoadAllUnits(ctx, m, db.DbMap) require.NoError(t, err) @@ -242,7 +242,7 @@ func TestRun(t *testing.T) { tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*") require.NoError(t, err) - cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{ + cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{ SyncSeconds: 10, SyncNbElements: 100, HashLocatorSalt: "thisismysalt", @@ -284,7 +284,7 @@ func TestRun(t *testing.T) { }) require.NoError(t, err) require.NotNil(t, cdnUnits) - cdnUnits.Start(ctx, sdk.NewGoRoutines()) + cdnUnits.Start(ctx, sdk.NewGoRoutines(ctx)) units, err := storage.LoadAllUnits(ctx, m, db.DbMap) require.NoError(t, err) diff --git a/engine/cdn/storage/webdav/webdav.go b/engine/cdn/storage/webdav/webdav.go index e96401feaa..b5be62aeb0 100644 --- a/engine/cdn/storage/webdav/webdav.go +++ b/engine/cdn/storage/webdav/webdav.go @@ -83,7 +83,7 @@ func (s *Webdav) NewWriter(ctx context.Context, i sdk.CDNItemUnit) (io.WriteClos return nil, err } pr, pw := io.Pipe() - gr := sdk.NewGoRoutines() + gr := sdk.NewGoRoutines(ctx) gr.Exec(ctx, "webdav.newWriter", func(ctx context.Context) { if err := s.client.WriteStream(f, pr, os.FileMode(0600)); err != nil { log.Error(context.Background(), "unable to write stream %s: %v", f, err) diff --git a/engine/elasticsearch/elasticsearch.go b/engine/elasticsearch/elasticsearch.go index b8f3da4d54..dac6d86559 100644 --- a/engine/elasticsearch/elasticsearch.go +++ b/engine/elasticsearch/elasticsearch.go @@ -20,7 +20,7 @@ var esClient *elastic.Client // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/kubernetes/kubernetes.go b/engine/hatchery/kubernetes/kubernetes.go index d452753ac7..72d038124a 100644 --- a/engine/hatchery/kubernetes/kubernetes.go +++ b/engine/hatchery/kubernetes/kubernetes.go @@ -29,7 +29,7 @@ import ( // New instanciates a new hatchery local func New() *HatcheryKubernetes { s := new(HatcheryKubernetes) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/local/local.go b/engine/hatchery/local/local.go index cb23255017..c609ec2664 100644 --- a/engine/hatchery/local/local.go +++ b/engine/hatchery/local/local.go @@ -28,7 +28,7 @@ import ( // New instanciates a new hatchery local func New() *HatcheryLocal { s := new(HatcheryLocal) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) s.LocalWorkerRunner = new(localWorkerRunner) return s } diff --git a/engine/hatchery/marathon/marathon.go b/engine/hatchery/marathon/marathon.go index e506d117e3..3d3021c0c0 100644 --- a/engine/hatchery/marathon/marathon.go +++ b/engine/hatchery/marathon/marathon.go @@ -29,7 +29,7 @@ import ( // New instanciates a new Hatchery Marathon func New() *HatcheryMarathon { s := new(HatcheryMarathon) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/openstack/openstack.go b/engine/hatchery/openstack/openstack.go index 4d747ed151..e8c75a1e60 100644 --- a/engine/hatchery/openstack/openstack.go +++ b/engine/hatchery/openstack/openstack.go @@ -40,7 +40,7 @@ var _ hatchery.InterfaceWithModels = new(HatcheryOpenstack) // New instanciates a new Hatchery Openstack func New() *HatcheryOpenstack { s := new(HatcheryOpenstack) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/swarm/swarm.go b/engine/hatchery/swarm/swarm.go index b455ccc0e8..1f07a00a9a 100644 --- a/engine/hatchery/swarm/swarm.go +++ b/engine/hatchery/swarm/swarm.go @@ -32,7 +32,7 @@ import ( // New instanciates a new Hatchery Swarm func New() *HatcherySwarm { s := new(HatcherySwarm) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/vsphere/hatchery.go b/engine/hatchery/vsphere/hatchery.go index 49e8ecbf45..40067b7b28 100644 --- a/engine/hatchery/vsphere/hatchery.go +++ b/engine/hatchery/vsphere/hatchery.go @@ -26,7 +26,7 @@ import ( // New instanciates a new Hatchery vsphere func New() *HatcheryVSphere { s := new(HatcheryVSphere) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/hatchery/vsphere/hatchery_test.go b/engine/hatchery/vsphere/hatchery_test.go index b18d30905b..7eba00a1e4 100644 --- a/engine/hatchery/vsphere/hatchery_test.go +++ b/engine/hatchery/vsphere/hatchery_test.go @@ -381,7 +381,7 @@ func TestHatcheryVSphere_Status(t *testing.T) { vSphereClient: c, Common: hatchery.Common{ Common: service.Common{ - GoRoutines: sdk.NewGoRoutines(), + GoRoutines: sdk.NewGoRoutines(context.Background()), }, }, } @@ -452,7 +452,7 @@ func TestHatcheryVSphere_provisioning_do_nothing(t *testing.T) { vSphereClient: c, Common: hatchery.Common{ Common: service.Common{ - GoRoutines: sdk.NewGoRoutines(), + GoRoutines: sdk.NewGoRoutines(context.Background()), Client: cdsclient, }, }, @@ -537,7 +537,7 @@ func TestHatcheryVSphere_provisioning_start_one(t *testing.T) { vSphereClient: c, Common: hatchery.Common{ Common: service.Common{ - GoRoutines: sdk.NewGoRoutines(), + GoRoutines: sdk.NewGoRoutines(context.Background()), Client: cdsclient, }, }, diff --git a/engine/hooks/hooks.go b/engine/hooks/hooks.go index 9d166dea0c..3a0cdf0575 100644 --- a/engine/hooks/hooks.go +++ b/engine/hooks/hooks.go @@ -18,7 +18,7 @@ import ( // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/migrateservice/service.go b/engine/migrateservice/service.go index 3ffaf32483..c67bdedf19 100644 --- a/engine/migrateservice/service.go +++ b/engine/migrateservice/service.go @@ -48,7 +48,7 @@ type Configuration struct { // New instanciates a new API object func New() service.Service { s := &dbmigservice{} - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/repositories/repositories.go b/engine/repositories/repositories.go index 932abcde77..383692110b 100644 --- a/engine/repositories/repositories.go +++ b/engine/repositories/repositories.go @@ -18,7 +18,7 @@ import ( // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/repositories/repositories_test.go b/engine/repositories/repositories_test.go index 4dcde56674..2e1a6428ce 100644 --- a/engine/repositories/repositories_test.go +++ b/engine/repositories/repositories_test.go @@ -53,7 +53,7 @@ func newTestService(t *testing.T) (*Service, error) { } service := new(Service) - service.GoRoutines = sdk.NewGoRoutines() + service.GoRoutines = sdk.NewGoRoutines(ctx) if fakeAPIPrivateKey.key == nil { fakeAPIPrivateKey.key, _ = jws.NewRandomRSAKey() } diff --git a/engine/ui/ui.go b/engine/ui/ui.go index 5ff275ff17..35306b0ae3 100644 --- a/engine/ui/ui.go +++ b/engine/ui/ui.go @@ -27,7 +27,7 @@ import ( // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) s.Router = &api.Router{ Mux: mux.NewRouter(), } diff --git a/engine/vcs/vcs.go b/engine/vcs/vcs.go index eaa79e452b..d0dad87c08 100644 --- a/engine/vcs/vcs.go +++ b/engine/vcs/vcs.go @@ -23,7 +23,7 @@ import ( // New returns a new service func New() *Service { s := new(Service) - s.GoRoutines = sdk.NewGoRoutines() + s.GoRoutines = sdk.NewGoRoutines(context.Background()) return s } diff --git a/engine/vcs/vcs_test.go b/engine/vcs/vcs_test.go index 5a5a3d2380..e373d5b5a6 100644 --- a/engine/vcs/vcs_test.go +++ b/engine/vcs/vcs_test.go @@ -20,7 +20,7 @@ import ( "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/cdsclient" "github.com/ovh/cds/sdk/jws" - "github.com/ovh/cds/sdk/log" + cdslog "github.com/ovh/cds/sdk/log" ) var ( @@ -57,7 +57,7 @@ func newTestService(t *testing.T) (*Service, error) { } service := new(Service) - service.GoRoutines = sdk.NewGoRoutines() + service.GoRoutines = sdk.NewGoRoutines(ctx) if fakeAPIPrivateKey.key == nil { fakeAPIPrivateKey.key, _ = jws.NewRandomRSAKey() } diff --git a/engine/worker/internal/builtin.go b/engine/worker/internal/builtin.go index 2675164160..7d329a685c 100644 --- a/engine/worker/internal/builtin.go +++ b/engine/worker/internal/builtin.go @@ -54,7 +54,7 @@ func (w *CurrentWorker) runBuiltin(ctx context.Context, a sdk.Action, secrets [] func (w *CurrentWorker) runGRPCPlugin(ctx context.Context, a sdk.Action) sdk.Result { chanRes := make(chan sdk.Result, 1) done := make(chan struct{}) - sdk.NewGoRoutines().Run(ctx, "runGRPCPlugin", func(ctx context.Context) { + sdk.NewGoRoutines(ctx).Run(ctx, "runGRPCPlugin", func(ctx context.Context) { action.RunGRPCPlugin(ctx, a.Name, w.currentJob.params, a, w, chanRes, done) }) diff --git a/sdk/goroutine.go b/sdk/goroutine.go index c669666361..aedec2e367 100644 --- a/sdk/goroutine.go +++ b/sdk/goroutine.go @@ -15,6 +15,7 @@ import ( "runtime/pprof" "strconv" "sync" + "time" panicparsestack "github.com/maruel/panicparse/stack" cdslog "github.com/ovh/cds/sdk/log" @@ -23,44 +24,103 @@ import ( "github.com/pkg/errors" ) +type GoRoutine struct { + ctx context.Context + Name string + Func func(ctx context.Context) + Restart bool + Active bool +} + // GoRoutines contains list of routines that have to stay up type GoRoutines struct { mutex sync.Mutex - status map[string]bool + status []*GoRoutine +} + +func (m *GoRoutines) GoRoutine(name string) *GoRoutine { + m.mutex.Lock() + defer m.mutex.Unlock() + for _, g := range m.status { + if g.Name == name { + return g + } + } + return nil } // NewGoRoutines instanciates a new GoRoutineManager -func NewGoRoutines() *GoRoutines { - return &GoRoutines{ - mutex: sync.Mutex{}, - status: make(map[string]bool), +func NewGoRoutines(ctx context.Context) *GoRoutines { + m := &GoRoutines{} + m.Exec(ctx, "GoRoutines-restart", func(ctx context.Context) { + m.restartGoRoutines(ctx) + }) + return m +} + +func (m *GoRoutines) restartGoRoutines(ctx context.Context) { + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + for _, g := range m.status { + if !g.Active && g.Restart { + log.Info(ctx, "restarting goroutine %q", g.Name) + m.exec(g) + } + } + } } } // Run runs the function within a goroutine with a panic recovery, and keep GoRoutine status. func (m *GoRoutines) Run(c context.Context, name string, fn func(ctx context.Context)) { m.mutex.Lock() - m.status[name] = true - m.mutex.Unlock() - m.Exec(c, name, fn) + defer m.mutex.Unlock() + g := &GoRoutine{ + ctx: c, + Name: name, + Func: fn, + Active: true, + Restart: false, + } + m.status = append(m.status, g) + m.exec(g) } -// GetStatus returns the monitoring status of goroutines that should be running -func (m *GoRoutines) GetStatus() []MonitoringStatusLine { +// RunWithRestart runs the function within a goroutine with a panic recovery, and keep GoRoutine status. +// if the goroutine is stopped, it will ne restarted +func (m *GoRoutines) RunWithRestart(c context.Context, name string, fn func(ctx context.Context)) { m.mutex.Lock() defer m.mutex.Unlock() + g := &GoRoutine{ + ctx: c, + Name: name, + Func: fn, + Active: true, + Restart: true, + } + m.status = append(m.status, g) + m.exec(g) +} + +// GetStatus returns the monitoring status of goroutines that should be running +func (m *GoRoutines) GetStatus() []MonitoringStatusLine { lines := make([]MonitoringStatusLine, len(m.status)) i := 0 - for name, isActive := range m.status { + for _, g := range m.status { status := MonitoringStatusAlert value := "NOT running" - if isActive { + if g.Active { status = MonitoringStatusOK value = "Running" } lines[i] = MonitoringStatusLine{ Status: status, - Component: "goroutine/" + name, + Component: "goroutine/" + g.Name, Value: value, } i++ @@ -68,13 +128,16 @@ func (m *GoRoutines) GetStatus() []MonitoringStatusLine { return lines } -// Exec runs the function within a goroutine with a panic recovery -func (m *GoRoutines) Exec(c context.Context, name string, fn func(ctx context.Context)) { +func (m *GoRoutines) exec(g *GoRoutine) { hostname, _ := os.Hostname() go func(ctx context.Context) { - ctx = context.WithValue(ctx, cdslog.Goroutine, name) + ctx = context.WithValue(ctx, cdslog.Goroutine, g.Name) - labels := pprof.Labels("goroutine-name", name, "goroutine-hostname", hostname, "goroutine-id", fmt.Sprintf("%d", GoroutineID())) + labels := pprof.Labels( + "goroutine-name", g.Name, + "goroutine-hostname", hostname, + "goroutine-id", fmt.Sprintf("%d", GoroutineID()), + ) goroutineCtx := pprof.WithLabels(ctx, labels) pprof.SetGoroutineLabels(goroutineCtx) @@ -83,17 +146,26 @@ func (m *GoRoutines) Exec(c context.Context, name string, fn func(ctx context.Co buf := make([]byte, 1<<16) runtime.Stack(buf, false) ctx = context.WithValue(ctx, cdslog.Stacktrace, string(buf)) - log.Error(ctx, "[PANIC][%s] %s failed", hostname, name) - } - m.mutex.Lock() - if _, ok := m.status[name]; ok { - m.status[name] = false + log.Error(ctx, "[PANIC][%s] %s failed", hostname, g.Name) } - m.mutex.Unlock() + g.Active = false }() - fn(goroutineCtx) - }(c) + g.Active = true + g.Func(goroutineCtx) + }(g.ctx) +} + +// Exec runs the function within a goroutine with a panic recovery +func (m *GoRoutines) Exec(c context.Context, name string, fn func(ctx context.Context)) { + g := &GoRoutine{ + ctx: c, + Name: name, + Func: fn, + Active: true, + Restart: false, + } + m.exec(g) } // code from https://github.com/golang/net/blob/master/http2/gotrack.go diff --git a/sdk/goroutine_test.go b/sdk/goroutine_test.go index 8ab8758bf3..c1bf9a7e7d 100644 --- a/sdk/goroutine_test.go +++ b/sdk/goroutine_test.go @@ -23,7 +23,7 @@ func Test_GoroutineTools(t *testing.T) { var wg = new(sync.WaitGroup) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - NewGoRoutines().Exec(ctx, "test_goroutine", func(ctx context.Context) { + NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) { wg.Add(1) <-ctx.Done() wg.Done() @@ -42,7 +42,7 @@ func Test_GoroutineTools(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - NewGoRoutines().Exec(ctx, "test_goroutine", func(ctx context.Context) { + NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) { wg.Add(1) <-ctx.Done() wg.Done() @@ -63,18 +63,18 @@ func Test_GoroutineTools(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - m := NewGoRoutines() + m := NewGoRoutines(ctx) m.Run(ctx, "test_goroutine_loop", func(ctx context.Context) { wg.Add(1) - _, ok := m.status["test_goroutine_loop"] - require.True(t, ok) - require.True(t, m.status["test_goroutine_loop"]) + s := m.GoRoutine("test_goroutine_loop") + require.NotNil(t, s) + require.True(t, s.Active) <-ctx.Done() wg.Done() }) - _, ok := m.status["test_goroutine_loop"] - require.True(t, ok) + s := m.GoRoutine("test_goroutine_loop") + require.NotNil(t, s) require.Equal(t, 1, len(m.GetStatus())) }) }