Skip to content

Commit

Permalink
GEOMESA-3415 Allow for custom index table prefixes (#3233)
Browse files Browse the repository at this point in the history
* `index.table.prefix` user data specifies the prefix to use
  • Loading branch information
elahrvivaz authored Nov 12, 2024
1 parent 9c923e3 commit 7f4948c
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 29 deletions.
20 changes: 20 additions & 0 deletions docs/user/datastores/index_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,26 @@ you may instead call the ``indexes`` methods:
.indices(List("id", "z3", "attr"))
.build("mySft")
Configuring Index Table Names
-----------------------------

The names used for index tables attempt to be unique, usually being composed of the catalog table name, the feature type name,
and the index identifier. In certain situations, it may be useful to modify the index table names. For example, in Accumulo
you may want to put index tables in different namespaces that have custom configurations. Table name prefixes can be set
using the user data key ``index.table.prefix``, or, to configure prefixes for a specific index type, ``index.table.prefix.<index>``
where ``<index>`` is an index name such as ``z3`` or ``id``:

.. code-block:: java
import org.locationtech.geomesa.utils.interop.SimpleFeatureTypes;
String spec = "name:String,dtg:Date,*geom:Point:srid=4326";
SimpleFeatureType sft = SimpleFeatureTypes.createType("mySft", spec);
// table names will look like geomesa.custom_mySft_id_v4
sft.getUserData().put("index.table.prefix", "geomesa.custom");
// override table names for just the z3 index
sft.getUserData().put("index.table.prefix.z3", "geomesa_z3.custom");
Configuring Feature ID Encoding
-------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,36 +152,40 @@ class AccumuloDataStore(val connector: AccumuloClient, override val config: Accu
override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = {
import org.locationtech.geomesa.index.conf.SchemaProperties.ValidateDistributedClasspath

// validate that the accumulo runtime is available
val namespace = config.catalog.indexOf('.') match {
// call super first so that user data keys are updated
super.preSchemaCreate(sft)

def getNamespace(prefix: String): String = prefix.indexOf('.') match {
case -1 => ""
case i => config.catalog.substring(0, i)
case i => prefix.substring(0, i)
}
if (namespace.nonEmpty) {
adapter.ensureNamespaceExists(namespace)
}
val canLoad = connector.namespaceOperations().testClassLoad(namespace,
classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName)

if (!canLoad) {
val msg = s"Could not load GeoMesa distributed code from the Accumulo classpath for table '${config.catalog}'"
logger.error(msg)
if (ValidateDistributedClasspath.toBoolean.contains(true)) {
val nsMsg = if (namespace.isEmpty) { "" } else { s" for the namespace '$namespace'" }
throw new RuntimeException(s"$msg. You may override this check by setting the system property " +

val prefixes = Seq(config.catalog) ++ sft.getIndices.flatMap(i => sft.getTablePrefix(i.name))
prefixes.map(getNamespace).distinct.foreach { namespace =>
if (namespace.nonEmpty) {
adapter.ensureNamespaceExists(namespace)
}
// validate that the accumulo runtime is available
val canLoad = connector.namespaceOperations().testClassLoad(namespace,
classOf[ProjectVersionIterator].getName, classOf[SortedKeyValueIterator[_, _]].getName)

if (!canLoad) {
val msg = s"Could not load GeoMesa distributed code from the Accumulo classpath"
logger.error(s"$msg for catalog ${config.catalog}")
if (ValidateDistributedClasspath.toBoolean.contains(true)) {
val nsMsg = if (namespace.isEmpty) { "" } else { s" for the namespace '$namespace'" }
throw new RuntimeException(s"$msg. You may override this check by setting the system property " +
s"'${ValidateDistributedClasspath.property}=false'. Otherwise, please verify that the appropriate " +
s"JARs are installed$nsMsg - see http://www.geomesa.org/documentation/user/accumulo/install.html" +
"#installing-the-accumulo-distributed-runtime-library")
}
}
}

if (sft.getVisibilityLevel == VisibilityLevel.Attribute && sft.getAttributeCount > 255) {
throw new IllegalArgumentException("Attribute level visibility only supports up to 255 attributes")
}

super.preSchemaCreate(sft)

// note: dtg should be set appropriately before calling this method
sft.getDtgField.foreach { dtg =>
if (sft.getIndices.exists(i => i.name == JoinIndex.name && i.attributes.headOption.contains(dtg))) {
if (!GeoMesaSchemaValidator.declared(sft, OverrideDtgJoin)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
}
}

"delete all associated tables" >> {
"delete all associated tables" in {
val catalog = "AccumuloDataStoreDeleteAllTablesTest"
// note the table needs to be different to prevent testing errors
val ds = DataStoreFinder.getDataStore((dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> catalog)).asJava).asInstanceOf[AccumuloDataStore]
Expand All @@ -534,7 +534,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
ds.connector.tableOperations().list().asScala.toSeq must not(containAnyOf(tables))
}

"query on bbox and unbounded temporal" >> {
"query on bbox and unbounded temporal" in {
val sft = createNewSchema("name:String,dtg:Date,*geom:Point:srid=4326")

addFeatures((0 until 6).map { i =>
Expand All @@ -554,7 +554,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
read.map(_.getID) must containAllOf(Seq("2", "3", "4"))
}

"create tables with an accumulo namespace" >> {
"create tables with an accumulo namespace" in {
val table = "test.AccumuloDataStoreNamespaceTest"
val params = dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> table)
val dsWithNs = DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]
Expand All @@ -563,7 +563,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
dsWithNs.connector.namespaceOperations().exists("test") must beTrue
}

"only create catalog table when necessary" >> {
"only create catalog table when necessary" in {
val table = "AccumuloDataStoreTableTest"
val params = dsParams ++ Map(AccumuloDataStoreParams.CatalogParam.key -> table)
val ds = DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore]
Expand All @@ -581,7 +581,7 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
ds.getSchema("test") must not(beNull)
}

"create tables with block cache enabled/disabled" >> {
"create tables with block cache enabled/disabled" in {
foreach(Seq(",geomesa.table.partition=time", "")) { partitioned =>
val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;table.cache.enabled='z3,attr'$partitioned")
addFeatures((0 until 6).map { i =>
Expand All @@ -606,5 +606,22 @@ class AccumuloDataStoreTest extends Specification with TestWithMultipleSfts {
}
}
}

"create index tables with a different prefix than the catalog table" in {
val prefix = s"custom.${catalog.replaceFirst(".*\\.", "")}"
foreach(Seq(",geomesa.table.partition=time", "")) { partitioned =>
val userData = s"index.table.prefix='$prefix',index.table.prefix.z3='z3$prefix'$partitioned"
val sft = createNewSchema(s"name:String:index=true,dtg:Date,*geom:Point:srid=4326;$userData")
addFeatures((0 until 6).map { i =>
val sf = new ScalaSimpleFeature(sft, i.toString)
sf.setAttributes(Array[AnyRef](i.toString, s"2012-01-02T05:0$i:07.000Z", s"POINT(45.0 4$i.0)"))
sf
})
foreach(ds.manager.indices(sft)) { index =>
val p = if (index.name == Z3Index.name) { s"z3$prefix" } else { prefix }
foreach(index.getTableNames())(_ must startWith(s"${p}_${sft.getTypeName}_"))
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,10 @@ abstract class GeoMesaFeatureIndex[T, U](val ds: GeoMesaDataStore[_],
*/
protected def generateTableName(partition: Option[String] = None, limit: Option[Int] = None): String = {
import StringSerialization.alphaNumericSafeString
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType

val prefix = (ds.config.catalog +: Seq(sft.getTypeName, name).map(alphaNumericSafeString)).mkString("_")
val namespace = sft.getTablePrefix(name).getOrElse(ds.config.catalog)
val prefix = (namespace +: Seq(sft.getTypeName, name).map(alphaNumericSafeString)).mkString("_")
val suffix = s"v$version${partition.map(p => s"_$p").getOrElse("")}"

def build(attrs: Seq[String]): String = (prefix +: attrs :+ suffix).mkString("_")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,12 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD

@throws(classOf[IllegalArgumentException])
override protected def preSchemaCreate(sft: SimpleFeatureType): Unit = {
// check for old enabled indices and re-map them
// noinspection ScalaDeprecation
SimpleFeatureTypes.Configs.ENABLED_INDEX_OPTS.drop(1).find(sft.getUserData.containsKey).foreach { key =>
sft.getUserData.put(SimpleFeatureTypes.Configs.EnabledIndices, sft.getUserData.remove(key))
// check for old user data keys and re-map them
InternalConfigs.DeprecatedConfigMappings.foreach { case (from, to) =>
val v = sft.getUserData.remove(from)
if (v != null) {
sft.getUserData.put(to, v)
}
}

// validate column groups
Expand Down Expand Up @@ -155,6 +157,11 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD
InternalConfigs.PartitionConfigMappings.foreach { case (from, to) =>
Option(sft.getUserData.get(from)).foreach(sft.getUserData.put(to, _))
}
InternalConfigs.PartitionConfigPrefixMappings.foreach { case (from, to) =>
sft.getUserData.asScala.toMap.collect {
case (k: String, v) if k.startsWith(from) => sft.getUserData.put(to + k.substring(from.length), v)
}
}
}

// set stats enabled based on the data store config if not explicitly set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.locationtech.geomesa.curve.TimePeriod.TimePeriod
import org.locationtech.geomesa.curve.{TimePeriod, XZSFC}
import org.locationtech.geomesa.utils.conf.{FeatureExpiration, IndexId, SemanticVersion}
import org.locationtech.geomesa.utils.geometry.GeometryPrecision
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{Configs, InternalConfigs}
import org.locationtech.geomesa.utils.index.VisibilityLevel
import org.locationtech.geomesa.utils.index.VisibilityLevel.VisibilityLevel
import org.locationtech.geomesa.utils.stats.Cardinality
Expand Down Expand Up @@ -333,6 +333,11 @@ object RichSimpleFeatureType extends Conversions {

def isPartitioned: Boolean = sft.getUserData.containsKey(Configs.TablePartitioning)

def getTablePrefix(indexName: String): Option[String] = {
val key = if (isPartitioned) { InternalConfigs.PartitionTablePrefix } else { Configs.IndexTablePrefix }
userData[String](s"$key.$indexName").orElse(userData[String](key))
}

def getRemoteVersion: Option[SemanticVersion] = userData[String](RemoteVersion).map(SemanticVersion.apply)

def getKeywords: Set[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ object SimpleFeatureTypes {
val IndexAttributeShards = "geomesa.attr.splits"
val IndexIdShards = "geomesa.id.splits"
val IndexIgnoreDtg = "geomesa.ignore.dtg"
val IndexTablePrefix = "index.table.prefix"
val IndexVisibilityLevel = "geomesa.visibility.level"
val IndexXzPrecision = "geomesa.xz.precision"
val IndexZ3Interval = "geomesa.z3.interval"
Expand Down Expand Up @@ -80,6 +81,7 @@ object SimpleFeatureTypes {
val IndexVersions = "geomesa.indices"
val PartitionSplitterClass = "geomesa.splitter.class"
val PartitionSplitterOpts = "geomesa.splitter.opts"
val PartitionTablePrefix = "geomesa.table.prefix"
val RemoteVersion = "gm.remote.version" // note: doesn't start with geomesa so we don't persist it
val PartitionTableCache = "geomesa.table.cache"
val KeywordsDelimiter = "\u0000"
Expand All @@ -90,6 +92,15 @@ object SimpleFeatureTypes {
Configs.TableSplitterClass -> PartitionSplitterClass,
Configs.TableSplitterOpts -> PartitionSplitterOpts,
)
val PartitionConfigPrefixMappings: Map[String, String] = Map(
Configs.IndexTablePrefix -> PartitionTablePrefix,
)

// deprecated configs that we want to re-map for back-compatibility
val DeprecatedConfigMappings: Map[String, String] = Map(
"geomesa.indexes.enabled" -> Configs.EnabledIndices,
"table.indexes.enabled" -> Configs.EnabledIndices,
)
}

object AttributeOptions {
Expand Down

0 comments on commit 7f4948c

Please sign in to comment.