Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add case insensitive schema functionality #1539

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -142,8 +143,21 @@ public static Map<String, JdbcRecordHandler> createJdbcRecordHandlerMap(Map<Stri
public static TableName informationSchemaCaseInsensitiveTableMatch(Connection connection, final String databaseName,
final String tableName) throws Exception
{
// Gets case insensitive schema name
String resolvedSchemaName = null;
PreparedStatement statement = getSchemaNameQuery(connection, databaseName);
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
resolvedSchemaName = resultSet.getString("schema_name");
}
else {
throw new RuntimeException(String.format("During SCHEMA Case Insensitive look up could not find Database '%s'", databaseName));
}
}

// passes actual cased schema name to query for tableName
String resolvedName = null;
PreparedStatement statement = getTableNameQuery(connection, tableName, databaseName);
statement = getTableNameQuery(connection, tableName, resolvedSchemaName);
try (ResultSet resultSet = statement.executeQuery()) {
if (resultSet.next()) {
resolvedName = resultSet.getString("table_name");
Expand All @@ -153,10 +167,11 @@ public static TableName informationSchemaCaseInsensitiveTableMatch(Connection co
LOGGER.info("Resolved name from Case Insensitive look up : {}", resolvedName);
}
else {
throw new RuntimeException(String.format("During Case Insensitive look up could not find Table '%s' in Database '%s'", tableName, databaseName));
throw new RuntimeException(String.format("During TABLE Case Insensitive look up could not find Table '%s' in Database '%s'", tableName, databaseName));
}
}
return new TableName(databaseName, resolvedName);

return new TableName(resolvedSchemaName, resolvedName);
}

public static PreparedStatement getTableNameQuery(Connection connection, String tableName, String databaseName) throws SQLException
Expand All @@ -170,6 +185,15 @@ public static PreparedStatement getTableNameQuery(Connection connection, String
return preparedStatement;
}

/**
 * Builds the lookup used to resolve a schema's actual-cased name from {@code information_schema.schemata}.
 *
 * <p>{@code databaseName} is bound unchanged to both placeholders, so a row matches when its
 * {@code schema_name} equals {@code databaseName} exactly or when its lower-cased {@code schema_name}
 * equals {@code databaseName}.
 *
 * <p>The statement is returned open and unexecuted; the caller owns it and is responsible for closing it.
 *
 * @param connection JDBC connection on which the statement is prepared
 * @param databaseName schema name to look up
 * @return a prepared statement with both parameters bound
 * @throws SQLException if the statement cannot be prepared or its parameters cannot be bound
 */
public static PreparedStatement getSchemaNameQuery(Connection connection, String databaseName) throws SQLException
{
    String sql = "SELECT schema_name FROM information_schema.schemata WHERE (schema_name = ? or lower(schema_name) = ?)";
    PreparedStatement preparedStatement = connection.prepareStatement(sql);
    try {
        preparedStatement.setString(1, databaseName);
        preparedStatement.setString(2, databaseName);
    }
    catch (SQLException | RuntimeException bindFailure) {
        // The caller never receives the statement on this path, so release it here instead of leaking it.
        try {
            preparedStatement.close();
        }
        catch (SQLException closeFailure) {
            bindFailure.addSuppressed(closeFailure);
        }
        throw bindFailure;
    }
    return preparedStatement;
}

