Skip to content

Commit

Permalink
Cache the replay event in case we get multiple new client connections (
Browse files Browse the repository at this point in the history
…#189)

* Cache the replay event in case we get multiple new client connections

* Use singleflight to ensure only one replay event is generated at a time

Co-authored-by: Moshe Good <[email protected]>
  • Loading branch information
moshegood and moshegood authored Jun 13, 2022
1 parent 59eabfd commit cf390a7
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 22 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/stretchr/testify v1.7.0
go.opencensus.io v0.23.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/launchdarkly/go-jsonstream.v1 v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
51 changes: 38 additions & 13 deletions internal/core/streams/stream_provider_server_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/launchdarkly/ld-relay/v6/config"
"golang.org/x/sync/singleflight"

"github.com/launchdarkly/eventsource"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
Expand All @@ -27,6 +28,8 @@ type serverSideEnvStreamProvider struct {
type serverSideEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

flightGroup singleflight.Group
}

func (s *serverSideStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -92,22 +95,44 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou
}
go func() {
defer close(out)
event, err := r.getReplayEvent(channel, id)
if err != nil {
return
}
out <- event
}()
return out
}

// getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay.
func (r *serverSideEnvStreamRepository) getReplayEvent(channel, id string) (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
} else {
allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}
out <- MakeServerSidePutEvent(allData)
}
return nil, err
}
}()
return out
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
return nil, err
}

allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}

event := MakeServerSidePutEvent(allData)
return event, nil
})

if err != nil {
return nil, err
}

// panic if it's not an eventsource.Event - as this should be impossible
event := data.(eventsource.Event)
return event, nil
}
43 changes: 34 additions & 9 deletions internal/core/streams/stream_provider_server_side_flags.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package streams

import (
"fmt"
"net/http"
"sync"

"github.com/launchdarkly/ld-relay/v6/config"
"golang.org/x/sync/singleflight"

"github.com/launchdarkly/eventsource"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
Expand All @@ -27,6 +29,8 @@ type serverSideFlagsOnlyEnvStreamProvider struct {
type serverSideFlagsOnlyEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

flightGroup singleflight.Group
}

func (s *serverSideFlagsOnlyStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -91,16 +95,37 @@ func (r *serverSideFlagsOnlyEnvStreamRepository) Replay(channel, id string) chan
}
go func() {
defer close(out)
if r.store.IsInitialized() {
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
out <- MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
}
event, err := r.getReplayEvent(channel, id)
if err != nil {
return
}
out <- event
}()
return out
}

func (r *serverSideFlagsOnlyEnvStreamRepository) getReplayEvent(channel, id string) (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
if !r.store.IsInitialized() {
return nil, fmt.Errorf("cannot replay events as store is not initialized.")
}
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
return nil, err
}

event := MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
return event, nil
})

if err != nil {
return nil, err
}

// panic if it's not an eventsource.Event - as this should be impossible
event := data.(eventsource.Event)
return event, nil
}
82 changes: 82 additions & 0 deletions internal/core/streams/stream_provider_server_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/launchdarkly/eventsource"
"github.com/launchdarkly/ld-relay/v6/internal/basictypes"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"

Expand Down Expand Up @@ -202,4 +203,85 @@ func TestStreamProviderServerSide(t *testing.T) {
verifyHandlerHeartbeat(t, sp, esp, validCredential)
})
})

t.Run("Replay", func(t *testing.T) {
store := makeMockStore(nil, nil)
store.latency = 100 * time.Millisecond

repo := &serverSideEnvStreamRepository{store: store, loggers: ldlog.NewDisabledLoggers()}

newData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: store.flags},
{Kind: ldstoreimpl.Segments(), Items: store.segments},
}
expected := MakeServerSidePutEvent(newData)
expectedData := expected.Data()
t.Run("ConsecutiveCalls", func(t *testing.T) {
eventChannel := repo.Replay("", "")
select {
case actual := <-eventChannel:
if actual == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != actual.Data() {
t.Errorf("Replay() = %v; wanted %v", actual.Data(), expectedData)
}
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}
eventChannel = repo.Replay("", "")
select {
case actual := <-eventChannel:
if actual == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != actual.Data() {
t.Errorf("Replay() = %v; wanted %v", actual.Data(), expectedData)
}
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}

})
t.Run("ConcurrentCalls", func(t *testing.T) {
eventChannel1 := repo.Replay("", "")
eventChannel2 := repo.Replay("", "")
var event1, event2 eventsource.Event
var events int

for events < 2 {
select {
case event, ok := <-eventChannel1:
if !ok {
break
}
if event == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != event.Data() {
t.Errorf("Replay() = %v; wanted %v", event.Data(), expectedData)
}
event1 = event
events++
case event, ok := <-eventChannel2:
if !ok {
break
}
if event == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != event.Data() {
t.Errorf("Replay() = %v; wanted %v", event.Data(), expectedData)
}
event2 = event
events++
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}
}
if event1 != event2 {
t.Error("Expected the same exact event to be returned from the flightgroup")
}
})
})
}
5 changes: 5 additions & 0 deletions internal/core/streams/stream_testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streams

import (
"errors"
"time"

"github.com/launchdarkly/ld-relay/v6/config"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"
Expand Down Expand Up @@ -49,13 +50,17 @@ type mockStoreQueries struct {
fakeSegmentsError error
flags []ldstoretypes.KeyedItemDescriptor
segments []ldstoretypes.KeyedItemDescriptor
latency time.Duration
}

func (q mockStoreQueries) IsInitialized() bool {
return q.initialized
}

func (q mockStoreQueries) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) {
if q.latency > 0 {
time.Sleep(q.latency)
}
switch kind {
case ldstoreimpl.Features():
return q.flags, q.fakeFlagsError
Expand Down

0 comments on commit cf390a7

Please sign in to comment.