From 5dc41d458be2af705a8433058efe2f33254ce572 Mon Sep 17 00:00:00 2001 From: Kamil Endruszkiewicz Date: Wed, 22 Feb 2023 10:29:49 +0100 Subject: [PATCH] Read Spark-generated statistics in Hive connector --- docs/src/main/sphinx/connector/hive.rst | 2 + .../metastore/thrift/ThriftHiveMetastore.java | 22 +- .../thrift/ThriftHiveMetastoreFactory.java | 3 + .../thrift/ThriftMetastoreConfig.java | 14 ++ .../ThriftMetastoreParameterParserUtils.java | 60 ++++- .../metastore/thrift/ThriftMetastoreUtil.java | 17 +- .../thrift/ThriftSparkMetastoreUtil.java | 167 +++++++++++++ .../thrift/TestThriftMetastoreConfig.java | 5 +- .../thrift/TestThriftMetastoreUtil.java | 43 ++++ .../thrift/TestThriftSparkMetastoreUtil.java | 221 ++++++++++++++++++ ...EnvSinglenodeSparkHiveNoStatsFallback.java | 84 +++++++ .../suite/suites/Suite7NonGeneric.java | 4 + .../hive.properties | 8 + .../spark-defaults.conf | 9 + .../io/trino/tests/product/TestGroups.java | 1 + .../hive/TestHiveSparkCompatibility.java | 205 ++++++++++++++++ 16 files changed, 860 insertions(+), 5 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftSparkMetastoreUtil.java create mode 100644 plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftSparkMetastoreUtil.java create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkHiveNoStatsFallback.java create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/hive.properties create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/spark-defaults.conf diff --git a/docs/src/main/sphinx/connector/hive.rst b/docs/src/main/sphinx/connector/hive.rst index edded256dc36..e4ded25bbccb 100644 --- a/docs/src/main/sphinx/connector/hive.rst +++ b/docs/src/main/sphinx/connector/hive.rst @@ -630,6 +630,8 @@ Property Name Description ``KERBEROS``. Default is ``NONE``. * - ``hive.metastore.thrift.impersonation.enabled`` - Enable Hive metastore end user impersonation. + * - ``hive.metastore.thrift.use-spark-table-statistics-fallback`` + - Enable usage of table statistics generated by Apache Spark when Hive table statistics are not available. * - ``hive.metastore.thrift.delegation-token.cache-ttl`` - Time to live delegation token cache for metastore. Default is ``1h``. 
* - ``hive.metastore.thrift.delegation-token.cache-maximum-size`` diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java index 5bdafd082fb4..fe0ca923c499 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java @@ -123,6 +123,7 @@ import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromRolePrincipalGrants; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromTrinoPrincipalType; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getBasicStatisticsWithSparkFallback; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.parsePrivilege; @@ -156,6 +157,7 @@ public class ThriftHiveMetastore private final boolean deleteFilesOnDrop; private final boolean translateHiveViews; private final boolean assumeCanonicalPartitionKeys; + private final boolean useSparkTableStatisticsFallback; private final ThriftMetastoreStats stats; private final ExecutorService writeStatisticsExecutor; @@ -172,6 +174,7 @@ public ThriftHiveMetastore( boolean deleteFilesOnDrop, boolean translateHiveViews, boolean assumeCanonicalPartitionKeys, + boolean useSparkTableStatisticsFallback, ThriftMetastoreStats stats, ExecutorService writeStatisticsExecutor) { @@ -187,6 +190,7 @@ public ThriftHiveMetastore( this.deleteFilesOnDrop = deleteFilesOnDrop; this.translateHiveViews = translateHiveViews; this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys; + this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback; this.stats = requireNonNull(stats, "stats is null"); this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null"); } @@ -325,7 +329,16 @@ public PartitionStatistics getTableStatistics(Table table) List<String> dataColumns = table.getSd().getCols().stream() .map(FieldSchema::getName) .collect(toImmutableList()); - HiveBasicStatistics basicStatistics = getHiveBasicStatistics(table.getParameters()); + Map<String, String> parameters = table.getParameters(); + HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters); + + if (useSparkTableStatisticsFallback && basicStatistics.getRowCount().isEmpty()) { + PartitionStatistics sparkTableStatistics = ThriftSparkMetastoreUtil.getTableStatistics(table); + if (sparkTableStatistics.getBasicStatistics().getRowCount().isPresent()) { + return sparkTableStatistics; + } + } + Map<String, HiveColumnStatistics> columnStatistics = getTableColumnStatistics(table.getDbName(), table.getTableName(), dataColumns, basicStatistics.getRowCount()); return new PartitionStatistics(basicStatistics, columnStatistics); } @@ -366,7 +379,12 @@ public Map getPartitionStatistics(Table table, List Map<String, HiveBasicStatistics> partitionBasicStatistics = partitions.stream() .collect(toImmutableMap( partition -> makePartName(partitionColumns, partition.getValues()), - partition -> getHiveBasicStatistics(partition.getParameters()))); + partition -> { + if (useSparkTableStatisticsFallback) { + return 
getBasicStatisticsWithSparkFallback(partition.getParameters()); } return getHiveBasicStatistics(partition.getParameters()); })); Map<String, OptionalLong> partitionRowCounts = partitionBasicStatistics.entrySet().stream() .collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getRowCount())); Map<String, Map<String, HiveColumnStatistics>> partitionColumnStatistics = getPartitionColumnStatistics( diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreFactory.java index 012b36b0e590..22e6ad93b158 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastoreFactory.java @@ -43,6 +43,7 @@ public class ThriftHiveMetastoreFactory private final boolean deleteFilesOnDrop; private final boolean translateHiveViews; private final boolean assumeCanonicalPartitionKeys; + private final boolean useSparkTableStatisticsFallback; private final ExecutorService writeStatisticsExecutor; private final ThriftMetastoreStats stats = new ThriftMetastoreStats(); @@ -69,6 +70,7 @@ public ThriftHiveMetastoreFactory( this.maxWaitForLock = thriftConfig.getMaxWaitForTransactionLock(); this.assumeCanonicalPartitionKeys = thriftConfig.isAssumeCanonicalPartitionKeys(); + this.useSparkTableStatisticsFallback = thriftConfig.isUseSparkTableStatisticsFallback(); this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null"); } @@ -101,6 +103,7 @@ public ThriftMetastore createMetastore(Optional identity) deleteFilesOnDrop, translateHiveViews, assumeCanonicalPartitionKeys, + useSparkTableStatisticsFallback, stats, writeStatisticsExecutor); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java index 63f5205762af..c930163e2f75 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreConfig.java @@ -39,6 +39,7 @@ public class ThriftMetastoreConfig private Duration maxBackoffDelay = RetryDriver.DEFAULT_SLEEP_TIME; private Duration maxRetryTime = RetryDriver.DEFAULT_MAX_RETRY_TIME; private boolean impersonationEnabled; + private boolean useSparkTableStatisticsFallback = true; private Duration delegationTokenCacheTtl = new Duration(1, TimeUnit.HOURS); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime) private long delegationTokenCacheMaximumSize = 1000; private boolean deleteFilesOnDrop; @@ -158,6 +159,19 @@ public ThriftMetastoreConfig setImpersonationEnabled(boolean impersonationEnable return this; } + public boolean isUseSparkTableStatisticsFallback() + { + return useSparkTableStatisticsFallback; + } + + @Config("hive.metastore.thrift.use-spark-table-statistics-fallback") + @ConfigDescription("Enable usage of table statistics generated by Apache Spark when Hive table statistics are not available") + public ThriftMetastoreConfig setUseSparkTableStatisticsFallback(boolean useSparkTableStatisticsFallback) + { + this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback; + return this; + } + @NotNull @MinDuration("0ms") public Duration getDelegationTokenCacheTtl() diff --git 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreParameterParserUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreParameterParserUtils.java index 6517b5d45bda..f6c35e8b9d66 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreParameterParserUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreParameterParserUtils.java @@ -13,16 +13,31 @@ */ package io.trino.plugin.hive.metastore.thrift; +import com.google.common.primitives.Doubles; import com.google.common.primitives.Longs; import javax.annotation.Nullable; +import java.math.BigDecimal; +import java.time.DateTimeException; +import java.time.LocalDate; +import java.util.Optional; +import java.util.OptionalDouble; import java.util.OptionalLong; -class ThriftMetastoreParameterParserUtils +final class ThriftMetastoreParameterParserUtils { private ThriftMetastoreParameterParserUtils() {} + static Optional<Boolean> toBoolean(@Nullable String parameterValue) + { + if (parameterValue == null) { + return Optional.empty(); + } + boolean value = Boolean.parseBoolean(parameterValue); + return Optional.of(value); + } + static OptionalLong toLong(@Nullable String parameterValue) { if (parameterValue == null) { @@ -34,4 +49,47 @@ static OptionalLong toLong(@Nullable String parameterValue) } return OptionalLong.of(longValue); } + + static OptionalDouble toDouble(@Nullable String parameterValue) + { + if (parameterValue == null) { + return OptionalDouble.empty(); + } + Double doubleValue = Doubles.tryParse(parameterValue); + if (doubleValue == null || doubleValue < 0) { + return OptionalDouble.empty(); + } + return OptionalDouble.of(doubleValue); + } + + static Optional<BigDecimal> toDecimal(@Nullable String parameterValue) + { + if (parameterValue == null) { + return Optional.empty(); + } + try { + BigDecimal decimal = new BigDecimal(parameterValue); + if (decimal.compareTo(BigDecimal.ZERO) < 0) { + return Optional.empty(); + } + return Optional.of(decimal); + } + catch (NumberFormatException exception) { + return Optional.empty(); + } + } + + static Optional<LocalDate> toDate(@Nullable String parameterValue) + { + if (parameterValue == null) { + return Optional.empty(); + } + try { + LocalDate date = LocalDate.parse(parameterValue); + return Optional.of(date); + } + catch (DateTimeException exception) { + return Optional.empty(); + } + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java index 5cad97cfa4da..07d2b270ba38 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftMetastoreUtil.java @@ -125,6 +125,7 @@ import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT; import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toLong; +import static io.trino.plugin.hive.metastore.thrift.ThriftSparkMetastoreUtil.getSparkBasicStatistics; import static io.trino.plugin.hive.type.Category.PRIMITIVE; import static io.trino.spi.security.PrincipalType.ROLE; import static io.trino.spi.security.PrincipalType.USER; @@ -144,10 +145,10 @@ public final class ThriftMetastoreUtil { + public static 
final String NUM_ROWS = "numRows"; private static final String PUBLIC_ROLE_NAME = "public"; private static final String ADMIN_ROLE_NAME = "admin"; private static final String NUM_FILES = "numFiles"; - public static final String NUM_ROWS = "numRows"; private static final String RAW_DATA_SIZE = "rawDataSize"; private static final String TOTAL_SIZE = "totalSize"; private static final Set<String> STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE); @@ -749,6 +750,20 @@ public static HiveBasicStatistics getHiveBasicStatistics(Map par return new HiveBasicStatistics(numFiles, numRows, inMemoryDataSizeInBytes, onDiskDataSizeInBytes); } + public static HiveBasicStatistics getBasicStatisticsWithSparkFallback(Map<String, String> parameters) + { + HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters); + // Fall back when Hive reports no rows: a partitioned table without table-level statistics carries a row count of 0 + if (basicStatistics.getRowCount().isEmpty() || basicStatistics.getRowCount().getAsLong() == 0L) { + HiveBasicStatistics sparkBasicStatistics = getSparkBasicStatistics(parameters); + if (sparkBasicStatistics.getRowCount().isPresent()) { + return sparkBasicStatistics; + } + } + + return basicStatistics; + } + public static Map<String, String> updateStatisticsParameters(Map<String, String> parameters, HiveBasicStatistics statistics) { ImmutableMap.Builder<String, String> result = ImmutableMap.builder(); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftSparkMetastoreUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftSparkMetastoreUtil.java new file mode 100644 index 000000000000..5b75c4837073 --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftSparkMetastoreUtil.java @@ -0,0 +1,167 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.thrift; + +import com.google.common.annotations.VisibleForTesting; +import io.trino.hive.thrift.metastore.FieldSchema; +import io.trino.hive.thrift.metastore.Table; +import io.trino.plugin.hive.HiveBasicStatistics; +import io.trino.plugin.hive.HiveType; +import io.trino.plugin.hive.PartitionStatistics; +import io.trino.plugin.hive.metastore.HiveColumnStatistics; +import io.trino.plugin.hive.type.PrimitiveTypeInfo; +import io.trino.plugin.hive.type.TypeInfo; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.OptionalDouble; +import java.util.OptionalLong; + +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createBinaryColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createBooleanColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createDateColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createDecimalColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createDoubleColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics; +import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createStringColumnStatistics; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toDate; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toDecimal; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toDouble; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toLong; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.NUM_ROWS; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getTotalSizeInBytes; +import static io.trino.plugin.hive.type.Category.PRIMITIVE; + +final class ThriftSparkMetastoreUtil +{ + private static final String SPARK_SQL_STATS_PREFIX = "spark.sql.statistics."; + private static final String COLUMN_STATS_PREFIX = SPARK_SQL_STATS_PREFIX + "colStats."; + private static final String NUM_FILES = "numFiles"; + private static final String RAW_DATA_SIZE = "rawDataSize"; + private static final String TOTAL_SIZE = "totalSize"; + private static final String COLUMN_MIN = "min"; + private static final String COLUMN_MAX = "max"; + + private ThriftSparkMetastoreUtil() {} + + public static PartitionStatistics getTableStatistics(Table table) + { + Map<String, String> parameters = table.getParameters(); + HiveBasicStatistics sparkBasicStatistics = getSparkBasicStatistics(parameters); + if (sparkBasicStatistics.getRowCount().isEmpty()) { + return PartitionStatistics.empty(); + } + + Map<String, HiveColumnStatistics> columnStatistics = table.getSd().getCols().stream() + .map(fieldSchema -> new AbstractMap.SimpleEntry<>( + fieldSchema.getName(), + fromMetastoreColumnStatistics(fieldSchema, parameters, sparkBasicStatistics.getRowCount().getAsLong()))) + .collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); + return new PartitionStatistics(sparkBasicStatistics, columnStatistics); + } + + public static HiveBasicStatistics getSparkBasicStatistics(Map<String, String> parameters) + { + OptionalLong rowCount = toLong(parameters.get(SPARK_SQL_STATS_PREFIX + NUM_ROWS)); + if (rowCount.isEmpty()) { + return HiveBasicStatistics.createEmptyStatistics(); + } + OptionalLong fileCount = toLong(parameters.get(SPARK_SQL_STATS_PREFIX + 
NUM_FILES)); + OptionalLong inMemoryDataSizeInBytes = toLong(parameters.get(SPARK_SQL_STATS_PREFIX + RAW_DATA_SIZE)); + OptionalLong onDiskDataSizeInBytes = toLong(parameters.get(SPARK_SQL_STATS_PREFIX + TOTAL_SIZE)); + return new HiveBasicStatistics(fileCount, rowCount, inMemoryDataSizeInBytes, onDiskDataSizeInBytes); + } + + @VisibleForTesting + static HiveColumnStatistics fromMetastoreColumnStatistics(FieldSchema fieldSchema, Map<String, String> columnStatistics, long rowCount) + { + HiveType type = HiveType.valueOf(fieldSchema.getType()); + TypeInfo typeInfo = type.getTypeInfo(); + if (typeInfo.getCategory() != PRIMITIVE) { + // Spark does not support table statistics for non-primitive types + return HiveColumnStatistics.empty(); + } + String field = COLUMN_STATS_PREFIX + fieldSchema.getName() + "."; + OptionalLong maxLength = toLong(columnStatistics.get(field + "maxLen")); + OptionalDouble avgLength = toDouble(columnStatistics.get(field + "avgLen")); + OptionalLong nullsCount = toLong(columnStatistics.get(field + "nullCount")); + OptionalLong distinctValuesCount = toLong(columnStatistics.get(field + "distinctCount")); + + return switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) { + case BOOLEAN -> createBooleanColumnStatistics( + OptionalLong.empty(), + OptionalLong.empty(), + nullsCount); + case BYTE, SHORT, INT, LONG -> createIntegerColumnStatistics( + toLong(columnStatistics.get(field + COLUMN_MIN)), + toLong(columnStatistics.get(field + COLUMN_MAX)), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case TIMESTAMP -> createIntegerColumnStatistics( + OptionalLong.empty(), + OptionalLong.empty(), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case FLOAT, DOUBLE -> createDoubleColumnStatistics( + toDouble(columnStatistics.get(field + COLUMN_MIN)), + toDouble(columnStatistics.get(field + COLUMN_MAX)), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case STRING, VARCHAR, CHAR -> createStringColumnStatistics( + maxLength, + getTotalSizeInBytes(avgLength, OptionalLong.of(rowCount), nullsCount), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case DATE -> createDateColumnStatistics( + toDate(columnStatistics.get(field + COLUMN_MIN)), + toDate(columnStatistics.get(field + COLUMN_MAX)), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case BINARY -> createBinaryColumnStatistics( + maxLength, + getTotalSizeInBytes(avgLength, OptionalLong.of(rowCount), nullsCount), + nullsCount); + case DECIMAL -> createDecimalColumnStatistics( + toDecimal(columnStatistics.get(field + COLUMN_MIN)), + toDecimal(columnStatistics.get(field + COLUMN_MAX)), + nullsCount, + fromMetastoreDistinctValuesCount(distinctValuesCount, nullsCount, rowCount)); + case TIMESTAMPLOCALTZ, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, VOID, UNKNOWN -> HiveColumnStatistics.empty(); + }; + } + + /** + * Hive calculates NDV considering null as a distinct value, but Spark doesn't + */ + private static OptionalLong fromMetastoreDistinctValuesCount(OptionalLong distinctValuesCount, OptionalLong nullsCount, long rowCount) + { + if (distinctValuesCount.isPresent() && nullsCount.isPresent()) { + return OptionalLong.of(fromMetastoreDistinctValuesCount(distinctValuesCount.getAsLong(), nullsCount.getAsLong(), rowCount)); + } + return OptionalLong.empty(); + } + + private static long 
fromMetastoreDistinctValuesCount(long distinctValuesCount, long nullsCount, long rowCount) + { + long nonNullsCount = rowCount - nullsCount; + // normalize distinctValuesCount in case there is a non-null element + if (nonNullsCount > 0 && distinctValuesCount == 0) { + distinctValuesCount = 1; + } + + // the metastore may store an estimate, so the value stored may be higher than the total number of rows + return Math.min(distinctValuesCount, nonNullsCount); + } +} diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java index c5f089e57b3b..da3fa3277a96 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreConfig.java @@ -50,6 +50,7 @@ public void testDefaults() .setTruststorePath(null) .setTruststorePassword(null) .setImpersonationEnabled(false) + .setUseSparkTableStatisticsFallback(true) .setDelegationTokenCacheTtl(new Duration(1, HOURS)) .setDelegationTokenCacheMaximumSize(1000) .setDeleteFilesOnDrop(false) @@ -85,6 +86,7 @@ public void testExplicitPropertyMappings() .put("hive.metastore.thrift.txn-lock-max-wait", "5m") .put("hive.metastore.thrift.write-statistics-threads", "10") .put("hive.metastore.thrift.assume-canonical-partition-keys", "true") + .put("hive.metastore.thrift.use-spark-table-statistics-fallback", "false") .buildOrThrow(); ThriftMetastoreConfig expected = new ThriftMetastoreConfig() @@ -106,7 +108,8 @@ public void testExplicitPropertyMappings() .setDeleteFilesOnDrop(true) .setMaxWaitForTransactionLock(new Duration(5, MINUTES)) .setAssumeCanonicalPartitionKeys(true) - .setWriteStatisticsThreads(10); + .setWriteStatisticsThreads(10) + .setUseSparkTableStatisticsFallback(false); assertFullMapping(properties, expected); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreUtil.java index c57bda5e10cc..4e13a7599394 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreUtil.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftMetastoreUtil.java @@ -39,6 +39,7 @@ import java.math.BigDecimal; import java.time.LocalDate; +import java.util.Map; import java.util.Optional; import java.util.OptionalDouble; import java.util.OptionalLong; @@ -52,6 +53,7 @@ import static io.trino.hive.thrift.metastore.ColumnStatisticsData.longStats; import static io.trino.hive.thrift.metastore.ColumnStatisticsData.stringStats; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiColumnStatistics; +import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getBasicStatisticsWithSparkFallback; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreDecimal; import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.updateStatisticsParameters; @@ -336,6 +338,47 @@ public void testSingleDistinctValue() assertEquals(actual.getDistinctValuesCount(), OptionalLong.of(1)); } + @Test + public void testSparkFallbackGetBasicStatistics() + { + // only spark stats + Map<String, String> tableParameters = 
Map.of( + "spark.sql.statistics.numFiles", "1", + "spark.sql.statistics.numRows", "2", + "spark.sql.statistics.rawDataSize", "3", + "spark.sql.statistics.totalSize", "4"); + HiveBasicStatistics actual = getBasicStatisticsWithSparkFallback(tableParameters); + assertEquals(actual, new HiveBasicStatistics(OptionalLong.of(1), OptionalLong.of(2), OptionalLong.of(3), OptionalLong.of(4))); + actual = getHiveBasicStatistics(tableParameters); + assertEquals(actual, new HiveBasicStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty())); + // empty hive and not empty spark stats + tableParameters = Map.of( + "numFiles", "0", + "numRows", "0", + "rawDataSize", "0", + "totalSize", "0", + "spark.sql.statistics.numFiles", "1", + "spark.sql.statistics.numRows", "2", + "spark.sql.statistics.rawDataSize", "3", + "spark.sql.statistics.totalSize", "4"); + actual = getBasicStatisticsWithSparkFallback(tableParameters); + assertEquals(actual, new HiveBasicStatistics(OptionalLong.of(1), OptionalLong.of(2), OptionalLong.of(3), OptionalLong.of(4))); + actual = getHiveBasicStatistics(tableParameters); + assertEquals(actual, new HiveBasicStatistics(OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(0), OptionalLong.of(0))); + // not empty hive and not empty spark stats + tableParameters = Map.of( + "numFiles", "10", + "numRows", "20", + "rawDataSize", "30", + "totalSize", "40", + "spark.sql.statistics.numFiles", "1", + "spark.sql.statistics.numRows", "2", + "spark.sql.statistics.rawDataSize", "3", + "spark.sql.statistics.totalSize", "4"); + actual = getBasicStatisticsWithSparkFallback(tableParameters); + assertEquals(actual, new HiveBasicStatistics(OptionalLong.of(10), OptionalLong.of(20), OptionalLong.of(30), OptionalLong.of(40))); + } + @Test public void testBasicStatisticsRoundTrip() { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftSparkMetastoreUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftSparkMetastoreUtil.java new file mode 100644 index 000000000000..171856d550dc --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestThriftSparkMetastoreUtil.java @@ -0,0 +1,221 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.metastore.thrift; + +import io.trino.hive.thrift.metastore.FieldSchema; +import io.trino.plugin.hive.metastore.BooleanStatistics; +import io.trino.plugin.hive.metastore.DateStatistics; +import io.trino.plugin.hive.metastore.DecimalStatistics; +import io.trino.plugin.hive.metastore.DoubleStatistics; +import io.trino.plugin.hive.metastore.HiveColumnStatistics; +import io.trino.plugin.hive.metastore.IntegerStatistics; +import org.testng.annotations.Test; + +import java.math.BigDecimal; +import java.time.LocalDate; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalLong; + +import static io.trino.plugin.hive.metastore.thrift.ThriftSparkMetastoreUtil.fromMetastoreColumnStatistics; +import static org.apache.hadoop.hive.serde.serdeConstants.BIGINT_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.BINARY_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.BOOLEAN_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.DATE_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.DECIMAL_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.DOUBLE_TYPE_NAME; +import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME; +import static org.testng.Assert.assertEquals; + +public class TestThriftSparkMetastoreUtil +{ + @Test + public void testSparkLongStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of("spark.sql.statistics.colStats.c_long.min", "1", + "spark.sql.statistics.colStats.c_long.distinctCount", "4", + "spark.sql.statistics.colStats.c_long.maxLen", "4", + "spark.sql.statistics.colStats.c_long.avgLen", "4", + "spark.sql.statistics.colStats.c_long.nullCount", "0", + "spark.sql.statistics.colStats.c_long.version", "2", + "spark.sql.statistics.colStats.c_long.max", "4"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_long_not_exists", BIGINT_TYPE_NAME, null), columnStatistics, 4); + assertEquals( + actualNotExists, + HiveColumnStatistics.builder() + .setIntegerStatistics(new IntegerStatistics(OptionalLong.empty(), OptionalLong.empty())) + .build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_long", BIGINT_TYPE_NAME, null), columnStatistics, 4); + assertEquals( + actual, + HiveColumnStatistics.builder() + .setIntegerStatistics(new IntegerStatistics(OptionalLong.of(1), OptionalLong.of(4))) + .setNullsCount(0) + .setDistinctValuesCount(4) + .build()); + } + + @Test + public void testSparkDoubleStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of("spark.sql.statistics.colStats.c_double.min", "0.3", + "spark.sql.statistics.colStats.c_double.distinctCount", "10", + "spark.sql.statistics.colStats.c_double.maxLen", "4", + "spark.sql.statistics.colStats.c_double.avgLen", "4", + "spark.sql.statistics.colStats.c_double.nullCount", "1", + "spark.sql.statistics.colStats.c_double.version", "2", + "spark.sql.statistics.colStats.c_double.max", "3.3"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_double_not_exists", DOUBLE_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actualNotExists, + HiveColumnStatistics.builder() + .setDoubleStatistics(new DoubleStatistics(OptionalDouble.empty(), OptionalDouble.empty())) + .build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_double", DOUBLE_TYPE_NAME, null), columnStatistics, 10); + 
assertEquals( + actual, + HiveColumnStatistics.builder() + .setDoubleStatistics(new DoubleStatistics(OptionalDouble.of(0.3), OptionalDouble.of(3.3))) + .setNullsCount(1) + .setDistinctValuesCount(9) + .build()); + } + + @Test + public void testSparkDecimalStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of("spark.sql.statistics.colStats.c_decimal.min", "0.3", + "spark.sql.statistics.colStats.c_decimal.distinctCount", "10", + "spark.sql.statistics.colStats.c_decimal.maxLen", "4", + "spark.sql.statistics.colStats.c_decimal.avgLen", "4", + "spark.sql.statistics.colStats.c_decimal.nullCount", "1", + "spark.sql.statistics.colStats.c_decimal.version", "2", + "spark.sql.statistics.colStats.c_decimal.max", "3.3"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_decimal_not_exists", DECIMAL_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actualNotExists, + HiveColumnStatistics.builder() + .setDecimalStatistics(new DecimalStatistics(Optional.empty(), Optional.empty())) + .build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_decimal", DECIMAL_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actual, + HiveColumnStatistics.builder() + .setDecimalStatistics(new DecimalStatistics(Optional.of(new BigDecimal("0.3")), Optional.of(new BigDecimal("3.3")))) + .setNullsCount(1) + .setDistinctValuesCount(9) + .build()); + } + + @Test + public void testSparkBooleanStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of("spark.sql.statistics.colStats.c_bool.min", "false", + "spark.sql.statistics.colStats.c_bool.distinctCount", "2", + "spark.sql.statistics.colStats.c_bool.maxLen", "1", + "spark.sql.statistics.colStats.c_bool.avgLen", "1", + "spark.sql.statistics.colStats.c_bool.nullCount", "1", + "spark.sql.statistics.colStats.c_bool.version", "2", + "spark.sql.statistics.colStats.c_bool.max", "true"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_bool_not_exists", BOOLEAN_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actualNotExists, + HiveColumnStatistics.builder() + .setBooleanStatistics(new BooleanStatistics(OptionalLong.empty(), OptionalLong.empty())) + .build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_bool", BOOLEAN_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actual, + HiveColumnStatistics.builder() + ..setBooleanStatistics(new BooleanStatistics(OptionalLong.empty(), OptionalLong.empty())) + .setNullsCount(1) + .build()); + } + + @Test + public void testSparkDateStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of("spark.sql.statistics.colStats.c_date.min", "2000-01-01", + "spark.sql.statistics.colStats.c_date.distinctCount", "10", + "spark.sql.statistics.colStats.c_date.maxLen", "4", + "spark.sql.statistics.colStats.c_date.avgLen", "4", + "spark.sql.statistics.colStats.c_date.nullCount", "3", + "spark.sql.statistics.colStats.c_date.version", "2", + "spark.sql.statistics.colStats.c_date.max", "2030-12-31"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_date_not_exists", DATE_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actualNotExists, + HiveColumnStatistics.builder() + .setDateStatistics((new DateStatistics(Optional.empty(), Optional.empty()))) + .build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_date", DATE_TYPE_NAME, null), columnStatistics, 10); + assertEquals( 
actual, + HiveColumnStatistics.builder() + .setDateStatistics((new DateStatistics(Optional.of(LocalDate.of(2000, 1, 1)), Optional.of(LocalDate.of(2030, 12, 31))))) + .setNullsCount(3) + .setDistinctValuesCount(7) + .build()); + } + + @Test + public void testSparkStringStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of( + "spark.sql.statistics.colStats.c_char.distinctCount", "3", + "spark.sql.statistics.colStats.c_char.avgLen", "10", + "spark.sql.statistics.colStats.char_col.maxLen", "10", + "spark.sql.statistics.colStats.c_char.nullCount", "7", + "spark.sql.statistics.colStats.c_char.version", "2"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_char_not_exists", STRING_TYPE_NAME, null), columnStatistics, 10); + assertEquals(actualNotExists, HiveColumnStatistics.builder().build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_char", STRING_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actual, + HiveColumnStatistics.builder() + .setNullsCount(7) + .setDistinctValuesCount(3) + .setTotalSizeInBytes(30) + .build()); + } + + @Test + public void testSparkBinaryStatsToColumnStatistics() + { + Map<String, String> columnStatistics = Map.of( + "spark.sql.statistics.colStats.c_bin.distinctCount", "7", + "spark.sql.statistics.colStats.c_bin.avgLen", "10", + "spark.sql.statistics.colStats.c_bin.maxLen", "10", + "spark.sql.statistics.colStats.c_bin.nullCount", "3", + "spark.sql.statistics.colStats.c_bin.version", "2"); + HiveColumnStatistics actualNotExists = fromMetastoreColumnStatistics(new FieldSchema("c_bin_not_exists", BINARY_TYPE_NAME, null), columnStatistics, 10); + assertEquals(actualNotExists, HiveColumnStatistics.builder().build()); + + HiveColumnStatistics actual = fromMetastoreColumnStatistics(new FieldSchema("c_bin", BINARY_TYPE_NAME, null), columnStatistics, 10); + assertEquals( + actual, + HiveColumnStatistics.builder() + .setNullsCount(3) + .setTotalSizeInBytes(70) + .setMaxValueSizeInBytes(10) + .build()); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkHiveNoStatsFallback.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkHiveNoStatsFallback.java new file mode 100644 index 000000000000..7ae4eb9cd15f --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkHiveNoStatsFallback.java @@ -0,0 +1,84 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider; +import io.trino.tests.product.launcher.env.DockerContainer; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentConfig; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; +import io.trino.tests.product.launcher.testcontainers.PortBinder; +import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts; +import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP; +import static java.util.Objects.requireNonNull; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvSinglenodeSparkHiveNoStatsFallback + extends EnvironmentProvider +{ + private static final int SPARK_THRIFT_PORT = 10213; + + private final PortBinder portBinder; + private final String hadoopImagesVersion; + private final ResourceProvider configDir; + + @Inject + public EnvSinglenodeSparkHiveNoStatsFallback(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder) + { + super(ImmutableList.of(standard, hadoop)); + requireNonNull(dockerFiles, "dockerFiles is null"); + this.portBinder = requireNonNull(portBinder, "portBinder is null"); + this.hadoopImagesVersion = config.getHadoopImagesVersion(); + this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-spark-hive-no-stats-fallback"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.configureContainer(HADOOP, container -> container.setDockerImageName("ghcr.io/trinodb/testing/hdp3.1-hive:" + hadoopImagesVersion)); + builder.addConnector("hive", forHostPath(configDir.getPath("hive.properties"))); + builder.addContainer(createSpark()).containerDependsOn("spark", HADOOP); + } + + @SuppressWarnings("resource") + private DockerContainer createSpark() + { + DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, "spark") + .withEnv("HADOOP_USER_NAME", "hive") + .withCopyFileToContainer(forHostPath(configDir.getPath("spark-defaults.conf")), "/spark/conf/spark-defaults.conf") + .withCommand( + "spark-submit", + "--master", "local[*]", + "--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", + "--name", "Thrift JDBC/ODBC Server", + "--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT, + "spark-internal") + .withStartupCheckStrategy(new IsRunningStartupCheckStrategy()) + .waitingFor(forSelectedPorts(SPARK_THRIFT_PORT)); + + portBinder.exposePort(container, SPARK_THRIFT_PORT); + + return container; + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java index c8b81e9d62ec..ae8a5901d05c 100644 --- 
a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java @@ -21,6 +21,7 @@ import io.trino.tests.product.launcher.env.environment.EnvMultinodeSqlserver; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsImpersonationCrossRealm; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkHive; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkHiveNoStatsFallback; import io.trino.tests.product.launcher.env.environment.EnvTwoKerberosHives; import io.trino.tests.product.launcher.env.environment.EnvTwoMixedHives; import io.trino.tests.product.launcher.suite.Suite; @@ -49,6 +50,9 @@ public List getTestRuns(EnvironmentConfig config) testOnEnvironment(EnvSinglenodeSparkHive.class) .withGroups("configured_features", "hive_spark") .build(), + testOnEnvironment(EnvSinglenodeSparkHiveNoStatsFallback.class) + .withGroups("configured_features", "hive_spark_no_stats_fallback") + .build(), testOnEnvironment(EnvSinglenodeKerberosHdfsImpersonationCrossRealm.class) .withGroups("configured_features", "storage_formats", "cli", "hdfs_impersonation") .build(), diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/hive.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/hive.properties new file mode 100644 index 000000000000..66e861f1fc52 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/hive.properties @@ -0,0 +1,8 @@ +connector.name=hive +hive.metastore.uri=thrift://hadoop-master:9083 +hive.metastore.thrift.use-spark-table-statistics-fallback=false +hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml +hive.allow-drop-table=true + +# Note: it is currently unclear why this property is needed here, while hive.orc.time-zone=UTC is not. 
+hive.parquet.time-zone=UTC diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/spark-defaults.conf new file mode 100644 index 000000000000..b26c24ce0a39 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-hive-no-stats-fallback/spark-defaults.conf @@ -0,0 +1,9 @@ +spark.sql.catalogImplementation=hive +spark.sql.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse +spark.sql.hive.thriftServer.singleSession=false + +spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000 +spark.hive.metastore.uris=thrift://hadoop-master:9083 +spark.hive.metastore.warehouse.dir=hdfs://hadoop-master:9000/user/hive/warehouse +spark.hive.metastore.schema.verification=false +spark.hive.enforce.bucketing=false diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index a09b3a8ca79b..8c31271611c9 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -44,6 +44,7 @@ public final class TestGroups public static final String HDFS_NO_IMPERSONATION = "hdfs_no_impersonation"; public static final String HIVE_PARTITIONING = "hive_partitioning"; public static final String HIVE_SPARK = "hive_spark"; + public static final String HIVE_SPARK_NO_STATS_FALLBACK = "hive_spark_no_stats_fallback"; public static final String HIVE_COMPRESSION = "hive_compression"; public static final String HIVE_TRANSACTIONAL = "hive_transactional"; public static final String HIVE_VIEWS = "hive_views"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java index abb4b449656f..11975c5f3664 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveSparkCompatibility.java @@ -13,6 +13,7 @@ */ package io.trino.tests.product.hive; +import com.google.common.collect.ImmutableList; import io.trino.tempto.ProductTest; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -28,6 +29,7 @@ import static io.trino.tempto.assertions.QueryAssert.assertThat; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.tests.product.TestGroups.HIVE_SPARK; +import static io.trino.tests.product.TestGroups.HIVE_SPARK_NO_STATS_FALLBACK; import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; import static io.trino.tests.product.utils.QueryExecutors.onHive; import static io.trino.tests.product.utils.QueryExecutors.onSpark; @@ -645,4 +647,207 @@ public void testReadSparkBucketedTable() onSpark().executeQuery("DROP TABLE " + sparkTableName); } + + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testReadSparkStatisticsUnpartitionedTable() + { + String tableName1 = "test_trino_reading_spark_statistics_table_" + randomNameSuffix(); + String tableName2 = "test_trino_reading_spark_statistics_table_" + 
randomNameSuffix(); + try { + onSpark().executeQuery(format("CREATE TABLE %s(" + + "c_tinyint BYTE, " + + "c_smallint SMALLINT, " + + "c_int INT, " + + "c_bigint BIGINT, " + + "c_float REAL, " + + "c_double DOUBLE, " + + "c_decimal DECIMAL(10,0), " + + "c_decimal_w_params DECIMAL(10,5), " + + "c_timestamp TIMESTAMP, " + + "c_date DATE, " + + "c_string STRING, " + + "c_varchar VARCHAR(10), " + + "c_char CHAR(10), " + + "c_boolean BOOLEAN, " + + "c_binary BINARY" + + ")", tableName1)); + + onSpark().executeQuery(format("INSERT INTO %s VALUES " + + "(120, 32760, 2147483640, 9223372036854775800, 123.340, 234.560, CAST(343.0 AS DECIMAL(10, 0)), CAST(345.670 AS DECIMAL(10, 5)), TIMESTAMP '2015-05-10 12:15:30', DATE '2015-05-08', 'p1 varchar', CAST('varchar10' AS VARCHAR(10)), CAST('p1 char10' AS CHAR(10)), false, CAST('p1 binary' as BINARY))," + + "(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)", tableName1)); + // Copy table to avoid hive metastore cache + onSpark().executeQuery(format("CREATE TABLE %s as SELECT * FROM %s", tableName2, tableName1)); + // test table statistics only + onSpark().executeQuery(format("ANALYZE TABLE %s COMPUTE STATISTICS", tableName2)); + assertThat(onTrino().executeQuery(format("SHOW STATS FOR %s", tableName2))).containsOnly(ImmutableList.of( + row("c_tinyint", null, null, null, null, null, null), + row("c_smallint", null, null, null, null, null, null), + row("c_int", null, null, null, null, null, null), + row("c_bigint", null, null, null, null, null, null), + row("c_float", null, null, null, null, null, null), + row("c_double", null, null, null, null, null, null), + row("c_decimal", null, null, null, null, null, null), + row("c_decimal_w_params", null, null, null, null, null, null), + row("c_timestamp", null, null, null, null, null, null), + row("c_date", null, null, null, null, null, null), + row("c_string", null, null, null, null, null, null), + row("c_varchar", null, null, null, null, null, null), + row("c_char", null, null, null, null, null, null), + row("c_boolean", null, null, null, null, null, null), + row("c_binary", null, null, null, null, null, null), + row(null, null, null, null, 2.0, null, null))); + // test table and column statistics + onSpark().executeQuery(format("ANALYZE TABLE %s COMPUTE STATISTICS FOR ALL COLUMNS", tableName1)); + assertThat(onTrino().executeQuery(format("SHOW STATS FOR %s", tableName1))).containsOnly(ImmutableList.of( + row("c_tinyint", null, 1.0, 0.5, null, "120", "120"), + row("c_smallint", null, 1.0, 0.5, null, "32760", "32760"), + row("c_int", null, 1.0, 0.5, null, "2147483640", "2147483640"), + row("c_bigint", null, 1.0, 0.5, null, "9223372036854775807", "9223372036854775807"), + row("c_float", null, 1.0, 0.5, null, "123.34", "123.34"), + row("c_double", null, 1.0, 0.5, null, "234.56", "234.56"), + row("c_decimal", null, 1.0, 0.5, null, "343.0", "343.0"), + row("c_decimal_w_params", null, 1.0, 0.5, null, "345.67", "345.67"), + row("c_timestamp", null, 1.0, 0.5, null, null, null), + row("c_date", null, 1.0, 0.5, null, "2015-05-08", "2015-05-08"), + row("c_string", 10.0, 1.0, 0.5, null, null, null), + row("c_varchar", 9.0, 1.0, 0.5, null, null, null), + row("c_char", 10.0, 1.0, 0.5, null, null, null), + row("c_boolean", null, null, 0.5, null, null, null), + row("c_binary", 9.0, null, 0.5, null, null, null), + row(null, null, null, null, 2.0, null, null))); + } + finally { + onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName1)); + onTrino().executeQuery(format("DROP 
TABLE IF EXISTS %s", tableName2)); + } + } + + @Test(groups = {HIVE_SPARK, PROFILE_SPECIFIC_TESTS}) + public void testReadSparkStatisticsPartitionedTable() + { + String tableName = "test_trino_reading_spark_statistics_table_" + randomNameSuffix(); + try { + onSpark().executeQuery(format("CREATE TABLE %s(" + + "c_tinyint BYTE, " + + "c_smallint SMALLINT, " + + "c_int INT, " + + "c_bigint BIGINT, " + + "c_float REAL, " + + "c_double DOUBLE, " + + "c_decimal DECIMAL(10,0), " + + "c_decimal_w_params DECIMAL(10,5), " + + "c_timestamp TIMESTAMP, " + + "c_date DATE, " + + "c_string STRING, " + + "c_varchar VARCHAR(10), " + + "c_char CHAR(10), " + + "c_boolean BOOLEAN, " + + "c_binary BINARY," + + "p_bigint BIGINT, " + + "p_varchar VARCHAR(15) " + + ") partitioned by (p_bigint, p_varchar)", tableName)); + + onSpark().executeQuery("set hive.exec.dynamic.partition.mode=nonstrict"); + onSpark().executeQuery(format("INSERT INTO %s VALUES " + + "(120, 32760, 2147483640, 9223372036854775800, 123.340, 234.560, CAST(343.0 AS DECIMAL(10, 0)), CAST(345.670 AS DECIMAL(10, 5)), TIMESTAMP '2015-05-10 12:15:30', DATE '2015-05-08', 'p1 varchar', CAST('varchar10' AS VARCHAR(10)), CAST('p1 char10' AS CHAR(10)), false, CAST('p1 binary' as BINARY), 1, 'partition 1')," + + "(120, 32760, 2147483640, 9223372036854775800, 123.340, 234.560, CAST(343.0 AS DECIMAL(10, 0)), CAST(345.670 AS DECIMAL(10, 5)), TIMESTAMP '2015-05-10 12:15:30', DATE '2015-05-08', 'p1 varchar', CAST('varchar10' AS VARCHAR(10)), CAST('p1 char10' AS CHAR(10)), false, CAST('p1 binary' as BINARY), 2, 'partition 2')," + + "(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, 2, 'partition 2')," + + "(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, 1, 'partition 1')", tableName)); + onSpark().executeQuery(format("ANALYZE TABLE %s COMPUTE STATISTICS FOR ALL COLUMNS", tableName)); + onSpark().executeQuery(format("ANALYZE TABLE %s PARTITION(p_bigint, p_varchar) COMPUTE STATISTICS", tableName)); + // test statistics for whole table + assertThat(onTrino().executeQuery(format("SHOW STATS FOR %s", tableName))).containsOnly(ImmutableList.of( + row("c_tinyint", null, null, null, null, null, null), + row("c_smallint", null, null, null, null, null, null), + row("c_int", null, null, null, null, null, null), + row("c_bigint", null, null, null, null, null, null), + row("c_float", null, null, null, null, null, null), + row("c_double", null, null, null, null, null, null), + row("c_decimal", null, null, null, null, null, null), + row("c_decimal_w_params", null, null, null, null, null, null), + row("c_timestamp", null, null, null, null, null, null), + row("c_date", null, null, null, null, null, null), + row("c_string", null, null, null, null, null, null), + row("c_varchar", null, null, null, null, null, null), + row("c_char", null, null, null, null, null, null), + row("c_boolean", null, null, null, null, null, null), + row("c_binary", null, null, null, null, null, null), + row("p_bigint", null, 2.0, 0.0, null, "1", "2"), + row("p_varchar", 44.0, 2.0, 0.0, null, null, null), + row(null, null, null, null, 4.0, null, null))); + // test statistics for single partition + assertThat(onTrino().executeQuery(format("SHOW STATS FOR (SELECT * FROM %s where p_bigint = 1)", tableName))).containsOnly(ImmutableList.of( + row("c_tinyint", null, null, null, null, null, null), + row("c_smallint", null, null, null, null, null, null), + row("c_int", null, null, null, null, null, null), + row("c_bigint", null, 
null, null, null, null, null), + row("c_float", null, null, null, null, null, null), + row("c_double", null, null, null, null, null, null), + row("c_decimal", null, null, null, null, null, null), + row("c_decimal_w_params", null, null, null, null, null, null), + row("c_timestamp", null, null, null, null, null, null), + row("c_date", null, null, null, null, null, null), + row("c_string", null, null, null, null, null, null), + row("c_varchar", null, null, null, null, null, null), + row("c_char", null, null, null, null, null, null), + row("c_boolean", null, null, null, null, null, null), + row("c_binary", null, null, null, null, null, null), + row("p_bigint", null, 1.0, 0.0, null, "1", "1"), + row("p_varchar", 22.0, 1.0, 0.0, null, null, null), + row(null, null, null, null, 2.0, null, null))); + } + finally { + onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); + } + } + + @Test(groups = {HIVE_SPARK_NO_STATS_FALLBACK, PROFILE_SPECIFIC_TESTS}) + public void testIgnoringSparkStatisticsWithDisabledFallback() + { + String tableName = "test_trino_reading_spark_statistics_table_" + randomNameSuffix(); + try { + onSpark().executeQuery(format("CREATE TABLE %s(" + + "c_tinyint BYTE, " + + "c_smallint SMALLINT, " + + "c_int INT, " + + "c_bigint BIGINT, " + + "c_float REAL, " + + "c_double DOUBLE, " + + "c_decimal DECIMAL(10,0), " + + "c_decimal_w_params DECIMAL(10,5), " + + "c_timestamp TIMESTAMP, " + + "c_date DATE, " + + "c_string STRING, " + + "c_varchar VARCHAR(10), " + + "c_char CHAR(10), " + + "c_boolean BOOLEAN, " + + "c_binary BINARY" + + ")", tableName)); + + onSpark().executeQuery(format("INSERT INTO %s VALUES " + + "(120, 32760, 2147483640, 9223372036854775800, 123.340, 234.560, CAST(343.0 AS DECIMAL(10, 0)), CAST(345.670 AS DECIMAL(10, 5)), TIMESTAMP '2015-05-10 12:15:30', DATE '2015-05-08', 'p1 varchar', CAST('varchar10' AS VARCHAR(10)), CAST('p1 char10' AS CHAR(10)), false, CAST('p1 binary' as BINARY))," + + "(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)", tableName)); + onSpark().executeQuery(format("ANALYZE TABLE %s COMPUTE STATISTICS FOR ALL COLUMNS", tableName)); + assertThat(onTrino().executeQuery(format("SHOW STATS FOR %s", tableName))).containsOnly(ImmutableList.of( + row("c_tinyint", null, null, null, null, null, null), + row("c_smallint", null, null, null, null, null, null), + row("c_int", null, null, null, null, null, null), + row("c_bigint", null, null, null, null, null, null), + row("c_float", null, null, null, null, null, null), + row("c_double", null, null, null, null, null, null), + row("c_decimal", null, null, null, null, null, null), + row("c_decimal_w_params", null, null, null, null, null, null), + row("c_timestamp", null, null, null, null, null, null), + row("c_date", null, null, null, null, null, null), + row("c_string", null, null, null, null, null, null), + row("c_varchar", null, null, null, null, null, null), + row("c_char", null, null, null, null, null, null), + row("c_boolean", null, null, null, null, null, null), + row("c_binary", null, null, null, null, null, null), + row(null, null, null, null, null, null, null))); + } + finally { + onTrino().executeQuery(format("DROP TABLE IF EXISTS %s", tableName)); + } + } }
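
Editor's note, appended for context and not part of the patch itself: the fallback above keys off the naming convention Spark uses when writing statistics into Hive metastore table parameters. Table-level statistics are stored under the spark.sql.statistics. prefix (numFiles, numRows, rawDataSize, totalSize), and per-column statistics under spark.sql.statistics.colStats.<column>.<stat>, while Hive itself uses bare keys such as numRows. The standalone sketch below mirrors the row-count fallback of getBasicStatisticsWithSparkFallback; the class and helper names are illustrative only and do not exist in the patch.

    import java.util.Map;
    import java.util.OptionalLong;

    // Illustrative sketch: prefer Hive's own row count, and fall back to
    // "spark.sql.statistics.numRows" when Hive reports none (or zero)
    public class SparkStatsFallbackSketch
    {
        private static final String SPARK_PREFIX = "spark.sql.statistics.";

        public static void main(String[] args)
        {
            // Parameters as they might appear on a table analyzed only by Spark
            // (ANALYZE TABLE t COMPUTE STATISTICS); the values are made up
            Map<String, String> parameters = Map.of(
                    SPARK_PREFIX + "numFiles", "1",
                    SPARK_PREFIX + "numRows", "2",
                    SPARK_PREFIX + "rawDataSize", "3",
                    SPARK_PREFIX + "totalSize", "4");

            OptionalLong hiveRowCount = toLong(parameters.get("numRows"));
            OptionalLong rowCount = hiveRowCount.isPresent() && hiveRowCount.getAsLong() > 0
                    ? hiveRowCount
                    : toLong(parameters.get(SPARK_PREFIX + "numRows"));

            System.out.println("row count: " + rowCount); // prints "row count: OptionalLong[2]"
        }

        // Mirrors the semantics of ThriftMetastoreParameterParserUtils.toLong:
        // missing, unparsable, or negative values are treated as absent
        private static OptionalLong toLong(String value)
        {
            if (value == null) {
                return OptionalLong.empty();
            }
            try {
                long parsed = Long.parseLong(value);
                return parsed < 0 ? OptionalLong.empty() : OptionalLong.of(parsed);
            }
            catch (NumberFormatException e) {
                return OptionalLong.empty();
            }
        }
    }

Note that the production code in the patch is stricter in one respect: for table statistics it falls back only when Hive's row count is absent, while for partition statistics it also treats a row count of 0 as missing; both paths are guarded by hive.metastore.thrift.use-spark-table-statistics-fallback, which defaults to true.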