diff --git a/internal/core/streams/stream_provider_server_side.go b/internal/core/streams/stream_provider_server_side.go index 8db6e4d9..33e4ae48 100644 --- a/internal/core/streams/stream_provider_server_side.go +++ b/internal/core/streams/stream_provider_server_side.go @@ -27,6 +27,10 @@ type serverSideEnvStreamProvider struct { type serverSideEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers + + mu sync.Mutex + cacheEvent eventsource.Event + eventChecksum string } func (s *serverSideStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc { @@ -96,18 +100,35 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou if err != nil { r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) - } else { - segments, err := r.store.GetAll(ldstoreimpl.Segments()) - if err != nil { - r.loggers.Errorf("Error getting all segments: %s\n", err.Error()) - } else { - allData := []ldstoretypes.Collection{ - {Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}, - {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, - } - out <- MakeServerSidePutEvent(allData) - } + return + } + segments, err := r.store.GetAll(ldstoreimpl.Segments()) + if err != nil { + r.loggers.Errorf("Error getting all segments: %s\n", err.Error()) + return + } + + flagsChecksum := hashKeyedItemDescriptors(flags) + segmentsChecksum := hashKeyedItemDescriptors(segments) + eventChecksum := flagsChecksum + segmentsChecksum + + r.mu.Lock() + defer r.mu.Unlock() + if r.eventChecksum == eventChecksum { + out <- r.cacheEvent + return + } + + allData := []ldstoretypes.Collection{ + {Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}, + {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, } + event := MakeServerSidePutEvent(allData) + + r.eventChecksum = eventChecksum + r.cacheEvent = event + + out <- event }() return out } diff --git a/internal/core/streams/stream_provider_server_side_flags.go b/internal/core/streams/stream_provider_server_side_flags.go index 162bc958..a97eec53 100644 --- a/internal/core/streams/stream_provider_server_side_flags.go +++ b/internal/core/streams/stream_provider_server_side_flags.go @@ -1,7 +1,9 @@ package streams import ( + "crypto/sha256" "net/http" + "strconv" "sync" "github.com/launchdarkly/ld-relay/v6/config" @@ -27,6 +29,10 @@ type serverSideFlagsOnlyEnvStreamProvider struct { type serverSideFlagsOnlyEnvStreamRepository struct { store EnvStoreQueries loggers ldlog.Loggers + + mu sync.Mutex + cacheEvent eventsource.Event + eventChecksum string } func (s *serverSideFlagsOnlyStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc { @@ -91,16 +97,44 @@ func (r *serverSideFlagsOnlyEnvStreamRepository) Replay(channel, id string) chan } go func() { defer close(out) - if r.store.IsInitialized() { - flags, err := r.store.GetAll(ldstoreimpl.Features()) - - if err != nil { - r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) - } else { - out <- MakeServerSideFlagsOnlyPutEvent( - []ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}}) - } + if !r.store.IsInitialized() { + return + } + flags, err := r.store.GetAll(ldstoreimpl.Features()) + + if err != nil { + r.loggers.Errorf("Error getting all flags: %s\n", err.Error()) + return + } + + checksum := hashKeyedItemDescriptors(flags) + r.mu.Lock() + defer r.mu.Unlock() + if r.eventChecksum == checksum { + out <- r.cacheEvent + return } + + event := MakeServerSideFlagsOnlyPutEvent( + []ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}}) + r.eventChecksum = checksum + r.cacheEvent = event + + out <- event }() return out } + +func hashKeyedItemDescriptors(keyedItems []ldstoretypes.KeyedItemDescriptor) string { + h := sha256.New() + for _, keyedItem := range keyedItems { + key := keyedItem.Key + ver := keyedItem.Item.Version + h.Write([]byte(key)) + h.Write([]byte("#")) // something that won't be part of a key + h.Write([]byte(strconv.Itoa(ver))) + h.Write([]byte(";")) // something that won't be part of a version + } + checksum := string(h.Sum(nil)) + return checksum +}