Skip to content

Commit

Permalink
Migrate DynamoDB Connector to use AWS SDK V2
Browse files: browse the repository at this point in the history
  • Loading branch information
AbdulRehman Faraj committed Mar 12, 2024
1 parent 4d8bcaa commit 6f40a93
Show file tree
Hide file tree
Showing 19 changed files with 1,173 additions and 546 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ package-lock.json
cdk.out
.env
**/*.dylib
*/dynamodb-local-metadata.json
44 changes: 35 additions & 9 deletions athena-dynamodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>athena-dynamodb</artifactId>
<version>2022.47.1</version>
<!--
Imports the AWS SDK for Java v2 bill of materials (BOM), pinned to 2.25.4.
The software.amazon.awssdk dependencies declared in this POM omit <version>,
so their versions are expected to be resolved from this imported BOM.
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.25.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
Expand All @@ -20,23 +31,30 @@
<artifactId>athena-federation-integ-test</artifactId>
<version>2022.47.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>${aws-sdk.version}</version>
<exclusions>
<!-- replaced with jcl-over-slf4j -->
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>DynamoDBLocal</artifactId>
<version>2.0.0</version>
<version>LATEST</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>url-connection-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -93,6 +111,14 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
*/
package com.amazonaws.athena.connectors.dynamodb;

import com.amazonaws.athena.connector.credentials.CrossAccountCredentialsProvider;
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.ThrottlingInvoker;
import com.amazonaws.athena.connector.lambda.data.Block;
Expand All @@ -43,6 +42,7 @@
import com.amazonaws.athena.connector.lambda.metadata.glue.GlueFieldLexer;
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants;
import com.amazonaws.athena.connectors.dynamodb.credentials.CrossAccountCredentialsProviderV2;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBIndex;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBPaginatedTables;
import com.amazonaws.athena.connectors.dynamodb.model.DynamoDBTable;
Expand All @@ -53,10 +53,6 @@
import com.amazonaws.athena.connectors.dynamodb.util.DDBTypeUtils;
import com.amazonaws.athena.connectors.dynamodb.util.IncrementingValueNameProducer;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.ItemUtils;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.model.Database;
import com.amazonaws.services.glue.model.Table;
Expand All @@ -69,6 +65,9 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -99,6 +98,7 @@
import static com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants.SEGMENT_ID_PROPERTY;
import static com.amazonaws.athena.connectors.dynamodb.constants.DynamoDBConstants.TABLE_METADATA;
import static com.amazonaws.athena.connectors.dynamodb.throttling.DynamoDBExceptionFilter.EXCEPTION_FILTER;
import static com.amazonaws.athena.connectors.dynamodb.util.DDBTypeUtils.toAttributeValue;

/**
* Handles metadata requests for the Athena DynamoDB Connector.
Expand Down Expand Up @@ -131,15 +131,15 @@ public class DynamoDBMetadataHandler
private static final DatabaseFilter DB_FILTER = (Database database) -> (database.getLocationUri() != null && database.getLocationUri().contains(DYNAMO_DB_FLAG));

private final ThrottlingInvoker invoker;
private final AmazonDynamoDB ddbClient;
private final DynamoDbClient ddbClient;
private final AWSGlue glueClient;
private final DynamoDBTableResolver tableResolver;

public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
{
super(SOURCE_TYPE, configOptions);
this.ddbClient = AmazonDynamoDBClientBuilder.standard()
.withCredentials(CrossAccountCredentialsProvider.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBMetadataHandler_CrossAccountRoleSession"))
this.ddbClient = DynamoDbClient.builder()
.credentialsProvider(CrossAccountCredentialsProviderV2.getCrossAccountCredentialsIfPresent(configOptions, "DynamoDBMetadataHandler_CrossAccountRoleSession"))
.build();
this.glueClient = getAwsGlue();
this.invoker = ThrottlingInvoker.newDefaultBuilder(EXCEPTION_FILTER, configOptions).build();
Expand All @@ -153,7 +153,7 @@ public DynamoDBMetadataHandler(java.util.Map<String, String> configOptions)
AmazonAthena athena,
String spillBucket,
String spillPrefix,
AmazonDynamoDB ddbClient,
DynamoDbClient ddbClient,
AWSGlue glueClient,
java.util.Map<String, String> configOptions)
{
Expand Down Expand Up @@ -400,8 +400,9 @@ private void precomputeAdditionalMetadata(Set<String> columnsToIgnore, Map<Strin
for (AttributeValue value : accumulator) {
expressionValueMapping.put(valueNameProducer2.getNext(), value);
}

partitionsSchemaBuilder.addMetadata(EXPRESSION_NAMES_METADATA, Jackson.toJsonString(aliasedColumns));
partitionsSchemaBuilder.addMetadata(EXPRESSION_VALUES_METADATA, Jackson.toJsonString(expressionValueMapping));
partitionsSchemaBuilder.addMetadata(EXPRESSION_VALUES_METADATA, EnhancedDocument.fromAttributeValueMap(expressionValueMapping).toJson());
}
}

Expand Down Expand Up @@ -436,8 +437,7 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest
Map<String, String> splitMetadata = new HashMap<>(partitionMetadata);

Object hashKeyValue = DDBTypeUtils.convertArrowTypeIfNecessary(hashKeyName, hashKeyValueReader.readObject());
String hashKeyValueJSON = Jackson.toJsonString(ItemUtils.toAttributeValue(hashKeyValue));
splitMetadata.put(hashKeyName, hashKeyValueJSON);
splitMetadata.put(hashKeyName, DDBTypeUtils.attributeToJson(toAttributeValue(hashKeyValue), hashKeyName));

splits.add(new Split(spillLocation, makeEncryptionKey(), splitMetadata));

Expand Down
Loading

0 comments on commit 6f40a93

Please sign in to comment.