Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker, cdn): logs #5265

Merged
merged 4 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ func AddLog(db gorp.SqlExecutor, job *sdk.WorkflowNodeJobRun, logs *sdk.Log, max

// ignore the log if max size already reached
if maxReached := truncateLogs(maxLogSize, size, logs); maxReached {
log.Debug("truncated logs")
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,6 @@ func (api *API) postWorkflowJobLogsHandler() service.Handler {
return err
}

log.Debug("postWorkflowJobLogsHandler> Logs: %+v", logs)

if err := workflow.AddLog(api.mustDB(), pbJob, &logs, api.Config.Log.StepMaxSize); err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,8 +1333,6 @@ func (api *API) getWorkflowNodeRunJobStepHandler() service.Handler {
StepLogs: *ls,
}

log.Debug("logs: %+v", result)

return service.WriteJSON(w, result, http.StatusOK)
}
}
Expand Down
8 changes: 3 additions & 5 deletions engine/cdn/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,9 @@ func (s *Service) Serve(c context.Context) error {

//Gracefully shutdown the http server
go func() {
select {
case <-ctx.Done():
log.Info(ctx, "CDN> Shutdown HTTP Server")
_ = server.Shutdown(ctx)
}
<-ctx.Done()
log.Info(ctx, "CDN> Shutdown HTTP Server")
_ = server.Shutdown(ctx)
}()

//Start the http server
Expand Down
110 changes: 88 additions & 22 deletions engine/cdn/cdn_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/go-gorp/gorp"
gocache "github.com/patrickmn/go-cache"

"github.com/ovh/cds/engine/api/observability"
Expand All @@ -34,11 +35,9 @@ func (s *Service) RunTcpLogServer(ctx context.Context) {

//Gracefully shutdown the tcp server
go func() {
select {
case <-ctx.Done():
log.Info(ctx, "CDN> Shutdown tcp log Server")
_ = listener.Close()
}
<-ctx.Done()
log.Info(ctx, "CDN> Shutdown tcp log Server")
_ = listener.Close()
}()

go func() {
Expand All @@ -61,6 +60,15 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
defer func() {
_ = conn.Close()
}()

chanMessages := make(chan handledMessage, 1000)
sdk.GoRoutine(context.Background(), "cdn-msgreader-"+sdk.UUID(), func(ctx context.Context) {
if err := s.processLogs(ctx, chanMessages); err != nil {
log.Error(ctx, "error while processing logs: %v", err)
}
})
defer close(chanMessages)

bufReader := bufio.NewReader(conn)
for {
bytes, err := bufReader.ReadBytes(byte(0))
Expand All @@ -71,15 +79,15 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
// remove byte(0)
bytes = bytes[:len(bytes)-1]

if err := s.handleLogMessage(ctx, bytes); err != nil {
if err := s.handleLogMessage(ctx, chanMessages, bytes); err != nil {
observability.Record(ctx, Errors, 1)
log.Error(ctx, "cdn.log> %v", err)
continue
}
}
}

func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error {
func (s *Service) handleLogMessage(ctx context.Context, chanMessages chan<- handledMessage, messageReceived []byte) error {
m := hook.Message{}
if err := m.UnmarshalJSON(messageReceived); err != nil {
return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived))
Expand All @@ -99,7 +107,7 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte)
switch {
case signature.Worker != nil:
observability.Record(ctx, WorkerLogReceived, 1)
return s.handleWorkerLog(ctx, signature.Worker.WorkerID, sig, m)
return s.handleWorkerLog(ctx, chanMessages, signature.Worker.WorkerID, sig, m)
case signature.Service != nil:
observability.Record(ctx, ServiceLogReceived, 1)
return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m)
Expand All @@ -108,7 +116,7 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte)
}
}

