Skip to content

Commit

Permalink
support secondary indexes on DynamoDB (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 authored Jan 22, 2021
1 parent ea97de7 commit a68aa4c
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.scalar.db.api.Scanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.storage.InvalidUsageException;
import com.scalar.db.exception.storage.NoMutationException;
import com.scalar.db.io.BooleanValue;
import com.scalar.db.io.IntValue;
Expand All @@ -44,6 +45,7 @@
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.LocalSecondaryIndex;
Expand Down Expand Up @@ -154,6 +156,50 @@ public void get_GetWithProjectionsGiven_ShouldRetrieveSpecifiedValues()
assertThat(actual.get().getValue(COL_NAME5).isPresent()).isFalse();
}

@Test
public void get_GetGivenForIndexedColumn_ShouldGet() throws ExecutionException {
// Arrange
storage.put(preparePuts().get(0)); // (0,0)
int c3 = 0;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)));

// Act
Optional<Result> actual = storage.get(get);

// Assert
assertThat(actual.isPresent()).isTrue();
assertThat(actual.get().getValue(COL_NAME1)).isEqualTo(Optional.of(new IntValue(COL_NAME1, 0)));
assertThat(actual.get().getValue(COL_NAME4)).isEqualTo(Optional.of(new IntValue(COL_NAME4, 0)));
}

@Test
public void
get_GetGivenForIndexedColumnMatchingMultipleRecords_ShouldThrowInvalidUsageException() {
// Arrange
populateRecords();
int c3 = 3;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)));

// Act Assert
assertThatThrownBy(() -> storage.get(get)).isInstanceOf(InvalidUsageException.class);
}

@Test
public void get_GetGivenForIndexedColumnWithClusteringKey_ShouldThrowIllegalArgumentException()
throws ExecutionException {
// Arrange
int c3 = 0;
int c4 = 0;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)), new Key((new IntValue(COL_NAME4, c4))));

// Act
assertThatCode(
() -> {
storage.get(get);
})
.isInstanceOf(IllegalArgumentException.class);
}

@Test
public void scan_ScanWithPartitionKeyGiven_ShouldRetrieveMultipleResults()
throws ExecutionException {
Expand Down Expand Up @@ -267,7 +313,8 @@ public void scan_ScanWithStartInclusiveRangeGiven_ShouldRetrieveResultsOfGivenRa
}

@Test
public void scan_ScanWithStartExclusiveRangeGiven_ShouldReturnInclusiveResults() throws ExecutionException {
public void scan_ScanWithStartExclusiveRangeGiven_ShouldReturnInclusiveResults()
throws ExecutionException {
// Arrange
populateRecords();
int pKey = 0;
Expand Down Expand Up @@ -361,6 +408,56 @@ public void scan_ScanWithLimitGiven_ShouldReturnGivenNumberOfResults() throws Ex
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 2)));
}

@Test
public void scan_ScanGivenForIndexedColumn_ShouldScan() throws ExecutionException {
// Arrange
populateRecords();
int c3 = 3;
Scan scan = new Scan(new Key(new IntValue(COL_NAME3, c3)));

// Act
List<Result> actual = storage.scan(scan).all();

// Assert
assertThat(actual.size()).isEqualTo(3); // (3,0), (2,1), (1,2)
assertThat(actual.get(0).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 3)));
assertThat(actual.get(0).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 0)));
assertThat(actual.get(1).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 2)));
assertThat(actual.get(1).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 1)));
assertThat(actual.get(2).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 1)));
assertThat(actual.get(2).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 2)));
}