public static List<TableName> getTables(Connection connection, String databaseName) throws SQLException
{
String tablesAndViews = "Tables and Views";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ protected List<TableName> listTables(final Connection jdbcConnection, final Stri
return list.build();
}

private String caseInsensitiveNameResolver(PreparedStatement preparedStatement, String tableName, String databaseName) throws SQLException
protected String caseInsensitiveNameResolver(PreparedStatement preparedStatement, String tableName, String databaseName) throws SQLException
{
String resolvedName = null;
try (ResultSet resultSet = preparedStatement.executeQuery()) {
Expand All @@ -289,21 +289,45 @@ protected TableName caseInsensitiveTableSearch(Connection connection, final Stri
return caseInsensitiveTableMaterialViewMatch(connection, databaseName, tableName);
}

/**
 * Resolves the actual-cased name of a schema through a case-insensitive lookup in
 * {@code information_schema.schemata}.
 *
 * <p>{@code databaseName} is bound unchanged to both query parameters, so a row matches when its
 * {@code schema_name} equals {@code databaseName} exactly or when its lower-cased {@code schema_name}
 * equals {@code databaseName}. Only the first returned row is used.
 *
 * @param connection JDBC connection used to run the lookup
 * @param databaseName schema name to resolve
 * @return the {@code schema_name} of the first matching row, or {@code null} when no row matches
 * @throws SQLException if the lookup cannot be prepared or executed
 */
protected String caseInsensitiveSchemaResolver(Connection connection, String databaseName) throws SQLException
{
    String sql = "SELECT schema_name FROM information_schema.schemata WHERE (schema_name = ? or lower(schema_name) = ?)";

    // The statement never leaves this method, so it is closed here together with its result set.
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
        preparedStatement.setString(1, databaseName);
        preparedStatement.setString(2, databaseName);

        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            if (resultSet.next()) {
                String resolvedSchemaName = resultSet.getString("schema_name");
                LOGGER.info("Resolved Schema from Case Insensitive look up : {}", resolvedSchemaName);
                return resolvedSchemaName;
            }
        }
    }
    // No matching schema: callers treat null as "not found".
    return null;
}

public TableName caseInsensitiveTableMaterialViewMatch(Connection connection, final String databaseName,
final String tableName) throws Exception
{
String resolvedName;
PreparedStatement preparedStatement = JDBCUtil.getTableNameQuery(connection, tableName, databaseName);
resolvedName = caseInsensitiveNameResolver(preparedStatement, tableName, databaseName);

if (resolvedName == null) {
preparedStatement = getMaterializedViewCaseInsensitive(connection, tableName, databaseName);
resolvedName = caseInsensitiveNameResolver(preparedStatement, tableName, databaseName);
if (resolvedName == null) {
throw new RuntimeException(String.format("During Case Insensitive look up could not find '%s' in Database '%s'", tableName, databaseName));
String resolvedSchemaName = caseInsensitiveSchemaResolver(connection, databaseName);
if (resolvedSchemaName == null) {
throw new RuntimeException(String.format("During SCHEMA Case Insensitive look up could not find Database '%s'", databaseName));
}

String resolvedTableName;
PreparedStatement preparedStatement = JDBCUtil.getTableNameQuery(connection, tableName, resolvedSchemaName);
resolvedTableName = caseInsensitiveNameResolver(preparedStatement, tableName, resolvedSchemaName);

if (resolvedTableName == null) {
LOGGER.info(String.format("'%s' not found in case insensitive table look up. Looking for '%s' as case insensitive materialized view", tableName, tableName));

preparedStatement = getMaterializedViewCaseInsensitive(connection, tableName, resolvedSchemaName);
resolvedTableName = caseInsensitiveNameResolver(preparedStatement, tableName, resolvedSchemaName);
if (resolvedTableName == null) {
throw new RuntimeException(String.format("During Case Insensitive look up could not find '%s' in Database '%s'", tableName, resolvedSchemaName));
}
}
return new TableName(databaseName, resolvedName);
return new TableName(resolvedSchemaName, resolvedTableName);
}

private int decodeContinuationToken(GetSplitsRequest request)
Expand Down Expand Up @@ -366,7 +390,7 @@ private List<TableName> getMaterializedViews(Connection connection, String datab
return JDBCUtil.getTableMetadata(preparedStatement, materializedViews);
}

private PreparedStatement getMaterializedViewCaseInsensitive(Connection connection, String matviewname, String databaseName) throws SQLException
protected PreparedStatement getMaterializedViewCaseInsensitive(Connection connection, String matviewname, String databaseName) throws SQLException
{
String sql = "select matviewname as \"TABLE_NAME\" from pg_catalog.pg_matviews mv where (matviewname = ? or lower(matviewname) = ?) and schemaname = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,23 @@ public void doGetTableWithArrayColumns()
.forEach(expectedSchemaBuilder::addField);
Schema expected = expectedSchemaBuilder.build();

ResultSet caseInsensitiveSchemaResult = Mockito.mock(ResultSet.class);
String sql = "SELECT schema_name FROM information_schema.schemata WHERE (schema_name = ? or lower(schema_name) = ?)";
PreparedStatement preparedSchemaStatement = connection.prepareStatement(sql);
preparedSchemaStatement.setString(1, "testschema");
preparedSchemaStatement.setString(2, "testschema");

String[] columnNames = new String[] {"schema_name"};
String[][] tableNameValues = new String[][]{new String[] {"testSchema"}};
caseInsensitiveSchemaResult = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));

Mockito.when(preparedSchemaStatement.executeQuery()).thenReturn(caseInsensitiveSchemaResult);

TableName inputTableName = new TableName("testSchema", "testtable");
String[] columnNames = new String[] {"table_name"};
String[][] tableNameValues = new String[][]{new String[] {"testTable"}};
columnNames = new String[] {"table_name"};
tableNameValues = new String[][]{new String[] {"testTable"}};
ResultSet resultSetName = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));
String sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, "testtable");
preparedStatement.setString(2, "testtable");
Expand Down Expand Up @@ -382,16 +394,29 @@ public void doGetTableMaterializedView()
.forEach(expectedSchemaBuilder::addField);
Schema expected = expectedSchemaBuilder.build();

