Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache the replay event in case we get multiple new client connections #189

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/stretchr/testify v1.7.0
go.opencensus.io v0.23.0
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/gcfg.v1 v1.2.3
gopkg.in/launchdarkly/go-jsonstream.v1 v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
51 changes: 38 additions & 13 deletions internal/core/streams/stream_provider_server_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/launchdarkly/ld-relay/v6/config"
"golang.org/x/sync/singleflight"

"github.com/launchdarkly/eventsource"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
Expand All @@ -27,6 +28,8 @@ type serverSideEnvStreamProvider struct {
type serverSideEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

flightGroup singleflight.Group
}

func (s *serverSideStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -92,22 +95,44 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou
}
go func() {
defer close(out)
event, err := r.getReplayEvent(channel, id)
if err != nil {
return
}
out <- event
}()
return out
}

// getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay.
func (r *serverSideEnvStreamRepository) getReplayEvent(channel, id string) (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
} else {
allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}
out <- MakeServerSidePutEvent(allData)
}
return nil, err
}
}()
return out
segments, err := r.store.GetAll(ldstoreimpl.Segments())
if err != nil {
r.loggers.Errorf("Error getting all segments: %s\n", err.Error())
return nil, err
}

allData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)},
{Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)},
}

event := MakeServerSidePutEvent(allData)
return event, nil
})

if err != nil {
return nil, err
}

// panic if it's not an eventsource.Event - as this should be impossible
event := data.(eventsource.Event)
return event, nil
}
43 changes: 34 additions & 9 deletions internal/core/streams/stream_provider_server_side_flags.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package streams

import (
"fmt"
"net/http"
"sync"

"github.com/launchdarkly/ld-relay/v6/config"
"golang.org/x/sync/singleflight"

"github.com/launchdarkly/eventsource"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
Expand All @@ -27,6 +29,8 @@ type serverSideFlagsOnlyEnvStreamProvider struct {
type serverSideFlagsOnlyEnvStreamRepository struct {
store EnvStoreQueries
loggers ldlog.Loggers

flightGroup singleflight.Group
}

func (s *serverSideFlagsOnlyStreamProvider) Handler(credential config.SDKCredential) http.HandlerFunc {
Expand Down Expand Up @@ -91,16 +95,37 @@ func (r *serverSideFlagsOnlyEnvStreamRepository) Replay(channel, id string) chan
}
go func() {
defer close(out)
if r.store.IsInitialized() {
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
} else {
out <- MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
}
event, err := r.getReplayEvent(channel, id)
if err != nil {
return
}
out <- event
}()
return out
}

func (r *serverSideFlagsOnlyEnvStreamRepository) getReplayEvent(channel, id string) (eventsource.Event, error) {
data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) {
if !r.store.IsInitialized() {
return nil, fmt.Errorf("cannot replay events as store is not initialized.")
}
flags, err := r.store.GetAll(ldstoreimpl.Features())

if err != nil {
r.loggers.Errorf("Error getting all flags: %s\n", err.Error())
return nil, err
}

event := MakeServerSideFlagsOnlyPutEvent(
[]ldstoretypes.Collection{{Kind: ldstoreimpl.Features(), Items: removeDeleted(flags)}})
return event, nil
})

if err != nil {
return nil, err
}

// panic if it's not an eventsource.Event - as this should be impossible
event := data.(eventsource.Event)
return event, nil
}
82 changes: 82 additions & 0 deletions internal/core/streams/stream_provider_server_side_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/launchdarkly/eventsource"
"github.com/launchdarkly/ld-relay/v6/internal/basictypes"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"

Expand Down Expand Up @@ -202,4 +203,85 @@ func TestStreamProviderServerSide(t *testing.T) {
verifyHandlerHeartbeat(t, sp, esp, validCredential)
})
})

