Add extra_properties to hive table properties #16846
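This PR adds an extra_properties table property to the Hive connector: a map(varchar, varchar) of arbitrary key/value pairs that is written through to the Hive metastore on CREATE TABLE and reconstructed from unrecognized metastore parameters when table metadata is read back. The diff touches two files, HiveMetadata.java and HiveTableProperties.java (class names taken from the hunk headers below). A hypothetical usage, assuming Trino's MAP constructor syntax: CREATE TABLE hive.default.t (c integer) WITH (extra_properties = MAP(ARRAY['custom.key'], ARRAY['custom.value'])).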
HiveMetadata.java:
@@ -222,6 +222,7 @@
 import static io.trino.plugin.hive.HiveTableProperties.CSV_QUOTE;
 import static io.trino.plugin.hive.HiveTableProperties.CSV_SEPARATOR;
 import static io.trino.plugin.hive.HiveTableProperties.EXTERNAL_LOCATION_PROPERTY;
+import static io.trino.plugin.hive.HiveTableProperties.EXTRA_PROPERTIES;
 import static io.trino.plugin.hive.HiveTableProperties.NULL_FORMAT_PROPERTY;
 import static io.trino.plugin.hive.HiveTableProperties.ORC_BLOOM_FILTER_COLUMNS;
 import static io.trino.plugin.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP;
@@ -238,6 +239,7 @@
 import static io.trino.plugin.hive.HiveTableProperties.getAvroSchemaUrl;
 import static io.trino.plugin.hive.HiveTableProperties.getBucketProperty;
 import static io.trino.plugin.hive.HiveTableProperties.getExternalLocation;
+import static io.trino.plugin.hive.HiveTableProperties.getExtraProperties;
 import static io.trino.plugin.hive.HiveTableProperties.getFooterSkipCount;
 import static io.trino.plugin.hive.HiveTableProperties.getHeaderSkipCount;
 import static io.trino.plugin.hive.HiveTableProperties.getHiveStorageFormat;
@@ -375,6 +377,22 @@ public class HiveMetadata

     public static final String MODIFYING_NON_TRANSACTIONAL_TABLE_MESSAGE = "Modifying Hive table rows is only supported for transactional tables";

+    private static final Set<String> TABLE_PROPERTIES_TO_SKIP = ImmutableSet.of(
+            PRESTO_VERSION_NAME,
+            PRESTO_QUERY_ID_NAME,
+            BUCKETING_VERSION,
+            "EXTERNAL",
+            "numFiles",
+            "totalSize",
+            "last_modified_time",
+            "transient_lastDdlTime",
+            "last_modified_by",
+            "STATS_GENERATED_VIA_STATS_TASK",
+            "COLUMN_STATS_ACCURATE",
+            "numRows",
+            "rawDataSize",
+            "numFilesErasureCoded");
+
     private final CatalogName catalogName;
     private final SemiTransactionalHiveMetastore metastore;
     private final boolean autoCommit;
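Context for the new TABLE_PROPERTIES_TO_SKIP set: these keys are either stamped by Trino itself (PRESTO_VERSION_NAME, PRESTO_QUERY_ID_NAME, BUCKETING_VERSION) or are Hive metastore bookkeeping and statistics parameters (file counts, sizes, modification metadata). Later hunks use the set in both directions: on write it makes these keys invalid inside extra_properties, and on read it seeds the set of parameters that should never surface as extra properties.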
@@ -655,84 +673,91 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche
             properties.put(SORTED_BY_PROPERTY, property.getSortedBy());
         });

+        TableParameterProvider tableParameterProvider = new TableParameterProvider(table);
+
         // Transactional properties
-        String transactionalProperty = table.getParameters().get(HiveMetadata.TRANSACTIONAL);
+        String transactionalProperty = tableParameterProvider.getParameter(HiveMetadata.TRANSACTIONAL);
         if (parseBoolean(transactionalProperty)) {
             properties.put(HiveTableProperties.TRANSACTIONAL, true);
         }

         // ORC format specific properties
-        String orcBloomFilterColumns = table.getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY);
+        String orcBloomFilterColumns = tableParameterProvider.getParameter(ORC_BLOOM_FILTER_COLUMNS_KEY);
         if (orcBloomFilterColumns != null) {
             properties.put(ORC_BLOOM_FILTER_COLUMNS, Splitter.on(',').trimResults().omitEmptyStrings().splitToList(orcBloomFilterColumns));
         }
-        String orcBloomFilterFfp = table.getParameters().get(ORC_BLOOM_FILTER_FPP_KEY);
+        String orcBloomFilterFfp = tableParameterProvider.getParameter(ORC_BLOOM_FILTER_FPP_KEY);
         if (orcBloomFilterFfp != null) {
             properties.put(ORC_BLOOM_FILTER_FPP, Double.parseDouble(orcBloomFilterFfp));
         }

         // Avro specific property
-        String avroSchemaUrl = table.getParameters().get(AVRO_SCHEMA_URL_KEY);
+        String avroSchemaUrl = tableParameterProvider.getParameter(AVRO_SCHEMA_URL_KEY);
         if (avroSchemaUrl != null) {
             properties.put(AVRO_SCHEMA_URL, avroSchemaUrl);
         }
-        String avroSchemaLiteral = table.getParameters().get(AVRO_SCHEMA_LITERAL_KEY);
+        String avroSchemaLiteral = tableParameterProvider.getParameter(AVRO_SCHEMA_LITERAL_KEY);
         if (avroSchemaLiteral != null) {
             properties.put(AVRO_SCHEMA_LITERAL, avroSchemaLiteral);
         }

         // Textfile and CSV specific properties
-        getSerdeProperty(table, SKIP_HEADER_COUNT_KEY)
+        getSerdeProperty(tableParameterProvider, SKIP_HEADER_COUNT_KEY)
                 .ifPresent(skipHeaderCount -> properties.put(SKIP_HEADER_LINE_COUNT, Integer.valueOf(skipHeaderCount)));
-        getSerdeProperty(table, SKIP_FOOTER_COUNT_KEY)
+        getSerdeProperty(tableParameterProvider, SKIP_FOOTER_COUNT_KEY)
                 .ifPresent(skipFooterCount -> properties.put(SKIP_FOOTER_LINE_COUNT, Integer.valueOf(skipFooterCount)));

         // Multi-format property
-        getSerdeProperty(table, NULL_FORMAT_KEY)
+        getSerdeProperty(tableParameterProvider, NULL_FORMAT_KEY)
                 .ifPresent(nullFormat -> properties.put(NULL_FORMAT_PROPERTY, nullFormat));

         // Textfile specific properties
-        getSerdeProperty(table, TEXT_FIELD_SEPARATOR_KEY)
+        getSerdeProperty(tableParameterProvider, TEXT_FIELD_SEPARATOR_KEY)
                 .ifPresent(fieldSeparator -> properties.put(TEXTFILE_FIELD_SEPARATOR, fieldSeparator));
-        getSerdeProperty(table, TEXT_FIELD_SEPARATOR_ESCAPE_KEY)
+        getSerdeProperty(tableParameterProvider, TEXT_FIELD_SEPARATOR_ESCAPE_KEY)
                 .ifPresent(fieldEscape -> properties.put(TEXTFILE_FIELD_SEPARATOR_ESCAPE, fieldEscape));

         // CSV specific properties
-        getCsvSerdeProperty(table, CSV_SEPARATOR_KEY)
+        getCsvSerdeProperty(tableParameterProvider, CSV_SEPARATOR_KEY)
                 .ifPresent(csvSeparator -> properties.put(CSV_SEPARATOR, csvSeparator));
-        getCsvSerdeProperty(table, CSV_QUOTE_KEY)
+        getCsvSerdeProperty(tableParameterProvider, CSV_QUOTE_KEY)
                 .ifPresent(csvQuote -> properties.put(CSV_QUOTE, csvQuote));
-        getCsvSerdeProperty(table, CSV_ESCAPE_KEY)
+        getCsvSerdeProperty(tableParameterProvider, CSV_ESCAPE_KEY)
                 .ifPresent(csvEscape -> properties.put(CSV_ESCAPE, csvEscape));

         // REGEX specific properties
-        getSerdeProperty(table, REGEX_KEY)
+        getSerdeProperty(tableParameterProvider, REGEX_KEY)
                 .ifPresent(regex -> properties.put(REGEX_PATTERN, regex));
-        getSerdeProperty(table, REGEX_CASE_SENSITIVE_KEY)
+        getSerdeProperty(tableParameterProvider, REGEX_CASE_SENSITIVE_KEY)
                 .ifPresent(regexCaseInsensitive -> properties.put(REGEX_CASE_INSENSITIVE, parseBoolean(regexCaseInsensitive)));

-        Optional<String> comment = Optional.ofNullable(table.getParameters().get(TABLE_COMMENT));
+        Optional<String> comment = Optional.ofNullable(tableParameterProvider.getParameter(TABLE_COMMENT));

-        String autoPurgeProperty = table.getParameters().get(AUTO_PURGE_KEY);
+        String autoPurgeProperty = tableParameterProvider.getParameter(AUTO_PURGE_KEY);
         if (parseBoolean(autoPurgeProperty)) {
             properties.put(AUTO_PURGE, true);
         }

         // Partition Projection specific properties
-        properties.putAll(partitionProjectionService.getPartitionProjectionTrinoTableProperties(table));
+        properties.putAll(partitionProjectionService.getPartitionProjectionTrinoTableProperties(tableParameterProvider));
+
+        Map<String, String> remainingProperties = tableParameterProvider.getUnconsumedProperties();
+        if (!remainingProperties.isEmpty()) {
+            properties.put(EXTRA_PROPERTIES, remainingProperties);
+        }

         return new ConnectorTableMetadata(tableName, columns.build(), properties.buildOrThrow(), comment);
     }

-    private static Optional<String> getCsvSerdeProperty(Table table, String key)
+    private static Optional<String> getCsvSerdeProperty(TableParameterProvider tableParameterProvider, String key)
     {
-        return getSerdeProperty(table, key).map(csvSerdeProperty -> csvSerdeProperty.substring(0, 1));
+        return getSerdeProperty(tableParameterProvider, key).map(csvSerdeProperty -> csvSerdeProperty.substring(0, 1));
     }

Review comment on the substring(0, 1) line (truncated in source): "Are we certain than"

-    private static Optional<String> getSerdeProperty(Table table, String key)
+    private static Optional<String> getSerdeProperty(TableParameterProvider tableParameterProvider, String key)
     {
-        String serdePropertyValue = table.getStorage().getSerdeParameters().get(key);
-        String tablePropertyValue = table.getParameters().get(key);
+        String serdePropertyValue = tableParameterProvider.getTable().getStorage().getSerdeParameters().get(key);
+        String tablePropertyValue = tableParameterProvider.getParameter(key);
         if (serdePropertyValue != null && tablePropertyValue != null && !tablePropertyValue.equals(serdePropertyValue)) {
             // in Hive one can set conflicting values for the same property, in such case it looks like table properties are used
             throw new TrinoException(
@@ -1157,6 +1182,27 @@ else if (avroSchemaLiteral != null) {
         tableProperties.put("numFiles", "-1");
         tableProperties.put("totalSize", "-1");

+        // Extra properties
+        getExtraProperties(tableMetadata.getProperties())
+                .ifPresent(extraProperties -> {
+                    Set<String> invalidProperties = ImmutableSet.<String>builder()
+                            .addAll(TABLE_PROPERTIES_TO_SKIP)
+                            .addAll(tableProperties.buildOrThrow().keySet())
+                            .build();
+
+                    if (invalidProperties.stream().anyMatch(extraProperties::containsKey)) {
+                        throw new TrinoException(
+                                INVALID_TABLE_PROPERTY,
+                                "Invalid keys in extra_properties: %s".formatted(
+                                        extraProperties.keySet().stream()
+                                                .filter(invalidProperties::contains)
+                                                .collect(joining(" ,"))));
+                    }
+                    for (Map.Entry<String, String> extraProperty : extraProperties.entrySet()) {
+                        tableProperties.put(extraProperty.getKey(), extraProperty.getValue());
+                    }
+                });
+
         // Table comment property
         tableMetadata.getComment().ifPresent(value -> tableProperties.put(TABLE_COMMENT, value));
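A self-contained sketch of the collision check above, with hypothetical class and exception names (the PR itself throws TrinoException with INVALID_TABLE_PROPERTY):

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ExtraPropertiesCheck
{
    // Mirrors the diff's rule: an extra_properties key is invalid if Trino
    // already manages a property of the same name.
    public static void validate(Map<String, String> extraProperties, Set<String> managedKeys)
    {
        String invalid = extraProperties.keySet().stream()
                .filter(managedKeys::contains)
                .collect(Collectors.joining(", "));
        if (!invalid.isEmpty()) {
            throw new IllegalArgumentException("Invalid keys in extra_properties: " + invalid);
        }
    }

    public static void main(String[] args)
    {
        Set<String> managed = Set.of("numFiles", "totalSize", "transactional");
        validate(Map.of("custom.owner", "analytics"), managed); // passes
        validate(Map.of("numFiles", "42"), managed);            // throws
    }
}

Note that the invalid set in the diff is the union of the deny list and every property the create path has already emitted, so extra_properties can never silently overwrite a Trino-managed parameter.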
@@ -3746,7 +3792,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
         }
         // we need to chop off any "$partitions" and similar suffixes from table name while querying the metastore for the Table object
         TableNameSplitResult tableNameSplit = splitTableName(tableName.getTableName());
-        Optional<Table> table = metastore.getTable(tableName.getSchemaName(), tableNameSplit.getBaseTableName());
+        Optional<Table> table = metastore.getTable(tableName.getSchemaName(), tableNameSplit.baseTableName());
         if (table.isEmpty() || VIRTUAL_VIEW.name().equals(table.get().getTableType())) {
             return Optional.empty();
         }
@@ -3760,7 +3806,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
                 name.getCatalogName(),
                 new SchemaTableName(
                         name.getSchemaTableName().getSchemaName(),
-                        name.getSchemaTableName().getTableName() + tableNameSplit.getSuffix().orElse(""))));
+                        name.getSchemaTableName().getTableName() + tableNameSplit.suffix().orElse(""))));
     }

     private Optional<CatalogSchemaTableName> redirectTableToIceberg(ConnectorSession session, Table table)
@@ -3807,25 +3853,12 @@ private static TableNameSplitResult splitTableName(String tableName)
                 new TableNameSplitResult(tableName.substring(0, metadataMarkerIndex), Optional.of(tableName.substring(metadataMarkerIndex)));
     }

-    private static class TableNameSplitResult
+    private record TableNameSplitResult(String baseTableName, Optional<String> suffix)
     {
-        private final String baseTableName;
-        private final Optional<String> suffix;
-
-        public TableNameSplitResult(String baseTableName, Optional<String> suffix)
-        {
-            this.baseTableName = requireNonNull(baseTableName, "baseTableName is null");
-            this.suffix = requireNonNull(suffix, "suffix is null");
-        }
-
-        public String getBaseTableName()
-        {
-            return baseTableName;
-        }
-
-        public Optional<String> getSuffix()
+        private TableNameSplitResult
         {
-            return suffix;
+            requireNonNull(baseTableName, "baseTableName is null");
+            requireNonNull(suffix, "suffix is null");
         }
     }
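The refactor above leans on record semantics: accessors are generated from the component names (no get- prefix), and a compact constructor runs validation before the implicit field assignments, which is why the two call sites earlier in the diff change from getBaseTableName()/getSuffix() to baseTableName()/suffix(). A minimal demonstration:

import java.util.Objects;
import java.util.Optional;

public class RecordAccessorDemo
{
    // A record derives its accessors from its component names.
    record TableNameSplitResult(String baseTableName, Optional<String> suffix)
    {
        TableNameSplitResult
        {
            Objects.requireNonNull(baseTableName, "baseTableName is null");
            Objects.requireNonNull(suffix, "suffix is null");
        }
    }

    public static void main(String[] args)
    {
        var split = new TableNameSplitResult("orders", Optional.of("$partitions"));
        // Accessors have no get- prefix, hence the call-site renames in the diff.
        System.out.println(split.baseTableName() + split.suffix().orElse(""));
    }
}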
@@ -3859,4 +3892,33 @@ public boolean supportsReportingWrittenBytes(ConnectorSession session, SchemaTab
     {
         return true;
     }
+
+    public static class TableParameterProvider
+    {
+        private final Table table;
+        private final Set<String> consumedParameters = new HashSet<>(TABLE_PROPERTIES_TO_SKIP);
+
+        public TableParameterProvider(Table table)
+        {
+            this.table = requireNonNull(table, "table is null");
+        }
+
+        public String getParameter(String key)
+        {
+            consumedParameters.add(key);
+            return table.getParameters().get(key);
+        }
+
+        public Table getTable()
+        {
+            return table;
+        }
+
+        public Map<String, String> getUnconsumedProperties()
+        {
+            return table.getParameters().entrySet().stream()
+                    .filter(entry -> !consumedParameters.contains(entry.getKey()))
+                    .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
+        }
+    }
 }

Review comments on TableParameterProvider:
- On the class declaration (truncated in source): "Let's extract this class from"
- On the consumedParameters field: "ImmutableSet?" Reply: "This set is per-instance and mutated as part of" (truncated)
- On getTable() (truncated in source): "This class should likely include the logic of"
- On getUnconsumedProperties(): "This could be tracked incrementally as part of" (truncated) Reply: "But if we need to use the same properties at multiple places then we might not be able to handle them."
- On the consumed-key filter: "Should we lowercase the values entered in and also the values used for checking here just to be on the safe side?"
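The thread above floats tracking unconsumed parameters incrementally inside getParameter instead of filtering at the end. A sketch of that alternative (hypothetical class, not part of the PR) makes the reply's objection concrete: removal-on-read means a key consumed once cannot be read again elsewhere.

import java.util.LinkedHashMap;
import java.util.Map;

public class IncrementalTableParameterProvider
{
    // Mutable copy of the metastore parameters; entries are removed as consumed.
    private final Map<String, String> unconsumed;

    public IncrementalTableParameterProvider(Map<String, String> parameters)
    {
        this.unconsumed = new LinkedHashMap<>(parameters);
    }

    public String getParameter(String key)
    {
        // remove() marks the key consumed and returns its value in one step,
        // but a second lookup of the same key now yields null -- the problem
        // raised in the review reply about reading a property in multiple places.
        return unconsumed.remove(key);
    }

    public Map<String, String> getUnconsumedProperties()
    {
        return Map.copyOf(unconsumed);
    }
}

The PR's chosen design (an add-to-consumed set plus a final filter) keeps getParameter idempotent, at the cost of one extra pass over the parameter map per metadata read.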
HiveTableProperties.java:
@@ -21,6 +21,8 @@
 import io.trino.spi.TrinoException;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.type.ArrayType;
+import io.trino.spi.type.MapType;
+import io.trino.spi.type.TypeManager;

 import javax.inject.Inject;
@@ -69,13 +71,15 @@ public class HiveTableProperties
     public static final String REGEX_CASE_INSENSITIVE = "regex_case_insensitive";
     public static final String TRANSACTIONAL = "transactional";
     public static final String AUTO_PURGE = "auto_purge";
+    public static final String EXTRA_PROPERTIES = "extra_properties";

     private final List<PropertyMetadata<?>> tableProperties;

     @Inject
     public HiveTableProperties(
             HiveConfig config,
-            OrcWriterConfig orcWriterConfig)
+            OrcWriterConfig orcWriterConfig,
+            TypeManager typeManager)
     {
         tableProperties = ImmutableList.of(
                 stringProperty(
@@ -173,7 +177,25 @@ public HiveTableProperties(
                         PARTITION_PROJECTION_LOCATION_TEMPLATE,
                         "Partition projection location template",
                         null,
-                        false));
+                        false),
+                new PropertyMetadata<>(
+                        EXTRA_PROPERTIES,
+                        "Extra table properties",
+                        new MapType(VARCHAR, VARCHAR, typeManager.getTypeOperators()),
+                        Map.class,
+                        null,
+                        false,
+                        value -> {
+                            Map<String, String> extraProperties = (Map<String, String>) value;
+                            if (extraProperties.isEmpty()) {
+                                throw new TrinoException(INVALID_TABLE_PROPERTY, "Extra table properties cannot be empty");
+                            }
+                            if (extraProperties.containsValue(null)) {
+                                throw new TrinoException(INVALID_TABLE_PROPERTY, format("Extra table property value cannot be null '%s'", extraProperties));
+                            }
+                            return extraProperties;
+                        },
+                        value -> value));
     }

     public List<PropertyMetadata<?>> getTableProperties()

Review comment on the new PropertyMetadata<> (truncated in source): "Add docs to"

Review thread on the property name and its "Extra table properties" description:
- "Extra -> Arbitrary. I'd argue that arbitrary is a better name than extra."
- Reply: "I'm not sure that I agree with 'arbitrary', because these properties presumably have meaning, just not one that Trino understands. Maybe... 'additional_properties'? It's pretty close to 'extra_properties', but maybe slightly clearer than 'extra'. Naming is hard."
- Reply (truncated in source): "How about"
- Reply: "I find that additional is in the very same category as extra."
- Reply: "Definitely, that's why actually arbitrary is definitely not the best choice. From ChatGPT:" (quoted answer elided in source) "From the answer above, we have a few candidates:" (candidate list elided in source) "I tend to go with" (truncated)
- Resolution: "Based on offline discussion we finalized the property name to" (truncated)
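For readers new to the SPI: the positional arguments of the PropertyMetadata<> call above are, in order, the property name, description, SQL type, Java type, default value, hidden flag, a decoder that validates and converts the raw value, and an encoder back to the SQL representation (ordering inferred from this diff; consult the Trino SPI for the authoritative signature). The decoder's two rules can be exercised in isolation:

import java.util.HashMap;
import java.util.Map;

public class ExtraPropertiesDecoder
{
    // Mirrors the two validation rules in the decoder lambda above,
    // with a hypothetical exception type standing in for TrinoException.
    @SuppressWarnings("unchecked")
    static Map<String, String> decode(Object value)
    {
        Map<String, String> extraProperties = (Map<String, String>) value;
        if (extraProperties.isEmpty()) {
            throw new IllegalArgumentException("Extra table properties cannot be empty");
        }
        if (extraProperties.containsValue(null)) {
            throw new IllegalArgumentException("Extra table property value cannot be null: " + extraProperties);
        }
        return extraProperties;
    }

    public static void main(String[] args)
    {
        Map<String, String> ok = new HashMap<>();
        ok.put("custom.key", "custom.value");
        System.out.println(decode(ok)); // prints {custom.key=custom.value}

        Map<String, String> bad = new HashMap<>();
        bad.put("custom.key", null);
        decode(bad); // throws IllegalArgumentException
    }
}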
@@ -311,4 +333,9 @@ public static Optional<Boolean> isAutoPurge(Map<String, Object> tableProperties)
     {
         return Optional.ofNullable((Boolean) tableProperties.get(AUTO_PURGE));
     }
+
+    public static Optional<Map<String, String>> getExtraProperties(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Map<String, String>) tableProperties.get(EXTRA_PROPERTIES));
+    }
 }
Review comment on getExtraProperties: "How does this property get produced? I'm not aware of it."
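To the question above: as the HiveMetadata hunks show, the property is produced on the read path. TableParameterProvider records every metastore parameter that a known Trino table property consumes, and whatever remains unconsumed is surfaced as extra_properties in ConnectorTableMetadata; on the write path, getExtraProperties feeds the validated map into the table's metastore parameters.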