Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): do not loose logs when api restart #5298

Merged
merged 8 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion engine/api/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ func Key(args ...string) string {

//Store is an interface
type Store interface {
Keys(pattern string) ([]string, error)
Get(key string, value interface{}) (bool, error)
Set(key string, value interface{}) error
SetWithTTL(key string, value interface{}, ttl int) error
SetWithDuration(key string, value interface{}, duration time.Duration) error
UpdateTTL(key string, ttl int) error
Delete(key string) error
DeleteAll(key string) error
Exist(key string) (bool, error)
Enqueue(queueName string, value interface{}) error
DequeueWithContext(c context.Context, queueName string, value interface{}) error
DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error
QueueLen(queueName string) (int, error)
RemoveFromQueue(queueName string, memberKey string) error
Publish(ctx context.Context, queueName string, value interface{}) error
Expand Down
28 changes: 26 additions & 2 deletions engine/api/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ func NewRedisStore(host, password string, ttl int) (*RedisStore, error) {
}, nil
}

// Keys List keys from pattern
func (s *RedisStore) Keys(pattern string) ([]string, error) {
if s.Client == nil {
return nil, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}
keys, err := s.Client.Keys(pattern).Result()
if err != nil {
return nil, sdk.WrapError(err, "redis> cannot list keys: %s", pattern)
}
return keys, nil
}

// Get a key from redis
func (s *RedisStore) Get(key string, value interface{}) (bool, error) {
if s.Client == nil {
Expand Down Expand Up @@ -173,6 +185,18 @@ func (s *RedisStore) DeleteAll(pattern string) error {
return nil
}

// Exist test is key exists
func (s *RedisStore) Exist(key string) (bool, error) {
if s.Client == nil {
return false, sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}
ok, err := s.Client.Exists(key).Result()
if err != nil {
return false, sdk.WrapError(err, "unable to test if key %s exists", key)
}
return ok == 1, nil
}

// Enqueue pushes to queue
func (s *RedisStore) Enqueue(queueName string, value interface{}) error {
if s.Client == nil {
Expand Down Expand Up @@ -203,13 +227,13 @@ func (s *RedisStore) QueueLen(queueName string) (int, error) {
}

// DequeueWithContext gets from queue This is blocking while there is nothing in the queue, it can be cancelled with a context.Context
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, value interface{}) error {
func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, waitDuration time.Duration, value interface{}) error {
if s.Client == nil {
return sdk.WithStack(fmt.Errorf("redis> cannot get redis client"))
}

var elem string
ticker := time.NewTicker(250 * time.Millisecond).C
ticker := time.NewTicker(waitDuration).C
for elem == "" {
select {
case <-ticker:
Expand Down
2 changes: 1 addition & 1 deletion engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func Subscribe(ch chan<- sdk.Event) {
func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
for {
e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events", &e); err != nil {
if err := store.DequeueWithContext(ctx, "events", 250*time.Millisecond, &e); err != nil {
log.Error(ctx, "Event.DequeueEvent> store.DequeueWithContext err: %v", err)
continue
}
Expand Down
3 changes: 2 additions & 1 deletion engine/api/repositoriesmanager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/go-gorp/gorp"
"github.com/ovh/cds/engine/api/cache"
Expand All @@ -15,7 +16,7 @@ import (
func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) {
for {
e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", &e); err != nil {
if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", 250*time.Millisecond, &e); err != nil {
log.Error(ctx, "repositoriesmanager.ReceiveEvents > store.DequeueWithContext err: %v", err)
continue
}
Expand Down
Loading