Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support secondary indexes on Cosmos DB #146

Merged
merged 4 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosDatabase;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosStoredProcedureProperties;
import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions;
import com.scalar.db.api.Delete;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Get;
Expand All @@ -18,10 +20,14 @@
import com.scalar.db.api.Scanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.storage.InvalidUsageException;
import com.scalar.db.io.BooleanValue;
import com.scalar.db.io.IntValue;
import com.scalar.db.io.Key;
import com.scalar.db.io.TextValue;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -42,6 +48,8 @@ public class CosmosIntegrationTest {
private static final String METADATA_TABLE = "metadata";
private static final String KEYSPACE = "integration_testing";
private static final String TABLE = "test_table";
private static final String STORED_PROCEDURE_PATH =
"tools/scalar-schema/stored_procedure/mutate.js";
private static final String CONTACT_POINT = System.getenv("COSMOS_URI");
private static final String USERNAME = "not_used";
private static final String PASSWORD = System.getenv("COSMOS_PASSWORD");
Expand All @@ -60,6 +68,17 @@ public void setUp() throws Exception {
new CosmosContainerProperties(TABLE, PARTITION_KEY);
client.getDatabase(KEYSPACE).createContainerIfNotExists(containerProperties);

String storedProcedure =
Files.lines(Paths.get(STORED_PROCEDURE_PATH), StandardCharsets.UTF_8)
.reduce("", (prev, cur) -> prev + cur + System.getProperty("line.separator"));
CosmosStoredProcedureProperties properties =
new CosmosStoredProcedureProperties("mutate.js", storedProcedure);
client
.getDatabase(KEYSPACE)
.getContainer(TABLE)
.getScripts()
.createStoredProcedure(properties, new CosmosStoredProcedureRequestOptions());

storage.with(KEYSPACE, TABLE);
}

Expand Down Expand Up @@ -129,6 +148,50 @@ public void get_GetWithProjectionsGiven_ShouldRetrieveSpecifiedValues()
assertThat(actual.get().getValue(COL_NAME5).isPresent()).isFalse();
}

@Test
public void get_GetGivenForIndexedColumn_ShouldGet() throws ExecutionException {
// Arrange
storage.put(preparePuts().get(0)); // (0,0)
int c3 = 0;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)));

// Act
Optional<Result> actual = storage.get(get);

// Assert
assertThat(actual.isPresent()).isTrue();
assertThat(actual.get().getValue(COL_NAME1)).isEqualTo(Optional.of(new IntValue(COL_NAME1, 0)));
assertThat(actual.get().getValue(COL_NAME4)).isEqualTo(Optional.of(new IntValue(COL_NAME4, 0)));
}

@Test
public void
get_GetGivenForIndexedColumnMatchingMultipleRecords_ShouldThrowInvalidUsageException() {
// Arrange
populateRecords();
int c3 = 3;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)));

// Act Assert
assertThatThrownBy(() -> storage.get(get)).isInstanceOf(InvalidUsageException.class);
}

@Test
public void get_GetGivenForIndexedColumnWithClusteringKey_ShouldThrowIllegalArgumentException()
throws ExecutionException {
// Arrange
int c3 = 0;
int c4 = 0;
Get get = new Get(new Key(new IntValue(COL_NAME3, c3)), new Key((new IntValue(COL_NAME4, c4))));

// Act
assertThatCode(
() -> {
storage.get(get);
})
.isInstanceOf(IllegalArgumentException.class);
}

@Test
public void scan_ScanWithPartitionKeyGiven_ShouldRetrieveMultipleResults()
throws ExecutionException {
Expand Down Expand Up @@ -339,6 +402,56 @@ public void scan_ScanWithLimitGiven_ShouldReturnGivenNumberOfResults() throws Ex
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 2)));
}

@Test
public void scan_ScanGivenForIndexedColumn_ShouldScan() throws ExecutionException {
// Arrange
populateRecords();
int c3 = 3;
Scan scan = new Scan(new Key(new IntValue(COL_NAME3, c3)));

// Act
List<Result> actual = storage.scan(scan).all();

// Assert
assertThat(actual.size()).isEqualTo(3); // (1,2), (2,1), (3,0)
assertThat(actual.get(0).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 1)));
assertThat(actual.get(0).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 2)));
assertThat(actual.get(1).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 2)));
assertThat(actual.get(1).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 1)));
assertThat(actual.get(2).getValue(COL_NAME1))
.isEqualTo(Optional.of(new IntValue(COL_NAME1, 3)));
assertThat(actual.get(2).getValue(COL_NAME4))
.isEqualTo(Optional.of(new IntValue(COL_NAME4, 0)));
}

