From 261c7374eb06c451d4926d70b1da551f1d8239cc Mon Sep 17 00:00:00 2001 From: francois samin Date: Wed, 17 Jun 2020 08:56:56 +0200 Subject: [PATCH 1/4] fix(worker): logs --- engine/api/workflow/execute_node_job_run.go | 1 + engine/api/workflow_queue.go | 2 - engine/api/workflow_run.go | 2 - engine/cdn/cdn.go | 8 +- engine/cdn/cdn_log.go | 97 +++++++++++--- engine/cdn/types.go | 5 +- engine/hatchery/serve.go | 1 + engine/hatchery/swarm/swarm_util_kill.go | 16 +-- engine/worker/internal/run.go | 13 +- engine/worker/internal/take.go | 45 +++++++ engine/worker/internal/types.go | 2 +- sdk/log/hook/gelf.go | 25 ++-- sdk/log/hook/hook.go | 140 ++++++++++++-------- sdk/log/log.go | 1 + 14 files changed, 248 insertions(+), 110 deletions(-) diff --git a/engine/api/workflow/execute_node_job_run.go b/engine/api/workflow/execute_node_job_run.go index 875cc1b78d..c9e8fbe82e 100644 --- a/engine/api/workflow/execute_node_job_run.go +++ b/engine/api/workflow/execute_node_job_run.go @@ -581,6 +581,7 @@ func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, max // ignore the log if max size already reached if maxReached := truncateLogs(maxLogSize, size, logs); maxReached { + log.Debug("truncated logs") return nil } diff --git a/engine/api/workflow_queue.go b/engine/api/workflow_queue.go index 7bbd7150ce..642f388f5a 100644 --- a/engine/api/workflow_queue.go +++ b/engine/api/workflow_queue.go @@ -567,8 +567,6 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler { return err } - log.Debug("postWorkflowJobLogsHandler> Logs: %+v", logs) - if err := workflow.AddLog(api.mustDB(), pbJob, &logs, api.Config.Log.StepMaxSize); err != nil { return err } diff --git a/engine/api/workflow_run.go b/engine/api/workflow_run.go index 6f94cc0af9..2389a18558 100644 --- a/engine/api/workflow_run.go +++ b/engine/api/workflow_run.go @@ -1333,8 +1333,6 @@ func (api *API) getWorkflowNodeRunJobStepHandler() service.Handler { StepLogs: *ls, } - log.Debug("logs: %+v", result) - return service.WriteJSON(w, result, http.StatusOK) } } diff --git a/engine/cdn/cdn.go b/engine/cdn/cdn.go index 236b5108ec..22eec4c7a7 100644 --- a/engine/cdn/cdn.go +++ b/engine/cdn/cdn.go @@ -88,11 +88,9 @@ func (s *Service) Serve(c context.Context) error { //Gracefully shutdown the http server go func() { - select { - case <-ctx.Done(): - log.Info(ctx, "CDN> Shutdown HTTP Server") - _ = server.Shutdown(ctx) - } + <-ctx.Done() + log.Info(ctx, "CDN> Shutdown HTTP Server") + _ = server.Shutdown(ctx) }() //Start the http server diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go index 2be656b232..d936a2ac58 100644 --- a/engine/cdn/cdn_log.go +++ b/engine/cdn/cdn_log.go @@ -6,9 +6,11 @@ import ( "crypto/rsa" "fmt" "net" + "runtime" "strings" "time" + "github.com/go-gorp/gorp" gocache "github.com/patrickmn/go-cache" "github.com/ovh/cds/engine/api/observability" @@ -34,13 +36,22 @@ func (s *Service) RunTcpLogServer(ctx context.Context) { //Gracefully shutdown the tcp server go func() { - select { - case <-ctx.Done(): - log.Info(ctx, "CDN> Shutdown tcp log Server") - _ = listener.Close() - } + <-ctx.Done() + log.Info(ctx, "CDN> Shutdown tcp log Server") + _ = listener.Close() }() + var nbCPU = runtime.NumCPU() + s.ChanMessages = make(chan handledMessage, 10000) + for i := 0; i < nbCPU; i++ { + go func() { + log.Debug("process logs") + if err := s.processLogs(ctx); err != nil { + log.Error(ctx, err.Error()) + } + }() + } + go func() { for { conn, err := listener.Accept() @@ -128,16 +139,63 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig inte return sdk.WithStack(sdk.ErrForbidden) } - pbJob, err := workflow.LoadNodeJobRun(ctx, s.Db, s.Cache, signature.JobID) - if err != nil { - return err + s.ChanMessages <- handledMessage{ + signature: signature, + m: m, } + return nil +} + +type handledMessage struct { + signature log.Signature + m hook.Message +} + +func (s *Service) processLogs(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case msg := <-s.ChanMessages: + tx, err := s.Db.Begin() + if err != nil { + log.Error(ctx, "unable to start tx: %v", err) + continue + } + defer tx.Rollback() // nolint + + if len(msg.m.AggregatedMessages) > 0 { + var currentLog string + for _, m1 := range msg.m.AggregatedMessages { + currentLog += buildMessage(msg.signature, *m1) + } + if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { + log.Error(ctx, "unable to process log: %+v", err) + continue + } + } else { + currentLog := buildMessage(msg.signature, msg.m) + if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { + log.Error(ctx, "unable to process log: %+v", err) + continue + } + } + + if err := tx.Commit(); err != nil { + log.Error(ctx, "unable to commit tx: %+v", err) + continue + } + } + } +} + +func buildMessage(signature log.Signature, m hook.Message) string { logDate := time.Unix(0, int64(m.Time*1e9)) logs := sdk.Log{ - JobID: pbJob.ID, + JobID: signature.JobID, LastModified: &logDate, - NodeRunID: pbJob.WorkflowNodeRunID, + NodeRunID: signature.NodeRunID, Start: &logDate, StepOrder: signature.Worker.StepOrder, Val: m.Full, @@ -165,16 +223,19 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig inte lvl = "EMERGENCY" } logs.Val = fmt.Sprintf("[%s] %s", lvl, logs.Val) - tx, err := s.Db.Begin() - if err != nil { - return sdk.WithStack(err) - } - defer tx.Rollback() // nolint + return logs.Val +} - if err := workflow.AddLog(tx, pbJob, &logs, s.Cfg.Log.StepMaxSize); err != nil { - return err +func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error { + now := time.Now() + l := sdk.Log{ + JobID: signature.JobID, + NodeRunID: signature.NodeRunID, + LastModified: &now, + StepOrder: signature.Worker.StepOrder, + Val: message, } - return sdk.WithStack(tx.Commit()) + return workflow.AddLog(db, nil, &l, s.Cfg.Log.StepMaxSize) } func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error { diff --git a/engine/cdn/types.go b/engine/cdn/types.go index 054110600d..f439f38135 100644 --- a/engine/cdn/types.go +++ b/engine/cdn/types.go @@ -12,8 +12,9 @@ type Service struct { service.Common Cfg Configuration //Router *api.Router - Db *gorp.DbMap - Cache cache.Store + Db *gorp.DbMap + Cache cache.Store + ChanMessages chan handledMessage } // Configuration is the hooks configuration structure diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 68d9be05dd..4e21ba6fd0 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -218,6 +218,7 @@ func (c *Common) SendServiceLog(ctx context.Context, servicesLogs []sdk.ServiceL WorkerName: s.WorkerName, }, JobID: s.WorkflowNodeJobRunID, + NodeRunID: s.WorkflowNodeRunID, Timestamp: time.Now().UnixNano(), } signature, err := jws.Sign(c.Signer, dataToSign) diff --git a/engine/hatchery/swarm/swarm_util_kill.go b/engine/hatchery/swarm/swarm_util_kill.go index 6dfbbc8e06..0048bf662e 100644 --- a/engine/hatchery/swarm/swarm_util_kill.go +++ b/engine/hatchery/swarm/swarm_util_kill.go @@ -130,14 +130,14 @@ func (h *HatcherySwarm) killAndRemoveContainer(ctx context.Context, dockerClient } } - ctxDockerRemove, cancelList := context.WithTimeout(context.Background(), 20*time.Second) - defer cancelList() - if err := dockerClient.ContainerRemove(ctxDockerRemove, ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}); err != nil { - // container could be already removed by a previous call to docker - if !strings.Contains(err.Error(), "No such container") && !strings.Contains(err.Error(), "is already in progress") { - log.Error(ctx, "Unable to remove container %s from %s: %v", ID, dockerClient.name, err) - } - } + //ctxDockerRemove, cancelList := context.WithTimeout(context.Background(), 20*time.Second) + //defer cancelList() + //if err := dockerClient.ContainerRemove(ctxDockerRemove, ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}); err != nil { + // // container could be already removed by a previous call to docker + // if !strings.Contains(err.Error(), "No such container") && !strings.Contains(err.Error(), "is already in progress") { + // log.Error(ctx, "Unable to remove container %s from %s: %v", ID, dockerClient.name, err) + // } + //} return nil } diff --git a/engine/worker/internal/run.go b/engine/worker/internal/run.go index 3ec74f150b..767f995fb5 100644 --- a/engine/worker/internal/run.go +++ b/engine/worker/internal/run.go @@ -189,6 +189,9 @@ func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64 var t0 = time.Now() defer func() { w.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("End of step \"%s\" (%s)", actionName, sdk.Round(time.Since(t0), time.Second).String())) + if w.logger.gelfLogger != nil { + w.logger.gelfLogger.hook.Flush() + } }() //If the action is disabled; skip it @@ -516,16 +519,16 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk. w.logger.logChan = make(chan sdk.Log) go func() { if err := w.logProcessor(ctx, jobInfo.NodeJobRun.ID); err != nil { - log.Error(ctx, "processJob> Logs processor error: %v", err) + log.Warning(ctx, "processJob> Logs processor error: %v", err) } }() defer func() { if err := w.drainLogsAndCloseLogger(ctx); err != nil { - log.Error(ctx, "processJob> Drain logs error: %v", err) + log.Warning(ctx, "processJob> Drain logs error: %v", err) } }() defer func() { - log.Error(ctx, "processJob> Status: %s | Reason: %s", res.Status, res.Reason) + log.Warning(ctx, "processJob> Status: %s | Reason: %s", res.Status, res.Reason) }() wdFile, wdAbs, err := w.setupWorkingDirectory(ctx, jobInfo) @@ -619,6 +622,10 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk. if err := teardownDirectory(w.basedir, ""); err != nil { log.Error(ctx, "Cannot remove basedir content: %s", err) } + // Flushing logs + if w.logger.gelfLogger != nil { + w.logger.gelfLogger.hook.Flush() + } return res } diff --git a/engine/worker/internal/take.go b/engine/worker/internal/take.go index 12ca07bc6b..c73b49d5bd 100644 --- a/engine/worker/internal/take.go +++ b/engine/worker/internal/take.go @@ -3,6 +3,7 @@ package internal import ( "context" "encoding/base64" + "fmt" "strings" "time" @@ -10,6 +11,8 @@ import ( "github.com/ovh/cds/sdk" "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" + "github.com/ovh/cds/sdk/log/hook" + "github.com/sirupsen/logrus" ) func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error { @@ -55,6 +58,48 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er w.logger.gelfLogger = new(logger) w.logger.gelfLogger.logger = l w.logger.gelfLogger.hook = h + w.logger.gelfLogger.hook.FlushPolicy = &hook.FlushPolicy{ + Delay: 100 * time.Millisecond, + FlushFunc: func(c chan *hook.Message) { + fmt.Printf("[graylog] aggregating %d messages from the channel\n", len(c)) + var remainingMessages = make([]*hook.Message, 0, len(c)) + L: + for { + select { + case m := <-c: + remainingMessages = append(remainingMessages, m) + default: + break L + } + } + + s := "aggregrated messages" + AggrMsg := hook.Message{ + Version: "1.1", + Host: h.Hostname, + Short: s, + Full: s, + Time: float64(time.Now().UnixNano()) / 1e9, + Level: int32(logrus.ErrorLevel), + Pid: h.Pid, + Facility: h.Facility, + AggregatedMessages: remainingMessages, + } + c <- &AggrMsg + + var exit = &sdk.False + time.AfterFunc(30*time.Second, func() { + exit = &sdk.True + }) + + for len(c) > 0 && !*exit { + log.Debug("waiting for the bulk message to be sent") + time.Sleep(time.Second) + } + + }, + Timeout: 30 * time.Second, + } } start := time.Now() diff --git a/engine/worker/internal/types.go b/engine/worker/internal/types.go index cc998f525d..7a5b5ffd05 100644 --- a/engine/worker/internal/types.go +++ b/engine/worker/internal/types.go @@ -134,6 +134,7 @@ func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level, StepOrder: int64(stepOrder), }, JobID: wk.currentJob.wJob.ID, + NodeRunID: wk.currentJob.wJob.WorkflowNodeRunID, Timestamp: time.Now().UnixNano(), } signature, err := jws.Sign(wk.currentJob.signer, dataToSign) @@ -141,7 +142,6 @@ func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level, log.Error(ctx, "unable to sign logs: %v", err) } wk.logger.gelfLogger.logger.WithField(log.ExtraFieldSignature, signature).Log(logLevel, s) - wk.logger.gelfLogger.hook.Flush() } func (wk *CurrentWorker) Name() string { diff --git a/sdk/log/hook/gelf.go b/sdk/log/hook/gelf.go index c330f7ffa1..36731e4e99 100644 --- a/sdk/log/hook/gelf.go +++ b/sdk/log/hook/gelf.go @@ -23,18 +23,19 @@ const ( // Message represents the contents of the GELF message. It is gzipped // before sending. type Message struct { - Version string `json:"version"` - Host string `json:"host"` - Short string `json:"short_message"` - Full string `json:"full_message,omitempty"` - Time float64 `json:"timestamp"` - Level int32 `json:"level"` - Pid int `json:"_pid,omitempty"` - Facility string `json:"_facility,omitempty"` // optional, deprecated, send as additional field - File string `json:"_file,omitempty"` // optional, deprecated, send as additional field - Line int `json:"_line,omitempty"` // optional, deprecated, send as additional field - Prefix string `json:"_prefix,omitempty"` - Extra map[string]interface{} `json:"-"` + Version string `json:"version"` + Host string `json:"host"` + Short string `json:"short_message"` + Full string `json:"full_message,omitempty"` + Time float64 `json:"timestamp"` + Level int32 `json:"level"` + Pid int `json:"_pid,omitempty"` + Facility string `json:"_facility,omitempty"` // optional, deprecated, send as additional field + File string `json:"_file,omitempty"` // optional, deprecated, send as additional field + Line int `json:"_line,omitempty"` // optional, deprecated, send as additional field + Prefix string `json:"_prefix,omitempty"` + Extra map[string]interface{} `json:"-"` + AggregatedMessages []*Message `json:"embeddedMessages"` } type innerMessage Message //against circular (Un)MarshalJSON diff --git a/sdk/log/hook/hook.go b/sdk/log/hook/hook.go index d8bd881896..5b2fe847c5 100644 --- a/sdk/log/hook/hook.go +++ b/sdk/log/hook/hook.go @@ -58,13 +58,14 @@ var BufSize uint = 16384 // Config is the required configuration for creating a Graylog hook type Config struct { - Addr string - Protocol string - Hostname string - Facility string - TLSConfig *tls.Config - SendPolicy SendPolicy - Merge func(...map[string]interface{}) map[string]interface{} + Addr string + Protocol string + Hostname string + Facility string + TLSConfig *tls.Config + SendPolicy SendPolicy + FlushPolicy *FlushPolicy + Merge func(...map[string]interface{}) map[string]interface{} } // Hook to send logs to a logging service compatible with the Graylog API and the GELF format. @@ -81,13 +82,18 @@ type Hook struct { // Default is logrus.InfoLevel. Threshold logrus.Level - merge MergeFields - pid int - gelfLogger Writer - messages chan *Message - done chan struct{} - closed bool - l sync.Mutex + merge MergeFields + Pid int + gelfLogger Writer + messages chan *Message + flushMutex sync.Mutex + FlushPolicy *FlushPolicy +} + +type FlushPolicy struct { + Delay time.Duration + FlushFunc func(chan *Message) + Timeout time.Duration } // NewHook creates a hook to be added to an instance of logger. @@ -130,54 +136,83 @@ func NewHook(cfg *Config, extra map[string]interface{}) (*Hook, error) { cfg.SendPolicy = DropPolicy } + if cfg.FlushPolicy == nil { + cfg.FlushPolicy = DefaultFlushPolicy + } + merge := mergeFields if cfg.Merge != nil { merge = cfg.Merge } - fmt.Fprintf(os.Stderr, "[graylog] using endpoint: %s\n", cfg.Addr) + fmt.Fprintf(os.Stdout, "[graylog] using endpoint: %s\n", cfg.Addr) hook := &Hook{ - Facility: cfg.Facility, - Hostname: hostname, - Extra: extra, - SendPolicy: cfg.SendPolicy, - Threshold: logrus.DebugLevel, - merge: merge, - pid: os.Getpid(), - gelfLogger: w, - messages: make(chan *Message, BufSize), - done: make(chan struct{}, 1), + Facility: cfg.Facility, + Hostname: hostname, + Extra: extra, + SendPolicy: cfg.SendPolicy, + Threshold: logrus.DebugLevel, + merge: merge, + Pid: os.Getpid(), + gelfLogger: w, + messages: make(chan *Message, BufSize), + FlushPolicy: cfg.FlushPolicy, } go hook.fire() // Log in background + + if hook.FlushPolicy.Delay != 0 { + go func() { + ticker := time.NewTicker(hook.FlushPolicy.Delay) + defer ticker.Stop() + <-ticker.C + fmt.Println("triggering autoflush...") + hook.Flush() + }() + } + return hook, nil } +var ( + True = true + False = false +) + +var DefaultFlushPolicy = &FlushPolicy{ + Delay: 0, + Timeout: 1 * time.Minute, + FlushFunc: func(c chan *Message) { + fmt.Printf("[graylog] dropping %d messages\n", len(c)) + L: + for { + select { + case <-c: + default: + break L + } + } + }, +} + // Flush sends all remaining logs in the buffer to Graylog before returning func (hook *Hook) Flush() { - hook.l.Lock() - defer hook.l.Unlock() - if hook.closed { - return - } + hook.flushMutex.Lock() + defer hook.flushMutex.Unlock() + fmt.Printf("[graylog] FLUSH\n") - // cloes send channel to start flushing - close(hook.messages) + var exit = &False + time.AfterFunc(hook.FlushPolicy.Timeout, func() { + exit = &True + }) - // then simply wait for fire to empty the messages - // or timeout after a minute - select { - case <-hook.done: - close(hook.done) - case <-time.After(time.Minute): - fmt.Fprintln(os.Stderr, "[graylog] flushing timed out") + for len(hook.messages) > 0 && !*exit { + time.Sleep(time.Second) + } + if len(hook.messages) > 0 { + hook.FlushPolicy.FlushFunc(hook.messages) } - - hook.messages = make(chan *Message, BufSize) - hook.done = make(chan struct{}, 1) - - go hook.fire() } // Fire is called when a log event is fired. @@ -191,37 +226,28 @@ func (hook *Hook) Fire(entry *logrus.Entry) error { file, line := getCallerIgnoringLogMulti(1) msg := hook.messageFromEntry(entry, file, line) - hook.l.Lock() - defer hook.l.Unlock() - if hook.closed { - return nil - } - hook.SendPolicy(msg, hook.messages) return nil } // fire will loop on the 'buf' channel, and write entries to graylog func (hook *Hook) fire() { - r := retrier.New(retrier.ExponentialBackoff(3, time.Second), nil) + r := retrier.New(retrier.ExponentialBackoff(20, time.Millisecond), nil) // consume message buffer for message := range hook.messages { // we retry at least 3 times to write message to graylog. - // gelf package also has its own retry behaviour, which is - // roughly trying for a good ~15 minutes. err := r.Run(func() error { if err := hook.gelfLogger.WriteMessage(message); err != nil { - fmt.Fprintln(os.Stderr, "[graylog] could not write message to Graylog:", err) + fmt.Fprintln(os.Stdout, "[graylog] could not write message to Graylog:", err) return err } return nil }) // if after all the retries we still cannot write the message, just skip if err != nil { - fmt.Fprintln(os.Stderr, "[graylog] could not write message to Graylog after several retries:", err) + fmt.Fprintln(os.Stdout, "[graylog] could not write message to Graylog after several retries:", err) } } - hook.done <- struct{}{} } // Levels returns the available logging levels. @@ -259,7 +285,7 @@ func (hook *Hook) messageFromEntry(entry *logrus.Entry, file string, line int) * Full: full, Time: float64(entry.Time.UnixNano()) / 1e9, Level: int32(priorities[entry.Level]), - Pid: hook.pid, + Pid: hook.Pid, Facility: hook.Facility, File: file, Line: line, diff --git a/sdk/log/log.go b/sdk/log/log.go index dad041dfbc..cbe7774fd9 100644 --- a/sdk/log/log.go +++ b/sdk/log/log.go @@ -241,6 +241,7 @@ type Signature struct { Worker *SignatureWorker Service *SignatureService JobID int64 + NodeRunID int64 Timestamp int64 } From cf743051f6b4ceb2fad869e56e69b24e767197dd Mon Sep 17 00:00:00 2001 From: francois samin Date: Thu, 18 Jun 2020 15:35:32 +0200 Subject: [PATCH 2/4] fix(worker): logs --- engine/cdn/cdn_log.go | 73 +++++++------ engine/cdn/types.go | 5 +- engine/hatchery/serve.go | 8 +- engine/worker/internal/take.go | 59 +++------- sdk/log/hook/gelf.go | 25 +++-- sdk/log/hook/hook.go | 190 ++++++++++++++------------------- sdk/log/hook/stack.go | 70 ++++++++++++ sdk/log/hook/throttle.go | 78 ++++++++++++++ sdk/log/log.go | 25 ++--- 9 files changed, 312 insertions(+), 221 deletions(-) create mode 100644 sdk/log/hook/stack.go create mode 100644 sdk/log/hook/throttle.go diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go index d936a2ac58..aacf3f4aeb 100644 --- a/engine/cdn/cdn_log.go +++ b/engine/cdn/cdn_log.go @@ -6,7 +6,6 @@ import ( "crypto/rsa" "fmt" "net" - "runtime" "strings" "time" @@ -41,17 +40,6 @@ func (s *Service) RunTcpLogServer(ctx context.Context) { _ = listener.Close() }() - var nbCPU = runtime.NumCPU() - s.ChanMessages = make(chan handledMessage, 10000) - for i := 0; i < nbCPU; i++ { - go func() { - log.Debug("process logs") - if err := s.processLogs(ctx); err != nil { - log.Error(ctx, err.Error()) - } - }() - } - go func() { for { conn, err := listener.Accept() @@ -72,6 +60,15 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { defer func() { _ = conn.Close() }() + + chanMessages := make(chan handledMessage, 1000) + sdk.GoRoutine(context.Background(), "cdn-msgreader-"+sdk.UUID(), func(ctx context.Context) { + if err := s.processLogs(ctx, chanMessages); err != nil { + log.Error(ctx, "error while processing logs: %v", err) + } + }) + defer close(chanMessages) + bufReader := bufio.NewReader(conn) for { bytes, err := bufReader.ReadBytes(byte(0)) @@ -82,7 +79,7 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { // remove byte(0) bytes = bytes[:len(bytes)-1] - if err := s.handleLogMessage(ctx, bytes); err != nil { + if err := s.handleLogMessage(ctx, chanMessages, bytes); err != nil { observability.Record(ctx, Errors, 1) log.Error(ctx, "cdn.log> %v", err) continue @@ -90,7 +87,7 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { } } -func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error { +func (s *Service) handleLogMessage(ctx context.Context, chanMessages chan<- handledMessage, messageReceived []byte) error { m := hook.Message{} if err := m.UnmarshalJSON(messageReceived); err != nil { return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived)) @@ -110,7 +107,7 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) switch { case signature.Worker != nil: observability.Record(ctx, WorkerLogReceived, 1) - return s.handleWorkerLog(ctx, signature.Worker.WorkerID, sig, m) + return s.handleWorkerLog(ctx, chanMessages, signature.Worker.WorkerID, sig, m) case signature.Service != nil: observability.Record(ctx, ServiceLogReceived, 1) return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m) @@ -119,7 +116,7 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) } } -func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig interface{}, m hook.Message) error { +func (s *Service) handleWorkerLog(ctx context.Context, chanMessages chan<- handledMessage, workerID string, sig interface{}, m hook.Message) error { var signature log.Signature var workerData sdk.Worker cacheData, ok := logCache.Get(fmt.Sprintf("worker-%s", workerID)) @@ -139,7 +136,7 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig inte return sdk.WithStack(sdk.ErrForbidden) } - s.ChanMessages <- handledMessage{ + chanMessages <- handledMessage{ signature: signature, m: m, } @@ -152,12 +149,27 @@ type handledMessage struct { m hook.Message } -func (s *Service) processLogs(ctx context.Context) error { +func (s *Service) processLogs(ctx context.Context, chanMessages <-chan handledMessage) error { + var t0 = time.Now() + var nbMessages int + defer func() { + delta := time.Since(t0).Seconds() + log.Info(ctx, "processLogs - %d messages received in %.3f seconds", nbMessages, delta) + }() for { select { case <-ctx.Done(): return ctx.Err() - case msg := <-s.ChanMessages: + + case msg, more := <-chanMessages: + nbMessages++ + if msg.signature.Worker == nil { + if !more { + return nil + } + continue + } + tx, err := s.Db.Begin() if err != nil { log.Error(ctx, "unable to start tx: %v", err) @@ -165,27 +177,20 @@ func (s *Service) processLogs(ctx context.Context) error { } defer tx.Rollback() // nolint - if len(msg.m.AggregatedMessages) > 0 { - var currentLog string - for _, m1 := range msg.m.AggregatedMessages { - currentLog += buildMessage(msg.signature, *m1) - } - if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { - log.Error(ctx, "unable to process log: %+v", err) - continue - } - } else { - currentLog := buildMessage(msg.signature, msg.m) - if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { - log.Error(ctx, "unable to process log: %+v", err) - continue - } + currentLog := buildMessage(msg.signature, msg.m) + if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil { + log.Error(ctx, "unable to process log: %+v", err) + continue } if err := tx.Commit(); err != nil { log.Error(ctx, "unable to commit tx: %+v", err) continue } + + if !more { + return nil + } } } } diff --git a/engine/cdn/types.go b/engine/cdn/types.go index f439f38135..054110600d 100644 --- a/engine/cdn/types.go +++ b/engine/cdn/types.go @@ -12,9 +12,8 @@ type Service struct { service.Common Cfg Configuration //Router *api.Router - Db *gorp.DbMap - Cache cache.Store - ChanMessages chan handledMessage + Db *gorp.DbMap + Cache cache.Store } // Configuration is the hooks configuration structure diff --git a/engine/hatchery/serve.go b/engine/hatchery/serve.go index 4e21ba6fd0..1f98718154 100644 --- a/engine/hatchery/serve.go +++ b/engine/hatchery/serve.go @@ -23,6 +23,7 @@ import ( "github.com/ovh/cds/sdk/hatchery" "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" + "github.com/ovh/cds/sdk/log/hook" ) type Common struct { @@ -193,7 +194,12 @@ func (c *Common) getPanicDumpListHandler() service.Handler { func (c *Common) InitServiceLogger() error { var signer jose.Signer if c.Common.ServiceInstance.LogServerAdress != "" { - logger, _, err := log.New(context.Background(), c.Common.ServiceInstance.LogServerAdress) + var graylogCfg = &hook.Config{ + Addr: c.Common.ServiceInstance.LogServerAdress, + Protocol: "tcp", + } + logger, _, err := log.New(context.Background(), graylogCfg) + if err != nil { return sdk.WithStack(err) } diff --git a/engine/worker/internal/take.go b/engine/worker/internal/take.go index c73b49d5bd..472cbe2df1 100644 --- a/engine/worker/internal/take.go +++ b/engine/worker/internal/take.go @@ -3,7 +3,6 @@ package internal import ( "context" "encoding/base64" - "fmt" "strings" "time" @@ -12,7 +11,6 @@ import ( "github.com/ovh/cds/sdk/jws" "github.com/ovh/cds/sdk/log" "github.com/ovh/cds/sdk/log/hook" - "github.com/sirupsen/logrus" ) func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error { @@ -51,55 +49,26 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er if info.GelfServiceAddr != "" { log.Info(ctx, "Setup step logger %s", info.GelfServiceAddr) - l, h, err := log.New(ctx, info.GelfServiceAddr) + throttlePolicy := hook.NewDefaultThrottlePolicy() + + var graylogCfg = &hook.Config{ + Addr: info.GelfServiceAddr, + Protocol: "tcp", + ThrottlePolicy: &hook.ThrottlePolicyConfig{ + Amount: 100, + Period: 10 * time.Millisecond, + Policy: throttlePolicy, + }, + } + + l, h, err := log.New(ctx, graylogCfg) if err != nil { return sdk.WithStack(err) } + w.logger.gelfLogger = new(logger) w.logger.gelfLogger.logger = l w.logger.gelfLogger.hook = h - w.logger.gelfLogger.hook.FlushPolicy = &hook.FlushPolicy{ - Delay: 100 * time.Millisecond, - FlushFunc: func(c chan *hook.Message) { - fmt.Printf("[graylog] aggregating %d messages from the channel\n", len(c)) - var remainingMessages = make([]*hook.Message, 0, len(c)) - L: - for { - select { - case m := <-c: - remainingMessages = append(remainingMessages, m) - default: - break L - } - } - - s := "aggregrated messages" - AggrMsg := hook.Message{ - Version: "1.1", - Host: h.Hostname, - Short: s, - Full: s, - Time: float64(time.Now().UnixNano()) / 1e9, - Level: int32(logrus.ErrorLevel), - Pid: h.Pid, - Facility: h.Facility, - AggregatedMessages: remainingMessages, - } - c <- &AggrMsg - - var exit = &sdk.False - time.AfterFunc(30*time.Second, func() { - exit = &sdk.True - }) - - for len(c) > 0 && !*exit { - log.Debug("waiting for the bulk message to be sent") - time.Sleep(time.Second) - } - - }, - Timeout: 30 * time.Second, - } } start := time.Now() diff --git a/sdk/log/hook/gelf.go b/sdk/log/hook/gelf.go index 36731e4e99..c330f7ffa1 100644 --- a/sdk/log/hook/gelf.go +++ b/sdk/log/hook/gelf.go @@ -23,19 +23,18 @@ const ( // Message represents the contents of the GELF message. It is gzipped // before sending. type Message struct { - Version string `json:"version"` - Host string `json:"host"` - Short string `json:"short_message"` - Full string `json:"full_message,omitempty"` - Time float64 `json:"timestamp"` - Level int32 `json:"level"` - Pid int `json:"_pid,omitempty"` - Facility string `json:"_facility,omitempty"` // optional, deprecated, send as additional field - File string `json:"_file,omitempty"` // optional, deprecated, send as additional field - Line int `json:"_line,omitempty"` // optional, deprecated, send as additional field - Prefix string `json:"_prefix,omitempty"` - Extra map[string]interface{} `json:"-"` - AggregatedMessages []*Message `json:"embeddedMessages"` + Version string `json:"version"` + Host string `json:"host"` + Short string `json:"short_message"` + Full string `json:"full_message,omitempty"` + Time float64 `json:"timestamp"` + Level int32 `json:"level"` + Pid int `json:"_pid,omitempty"` + Facility string `json:"_facility,omitempty"` // optional, deprecated, send as additional field + File string `json:"_file,omitempty"` // optional, deprecated, send as additional field + Line int `json:"_line,omitempty"` // optional, deprecated, send as additional field + Prefix string `json:"_prefix,omitempty"` + Extra map[string]interface{} `json:"-"` } type innerMessage Message //against circular (Un)MarshalJSON diff --git a/sdk/log/hook/hook.go b/sdk/log/hook/hook.go index 5b2fe847c5..211513190e 100644 --- a/sdk/log/hook/hook.go +++ b/sdk/log/hook/hook.go @@ -9,7 +9,6 @@ import ( "fmt" "os" "strings" - "sync" "time" "github.com/eapache/go-resiliency/retrier" @@ -43,10 +42,6 @@ var priorities = map[logrus.Level]Priority{ logrus.DebugLevel: LOG_DEBUG, } -// SendPolicy defines the policy to use when the buffer is full (drop, block, flush, ...) -// The default policy is to drop the message as it's always copied to stderr anyway. -type SendPolicy func(*Message, chan *Message) - // MergeFields defines a function to merge fields. It used for example to define your own field // convientions to match with your graylog service. type MergeFields func(...map[string]interface{}) map[string]interface{} @@ -58,23 +53,19 @@ var BufSize uint = 16384 // Config is the required configuration for creating a Graylog hook type Config struct { - Addr string - Protocol string - Hostname string - Facility string - TLSConfig *tls.Config - SendPolicy SendPolicy - FlushPolicy *FlushPolicy - Merge func(...map[string]interface{}) map[string]interface{} + Addr string + Protocol string + Hostname string + Facility string + TLSConfig *tls.Config + Merge func(...map[string]interface{}) map[string]interface{} + ThrottlePolicy *ThrottlePolicyConfig } // Hook to send logs to a logging service compatible with the Graylog API and the GELF format. type Hook struct { Facility string Hostname string - // Sending policy is used to deal with Graylog connection failure. - // If nil, DropPolicy is used by default, dropping logs when connection failure happens. - SendPolicy SendPolicy // Extra fields to send to Graylog for each log entry. Extra map[string]interface{} // Minimum logging level to send to Graylog. @@ -82,18 +73,13 @@ type Hook struct { // Default is logrus.InfoLevel. Threshold logrus.Level - merge MergeFields - Pid int - gelfLogger Writer - messages chan *Message - flushMutex sync.Mutex - FlushPolicy *FlushPolicy -} + merge MergeFields + Pid int + gelfLogger Writer -type FlushPolicy struct { - Delay time.Duration - FlushFunc func(chan *Message) - Timeout time.Duration + throttleStack *Stack + throttleTicker *time.Ticker + throttlePolicy ThrottlePolicy } // NewHook creates a hook to be added to an instance of logger. @@ -132,87 +118,39 @@ func NewHook(cfg *Config, extra map[string]interface{}) (*Hook, error) { return nil, err } - if cfg.SendPolicy == nil { - cfg.SendPolicy = DropPolicy - } - - if cfg.FlushPolicy == nil { - cfg.FlushPolicy = DefaultFlushPolicy - } - merge := mergeFields if cfg.Merge != nil { merge = cfg.Merge } - fmt.Fprintf(os.Stdout, "[graylog] using endpoint: %s\n", cfg.Addr) - hook := &Hook{ - Facility: cfg.Facility, - Hostname: hostname, - Extra: extra, - SendPolicy: cfg.SendPolicy, - Threshold: logrus.DebugLevel, - merge: merge, - Pid: os.Getpid(), - gelfLogger: w, - messages: make(chan *Message, BufSize), - FlushPolicy: cfg.FlushPolicy, - } - - go hook.fire() // Log in background - - if hook.FlushPolicy.Delay != 0 { - go func() { - ticker := time.NewTicker(hook.FlushPolicy.Delay) - defer ticker.Stop() - <-ticker.C - fmt.Println("triggering autoflush...") - hook.Flush() - }() + Facility: cfg.Facility, + Hostname: hostname, + Extra: extra, + Threshold: logrus.DebugLevel, + merge: merge, + Pid: os.Getpid(), + gelfLogger: w, } - return hook, nil -} - -var ( - True = true - False = false -) - -var DefaultFlushPolicy = &FlushPolicy{ - Delay: 0, - Timeout: 1 * time.Minute, - FlushFunc: func(c chan *Message) { - fmt.Printf("[graylog] dropping %d messages\n", len(c)) - L: - for { - select { - case <-c: - default: - break L - } + if cfg.ThrottlePolicy == nil { + dfault := NewDefaultThrottlePolicy() + cfg.ThrottlePolicy = &ThrottlePolicyConfig{ + Amount: 5, + Period: 10 * time.Second, + Policy: dfault, } - }, -} + } -// Flush sends all remaining logs in the buffer to Graylog before returning -func (hook *Hook) Flush() { - hook.flushMutex.Lock() - defer hook.flushMutex.Unlock() - fmt.Printf("[graylog] FLUSH\n") + hook.throttleStack = NewStack(cfg.ThrottlePolicy.Amount) + hook.throttleTicker = time.NewTicker(cfg.ThrottlePolicy.Period) + hook.throttlePolicy = cfg.ThrottlePolicy.Policy + hook.throttlePolicy.Init(hook) + fmt.Fprintf(os.Stdout, "[graylog] using endpoint: %s\n", cfg.Addr) - var exit = &False - time.AfterFunc(hook.FlushPolicy.Timeout, func() { - exit = &True - }) + go hook.fire() // Log in background - for len(hook.messages) > 0 && !*exit { - time.Sleep(time.Second) - } - if len(hook.messages) > 0 { - hook.FlushPolicy.FlushFunc(hook.messages) - } + return hook, nil } // Fire is called when a log event is fired. @@ -225,27 +163,57 @@ func (hook *Hook) Fire(entry *logrus.Entry) error { // performance impact due to locking file, line := getCallerIgnoringLogMulti(1) msg := hook.messageFromEntry(entry, file, line) + return hook.FireMessage(*msg) +} - hook.SendPolicy(msg, hook.messages) +type FireMessageFunc func(msg Message) error + +func (hook *Hook) IsThrottled() bool { + return !hook.throttleStack.Empty() +} + +func (hook *Hook) FireMessage(msg Message) error { + switch { + case !hook.IsThrottled() && !hook.throttlePolicy.PendingTrailingMessages(): + hook.throttleStack.Push(msg) + default: + hook.throttlePolicy.HandleTrailingMessage(msg) + } return nil } -// fire will loop on the 'buf' channel, and write entries to graylog +func (hook *Hook) Flush() { + hook.throttlePolicy.Flush() +} + +var r = retrier.New(retrier.ExponentialBackoff(20, time.Millisecond), nil) + +func (hook *Hook) send(m Message) { + // we retry at least 3 times to write message to graylog. + err := r.Run(func() error { + if err := hook.gelfLogger.WriteMessage(&m); err != nil { + fmt.Fprintln(os.Stderr, "[graylog] could not write message to Graylog:", err) + return err + } + return nil + }) + // if after all the retries we still cannot write the message, just skip + if err != nil { + fmt.Fprintln(os.Stderr, "[graylog] could not write message to Graylog after several retries:", err) + } +} + +// fire will loop on the 'throttled' channel, and write entries to graylog func (hook *Hook) fire() { - r := retrier.New(retrier.ExponentialBackoff(20, time.Millisecond), nil) - // consume message buffer - for message := range hook.messages { - // we retry at least 3 times to write message to graylog. - err := r.Run(func() error { - if err := hook.gelfLogger.WriteMessage(message); err != nil { - fmt.Fprintln(os.Stdout, "[graylog] could not write message to Graylog:", err) - return err + defer hook.throttleTicker.Stop() + for range hook.throttleTicker.C { + for { + time.Sleep(time.Millisecond) + m, has := hook.throttleStack.Pop() + if !has { + continue } - return nil - }) - // if after all the retries we still cannot write the message, just skip - if err != nil { - fmt.Fprintln(os.Stdout, "[graylog] could not write message to Graylog after several retries:", err) + hook.send(m) } } } diff --git a/sdk/log/hook/stack.go b/sdk/log/hook/stack.go new file mode 100644 index 0000000000..75f8f6f426 --- /dev/null +++ b/sdk/log/hook/stack.go @@ -0,0 +1,70 @@ +package hook + +import "sync" + +// NewStack returns a new stack. +func NewStack(cap int) *Stack { + s := &Stack{ + size: cap, + cap: cap, + messages: make([]Message, cap), + } + return s +} + +// Stack is a basic LIFO stack that resizes as needed. +type Stack struct { + mutex sync.Mutex + messages []Message + size int + head int + tail int + count int + cap int +} + +// Push adds a node to the queue. +func (q *Stack) Push(n Message) { + q.mutex.Lock() + defer q.mutex.Unlock() + + if q.head == q.tail && q.count > 0 { + messages := make([]Message, len(q.messages)+q.size) + copy(messages, q.messages[q.head:]) + copy(messages[len(q.messages)-q.head:], q.messages[:q.head]) + q.head = 0 + q.tail = len(q.messages) + q.messages = messages + } + q.messages[q.tail] = n + q.tail = (q.tail + 1) % len(q.messages) + q.count++ +} + +func (q *Stack) Ready() bool { + q.mutex.Lock() + defer q.mutex.Unlock() + + return q.count < q.cap +} + +func (q *Stack) Empty() bool { + q.mutex.Lock() + defer q.mutex.Unlock() + + return q.count == 0 +} + +// Pop removes and returns a node from the queue in first to last order. +func (q *Stack) Pop() (Message, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + + if q.count == 0 { + return Message{}, false + } + node := q.messages[q.head] + q.head = (q.head + 1) % len(q.messages) + q.count-- + return node, true +} diff --git a/sdk/log/hook/throttle.go b/sdk/log/hook/throttle.go new file mode 100644 index 0000000000..9b58c5de67 --- /dev/null +++ b/sdk/log/hook/throttle.go @@ -0,0 +1,78 @@ +package hook + +import ( + "fmt" + "os" + "sync" + "time" +) + +type ThrottlePolicyConfig struct { + Amount int + Period time.Duration + Policy ThrottlePolicy +} + +type ThrottlePolicy interface { + Init(hook *Hook) + HandleTrailingMessage(m Message) + PendingTrailingMessages() bool + Flush() +} + +func NewDefaultThrottlePolicy() ThrottlePolicy { + o := &DefaultThrottlePolicy{} + return o +} + +var _ ThrottlePolicy = new(DefaultThrottlePolicy) + +type DefaultThrottlePolicy struct { + mutex sync.Mutex + buffer chan Message +} + +func (d *DefaultThrottlePolicy) Init(hook *Hook) { + d.buffer = make(chan Message, BufSize) + go func() { + for { + time.Sleep(1 * time.Millisecond) + if !hook.IsThrottled() { + m, more := <-d.buffer + hook.throttleStack.Push(m) + if !more { + fmt.Fprintf(os.Stderr, "[graylog] exiting trailing message goroutine...\n") + break + } + } + } + }() +} + +func (d *DefaultThrottlePolicy) HandleTrailingMessage(m Message) { + d.mutex.Lock() + defer d.mutex.Unlock() + + select { + case d.buffer <- m: + default: + fmt.Fprintf(os.Stderr, "[graylog] message dropped\n") + } +} + +func (d *DefaultThrottlePolicy) Flush() { + d.mutex.Lock() + defer d.mutex.Unlock() + + fmt.Fprintf(os.Stderr, "[graylog] flush\n") + for len(d.buffer) != 0 { + time.Sleep(time.Second) + } +} + +func (d *DefaultThrottlePolicy) PendingTrailingMessages() bool { + d.mutex.Lock() + defer d.mutex.Unlock() + + return len(d.buffer) > 0 +} diff --git a/sdk/log/log.go b/sdk/log/log.go index cbe7774fd9..62ade2a6ef 100644 --- a/sdk/log/log.go +++ b/sdk/log/log.go @@ -8,8 +8,9 @@ import ( "os" "strings" - loghook "github.com/ovh/cds/sdk/log/hook" log "github.com/sirupsen/logrus" + + "github.com/ovh/cds/sdk/log/hook" ) // Conf contains log configuration @@ -36,8 +37,8 @@ const ( ) var ( - logger Logger - hook *loghook.Hook + logger Logger + graylogHook *hook.Hook ) // Logger defines the logs levels used @@ -84,7 +85,7 @@ func Initialize(ctx context.Context, conf *Conf) { log.SetFormatter(&CDSFormatter{}) if conf.GraylogHost != "" && conf.GraylogPort != "" { - graylogcfg := &loghook.Config{ + graylogcfg := &hook.Config{ Addr: fmt.Sprintf("%s:%s", conf.GraylogHost, conf.GraylogPort), Protocol: conf.GraylogProtocol, TLSConfig: &tls.Config{ServerName: conf.GraylogHost}, @@ -124,12 +125,12 @@ func Initialize(ctx context.Context, conf *Conf) { extra["CDSHostname"] = hostname var errhook error - hook, errhook = loghook.NewHook(graylogcfg, extra) + graylogHook, errhook = hook.NewHook(graylogcfg, extra) if errhook != nil { log.Errorf("Error while initialize graylog hook: %v", errhook) } else { - log.AddHook(hook) + log.AddHook(graylogHook) log.SetOutput(ioutil.Discard) } } @@ -137,8 +138,8 @@ func Initialize(ctx context.Context, conf *Conf) { go func() { <-ctx.Done() Info(ctx, "Draining logs...") - if hook != nil { - hook.Flush() + if graylogHook != nil { + graylogHook.Flush() } }() } @@ -258,14 +259,10 @@ type SignatureService struct { WorkerName string } -func New(ctx context.Context, logServerAddr string) (*log.Logger, *loghook.Hook, error) { +func New(ctx context.Context, graylogcfg *hook.Config) (*log.Logger, *hook.Hook, error) { newLogger := log.New() - graylogcfg := &loghook.Config{ - Addr: logServerAddr, - Protocol: "tcp", - } extra := map[string]interface{}{} - hook, err := loghook.NewHook(graylogcfg, extra) + hook, err := hook.NewHook(graylogcfg, extra) if err != nil { return nil, nil, fmt.Errorf("unable to add hook: %v", err) } From 310c021ebccb749ce559ed7f4341ebff4e404edb Mon Sep 17 00:00:00 2001 From: francois samin Date: Thu, 18 Jun 2020 15:41:47 +0200 Subject: [PATCH 3/4] fix Signed-off-by: francois samin --- engine/hatchery/swarm/swarm_util_kill.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/engine/hatchery/swarm/swarm_util_kill.go b/engine/hatchery/swarm/swarm_util_kill.go index 0048bf662e..6dfbbc8e06 100644 --- a/engine/hatchery/swarm/swarm_util_kill.go +++ b/engine/hatchery/swarm/swarm_util_kill.go @@ -130,14 +130,14 @@ func (h *HatcherySwarm) killAndRemoveContainer(ctx context.Context, dockerClient } } - //ctxDockerRemove, cancelList := context.WithTimeout(context.Background(), 20*time.Second) - //defer cancelList() - //if err := dockerClient.ContainerRemove(ctxDockerRemove, ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}); err != nil { - // // container could be already removed by a previous call to docker - // if !strings.Contains(err.Error(), "No such container") && !strings.Contains(err.Error(), "is already in progress") { - // log.Error(ctx, "Unable to remove container %s from %s: %v", ID, dockerClient.name, err) - // } - //} + ctxDockerRemove, cancelList := context.WithTimeout(context.Background(), 20*time.Second) + defer cancelList() + if err := dockerClient.ContainerRemove(ctxDockerRemove, ID, types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}); err != nil { + // container could be already removed by a previous call to docker + if !strings.Contains(err.Error(), "No such container") && !strings.Contains(err.Error(), "is already in progress") { + log.Error(ctx, "Unable to remove container %s from %s: %v", ID, dockerClient.name, err) + } + } return nil } From 6d28b72e23cc7cb234b2c8908c35227957003fac Mon Sep 17 00:00:00 2001 From: francois samin Date: Fri, 19 Jun 2020 13:13:29 +0200 Subject: [PATCH 4/4] fix Signed-off-by: francois samin --- engine/cdn/cdn_log.go | 15 ++++++++++----- engine/cdn/cdn_log_test.go | 13 +++++++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go index aacf3f4aeb..455fc6fee6 100644 --- a/engine/cdn/cdn_log.go +++ b/engine/cdn/cdn_log.go @@ -56,17 +56,22 @@ func (s *Service) RunTcpLogServer(ctx context.Context) { }() } -func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { - defer func() { - _ = conn.Close() - }() - +func (s *Service) handleConnectionChannel(ctx context.Context) chan<- handledMessage { chanMessages := make(chan handledMessage, 1000) sdk.GoRoutine(context.Background(), "cdn-msgreader-"+sdk.UUID(), func(ctx context.Context) { if err := s.processLogs(ctx, chanMessages); err != nil { log.Error(ctx, "error while processing logs: %v", err) } }) + return chanMessages +} + +func (s *Service) handleConnection(ctx context.Context, conn net.Conn) { + defer func() { + _ = conn.Close() + }() + + chanMessages := s.handleConnectionChannel(ctx) defer close(chanMessages) bufReader := bufio.NewReader(conn) diff --git a/engine/cdn/cdn_log_test.go b/engine/cdn/cdn_log_test.go index fcb3d85650..3348080256 100644 --- a/engine/cdn/cdn_log_test.go +++ b/engine/cdn/cdn_log_test.go @@ -50,6 +50,7 @@ func TestWorkerLog(t *testing.T) { StepOrder: 0, }, JobID: dbj.ID, + NodeRunID: jobRun.WorkflowNodeRunID, Timestamp: time.Now().UnixNano(), } logCache.Set(fmt.Sprintf("worker-%s", signature.Worker.WorkerID), sdk.Worker{ @@ -64,7 +65,11 @@ func TestWorkerLog(t *testing.T) { "host": "host", "_line":1, "_pid": 1, "_prefix": "prefix", "full_message": "this is my message", "_Signature": "%s"}` message = fmt.Sprintf(message, signatureField) - require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message))) + chanMessages := s.handleConnectionChannel(context.TODO()) + require.NoError(t, s.handleLogMessage(context.TODO(), chanMessages, []byte(message))) + close(chanMessages) + + time.Sleep(100 * time.Millisecond) logs, err := workflow.LoadLogs(s.Db, dbj.ID) require.NoError(t, err) @@ -108,6 +113,7 @@ func TestServiceLog(t *testing.T) { RequirementName: "service-1", }, JobID: dbj.ID, + NodeRunID: jobRun.WorkflowNodeRunID, Timestamp: time.Now().UnixNano(), } @@ -121,9 +127,12 @@ func TestServiceLog(t *testing.T) { "host": "host", "_line":1, "_pid": 1, "_prefix": "prefix", "full_message": "this is my service message", "_Signature": "%s"}` message = fmt.Sprintf(message, signatureField) - require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message))) + chanMessages := s.handleConnectionChannel(context.TODO()) + require.NoError(t, s.handleLogMessage(context.TODO(), chanMessages, []byte(message))) + close(chanMessages) logs, err := workflow.LoadServiceLog(db, dbj.ID, signature.Service.RequirementName) require.NoError(t, err) require.Equal(t, "this is my service message\n", logs.Val) + }