func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig interface{}, m hook.Message) error {
func (s *Service) handleWorkerLog(ctx context.Context, chanMessages chan<- handledMessage, workerID string, sig interface{}, m hook.Message) error {
var signature log.Signature
var workerData sdk.Worker
cacheData, ok := logCache.Get(fmt.Sprintf("worker-%s", workerID))
Expand All @@ -128,16 +136,71 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig inte
return sdk.WithStack(sdk.ErrForbidden)
}

pbJob, err := workflow.LoadNodeJobRun(ctx, s.Db, s.Cache, signature.JobID)
if err != nil {
return err
chanMessages <- handledMessage{
signature: signature,
m: m,
}

return nil
}

type handledMessage struct {
signature log.Signature
m hook.Message
}

func (s *Service) processLogs(ctx context.Context, chanMessages <-chan handledMessage) error {
var t0 = time.Now()
var nbMessages int
defer func() {
delta := time.Since(t0).Seconds()
log.Info(ctx, "processLogs - %d messages received in %.3f seconds", nbMessages, delta)
}()
for {
select {
case <-ctx.Done():
return ctx.Err()

case msg, more := <-chanMessages:
nbMessages++
if msg.signature.Worker == nil {
if !more {
return nil
}
continue
}

tx, err := s.Db.Begin()
if err != nil {
log.Error(ctx, "unable to start tx: %v", err)
continue
}
defer tx.Rollback() // nolint

currentLog := buildMessage(msg.signature, msg.m)
if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil {
log.Error(ctx, "unable to process log: %+v", err)
continue
}

if err := tx.Commit(); err != nil {
log.Error(ctx, "unable to commit tx: %+v", err)
continue
}

if !more {
return nil
}
}
}
}

func buildMessage(signature log.Signature, m hook.Message) string {
logDate := time.Unix(0, int64(m.Time*1e9))
logs := sdk.Log{
JobID: pbJob.ID,
JobID: signature.JobID,
LastModified: &logDate,
NodeRunID: pbJob.WorkflowNodeRunID,
NodeRunID: signature.NodeRunID,
Start: &logDate,
StepOrder: signature.Worker.StepOrder,
Val: m.Full,
Expand Down Expand Up @@ -165,16 +228,19 @@ func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig inte
lvl = "EMERGENCY"
}
logs.Val = fmt.Sprintf("[%s] %s", lvl, logs.Val)
tx, err := s.Db.Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback() // nolint
return logs.Val
}

if err := workflow.AddLog(tx, pbJob, &logs, s.Cfg.Log.StepMaxSize); err != nil {
return err
func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error {
now := time.Now()
l := sdk.Log{
JobID: signature.JobID,
NodeRunID: signature.NodeRunID,
LastModified: &now,
StepOrder: signature.Worker.StepOrder,
Val: message,
}
return sdk.WithStack(tx.Commit())
return workflow.AddLog(db, nil, &l, s.Cfg.Log.StepMaxSize)
}

func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
Expand Down
9 changes: 8 additions & 1 deletion engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/ovh/cds/sdk/hatchery"
"github.com/ovh/cds/sdk/jws"
"github.com/ovh/cds/sdk/log"
"github.com/ovh/cds/sdk/log/hook"
)

type Common struct {
Expand Down Expand Up @@ -193,7 +194,12 @@ func (c *Common) getPanicDumpListHandler() service.Handler {
func (c *Common) InitServiceLogger() error {
var signer jose.Signer
if c.Common.ServiceInstance.LogServerAdress != "" {
logger, _, err := log.New(context.Background(), c.Common.ServiceInstance.LogServerAdress)
var graylogCfg = &hook.Config{
Addr: c.Common.ServiceInstance.LogServerAdress,
Protocol: "tcp",
}
logger, _, err := log.New(context.Background(), graylogCfg)

if err != nil {
return sdk.WithStack(err)
}
Expand All @@ -218,6 +224,7 @@ func (c *Common) SendServiceLog(ctx context.Context, servicesLogs []sdk.ServiceL
WorkerName: s.WorkerName,
},
JobID: s.WorkflowNodeJobRunID,
NodeRunID: s.WorkflowNodeRunID,
Timestamp: time.Now().UnixNano(),
}
signature, err := jws.Sign(c.Signer, dataToSign)
Expand Down
13 changes: 10 additions & 3 deletions engine/worker/internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64
var t0 = time.Now()
defer func() {
w.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf("End of step \"%s\" (%s)", actionName, sdk.Round(time.Since(t0), time.Second).String()))
if w.logger.gelfLogger != nil {
w.logger.gelfLogger.hook.Flush()
}
}()

//If the action is disabled; skip it
Expand Down Expand Up @@ -516,16 +519,16 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk.
w.logger.logChan = make(chan sdk.Log)
go func() {
if err := w.logProcessor(ctx, jobInfo.NodeJobRun.ID); err != nil {
log.Error(ctx, "processJob> Logs processor error: %v", err)
log.Warning(ctx, "processJob> Logs processor error: %v", err)
}
}()
defer func() {
if err := w.drainLogsAndCloseLogger(ctx); err != nil {
log.Error(ctx, "processJob> Drain logs error: %v", err)
log.Warning(ctx, "processJob> Drain logs error: %v", err)
}
}()
defer func() {
log.Error(ctx, "processJob> Status: %s | Reason: %s", res.Status, res.Reason)
log.Warning(ctx, "processJob> Status: %s | Reason: %s", res.Status, res.Reason)
}()

wdFile, wdAbs, err := w.setupWorkingDirectory(ctx, jobInfo)
Expand Down Expand Up @@ -619,6 +622,10 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk.
if err := teardownDirectory(w.basedir, ""); err != nil {
log.Error(ctx, "Cannot remove basedir content: %s", err)
}
// Flushing logs
if w.logger.gelfLogger != nil {
w.logger.gelfLogger.hook.Flush()
}

return res
}
16 changes: 15 additions & 1 deletion engine/worker/internal/take.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ovh/cds/sdk"
"github.com/ovh/cds/sdk/jws"
"github.com/ovh/cds/sdk/log"
"github.com/ovh/cds/sdk/log/hook"
)

func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error {
Expand Down Expand Up @@ -48,10 +49,23 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er

if info.GelfServiceAddr != "" {
log.Info(ctx, "Setup step logger %s", info.GelfServiceAddr)
l, h, err := log.New(ctx, info.GelfServiceAddr)
throttlePolicy := hook.NewDefaultThrottlePolicy()

var graylogCfg = &hook.Config{
Addr: info.GelfServiceAddr,
Protocol: "tcp",
ThrottlePolicy: &hook.ThrottlePolicyConfig{
Amount: 100,
Period: 10 * time.Millisecond,
Policy: throttlePolicy,
},
}

l, h, err := log.New(ctx, graylogCfg)
if err != nil {
return sdk.WithStack(err)
}

w.logger.gelfLogger = new(logger)
w.logger.gelfLogger.logger = l
w.logger.gelfLogger.hook = h
Expand Down
2 changes: 1 addition & 1 deletion engine/worker/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ func (wk *CurrentWorker) SendLog(ctx context.Context, level workerruntime.Level,
StepOrder: int64(stepOrder),
},
JobID: wk.currentJob.wJob.ID,
NodeRunID: wk.currentJob.wJob.WorkflowNodeRunID,
Timestamp: time.Now().UnixNano(),
}
signature, err := jws.Sign(wk.currentJob.signer, dataToSign)
if err != nil {
log.Error(ctx, "unable to sign logs: %v", err)
}
wk.logger.gelfLogger.logger.WithField(log.ExtraFieldSignature, signature).Log(logLevel, s)
wk.logger.gelfLogger.hook.Flush()
}

func (wk *CurrentWorker) Name() string {
Expand Down
Loading