From 6ed970ada35dd56dc1be6c843b90feaf8db5f855 Mon Sep 17 00:00:00 2001 From: Jithendar Trianz Date: Tue, 19 Mar 2024 17:20:29 +0530 Subject: [PATCH 1/2] Extended QPT to athena-timestream --- .../timestream/TimestreamMetadataHandler.java | 53 ++++++++++- .../timestream/TimestreamRecordHandler.java | 23 +++-- .../qpt/TimestreamQueryPassthrough.java | 93 +++++++++++++++++++ 3 files changed, 160 insertions(+), 9 deletions(-) create mode 100644 athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java diff --git a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java index d7cdb2efe0..fa39e0939f 100644 --- a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java +++ b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java @@ -26,6 +26,8 @@ import com.amazonaws.athena.connector.lambda.domain.Split; import com.amazonaws.athena.connector.lambda.domain.TableName; import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler; +import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest; +import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest; import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse; import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest; @@ -35,14 +37,17 @@ import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse; import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest; import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse; +import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType; import 
com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory; import com.amazonaws.athena.connector.util.PaginatedRequestIterator; +import com.amazonaws.athena.connectors.timestream.qpt.TimestreamQueryPassthrough; import com.amazonaws.athena.connectors.timestream.query.QueryFactory; import com.amazonaws.services.athena.AmazonAthena; import com.amazonaws.services.glue.AWSGlue; import com.amazonaws.services.glue.model.Table; import com.amazonaws.services.secretsmanager.AWSSecretsManager; import com.amazonaws.services.timestreamquery.AmazonTimestreamQuery; +import com.amazonaws.services.timestreamquery.model.ColumnInfo; import com.amazonaws.services.timestreamquery.model.Datum; import com.amazonaws.services.timestreamquery.model.QueryRequest; import com.amazonaws.services.timestreamquery.model.QueryResult; @@ -51,13 +56,16 @@ import com.amazonaws.services.timestreamwrite.model.ListDatabasesRequest; import com.amazonaws.services.timestreamwrite.model.ListDatabasesResult; import com.amazonaws.services.timestreamwrite.model.ListTablesResult; +import com.google.common.collect.ImmutableMap; import org.apache.arrow.util.VisibleForTesting; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,12 +93,15 @@ public class TimestreamMetadataHandler private final AmazonTimestreamQuery tsQuery; private final AmazonTimestreamWrite tsMeta; + private final TimestreamQueryPassthrough queryPassthrough; + public TimestreamMetadataHandler(java.util.Map configOptions) { super(SOURCE_TYPE, configOptions); glue = getAwsGlue(); tsQuery = TimestreamClientBuilder.buildQueryClient(SOURCE_TYPE); tsMeta = TimestreamClientBuilder.buildWriteClient(SOURCE_TYPE); + queryPassthrough = new TimestreamQueryPassthrough(); } @VisibleForTesting @@ 
-109,6 +120,16 @@ protected TimestreamMetadataHandler( this.glue = glue; this.tsQuery = tsQuery; this.tsMeta = tsMeta; + queryPassthrough = new TimestreamQueryPassthrough(); + } + + @Override + public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request) + { + ImmutableMap.Builder> capabilities = ImmutableMap.builder(); + this.queryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, this.configOptions); + + return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @Override @@ -289,6 +310,27 @@ public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableReques } } + @Override + public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, GetTableRequest request) throws Exception + { + if (!request.isQueryPassthrough()) { + throw new IllegalArgumentException("No Query passed through [" + request + "]"); + } + + queryPassthrough.verify(request.getQueryPassthroughArguments()); + String customerPassedQuery = request.getQueryPassthroughArguments().get(TimestreamQueryPassthrough.QUERY); + QueryRequest queryRequest = new QueryRequest().withQueryString(customerPassedQuery).withMaxRows(1); + QueryResult queryResult = tsQuery.query(queryRequest); + List columnInfo = queryResult.getColumnInfo(); + SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); + for (ColumnInfo column : columnInfo) { + Field nextField = TimestreamSchemaUtils.makeField(column.getName(), column.getType().getScalarType().toLowerCase(java.util.Locale.ENGLISH)); + schemaBuilder.addField(nextField); + } + + return new GetTableResponse(request.getCatalogName(), request.getTableName(), schemaBuilder.build(), Collections.emptySet()); + } + /** * Our table doesn't support complex layouts or partitioning so we simply make this method a NoOp. 
* @@ -308,7 +350,16 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest { //Since we do not support connector level parallelism for this source at the moment, we generate a single //basic split. - Split split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()).build(); + Split split; + if (request.getConstraints().isQueryPassThrough()) { + logger.info("QPT Split Requested"); + Map qptArguments = request.getConstraints().getQueryPassthroughArguments(); + split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()).applyProperties(qptArguments).build(); + } + else { + split = Split.newBuilder(makeSpillLocation(request), makeEncryptionKey()).build(); + } + return new GetSplitsResponse(request.getCatalogName(), split); } } diff --git a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamRecordHandler.java b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamRecordHandler.java index 1ce06dd3b3..7c70089abc 100644 --- a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamRecordHandler.java +++ b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamRecordHandler.java @@ -37,6 +37,7 @@ import com.amazonaws.athena.connector.lambda.handlers.GlueMetadataHandler; import com.amazonaws.athena.connector.lambda.handlers.RecordHandler; import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest; +import com.amazonaws.athena.connectors.timestream.qpt.TimestreamQueryPassthrough; import com.amazonaws.athena.connectors.timestream.query.QueryFactory; import com.amazonaws.athena.connectors.timestream.query.SelectQueryBuilder; import com.amazonaws.services.athena.AmazonAthena; @@ -88,6 +89,7 @@ public class TimestreamRecordHandler private final QueryFactory queryFactory = new QueryFactory(); private final AmazonTimestreamQuery tsQuery; + private final TimestreamQueryPassthrough 
queryPassthrough = new TimestreamQueryPassthrough(); public TimestreamRecordHandler(java.util.Map configOptions) { @@ -115,14 +117,19 @@ protected TimestreamRecordHandler(AmazonS3 amazonS3, AWSSecretsManager secretsMa protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker) { TableName tableName = recordsRequest.getTableName(); - - SelectQueryBuilder queryBuilder = queryFactory.createSelectQueryBuilder(GlueMetadataHandler.VIEW_METADATA_FIELD); - - String query = queryBuilder.withDatabaseName(tableName.getSchemaName()) - .withTableName(tableName.getTableName()) - .withProjection(recordsRequest.getSchema()) - .withConjucts(recordsRequest.getConstraints()) - .build(); + String query; + if (recordsRequest.getConstraints().isQueryPassThrough()) { + queryPassthrough.verify(recordsRequest.getConstraints().getQueryPassthroughArguments()); + query = recordsRequest.getConstraints().getQueryPassthroughArguments().get(TimestreamQueryPassthrough.QUERY); + } + else { + SelectQueryBuilder queryBuilder = queryFactory.createSelectQueryBuilder(GlueMetadataHandler.VIEW_METADATA_FIELD); + query = queryBuilder.withDatabaseName(tableName.getSchemaName()) + .withTableName(tableName.getTableName()) + .withProjection(recordsRequest.getSchema()) + .withConjucts(recordsRequest.getConstraints()) + .build(); + } logger.info("readWithConstraint: query[{}]", query); diff --git a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java new file mode 100644 index 0000000000..a8c8635094 --- /dev/null +++ b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java @@ -0,0 +1,93 @@ +/*- + * #%L + * athena-timestream + * %% + * Copyright (C) 2019 Amazon Web Services + * %% + * Licensed under the Apache License, 
Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package com.amazonaws.athena.connectors.timestream.qpt; + +import com.amazonaws.athena.connector.lambda.metadata.optimizations.querypassthrough.QueryPassthroughSignature; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +public class TimestreamQueryPassthrough implements QueryPassthroughSignature +{ + // Constant value representing the name of the query. + public static final String NAME = "query"; + + // Constant value representing the domain of the query. + public static final String SCHEMA_NAME = "system"; + + // List of arguments for the query, statically initialized as it always contains the same value. 
+ public static final String QUERY = "QUERY"; + + public static final List ARGUMENTS = Arrays.asList(QUERY); + + private static final Logger LOGGER = LoggerFactory.getLogger(TimestreamQueryPassthrough.class); + + @Override + public String getFunctionSchema() + { + return SCHEMA_NAME; + } + + @Override + public String getFunctionName() + { + return NAME; + } + + @Override + public List getFunctionArguments() + { + return ARGUMENTS; + } + + @Override + public Logger getLogger() + { + return LOGGER; + } + + @Override + public void customConnectorVerifications(Map engineQptArguments) + { + String partiQLStatement = engineQptArguments.get(QUERY); + String upperCaseStatement = partiQLStatement.trim().toUpperCase(Locale.ENGLISH); + + // Immediately check if the statement starts with "SELECT" + if (!upperCaseStatement.startsWith("SELECT")) { + throw new UnsupportedOperationException("Statement does not start with SELECT."); + } + + // List of disallowed keywords + Set disallowedKeywords = ImmutableSet.of("INSERT", "UPDATE", "DELETE", "CREATE", "DROP", "ALTER"); + + // Check if the statement contains any disallowed keywords + for (String keyword : disallowedKeywords) { + if (upperCaseStatement.contains(" " + keyword + " ") || upperCaseStatement.startsWith(keyword + " ")) { + throw new UnsupportedOperationException("Unaccepted operation; only SELECT statements are allowed. 
Found: " + keyword); + } + } + } +} From f0908eb1607b9de784366d81494e72a2947da9db Mon Sep 17 00:00:00 2001 From: Jithendar Trianz Date: Fri, 22 Mar 2024 20:23:52 +0530 Subject: [PATCH 2/2] address review comments --- .../connectors/timestream/TimestreamMetadataHandler.java | 1 + .../timestream/qpt/TimestreamQueryPassthrough.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java index fa39e0939f..990673f527 100644 --- a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java +++ b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/TimestreamMetadataHandler.java @@ -320,6 +320,7 @@ public GetTableResponse doGetQueryPassthroughSchema(BlockAllocator allocator, Ge queryPassthrough.verify(request.getQueryPassthroughArguments()); String customerPassedQuery = request.getQueryPassthroughArguments().get(TimestreamQueryPassthrough.QUERY); QueryRequest queryRequest = new QueryRequest().withQueryString(customerPassedQuery).withMaxRows(1); + // Timestream Query does not provide a way to conduct a dry run or retrieve metadata results without execution. Therefore, we need to "seek" at least once before obtaining metadata. 
QueryResult queryResult = tsQuery.query(queryRequest); List columnInfo = queryResult.getColumnInfo(); SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder(); diff --git a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java index a8c8635094..3461c330cc 100644 --- a/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java +++ b/athena-timestream/src/main/java/com/amazonaws/athena/connectors/timestream/qpt/TimestreamQueryPassthrough.java @@ -72,8 +72,8 @@ public Logger getLogger() @Override public void customConnectorVerifications(Map engineQptArguments) { - String partiQLStatement = engineQptArguments.get(QUERY); - String upperCaseStatement = partiQLStatement.trim().toUpperCase(Locale.ENGLISH); + String customerPassedQuery = engineQptArguments.get(QUERY); + String upperCaseStatement = customerPassedQuery.trim().toUpperCase(Locale.ENGLISH); // Immediately check if the statement starts with "SELECT" if (!upperCaseStatement.startsWith("SELECT")) { @@ -85,7 +85,7 @@ public void customConnectorVerifications(Map engineQptArguments) // Check if the statement contains any disallowed keywords for (String keyword : disallowedKeywords) { - if (upperCaseStatement.contains(" " + keyword + " ") || upperCaseStatement.startsWith(keyword + " ")) { + if (upperCaseStatement.matches("(?s).*\\b" + keyword + "\\b.*")) { throw new UnsupportedOperationException("Unaccepted operation; only SELECT statements are allowed. Found: " + keyword); } }