// Simulates schema look up in information_schema.schemata and returns the actual-cased schema name
ResultSet caseInsensitiveSchemaResult = Mockito.mock(ResultSet.class);
String sql = "SELECT schema_name FROM information_schema.schemata WHERE (schema_name = ? or lower(schema_name) = ?)";
PreparedStatement preparedSchemaStatement = connection.prepareStatement(sql);
preparedSchemaStatement.setString(1, "testschema");
preparedSchemaStatement.setString(2, "testschema");

String[] columnNames = new String[] {"schema_name"};
String[][] tableNameValues = new String[][]{new String[] {"testSchema"}};
caseInsensitiveSchemaResult = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));

Mockito.when(preparedSchemaStatement.executeQuery()).thenReturn(caseInsensitiveSchemaResult);

// Simulates table look up in information_schema.tables and returns empty result set, because materialized views are stored separately
ResultSet resultSetName = Mockito.mock(ResultSet.class);
String sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
ResultSet caseInsensitiveTableResult = Mockito.mock(ResultSet.class);
sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, "testmatview");
preparedStatement.setString(2, "testmatview");
preparedStatement.setString(3, "testSchema");

Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSetName);
Mockito.when(resultSetName.next()).thenReturn(false);
Mockito.when(preparedStatement.executeQuery()).thenReturn(caseInsensitiveTableResult);
Mockito.when(caseInsensitiveTableResult.next()).thenReturn(false);

// Simulates Materialized View look up in pg_catalog.pg_matviews system table
sql = "select matviewname as \"TABLE_NAME\" from pg_catalog.pg_matviews mv where (matviewname = ? or lower(matviewname) = ?) and schemaname = ?";
Expand All @@ -401,11 +426,11 @@ public void doGetTableMaterializedView()
preparedStatement.setString(3, "testSchema");

TableName inputTableName = new TableName("testSchema", "testmatview");
String[] columnNames = new String[] {"table_name"};
String[][] tableNameValues = new String[][]{new String[] {"testMatView"}};
resultSetName = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSetName);
columnNames = new String[] {"table_name"};
tableNameValues = new String[][]{new String[] {"testMatView"}};
caseInsensitiveTableResult = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));

Mockito.when(preparedStatement.executeQuery()).thenReturn(caseInsensitiveTableResult);

String resolvedTableName = "testMatView";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.util.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

Expand All @@ -54,6 +60,8 @@
public class RedshiftMetadataHandler
extends PostGreSqlMetadataHandler
{
private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftMetadataHandler.class);

/**
* Instantiates handler to be used by Lambda function directly.
*
Expand All @@ -75,6 +83,24 @@ public RedshiftMetadataHandler(DatabaseConnectionConfig databaseConnectionConfig
super(databaseConnectionConfig, secretsManager, athena, jdbcConnectionFactory, configOptions);
}

/**
 * Resolves the actual-cased name of a schema through a case-insensitive lookup in {@code pg_namespace}.
 *
 * <p>{@code databaseName} is bound unchanged to both query parameters, so a row matches when its
 * {@code nspname} equals {@code databaseName} exactly or when its lower-cased {@code nspname}
 * equals {@code databaseName}. Only the first returned row is used.
 *
 * @param connection JDBC connection used to run the lookup
 * @param databaseName schema name to resolve
 * @return the {@code nspname} of the first matching row, or {@code null} when no row matches
 * @throws SQLException if the lookup cannot be prepared or executed
 */
@Override
public String caseInsensitiveSchemaResolver(Connection connection, String databaseName) throws SQLException
{
    String sql = "SELECT nspname FROM pg_namespace WHERE (nspname = ? or lower(nspname) = ?)";

    // The statement never leaves this method, so it is closed here together with its result set.
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
        preparedStatement.setString(1, databaseName);
        preparedStatement.setString(2, databaseName);

        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            if (resultSet.next()) {
                String resolvedSchemaName = resultSet.getString("nspname");
                LOGGER.info("Resolved Schema from Case Insensitive look up : {}", resolvedSchemaName);
                return resolvedSchemaName;
            }
        }
    }
    // No matching schema: null signals "not found" to the caller.
    return null;
}

@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,23 @@ public void doGetTableWithArrayColumns()
.forEach(expectedSchemaBuilder::addField);
Schema expected = expectedSchemaBuilder.build();

ResultSet caseInsensitiveSchemaResult = Mockito.mock(ResultSet.class);
String sql = "SELECT nspname FROM pg_namespace WHERE (nspname = ? or lower(nspname) = ?)";
PreparedStatement preparedSchemaStatement = connection.prepareStatement(sql);
preparedSchemaStatement.setString(1, "testschema");
preparedSchemaStatement.setString(2, "testschema");

