From f5e87fd8ddf54ca9917083076214fccf26c51774 Mon Sep 17 00:00:00 2001
From: "akshay.kachore"
Date: Fri, 15 Mar 2024 14:35:00 +0530
Subject: [PATCH 1/4] qpt changes for vertica

---
 athena-vertica/pom.xml                        |   6 +
 .../vertica/VerticaMetadataHandler.java       | 226 ++++++++++++++----
 .../vertica/VerticaQueryPassthrough.java      |  68 ++++++
 .../vertica/query/QueryFactory.java           |   5 +
 .../query/VerticaExportQueryBuilder.java      |  21 +-
 athena-vertica/src/main/resources/Vertica.stg |   4 +
 6 files changed, 280 insertions(+), 50 deletions(-)
 create mode 100644 athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaQueryPassthrough.java

diff --git a/athena-vertica/pom.xml b/athena-vertica/pom.xml
index 3ab6c47d01..a29cfad470 100644
--- a/athena-vertica/pom.xml
+++ b/athena-vertica/pom.xml
@@ -66,6 +66,12 @@
             <artifactId>ST4</artifactId>
             <version>${antlr.st4.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>athena-jdbc</artifactId>
+            <version>2022.47.1</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
index 6f6e9aa02a..31be1d39f4 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
@@ -25,13 +25,29 @@
 import com.amazonaws.athena.connector.lambda.data.Block;
 import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
 import com.amazonaws.athena.connector.lambda.data.BlockWriter;
+import com.amazonaws.athena.connector.lambda.data.FieldBuilder;
 import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
+import com.amazonaws.athena.connector.lambda.data.SupportedTypes;
 import com.amazonaws.athena.connector.lambda.domain.Split;
 import com.amazonaws.athena.connector.lambda.domain.TableName;
 import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
 import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler;
-import com.amazonaws.athena.connector.lambda.metadata.*;
+import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
+import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
+import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
+import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
+import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
+import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest;
+import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
+import com.amazonaws.athena.connector.lambda.metadata.ListSchemasRequest;
+import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse;
+import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
+import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
+import com.amazonaws.athena.connector.lambda.metadata.MetadataRequest;
+import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
 import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
+import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
+import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
 import com.amazonaws.athena.connectors.vertica.query.QueryFactory;
 import com.amazonaws.athena.connectors.vertica.query.VerticaExportQueryBuilder;
 import com.amazonaws.services.athena.AmazonAthena;
@@ -41,17 +57,29 @@
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.amazonaws.services.secretsmanager.AWSSecretsManager;
-import org.apache.commons.lang3.StringUtils;
+import com.google.common.collect.ImmutableMap;
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;

-import java.sql.*;
-import java.util.*;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;

 public class VerticaMetadataHandler
@@ -77,7 +105,8 @@ public class VerticaMetadataHandler
     private final VerticaSchemaUtils verticaSchemaUtils;
     private AmazonS3 amazonS3;

-    public VerticaMetadataHandler(java.util.Map<String, String> configOptions)
+    private final VerticaQueryPassthrough queryPassthrough = new VerticaQueryPassthrough();
+    public VerticaMetadataHandler(Map<String, String> configOptions)
     {
         super(SOURCE_TYPE, configOptions);
         amazonS3 = AmazonS3ClientBuilder.defaultClient();
@@ -87,15 +116,15 @@ public VerticaMetadataHandler(java.util.Map<String, String> configOptions)

     @VisibleForTesting
     protected VerticaMetadataHandler(
-        EncryptionKeyFactory keyFactory,
-        VerticaConnectionFactory connectionFactory,
-        AWSSecretsManager awsSecretsManager,
-        AmazonAthena athena,
-        String spillBucket,
-        String spillPrefix,
-        VerticaSchemaUtils verticaSchemaUtils,
-        AmazonS3 amazonS3,
-        java.util.Map<String, String> configOptions)
+            EncryptionKeyFactory keyFactory,
+            VerticaConnectionFactory connectionFactory,
+            AWSSecretsManager awsSecretsManager,
+            AmazonAthena athena,
+            String spillBucket,
+            String spillPrefix,
+            VerticaSchemaUtils verticaSchemaUtils,
+            AmazonS3 amazonS3,
+            Map<String, String> configOptions)
     {
         super(keyFactory, awsSecretsManager, athena, SOURCE_TYPE, spillBucket, spillPrefix, configOptions);
         this.connectionFactory = connectionFactory;
@@ -174,6 +203,80 @@ public ListTablesResponse doListTables(BlockAllocator allocator, ListTablesReque
         return new ListTablesResponse(request.getCatalogName(), tables, null);
     }

+    protected ArrowType getArrayArrowTypeFromTypeName(String typeName, int precision, int scale)
+    {
+        // Default ARRAY type is VARCHAR.
+        return new ArrowType.Utf8();
+    }
+    @Override
+    public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
+    {
+        ImmutableMap.Builder<String, List<OptimizationSubType>> capabilities = ImmutableMap.builder();
+        queryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions);
+
+        return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
+    }
+
+
+    @Override
+    public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAllocator, final GetTableRequest getTableRequest)
+            throws Exception
+    {
+        System.out.println("inside doGetQueryPassthroughSchema =========");
+        if (!getTableRequest.isQueryPassthrough()) {
+            throw new IllegalArgumentException("No Query passed through [{}]" + getTableRequest);
+        }
+
+        queryPassthrough.verify(getTableRequest.getQueryPassthroughArguments());
+        String customerPassedQuery = getTableRequest.getQueryPassthroughArguments().get(JdbcQueryPassthrough.QUERY);
+
+        try (Connection connection = getConnection(getTableRequest)) {
+            PreparedStatement preparedStatement = connection.prepareStatement(customerPassedQuery);
+            ResultSetMetaData metadata = preparedStatement.getMetaData();
+            if (metadata == null) {
+                throw new UnsupportedOperationException("Query not supported: ResultSetMetaData not available for query: " + customerPassedQuery);
+            }
+            SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();
+
+            for (int columnIndex = 1; columnIndex <= metadata.getColumnCount(); columnIndex++) {
+                String columnName = metadata.getColumnName(columnIndex);
+                String columnLabel = metadata.getColumnLabel(columnIndex);
+                //todo; is there a mechanism to pass both back to the engine?
+                columnName = columnName.equals(columnLabel) ? columnName : columnLabel;
+
+                int precision = metadata.getPrecision(columnIndex);
+                int scale = metadata.getScale(columnIndex);
+
+                ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
+                        metadata.getColumnType(columnIndex),
+                        precision,
+                        scale,
+                        configOptions);
+
+                if (columnType != null && SupportedTypes.isSupported(columnType)) {
+                    if (columnType instanceof ArrowType.List) {
+                        schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName(
+                                metadata.getTableName(columnIndex),
+                                metadata.getColumnDisplaySize(columnIndex),
+                                precision));
+                    }
+                    else {
+                        schemaBuilder.addField(FieldBuilder.newBuilder(columnName, columnType).build());
+                    }
+                }
+                else {
+                    // Default to VARCHAR ArrowType
+                    logger.warn("getSchema: Unable to map type for column[" + columnName +
+                            "] to a supported type, attempted " + columnType + " - defaulting type to VARCHAR.");
+                    schemaBuilder.addField(FieldBuilder.newBuilder(columnName, new ArrowType.Utf8()).build());
+                }
+            }
+
+            Schema schema = schemaBuilder.build();
+            return new GetTableResponse(getTableRequest.getCatalogName(), getTableRequest.getTableName(), schema, Collections.emptySet());
+        }
+    }
+
     /**
      * Used to get definition (field names, types, descriptions, etc...) of a Table.
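[Note: with the capability registered above, Athena reaches this path through its table-function syntax for query passthrough. A minimal sketch of an invocation, assuming the connector is registered as a catalog named "lambda:vertica" and a hypothetical sales.orders table on the Vertica side; the system.query signature itself is defined by VerticaQueryPassthrough later in this patch:

    SELECT *
    FROM TABLE("lambda:vertica".system.query(
        query => 'SELECT order_id, order_total FROM sales.orders WHERE order_total > 100'))

doGetQueryPassthroughSchema prepares the inner SELECT against Vertica and derives the result schema from the driver's ResultSetMetaData, without executing the query.]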
@@ -200,7 +303,7 @@ public GetTableResponse doGetTable(BlockAllocator allocator, GetTableRequest req
                 request.getTableName(),
                 schema,
                 partitionCols
-                );
+        );
     }

     /**
@@ -228,7 +331,7 @@ public void enhancePartitionSchema(SchemaBuilder partitionSchemaBuilder, GetTabl
      * @param queryStatusChecker A QueryStatusChecker that you can use to stop doing work for a query that has already terminated
      */
     @Override
-    public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker) throws SQLException {
+    public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker) throws SQLException {
         logger.info("in getPartitions: "+ request);

         Schema schemaName = request.getSchema();
@@ -243,20 +346,29 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request
         //Build the SQL query
         Connection connection = getConnection(request);

-        DatabaseMetaData dbMetadata = connection.getMetaData();
-        ResultSet definition = dbMetadata.getColumns(null, tableName.getSchemaName(), tableName.getTableName(), null);
+
+        // if QPT get input query from Athena console
+        //else old logic
         VerticaExportQueryBuilder queryBuilder = queryFactory.createVerticaExportQueryBuilder();
+        String preparedSQLStmt;

-        String preparedSQLStmt = queryBuilder.withS3ExportBucket(s3ExportBucket)
-                .withQueryID(queryID)
-                .withColumns(definition, schemaName)
-                .fromTable(tableName.getSchemaName(), tableName.getTableName())
-                .withConstraints(constraints, schemaName)
-                .build();
+        if (!request.getTableName().getQualifiedTableName().equalsIgnoreCase(queryPassthrough.getFunctionSignature())) {
-        logger.info("Vertica Export Statement: {}", preparedSQLStmt);
+            DatabaseMetaData dbMetadata = connection.getMetaData();
+            ResultSet definition = dbMetadata.getColumns(null, tableName.getSchemaName(), tableName.getTableName(), null);
+
+            preparedSQLStmt = queryBuilder.withS3ExportBucket(s3ExportBucket)
+                    .withQueryID(queryID)
+                    .withColumns(definition, schemaName)
+                    .fromTable(tableName.getSchemaName(), tableName.getTableName())
+                    .withConstraints(constraints, schemaName)
+                    .build();
+        } else {
+            preparedSQLStmt = null;
+        }
+        logger.info("Vertica Export Statement: {}", preparedSQLStmt);

         // Build the Set AWS Region SQL
         String awsRegionSql = queryBuilder.buildSetAwsRegionSql(amazonS3.getRegion().toString());
@@ -294,18 +406,30 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest
         Set<Split> splits = new HashSet<>();
         String exportBucket = getS3ExportBucket();
         String queryId = request.getQueryId().replace("-","");
-
+        Constraints constraints = request.getConstraints();
+        String s3ExportBucket = getS3ExportBucket();
+        String sqlStatement;
         //testing if the user has access to the requested table
-        testAccess(connection, request.getTableName());
+
+        FieldReader fieldReaderQid = request.getPartitions().getFieldReader("queryId");
+        String queryID = fieldReaderQid.readText().toString();

         //get the SQL statement which was created in getPartitions
         FieldReader fieldReaderPS = request.getPartitions().getFieldReader("preparedStmt");
-        String sqlStatement = fieldReaderPS.readText().toString();
+        if (constraints.isQueryPassThrough()) {
+            String preparedSQL = buildQueryPassthroughSql(constraints);
+            VerticaExportQueryBuilder queryBuilder = queryFactory.createQptVerticaExportQueryBuilder();
+            sqlStatement = queryBuilder.withS3ExportBucket(s3ExportBucket)
+                    .withQueryID(queryID)
+                    .withPreparedStatementSQL(preparedSQL).build();
logger.info("Vertica Export Statement: {}", sqlStatement); + } + else { + testAccess(connection, request.getTableName()); + sqlStatement = fieldReaderPS.readText().toString(); + } String catalogName = request.getCatalogName(); - FieldReader fieldReaderQid = request.getPartitions().getFieldReader("queryId"); - String queryID = fieldReaderQid.readText().toString(); - FieldReader fieldReaderAwsRegion = request.getPartitions().getFieldReader("awsRegionSql"); String awsRegionSql = fieldReaderAwsRegion.readText().toString(); @@ -313,9 +437,9 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest //execute the queries on Vertica executeQueriesOnVertica(connection, sqlStatement, awsRegionSql); - /* - * For each generated S3 object, create a split and add data to the split. - */ + /* + * For each generated S3 object, create a split and add data to the split. + */ Split split; List s3ObjectSummaries = getlistExportedObjects(exportBucket, queryId); @@ -336,19 +460,19 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest return new GetSplitsResponse(catalogName, splits); } else - { - //No records were exported by Vertica for the issued query, creating a "empty" split - logger.info("No records were exported by Vertica"); - split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()) - .add("query_id", queryID) - .add(VERTICA_CONN_STR, getConnStr(request)) - .add("exportBucket", exportBucket) - .add("s3ObjectKey", EMPTY_STRING) - .build(); - splits.add(split); - logger.info("doGetSplits: exit - " + splits.size()); - return new GetSplitsResponse(catalogName,split); - } + { + //No records were exported by Vertica for the issued query, creating a "empty" split + logger.info("No records were exported by Vertica"); + split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()) + .add("query_id", queryID) + .add(VERTICA_CONN_STR, getConnStr(request)) + .add("exportBucket", exportBucket) + .add("s3ObjectKey", EMPTY_STRING) + .build(); + splits.add(split); + logger.info("doGetSplits: exit - " + splits.size()); + return new GetSplitsResponse(catalogName,split); + } } @@ -409,7 +533,13 @@ private void testAccess(Connection conn, TableName table) { public String getS3ExportBucket() { - return configOptions.get(EXPORT_BUCKET_KEY); + return configOptions.get(EXPORT_BUCKET_KEY); + } + + public String buildQueryPassthroughSql(Constraints constraints) throws SQLException + { + queryPassthrough.verify(constraints.getQueryPassthroughArguments()); + return constraints.getQueryPassthroughArguments().get(VerticaQueryPassthrough.QUERY); } } diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaQueryPassthrough.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaQueryPassthrough.java new file mode 100644 index 0000000000..e8ff0bc37a --- /dev/null +++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaQueryPassthrough.java @@ -0,0 +1,68 @@ +/*- + * #%L + * athena-vertica + * %% + * Copyright (C) 2019 - 2024 Amazon Web Services + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.amazonaws.athena.connectors.vertica;
+
+import com.amazonaws.athena.connector.lambda.metadata.optimizations.querypassthrough.QueryPassthroughSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class VerticaQueryPassthrough implements QueryPassthroughSignature
+{
+    // Constant value representing the name of the query.
+    public static final String NAME = "query";
+
+    // Constant value representing the domain of the query.
+    public static final String SCHEMA_NAME = "system";
+
+    // List of arguments for the query, statically initialized as it always contains the same value.
+    public static final String QUERY = "QUERY";
+
+    public static final List<String> ARGUMENTS = Arrays.asList(QUERY);
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(VerticaQueryPassthrough.class);
+
+    @Override
+    public String getFunctionSchema()
+    {
+        return SCHEMA_NAME;
+    }
+
+    @Override
+    public String getFunctionName()
+    {
+        return NAME;
+    }
+
+    @Override
+    public List<String> getFunctionArguments()
+    {
+        return ARGUMENTS;
+    }
+
+    @Override
+    public Logger getLogger()
+    {
+        return LOGGER;
+    }
+
+}

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/QueryFactory.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/QueryFactory.java
index 1d98bfd79e..6b4f53c711 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/QueryFactory.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/QueryFactory.java
@@ -108,4 +108,9 @@ public VerticaExportQueryBuilder createVerticaExportQueryBuilder() {
         return new VerticaExportQueryBuilder(getQueryTemplate(VerticaExportQueryBuilder.getTemplateName()));
     }
+
+    public VerticaExportQueryBuilder createQptVerticaExportQueryBuilder()
+    {
+        return new VerticaExportQueryBuilder(getQueryTemplate(VerticaExportQueryBuilder.getQptTemplateName()));
+    }
 }

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/VerticaExportQueryBuilder.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/VerticaExportQueryBuilder.java
index 417ebef512..ac00846817 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/VerticaExportQueryBuilder.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/query/VerticaExportQueryBuilder.java
@@ -42,6 +42,7 @@ public class VerticaExportQueryBuilder {
     private static final Logger LOGGER = LoggerFactory.getLogger(VerticaExportQueryBuilder.class);
     private static final String TEMPLATE_NAME = "templateVerticaExportQuery";
+    private static final String QPT_TEMPLATE_NAME = "templateVerticaExportQPTQuery";
     private static final String TEMPLATE_FIELD = "builder";
     private static final String QUOTE_CHARS = "\"";
     private final ST query;
@@ -50,7 +51,7 @@ public class VerticaExportQueryBuilder {
     private String queryID;
     private String colNames;
     private String constraintValues;
-
+    private String preparedStatementSQL;
     public VerticaExportQueryBuilder(ST template)
     {
@@ -62,6 +63,11 @@ static String getTemplateName()
         return TEMPLATE_NAME;
     }

+    static String getQptTemplateName()
+    {
+        return QPT_TEMPLATE_NAME;
+    }
+
     public String getTable(){return table;}

     public VerticaExportQueryBuilder fromTable(String schemaName, String tableName)
@@ -72,6 +78,17 @@ public VerticaExportQueryBuilder fromTable(String schemaName, String tableName)

     public String getColNames() {return colNames;}

+    public VerticaExportQueryBuilder withPreparedStatementSQL(String preparedStatementSQL)
+    {
+        this.preparedStatementSQL = preparedStatementSQL;
+        return this;
+    }
+
+    public String getPreparedStatementSQL()
+    {
+        return preparedStatementSQL;
+    }
+
     // get the column names from user issued query in Athena
     public VerticaExportQueryBuilder withColumns(ResultSet definition, Schema tableSchema) throws SQLException {
         //get column name and type from the Schema in a hashmap for future use
@@ -221,7 +238,7 @@ public VerticaExportQueryBuilder withQueryID(String queryID)
     public String build()
     {
         Validate.notNull(s3ExportBucket, "s3ExportBucket can not be null.");
-        Validate.notNull(table, "table can not be null.");
+        Validate.notNull(table != null ? table : preparedStatementSQL, "table can not be null.");
         Validate.notNull(queryID, "queryID can not be null.");

         query.add(TEMPLATE_FIELD, this);

diff --git a/athena-vertica/src/main/resources/Vertica.stg b/athena-vertica/src/main/resources/Vertica.stg
index 686f7932ed..beed4c3024 100644
--- a/athena-vertica/src/main/resources/Vertica.stg
+++ b/athena-vertica/src/main/resources/Vertica.stg
@@ -13,3 +13,7 @@ templateVerticaExportQuery(builder) ::= <%
 EXPORT TO PARQUET(directory = 's3://<builder.s3ExportBucket>/<builder.queryID>', Compression='snappy', fileSizeMB=16, rowGroupSizeMB=16) AS SELECT <builder.colNames> FROM <builder.table> <builder.constraintValues>
 %>
+templateVerticaExportQPTQuery(builder) ::= <%
+EXPORT TO PARQUET(directory = 's3://<builder.s3ExportBucket>/<builder.queryID>', Compression='snappy', fileSizeMB=16, rowGroupSizeMB=16) AS <builder.preparedStatementSQL>
+%>
+
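[Note: for reference, a hypothetical rendering of templateVerticaExportQPTQuery as assembled in doGetSplits — withS3ExportBucket() supplies the bucket, withQueryID() the query ID, and withPreparedStatementSQL() places the raw passthrough SQL after AS. Bucket name, query ID, and query below are illustrative only:

    EXPORT TO PARQUET(directory = 's3://my-export-bucket/20240315abc123', Compression='snappy', fileSizeMB=16, rowGroupSizeMB=16)
    AS SELECT order_id, order_total FROM sales.orders WHERE order_total > 100

Vertica writes the result set to S3 as Parquet objects, and each exported object later becomes one split.]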
From f413e9663856688fb18463a34d69b6f98f6513de Mon Sep 17 00:00:00 2001
From: "akshay.kachore"
Date: Thu, 21 Mar 2024 17:25:37 +0530
Subject: [PATCH 2/4] checkstyle issue

---
 .../athena/connectors/vertica/VerticaMetadataHandler.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
index 31be1d39f4..a728f50509 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
@@ -222,7 +222,6 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
             throws Exception
     {
-        System.out.println("inside doGetQueryPassthroughSchema =========");
         if (!getTableRequest.isQueryPassthrough()) {
             throw new IllegalArgumentException("No Query passed through [{}]" + getTableRequest);
         }
From 50d9d637867f344e4e1eddb8201859c204d971ef Mon Sep 17 00:00:00 2001
From: "akshay.kachore"
Date: Thu, 4 Apr 2024 11:00:29 +0530
Subject: [PATCH 3/4] qpt review comment changes

---
 .../vertica/VerticaMetadataHandler.java       |  35 +----
 .../vertica/VerticaSchemaUtils.java           | 131 +++++++++---------
 2 files changed, 71 insertions(+), 95 deletions(-)

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
index a728f50509..9d340fb3d6 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
@@ -25,9 +25,7 @@
 import com.amazonaws.athena.connector.lambda.data.Block;
 import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
 import com.amazonaws.athena.connector.lambda.data.BlockWriter;
-import com.amazonaws.athena.connector.lambda.data.FieldBuilder;
 import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
-import com.amazonaws.athena.connector.lambda.data.SupportedTypes;
 import com.amazonaws.athena.connector.lambda.domain.Split;
 import com.amazonaws.athena.connector.lambda.domain.TableName;
 import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
 import com.amazonaws.athena.connector.lambda.handlers.MetadataHandler;
@@ -46,7 +44,6 @@ import com.amazonaws.athena.connector.lambda.metadata.MetadataRequest;
 import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
 import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
-import com.amazonaws.athena.connectors.jdbc.manager.JdbcArrowTypeConverter;
 import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
 import com.amazonaws.athena.connectors.vertica.query.QueryFactory;
 import com.amazonaws.athena.connectors.vertica.query.VerticaExportQueryBuilder;
@@ -81,6 +78,8 @@
 import java.util.Set;
 import java.util.UUID;

+import static com.amazonaws.athena.connectors.vertica.VerticaSchemaUtils.convertToArrowType;
+
 public class VerticaMetadataHandler
         extends MetadataHandler
@@ -240,35 +239,8 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
         for (int columnIndex = 1; columnIndex <= metadata.getColumnCount(); columnIndex++) {
             String columnName = metadata.getColumnName(columnIndex);
             String columnLabel = metadata.getColumnLabel(columnIndex);
-            //todo; is there a mechanism to pass both back to the engine?
             columnName = columnName.equals(columnLabel) ? columnName : columnLabel;
-
-            int precision = metadata.getPrecision(columnIndex);
-            int scale = metadata.getScale(columnIndex);
-
-            ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
-                    metadata.getColumnType(columnIndex),
-                    precision,
-                    scale,
-                    configOptions);
-
-            if (columnType != null && SupportedTypes.isSupported(columnType)) {
-                if (columnType instanceof ArrowType.List) {
-                    schemaBuilder.addListField(columnName, getArrayArrowTypeFromTypeName(
-                            metadata.getTableName(columnIndex),
-                            metadata.getColumnDisplaySize(columnIndex),
-                            precision));
-                }
-                else {
-                    schemaBuilder.addField(FieldBuilder.newBuilder(columnName, columnType).build());
-                }
-            }
-            else {
-                // Default to VARCHAR ArrowType
-                logger.warn("getSchema: Unable to map type for column[" + columnName +
-                        "] to a supported type, attempted " + columnType + " - defaulting type to VARCHAR.");
-                schemaBuilder.addField(FieldBuilder.newBuilder(columnName, new ArrowType.Utf8()).build());
-            }
+            convertToArrowType(schemaBuilder, columnName, metadata.getColumnTypeName(columnIndex));
         }

         Schema schema = schemaBuilder.build();
@@ -276,7 +248,6 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
         }
     }
-
     /**
      * Used to get definition (field names, types, descriptions, etc...) of a Table.
      *
diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaSchemaUtils.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaSchemaUtils.java
index 9c3d0d2ef2..6939f8d2bb 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaSchemaUtils.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaSchemaUtils.java
@@ -47,69 +47,7 @@ protected Schema buildTableSchema(Connection connection, TableName name)
             while(definition.next())
             {
                 String colType = definition.getString("TYPE_NAME").toUpperCase();
-                switch (colType)
-                {
-                    //If Bit
-                    case "BIT":
-                    {
-                        tableSchemaBuilder.addBitField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If TinyInt
-                    case "TINYINT":
-                    {
-                        tableSchemaBuilder.addTinyIntField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If SmallInt
-                    case "SMALLINT":
-                    {
-                        tableSchemaBuilder.addSmallIntField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If Int
-                    case "INTEGER":
-                    //If BIGINT
-                    case "BIGINT": {
-                        tableSchemaBuilder.addBigIntField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If FLOAT4
-                    case "FLOAT4":
-                    {
-                        tableSchemaBuilder.addFloat4Field(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If FLOAT8
-                    case "FLOAT8":
-                    {
-                        tableSchemaBuilder.addFloat8Field(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If DECIMAL/NUMERIC
-                    case "NUMERIC":
-                    {
-                        tableSchemaBuilder.addDecimalField(definition.getString("COLUMN_NAME"), 10, 2);
-                        break;
-                    }
-                    //If VARCHAR
-                    case "BOOLEAN":
-                    case "VARCHAR":
-                    case "TIMESTAMPTZ":
-                    case "TIMESTAMP": {
-                        tableSchemaBuilder.addStringField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-                    //If DATETIME
-                    case "DATETIME":
-                    {
-                        tableSchemaBuilder.addDateDayField(definition.getString("COLUMN_NAME"));
-                        break;
-                    }
-
-                    default:
-                        tableSchemaBuilder.addStringField(definition.getString("COLUMN_NAME"));
-                }
+                convertToArrowType(tableSchemaBuilder, definition.getString("COLUMN_NAME"), colType);
             }

             return tableSchemaBuilder.build();
@@ -120,4 +58,71 @@ protected Schema buildTableSchema(Connection connection, TableName name)
         }
     }
+
+    public static void convertToArrowType(SchemaBuilder tableSchemaBuilder, String colName, String colType) throws SQLException
+    {
+        switch (colType)
+        {
+            //If Bit
+            case "BIT":
+            {
+                tableSchemaBuilder.addBitField(colName);
+                break;
+            }
+            //If TinyInt
+            case "TINYINT":
+            {
+                tableSchemaBuilder.addTinyIntField(colName);
+                break;
+            }
+            //If SmallInt
+            case "SMALLINT":
+            {
+                tableSchemaBuilder.addSmallIntField(colName);
+                break;
+            }
+            //If Int
+            case "INTEGER":
+            //If BIGINT
+            case "BIGINT": {
+                tableSchemaBuilder.addBigIntField(colName);
+                break;
+            }
+            //If FLOAT4
+            case "FLOAT4":
+            {
+                tableSchemaBuilder.addFloat4Field(colName);
+                break;
+            }
+            //If FLOAT8
+            case "FLOAT8":
+            {
+                tableSchemaBuilder.addFloat8Field(colName);
+                break;
+            }
+            //If DECIMAL/NUMERIC
+            case "NUMERIC":
+            {
+                tableSchemaBuilder.addDecimalField(colName, 10, 2);
+                break;
+            }
+            //If VARCHAR
+            case "BOOLEAN":
+            case "VARCHAR":
+            case "TIMESTAMPTZ":
+            case "TIMESTAMP": {
+                tableSchemaBuilder.addStringField(colName);
+                break;
+            }
+            //If DATETIME
+            case "DATETIME":
+            {
+                tableSchemaBuilder.addDateDayField(colName);
+                break;
+            }
+
+            default:
+                tableSchemaBuilder.addStringField(colName);
+        }
+    }
 }
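[Note: hoisting the switch into VerticaSchemaUtils.convertToArrowType gives the passthrough schema path and buildTableSchema a single shared Vertica-to-Arrow mapping. As an illustration only — column and table names are invented, and this assumes the driver reports upper-case type names, since getColumnTypeName() is not upper-cased in doGetQueryPassthroughSchema the way TYPE_NAME is in buildTableSchema — a passthrough query such as:

    SELECT order_id::INTEGER          AS id,      -- INTEGER   -> big-int field
           order_total::NUMERIC(10,2) AS total,   -- NUMERIC   -> DECIMAL(10,2) field
           created_at::TIMESTAMP      AS created  -- TIMESTAMP -> string field
    FROM sales.orders

would surface in Athena as BIGINT, DECIMAL(10,2), and VARCHAR; any type not listed in the switch falls back to a string field.]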
From 21521894eaf55798b9426f5404eecc9b82c8059a Mon Sep 17 00:00:00 2001
From: "akshay.kachore"
Date: Fri, 5 Apr 2024 12:55:41 +0530
Subject: [PATCH 4/4] qpt review comment changes

---
 .../athena/connectors/vertica/VerticaMetadataHandler.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
index 9d340fb3d6..a0156f2819 100644
--- a/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
+++ b/athena-vertica/src/main/java/com/amazonaws/athena/connectors/vertica/VerticaMetadataHandler.java
@@ -44,7 +44,6 @@
 import com.amazonaws.athena.connector.lambda.metadata.MetadataRequest;
 import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
 import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
-import com.amazonaws.athena.connectors.jdbc.qpt.JdbcQueryPassthrough;
 import com.amazonaws.athena.connectors.vertica.query.QueryFactory;
 import com.amazonaws.athena.connectors.vertica.query.VerticaExportQueryBuilder;
 import com.amazonaws.services.athena.AmazonAthena;
@@ -226,7 +225,7 @@ public GetTableResponse doGetQueryPassthroughSchema(final BlockAllocator blockAl
         }

         queryPassthrough.verify(getTableRequest.getQueryPassthroughArguments());
-        String customerPassedQuery = getTableRequest.getQueryPassthroughArguments().get(JdbcQueryPassthrough.QUERY);
+        String customerPassedQuery = getTableRequest.getQueryPassthroughArguments().get(VerticaQueryPassthrough.QUERY);

         try (Connection connection = getConnection(getTableRequest)) {
             PreparedStatement preparedStatement = connection.prepareStatement(customerPassedQuery);