Read Spark-generated statistics in Hive connector
Dith3r authored and raunaqmorarka committed Feb 28, 2023
1 parent ce41ce1 commit 5dc41d4
Showing 16 changed files with 860 additions and 5 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/hive.rst
@@ -630,6 +630,8 @@ Property Name Description
``KERBEROS``. Default is ``NONE``.
* - ``hive.metastore.thrift.impersonation.enabled``
- Enable Hive metastore end user impersonation.
* - ``hive.metastore.thrift.use-spark-table-statistics-fallback``
- Enable usage of table statistics generated by Apache Spark when Hive table statistics are not available. Default is ``true``.
* - ``hive.metastore.thrift.delegation-token.cache-ttl``
- Time to live delegation token cache for metastore. Default is ``1h``.
* - ``hive.metastore.thrift.delegation-token.cache-maximum-size``
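For context, a minimal catalog configuration sketch showing where the new ``hive.metastore.thrift.use-spark-table-statistics-fallback`` property lives; the file name and metastore URI below are placeholders, not part of this commit:

    # etc/catalog/example.properties (hypothetical file name)
    connector.name=hive
    hive.metastore.uri=thrift://metastore.example.com:9083
    # Added by this commit; enabled by default
    hive.metastore.thrift.use-spark-table-statistics-fallback=true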
@@ -123,6 +123,7 @@
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromRolePrincipalGrants;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.fromTrinoPrincipalType;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getBasicStatisticsWithSparkFallback;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.getHiveBasicStatistics;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.parsePrivilege;
@@ -156,6 +157,7 @@ public class ThriftHiveMetastore
private final boolean deleteFilesOnDrop;
private final boolean translateHiveViews;
private final boolean assumeCanonicalPartitionKeys;
private final boolean useSparkTableStatisticsFallback;
private final ThriftMetastoreStats stats;
private final ExecutorService writeStatisticsExecutor;

@@ -172,6 +174,7 @@ public ThriftHiveMetastore(
boolean deleteFilesOnDrop,
boolean translateHiveViews,
boolean assumeCanonicalPartitionKeys,
boolean useSparkTableStatisticsFallback,
ThriftMetastoreStats stats,
ExecutorService writeStatisticsExecutor)
{
@@ -187,6 +190,7 @@ public ThriftHiveMetastore(
this.deleteFilesOnDrop = deleteFilesOnDrop;
this.translateHiveViews = translateHiveViews;
this.assumeCanonicalPartitionKeys = assumeCanonicalPartitionKeys;
this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback;
this.stats = requireNonNull(stats, "stats is null");
this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null");
}
@@ -325,7 +329,16 @@ public PartitionStatistics getTableStatistics(Table table)
List<String> dataColumns = table.getSd().getCols().stream()
.map(FieldSchema::getName)
.collect(toImmutableList());
HiveBasicStatistics basicStatistics = getHiveBasicStatistics(table.getParameters());
Map<String, String> parameters = table.getParameters();
HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters);

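// Prefer statistics written by Spark when the Hive metastore provides no row count for this table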
if (useSparkTableStatisticsFallback && basicStatistics.getRowCount().isEmpty()) {
PartitionStatistics sparkTableStatistics = ThriftSparkMetastoreUtil.getTableStatistics(table);
if (sparkTableStatistics.getBasicStatistics().getRowCount().isPresent()) {
return sparkTableStatistics;
}
}

Map<String, HiveColumnStatistics> columnStatistics = getTableColumnStatistics(table.getDbName(), table.getTableName(), dataColumns, basicStatistics.getRowCount());
return new PartitionStatistics(basicStatistics, columnStatistics);
}
@@ -366,7 +379,12 @@ public Map<String, PartitionStatistics> getPartitionStatistics(Table table, List
Map<String, HiveBasicStatistics> partitionBasicStatistics = partitions.stream()
.collect(toImmutableMap(
partition -> makePartName(partitionColumns, partition.getValues()),
partition -> getHiveBasicStatistics(partition.getParameters())));
partition -> {
if (useSparkTableStatisticsFallback) {
return getBasicStatisticsWithSparkFallback(partition.getParameters());
}
return getHiveBasicStatistics(partition.getParameters());
}));
Map<String, OptionalLong> partitionRowCounts = partitionBasicStatistics.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().getRowCount()));
Map<String, Map<String, HiveColumnStatistics>> partitionColumnStatistics = getPartitionColumnStatistics(
@@ -43,6 +43,7 @@ public class ThriftHiveMetastoreFactory
private final boolean deleteFilesOnDrop;
private final boolean translateHiveViews;
private final boolean assumeCanonicalPartitionKeys;
private final boolean useSparkTableStatisticsFallback;
private final ExecutorService writeStatisticsExecutor;
private final ThriftMetastoreStats stats = new ThriftMetastoreStats();

@@ -69,6 +70,7 @@ public ThriftHiveMetastoreFactory(
this.maxWaitForLock = thriftConfig.getMaxWaitForTransactionLock();

this.assumeCanonicalPartitionKeys = thriftConfig.isAssumeCanonicalPartitionKeys();
this.useSparkTableStatisticsFallback = thriftConfig.isUseSparkTableStatisticsFallback();
this.writeStatisticsExecutor = requireNonNull(writeStatisticsExecutor, "writeStatisticsExecutor is null");
}

@@ -101,6 +103,7 @@ public ThriftMetastore createMetastore(Optional<ConnectorIdentity> identity)
deleteFilesOnDrop,
translateHiveViews,
assumeCanonicalPartitionKeys,
useSparkTableStatisticsFallback,
stats,
writeStatisticsExecutor);
}
@@ -39,6 +39,7 @@ public class ThriftMetastoreConfig
private Duration maxBackoffDelay = RetryDriver.DEFAULT_SLEEP_TIME;
private Duration maxRetryTime = RetryDriver.DEFAULT_MAX_RETRY_TIME;
private boolean impersonationEnabled;
private boolean useSparkTableStatisticsFallback = true;
private Duration delegationTokenCacheTtl = new Duration(1, TimeUnit.HOURS); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime)
private long delegationTokenCacheMaximumSize = 1000;
private boolean deleteFilesOnDrop;
@@ -158,6 +159,19 @@ public ThriftMetastoreConfig setImpersonationEnabled(boolean impersonationEnable
return this;
}

public boolean isUseSparkTableStatisticsFallback()
{
return useSparkTableStatisticsFallback;
}

@Config("hive.metastore.thrift.use-spark-table-statistics-fallback")
@ConfigDescription("Enable usage of table statistics generated by Apache Spark when hive table statistics are not available")
public ThriftMetastoreConfig setUseSparkTableStatisticsFallback(boolean useSparkTableStatisticsFallback)
{
this.useSparkTableStatisticsFallback = useSparkTableStatisticsFallback;
return this;
}

@NotNull
@MinDuration("0ms")
public Duration getDelegationTokenCacheTtl()
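As a quick illustration (not part of the commit), the new toggle is an ordinary boolean config property, so reading and overriding it programmatically looks like this; the variable name is made up:

    // Illustrative sketch only -- not code from this commit.
    ThriftMetastoreConfig config = new ThriftMetastoreConfig();
    // The fallback is enabled by default, matching the field initializer above.
    boolean sparkFallback = config.isUseSparkTableStatisticsFallback(); // true
    // Equivalent effect to setting hive.metastore.thrift.use-spark-table-statistics-fallback=false
    config.setUseSparkTableStatisticsFallback(false);

In a running server the setter is invoked by the configuration machinery when the catalog property is present, via the @Config binding shown above.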
Expand Up @@ -13,16 +13,31 @@
*/
package io.trino.plugin.hive.metastore.thrift;

import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.time.DateTimeException;
import java.time.LocalDate;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalLong;

class ThriftMetastoreParameterParserUtils
final class ThriftMetastoreParameterParserUtils
{
private ThriftMetastoreParameterParserUtils() {}

static Optional<Boolean> toBoolean(@Nullable String parameterValue)
{
if (parameterValue == null) {
return Optional.empty();
}
Boolean value = Boolean.parseBoolean(parameterValue);
return Optional.of(value);
}

static OptionalLong toLong(@Nullable String parameterValue)
{
if (parameterValue == null) {
@@ -34,4 +49,47 @@ static OptionalLong toLong(@Nullable String parameterValue)
}
return OptionalLong.of(longValue);
}

static OptionalDouble toDouble(@Nullable String parameterValue)
{
if (parameterValue == null) {
return OptionalDouble.empty();
}
Double doubleValue = Doubles.tryParse(parameterValue);
if (doubleValue == null || doubleValue < 0) {
return OptionalDouble.empty();
}
return OptionalDouble.of(doubleValue);
}

static Optional<BigDecimal> toDecimal(@Nullable String parameterValue)
{
if (parameterValue == null) {
return Optional.empty();
}
try {
BigDecimal decimal = new BigDecimal(parameterValue);
if (decimal.compareTo(BigDecimal.ZERO) < 0) {
return Optional.empty();
}
return Optional.of(decimal);
}
catch (NumberFormatException exception) {
return Optional.empty();
}
}

static Optional<LocalDate> toDate(@Nullable String parameterValue)
{
if (parameterValue == null) {
return Optional.empty();
}
try {
LocalDate date = LocalDate.parse(parameterValue);
return Optional.of(date);
}
catch (DateTimeException exception) {
return Optional.empty();
}
}
}
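A brief usage sketch of the new parsing helpers; the input strings are hypothetical parameter values, and the comments reflect the checks visible above (for example, negative decimals are rejected):

    // Hypothetical inputs -- metastore parameter values are plain strings.
    OptionalLong rows = ThriftMetastoreParameterParserUtils.toLong("1000");             // OptionalLong.of(1000)
    OptionalDouble avgSize = ThriftMetastoreParameterParserUtils.toDouble("12.5");      // OptionalDouble.of(12.5)
    Optional<Boolean> flag = ThriftMetastoreParameterParserUtils.toBoolean("true");     // Optional.of(true)
    Optional<LocalDate> day = ThriftMetastoreParameterParserUtils.toDate("2023-02-28"); // Optional.of(2023-02-28)
    Optional<BigDecimal> neg = ThriftMetastoreParameterParserUtils.toDecimal("-1");     // Optional.empty() -- negative values are rejected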
@@ -125,6 +125,7 @@
import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT;
import static io.trino.plugin.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreParameterParserUtils.toLong;
import static io.trino.plugin.hive.metastore.thrift.ThriftSparkMetastoreUtil.getSparkBasicStatistics;
import static io.trino.plugin.hive.type.Category.PRIMITIVE;
import static io.trino.spi.security.PrincipalType.ROLE;
import static io.trino.spi.security.PrincipalType.USER;
@@ -144,10 +145,10 @@

public final class ThriftMetastoreUtil
{
public static final String NUM_ROWS = "numRows";
private static final String PUBLIC_ROLE_NAME = "public";
private static final String ADMIN_ROLE_NAME = "admin";
private static final String NUM_FILES = "numFiles";
public static final String NUM_ROWS = "numRows";
private static final String RAW_DATA_SIZE = "rawDataSize";
private static final String TOTAL_SIZE = "totalSize";
private static final Set<String> STATS_PROPERTIES = ImmutableSet.of(NUM_FILES, NUM_ROWS, RAW_DATA_SIZE, TOTAL_SIZE);
@@ -749,6 +750,20 @@ public static HiveBasicStatistics getHiveBasicStatistics(Map<String, String> par
return new HiveBasicStatistics(numFiles, numRows, inMemoryDataSizeInBytes, onDiskDataSizeInBytes);
}

public static HiveBasicStatistics getBasicStatisticsWithSparkFallback(Map<String, String> parameters)
{
HiveBasicStatistics basicStatistics = getHiveBasicStatistics(parameters);
// Partitioned table without statistics
if (basicStatistics.getRowCount().isEmpty() || basicStatistics.getRowCount().getAsLong() == 0L) {
HiveBasicStatistics sparkBasicStatistics = getSparkBasicStatistics(parameters);
if (sparkBasicStatistics.getRowCount().isPresent()) {
return sparkBasicStatistics;
}
}

return basicStatistics;
}

public static Map<String, String> updateStatisticsParameters(Map<String, String> parameters, HiveBasicStatistics statistics)
{
ImmutableMap.Builder<String, String> result = ImmutableMap.builder();
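ThriftSparkMetastoreUtil itself is not among the files shown on this page. As a rough sketch of what getSparkBasicStatistics presumably does, assuming Spark stores its statistics in the table and partition parameter maps under keys such as spark.sql.statistics.numRows and spark.sql.statistics.totalSize (the key names and mapping are assumptions, not taken from this commit):

    // Hypothetical sketch -- not the implementation from this commit.
    static HiveBasicStatistics getSparkBasicStatisticsSketch(Map<String, String> parameters)
    {
        // Spark is assumed to write row count and on-disk size as string parameters.
        OptionalLong rowCount = toLong(parameters.get("spark.sql.statistics.numRows"));
        OptionalLong totalSize = toLong(parameters.get("spark.sql.statistics.totalSize"));
        // File count and in-memory data size are left empty in this sketch.
        return new HiveBasicStatistics(OptionalLong.empty(), rowCount, OptionalLong.empty(), totalSize);
    }

The fallback helpers above only consult this result when the Hive-provided row count is missing or zero, so Hive-managed statistics keep precedence when they exist.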
(The remaining changed files in this commit are not shown here.)
