Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Scalar-Schema] Set Cosmos secondary index #141

Merged
merged 2 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tools/scalar-schema/java/src/CosmosTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class CosmosTableMetadata {
private String id;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of my curiosity. Why do we have CosmosTableMetadata in this schema tool project?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is used for metadata insertion.
Cosmos DB SDK serializes/deserializes the data according to this class's setter and getter.

This tool gives the instance to the Upsert API.

metadata (doto (CosmosTableMetadata.)

private SortedSet<String> partitionKeyNames;
private SortedSet<String> clusteringKeyNames;
private SortedSet<String> secondaryIndexNames;
private SortedMap<String, String> columns;
private List<String> keyNames;

Expand All @@ -34,6 +35,10 @@ public void setClusteringKeyNames(Set<String> clusteringKeyNames) {
this.clusteringKeyNames = ImmutableSortedSet.copyOf(clusteringKeyNames);
}

public void setSecondaryIndexNames(Set<String> secondaryIndexNames) {
this.secondaryIndexNames = ImmutableSortedSet.copyOf(secondaryIndexNames);
}

public void setColumns(Map<String, String> columns) {
this.columns = ImmutableSortedMap.copyOf(columns);
}
Expand All @@ -54,6 +59,10 @@ public Set<String> getClusteringKeyNames() {
return ImmutableSortedSet.copyOf(clusteringKeyNames);
}

public Set<String> getSecondaryIndexNames() {
return ImmutableSortedSet.copyOf(secondaryIndexNames);
}

public Map<String, String> getColumns() {
return Collections.unmodifiableSortedMap(columns);
}
Expand Down
61 changes: 34 additions & 27 deletions tools/scalar-schema/src/scalar_schema/cosmos.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
(def ^:const ^:private ^String
PARTITION_KEY_PATH "/concatenatedPartitionKey/?")
(def ^:const ^:private ^String CLUSTERING_KEY_PATH "/clusteringKey/*")
(def ^:const ^:private ^String SECONDARY_INDEX_PATH "/values/")

(def ^:const ^:private REGISTERED_STORED_PROCEDURE "mutate.js")

Expand All @@ -39,10 +40,10 @@
(catch Exception _ false)))

(defn- container-exists?
[client database container]
[client {:keys [database table]}]
(try
(-> (.getDatabase client database)
(.getContainer container)
(.getContainer table)
.read nil? not)
(catch Exception _ false)))

Expand All @@ -65,45 +66,52 @@
(.replaceThroughput db (make-throughput-properties ru no-scaling)))))

(defn- make-container-properties
[container]
(if (= container common/METADATA_TABLE)
(CosmosContainerProperties. container METADATA_PARTITION_KEY)
[{:keys [table secondary-index]}]
(if (= table common/METADATA_TABLE)
(CosmosContainerProperties. table METADATA_PARTITION_KEY)
(let [policy (doto (IndexingPolicy.)
(.setIncludedPaths
[(IncludedPath. PARTITION_KEY_PATH)
(IncludedPath. CLUSTERING_KEY_PATH)])
(into [(IncludedPath. PARTITION_KEY_PATH)
(IncludedPath. CLUSTERING_KEY_PATH)]
(map #(IncludedPath.
(str SECONDARY_INDEX_PATH % "/?"))
secondary-index)))
(.setExcludedPaths [(ExcludedPath. "/*")]))]
(doto (CosmosContainerProperties. container CONTAINER_PARTITION_KEY)
(doto (CosmosContainerProperties. table CONTAINER_PARTITION_KEY)
(.setIndexingPolicy policy)))))

(defn- create-container
[client database container]
(let [prop (make-container-properties container)]
[client {:keys [database] :as schema}]
(let [prop (make-container-properties schema)]
(-> (.getDatabase client database)
(.createContainerIfNotExists prop))))

(defn- create-metadata
[client schema prefix]
[client
{:keys [database table partition-key clustering-key secondary-index columns]}
prefix]
(let [prefixed-database (if prefix
(str prefix \_ common/METADATA_DATABASE)
common/METADATA_DATABASE)
metadata (doto (CosmosTableMetadata.)
(.setId (common/get-fullname (:database schema)
(:table schema)))
(.setPartitionKeyNames (set (:partition-key schema)))
(.setClusteringKeyNames (set (:clustering-key schema)))
(.setColumns (:columns schema)))]
(.setId (common/get-fullname database table))
(.setPartitionKeyNames (set partition-key))
(.setClusteringKeyNames (set clustering-key))
(.setSecondaryIndexNames (set secondary-index))
(.setColumns columns))]
(when-not (database-exists? client prefixed-database)
(create-database client prefixed-database 400 true))
(when-not (container-exists? client prefixed-database common/METADATA_TABLE)
(create-container client prefixed-database common/METADATA_TABLE))
(when-not (container-exists? client {:database prefixed-database
:table common/METADATA_TABLE})
(create-container client {:database prefixed-database
:table common/METADATA_TABLE}))
(-> (.getDatabase client prefixed-database)
(.getContainer common/METADATA_TABLE)
(.upsertItem metadata))))

(defn- register-stored-procedure
[client database container]
(let [scripts (-> client (.getDatabase database) (.getContainer container)
[client {:keys [database table]}]
(let [scripts (-> client (.getDatabase database) (.getContainer table)
.getScripts)
properties (CosmosStoredProcedureProperties.
REGISTERED_STORED_PROCEDURE
Expand All @@ -112,21 +120,20 @@
(CosmosStoredProcedureRequestOptions.))))

(defn- create-table
[client schema {:keys [ru prefix no-scaling] :or {ru 400 no-scaling false}}]
(let [database (:database schema)
table (:table schema)
ru (if (:ru schema) (:ru schema) ru)]
[client {:keys [database table] :as schema}
{:keys [ru prefix no-scaling] :or {ru 400 no-scaling false}}]
(let [ru (if (:ru schema) (:ru schema) ru)]
(create-metadata client schema prefix)
(if (database-exists? client database)
(do
(update-throughput client database ru no-scaling)
(log/warn database "already exists"))
(create-database client database ru no-scaling))
(if (container-exists? client database table)
(if (container-exists? client schema)
(log/warn (common/get-fullname database table) "already exists")
(do
(create-container client database table)
(register-stored-procedure client database table)))))
(create-container client schema)
(register-stored-procedure client schema)))))

(defn- delete-table
[client {:keys [database]}]
Expand Down