@Test
public void scan_ScanGivenForIndexedColumnWithOrdering_ShouldThrowIllegalArgumentException()
throws ExecutionException {
// Arrange
int c3 = 0;
Scan scan =
new Scan(new Key(new IntValue(COL_NAME3, c3)))
.withOrdering(new Scan.Ordering(COL_NAME4, Scan.Ordering.Order.ASC));

// Act
assertThatThrownBy(() -> storage.scan(scan)).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void scan_ScanGivenForNonIndexedColumn_ShouldThrowIllegalArgumentException() {
// Arrange
populateRecords();
String c2 = "test";
Scan scan = new Scan(new Key(new TextValue(COL_NAME2, c2)));

// Act Assert
assertThatThrownBy(() -> storage.scan(scan)).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void put_SinglePutGiven_ShouldStoreProperly() throws ExecutionException {
// Arrange
Expand Down Expand Up @@ -1121,7 +1218,7 @@ public static void setUpBeforeClass() throws Exception {

// wait for the creation
try {
Thread.sleep(10000);
Thread.sleep(15000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -1174,6 +1271,11 @@ private static void createUserTable() {
.attributeName(COL_NAME4)
.attributeType(ScalarAttributeType.N)
.build());
attributeDefinitions.add(
AttributeDefinition.builder()
.attributeName(COL_NAME3)
.attributeType(ScalarAttributeType.N)
.build());
builder.attributeDefinitions(attributeDefinitions);

List<KeySchemaElement> keySchemaElements = new ArrayList<>();
Expand All @@ -1192,14 +1294,26 @@ private static void createUserTable() {
LocalSecondaryIndex.builder()
.indexName(KEYSPACE + "." + TABLE + ".index." + COL_NAME4)
.keySchema(indexKeys)
.projection(
Projection.builder()
.projectionType(ProjectionType.INCLUDE)
.nonKeyAttributes(COL_NAME1, COL_NAME2, COL_NAME3, COL_NAME5)
.build())
.projection(Projection.builder().projectionType(ProjectionType.ALL).build())
.build();
builder.localSecondaryIndexes(index);

List<KeySchemaElement> globalIndexKeys = new ArrayList<>();
globalIndexKeys.add(
KeySchemaElement.builder().attributeName(COL_NAME3).keyType(KeyType.HASH).build());
GlobalSecondaryIndex globalIndex =
GlobalSecondaryIndex.builder()
.indexName(KEYSPACE + "." + TABLE + ".global_index." + COL_NAME3)
.keySchema(globalIndexKeys)
.projection(Projection.builder().projectionType(ProjectionType.ALL).build())
.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(10L)
.writeCapacityUnits(10L)
.build())
.build();
builder.globalSecondaryIndexes(globalIndex);

client.createTable(builder.build());
}

Expand Down Expand Up @@ -1229,6 +1343,7 @@ private static void insertMetadata() {
values.put("table", AttributeValue.builder().s(KEYSPACE + "." + TABLE).build());
values.put("partitionKey", AttributeValue.builder().ss(Arrays.asList(COL_NAME1)).build());
values.put("clusteringKey", AttributeValue.builder().ss(Arrays.asList(COL_NAME4)).build());
values.put("secondaryIndex", AttributeValue.builder().ss(Arrays.asList(COL_NAME3)).build());
Map<String, AttributeValue> columns = new HashMap<>();
columns.put(COL_NAME1, AttributeValue.builder().s("int").build());
columns.put(COL_NAME2, AttributeValue.builder().s("text").build());
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/com/scalar/db/storage/dynamo/Dynamo.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.scalar.db.api.Scanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.storage.InvalidUsageException;
import com.scalar.db.storage.Utility;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,24 +100,28 @@ public Optional<String> getTable() {
@Nonnull
public Optional<Result> get(Get get) throws ExecutionException {
Utility.setTargetToIfNot(get, namespacePrefix, namespace, tableName);
DynamoTableMetadata metadata = metadataManager.getTableMetadata(get);
Utility.checkGetOperation(get, metadata);

List<Map<String, AttributeValue>> items = selectStatementHandler.handle(get);

if (items.size() > 1) {
throw new InvalidUsageException("please use scan() for non-exact match selection");
}
if (items.isEmpty() || items.get(0) == null) {
return Optional.empty();
}

DynamoTableMetadata metadata = metadataManager.getTableMetadata(get);
return Optional.of(new ResultImpl(items.get(0), get, metadata));
}

@Override
public Scanner scan(Scan scan) throws ExecutionException {
Utility.setTargetToIfNot(scan, namespacePrefix, namespace, tableName);
DynamoTableMetadata metadata = metadataManager.getTableMetadata(scan);
Utility.checkScanOperation(scan, metadata);

List<Map<String, AttributeValue>> items = selectStatementHandler.handle(scan);

DynamoTableMetadata metadata = metadataManager.getTableMetadata(scan);
return new ScannerImpl(items, scan, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class DynamoOperation {
static final String RANGE_KEY_ALIAS = ":sk";
static final String RANGE_CONDITION = " BETWEEN :sk0 AND :sk1";
static final String INDEX_NAME_PREFIX = "index";
static final String GLOBAL_INDEX_NAME_PREFIX = "global_index";

private final Operation operation;
private final DynamoTableMetadata metadata;
Expand Down Expand Up @@ -58,6 +59,11 @@ public String getIndexName(String clusteringKey) {
return getTableName() + "." + INDEX_NAME_PREFIX + "." + clusteringKey;
}

@Nonnull
public String getGlobalIndexName(String column) {
return getTableName() + "." + GLOBAL_INDEX_NAME_PREFIX + "." + column;
}

@Nonnull
public Map<String, AttributeValue> getKeyMap() {
Map<String, AttributeValue> keyMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import com.scalar.db.api.Get;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Selection;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.io.Value;
import com.scalar.db.storage.Utility;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -50,6 +52,12 @@ public List<Map<String, AttributeValue>> handle(Operation operation) throws Exec
checkArgument(operation, Get.class, Scan.class);

try {
if (Utility.isSecondaryIndexSpecified(
operation, metadataManager.getTableMetadata(operation))) {
// convert to a mutable list for the Scanner
return new ArrayList<>(executeQueryWithIndex((Selection) operation).items());
}

if (operation instanceof Get) {
GetItemResponse response = executeGet((Get) operation);
if (response.hasItem()) {
Expand Down Expand Up @@ -85,6 +93,34 @@ private GetItemResponse executeGet(Get get) {
return client.getItem(builder.build());
}

private QueryResponse executeQueryWithIndex(Selection selection) {
DynamoOperation dynamoOperation = new DynamoOperation(selection, metadataManager);
Value keyValue = selection.getPartitionKey().get().get(0);
String column = keyValue.getName();
String indexTable = dynamoOperation.getGlobalIndexName(column);
QueryRequest.Builder builder =
QueryRequest.builder().tableName(dynamoOperation.getTableName()).indexName(indexTable);

String condition = column + " = " + DynamoOperation.VALUE_ALIAS + "0";
ValueBinder binder = new ValueBinder(DynamoOperation.VALUE_ALIAS);
keyValue.accept(binder);
Map<String, AttributeValue> bindMap = binder.build();
builder.keyConditionExpression(condition).expressionAttributeValues(bindMap);

if (!selection.getProjections().isEmpty()) {
builder.projectionExpression(String.join(",", selection.getProjections()));
}

if (selection instanceof Scan) {
Scan scan = (Scan) selection;
if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}
}

return client.query(builder.build());
}

private QueryResponse executeQuery(Scan scan) {
DynamoOperation dynamoOperation = new DynamoOperation(scan, metadataManager);
QueryRequest.Builder builder = QueryRequest.builder().tableName(dynamoOperation.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public void setUp() throws Exception {
.thenReturn(new HashSet<String>(Arrays.asList(ANY_NAME_1)));
when(metadata.getClusteringKeyNames())
.thenReturn(new HashSet<String>(Arrays.asList(ANY_NAME_2)));
when(metadata.getSecondaryIndexNames())
.thenReturn(new HashSet<String>(Arrays.asList(ANY_NAME_3)));
when(metadata.getKeyNames()).thenReturn(Arrays.asList(ANY_NAME_1, ANY_NAME_2));
}

Expand Down Expand Up @@ -122,6 +124,35 @@ public void handle_GetOperationNoItemReturned_ShouldReturnEmptyList() throws Exc
assertThat(actual).isEmpty();
}

@Test
public void handle_GetOperationWithIndexGiven_ShouldCallQuery() {
// Arrange
when(client.query(any(QueryRequest.class))).thenReturn(queryResponse);
Map<String, AttributeValue> expected = new HashMap<>();
when(queryResponse.items()).thenReturn(Arrays.asList(expected));

Key indexKey = new Key(new TextValue(ANY_NAME_3, ANY_TEXT_3));
Get get = new Get(indexKey).forNamespace(ANY_KEYSPACE_NAME).forTable(ANY_TABLE_NAME);
String expectedKeyCondition = ANY_NAME_3 + " = " + DynamoOperation.VALUE_ALIAS + "0";
Map<String, AttributeValue> expectedBindMap = new HashMap<>();
expectedBindMap.put(
DynamoOperation.VALUE_ALIAS + "0", AttributeValue.builder().s(ANY_TEXT_3).build());

// Act Assert
assertThatCode(
() -> {
handler.handle(get);
})
.doesNotThrowAnyException();

// Assert
ArgumentCaptor<QueryRequest> captor = ArgumentCaptor.forClass(QueryRequest.class);
verify(client).query(captor.capture());
QueryRequest actualRequest = captor.getValue();
assertThat(actualRequest.keyConditionExpression()).isEqualTo(expectedKeyCondition);
assertThat(actualRequest.expressionAttributeValues()).isEqualTo(expectedBindMap);
}

@Test
public void handle_GetOperationDynamoDbExceptionThrown_ShouldThrowExecutionException() {
// Arrange
Expand Down Expand Up @@ -169,6 +200,35 @@ public void handle_ScanOperationGiven_ShouldCallQuery() {
assertThat(actualRequest.expressionAttributeValues()).isEqualTo(expectedBindMap);
}

@Test
public void handle_ScanOperationWithIndexGiven_ShouldCallQuery() {
// Arrange
when(client.query(any(QueryRequest.class))).thenReturn(queryResponse);
Map<String, AttributeValue> expected = new HashMap<>();
when(queryResponse.items()).thenReturn(Arrays.asList(expected));

Key indexKey = new Key(new TextValue(ANY_NAME_3, ANY_TEXT_3));
Scan scan = new Scan(indexKey).forNamespace(ANY_KEYSPACE_NAME).forTable(ANY_TABLE_NAME);
String expectedKeyCondition = ANY_NAME_3 + " = " + DynamoOperation.VALUE_ALIAS + "0";
Map<String, AttributeValue> expectedBindMap = new HashMap<>();
expectedBindMap.put(
DynamoOperation.VALUE_ALIAS + "0", AttributeValue.builder().s(ANY_TEXT_3).build());

// Act Assert
assertThatCode(
() -> {
handler.handle(scan);
})
.doesNotThrowAnyException();

// Assert
ArgumentCaptor<QueryRequest> captor = ArgumentCaptor.forClass(QueryRequest.class);
verify(client).query(captor.capture());
QueryRequest actualRequest = captor.getValue();
assertThat(actualRequest.keyConditionExpression()).isEqualTo(expectedKeyCondition);
assertThat(actualRequest.expressionAttributeValues()).isEqualTo(expectedBindMap);
}

@Test
public void handle_ScanOperationCosmosExceptionThrown_ShouldThrowExecutionException() {
// Arrange
Expand Down

0 comments on commit a68aa4c

Please sign in to comment.