Skip to content

Commit

Permalink
Cache the replay event in case we get multiple new client connections
Browse files Browse the repository at this point in the history
  • Loading branch information
moshegood committed May 24, 2022
1 parent 59eabfd commit 726cc5c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 20 deletions.
43 changes: 32 additions & 11 deletions internal/core/streams/stream_provider_server_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type serverSideEnvStreamProvider struct {
type serverSideEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

mu sync.Mutex
cacheEvent eventsource.Event
eventChecksum string
}

func (s *serverSideStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -96,18 +100,35 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
} else {
allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}
out <- MakeServerSidePutEvent(allData)
}
return
}
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
return
}

flagsChecksum := hashKeyedItemDescriptors(flags)
segmentsChecksum := hashKeyedItemDescriptors(segments)
eventChecksum := flagsChecksum + segmentsChecksum

r.mu.Lock()
defer r.mu.Unlock()
if r.eventChecksum == eventChecksum {
out <- r.cacheEvent
return
}

allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}
event := MakeServerSidePutEvent(allData)

r.eventChecksum = eventChecksum
r.cacheEvent = event

out <- event
}()
return out
}
52 changes: 43 additions & 9 deletions internal/core/streams/stream_provider_server_side_flags.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package streams

import (
"crypto/sha256"
"net/http"
"strconv"
"sync"

"github.com/launchdarkly/ld-relay/v6/config"
Expand All @@ -27,6 +29,10 @@ type serverSideFlagsOnlyEnvStreamProvider struct {
type serverSideFlagsOnlyEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

mu sync.Mutex
cacheEvent eventsource.Event
eventChecksum string
}

func (s *serverSideFlagsOnlyStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -91,16 +97,44 @@ func (r *serverSideFlagsOnlyEnvStreamRepository) Replay(channel, id string) chan
}
go func() {
defer close(out)
if r.store.IsInitialized() {
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
out <- MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
}
if !r.store.IsInitialized() {
return
}
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
return
}

checksum := hashKeyedItemDescriptors(flags)
r.mu.Lock()
defer r.mu.Unlock()
if r.eventChecksum == checksum {
out <- r.cacheEvent
return
}

event := MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
r.eventChecksum = checksum
r.cacheEvent = event

out <- event
}()
return out
}

func hashKeyedItemDescriptors(keyedItems []ldstoretypes.KeyedItemDescriptor) string {
h := sha256.New()
for _, keyedItem := range keyedItems {
key := keyedItem.Key
ver := keyedItem.Item.Version
h.Write([]byte(key))
h.Write([]byte("#")) // something that won't be part of a key
h.Write([]byte(strconv.Itoa(ver)))
h.Write([]byte(";")) // something that won't be part of a version
}
checksum := string(h.Sum(nil))
return checksum
}

0 comments on commit 726cc5c

Please sign in to comment.