
Merge pull request #262 from launchdarkly/eb/ch115311/unapplied-patch-warning

(#1) make logging of big segments patch version mismatch clearer and use Warn level
eli-darkly authored Jul 21, 2021
2 parents 2135a31 + 6796029 commit 048acef
Showing 7 changed files with 375 additions and 106 deletions.
8 changes: 5 additions & 3 deletions internal/core/bigsegments/store.go
@@ -19,8 +19,10 @@ type BigSegmentStore interface {
     // The rest of the interface methods are non-exported because they're only relevant within
     // this package, and no implementations of the interface are created outside of this package.
 
-    // applyPatch is used to apply updates to the store.
-    applyPatch(patch bigSegmentPatch) error
+    // applyPatch is used to apply updates to the store. If successful, it returns (true, nil); if
+    // the patch was not applied because its PreviousVersion did not match the current cursor, it
+    // returns (false, nil); a non-nil second value indicates a database error.
+    applyPatch(patch bigSegmentPatch) (bool, error)
     // getCursor loads the synchronization cursor from the external store.
     getCursor() (string, error)
     // setSynchronizedOn stores the synchronization time in the external store
@@ -73,7 +75,7 @@ type nullBigSegmentStore struct{}
 
 func (s *nullBigSegmentStore) Close() error { return nil }
 
-func (s *nullBigSegmentStore) applyPatch(patch bigSegmentPatch) error { return nil }
+func (s *nullBigSegmentStore) applyPatch(patch bigSegmentPatch) (bool, error) { return false, nil }
 
 func (s *nullBigSegmentStore) getCursor() (string, error) { return "", nil }
 
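The interface change above is the core of the fix: a stale patch is no longer reported as an error. As a rough sketch of how a caller inside this package is expected to read the new contract (the helper below and its log message are hypothetical, not relay code):

// Sketch only: assumes it sits in the bigsegments package alongside the types above.
func handlePatch(store BigSegmentStore, patch bigSegmentPatch, loggers ldlog.Loggers) error {
    applied, err := store.applyPatch(patch)
    if err != nil {
        return err // a real database failure: surface it to the caller
    }
    if !applied {
        // The store's cursor didn't match patch.PreviousVersion: an out-of-order
        // update, now logged at Warn level instead of being treated as an error.
        loggers.Warnf("skipping out-of-order patch (previous version %q)", patch.PreviousVersion)
    }
    return nil
}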
19 changes: 14 additions & 5 deletions internal/core/bigsegments/store_common_test.go
@@ -21,6 +21,11 @@ func testGenericAll(
     t *testing.T,
     withBigSegmentStore func(t *testing.T, action func(BigSegmentStore, bigSegmentOperations)),
 ) {
+    patch1 := newPatchBuilder("segment.g1", "1", "").
+        addIncludes("included1", "included2").addExcludes("excluded1", "excluded2").build()
+    patch2 := newPatchBuilder("segment.g1", "2", "1").
+        removeIncludes("included1").removeExcludes("excluded1").build()
+
     t.Run("synchronizedOn", func(t *testing.T) {
         withBigSegmentStore(t, func(store BigSegmentStore, operations bigSegmentOperations) {
             sync1, err := store.GetSynchronizedOn()
@@ -40,8 +45,9 @@ func testGenericAll(
     t.Run("applyPatchSequence", func(t *testing.T) {
         withBigSegmentStore(t, func(store BigSegmentStore, operations bigSegmentOperations) {
             // apply initial patch that adds users
-            err := store.applyPatch(patch1)
+            success, err := store.applyPatch(patch1)
             require.NoError(t, err)
+            require.True(t, success)
 
             cursor, err := store.getCursor()
             require.NoError(t, err)
@@ -56,8 +62,9 @@ func testGenericAll(
             assert.Equal(t, true, membership)
 
             // apply second patch in sequence that removes users
-            err = store.applyPatch(patch2)
+            success, err = store.applyPatch(patch2)
             require.NoError(t, err)
+            require.True(t, success)
 
             cursor, err = store.getCursor()
             require.NoError(t, err)
@@ -72,8 +79,9 @@ func testGenericAll(
             assert.Equal(t, false, membership)
 
             // apply old patch
-            err = store.applyPatch(patch1)
-            require.Error(t, err)
+            success, err = store.applyPatch(patch1)
+            require.NoError(t, err)
+            require.False(t, success)
 
             cursor, err = store.getCursor()
             require.NoError(t, err)
@@ -101,8 +109,9 @@ func testGenericAll(
                 },
             }
 
-            err := store.applyPatch(patch)
+            success, err := store.applyPatch(patch)
             require.NoError(t, err)
+            require.True(t, success)
 
             membership, err := operations.isUserIncluded(patch.SegmentID, strconv.FormatUint(uint64(userCount-1), 10))
             assert.Equal(t, true, membership)
20 changes: 17 additions & 3 deletions internal/core/bigsegments/store_dynamodb.go
@@ -96,7 +96,7 @@ func (store *dynamoDBBigSegmentStore) makeTransactionItem(updateExpression, attr
     }
 }
 
-func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) error {
+func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) (bool, error) {
     bigSegmentsMetadataKeyWithPrefix := dynamoDBMetadataKey(store.prefix)
 
     var conditionExpression *string
@@ -163,7 +163,18 @@ func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) error {
             TransactItems: transactionBatch,
         })
         if err != nil {
-            return err
+            // DynamoDB doesn't seem to provide a more convenient programmatic way to distinguish
+            // "transaction was cancelled due to the condition check" from other errors here; we
+            // need to go to this trouble because we want the synchronizer to be able to log an
+            // out-of-order update in a clear way that doesn't look like a random database error.
+            if tce, ok := err.(*dynamodb.TransactionCanceledException); ok {
+                for _, reason := range tce.CancellationReasons {
+                    if reason.Code != nil && *reason.Code == "ConditionalCheckFailed" {
+                        return false, nil
+                    }
+                }
+            }
+            return false, err
         }
         transactionBatch = transactionBatch[:0]
     }
@@ -183,7 +194,10 @@ func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) error {
     }
 
     _, err := store.client.PutItem(&putCursorInput)
-    return err
+    if err == nil {
+        return true, nil
+    }
+    return false, err
 }
 
 func (store *dynamoDBBigSegmentStore) getCursor() (string, error) {
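The explanatory comment in the hunk above is the heart of the DynamoDB change: a failed ConditionExpression surfaces only as a TransactionCanceledException, so the cancellation reasons have to be inspected to tell a stale cursor apart from a genuine database failure. A minimal standalone sketch of that pattern with aws-sdk-go; the table name, key schema, and attribute names here are made up for illustration, not the store's actual schema:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

// tryAdvanceCursor attempts a conditional cursor update and reports (updated, error),
// mirroring the shape of the store change above.
func tryAdvanceCursor(client *dynamodb.DynamoDB, table, expectedPrev, newCursor string) (bool, error) {
    _, err := client.TransactWriteItems(&dynamodb.TransactWriteItemsInput{
        TransactItems: []*dynamodb.TransactWriteItem{
            {
                Update: &dynamodb.Update{
                    TableName: aws.String(table),
                    Key: map[string]*dynamodb.AttributeValue{
                        "namespace": {S: aws.String("big_segments_metadata")},
                        "key":       {S: aws.String("big_segments_metadata")},
                    },
                    UpdateExpression:         aws.String("SET #cursor = :new"),
                    ConditionExpression:      aws.String("#cursor = :prev"),
                    ExpressionAttributeNames: map[string]*string{"#cursor": aws.String("cursor")},
                    ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
                        ":new":  {S: aws.String(newCursor)},
                        ":prev": {S: aws.String(expectedPrev)},
                    },
                },
            },
        },
    })
    if err != nil {
        // The only way to tell "condition failed" apart from other failures is to
        // inspect the cancellation reasons attached to the exception.
        if tce, ok := err.(*dynamodb.TransactionCanceledException); ok {
            for _, reason := range tce.CancellationReasons {
                if reason.Code != nil && *reason.Code == "ConditionalCheckFailed" {
                    return false, nil // stale patch, not a database error
                }
            }
        }
        return false, err
    }
    return true, nil
}

func main() {
    client := dynamodb.New(session.Must(session.NewSession()))
    ok, err := tryAdvanceCursor(client, "my-table", "1", "2")
    fmt.Println(ok, err)
}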
18 changes: 11 additions & 7 deletions internal/core/bigsegments/store_redis.go
@@ -2,7 +2,6 @@ package bigsegments
 
 import (
     "context"
-    "errors"
     "fmt"
     "strconv"
 
@@ -77,17 +76,19 @@ func newRedisBigSegmentStore(
 }
 
 // applyPatch is used to apply updates to the store.
-func (r *redisBigSegmentStore) applyPatch(patch bigSegmentPatch) error {
+func (r *redisBigSegmentStore) applyPatch(patch bigSegmentPatch) (bool, error) {
     ctx := context.Background()
 
+    updated := false
+
     err := r.client.Watch(ctx, func(tx *redis.Tx) error {
         cursor, err := r.client.Get(ctx, redisCursorKey(r.prefix)).Result()
         if err != nil && err != redis.Nil {
             return err
         }
 
-        if err != redis.Nil && cursor != patch.PreviousVersion {
-            return errors.New("attempted to apply old patch")
+        if err == nil && cursor != patch.PreviousVersion {
+            return err
         }
 
         result, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
@@ -134,13 +135,16 @@ func (r *redisBigSegmentStore) applyPatch(patch bigSegmentPatch) error {
         if err != nil {
             return nil
         }
-        if len(result) > 0 {
-            return result[0].Err()
+        for _, r := range result {
+            if r.Err() != nil {
+                return r.Err()
+            }
         }
+        updated = true
         return nil
     }, redisLockKey(r.prefix))
 
-    return err
+    return updated, err
 }
 
 func (r *redisBigSegmentStore) getCursor() (string, error) {
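The Redis store gets the equivalent behavior from go-redis optimistic locking: WATCH the cursor key, compare it against the patch's PreviousVersion, and only queue the writes if it still matches, reporting the outcome through the updated flag. A stripped-down sketch of that check-and-set pattern using the same go-redis v8 API; the key name and helper below are placeholders, not the relay's actual keys:

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

// compareAndSetCursor sets cursorKey to newCursor only if its current value is expectedPrev
// (or the key is missing). It returns whether the update was applied.
func compareAndSetCursor(client *redis.Client, cursorKey, expectedPrev, newCursor string) (bool, error) {
    ctx := context.Background()
    updated := false

    err := client.Watch(ctx, func(tx *redis.Tx) error {
        current, err := tx.Get(ctx, cursorKey).Result()
        if err != nil && err != redis.Nil {
            return err // real Redis error
        }
        if err == nil && current != expectedPrev {
            return nil // cursor has moved on: leave updated == false
        }
        // Writes queued here run atomically; the transaction fails with
        // redis.TxFailedErr if cursorKey was modified while we were watching it.
        _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            pipe.Set(ctx, cursorKey, newCursor, 0)
            return nil
        })
        if err != nil {
            return err
        }
        updated = true
        return nil
    }, cursorKey)

    return updated, err
}

func main() {
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    ok, err := compareAndSetCursor(client, "big_segments_cursor", "1", "2")
    fmt.Println(ok, err)
}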
86 changes: 53 additions & 33 deletions internal/core/bigsegments/sync.go
@@ -18,11 +18,11 @@ import (
 )
 
 const (
-    unboundedPollPath      = "/sdk/big-segments/revisions"
-    unboundedStreamPath    = "/big-segments"
-    streamReadTimeout      = 5 * time.Minute
-    retryInterval          = 10 * time.Second
-    synchronizedOnInterval = 30 * time.Second
+    unboundedPollPath          = "/sdk/big-segments/revisions"
+    unboundedStreamPath        = "/big-segments"
+    streamReadTimeout          = 5 * time.Minute
+    defaultStreamRetryInterval = 10 * time.Second
+    synchronizedOnInterval     = 30 * time.Second
 )
 
 // BigSegmentSynchronizer synchronizes big segment state for a given environment.
@@ -53,15 +53,16 @@ type BigSegmentSynchronizerFactory func(
 
 // defaultBigSegmentSynchronizer is the standard implementation of BigSegmentSynchronizer.
 type defaultBigSegmentSynchronizer struct {
-    httpConfig httpconfig.HTTPConfig
-    store      BigSegmentStore
-    pollURI    string
-    streamURI  string
-    envID      config.EnvironmentID
-    sdkKey     config.SDKKey
-    closeChan  chan struct{}
-    closeOnce  sync.Once
-    loggers    ldlog.Loggers
+    httpConfig          httpconfig.HTTPConfig
+    store               BigSegmentStore
+    pollURI             string
+    streamURI           string
+    envID               config.EnvironmentID
+    sdkKey              config.SDKKey
+    streamRetryInterval time.Duration
+    closeChan           chan struct{}
+    closeOnce           sync.Once
+    loggers             ldlog.Loggers
 }
 
 // DefaultBigSegmentSynchronizerFactory creates the default implementation of BigSegmentSynchronizer.
@@ -87,14 +88,15 @@ func newDefaultBigSegmentSynchronizer(
     loggers ldlog.Loggers,
 ) *defaultBigSegmentSynchronizer {
     s := defaultBigSegmentSynchronizer{
-        httpConfig: httpConfig,
-        store:      store,
-        pollURI:    strings.TrimSuffix(pollURI, "/") + unboundedPollPath,
-        streamURI:  strings.TrimSuffix(streamURI, "/") + unboundedStreamPath,
-        envID:      envID,
-        sdkKey:     sdkKey,
-        closeChan:  make(chan struct{}),
-        loggers:    loggers,
+        httpConfig:          httpConfig,
+        store:               store,
+        pollURI:             strings.TrimSuffix(pollURI, "/") + unboundedPollPath,
+        streamURI:           strings.TrimSuffix(streamURI, "/") + unboundedStreamPath,
+        envID:               envID,
+        sdkKey:              sdkKey,
+        streamRetryInterval: defaultStreamRetryInterval,
+        closeChan:           make(chan struct{}),
+        loggers:             loggers,
     }
 
     s.loggers.SetPrefix("BigSegmentSynchronizer:")
@@ -122,7 +124,7 @@ func (s *defaultBigSegmentSynchronizer) Close() {
 
 func (s *defaultBigSegmentSynchronizer) syncSupervisor() {
     for {
-        timer := time.NewTimer(retryInterval)
+        timer := time.NewTimer(s.streamRetryInterval)
         err := s.sync()
         if err != nil {
             s.loggers.Error("Synchronization failed:", err)
@@ -173,6 +175,7 @@ func (s *defaultBigSegmentSynchronizer) sync() error {
             continue
         }
 
+        s.loggers.Debug("Marking store as synchronized")
         err = s.store.setSynchronizedOn(ldtime.UnixMillisNow())
         if err != nil {
             s.loggers.Error("Updating store timestamp failed:", err)
@@ -239,9 +242,9 @@ func (s *defaultBigSegmentSynchronizer) poll() (bool, error) {
         return false, err
     }
 
-    count, err := s.applyPatches(responseBody)
+    totalCount, _, err := s.applyPatches(responseBody)
 
-    return count == 0, err
+    return totalCount == 0, err
 }
 
 func (s *defaultBigSegmentSynchronizer) connectStream() (*es.Stream, error) {
@@ -276,13 +279,18 @@ func (s *defaultBigSegmentSynchronizer) consumeStream(stream *es.Stream) error {
         select {
         case event, ok := <-stream.Events:
             if !ok {
+                s.loggers.Debug("Stream ended")
                 return nil
             }
 
+            s.loggers.Debug("Received update(s) from stream")
-            if _, err := s.applyPatches([]byte(event.Data())); err != nil {
+            totalCount, appliedCount, err := s.applyPatches([]byte(event.Data()))
+            if err != nil {
                 return err
             }
+            if appliedCount < totalCount {
+                return nil // forces a restart if we got an out-of-order patch
+            }
 
             if err := s.store.setSynchronizedOn(ldtime.UnixMillisNow()); err != nil {
                 return err
@@ -298,21 +306,33 @@ func (s *defaultBigSegmentSynchronizer) consumeStream(stream *es.Stream) error {
         }
     }
 }
-func (s *defaultBigSegmentSynchronizer) applyPatches(jsonData []byte) (int, error) {
+// Returns total number of patches, number of patches applied, error
+func (s *defaultBigSegmentSynchronizer) applyPatches(jsonData []byte) (int, int, error) {
     var patches []bigSegmentPatch
     err := json.Unmarshal(jsonData, &patches)
     if err != nil {
-        return 0, err
+        return 0, 0, err
     }
 
+    successCount := 0
     for _, patch := range patches {
         s.loggers.Debugf("Received patch for version %q (from previous version %q)", patch.Version, patch.PreviousVersion)
-        if err := s.store.applyPatch(patch); err != nil {
-            return 0, err
+        success, err := s.store.applyPatch(patch)
+        if err != nil {
+            return len(patches), successCount, err
         }
+        if !success {
+            s.loggers.Warnf("Received a patch to previous version %q which was not the latest known version; skipping", patch.PreviousVersion)
+            break
+        }
+        successCount++
     }
-    if len(patches) > 0 {
-        s.loggers.Infof("Applied %d updates", len(patches))
+    if successCount > 0 {
+        updatesDesc := "updates"
+        if successCount == 1 {
+            updatesDesc = "update"
+        }
+        s.loggers.Infof("Applied %d %s", successCount, updatesDesc)
     }
-    return len(patches), nil
+    return len(patches), successCount, nil
 }
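To make the revised applyPatches contract concrete: it now reports both how many patches the payload contained and how many were applied before hitting a stale one, and consumeStream above restarts the stream whenever those counts differ. A self-contained toy version of that apply-until-stale loop, deliberately written outside the relay's types, shows the two counts in action:

package main

import "fmt"

type patch struct{ version, previousVersion string }

// applyAll applies patches in order against an in-memory cursor, stopping at the first
// one whose previousVersion doesn't match, and returns (total, applied) like applyPatches.
func applyAll(cursor *string, patches []patch) (int, int) {
    applied := 0
    for _, p := range patches {
        if *cursor != p.previousVersion {
            fmt.Printf("warn: patch to previous version %q is not the latest known version; skipping\n", p.previousVersion)
            break
        }
        *cursor = p.version
        applied++
    }
    return len(patches), applied
}

func main() {
    cursor := "1"
    // The second patch is out of order (it expects version "3"), so only one of the
    // three is applied; a caller seeing applied < total would re-sync, as in consumeStream.
    total, applied := applyAll(&cursor, []patch{
        {version: "2", previousVersion: "1"},
        {version: "4", previousVersion: "3"},
        {version: "5", previousVersion: "4"},
    })
    fmt.Println(total, applied, cursor) // 3 1 2
}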