Skip to content

Commit

Permalink
Merge pull request #264 from launchdarkly/eb/ch115321/smart-sync-start
Browse files Browse the repository at this point in the history
(#3) only start big seg synchronizer if necessary
  • Loading branch information
eli-darkly authored Jul 21, 2021
2 parents 61e4965 + 9c3934a commit d9361bd
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 7 deletions.
11 changes: 9 additions & 2 deletions internal/core/bigsegments/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ type BigSegmentSynchronizer interface {
// Start begins synchronization of an environment.
//
// This method does not block.
//
// If the BigSegmentSynchronizer has already been started, or has been closed, this has no effect.
Start()

// Close ends synchronization of an evironment.
// Close ends synchronization of an environment.
//
// This method does not block.
//
// The BigSegmentSynchronizer cannot be restarted after calling Close.
Close()
}

Expand All @@ -60,6 +64,7 @@ type defaultBigSegmentSynchronizer struct {
envID config.EnvironmentID
sdkKey config.SDKKey
streamRetryInterval time.Duration
startOnce sync.Once
closeChan chan struct{}
closeOnce sync.Once
loggers ldlog.Loggers
Expand Down Expand Up @@ -113,7 +118,9 @@ func (m httpStatusError) Error() string {
}

func (s *defaultBigSegmentSynchronizer) Start() {
go s.syncSupervisor()
s.startOnce.Do(func() {
go s.syncSupervisor()
})
}

func (s *defaultBigSegmentSynchronizer) Close() {
Expand Down
9 changes: 9 additions & 0 deletions internal/core/internal/store/relay_feature_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func (a *SSERelayDataStoreAdapter) GetStore() interfaces.DataStore {
return store
}

// GetUpdates returns the EnvStreamUpdates that will receive all updates sent to this store. This is
// exposed for testing so that we can simulate receiving updates from LaunchDarkly to this component.
func (a *SSERelayDataStoreAdapter) GetUpdates() streams.EnvStreamUpdates {
a.mu.RLock()
updates := a.updates
a.mu.RUnlock()
return updates
}

// NewSSERelayDataStoreAdapter creates a new instance where the store has not yet been created.
func NewSSERelayDataStoreAdapter(
wrappedFactory interfaces.DataStoreFactory,
Expand Down
57 changes: 55 additions & 2 deletions internal/core/relayenv/env_context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"gopkg.in/launchdarkly/go-sdk-common.v2/ldlog"
ldeval "gopkg.in/launchdarkly/go-server-sdk-evaluation.v1"
"gopkg.in/launchdarkly/go-server-sdk-evaluation.v1/ldmodel"
ld "gopkg.in/launchdarkly/go-server-sdk.v5"
"gopkg.in/launchdarkly/go-server-sdk.v5/interfaces"
"gopkg.in/launchdarkly/go-server-sdk.v5/interfaces/ldstoretypes"
Expand Down Expand Up @@ -105,6 +106,12 @@ type envContextStoreQueries struct {
context *envContextImpl
}

// Implementation of the EnvStreamUpdates interface that intercepts all updates from the SDK to the
// data store.
type envContextStreamUpdates struct {
context *envContextImpl
}

// NewEnvContext creates the internal implementation of EnvContext.
//
// It immediately begins trying to initialize the SDK client for this environment. Since that might
Expand Down Expand Up @@ -188,7 +195,9 @@ func NewEnvContext(
httpConfig, bigSegmentStore, allConfig.Main.BaseURI.String(), allConfig.Main.StreamURI.String(),
envConfig.EnvID, envConfig.SDKKey, envLoggers)
thingsToCleanUp.AddFunc(envContext.bigSegmentSync.Close)
envContext.bigSegmentSync.Start()
// We deliberate do not call bigSegmentSync.Start() here because we don't want the synchronizer to
// start until we know that at least one big segment exists. That's implemented by the
// envContextStreamUpdates methods.

// This function allows us to tell our big segment store wrapper (see sdks package) whether or not
// to really query the big segment store. Currently it always returns true because we are always
Expand All @@ -210,6 +219,10 @@ func NewEnvContext(
envContext.envStreams = envStreams
thingsToCleanUp.AddCloser(envStreams)

envStreamUpdates := &envContextStreamUpdates{
context: envContext,
}

for c := range credentials {
envStreams.AddCredential(c)
}
Expand All @@ -228,7 +241,7 @@ func NewEnvContext(
if dataStoreFactory == nil {
dataStoreFactory = ldcomponents.InMemoryDataStore()
}
storeAdapter := store.NewSSERelayDataStoreAdapter(dataStoreFactory, envStreams)
storeAdapter := store.NewSSERelayDataStoreAdapter(dataStoreFactory, envStreamUpdates)
envContext.storeAdapter = storeAdapter

var eventDispatcher *events.EventDispatcher
Expand Down Expand Up @@ -603,6 +616,46 @@ func (q envContextStoreQueries) GetAll(kind ldstoretypes.DataKind) ([]ldstoretyp
return nil, nil
}

func (u *envContextStreamUpdates) SendAllDataUpdate(allData []ldstoretypes.Collection) {
// We use this delegator, rather than sending updates directory to context.envStreams, so that we
// can detect the presence of a big segment and turn on the big segment synchronizer as needed.
u.context.envStreams.SendAllDataUpdate(allData)
if u.context.bigSegmentSync == nil {
return
}
hasBigSegment := false
for _, coll := range allData {
if coll.Kind == ldstoreimpl.Segments() {
for _, keyedItem := range coll.Items {
if s, ok := keyedItem.Item.Item.(*ldmodel.Segment); ok && s.Unbounded {
hasBigSegment = true
break
}
}
}
}
if hasBigSegment {
u.context.bigSegmentSync.Start() // has no effect if already started
}
}

func (u *envContextStreamUpdates) SendSingleItemUpdate(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) {
// See comments in SendAllDataUpdate.
u.context.envStreams.SendSingleItemUpdate(kind, key, item)
if u.context.bigSegmentSync == nil {
return
}
hasBigSegment := false
if kind == ldstoreimpl.Segments() {
if s, ok := item.Item.(*ldmodel.Segment); ok && s.Unbounded {
hasBigSegment = true
}
}
if hasBigSegment {
u.context.bigSegmentSync.Start() // has no effect if already started
}
}

func makeLogPrefix(logNameMode LogNameMode, sdkKey config.SDKKey, envID config.EnvironmentID) string {
name := string(sdkKey)
if logNameMode == LogNameIsEnvID && envID != "" {
Expand Down
193 changes: 190 additions & 3 deletions internal/core/relayenv/env_context_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"sync"
"testing"
"time"

"github.com/launchdarkly/ld-relay/v6/config"
"github.com/launchdarkly/ld-relay/v6/internal/basictypes"
"github.com/launchdarkly/ld-relay/v6/internal/core/bigsegments"
"github.com/launchdarkly/ld-relay/v6/internal/core/httpconfig"
"github.com/launchdarkly/ld-relay/v6/internal/core/internal/metrics"
"github.com/launchdarkly/ld-relay/v6/internal/core/sdks"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"
st "github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest"
"github.com/launchdarkly/ld-relay/v6/internal/core/sharedtest/testclient"

Expand All @@ -22,6 +26,9 @@ import (
"gopkg.in/launchdarkly/go-sdk-common.v2/ldlogtest"
"gopkg.in/launchdarkly/go-sdk-common.v2/ldvalue"
ldevents "gopkg.in/launchdarkly/go-sdk-events.v1"
"gopkg.in/launchdarkly/go-server-sdk-evaluation.v1/ldbuilders"
"gopkg.in/launchdarkly/go-server-sdk.v5/interfaces/ldstoretypes"
"gopkg.in/launchdarkly/go-server-sdk.v5/ldcomponents/ldstoreimpl"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -505,6 +512,145 @@ func TestEventDispatcherIsNotCreatedIfSendEventsIsTrueAndNotInOfflineMode(t *tes
// // detail in the unit tests in the sdks package.
// }

func TestBigSegmentsSynchronizerIsCreatedIfBigSegmentStoreExists(t *testing.T) {
envConfig := st.EnvMain.Config
allConfig := config.Config{}

fakeBigSegmentStoreFactory := func(config.EnvConfig, config.Config, ldlog.Loggers) (bigsegments.BigSegmentStore, error) {
return bigsegments.NewNullBigSegmentStore(), nil
}
fakeSynchronizerFactory := &mockBigSegmentSynchronizerFactory{}

mockLog := ldlogtest.NewMockLog()
defer mockLog.DumpIfTestFailed(t)

env, err := NewEnvContext(EnvContextImplParams{
Identifiers: EnvIdentifiers{ConfiguredName: st.EnvMain.Name},
EnvConfig: envConfig,
AllConfig: allConfig,
BigSegmentStoreFactory: fakeBigSegmentStoreFactory,
BigSegmentSynchronizerFactory: fakeSynchronizerFactory.create,
ClientFactory: testclient.FakeLDClientFactory(true),
Loggers: mockLog.Loggers,
}, nil)
require.NoError(t, err)

if assert.NotNil(t, fakeSynchronizerFactory.synchronizer) {
assert.False(t, fakeSynchronizerFactory.synchronizer.isStarted())
assert.False(t, fakeSynchronizerFactory.synchronizer.isClosed())
}

env.Close()

assert.True(t, fakeSynchronizerFactory.synchronizer.isClosed())
}

func TestBigSegmentsSynchronizerIsStartedByFullDataUpdateWithBigSegment(t *testing.T) {
envConfig := st.EnvMain.Config
allConfig := config.Config{}

fakeBigSegmentStoreFactory := func(config.EnvConfig, config.Config, ldlog.Loggers) (bigsegments.BigSegmentStore, error) {
return bigsegments.NewNullBigSegmentStore(), nil
}
fakeSynchronizerFactory := &mockBigSegmentSynchronizerFactory{}

mockLog := ldlogtest.NewMockLog()
defer mockLog.DumpIfTestFailed(t)

env, err := NewEnvContext(EnvContextImplParams{
Identifiers: EnvIdentifiers{ConfiguredName: st.EnvMain.Name},
EnvConfig: envConfig,
AllConfig: allConfig,
BigSegmentStoreFactory: fakeBigSegmentStoreFactory,
BigSegmentSynchronizerFactory: fakeSynchronizerFactory.create,
ClientFactory: testclient.FakeLDClientFactory(true),
Loggers: mockLog.Loggers,
}, nil)
require.NoError(t, err)
defer env.Close()

synchronizer := fakeSynchronizerFactory.synchronizer
require.NotNil(t, synchronizer)
assert.False(t, synchronizer.isStarted())

// Simulate receiving some data
updates := env.(*envContextImpl).storeAdapter.GetUpdates()

s1 := ldbuilders.NewSegmentBuilder("s1").Build()
dataWithNoBigSegment := []ldstoretypes.Collection{
{
Kind: ldstoreimpl.Segments(),
Items: []ldstoretypes.KeyedItemDescriptor{
{Key: "s1", Item: sharedtest.SegmentDesc(s1)},
},
},
}
updates.SendAllDataUpdate(dataWithNoBigSegment)

assert.False(t, synchronizer.isStarted())

s2 := ldbuilders.NewSegmentBuilder("s2").Unbounded(true).Generation(1).Build()
dataWithBigSegment := []ldstoretypes.Collection{
{
Kind: ldstoreimpl.Segments(),
Items: []ldstoretypes.KeyedItemDescriptor{
{Key: "s1", Item: sharedtest.SegmentDesc(s1)},
{Key: "s2", Item: sharedtest.SegmentDesc(s2)},
},
},
}
updates.SendAllDataUpdate(dataWithBigSegment)

assert.True(t, synchronizer.isStarted())
}

func TestBigSegmentsSynchronizerIsStartedBySingleItemUpdateWithBigSegment(t *testing.T) {
envConfig := st.EnvMain.Config
allConfig := config.Config{}

fakeBigSegmentStoreFactory := func(config.EnvConfig, config.Config, ldlog.Loggers) (bigsegments.BigSegmentStore, error) {
return bigsegments.NewNullBigSegmentStore(), nil
}
fakeSynchronizerFactory := &mockBigSegmentSynchronizerFactory{}

mockLog := ldlogtest.NewMockLog()
defer mockLog.DumpIfTestFailed(t)

env, err := NewEnvContext(EnvContextImplParams{
Identifiers: EnvIdentifiers{ConfiguredName: st.EnvMain.Name},
EnvConfig: envConfig,
AllConfig: allConfig,
BigSegmentStoreFactory: fakeBigSegmentStoreFactory,
BigSegmentSynchronizerFactory: fakeSynchronizerFactory.create,
ClientFactory: testclient.FakeLDClientFactory(true),
Loggers: mockLog.Loggers,
}, nil)
require.NoError(t, err)
defer env.Close()

synchronizer := fakeSynchronizerFactory.synchronizer
require.NotNil(t, synchronizer)
assert.False(t, synchronizer.isStarted())

// Simulate receiving some data
updates := env.(*envContextImpl).storeAdapter.GetUpdates()

f1 := ldbuilders.NewFlagBuilder("f1").Build()
updates.SendSingleItemUpdate(ldstoreimpl.Features(), f1.Key, sharedtest.FlagDesc(f1))

assert.False(t, synchronizer.isStarted())

s1 := ldbuilders.NewSegmentBuilder("s1").Build()
updates.SendSingleItemUpdate(ldstoreimpl.Segments(), s1.Key, sharedtest.SegmentDesc(s1))

assert.False(t, synchronizer.isStarted())

s2 := ldbuilders.NewSegmentBuilder("s2").Unbounded(true).Generation(1).Build()
updates.SendSingleItemUpdate(ldstoreimpl.Segments(), s2.Key, sharedtest.SegmentDesc(s2))

assert.True(t, synchronizer.isStarted())
}

// This method forces the metrics events exporter to post an event to the event publisher, and then triggers a
// flush of the event publisher. Because both of those actions are asynchronous, it may be necessary to call it
// more than once to ensure that the newly posted event is included in the flush.
Expand All @@ -515,8 +661,49 @@ func flushMetricsEvents(c *envContextImpl) {
}
}

type mockBigSegmentSynchronizer struct{}
type mockBigSegmentSynchronizerFactory struct {
synchronizer *mockBigSegmentSynchronizer
}

func (f *mockBigSegmentSynchronizerFactory) create(
httpConfig httpconfig.HTTPConfig,
store bigsegments.BigSegmentStore,
pollURI string,
streamURI string,
envID config.EnvironmentID,
sdkKey config.SDKKey,
loggers ldlog.Loggers,
) bigsegments.BigSegmentSynchronizer {
f.synchronizer = &mockBigSegmentSynchronizer{}
return f.synchronizer
}

type mockBigSegmentSynchronizer struct {
started bool
closed bool
lock sync.Mutex
}

func (s *mockBigSegmentSynchronizer) Start() {
s.lock.Lock()
s.started = true
s.lock.Unlock()
}

func (s *mockBigSegmentSynchronizer) Close() {
s.lock.Lock()
s.closed = true
s.lock.Unlock()
}

func (s *mockBigSegmentSynchronizer) Start() {}
func (s *mockBigSegmentSynchronizer) isStarted() bool {
s.lock.Lock()
defer s.lock.Unlock()
return s.started
}

func (s *mockBigSegmentSynchronizer) Close() {}
func (s *mockBigSegmentSynchronizer) isClosed() bool {
s.lock.Lock()
defer s.lock.Unlock()
return s.closed
}

0 comments on commit d9361bd

Please sign in to comment.