Skip to content

Commit

Permalink
Oracle Data Types Clean up (#2453)
Browse files Browse the repository at this point in the history
Co-authored-by: AbdulRehman Faraj <[email protected]>
Co-authored-by: Trianz-Akshay <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent 4a41f7e commit d0376e4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import oracle.jdbc.OracleTypes;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand All @@ -73,7 +74,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -353,87 +353,61 @@ private Schema getSchema(Connection jdbcConnection, TableName tableName, Schema
{
SchemaBuilder schemaBuilder = SchemaBuilder.newBuilder();

try (ResultSet resultSet = getColumns(jdbcConnection.getCatalog(), tableName, jdbcConnection.getMetaData());
Connection connection = getJdbcConnectionFactory().getConnection(getCredentialProvider())) {
boolean found = false;
HashMap<String, String> hashMap = new HashMap<String, String>();
/**
* Getting original data type from oracle table for conversion
*/
try
(PreparedStatement stmt = connection.prepareStatement("select COLUMN_NAME ,DATA_TYPE from USER_TAB_COLS where table_name =?")) {
stmt.setString(1, transformString(tableName.getTableName(), true));
ResultSet dataTypeResultSet = stmt.executeQuery();
while (dataTypeResultSet.next()) {
hashMap.put(dataTypeResultSet.getString(COLUMN_NAME).trim(), dataTypeResultSet.getString("DATA_TYPE").trim());
}
LOGGER.debug("hashMap", hashMap.toString());
while (resultSet.next()) {
ArrowType columnType = JdbcArrowTypeConverter.toArrowType(
resultSet.getInt("DATA_TYPE"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"),
configOptions);
String columnName = resultSet.getString(COLUMN_NAME);
/** Handling TIMESTAMP,DATE, 0 Precesion**/
if (columnType != null && columnType.getTypeID().equals(ArrowType.ArrowTypeID.Decimal)) {
String[] data = columnType.toString().split(",");
if (data[0].contains("0") || data[1].contains("0")) {
columnType = Types.MinorType.BIGINT.getType();
}

/** Handling negative scale issue */
if (Integer.parseInt(data[1].trim().replace(")", "")) < 0.0) {
columnType = Types.MinorType.VARCHAR.getType();
}
try (ResultSet resultSet = getColumns(jdbcConnection.getCatalog(), tableName, jdbcConnection.getMetaData())) {
while (resultSet.next()) {
ArrowType arrowColumnType = JdbcArrowTypeConverter.toArrowType(
resultSet.getInt("DATA_TYPE"),
resultSet.getInt("COLUMN_SIZE"),
resultSet.getInt("DECIMAL_DIGITS"),
configOptions);

String columnName = resultSet.getString(COLUMN_NAME);
int jdbcColumnType = resultSet.getInt("DATA_TYPE");
int scale = resultSet.getInt("COLUMN_SIZE");

LOGGER.debug("columnName: {}", columnName);
LOGGER.debug("arrowColumnType: {}", arrowColumnType);
LOGGER.debug("jdbcColumnType: {}", jdbcColumnType);

/**
* below data type conversion doing since a framework not giving appropriate
* data types for oracle data types.
*/

/** Handling TIMESTAMP, DATE, 0 Precision **/
if (arrowColumnType != null && arrowColumnType.getTypeID().equals(ArrowType.ArrowTypeID.Decimal)) {
String[] data = arrowColumnType.toString().split(",");
if (scale == 0 || Integer.parseInt(data[1].trim()) < 0) {
arrowColumnType = Types.MinorType.BIGINT.getType();
}
}

String dataType = hashMap.get(columnName);
LOGGER.debug("columnName: " + columnName);
LOGGER.debug("dataType: " + dataType);
/**
* below data type conversion doing since framework not giving appropriate
* data types for oracle data types..
*/
/**
* Converting oracle date data type into DATEDAY MinorType
*/
if (dataType != null && (dataType.contains("date") || dataType.contains("DATE"))) {
columnType = Types.MinorType.DATEDAY.getType();
}
/**
* Converting oracle NUMBER data type into BIGINT MinorType
*/
if (dataType != null && (dataType.contains("NUMBER")) && columnType.getTypeID().toString().equalsIgnoreCase("Utf8")) {
columnType = Types.MinorType.BIGINT.getType();
}
/**
* Converting an Oracle date data type into DATEDAY MinorType
*/
if (jdbcColumnType == java.sql.Types.TIMESTAMP && scale == 7) {
arrowColumnType = Types.MinorType.DATEDAY.getType();
}

/**
* Converting oracle TIMESTAMP data type into DATEMILLI MinorType
*/
if (dataType != null && (dataType.contains("TIMESTAMP"))
) {
columnType = Types.MinorType.DATEMILLI.getType();
}
if (columnType == null) {
columnType = Types.MinorType.VARCHAR.getType();
}
if (columnType != null && !SupportedTypes.isSupported(columnType)) {
columnType = Types.MinorType.VARCHAR.getType();
}
/**
* Converting an Oracle TIMESTAMP_WITH_TZ & TIMESTAMP_WITH_LOCAL_TZ data type into DATEMILLI MinorType
*/
if (jdbcColumnType == OracleTypes.TIMESTAMPLTZ || jdbcColumnType == OracleTypes.TIMESTAMPTZ) {
arrowColumnType = Types.MinorType.DATEMILLI.getType();
}

if (columnType != null && SupportedTypes.isSupported(columnType)) {
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, columnType).build());
found = true;
}
else {
LOGGER.error("getSchema: Unable to map type for column[" + columnName + "] to a supported type, attempted " + columnType);
}
if (arrowColumnType != null && !SupportedTypes.isSupported(arrowColumnType)) {
LOGGER.warn("getSchema: Unable to map type JDBC type [{}] for column[{}] to a supported type, attempted {}", jdbcColumnType, columnName, arrowColumnType);
arrowColumnType = Types.MinorType.VARCHAR.getType();
}

if (arrowColumnType == null) {
LOGGER.warn("getSchema: column[{}] type is null setting it to varchar | JDBC Type is [{}]", columnName, jdbcColumnType);
arrowColumnType = Types.MinorType.VARCHAR.getType();
}
schemaBuilder.addField(FieldBuilder.newBuilder(columnName, arrowColumnType).build());
}
if (!found) {
throw new RuntimeException("Could not find table in " + tableName.getSchemaName());
}

partitionSchema.getFields().forEach(schemaBuilder::addField);
LOGGER.debug("Oracle Table Schema" + schemaBuilder.toString());
return schemaBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcCredentialProvider;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
Expand Down Expand Up @@ -312,7 +313,7 @@ public void doGetTable()
BlockAllocator blockAllocator = new BlockAllocatorImpl();
String[] schema = {"DATA_TYPE", "COLUMN_SIZE", "COLUMN_NAME", "DECIMAL_DIGITS", "NUM_PREC_RADIX"};
Object[][] values = {{Types.INTEGER, 12, "testCol1", 0, 0}, {Types.VARCHAR, 25, "testCol2", 0, 0},
{Types.TIMESTAMP, 93, "testCol3", 0, 0}, {Types.TIMESTAMP_WITH_TIMEZONE, 93, "testCol4", 0, 0}};
{Types.TIMESTAMP, 93, "testCol3", 0, 0}, {Types.TIMESTAMP_WITH_TIMEZONE, 93, "testCol4", 0, 0}, {Types.NUMERIC, 10, "testCol5", 2, 0}};
AtomicInteger rowNumber = new AtomicInteger(-1);
ResultSet resultSet = mockResultSet(schema, values, rowNumber);

Expand All @@ -321,6 +322,8 @@ public void doGetTable()
expectedSchemaBuilder.addField(FieldBuilder.newBuilder("testCol2", org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder("testCol3", org.apache.arrow.vector.types.Types.MinorType.DATEMILLI.getType()).build());
expectedSchemaBuilder.addField(FieldBuilder.newBuilder("testCol4", org.apache.arrow.vector.types.Types.MinorType.VARCHAR.getType()).build());
ArrowType.Decimal testCol5ArrowType = ArrowType.Decimal.createDecimal(10, 2, 128);
expectedSchemaBuilder.addField(FieldBuilder.newBuilder("testCol5", testCol5ArrowType).build());
PARTITION_SCHEMA.getFields().forEach(expectedSchemaBuilder::addField);
Schema expected = expectedSchemaBuilder.build();

Expand Down

0 comments on commit d0376e4

Please sign in to comment.