String[] columnNames = new String[] {"nspname"};
String[][] tableNameValues = new String[][]{new String[] {"testSchema"}};
caseInsensitiveSchemaResult = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));

Mockito.when(preparedSchemaStatement.executeQuery()).thenReturn(caseInsensitiveSchemaResult);

TableName inputTableName = new TableName("testSchema", "testtable");
String[] columnNames = new String[] {"table_name"};
String[][] tableNameValues = new String[][]{new String[] {"testTable"}};
columnNames = new String[] {"table_name"};
tableNameValues = new String[][]{new String[] {"testTable"}};
ResultSet resultSetName = mockResultSet(columnNames, tableNameValues, new AtomicInteger(-1));
String sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
sql = "SELECT table_name FROM information_schema.tables WHERE (table_name = ? or lower(table_name) = ?) AND table_schema = ?";
PreparedStatement preparedStatement = this.connection.prepareStatement(sql);
preparedStatement.setString(1, "testtable");
preparedStatement.setString(2, "testtable");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import sys
import pymysql
import pymysql.cursors
from awsglue.utils import getResolvedOptions

# Glue job setup script: creates two MySQL databases with mixed-case and
# upper-case names, each holding a `camelCase` and an `UPPERCASE` table with
# one row.
#
# Job arguments: MySQL host (`db_url`) and the credentials used to connect.
args = getResolvedOptions(sys.argv,
                          ['db_url',
                           'username',
                           'password'])

connection = pymysql.connect(host=args['db_url'], user=args['username'], password=args['password'])
try:
    cursor = connection.cursor()
    try:
        cursor.execute('CREATE DATABASE camelCaseTest')
        cursor.execute('USE camelCaseTest')
        cursor.execute('CREATE TABLE camelCase (ID int)')
        cursor.execute('INSERT INTO camelCase VALUES (5)')
        cursor.execute('CREATE TABLE UPPERCASE (ID int)')
        cursor.execute('INSERT INTO UPPERCASE VALUES (7)')

        cursor.execute('CREATE DATABASE UPPERCASETEST')
        cursor.execute('USE UPPERCASETEST')
        cursor.execute('CREATE TABLE camelCase (ID int)')
        cursor.execute('INSERT INTO camelCase VALUES (4)')
        cursor.execute('CREATE TABLE UPPERCASE (ID int)')
        cursor.execute('INSERT INTO UPPERCASE VALUES (6)')
    finally:
        cursor.close()

    # PyMySQL connects with autocommit disabled. The earlier INSERTs are
    # committed implicitly by the DDL statement that follows each of them, but
    # the final INSERT has nothing after it, so without an explicit commit it
    # would be rolled back when the connection goes away.
    connection.commit()
finally:
    connection.close()

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import sys
from awsglue.utils import getResolvedOptions
import pg

# Glue job setup script: creates quoted mixed-case and upper-case schemas in
# the `test` database, each with a `camelCase` and an `UPPERCASE` table holding
# one row, plus a materialized view over one of the tables.
#
# Job arguments: database host (`db_url`) and the credentials used to connect.
args = getResolvedOptions(sys.argv, ['db_url', 'username', 'password'])

connection = pg.DB(
    host=args['db_url'],
    user=args['username'],
    passwd=args['password'],
    dbname='test',
)

# Order matters: each schema must exist before its tables, each table before
# its INSERT, and "camelCaseTest"."camelCase" before the view that selects from it.
setup_statements = (
    'CREATE SCHEMA "camelCaseTest"',
    'CREATE TABLE "camelCaseTest"."camelCase" (ID int)',
    'INSERT INTO "camelCaseTest"."camelCase" VALUES (5)',
    'CREATE TABLE "camelCaseTest"."UPPERCASE" (ID int)',
    'INSERT INTO "camelCaseTest"."UPPERCASE" VALUES (7)',

    'CREATE SCHEMA "UPPERCASETEST"',
    'CREATE TABLE "UPPERCASETEST"."camelCase" (ID int)',
    'INSERT INTO "UPPERCASETEST"."camelCase" VALUES (4)',
    'CREATE TABLE "UPPERCASETEST"."UPPERCASE" (ID int)',
    'INSERT INTO "UPPERCASETEST"."UPPERCASE" VALUES (6)',

    'CREATE MATERIALIZED VIEW "UPPERCASETEST"."camelCaseView" AS SELECT * FROM "camelCaseTest"."camelCase"',
)

for statement in setup_statements:
    connection.query(statement)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def process_single_table(table_name):
connection_type='redshift',
connection_options={
'url': args['db_url'],
'dbtable': table_name,
'dbtable': "public."+table_name,
'user': args['username'],
'password': args['password'],
'redshiftTmpDir': args['redshiftTmpDir']
Expand Down
Loading
Loading