From b245a87401e5828c1e551ffcf5b60d5cfb70160a Mon Sep 17 00:00:00 2001
From: ejeffrli
Date: Mon, 2 Oct 2023 14:22:36 -0400
Subject: [PATCH] Implement validation tests with PostgreSQL and Redshift.

Add spill bucket to workflow. Add Redshift to README.
---
 .github/workflows/run_release_tests.yml  |  1 +
 validation_testing/README.md             | 15 ++++-
 .../app/lib/app.ts                       |  6 ++
 .../app/lib/stacks/dynamo-stack.ts       |  6 +-
 .../app/lib/stacks/opensearch-stack.ts   |  6 +-
 .../app/lib/stacks/rds-generic-stack.ts  |  6 +-
 .../app/lib/stacks/redshift-stack.ts     |  8 ++-
 .../app/lib/stacks/stack-props.ts        |  1 +
 validation_testing/cleanup.sh            | 20 +++++++
 validation_testing/deploy_infra.sh       | 25 ++++++++
 validation_testing/main.py               |  6 +-
 validation_testing/run_glue.sh           | 16 +++++
 validation_testing/run_release_tests.sh  |  2 +
 validation_testing/run_tests.sh          |  7 +++
 .../scripts/exec_release_test_queries.py | 59 +++++++++++++++----
 15 files changed, 155 insertions(+), 29 deletions(-)
 create mode 100644 validation_testing/cleanup.sh
 create mode 100644 validation_testing/deploy_infra.sh
 create mode 100644 validation_testing/run_glue.sh
 create mode 100644 validation_testing/run_tests.sh

diff --git a/.github/workflows/run_release_tests.yml b/.github/workflows/run_release_tests.yml
index 9c4ae76b10..0b22370866 100644
--- a/.github/workflows/run_release_tests.yml
+++ b/.github/workflows/run_release_tests.yml
@@ -45,3 +45,4 @@ jobs:
           DATABASE_PASSWORD: ${{ secrets.DATABASE_PASSWORD }}
           S3_DATA_PATH: ${{ secrets.S3_DATA_PATH }}
           S3_JARS_BUCKET: ${{ secrets.S3_JARS_BUCKET }}
+          SPILL_BUCKET: ${{ secrets.SPILL_BUCKET }}
diff --git a/validation_testing/README.md b/validation_testing/README.md
index d9ef2db194..d4b05b287c 100644
--- a/validation_testing/README.md
+++ b/validation_testing/README.md
@@ -2,7 +2,13 @@
 
 This folder contains both a CDK package which developers can freely use to facilitate their own testing as well as a testing workflow for release tests.
 
-To develop here, you shoould build your docker image first. assuming you are already in this directory, just run `sh build.sh` to build your image. Then run `IMAGE=federation-validation-testing ~/docker_images/env.sh bash` to start your container.
+To develop here, you should build your Docker image first. Assuming you are already in this directory, just run `sh build.sh` to build your image. Then run `IMAGE=federation-validation-testing ~/docker_images/env.sh bash` to start your container.
+
+### Current Testable Connectors
+- MySQL
+- PostgreSQL
+- DynamoDB
+- Redshift
 
 ### CDK Work
 
@@ -26,6 +32,20 @@ export RESULTS_LOCATION=
 export DATABASE_PASSWORD=
 export S3_DATA_PATH=
 export REPOSITORY_ROOT=
+export SPILL_BUCKET=
 ```
 
-Then just run `python3 main.py`. If you want to change what connectors are tested, modify the array of connectors in `main.py`, but you can only use ones that already have a corresponding CDK stack and glue job ready.
+Then, run `python3 main.py` to test all connectors. If you want to change what connectors are tested, modify the array of connectors in `main.py`, but you can only use ones that already have a corresponding CDK stack and Glue job ready.
+
+### Local Development
+
+`sh run_release_tests.sh {CONNECTOR_NAME}` was split up to make it more accessible for local development. First, run `sh deploy_infra.sh {CONNECTOR_NAME}` to deploy the database, connector, and ETL jobs. Then run `sh run_glue.sh {CONNECTOR_NAME}` to load the test data (the TPC-DS `customer` and `customer_address` tables), and run `sh run_tests.sh {CONNECTOR_NAME}` repeatedly for development. Run `sh cleanup.sh {CONNECTOR_NAME}` to clean up resources. To add or remove tests, modify `./scripts/exec_release_test_queries.py`.
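+
+For example, a typical local iteration loop for the `postgresql` connector (any connector listed above works the same way) looks like:
+
+```
+sh deploy_infra.sh postgresql   # deploy the database, connector, and ETL jobs once
+sh run_glue.sh postgresql       # load the TPC-DS test data once
+sh run_tests.sh postgresql      # re-run this step as you iterate on the tests
+sh cleanup.sh postgresql        # tear everything down when you are finished
+```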
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/app.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/app.ts
index e4ea60f7e3..0e07965a77 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/app.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/app.ts
@@ -14,6 +14,7 @@ const app = new cdk.App()
 const database_password: string = process.env!.DATABASE_PASSWORD as string;
 const s3_path: string = process.env!.S3_DATA_PATH as string;
 const repo_root: string = process.env!.REPOSITORY_ROOT as string;
+const spill_bucket: string = process.env!.SPILL_BUCKET as string;
 
 // these names match the names of our connectors (athena-*) with the exception of opensearch, which is in development
 const MYSQL_NAME: string = 'mysql'
@@ -31,6 +32,7 @@ new RdsGenericStack(app, `${MYSQL_NAME}CdkStack`, {
   db_type: MYSQL_NAME,
   db_port: 3306,
   s3_path: s3_path,
+  spill_bucket: spill_bucket,
   tpcds_tables: tables,
   password: database_password,
   connector_yaml_path: `${repo_root}/athena-${MYSQL_NAME}/athena-${MYSQL_NAME}.yaml`
@@ -41,6 +43,7 @@ new RdsGenericStack(app, `${POSTGRES_NAME}CdkStack`, {
   db_type: POSTGRES_NAME,
   db_port: 5432,
   s3_path: s3_path,
+  spill_bucket: spill_bucket,
   tpcds_tables: tables,
   password: database_password,
   connector_yaml_path: `${repo_root}/athena-${POSTGRES_NAME}/athena-${POSTGRES_NAME}.yaml`
@@ -49,6 +52,7 @@ new RdsGenericStack(app, `${POSTGRES_NAME}CdkStack`, {
 new DynamoDBStack(app, `${DYNAMO_DB_NAME}CdkStack`, {
   test_size_gigabytes: 1,
   s3_path: s3_path,
+  spill_bucket: spill_bucket,
   tpcds_tables: tables,
   password: database_password,
   connector_yaml_path: `${repo_root}/athena-${DYNAMO_DB_NAME}/athena-${DYNAMO_DB_NAME}.yaml`
@@ -57,6 +61,7 @@ new DynamoDBStack(app, `${DYNAMO_DB_NAME}CdkStack`, {
 new RedshiftStack(app, `${REDSHIFT_NAME}CdkStack`, {
   test_size_gigabytes: 1,
   s3_path: s3_path,
+  spill_bucket: spill_bucket,
   tpcds_tables: tables,
   password: database_password,
   connector_yaml_path: `${repo_root}/athena-${REDSHIFT_NAME}/athena-${REDSHIFT_NAME}.yaml`
@@ -64,6 +69,7 @@ new RedshiftStack(app, `${REDSHIFT_NAME}CdkStack`, {
 new OpenSearchStack(app, `${OPENSEARCH_NAME}CdkStack`, {
   test_size_gigabytes: 1,
   s3_path: s3_path,
+  spill_bucket: spill_bucket,
   tpcds_tables: tables,
   password: database_password,
   connector_yaml_path: `${repo_root}/athena-elasticsearch/athena-elasticsearch.yaml`
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/dynamo-stack.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/dynamo-stack.ts
index 88cff34dc4..0cd3bacb84 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/dynamo-stack.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/dynamo-stack.ts
@@ -11,10 +11,10 @@ import {FederationStackProps} from './stack-props'
 export class DynamoDBStack extends cdk.Stack {
   constructor(scope: Construct, id: string, props?: FederationStackProps) {
     super(scope, id, props);
-    this.init_resources(props!.test_size_gigabytes, props!.s3_path, props!.tpcds_tables, props!.connector_yaml_path);
+    this.init_resources(props!.test_size_gigabytes, props!.s3_path, props!.spill_bucket, props!.tpcds_tables, props!.connector_yaml_path);
   }
 
-  init_resources(test_size_gigabytes: number, s3_path: string, tpcds_tables: string[], connector_yaml_path: string) {
+  init_resources(test_size_gigabytes: number, s3_path: string, spill_bucket: string, tpcds_tables: string[], connector_yaml_path: string) {
     var glue_job_role = new iam.Role(this, 'glue-job-managed-role', {
       managedPolicies: [
         iam.ManagedPolicy.fromAwsManagedPolicyName("AdministratorAccess")
@@ -53,7 +53,7 @@ export class DynamoDBStack extends cdk.Stack {
       templateFile: cfn_template_file,
       parameters: {
         AthenaCatalogName: 'dynamodb-cdk-deployed',
-        SpillBucket: 'amazon-athena-federation-perf-spill-bucket'
+        SpillBucket: spill_bucket
       }
     });
   }
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/opensearch-stack.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/opensearch-stack.ts
index 0d7ea35cd9..5b2378e815 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/opensearch-stack.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/opensearch-stack.ts
@@ -40,10 +40,10 @@ export class OpenSearchStack extends cdk.Stack {
     const volumeSizePerNode = getVolumeSize(props!.test_size_gigabytes);
     const dataInstanceType = getInstanceType(props!.test_size_gigabytes);
     super(scope, id, props);
-    this.init_resources(volumeSizePerNode, dataInstanceType, props!.test_size_gigabytes, props!.s3_path, props!.tpcds_tables, props!.password, props!.connector_yaml_path);
+    this.init_resources(volumeSizePerNode, dataInstanceType, props!.test_size_gigabytes, props!.s3_path, props!.spill_bucket, props!.tpcds_tables, props!.password, props!.connector_yaml_path);
   }
 
-  init_resources(volumeSizePerNode: number, dataNodeInstanceType: string, test_size_gigabytes: number, s3_path: string, tpcds_tables: string[], password: string, connector_yaml_path: string){
+  init_resources(volumeSizePerNode: number, dataNodeInstanceType: string, test_size_gigabytes: number, s3_path: string, spill_bucket: string, tpcds_tables: string[], password: string, connector_yaml_path: string){
     const vpc = new ec2.Vpc(this, 'opensearch_vpc', {
       ipAddresses: ec2.IpAddresses.cidr('15.0.0.0/24'),
       subnetConfiguration: [
@@ -173,7 +173,7 @@ export class OpenSearchStack extends cdk.Stack {
         'DomainMapping': `default=${connectionString}`,
         'SecurityGroupIds': [securityGroup.securityGroupId],
         'SubnetIds': [subnet.subnetId],
-        'SpillBucket': 'amazon-athena-federation-perf-spill-bucket',
+        'SpillBucket': spill_bucket,
       }
     });
   }
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/rds-generic-stack.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/rds-generic-stack.ts
index a014dc2d82..f18277edb0 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/rds-generic-stack.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/rds-generic-stack.ts
@@ -16,10 +16,10 @@ export interface RdsGenericStackProps extends FederationStackProps {
 export class RdsGenericStack extends cdk.Stack {
   constructor(scope: Construct, id: string, props?: RdsGenericStackProps) {
     super(scope, id, props);
-    this.init_resources(props!.test_size_gigabytes, props!.s3_path, props!.password, props!.tpcds_tables, props!.db_port, props!.db_type, props!.connector_yaml_path);
+    this.init_resources(props!.test_size_gigabytes, props!.s3_path, props!.spill_bucket, props!.password, props!.tpcds_tables, props!.db_port, props!.db_type, props!.connector_yaml_path);
   }
 
-  init_resources(test_size_gigabytes: number, s3_path: string, password: string, tpcds_tables: string[], db_port: number, db_type: string, connector_yaml_path: string) {
+  init_resources(test_size_gigabytes: number, s3_path: string, spill_bucket: string, password: string, tpcds_tables: string[], db_port: number, db_type: string, connector_yaml_path: string) {
     const vpc = new ec2.Vpc(this, `${db_type}_vpc`, {
       ipAddresses: ec2.IpAddresses.cidr('10.0.0.0/24'),
       subnetConfiguration: [
@@ -131,7 +131,7 @@ export class RdsGenericStack extends cdk.Stack {
         'DefaultConnectionString': `${connectionStringPrefix}://${connectionString}`,
         'SecurityGroupIds': [securityGroup.securityGroupId],
         'SubnetIds': [subnet.subnetId],
-        'SpillBucket': 'amazon-athena-federation-perf-spill-bucket',
+        'SpillBucket': spill_bucket,
       }
     });
   }
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/redshift-stack.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/redshift-stack.ts
index b82b531067..0ff89bb136 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/redshift-stack.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/redshift-stack.ts
@@ -18,6 +18,7 @@ export class RedshiftStack extends cdk.Stack {
     super(scope, id, props);
     const test_size_gigabytes = props!.test_size_gigabytes;
     const s3_path = props!.s3_path;
+    const spill_bucket = props!.spill_bucket;
     const tpcds_table_names = props!.tpcds_tables;
     const password = props!.password;
     const connector_yaml_path = props!.connector_yaml_path;
@@ -71,7 +72,7 @@ export class RedshiftStack extends cdk.Stack {
 
     const s3Spill = new s3.Bucket(this, 'redshift_spill_location', {});
 
-    const connectionString = `jdbc:redshift://${cluster.clusterEndpoint.socketAddress}/test`;
+    const connectionString = `jdbc:redshift://${cluster.clusterEndpoint.socketAddress}/test?user=athena&password=${password}`;
     const subnet = vpc.isolatedSubnets[0];
     const glueConnection = new glue.Connection(this, 'redshift_glue_connection', {
       type: glue.ConnectionType.JDBC,
@@ -114,16 +115,17 @@ export class RedshiftStack extends cdk.Stack {
     });
   }
 
+  const connectionStringPrefix = 'redshift';
   const cfn_template_file = connector_yaml_path;
   const connectorSubStack = new CfnInclude(this, 'RedshiftLambdaStack', {
     templateFile: cfn_template_file,
     parameters: {
       'LambdaFunctionName': 'redshift-cdk-deployed',
       'SecretNamePrefix': 'asdf',
-      'DefaultConnectionString': connectionString,
+      'DefaultConnectionString': `${connectionStringPrefix}://${connectionString}`,
       'SecurityGroupIds': [securityGroup.securityGroupId],
       'SubnetIds': [subnet.subnetId],
-      'SpillBucket': 'amazon-athena-federation-perf-spill-bucket',
+      'SpillBucket': spill_bucket,
     }
   });
 }
diff --git a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/stack-props.ts b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/stack-props.ts
index 3564197f52..546e5d6dd8 100644
--- a/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/stack-props.ts
+++ b/validation_testing/cdk_federation_infra_provisioning/app/lib/stacks/stack-props.ts
@@ -3,6 +3,7 @@ import * as cdk from 'aws-cdk-lib';
 export interface FederationStackProps extends cdk.StackProps {
   readonly test_size_gigabytes: number;
   readonly s3_path: string; // should be maintained via env var
+  readonly spill_bucket: string;
   readonly tpcds_tables: string[];
   readonly password: string; // should be maintained via env var
   readonly connector_yaml_path: string;
diff --git a/validation_testing/cleanup.sh b/validation_testing/cleanup.sh
new file mode 100644
index 0000000000..16e624916a
--- /dev/null
+++ b/validation_testing/cleanup.sh
@@ -0,0 +1,20 @@
+CONNECTOR_NAME=$1
+VALIDATION_TESTING_ROOT=$REPOSITORY_ROOT/validation_testing
+RELEASE_TESTS_EXIT_CODE=$?
+
+# Glue attaches ENIs to the VPC subnet, if there is one, in order to connect to the VPC, and then issues a delete for them later. If they aren't finished deleting by the time we invoke cdk destroy,
+# CFN will not know about the resources and will not be able to delete the stack, so we proactively look for and delete any ENIs attached to Glue ourselves first.
+aws ec2 describe-subnets --filters "Name=tag:Name,Values=${CONNECTOR_NAME}CdkStack*" | jq ".Subnets[].SubnetId" | tr -d '"' \
+| xargs -n 1 -I {} aws ec2 describe-network-interfaces --filters "Name=subnet-id,Values={}" | jq ".NetworkInterfaces[] | select(.Description | startswith(\"Attached to Glue\")) | .NetworkInterfaceId" | tr -d '"' \
+| xargs -I {} aws ec2 delete-network-interface --network-interface-id {}
+
+# Once that is done, we can delete our CDK stack.
+cd $(dirname $(find . -name ATHENA_INFRA_SPINUP_ROOT))/app;
+# We cannot use --force because npm strips the flags, so pipe `yes` through instead.
+yes | npm run cdk destroy ${CONNECTOR_NAME}CdkStack;
+
+echo "FINISHED CLEANING UP RESOURCES FOR ${CONNECTOR_NAME}."
+
+echo "FINISHED RELEASE TESTS FOR ${CONNECTOR_NAME}."
+
+exit $RELEASE_TESTS_EXIT_CODE
\ No newline at end of file
diff --git a/validation_testing/deploy_infra.sh b/validation_testing/deploy_infra.sh
new file mode 100644
index 0000000000..a02961918b
--- /dev/null
+++ b/validation_testing/deploy_infra.sh
@@ -0,0 +1,28 @@
+
+CONNECTOR_NAME=$1
+VALIDATION_TESTING_ROOT=$REPOSITORY_ROOT/validation_testing
+
+# upload the connector jar to S3 and point the yaml CodeUri at the S3 URI; redirect to /dev/null so we don't log the S3 path
+aws s3 cp $REPOSITORY_ROOT/athena-$CONNECTOR_NAME/target/athena-$CONNECTOR_NAME-2022.47.1.jar "$S3_JARS_BUCKET/" > /dev/null
+sed -i "s#CodeUri: \"./target/athena-$CONNECTOR_NAME-2022.47.1.jar\"#CodeUri: \"$S3_JARS_BUCKET/athena-$CONNECTOR_NAME-2022.47.1.jar\"#" $REPOSITORY_ROOT/athena-$CONNECTOR_NAME/athena-$CONNECTOR_NAME.yaml
+
+# go to the cdk dir, then build/synth/deploy
+cd $(dirname $(find . -name ATHENA_INFRA_SPINUP_ROOT))/app;
+
+cat << EOF > .env
+DATABASE_PASSWORD=$DATABASE_PASSWORD
+S3_DATA_PATH=$S3_PATH
+REPOSITORY_ROOT=$REPOSITORY_ROOT
+EOF
+
+npm install;
+npm run build;
+npm run cdk synth;
+npm run cdk deploy ${CONNECTOR_NAME}CdkStack > /dev/null;
+
+sed -i "s#CodeUri: \"$S3_JARS_BUCKET/athena-$CONNECTOR_NAME-2022.47.1.jar\"#CodeUri: \"./target/athena-$CONNECTOR_NAME-2022.47.1.jar\"#" $REPOSITORY_ROOT/athena-$CONNECTOR_NAME/athena-$CONNECTOR_NAME.yaml
+
+echo "FINISHED DEPLOYING INFRA FOR ${CONNECTOR_NAME}."
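+
+# NOTE: the CDK app (lib/app.ts) also reads SPILL_BUCKET from the environment,
+# so make sure it is exported along with the variables listed in the README before running this script.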
diff --git a/validation_testing/main.py b/validation_testing/main.py
index 07091d3fbe..68c6504e1d 100644
--- a/validation_testing/main.py
+++ b/validation_testing/main.py
@@ -2,8 +2,7 @@
 import os
 import sys
 
-TESTABLE_CONNECTORS = ['dynamodb', 'mysql']
-
+TESTABLE_CONNECTORS = ['dynamodb', 'mysql', 'postgresql', 'redshift']
 
 def run_single_connector_release_tests(connector_name):
     shell_command = f'sh run_release_tests.sh {connector_name}'
@@ -20,7 +19,8 @@ def assert_required_env_vars_set():
         'REPOSITORY_ROOT',
         'DATABASE_PASSWORD',
         'S3_DATA_PATH',
-        'S3_JARS_BUCKET'
+        'S3_JARS_BUCKET',
+        'SPILL_BUCKET'
     ]
     if not all([os.environ.get(env_var) for env_var in required_env_vars]):
         raise RuntimeError("not all expected environment variables were set!")
diff --git a/validation_testing/run_glue.sh b/validation_testing/run_glue.sh
new file mode 100644
index 0000000000..191c5e889a
--- /dev/null
+++ b/validation_testing/run_glue.sh
@@ -0,0 +1,16 @@
+CONNECTOR_NAME=$1
+VALIDATION_TESTING_ROOT=$REPOSITORY_ROOT/validation_testing
+
+# cd back to validation root
+cd $VALIDATION_TESTING_ROOT
+
+# now we run the Glue jobs that the CDK stack created
+# If there is any output from glue_job_synchronous_execution.py, we will exit this script with a failure code.
+# The 2>&1 lets us pipe both stdout and stderr to grep, as opposed to just the stdout. https://stackoverflow.com/questions/818255/what-does-21-mean
+echo "Starting glue jobs..."
+aws glue list-jobs --max-results 100 \
+| jq ".JobNames[] | select(startswith(\"${CONNECTOR_NAME}gluejob\"))" \
+| xargs -I{} python3 scripts/glue_job_synchronous_execution.py {} 2>&1 \
+| grep -q '.' && exit 1
+
+echo "FINISHED RUNNING GLUE JOBS FOR ${CONNECTOR_NAME}."
\ No newline at end of file
diff --git a/validation_testing/run_release_tests.sh b/validation_testing/run_release_tests.sh
index 28b8eba514..5b0a7c815b 100644
--- a/validation_testing/run_release_tests.sh
+++ b/validation_testing/run_release_tests.sh
@@ -25,6 +25,8 @@ npm run build;
 npm run cdk synth;
 npm run cdk deploy ${CONNECTOR_NAME}CdkStack > /dev/null;
 
+sed -i "s#CodeUri: \"$S3_JARS_BUCKET/athena-$CONNECTOR_NAME-2022.47.1.jar\"#CodeUri: \"./target/athena-$CONNECTOR_NAME-2022.47.1.jar\"#" $REPOSITORY_ROOT/athena-$CONNECTOR_NAME/athena-$CONNECTOR_NAME.yaml
+
 echo "FINISHED DEPLOYING INFRA FOR ${CONNECTOR_NAME}."
 
 # cd back to validation root
diff --git a/validation_testing/run_tests.sh b/validation_testing/run_tests.sh
new file mode 100644
index 0000000000..6a1acbcb3f
--- /dev/null
+++ b/validation_testing/run_tests.sh
@@ -0,0 +1,7 @@
+CONNECTOR_NAME=$1
+VALIDATION_TESTING_ROOT=$REPOSITORY_ROOT/validation_testing
+
+CONNECTOR_LAMBDA_ARN=$(aws lambda get-function --function-name $CONNECTOR_NAME-cdk-deployed | jq ".Configuration.FunctionArn" | tr -d '"') # trim quotes from the json string output
+python3 scripts/exec_release_test_queries.py $CONNECTOR_NAME $RESULTS_LOCATION $CONNECTOR_LAMBDA_ARN
+RELEASE_TESTS_EXIT_CODE=$?
+echo "FINISHED RUNNING TESTS FOR ${CONNECTOR_NAME}, exit code was $RELEASE_TESTS_EXIT_CODE."
\ No newline at end of file
diff --git a/validation_testing/scripts/exec_release_test_queries.py b/validation_testing/scripts/exec_release_test_queries.py
index c90945eecc..8a646fd7dc 100644
--- a/validation_testing/scripts/exec_release_test_queries.py
+++ b/validation_testing/scripts/exec_release_test_queries.py
@@ -3,8 +3,8 @@
 import time
 
 athena_client = boto3.client('athena')
-connectors_supporting_complex_predicates = ['mysql']
-connectors_supporting_limit_pushdown = ['dynamodb', 'mysql']
+connectors_supporting_complex_predicates = ['mysql', 'postgresql', 'redshift']
+connectors_supporting_limit_pushdown = ['dynamodb', 'mysql', 'postgresql', 'redshift']
 
 def run_query(query_string, results_location, workgroup_name, catalog, database):
     """
@@ -68,11 +68,10 @@ def test_ddl(connector_name, results_location, workgroup_name, catalog, database
     '''
     select_from_view_query = f'SELECT * FROM cx_view_{connector_name} LIMIT 100;'
     drop_view_query = f'DROP VIEW cx_view_{connector_name};'
-
     # view queries
     run_query(create_view_query, results_location, workgroup_name, catalog, database)
-    run_query(select_from_view_query, results_location, workgroup_name, 'awsdatacatalog', glue_db)
-    run_query(drop_view_query, results_location, workgroup_name, 'awsdatacatalog', glue_db)
+    run_query(select_from_view_query, results_location, workgroup_name, 'AwsDataCatalog', glue_db)
+    run_query(drop_view_query, results_location, workgroup_name, 'AwsDataCatalog', glue_db)
     print('[INFO] - Successfully ran view queries.')
 
     # other ddl queries
@@ -99,12 +98,8 @@ def test_dml(connector_name, results_location, workgroup_name, catalog, database
         assert_condition(predicate_pushdown_data_scanned < select_star_data_scanned, f'FAILURE CONDITION - predicate pushdown scans at least as much data as select *. Query ids: {select_star_qid}, {predicate_pushdown_qid}')
 
     if connector_name in connectors_supporting_complex_predicates:
-        # ensure complex predicate pushdown occurs and has less data scanned than select * and simple predicate pushdown
-        complex_predicate_pushdown_qid = run_query('select * from customer where c_customer_sk < 100 AND ((c_current_hdemo_sk + c_current_cdemo_sk) % 2 = 0) limit 100000', results_location, workgroup_name, catalog, database)
-        complex_predicate_pushdown_data_scanned = get_data_scanned(complex_predicate_pushdown_qid)
-        print(f'[INFO] - complex predicate pushdown query ran successfully, scanned {complex_predicate_pushdown_data_scanned} bytes')
-        assert_condition(complex_predicate_pushdown_data_scanned < select_star_data_scanned, f'FAILURE CONDITION - complex predicate pushdown scans at least as much data as select *. Query ids: {select_star_qid}, {complex_predicate_pushdown_qid}')
-        assert_condition(complex_predicate_pushdown_data_scanned < predicate_pushdown_data_scanned, f'FAILURE CONDITION - expected complex expression to reduce data scanned compared to simple predicate but it did not. Query ids: {predicate_pushdown_qid}, {complex_predicate_pushdown_qid}')
+        # ensure complex predicate pushdown occurs and has less data scanned than select * and/or simple predicate pushdown
+        complex_predicate_pushdown_tests(select_star_qid, predicate_pushdown_qid, results_location, workgroup_name, catalog, database)
     else:
         print(f'[INFO] - skipping complex predicate tests for {connector_name}, not supported.')
 
@@ -117,6 +112,43 @@ def test_dml(connector_name, results_location, workgroup_name, catalog, database
     else:
         print(f'[INFO] - skipping limit pushdown tests for {connector_name}, not supported.')
 
+def complex_predicate_pushdown_tests(select_star_qid, predicate_pushdown_qid, results_location, workgroup_name, catalog, database):
+    query_one = 'select * from customer where c_customer_sk < 100 AND ((c_current_hdemo_sk + c_current_cdemo_sk) % 2 = 0) limit 100000'
+    query_two = '''
+        SELECT * FROM customer WHERE
+        c_customer_sk < 100 AND ((c_current_hdemo_sk + c_current_cdemo_sk) % 2 = 0) AND
+        ((c_first_sales_date_sk >= 2452000 AND c_birth_country in ('CHILE','TOGO','PHILIPPINES'))
+        OR (c_first_sales_date_sk >= 2452000 AND c_first_sales_date_sk <= 2453000)
+        OR (c_first_shipto_date_sk >= 2449000 AND c_first_shipto_date_sk <= 2451000));
+    '''
+
+    # c_birth_country in ('CHILE','TOGO','PHILIPPINES') -> (c_birth_country = 'CHILE' OR c_birth_country = 'TOGO' OR c_birth_country = 'PHILIPPINES')
+    query_three = '''
+        SELECT * FROM customer WHERE
+        c_customer_sk < 100 AND ((c_current_hdemo_sk + c_current_cdemo_sk) % 2 = 0) AND
+        ((c_first_sales_date_sk >= 2452000 AND (c_birth_country = 'CHILE' OR c_birth_country = 'TOGO' OR c_birth_country = 'PHILIPPINES'))
+        OR (c_first_sales_date_sk >= 2452000 AND c_first_sales_date_sk <= 2453000)
+        OR (c_first_shipto_date_sk >= 2449000 AND c_first_shipto_date_sk <= 2451000));
+    '''
+
+    queries = [query_one, query_two, query_three]
+
+    select_star_data_scanned = get_data_scanned(select_star_qid)
+    predicate_pushdown_data_scanned = get_data_scanned(predicate_pushdown_qid)
+
+    for i, q in enumerate(queries):
+        test_complex_predicate_pushdown(q, i+1, select_star_qid, select_star_data_scanned, predicate_pushdown_qid, predicate_pushdown_data_scanned, results_location, workgroup_name, catalog, database)
+
+
+def test_complex_predicate_pushdown(query, testnumber, select_star_qid, select_star_data_scanned, predicate_pushdown_qid, predicate_pushdown_data_scanned, results_location, workgroup_name, catalog, database):
+    # ensure complex predicate pushdown occurs and has less data scanned than select * and simple predicate pushdown
+    complex_predicate_pushdown_qid = run_query(query, results_location, workgroup_name, catalog, database)
+    complex_predicate_pushdown_data_scanned = get_data_scanned(complex_predicate_pushdown_qid)
+    print(f'[INFO] - complex predicate pushdown {testnumber} query ran successfully, scanned {complex_predicate_pushdown_data_scanned} bytes')
+    assert_condition(complex_predicate_pushdown_data_scanned < select_star_data_scanned, f'FAILURE CONDITION - complex predicate pushdown {testnumber} scans at least as much data as select *. Query ids: {select_star_qid}, {complex_predicate_pushdown_qid}')
+    assert_condition(complex_predicate_pushdown_data_scanned < predicate_pushdown_data_scanned, f'FAILURE CONDITION - expected complex expression {testnumber} to reduce data scanned compared to simple predicate but it did not. Query ids: {predicate_pushdown_qid}, {complex_predicate_pushdown_qid}')
+
+
 def assert_condition(success_condition, failure_message):
     if not success_condition:
         raise RuntimeError(failure_message)
@@ -141,7 +173,9 @@ def run_queries(catalog, connector_name, results_location, lambda_arn):
 
     # db names differ for the datasource, maintain mapping
     db_mapping = {'dynamodb': 'default',
-                  'mysql': 'test'
+                  'mysql': 'test',
+                  'postgresql': 'public',
+                  'redshift': 'public'
     }
     workgroup_name = 'primary'
     database = db_mapping[connector_name]
@@ -163,3 +197,4 @@ def run_queries(catalog, connector_name, results_location, lambda_arn):
     finally:
         delete_data_source(catalog)
 
+