Skip to content

Commit

Permalink
Merge pull request #263 from launchdarkly/eb/ch115285/fix-ddb-updates
Browse files Browse the repository at this point in the history
(#2) fix DynamoDB updates for big segments metadata
  • Loading branch information
eli-darkly authored Jul 21, 2021
2 parents 048acef + 31ff8f6 commit 61e4965
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 32 deletions.
20 changes: 20 additions & 0 deletions internal/core/bigsegments/store_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func testGenericAll(

t.Run("applyPatchSequence", func(t *testing.T) {
withBigSegmentStore(t, func(store BigSegmentStore, operations bigSegmentOperations) {
// first set synchronizedOn, so we can verify that applying a patch does *not* change that value
initialSyncTime := ldtime.UnixMillisecondTime(99999)
require.NoError(t, store.setSynchronizedOn(initialSyncTime))

// apply initial patch that adds users
success, err := store.applyPatch(patch1)
require.NoError(t, err)
Expand Down Expand Up @@ -83,6 +87,22 @@ func testGenericAll(
require.NoError(t, err)
require.False(t, success)

// verify that the stored cursor was updated
cursor, err = store.getCursor()
require.NoError(t, err)
assert.Equal(t, patch2.Version, cursor)

// verify that the sync time is still there
syncTime, err := store.GetSynchronizedOn()
require.NoError(t, err)
assert.Equal(t, initialSyncTime, syncTime)

// now update the sync time and verify that that doesn't affect the cursor
newSyncTime := initialSyncTime + 1
require.NoError(t, store.setSynchronizedOn(newSyncTime))
syncTime, err = store.GetSynchronizedOn()
require.NoError(t, err)
assert.Equal(t, newSyncTime, syncTime)
cursor, err = store.getCursor()
require.NoError(t, err)
assert.Equal(t, patch2.Version, cursor)
Expand Down
74 changes: 42 additions & 32 deletions internal/core/bigsegments/store_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
const (
tablePartitionKey = "namespace"
tableSortKey = "key"
dynamoDBCursorAttr = "cursor"
dynamoDBCursorAttr = "lastVersion"
dynamoDBIncludedAttr = "included"
dynamoDBExcludedAttr = "excluded"
dynamoDBSyncTimeAttr = "synchronizedOn"
Expand Down Expand Up @@ -96,28 +96,27 @@ func (store *dynamoDBBigSegmentStore) makeTransactionItem(updateExpression, attr
}
}

func makeCursorUpdateCondition(previousVersion string) (string, map[string]*string, map[string]*dynamodb.AttributeValue) {
names := map[string]*string{"#0": aws.String(dynamoDBCursorAttr)}
if previousVersion == "" {
return "attribute_not_exists(#0)", names, nil
}
return "#0 = :0", names, map[string]*dynamodb.AttributeValue{
":0": {S: aws.String(previousVersion)},
}
}

func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) (bool, error) {
bigSegmentsMetadataKeyWithPrefix := dynamoDBMetadataKey(store.prefix)

var conditionExpression *string
var expressionAttributeValues map[string]*dynamodb.AttributeValue
if patch.PreviousVersion == "" {
conditionExpression = aws.String("attribute_not_exists(#0)")
} else {
conditionExpression = aws.String("#0 = :0")
expressionAttributeValues = map[string]*dynamodb.AttributeValue{
":0": {S: aws.String(patch.PreviousVersion)},
}
}
txConditionExpression, txExprAttrNames, txExprAttrValues := makeCursorUpdateCondition(patch.PreviousVersion)

conditionCheckItem := &dynamodb.TransactWriteItem{
ConditionCheck: &dynamodb.ConditionCheck{
ConditionExpression: conditionExpression,
ExpressionAttributeValues: expressionAttributeValues,
ConditionExpression: aws.String(txConditionExpression),
TableName: aws.String(store.table),
ExpressionAttributeNames: map[string]*string{
"#0": aws.String(dynamoDBCursorAttr),
},
ExpressionAttributeNames: txExprAttrNames,
ExpressionAttributeValues: txExprAttrValues,
Key: map[string]*dynamodb.AttributeValue{
tablePartitionKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
tableSortKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
Expand Down Expand Up @@ -179,21 +178,26 @@ func (store *dynamoDBBigSegmentStore) applyPatch(patch bigSegmentPatch) (bool, e
transactionBatch = transactionBatch[:0]
}

putCursorInput := dynamodb.PutItemInput{
ConditionExpression: conditionExpression,
ExpressionAttributeValues: expressionAttributeValues,
updateConditionExpression, updateExprAttrNames, updateExprAttrValues := makeCursorUpdateCondition(patch.PreviousVersion)
if updateExprAttrValues == nil {
updateExprAttrValues = map[string]*dynamodb.AttributeValue{}
}
updateExprAttrValues[":1"] = &dynamodb.AttributeValue{
S: aws.String(patch.Version),
}
updateCursorInput := dynamodb.UpdateItemInput{
ConditionExpression: aws.String(updateConditionExpression),
TableName: aws.String(store.table),
ExpressionAttributeNames: map[string]*string{
"#0": aws.String(dynamoDBCursorAttr),
},
Item: map[string]*dynamodb.AttributeValue{
tablePartitionKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
tableSortKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
dynamoDBCursorAttr: {S: aws.String(patch.Version)},
ExpressionAttributeNames: updateExprAttrNames,
ExpressionAttributeValues: updateExprAttrValues,
Key: map[string]*dynamodb.AttributeValue{
tablePartitionKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
tableSortKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
},
UpdateExpression: aws.String("SET #0 = :1"),
}

_, err := store.client.PutItem(&putCursorInput)
_, err := store.client.UpdateItem(&updateCursorInput)
if err == nil {
return true, nil
}
Expand Down Expand Up @@ -227,12 +231,18 @@ func (store *dynamoDBBigSegmentStore) getCursor() (string, error) {
func (store *dynamoDBBigSegmentStore) setSynchronizedOn(synchronizedOn ldtime.UnixMillisecondTime) error {
bigSegmentsMetadataKeyWithPrefix := dynamoDBMetadataKey(store.prefix)
unixMilliseconds := strconv.FormatUint(uint64(synchronizedOn), 10)
_, err := store.client.PutItem(&dynamodb.PutItemInput{
_, err := store.client.UpdateItem(&dynamodb.UpdateItemInput{
TableName: aws.String(store.table),
Item: map[string]*dynamodb.AttributeValue{
tablePartitionKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
tableSortKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
dynamoDBSyncTimeAttr: {N: aws.String(unixMilliseconds)},
Key: map[string]*dynamodb.AttributeValue{
tablePartitionKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
tableSortKey: {S: aws.String(bigSegmentsMetadataKeyWithPrefix)},
},
UpdateExpression: aws.String("SET #0 = :0"),
ExpressionAttributeNames: map[string]*string{"#0": aws.String(dynamoDBSyncTimeAttr)},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":0": {
N: aws.String(unixMilliseconds),
},
},
})
return err
Expand Down

0 comments on commit 61e4965

Please sign in to comment.