diff --git a/.gitignore b/.gitignore
index 37d101daaa..a6480b1401 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,4 @@ package-lock.json
cdk.out
.env
**/*.dylib
+*/dynamodb-local-metadata.json
diff --git a/athena-dynamodb/pom.xml b/athena-dynamodb/pom.xml
index fd44faefe9..2ae4c5c4e9 100644
--- a/athena-dynamodb/pom.xml
+++ b/athena-dynamodb/pom.xml
@@ -8,6 +8,17 @@
    <modelVersion>4.0.0</modelVersion>
    <artifactId>athena-dynamodb</artifactId>
    <version>2022.47.1</version>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.25.4</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
@@ -20,23 +31,30 @@
            <artifactId>athena-federation-integ-test</artifactId>
            <version>2022.47.1</version>
            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.amazonaws</groupId>
-            <artifactId>aws-java-sdk-dynamodb</artifactId>
-            <version>${aws-sdk.version}</version>
            <exclusions>
                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
+                    <groupId>com.amazonaws</groupId>
+                    <artifactId>aws-java-sdk-sts</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb-enhanced</artifactId>
+        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>DynamoDBLocal</artifactId>
-            <version>2.0.0</version>
+            <version>LATEST</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
            <scope>test</scope>
        </dependency>
@@ -93,6 +111,14 @@
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sdk-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+        </dependency>
    </dependencies>
diff --git a/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBMetadataHandler.java b/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBMetadataHandler.java
index e2379cce9d..ef15f9683d 100644
--- a/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBMetadataHandler.java
+++ b/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBMetadataHandler.java
@@ -19,7 +19,6 @@
*/
package com.amazonaws.athena.connectors.dynamodb;
-import com.amazonaws.athena.connector.credentials.CrossAccountCredentialsProvider;
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connector.lambda.data.Block;
@@ -43,6 +42,7 @@
import com.amazonaws.athena.connector.lambda.metadata.glue.GlueFieldLexer;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants;
+import com.amazonaws.athena.connectors.dynamodb.credentials.CrossAccountCredentialsProviderV2;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBIndex;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBPaginatedTables;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBTable;
@@ -53,10 +53,6 @@
import com.amazonaws.athena.connectors.dynamodb.util.DDBTypeUtils;
import com.amazonaws.athena.connectors.dynamodb.util.IncrementingValueNameProducer;
import com.amazonaws.services.athena.AmazonAthena;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.document.ItemUtils;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.Database;
import com.amazonaws.services.glue.model.Table;
@@ -69,6 +65,9 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.ArrayList;
import java.util.Collections;
@@ -99,6 +98,7 @@
import static com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants.SEGMENT_ID_PROPERTY;
import static com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants.TABLE_METADATA;
import static com.amazonaws.athena.connectors.dynamodb.throttling.DynamoDBExceptionFilter.EXCEPTION_FILTER;
+import static com.amazonaws.athena.connectors.dynamodb.util.DDBTypeUtils.toAttributeValue;
/**
* Handles metadata requests for the Athena DynamoDB Connector.
@@ -131,15 +131,15 @@ public class DynamoDBMetadataHandler
private static final DatabaseFilter DB_FILTER = (Database database) -> (database.getLocationUri() != null && database.getLocationUri().contains(DYNAMO_DB_FLAG));
private final ThrottlingInvoker invoker;
- private final AmazonDynamoDB ddbClient;
+ private final DynamoDbClient ddbClient;
private final AWSGlue glueClient;
private final DynamoDBTableResolver tableResolver;
public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
{
super(SOURCE_TYPE, configOptions);
- this.ddbClient = AmazonDynamoDBClientBuilder.standard()
- .withCredentials(CrossAccountCredentialsProvider.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBMetadataHandler_CrossAccountRoleSession"))
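+ // build the SDK v2 DynamoDB client, assuming the configured cross-account role when one is supplied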
+ this.ddbClient = DynamoDbClient.builder()
+ .credentialsProvider(CrossAccountCredentialsProviderV2.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBMetadataHandler_CrossAccountRoleSession"))
.build();
this.glueClient = getAwsGlue();
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
@@ -153,7 +153,7 @@ public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
AmazonAthena athena,
String spillBucket,
String spillPrefix,
- AmazonDynamoDB ddbClient,
+ DynamoDbClient ddbClient,
AWSGlue glueClient,
java.util.Map<String, String> configOptions)
{
@@ -400,8 +400,9 @@ private void precomputeAdditionalMetadata(Set<String> columnsToIgnore, Map<Stri
Map<String, String> splitMetadata = new HashMap<>(partitionMetadata);
Object hashKeyValue = DDBTypeUtils.convertArrowTypeIfNecessary(hashKeyName, hashKeyValueReader.readObject());
- String hashKeyValueJSON = Jackson.toJsonString(ItemUtils.toAttributeValue(hashKeyValue));
- splitMetadata.put(hashKeyName, hashKeyValueJSON);
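+ // store the hash key value as JSON in the split metadata so the record handler can rebuild the v2 AttributeValue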
+ splitMetadata.put(hashKeyName, DDBTypeUtils.attributeToJson(toAttributeValue(hashKeyValue), hashKeyName));
splits.add(new Split(spillLocation, makeEncryptionKey(), splitMetadata));
diff --git a/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBRecordHandler.java b/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBRecordHandler.java
index f52f36c290..ce74d92cd1 100644
--- a/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBRecordHandler.java
+++ b/athena-dynamodb/src/main/java/com/amazonaws/athena/connectors/dynamodb/DynamoDBRecordHandler.java
@@ -19,8 +19,6 @@
*/
package com.amazonaws.athena.connectors.dynamodb;
-import com.amazonaws.AmazonWebServiceRequest;
-import com.amazonaws.athena.connector.credentials.CrossAccountCredentialsProvider;
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connector.lambda.data.Block;
@@ -31,30 +29,31 @@
import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
+import com.amazonaws.athena.connectors.dynamodb.credentials.CrossAccountCredentialsProviderV2;
import com.amazonaws.athena.connectors.dynamodb.resolver.DynamoDBFieldResolver;
import com.amazonaws.athena.connectors.dynamodb.util.DDBPredicateUtils;
import com.amazonaws.athena.connectors.dynamodb.util.DDBRecordMetadata;
import com.amazonaws.athena.connectors.dynamodb.util.DDBTypeUtils;
import com.amazonaws.services.athena.AmazonAthena;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
-import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
-import com.amazonaws.services.dynamodbv2.model.QueryRequest;
-import com.amazonaws.services.dynamodbv2.model.QueryResult;
-import com.amazonaws.services.dynamodbv2.model.ScanRequest;
-import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.amazonaws.util.json.Jackson;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryResponse;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import java.io.IOException;
import java.util.ArrayList;
@@ -103,14 +102,14 @@ public class DynamoDBRecordHandler
private static final TypeReference<Map<String, AttributeValue>> ATTRIBUTE_VALUE_MAP_TYPE_REFERENCE = new TypeReference<Map<String, AttributeValue>>() {};
private final LoadingCache<String, ThrottlingInvoker> invokerCache;
- private final AmazonDynamoDB ddbClient;
+ private final DynamoDbClient ddbClient;
public DynamoDBRecordHandler(java.util.Map<String, String> configOptions)
{
super(sourceType, configOptions);
- this.ddbClient = AmazonDynamoDBClientBuilder.standard()
- .withCredentials(CrossAccountCredentialsProvider.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBRecordHandler_CrossAccountRoleSession"))
- .build();
+ this.ddbClient = DynamoDbClient.builder()
+ .credentialsProvider(CrossAccountCredentialsProviderV2.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBRecordHandler_CrossAccountRoleSession"))
+ .build();
this.invokerCache = CacheBuilder.newBuilder().build(
new CacheLoader<String, ThrottlingInvoker>() {
@Override
@@ -124,7 +123,7 @@ public ThrottlingInvoker load(String tableName)
}
@VisibleForTesting
- DynamoDBRecordHandler(AmazonDynamoDB ddbClient, AmazonS3 amazonS3, AWSSecretsManager secretsManager, AmazonAthena athena, String sourceType, java.util.Map<String, String> configOptions)
+ DynamoDBRecordHandler(DynamoDbClient ddbClient, AmazonS3 amazonS3, AWSSecretsManager secretsManager, AmazonAthena athena, String sourceType, java.util.Map<String, String> configOptions)
{
super(amazonS3, secretsManager, athena, sourceType, configOptions);
this.ddbClient = ddbClient;
@@ -259,10 +258,15 @@ private List<String> getRangeValues(String rangeKeyFilter)
return rangeValues;
}
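+ // splits without a segment id target a single hash key and are served with Query; splits with a segment id map to parallel Scan segments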
+ private boolean isQueryRequest(Split split)
+ {
+ return split.getProperty(SEGMENT_ID_PROPERTY) == null;
+ }
+
/*
- Converts a split into a Query or Scan request
+ Converts a split into a Query request
*/
- private AmazonWebServiceRequest buildReadRequest(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing)
+ private QueryRequest buildQueryRequest(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing, Map<String, AttributeValue> exclusiveStartKey)
{
validateExpectedMetadata(split.getProperties());
// prepare filters
@@ -273,7 +277,7 @@ private AmazonWebServiceRequest buildReadRequest(Split split, String tableName,
if (rangeKeyFilter != null || nonKeyFilter != null) {
try {
expressionAttributeNames.putAll(Jackson.getObjectMapper().readValue(split.getProperty(EXPRESSION_NAMES_METADATA), STRING_MAP_TYPE_REFERENCE));
- expressionAttributeValues.putAll(Jackson.getObjectMapper().readValue(split.getProperty(EXPRESSION_VALUES_METADATA), ATTRIBUTE_VALUE_MAP_TYPE_REFERENCE));
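+ // EnhancedDocument parses the JSON-encoded expression values back into SDK v2 AttributeValues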
+ expressionAttributeValues.putAll(EnhancedDocument.fromJson(split.getProperty(EXPRESSION_VALUES_METADATA)).toMap());
}
catch (IOException e) {
throw new RuntimeException(e);
@@ -290,58 +294,89 @@ private AmazonWebServiceRequest buildReadRequest(Split split, String tableName,
})
.collect(Collectors.joining(","));
- boolean isQuery = split.getProperty(SEGMENT_ID_PROPERTY) == null;
-
- if (isQuery) {
- // prepare key condition expression
- String indexName = split.getProperty(INDEX_METADATA);
- String hashKeyName = split.getProperty(HASH_KEY_NAME_METADATA);
- String hashKeyAlias = DDBPredicateUtils.aliasColumn(hashKeyName);
- String keyConditionExpression = hashKeyAlias + " = " + HASH_KEY_VALUE_ALIAS;
- if (rangeKeyFilter != null) {
- if (rangeFilterHasIn(rangeKeyFilter)) {
- List<String> rangeKeyValues = getRangeValues(rangeKeyFilter);
- for (String value : rangeKeyValues) {
- expressionAttributeValues.remove(value);
- }
- }
- else {
- keyConditionExpression += " AND " + rangeKeyFilter;
+ // prepare key condition expression
+ String indexName = split.getProperty(INDEX_METADATA);
+ String hashKeyName = split.getProperty(HASH_KEY_NAME_METADATA);
+ String hashKeyAlias = DDBPredicateUtils.aliasColumn(hashKeyName);
+ String keyConditionExpression = hashKeyAlias + " = " + HASH_KEY_VALUE_ALIAS;
+ if (rangeKeyFilter != null) {
+ if (rangeFilterHasIn(rangeKeyFilter)) {
+ List<String> rangeKeyValues = getRangeValues(rangeKeyFilter);
+ for (String value : rangeKeyValues) {
+ expressionAttributeValues.remove(value);
}
}
- expressionAttributeNames.put(hashKeyAlias, hashKeyName);
- expressionAttributeValues.put(HASH_KEY_VALUE_ALIAS, Jackson.fromJsonString(split.getProperty(hashKeyName), AttributeValue.class));
-
- QueryRequest queryRequest = new QueryRequest()
- .withTableName(tableName)
- .withIndexName(indexName)
- .withKeyConditionExpression(keyConditionExpression)
- .withFilterExpression(nonKeyFilter)
- .withExpressionAttributeNames(expressionAttributeNames)
- .withExpressionAttributeValues(expressionAttributeValues)
- .withProjectionExpression(projectionExpression);
- if (canApplyLimit(constraints)) {
- queryRequest.setLimit((int) constraints.getLimit());
+ else {
+ keyConditionExpression += " AND " + rangeKeyFilter;
}
- return queryRequest;
}
- else {
- int segmentId = Integer.parseInt(split.getProperty(SEGMENT_ID_PROPERTY));
- int segmentCount = Integer.parseInt(split.getProperty(SEGMENT_COUNT_METADATA));
-
- ScanRequest scanRequest = new ScanRequest()
- .withTableName(tableName)
- .withSegment(segmentId)
- .withTotalSegments(segmentCount)
- .withFilterExpression(nonKeyFilter)
- .withExpressionAttributeNames(expressionAttributeNames.isEmpty() ? null : expressionAttributeNames)
- .withExpressionAttributeValues(expressionAttributeValues.isEmpty() ? null : expressionAttributeValues)
- .withProjectionExpression(projectionExpression);
- if (canApplyLimit(constraints)) {
- scanRequest.setLimit((int) constraints.getLimit());
+ expressionAttributeNames.put(hashKeyAlias, hashKeyName);
+
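+ // the metadata handler serialized the hash key value to JSON; convert it back into an SDK v2 AttributeValue before binding it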
+ AttributeValue hashKeyAttribute = DDBTypeUtils.jsonToAttributeValue(split.getProperty(hashKeyName), hashKeyName);
+ expressionAttributeValues.put(HASH_KEY_VALUE_ALIAS, hashKeyAttribute);
+
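+ // build the Query, carrying forward exclusiveStartKey (the last evaluated key of the previous page) so paginated reads resume where they left off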
+ QueryRequest.Builder queryRequestBuilder = QueryRequest.builder()
+ .tableName(tableName)
+ .indexName(indexName)
+ .keyConditionExpression(keyConditionExpression)
+ .filterExpression(nonKeyFilter)
+ .expressionAttributeNames(expressionAttributeNames)
+ .expressionAttributeValues(expressionAttributeValues)
+ .projectionExpression(projectionExpression)
+ .exclusiveStartKey(exclusiveStartKey);
+ if (canApplyLimit(constraints)) {
+ queryRequestBuilder.limit((int) constraints.getLimit());
+ }
+ return queryRequestBuilder.build();
+ }
+
+ /*
+ Converts a split into a Scan request
+ */
+ private ScanRequest buildScanRequest(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing, Map<String, AttributeValue> exclusiveStartKey)
+ {
+ validateExpectedMetadata(split.getProperties());
+ // prepare filters
+ String rangeKeyFilter = split.getProperty(RANGE_KEY_FILTER_METADATA);
+ String nonKeyFilter = split.getProperty(NON_KEY_FILTER_METADATA);
+ Map<String, String> expressionAttributeNames = new HashMap<>();
+ Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
+ if (rangeKeyFilter != null || nonKeyFilter != null) {
+ try {
+ expressionAttributeNames.putAll(Jackson.getObjectMapper().readValue(split.getProperty(EXPRESSION_NAMES_METADATA), STRING_MAP_TYPE_REFERENCE));
+ expressionAttributeValues.putAll(EnhancedDocument.fromJson(split.getProperty(EXPRESSION_VALUES_METADATA)).toMap());
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
}
- return scanRequest;
}
+
+ // Only read columns that are needed in the query
+ String projectionExpression = disableProjectionAndCasing ? null : schema.getFields()
+ .stream()
+ .map(field -> {
+ String aliasedName = DDBPredicateUtils.aliasColumn(field.getName());
+ expressionAttributeNames.put(aliasedName, field.getName());
+ return aliasedName;
+ })
+ .collect(Collectors.joining(","));
+
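+ // each Scan split owns one segment of a parallel scan; segment id and total segment count come from the split metadata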
+ int segmentId = Integer.parseInt(split.getProperty(SEGMENT_ID_PROPERTY));
+ int segmentCount = Integer.parseInt(split.getProperty(SEGMENT_COUNT_METADATA));
+
+ ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
+ .tableName(tableName)
+ .segment(segmentId)
+ .totalSegments(segmentCount)
+ .filterExpression(nonKeyFilter)
+ .expressionAttributeNames(expressionAttributeNames.isEmpty() ? null : expressionAttributeNames)
+ .expressionAttributeValues(expressionAttributeValues.isEmpty() ? null : expressionAttributeValues)
+ .projectionExpression(projectionExpression)
+ .exclusiveStartKey(exclusiveStartKey);
+ if (canApplyLimit(constraints)) {
+ scanRequestBuilder.limit((int) constraints.getLimit());
+ }
+ return scanRequestBuilder.build();
}
/*
@@ -349,7 +384,6 @@ private AmazonWebServiceRequest buildReadRequest(Split split, String tableName,
*/
private Iterator