Skip to content

Commit

Permalink
fix(worker): reduce httpclient timeout and remove some context (#5735)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Mar 3, 2021
1 parent 91599db commit c61ab8b
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
4 changes: 4 additions & 0 deletions engine/worker/internal/handler_cds_version_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (

func setVersionHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := workerruntime.SetJobID(ctx, wk.currentJob.wJob.ID)
ctx = workerruntime.SetStepOrder(ctx, wk.currentJob.currentStepIndex)
ctx = workerruntime.SetStepName(ctx, wk.currentJob.currentStepName)

data, err := ioutil.ReadAll(r.Body)
if err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
Expand Down
3 changes: 0 additions & 3 deletions engine/worker/internal/handler_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package internal
import (
"context"
"net/http"
"time"

"github.com/ovh/cds/engine/worker/pkg/workerruntime"
"github.com/ovh/cds/sdk"
Expand All @@ -27,8 +26,6 @@ func tagHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
})
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := wk.client.QueueJobTag(ctx, wk.currentJob.wJob.ID, tags); err != nil {
newError := sdk.NewErrorFrom(sdk.ErrUnknownError, "unable to create tag on CDS: %v", err)
writeError(w, r, newError)
Expand Down
9 changes: 2 additions & 7 deletions engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
)

func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error {
ctxQueueTakeJob, cancelQueueTakeJob := context.WithTimeout(ctx, 20*time.Second)
defer cancelQueueTakeJob()
info, err := w.client.QueueTakeJob(ctxQueueTakeJob, job)
info, err := w.client.QueueTakeJob(ctx, job)
if err != nil {
return sdk.WrapError(err, "Unable to take job %d", job.ID)
}
Expand Down Expand Up @@ -150,14 +148,11 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er
var lasterr error
for try := 1; try <= 10; try++ {
log.Info(ctx, "takeWorkflowJob> Sending build result...")
ctxSendResult, cancelSendResult := context.WithTimeout(ctx, 120*time.Second)
lasterr = w.client.QueueSendResult(ctxSendResult, job.ID, res)
lasterr = w.client.QueueSendResult(ctx, job.ID, res)
if lasterr == nil {
log.Info(ctx, "takeWorkflowJob> Send build result OK")
cancelSendResult()
return nil
}
cancelSendResult()
if ctx.Err() != nil {
log.Info(ctx, "takeWorkflowJob> Cannot send build result: HTTP %v - worker cancelled - giving up", lasterr)
return nil
Expand Down
2 changes: 1 addition & 1 deletion engine/worker/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (wk *CurrentWorker) Init(name, hatcheryName, apiEndpoint, token string, mod
wk.register.model = model
wk.register.token = token
wk.register.apiEndpoint = apiEndpoint
wk.client = cdsclient.NewWorker(apiEndpoint, name, cdsclient.NewHTTPClient(time.Second*360, insecure))
wk.client = cdsclient.NewWorker(apiEndpoint, name, cdsclient.NewHTTPClient(time.Second*10, insecure))
return nil
}

Expand Down
15 changes: 13 additions & 2 deletions sdk/cdsclient/client_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,19 @@ func (c *client) QueueJobRelease(ctx context.Context, id int64) error {

func (c *client) QueueSendResult(ctx context.Context, id int64, res sdk.Result) error {
path := fmt.Sprintf("/queue/workflows/%d/result", id)
_, err := c.PostJSON(ctx, path, res, nil)
return err
b, err := json.Marshal(res)
if err != nil {
return sdk.WithStack(err)
}
result, _, code, err := c.Stream(ctx, c.HTTPNoTimeoutClient(), "POST", path, bytes.NewBuffer(b), nil)
if err != nil {
return err
}
defer result.Close()
if code >= 300 {
return sdk.WithStack(fmt.Errorf("unable to send job result. HTTP code error : %d", code))
}
return nil
}

func (c *client) QueueSendCoverage(ctx context.Context, id int64, report coverage.Report) error {
Expand Down

0 comments on commit c61ab8b

Please sign in to comment.