From 80396fe8fc3cafe913906c89488e98dbc27c55df Mon Sep 17 00:00:00 2001 From: AbdulRehman Faraj Date: Fri, 15 Mar 2024 18:04:25 +0000 Subject: [PATCH] Extended QPT to athena-saphana --- .../connectors/saphana/SaphanaMetadataHandler.java | 6 ++++++ .../connectors/saphana/SaphanaRecordHandler.java | 11 +++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java index 551d8eb199..b0c9aca0a3 100644 --- a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java +++ b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaMetadataHandler.java @@ -19,6 +19,7 @@ * #L% */ package com.amazonaws.athena.connectors.saphana; + import com.amazonaws.athena.connector.lambda.QueryStatusChecker; import com.amazonaws.athena.connector.lambda.data.Block; import com.amazonaws.athena.connector.lambda.data.BlockAllocator; @@ -135,6 +136,7 @@ public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAlloca capabilities.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(TopNPushdownSubType.SUPPORTS_ORDER_BY)); + jdbcQueryPassthrough.addQueryPassthroughCapabilityIfEnabled(capabilities, configOptions); return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build()); } @@ -228,6 +230,10 @@ public void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest getTabl public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsRequest getSplitsRequest) { LOGGER.debug("{}: Catalog {}, table {}", getSplitsRequest.getQueryId(), getSplitsRequest.getTableName().getSchemaName(), getSplitsRequest.getTableName().getTableName()); + if (getSplitsRequest.getConstraints().isQueryPassThrough()) { + LOGGER.info("QPT Split Requested"); + return setupQueryPassthroughSplit(getSplitsRequest); + } int partitionContd = decodeContinuationToken(getSplitsRequest); Set splits = new HashSet<>(); Block partitions = getSplitsRequest.getPartitions(); diff --git a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java index 4e12d59079..c65656c45f 100644 --- a/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java +++ b/athena-saphana/src/main/java/com/amazonaws/athena/connectors/saphana/SaphanaRecordHandler.java @@ -86,8 +86,15 @@ public PreparedStatement buildSplitSql(Connection jdbcConnection, String catalog { LOGGER.debug("SaphanaQueryStringBuilder::buildSplitSql SplitQueryBuilder class {}", jdbcSplitQueryBuilder.getClass().getName()); - PreparedStatement preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, - tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + PreparedStatement preparedStatement; + + if (constraints.isQueryPassThrough()) { + preparedStatement = buildQueryPassthroughSql(jdbcConnection, constraints); + } + else { + preparedStatement = jdbcSplitQueryBuilder.buildSql(jdbcConnection, null, + tableName.getSchemaName(), tableName.getTableName(), schema, constraints, split); + } LOGGER.debug("SaphanaQueryStringBuilder::buildSplitSql clearing field children from schema"); clearChildren(schema);