
Hive: Add View support for HIVE catalog #9852

Merged: 17 commits, Sep 13, 2024
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.exceptions;

import com.google.errorprone.annotations.FormatMethod;

/** NoSuchIcebergViewException thrown when a view is found, but it is not an Iceberg view. */
public class NoSuchIcebergViewException extends NoSuchViewException {
@FormatMethod
public NoSuchIcebergViewException(String message, Object... args) {
super(message, args);
}

@FormatMethod
public static void check(boolean test, String message, Object... args) {
if (!test) {
throw new NoSuchIcebergViewException(message, args);
}
}
}
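As a quick illustration (not part of this diff), the check helper can be used to fail fast when a loaded HMS table turns out not to be an Iceberg view. The tableType and viewName values below are placeholders for this sketch:

import org.apache.iceberg.exceptions.NoSuchIcebergViewException;

// Hypothetical guard; tableType and viewName stand in for values read from the HMS table,
// e.g. its table type parameter and its identifier.
String tableType = "ICEBERG";
String viewName = "db.some_view";

NoSuchIcebergViewException.check(
    "ICEBERG".equalsIgnoreCase(tableType),
    "Not an iceberg view: %s (type=%s)", viewName, tableType);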
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -47,6 +47,7 @@
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -137,6 +138,23 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

/**
* Drops view metadata files referenced by ViewMetadata.
*
* <p>This should be called by {@code dropView} implementations.
*
* @param io a FileIO to use for deletes
* @param metadata the last valid ViewMetadata instance for a dropped view.
*/
public static void dropViewMetadata(FileIO io, ViewMetadata metadata) {
boolean gcEnabled =
PropertyUtil.propertyAsBoolean(metadata.properties(), GC_ENABLED, GC_ENABLED_DEFAULT);

if (gcEnabled) {
deleteFile(io, metadata.metadataFileLocation(), "metadata");
}
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
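For reference only (not part of this diff), a minimal sketch of how a catalog's dropView implementation might call this helper after removing the metastore entry. The method name cleanUpDroppedView and both parameters are placeholders for this sketch:

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.view.ViewMetadata;

// Illustrative helper a dropView implementation might call once the metastore entry is gone.
// `io` and `lastViewMetadata` would come from the catalog's view operations for the dropped view.
static void cleanUpDroppedView(FileIO io, ViewMetadata lastViewMetadata) {
  if (lastViewMetadata != null) {
    // Deletes the view's metadata file only when gc.enabled=true in the view properties;
    // with gc.enabled=false the metadata file is left in place.
    CatalogUtil.dropViewMetadata(io, lastViewMetadata);
  }
}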
200 changes: 193 additions & 7 deletions hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -27,26 +27,29 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
@@ -56,13 +59,21 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewOperations;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
public class HiveCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, Configurable {
public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

@@ -117,6 +128,16 @@ public void initialize(String inputName, Map<String, String> properties) {
this.fileIOTracker = new FileIOTracker();
}

@Override
public TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
return new ViewAwareTableBuilder(identifier, schema);
}

@Override
public ViewBuilder buildView(TableIdentifier identifier) {
return new TableAwareViewBuilder(identifier);
}

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
Preconditions.checkArgument(
@@ -156,6 +177,38 @@ public List<TableIdentifier> listTables(Namespace namespace) {
}
}

@Override
public List<TableIdentifier> listViews(Namespace namespace) {
Preconditions.checkArgument(
isValidateNamespace(namespace), "Missing database in namespace: %s", namespace);

try {
String database = namespace.level(0);
List<String> viewNames =
clients.run(client -> client.getTables(database, "*", TableType.VIRTUAL_VIEW));

// Retrieving the Table objects from HMS in batches to avoid OOM
List<TableIdentifier> filteredTableIdentifiers = Lists.newArrayList();
Iterable<List<String>> viewNameSets = Iterables.partition(viewNames, 100);

for (List<String> viewNameSet : viewNameSets) {
filteredTableIdentifiers.addAll(
listIcebergTables(viewNameSet, namespace, HiveOperationsBase.ICEBERG_VIEW_TYPE_VALUE));
}

return filteredTableIdentifiers;
} catch (UnknownDBException e) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);

} catch (TException e) {
throw new RuntimeException("Failed to list all views under namespace " + namespace, e);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted in call to listViews", e);
}
}

@Override
public String name() {
return name;
@@ -213,11 +266,57 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
}
}

@Override
public boolean dropView(TableIdentifier identifier) {
if (!isValidIdentifier(identifier)) {
return false;
}

try {
String database = identifier.namespace().level(0);
String viewName = identifier.name();

HiveViewOperations ops = (HiveViewOperations) newViewOps(identifier);
ViewMetadata lastViewMetadata = null;
try {
lastViewMetadata = ops.current();
} catch (NotFoundException e) {
LOG.warn("Failed to load view metadata for view: {}", identifier, e);
}

clients.run(
client -> {
client.dropTable(database, viewName, false, false);
return null;
});

if (lastViewMetadata != null) {
Review comment (Contributor): I'm not sure about immediately deleting view metadata. When tables are dropped, there's a purge flag. For views it might be better to leave the view metadata around and not drop it immediately.

Review comment (Contributor): But I'm curious to hear other people's opinions on whether view metadata should be deleted immediately when the view is dropped.

Review comment (Contributor): I commented elsewhere, but I agree it should adhere to the gcEnabled flag like in dropTables (this is in CatalogUtil).
CatalogUtil.dropViewMetadata(ops.io(), lastViewMetadata);
}

LOG.info("Dropped view: {}", identifier);
return true;
} catch (NoSuchObjectException e) {
LOG.info("Skipping drop, view does not exist: {}", identifier, e);
return false;
} catch (TException e) {
throw new RuntimeException("Failed to drop view " + identifier, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted in call to dropView", e);
}
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE);
}

@Override
public void renameView(TableIdentifier from, TableIdentifier to) {
renameTableOrView(from, to, HiveOperationsBase.ContentType.VIEW);
}

private List<TableIdentifier> listIcebergTables(
List<String> tableNames, Namespace namespace, String tableTypeProp)
throws TException, InterruptedException {
@@ -233,13 +332,12 @@ private List<TableIdentifier> listIcebergTables(
.collect(Collectors.toList());
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void renameTableOrView(
TableIdentifier from,
TableIdentifier originalTo,
HiveOperationsBase.ContentType contentType) {
if (!isValidIdentifier(from)) {
throw new NoSuchTableException("Invalid identifier: %s", from);
}
Preconditions.checkArgument(isValidIdentifier(from), "Invalid identifier: %s", from);

TableIdentifier to = removeCatalogName(originalTo);
Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
@@ -248,6 +346,16 @@ private void renameTableOrView(
"Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
}

if (tableExists(to)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Cannot rename %s to %s. Table already exists", from, to);
}

if (viewExists(to)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Cannot rename %s to %s. View already exists", from, to);
}

String toDatabase = to.namespace().level(0);
String fromDatabase = from.namespace().level(0);
String fromName = from.name();
@@ -268,7 +376,12 @@
LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to);

} catch (NoSuchObjectException e) {
throw new NoSuchTableException("Table does not exist: %s", from);
switch (contentType) {
case TABLE:
throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
case VIEW:
throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
}

} catch (InvalidOperationException e) {
if (e.getMessage() != null
@@ -295,7 +408,7 @@ private void validateTableIsIcebergTableOrView(
HiveOperationsBase.validateTableIsIceberg(table, fullName);
break;
case VIEW:
throw new UnsupportedOperationException("View is not supported.");
HiveOperationsBase.validateTableIsIcebergView(table, fullName);
}
}

@@ -522,6 +635,11 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) {
return ops;
}

@Override
protected ViewOperations newViewOps(TableIdentifier identifier) {
return new HiveViewOperations(conf, clients, fileIO, name, identifier);
}

@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
// This is a little edgy since we basically duplicate the HMS location generation logic.
@@ -660,4 +778,72 @@ void setListAllTables(boolean listAllTables) {
ClientPool<IMetaStoreClient, TException> clientPool() {
return clients;
}

/**
* The purpose of this class is to add view detection only for Hive-specific tables. The Hive
* catalog performs checks at different levels: 1. During refresh, it validates whether the table
* is an Iceberg table. 2. During commit, it validates whether there is a concurrent commit or
* whether the table name already exists. This class performs that validation earlier, at builder
* time.
*/
private class ViewAwareTableBuilder extends BaseMetastoreViewCatalogTableBuilder {

private final TableIdentifier identifier;

private ViewAwareTableBuilder(TableIdentifier identifier, Schema schema) {
super(identifier, schema);
this.identifier = identifier;
}

@Override
public Transaction createOrReplaceTransaction() {
if (viewExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"View with same name already exists: %s", identifier);
}
return super.createOrReplaceTransaction();
}

@Override
public org.apache.iceberg.Table create() {
if (viewExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"View with same name already exists: %s", identifier);
}
return super.create();
}
}

/**
* The purpose of this class is to add table detection only for Hive-specific views. The Hive
* catalog performs checks at different levels: 1. During refresh, it validates whether the view
* is an Iceberg view. 2. During commit, it validates whether there is a concurrent commit or
* whether the view name already exists. This class performs that validation earlier, at builder
* time.
*/
private class TableAwareViewBuilder extends BaseViewBuilder {

private final TableIdentifier identifier;

private TableAwareViewBuilder(TableIdentifier identifier) {
super(identifier);
this.identifier = identifier;
}

@Override
public View createOrReplace() {
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Table with same name already exists: %s", identifier);
}
return super.createOrReplace();
}

@Override
public View create() {
if (tableExists(identifier)) {
throw new org.apache.iceberg.exceptions.AlreadyExistsException(
"Table with same name already exists: %s", identifier);
}
return super.create();
}
}
}
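To illustrate the early validation these builders add, a hedged usage sketch follows; the namespace, view name, and query are made up for the example, and catalog is assumed to be an initialized HiveCatalog:

import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.types.Types;

// Hypothetical view definition; `catalog` is an initialized HiveCatalog (assumed).
Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
TableIdentifier viewId = TableIdentifier.of("db", "event_ids");

try {
  catalog
      .buildView(viewId)
      .withSchema(schema)
      .withDefaultNamespace(Namespace.of("db"))
      .withQuery("spark", "SELECT id FROM db.events")
      .create();
} catch (AlreadyExistsException e) {
  // TableAwareViewBuilder fails here if a table named db.event_ids already exists,
  // before any view metadata is written; ViewAwareTableBuilder does the converse for tables.
}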