diff --git a/engine/api/cache/cache.go b/engine/api/cache/cache.go
index af03f81593..1634cf029e 100644
--- a/engine/api/cache/cache.go
+++ b/engine/api/cache/cache.go
@@ -22,6 +22,7 @@ func Key(args ...string) string {
 
 //Store is an interface
 type Store interface {
+	Keys(pattern string) ([]string, error)
 	Get(key string, value interface{}) (bool, error)
 	Set(key string, value interface{}) error
 	SetWithTTL(key string, value interface{}, ttl int) error
@@ -29,8 +30,9 @@ type Store interface {
 	UpdateTTL(key string, ttl int) error
 	Delete(key string) error
 	DeleteAll(key string) error
+	Exist(key string) (bool, error)
 	Enqueue(queueName string, value interface{}) error
-	DequeueWithContext(c context.Context, queueName string, value interface{}) error
+	DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error
 	QueueLen(queueName string) (int, error)
 	RemoveFromQueue(queueName string, memberKey string) error
 	Publish(ctx context.Context, queueName string, value interface{}) error
diff --git a/engine/api/cache/redis.go b/engine/api/cache/redis.go
index 3bb57ecace..077025d80a 100644
--- a/engine/api/cache/redis.go
+++ b/engine/api/cache/redis.go
@@ -71,6 +71,18 @@ func NewRedisStore(host, password string, ttl int) (*RedisStore, error) {
 	}, nil
 }
 
+// Keys List keys from pattern
+func (s *RedisStore) Keys(pattern string) ([]string, error) {
+	if s.Client == nil {
+		return nil, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
+	}
+	keys, err := s.Client.Keys(pattern).Result()
+	if err != nil {
+		return nil, sdk.WrapError(err, "redis> cannot list keys: %s", pattern)
+	}
+	return keys, nil
+}
+
 // Get a key from redis
 func (s *RedisStore) Get(key string, value interface{}) (bool, error) {
 	if s.Client == nil {
@@ -173,6 +185,18 @@ func (s *RedisStore) DeleteAll(pattern string) error {
 	return nil
 }
 
+// Exist test is key exists
+func (s *RedisStore) Exist(key string) (bool, error) {
+	if s.Client == nil {
+		return false, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
+	}
+	ok, err := s.Client.Exists(key).Result()
+	if err != nil {
+		return false, sdk.WrapError(err, "unable to test if key %s exists", key)
+	}
+	return ok == 1, nil
+}
+
 // Enqueue pushes to queue
 func (s *RedisStore) Enqueue(queueName string, value interface{}) error {
 	if s.Client == nil {
@@ -203,13 +227,13 @@ func (s *RedisStore) QueueLen(queueName string) (int, error) {
 }
 
 // DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
-func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) error {
+func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error {
 	if s.Client == nil {
 		return sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
 	}
 
 	var elem string
-	ticker := time.NewTicker(250 * time.Millisecond).C
+	ticker := time.NewTicker(waitDuration).C
 	for elem == "" {
 		select {
 		case <-ticker:
diff --git a/engine/api/event/event.go b/engine/api/event/event.go
index 9d6219840e..9b3c4bad42 100644
--- a/engine/api/event/event.go
+++ b/engine/api/event/event.go
@@ -139,7 +139,7 @@ func Subscribe(ch chan<- sdk.Event) {
 func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
 	for {
 		e := sdk.Event{}
-		if err := store.DequeueWithContext(ctx, "events", &e); err != nil {
+		if err := store.DequeueWithContext(ctx, "events", 250*time.Millisecond, &e); err != nil {
 			log.Error(ctx, "Event.DequeueEvent> store.DequeueWithContext err: %v", err)
 			continue
 		}
diff --git a/engine/api/repositoriesmanager/events.go b/engine/api/repositoriesmanager/events.go
index f73ae2b313..dcd6c8153d 100644
--- a/engine/api/repositoriesmanager/events.go
+++ b/engine/api/repositoriesmanager/events.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"time"
 
 	"github.com/go-gorp/gorp"
 	"github.com/ovh/cds/engine/api/cache"
@@ -15,7 +16,7 @@ import (
 func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) {
 	for {
 		e := sdk.Event{}
-		if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", &e); err != nil {
+		if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", 250*time.Millisecond, &e); err != nil {
 			log.Error(ctx, "repositoriesmanager.ReceiveEvents > store.DequeueWithContext err: %v", err)
 			continue
 		}
diff --git a/engine/cdn/cdn_log.go b/engine/cdn/cdn_log.go
index 7fbd9e8004..6a1a10f7ae 100644
--- a/engine/cdn/cdn_log.go
+++ b/engine/cdn/cdn_log.go
@@ -6,12 +6,13 @@ import (
 	"crypto/rsa"
 	"fmt"
 	"net"
+	"strconv"
 	"strings"
 	"time"
 
-	"github.com/go-gorp/gorp"
 	gocache "github.com/patrickmn/go-cache"
 
+	"github.com/ovh/cds/engine/api/cache"
 	"github.com/ovh/cds/engine/api/observability"
 	"github.com/ovh/cds/engine/api/services"
 	"github.com/ovh/cds/engine/api/worker"
@@ -23,7 +24,10 @@ import (
 )
 
 var (
-	logCache = gocache.New(20*time.Minute, 30*time.Minute)
+	logCache       = gocache.New(20*time.Minute, 30*time.Minute)
+	keyJobLogQueue = cache.Key("cdn", "log", "job")
+	keyJobHearbeat = cache.Key("cdn", "log", "heartbeat")
+	keyJobLock     = cache.Key("cdn", "log", "lock")
 )
 
 func (s *Service) RunTcpLogServer(ctx context.Context) {
@@ -40,6 +44,11 @@ func (s *Service) RunTcpLogServer(ctx context.Context) {
 		_ = listener.Close()
 	}()
 
+	// Looking for something to dequeue
+	sdk.GoRoutine(ctx, "cdn-waiting-job", func(ctx context.Context) {
+		s.waitingJobs(ctx)
+	})
+
 	go func() {
 		for {
 			conn, err := listener.Accept()
@@ -56,24 +65,11 @@ func (s *Service) RunTcpLogServer(ctx context.Context) {
 	}()
 }
 
-func (s *Service) handleConnectionChannel(ctx context.Context) chan<- handledMessage {
-	chanMessages := make(chan handledMessage, 1000)
-	sdk.GoRoutine(context.Background(), "cdn-msgreader-"+sdk.UUID(), func(ctx context.Context) {
-		if err := s.processLogs(ctx, chanMessages); err != nil {
-			log.Error(ctx, "error while processing logs: %v", err)
-		}
-	})
-	return chanMessages
-}
-
 func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
 	defer func() {
 		_ = conn.Close()
 	}()
 
-	chanMessages := s.handleConnectionChannel(ctx)
-	defer close(chanMessages)
-
 	bufReader := bufio.NewReader(conn)
 	for {
 		bytes, err := bufReader.ReadBytes(byte(0))
@@ -84,7 +80,7 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
 		// remove byte(0)
 		bytes = bytes[:len(bytes)-1]
 
-		if err := s.handleLogMessage(ctx, chanMessages, bytes); err != nil {
+		if err := s.handleLogMessage(ctx, bytes); err != nil {
 			observability.Record(ctx, Errors, 1)
 			log.Error(ctx, "cdn.log> %v", err)
 			continue
@@ -92,7 +88,7 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {
 	}
 }
 
-func (s *Service) handleLogMessage(ctx context.Context, chanMessages chan<- handledMessage, messageReceived []byte) error {
+func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error {
 	m := hook.Message{}
 	if err := m.UnmarshalJSON(messageReceived); err != nil {
 		return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived))
@@ -112,7 +108,7 @@ func (s *Service) handleLogMessage(ctx context.Context, chanMessages chan<- hand
 	switch {
 	case signature.Worker != nil:
 		observability.Record(ctx, WorkerLogReceived, 1)
-		return s.handleWorkerLog(ctx, chanMessages, signature.Worker.WorkerID, sig, m)
+		return s.handleWorkerLog(ctx, signature.Worker.WorkerID, sig, m)
 	case signature.Service != nil:
 		observability.Record(ctx, ServiceLogReceived, 1)
 		return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m)
@@ -121,7 +117,7 @@ func (s *Service) handleLogMessage(ctx context.Context, chanMessages chan<- hand
 	}
 }
 
-func (s *Service) handleWorkerLog(ctx context.Context, chanMessages chan<- handledMessage, workerID string, sig interface{}, m hook.Message) error {
+func (s *Service) handleWorkerLog(ctx context.Context, workerID string, sig interface{}, m hook.Message) error {
 	var signature log.Signature
 	var workerData sdk.Worker
 	cacheData, ok := logCache.Get(fmt.Sprintf("worker-%s", workerID))
@@ -141,64 +137,20 @@ func (s *Service) handleWorkerLog(ctx context.Context, chanMessages chan<- handl
 		return sdk.WithStack(sdk.ErrForbidden)
 	}
 
-	chanMessages <- handledMessage{
-		signature: signature,
-		m:         m,
+	hm := handledMessage{
+		Signature: signature,
+		Msg:       m,
+	}
+	cacheKey := cache.Key(keyJobLogQueue, strconv.Itoa(int(signature.JobID)))
+	if err := s.Cache.Enqueue(cacheKey, hm); err != nil {
+		return err
 	}
-
 	return nil
 }
 
 type handledMessage struct {
-	signature log.Signature
-	m         hook.Message
-}
-
-func (s *Service) processLogs(ctx context.Context, chanMessages <-chan handledMessage) error {
-	var t0 = time.Now()
-	var nbMessages int
-	defer func() {
-		delta := time.Since(t0).Seconds()
-		log.Info(ctx, "processLogs - %d messages received in %.3f seconds", nbMessages, delta)
-	}()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-
-		case msg, more := <-chanMessages:
-			nbMessages++
-			if msg.signature.Worker == nil {
-				if !more {
-					return nil
-				}
-				continue
-			}
-
-			tx, err := s.Db.Begin()
-			if err != nil {
-				log.Error(ctx, "unable to start tx: %v", err)
-				continue
-			}
-
-			currentLog := buildMessage(msg.signature, msg.m)
-			if err := s.processLog(ctx, tx, msg.signature, currentLog); err != nil {
-				log.Error(ctx, "unable to process log: %+v", err)
-				tx.Rollback() // nolint
-				continue
-			}
-
-			if err := tx.Commit(); err != nil {
-				log.Error(ctx, "unable to commit tx: %+v", err)
-				tx.Rollback() // nolint
-				continue
-			}
-
-			if !more {
-				return nil
-			}
-		}
-	}
+	Signature log.Signature
+	Msg       hook.Message
 }
 
 func buildMessage(signature log.Signature, m hook.Message) string {
@@ -237,10 +189,6 @@ func buildMessage(signature log.Signature, m hook.Message) string {
 	return logs.Val
 }
 
-func (s *Service) processLog(ctx context.Context, db gorp.SqlExecutor, signature log.Signature, message string) error {
-	return workflow.AppendLog(db, signature.JobID, signature.NodeRunID, signature.Worker.StepOrder, message, s.Cfg.Log.StepMaxSize)
-}
-
 func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
 	var signature log.Signature
 
@@ -324,3 +272,139 @@ func (s *Service) getHatchery(ctx context.Context, hatcheryID int64, hatcheryNam
 	logCache.Set(fmt.Sprintf("hatchery-key-%d", hatcheryID), pk, gocache.DefaultExpiration)
 	return pk, nil
 }
+
+func (s *Service) waitingJobs(ctx context.Context) {
+	tick := time.NewTicker(250 * time.Millisecond)
+	defer tick.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case _ = <-tick.C:
+			// List all queues
+			keyListQueue := cache.Key(keyJobLogQueue, "*")
+			listKeys, err := s.Cache.Keys(keyListQueue)
+			if err != nil {
+				log.Error(ctx, "unable to list jobs queues %s", keyListQueue)
+				continue
+			}
+
+			// For each key, check if heartbeat key exist
+			for _, k := range listKeys {
+				keyParts := strings.Split(k, ":")
+				jobID := keyParts[len(keyParts)-1]
+
+				jobQueueKey, err := s.canDequeue(jobID)
+				if err != nil {
+					log.Error(ctx, "unable to check canDequeue %s: %v", jobQueueKey, err)
+					continue
+				}
+				if jobQueueKey == "" {
+					continue
+				}
+
+				sdk.GoRoutine(ctx, "cdn-dequeue-job-message", func(ctx context.Context) {
+					if err := s.dequeueJobMessages(ctx, jobQueueKey, jobID); err != nil {
+						log.Error(ctx, "unable to dequeue redis incoming job queue: %v", err)
+					}
+				})
+			}
+		}
+	}
+}
+
+func (s *Service) dequeueJobMessages(ctx context.Context, jobLogsQueueKey string, jobID string) error {
+	log.Info(ctx, "Dequeue %s", jobLogsQueueKey)
+	var t0 = time.Now()
+	var t1 = time.Now()
+	var nbMessages int
+	defer func() {
+		delta := t1.Sub(t0)
+		log.Info(ctx, "processLogs[%s] - %d messages received in %v", jobLogsQueueKey, nbMessages, delta)
+	}()
+
+	defer func() {
+		// Remove heartbeat
+		_ = s.Cache.Delete(cache.Key(keyJobHearbeat, jobID))
+	}()
+
+	tick := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-tick.C:
+			b, err := s.Cache.Exist(jobLogsQueueKey)
+			if err != nil {
+				log.Error(ctx, "unable to check if queue still exist: %v", err)
+				continue
+			} else if !b {
+				// leave dequeue if queue does not exist anymore
+				log.Info(ctx, "leaving job queue %s (queue no more exists)", jobLogsQueueKey)
+				return nil
+			}
+			// heartbeat
+			heartbeatKey := cache.Key(keyJobHearbeat, jobID)
+			if err := s.Cache.SetWithTTL(heartbeatKey, true, 30); err != nil {
+				log.Error(ctx, "unable to hearbeat %s: %v", heartbeatKey, err)
+				continue
+			}
+		default:
+			dequeuCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
+			var hm handledMessage
+			if err := s.Cache.DequeueWithContext(dequeuCtx, jobLogsQueueKey, 30*time.Millisecond, &hm); err != nil {
+				cancel()
+				if strings.Contains(err.Error(), "context deadline exceeded") {
+					return nil
+				}
+				log.Error(ctx, "unable to dequeue job logs queue %s: %v", jobLogsQueueKey, err)
+				continue
+			}
+			cancel()
+			if hm.Signature.Worker == nil {
+				continue
+			}
+			nbMessages++
+			t1 = time.Now()
+
+			currentLog := buildMessage(hm.Signature, hm.Msg)
+			if err := workflow.AppendLog(s.Db, hm.Signature.JobID, hm.Signature.NodeRunID, hm.Signature.Worker.StepOrder, currentLog, s.Cfg.Log.StepMaxSize); err != nil {
+				log.Error(ctx, "unable to process log: %+v", err)
+			}
+		}
+	}
+}
+
+func (s *Service) canDequeue(jobID string) (string, error) {
+	jobQueueKey := cache.Key(keyJobLogQueue, jobID)
+	heatbeatKey := cache.Key(keyJobHearbeat, jobID)
+
+	// Take a lock
+	lockKey := cache.Key(keyJobLock, jobID)
+	b, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
+	if err != nil {
+		return "", err
+	}
+	defer func() {
+		_ = s.Cache.Unlock(lockKey)
+	}()
+	if !b {
+		return "", nil
+	}
+
+	exist, err := s.Cache.Exist(heatbeatKey)
+	if err != nil {
+		return "", err
+	}
+	// if key exist, that mean that someone is already dequeuing
+	if exist {
+		return "", nil
+	}
+
+	//hearbeat
+	heartbeatKey := cache.Key(keyJobHearbeat, jobID)
+	if err := s.Cache.SetWithTTL(heartbeatKey, true, 30); err != nil {
+		return "", err
+	}
+	return jobQueueKey, nil
+}
diff --git a/engine/cdn/cdn_log_test.go b/engine/cdn/cdn_log_test.go
index 3348080256..7d36e29f4f 100644
--- a/engine/cdn/cdn_log_test.go
+++ b/engine/cdn/cdn_log_test.go
@@ -65,16 +65,32 @@ func TestWorkerLog(t *testing.T) {
 	"host": "host", "_line":1, "_pid": 1, "_prefix": "prefix",
 	"full_message": "this is my message", "_Signature": "%s"}`
 	message = fmt.Sprintf(message, signatureField)
-	chanMessages := s.handleConnectionChannel(context.TODO())
-	require.NoError(t, s.handleLogMessage(context.TODO(), chanMessages, []byte(message)))
-	close(chanMessages)
-
-	time.Sleep(100 * time.Millisecond)
+	require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message)))
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	go s.waitingJobs(ctx)
+
+	cpt := 0
+	for {
+		logs, err := workflow.LoadLogs(s.Db, dbj.ID)
+		require.NoError(t, err)
+
+		t.Logf("%d: %+v", cpt, logs)
+		if len(logs) == 0 {
+			if cpt > 20 {
+				t.Fail()
+				break
+			}
+			cpt++
+			time.Sleep(250 * time.Millisecond)
+			continue
+		}
+		require.Len(t, logs, 1)
+		require.Equal(t, "[ALERT] this is my message\n", logs[0].Val)
+		break
+	}
+	cancel()
 
-	logs, err := workflow.LoadLogs(s.Db, dbj.ID)
-	require.NoError(t, err)
-	require.Len(t, logs, 1)
-	require.Equal(t, "[ALERT] this is my message\n", logs[0].Val)
 }
 
 func TestServiceLog(t *testing.T) {
@@ -127,9 +143,7 @@ func TestServiceLog(t *testing.T) {
 	"host": "host", "_line":1, "_pid": 1, "_prefix": "prefix",
 	"full_message": "this is my service message", "_Signature": "%s"}`
 	message = fmt.Sprintf(message, signatureField)
-	chanMessages := s.handleConnectionChannel(context.TODO())
-	require.NoError(t, s.handleLogMessage(context.TODO(), chanMessages, []byte(message)))
-	close(chanMessages)
+	require.NoError(t, s.handleLogMessage(context.TODO(), []byte(message)))
 
 	logs, err := workflow.LoadServiceLog(db, dbj.ID, signature.Service.RequirementName)
 	require.NoError(t, err)
diff --git a/engine/hooks/scheduler.go b/engine/hooks/scheduler.go
index 3b86c18448..ae73f8f119 100644
--- a/engine/hooks/scheduler.go
+++ b/engine/hooks/scheduler.go
@@ -262,7 +262,7 @@ func (s *Service) dequeueTaskExecutions(ctx context.Context) error {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
-		if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, &taskKey); err != nil {
+		if err := s.Cache.DequeueWithContext(ctx, schedulerQueueKey, 250*time.Millisecond, &taskKey); err != nil {
 			continue
 		}
 		s.Dao.dequeuedIncr()
diff --git a/engine/repositories/processor.go b/engine/repositories/processor.go
index b84b38810d..62adc3504b 100644
--- a/engine/repositories/processor.go
+++ b/engine/repositories/processor.go
@@ -14,7 +14,7 @@ import (
 func (s *Service) processor(ctx context.Context) error {
 	for {
 		var uuid string
-		if err := s.dao.store.DequeueWithContext(ctx, processorKey, &uuid); err != nil {
+		if err := s.dao.store.DequeueWithContext(ctx, processorKey, 250*time.Millisecond, &uuid); err != nil {
 			log.Error(ctx, "repositories > processor > store.DequeueWithContext err: %v", err)
 			continue
 		}
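Reviewer note — the sketch below is not part of the patch; it is a minimal illustration of how the extended cache.Store surface is meant to be consumed. It only uses calls whose signatures appear in this diff (cache.Key, NewRedisStore, Enqueue, Keys, Exist, and DequeueWithContext with its new waitDuration parameter); the Redis address, the "example:log:job" key prefix and the payload are hypothetical.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ovh/cds/engine/api/cache"
)

func main() {
	// Hypothetical connection values; the real services take them from their configuration.
	store, err := cache.NewRedisStore("localhost:6379", "", 60)
	if err != nil {
		panic(err)
	}

	// Producer side: one queue per job, mirroring the cdn:log:job:<jobID> layout used in cdn_log.go.
	queue := cache.Key("example", "log", "job", "42")
	if err := store.Enqueue(queue, "a log line"); err != nil {
		panic(err)
	}

	// Consumer side: discover queues with the new Keys, check them with Exist,
	// then drain them with the parametrized DequeueWithContext.
	keys, err := store.Keys(cache.Key("example", "log", "job", "*"))
	if err != nil {
		panic(err)
	}
	for _, k := range keys {
		if ok, _ := store.Exist(k); !ok {
			continue
		}
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		var line string
		// waitDuration is the new argument: how often the dequeue loop polls Redis
		// while the queue is empty, until the context is cancelled or times out.
		if err := store.DequeueWithContext(ctx, k, 50*time.Millisecond, &line); err == nil {
			fmt.Println(k, "->", line)
		}
		cancel()
	}
}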
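Two observations on the new behaviour, as far as it can be read from this change: the polling interval that used to be hard-coded to 250ms inside RedisStore.DequeueWithContext is now chosen by each caller — the API event loop, the repositories processor and the hooks scheduler keep the previous 250ms, while the CDN dequeuer polls its per-job queues every 30ms under a 15-second timeout. Exclusivity between dequeuers relies on the heartbeat key: canDequeue takes a short lock, refuses the queue if a heartbeat already exists, and writes one with a 30-second TTL; dequeueJobMessages then refreshes it every 5 seconds and deletes it on exit, so a job whose consumer dies can be picked up again once the TTL expires, presumably also by another CDN instance sharing the same Redis.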