t.Run("Replay", func(t *testing.T) {
store := makeMockStore(nil, nil)
store.latency = 100 * time.Millisecond

repo := &serverSideEnvStreamRepository{store: store, loggers: ldlog.NewDisabledLoggers()}

newData := []ldstoretypes.Collection{
{Kind: ldstoreimpl.Features(), Items: store.flags},
{Kind: ldstoreimpl.Segments(), Items: store.segments},
}
expected := MakeServerSidePutEvent(newData)
expectedData := expected.Data()
t.Run("ConsecutiveCalls", func(t *testing.T) {
eventChannel := repo.Replay("", "")
select {
case actual := <-eventChannel:
if actual == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != actual.Data() {
t.Errorf("Replay() = %v; wanted %v", actual.Data(), expectedData)
}
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}
eventChannel = repo.Replay("", "")
select {
case actual := <-eventChannel:
if actual == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != actual.Data() {
t.Errorf("Replay() = %v; wanted %v", actual.Data(), expectedData)
}
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}

})
t.Run("ConcurrentCalls", func(t *testing.T) {
Copy link
Contributor

@eli-darkly eli-darkly Jun 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the unit test polishing work I'm doing, I'm changing how this one works because I think there's a logic error in it.

The test on line 282 does not, I think, actually verify that the computation was not done twice: event1 != event2 is comparing two interface values, and in Go that is not a reference equality test (as in, "are they literally the same instance of the type"), it is a test of whether they have the same concrete types and whether their values as those concrete types are equal. In other words, if a and b are separately created instances of some struct type and their fields have the same values, and you cast them both to some interface type, the resulting interface values are equal. And since the data store in this case returns the same flag/segment data every time it is queried (the same empty data, since it was created with flags and segments of nil), this test can't possibly fail. So I'm implementing this differently, using a store that is instrumented to return different data for each query.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my previous comment was not quite right. The test as written does work, in that if I remove the actual fix and run the new test, the test does fail— it detects inequality of the events on line 282. However, that is because of an implementation detail that I'd rather not rely on: the use of the streams.deferredEvent type, which memoizes the value of the event. That forces Go to treat the equality test as reference equality because the type contains things that can't be compared by value. But if the concrete types of event1 and event2 were a simpler struct type that just had fields for the event name and data, then line 282 would not work. I'd rather have this test be independent of such implementation details that aren't directly relevant to the logic we're testing, so I still think it's best to use an instrumented store as I described.

eventChannel1 := repo.Replay("", "")
eventChannel2 := repo.Replay("", "")
var event1, event2 eventsource.Event
var events int

for events < 2 {
select {
case event, ok := <-eventChannel1:
if !ok {
break
}
if event == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != event.Data() {
t.Errorf("Replay() = %v; wanted %v", event.Data(), expectedData)
}
event1 = event
events++
case event, ok := <-eventChannel2:
if !ok {
break
}
if event == nil {
t.Fatal("Expected an event from Replay")
}
if expectedData != event.Data() {
t.Errorf("Replay() = %v; wanted %v", event.Data(), expectedData)
}
event2 = event
events++
case <-time.After(time.Second):
t.Error("expected an event to be returned")
}
}
if event1 != event2 {
t.Error("Expected the same exact event to be returned from the flightgroup")
}
})
})
}
5 changes: 5 additions & 0 deletions internal/core/streams/stream_testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package streams

import (
"errors"
"time"

"github.com/launchdarkly/ld-relay/v6/config"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"
Expand Down Expand Up @@ -49,13 +50,17 @@ type mockStoreQueries struct {
fakeSegmentsError error
flags []ldstoretypes.KeyedItemDescriptor
segments []ldstoretypes.KeyedItemDescriptor
latency time.Duration
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this for testing concurrent calls to Replay - to ensure we have both calls use the same flightgroup.

}

func (q mockStoreQueries) IsInitialized() bool {
return q.initialized
}

func (q mockStoreQueries) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) {
if q.latency > 0 {
time.Sleep(q.latency)
}
switch kind {
case ldstoreimpl.Features():
return q.flags, q.fakeFlagsError
Expand Down