diff --git a/engine/worker/internal/handler_cds_version_set.go b/engine/worker/internal/handler_cds_version_set.go index 2a6faf682f..c67cfe4da5 100644 --- a/engine/worker/internal/handler_cds_version_set.go +++ b/engine/worker/internal/handler_cds_version_set.go @@ -12,6 +12,10 @@ import ( func setVersionHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + ctx := workerruntime.SetJobID(ctx, wk.currentJob.wJob.ID) + ctx = workerruntime.SetStepOrder(ctx, wk.currentJob.currentStepIndex) + ctx = workerruntime.SetStepName(ctx, wk.currentJob.currentStepName) + data, err := ioutil.ReadAll(r.Body) if err != nil { writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err)) diff --git a/engine/worker/internal/handler_tag.go b/engine/worker/internal/handler_tag.go index 92b85feb61..4d5615fede 100644 --- a/engine/worker/internal/handler_tag.go +++ b/engine/worker/internal/handler_tag.go @@ -3,7 +3,6 @@ package internal import ( "context" "net/http" - "time" "github.com/ovh/cds/engine/worker/pkg/workerruntime" "github.com/ovh/cds/sdk" @@ -27,8 +26,6 @@ func tagHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc { }) } - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() if err := wk.client.QueueJobTag(ctx, wk.currentJob.wJob.ID, tags); err != nil { newError := sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to create tag on CDS: %v", err) writeError(w, r, newError) diff --git a/engine/worker/internal/take.go b/engine/worker/internal/take.go index 13c76e772b..142de6253d 100644 --- a/engine/worker/internal/take.go +++ b/engine/worker/internal/take.go @@ -15,9 +15,7 @@ import ( ) func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error { - ctxQueueTakeJob, cancelQueueTakeJob := context.WithTimeout(ctx, 20*time.Second) - defer cancelQueueTakeJob() - info, err := w.client.QueueTakeJob(ctxQueueTakeJob, job) + info, err := w.client.QueueTakeJob(ctx, job) if err != nil { return sdk.WrapError(err, "Unable to take job %d", job.ID) } @@ -150,14 +148,11 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er var lasterr error for try := 1; try <= 10; try++ { log.Info(ctx, "takeWorkflowJob> Sending build result...") - ctxSendResult, cancelSendResult := context.WithTimeout(ctx, 120*time.Second) - lasterr = w.client.QueueSendResult(ctxSendResult, job.ID, res) + lasterr = w.client.QueueSendResult(ctx, job.ID, res) if lasterr == nil { log.Info(ctx, "takeWorkflowJob> Send build result OK") - cancelSendResult() return nil } - cancelSendResult() if ctx.Err() != nil { log.Info(ctx, "takeWorkflowJob> Cannot send build result: HTTP %v - worker cancelled - giving up", lasterr) return nil diff --git a/engine/worker/internal/types.go b/engine/worker/internal/types.go index 7ed559593a..72d47f3013 100644 --- a/engine/worker/internal/types.go +++ b/engine/worker/internal/types.go @@ -82,7 +82,7 @@ func (wk *CurrentWorker) Init(name, hatcheryName, apiEndpoint, token string, mod wk.register.model = model wk.register.token = token wk.register.apiEndpoint = apiEndpoint - wk.client = cdsclient.NewWorker(apiEndpoint, name, cdsclient.NewHTTPClient(time.Second*360, insecure)) + wk.client = cdsclient.NewWorker(apiEndpoint, name, cdsclient.NewHTTPClient(time.Second*10, insecure)) return nil } diff --git a/sdk/cdsclient/client_queue.go b/sdk/cdsclient/client_queue.go index 4149977675..28b9b2e317 100644 --- a/sdk/cdsclient/client_queue.go +++ b/sdk/cdsclient/client_queue.go @@ -234,8 +234,19 @@ func (c *client) QueueJobRelease(ctx context.Context, id int64) error { func (c *client) QueueSendResult(ctx context.Context, id int64, res sdk.Result) error { path := fmt.Sprintf("/queue/workflows/%d/result", id) - _, err := c.PostJSON(ctx, path, res, nil) - return err + b, err := json.Marshal(res) + if err != nil { + return sdk.WithStack(err) + } + result, _, code, err := c.Stream(ctx, c.HTTPNoTimeoutClient(), "POST", path, bytes.NewBuffer(b), nil) + if err != nil { + return err + } + defer result.Close() + if code >= 300 { + return sdk.WithStack(fmt.Errorf("unable to send job result. HTTP code error : %d", code)) + } + return nil } func (c *client) QueueSendCoverage(ctx context.Context, id int64, report coverage.Report) error {