Skip to content

Commit

Permalink
fix(api): do not loose logs when api restart (#5298)
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux authored Jul 6, 2020
1 parent 1c64809 commit 10e3e4b
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 95 deletions.
4 changes: 3 additions & 1 deletion engine/api/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ func Key(args ...string) string {

//Store is an interface
type Store interface {
Keys(pattern string) ([]string, error)
Get(key string, value interface{}) (bool, error)
Set(key string, value interface{}) error
SetWithTTL(key string, value interface{}, ttl int) error
SetWithDuration(key string, value interface{}, duration time.Duration) error
UpdateTTL(key string, ttl int) error
Delete(key string) error
DeleteAll(key string) error
Exist(key string) (bool, error)
Enqueue(queueName string, value interface{}) error
DequeueWithContext(c context.Context, queueName string, value interface{}) error
DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error
QueueLen(queueName string) (int, error)
RemoveFromQueue(queueName string, memberKey string) error
Publish(ctx context.Context, queueName string, value interface{}) error
Expand Down
28 changes: 26 additions & 2 deletions engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ func NewRedisStore(host, password string, ttl int) (*RedisStore, error) {
}, nil
}

// Keys List keys from pattern
func (s *RedisStore) Keys(pattern string) ([]string, error) {
if s.Client == nil {
return nil, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}
keys, err := s.Client.Keys(pattern).Result()
if err != nil {
return nil, sdk.WrapError(err, "redis> cannot list keys: %s", pattern)
}
return keys, nil
}

// Get a key from redis
func (s *RedisStore) Get(key string, value interface{}) (bool, error) {
if s.Client == nil {
Expand Down Expand Up @@ -173,6 +185,18 @@ func (s *RedisStore) DeleteAll(pattern string) error {
return nil
}

// Exist test is key exists
func (s *RedisStore) Exist(key string) (bool, error) {
if s.Client == nil {
return false, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}
ok, err := s.Client.Exists(key).Result()
if err != nil {
return false, sdk.WrapError(err, "unable to test if key %s exists", key)
}
return ok == 1, nil
}

// Enqueue pushes to queue
func (s *RedisStore) Enqueue(queueName string, value interface{}) error {
if s.Client == nil {
Expand Down Expand Up @@ -203,13 +227,13 @@ func (s *RedisStore) QueueLen(queueName string) (int, error) {
}

// DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) error {
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error {
if s.Client == nil {
return sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}

var elem string
ticker := time.NewTicker(250 * time.Millisecond).C
ticker := time.NewTicker(waitDuration).C
for elem == "" {
select {
case <-ticker:
Expand Down
2 changes: 1 addition & 1 deletion engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func Subscribe(ch chan<- sdk.Event) {
func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
for {
e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events", &e); err != nil {
if err := store.DequeueWithContext(ctx, "events", 250*time.Millisecond, &e); err != nil {
log.Error(ctx, "Event.DequeueEvent> store.DequeueWithContext err: %v", err)
continue
}
Expand Down
3 changes: 2 additions & 1 deletion engine/api/repositoriesmanager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/go-gorp/gorp"
"github.com/ovh/cds/engine/api/cache"
Expand All @@ -15,7 +16,7 @@ import (
func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) {
for {
e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", &e); err != nil {
if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", 250*time.Millisecond, &e); err != nil {
log.Error(ctx, "repositoriesmanager.ReceiveEvents > store.DequeueWithContext err: %v", err)
continue
}
Expand Down
Loading

0 comments on commit 10e3e4b

Please sign in to comment.