@Test
public void scan_ScanGivenForIndexedColumnWithOrdering_ShouldThrowIllegalArgumentException()
throws ExecutionException {
// Arrange
int c3 = 0;
Scan scan =
new Scan(new Key(new IntValue(COL_NAME3, c3)))
.withOrdering(new Scan.Ordering(COL_NAME4, Scan.Ordering.Order.ASC));

// Act
assertThatThrownBy(() -> storage.scan(scan)).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void scan_ScanGivenForNonIndexedColumn_ShouldThrowIllegalArgumentException() {
// Arrange
populateRecords();
String c2 = "test";
Scan scan = new Scan(new Key(new TextValue(COL_NAME2, c2)));

// Act Assert
assertThatThrownBy(() -> storage.scan(scan)).isInstanceOf(IllegalArgumentException.class);
}

@Test
public void delete_DeleteWithPartitionKeyAndClusteringKeyGiven_ShouldDeleteSingleRecordProperly()
throws ExecutionException {
Expand Down Expand Up @@ -539,6 +652,7 @@ public static void setUpBeforeClass() throws Exception {
metadata.setId(KEYSPACE + "." + TABLE);
metadata.setPartitionKeyNames(new HashSet<>(Arrays.asList(COL_NAME1)));
metadata.setClusteringKeyNames(new HashSet<>(Arrays.asList(COL_NAME4)));
metadata.setSecondaryIndexNames(new HashSet<>(Arrays.asList(COL_NAME3)));
Map<String, String> columns = new HashMap<>();
columns.put(COL_NAME1, "int");
columns.put(COL_NAME2, "text");
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/com/scalar/db/storage/cosmos/Cosmos.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.scalar.db.api.Scanner;
import com.scalar.db.config.DatabaseConfig;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.storage.InvalidUsageException;
import com.scalar.db.storage.Utility;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -104,24 +105,28 @@ public Optional<String> getTable() {
@Nonnull
public Optional<Result> get(Get get) throws ExecutionException {
Utility.setTargetToIfNot(get, namespacePrefix, namespace, tableName);
CosmosTableMetadata metadata = metadataManager.getTableMetadata(get);
Utility.checkGetOperation(get, metadata);

List<Record> records = selectStatementHandler.handle(get);

if (records.size() > 1) {
throw new InvalidUsageException("please use scan() for non-exact match selection");
}
if (records.isEmpty() || records.get(0) == null) {
return Optional.empty();
}

CosmosTableMetadata metadata = metadataManager.getTableMetadata(get);
return Optional.of(new ResultImpl(records.get(0), get, metadata));
}

@Override
public Scanner scan(Scan scan) throws ExecutionException {
Utility.setTargetToIfNot(scan, namespacePrefix, namespace, tableName);
CosmosTableMetadata metadata = metadataManager.getTableMetadata(scan);
Utility.checkScanOperation(scan, metadata);

List<Record> records = selectStatementHandler.handle(scan);

CosmosTableMetadata metadata = metadataManager.getTableMetadata(scan);
return new ScannerImpl(records, scan, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.scalar.db.api.Operation;
import com.scalar.db.api.Scan;
import com.scalar.db.io.Value;
import com.scalar.db.storage.Utility;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -19,6 +20,7 @@
import org.jooq.OrderField;
import org.jooq.SQLDialect;
import org.jooq.SelectConditionStep;
import org.jooq.SelectWhereStep;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;

Expand Down Expand Up @@ -53,6 +55,10 @@ private List<Record> executeRead(Operation operation) throws CosmosException {
CosmosOperation cosmosOperation = new CosmosOperation(operation, metadataManager);
cosmosOperation.checkArgument(Get.class);

if (Utility.isSecondaryIndexSpecified(operation, metadataManager.getTableMetadata(operation))) {
return executeReadWithIndex(operation);
}

String id = cosmosOperation.getId();
PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey();

Expand All @@ -61,32 +67,48 @@ private List<Record> executeRead(Operation operation) throws CosmosException {
return Arrays.asList(record);
}

private List<Record> executeReadWithIndex(Operation operation) throws CosmosException {
String query = makeQueryWithIndex(operation);
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
CosmosPagedIterable<Record> iterable =
getContainer(operation).queryItems(query, options, Record.class);

return Lists.newArrayList(iterable);
}

private List<Record> executeQuery(Operation operation) throws CosmosException {
CosmosOperation cosmosOperation = new CosmosOperation(operation, metadataManager);
cosmosOperation.checkArgument(Scan.class);
Scan scan = (Scan) operation;

String concatenatedPartitionKey = cosmosOperation.getConcatenatedPartitionKey();
SelectConditionStep<org.jooq.Record> select =
DSL.using(SQLDialect.DEFAULT)
.selectFrom("Record r")
.where(DSL.field("r.concatenatedPartitionKey").eq(concatenatedPartitionKey));

setStart(select, scan);
setEnd(select, scan);

setOrderings(select, scan.getOrderings());
String query;
CosmosQueryRequestOptions options;
if (Utility.isSecondaryIndexSpecified(scan, metadataManager.getTableMetadata(operation))) {
query = makeQueryWithIndex(scan);
options = new CosmosQueryRequestOptions();
} else {
String concatenatedPartitionKey = cosmosOperation.getConcatenatedPartitionKey();
SelectConditionStep<org.jooq.Record> select =
DSL.using(SQLDialect.DEFAULT)
.selectFrom("Record r")
.where(DSL.field("r.concatenatedPartitionKey").eq(concatenatedPartitionKey));

setStart(select, scan);
setEnd(select, scan);

setOrderings(select, scan.getOrderings());

query = select.getSQL(ParamType.INLINED);
options =
new CosmosQueryRequestOptions().setPartitionKey(cosmosOperation.getCosmosPartitionKey());
}

String query = select.getSQL(ParamType.INLINED);
if (scan.getLimit() > 0) {
// Add limit as a string
// because JOOQ doesn't support OFFSET LIMIT clause which Cosmos DB requires
query += " offset 0 limit " + scan.getLimit();
}

CosmosQueryRequestOptions options =
new CosmosQueryRequestOptions().setPartitionKey(cosmosOperation.getCosmosPartitionKey());

CosmosPagedIterable<Record> iterable =
getContainer(scan).queryItems(query, options, Record.class);

Expand Down Expand Up @@ -161,4 +183,23 @@ private void setOrderings(
select.orderBy(orderField);
});
}

private String makeQueryWithIndex(Operation operation) {
SelectWhereStep<org.jooq.Record> select = DSL.using(SQLDialect.DEFAULT).selectFrom("Record r");
Value keyValue = operation.getPartitionKey().get().get(0);
CosmosTableMetadata metadata = metadataManager.getTableMetadata(operation);
String fieldName;
if (metadata.getClusteringKeyNames().contains(keyValue.getName())) {
fieldName = "r.clusteringKey.";
} else {
fieldName = "r.values.";
}
Field<Object> field = DSL.field(fieldName + keyValue.getName());

ValueBinder binder = new ValueBinder();
binder.set(v -> select.where(field.eq(v)));
keyValue.accept(binder);

return select.getSQL(ParamType.INLINED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public void setUp() throws Exception {
when(metadata.getPartitionKeyNames())
.thenReturn(new HashSet<String>(Arrays.asList(ANY_NAME_1)));
when(metadata.getKeyNames()).thenReturn(Arrays.asList(ANY_NAME_1, ANY_NAME_2));
when(metadata.getSecondaryIndexNames())
.thenReturn(new HashSet<String>(Arrays.asList(ANY_NAME_3)));
}

private Get prepareGet() {
Expand Down Expand Up @@ -107,6 +109,29 @@ public void handle_GetOperationGiven_ShouldCallReadItem() {
verify(container).readItem(id, cosmosPartitionKey, Record.class);
}

@Test
public void handle_GetOperationWithIndexGiven_ShouldCallQueryItems() {
// Arrange
when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class)))
.thenReturn(responseIterable);
Record expected = new Record();
when(responseIterable.iterator()).thenReturn(Arrays.asList(expected).iterator());
Key indexKey = new Key(new TextValue(ANY_NAME_3, ANY_TEXT_3));
Get get = new Get(indexKey).forNamespace(ANY_KEYSPACE_NAME).forTable(ANY_TABLE_NAME);
String query =
"select * from Record r where r.values." + ANY_NAME_3 + " = '" + ANY_TEXT_3 + "'";

// Act Assert
assertThatCode(
() -> {
handler.handle(get);
})
.doesNotThrowAnyException();

// Assert
verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class));
}

@Test
public void handle_CosmosExceptionWithNotFound_ShouldReturnEmptyList() throws Exception {
// Arrange
Expand Down Expand Up @@ -167,6 +192,30 @@ public void handle_ScanOperationGiven_ShouldCallQueryItems() {
verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class));
}

@Test
public void handle_ScanOperationWithIndexGiven_ShouldCallQueryItems() {
// Arrange
when(container.queryItems(anyString(), any(CosmosQueryRequestOptions.class), eq(Record.class)))
.thenReturn(responseIterable);
Record expected = new Record();
when(responseIterable.iterator()).thenReturn(Arrays.asList(expected).iterator());

Key indexKey = new Key(new TextValue(ANY_NAME_3, ANY_TEXT_3));
Scan scan = new Scan(indexKey).forNamespace(ANY_KEYSPACE_NAME).forTable(ANY_TABLE_NAME);
String query =
"select * from Record r where r.values." + ANY_NAME_3 + " = '" + ANY_TEXT_3 + "'";

// Act Assert
assertThatCode(
() -> {
handler.handle(scan);
})
.doesNotThrowAnyException();

// Assert
verify(container).queryItems(eq(query), any(CosmosQueryRequestOptions.class), eq(Record.class));
}

@Test
public void handle_ScanOperationCosmosExceptionThrown_ShouldThrowExecutionException() {
// Arrange
Expand Down