From 9ba04b0cd451b4d6463389a216c678c4965f79e3 Mon Sep 17 00:00:00 2001 From: Moshe Good Date: Fri, 20 May 2022 13:13:46 -0400 Subject: [PATCH] Cache the replay event in case we get multiple new client connections --- .../streams/stream_provider_server_side.go | 43 +++++++++---- .../stream_provider_server_side_flags.go | 64 ++++++++++++++++--- 2 files changed, 87 insertions(+), 20 deletions(-) diff --git a/internal/core/streams/stream_provider_server_side.go b/internal/core/streams/stream_provider_server_side.go index 8db6e4d9..33e4ae48 100644 --- a/internal/core/streams/stream_provider_server_side.go +++ b/internal/core/streams/stream_provider_server_side.go @@ -27,6 +27,10 @@ type serverSideEnvStreamProvider struct { type serverSideEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers + + mu sync.Mutex + cacheEvent eventsource.Event + eventChecksum string } func (s *serverSideStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc { @@ -96,18 +100,35 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou if err != nil { r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) - } else { - segments, err := r.store.GetAll(ldstoreimpl.Segments()) - if err != nil { - r.loggers.Errorf("Error getting all segments: %s\n", err.Error()) - } else { - allData := []ldstoretypes.Collection{ - {Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}, - {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, - } - out <- MakeServerSidePutEvent(allData) - } + return + } + segments, err := r.store.GetAll(ldstoreimpl.Segments()) + if err != nil { + r.loggers.Errorf("Error getting all segments: %s\n", err.Error()) + return + } + + flagsChecksum := hashKeyedItemDescriptors(flags) + segmentsChecksum := hashKeyedItemDescriptors(segments) + eventChecksum := flagsChecksum + segmentsChecksum + + r.mu.Lock() + defer r.mu.Unlock() + if r.eventChecksum == eventChecksum { + out <- r.cacheEvent + return + } + + allData := []ldstoretypes.Collection{ + {Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}, + {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, } + event := MakeServerSidePutEvent(allData) + + r.eventChecksum = eventChecksum + r.cacheEvent = event + + out <- event }() return out } diff --git a/internal/core/streams/stream_provider_server_side_flags.go b/internal/core/streams/stream_provider_server_side_flags.go index 162bc958..dab0b1d7 100644 --- a/internal/core/streams/stream_provider_server_side_flags.go +++ b/internal/core/streams/stream_provider_server_side_flags.go @@ -1,7 +1,10 @@ package streams import ( + "crypto/md5" "net/http" + "sort" + "strconv" "sync" "github.com/launchdarkly/ld-relay/v6/config" @@ -27,6 +30,10 @@ type serverSideFlagsOnlyEnvStreamProvider struct { type serverSideFlagsOnlyEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers + + mu sync.Mutex + cacheEvent eventsource.Event + eventChecksum string } func (s *serverSideFlagsOnlyStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc { @@ -91,16 +98,55 @@ func (r *serverSideFlagsOnlyEnvStreamRepository) Replay(channel, id string) chan } go func() { defer close(out) - if r.store.IsInitialized() { - flags, err := r.store.GetAll(ldstoreimpl.Features()) - - if err != nil { - r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) - } else { - out <- MakeServerSideFlagsOnlyPutEvent( - []ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}}) - } + if !r.store.IsInitialized() { + return + } + flags, err := r.store.GetAll(ldstoreimpl.Features()) + + if err != nil { + r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) + return + } + + checksum := hashKeyedItemDescriptors(flags) + r.mu.Lock() + defer r.mu.Unlock() + if r.eventChecksum == checksum { + out <- r.cacheEvent + return } + + event := MakeServerSideFlagsOnlyPutEvent( + []ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}}) + r.eventChecksum = checksum + r.cacheEvent = event + + out <- event }() return out } + +type keyVersion struct { + key string + version int +} + +func hashKeyedItemDescriptors(keyedItems []ldstoretypes.KeyedItemDescriptor) string { + kvs := make([]keyVersion, len(keyedItems)) + for i, ki := range keyedItems { + kvs[i] = keyVersion{ki.Key, ki.Item.Version} + } + + // We sort because the order of the data we get may not be consistent. + sort.Slice(kvs, func(a, b int) bool { return kvs[a].key < kvs[b].key }) + + h := md5.New() + for _, kv := range kvs { + h.Write([]byte(kv.key)) + h.Write([]byte("#")) // something that won't be part of a key + h.Write([]byte(strconv.Itoa(kv.version))) + h.Write([]byte(";")) // something that won't be part of a version + } + checksum := string(h.Sum(nil)) + return checksum +}