# [SPARK-47628][SQL] Fix Postgres bit array issue 'Cannot cast to boolean'
### What changes were proposed in this pull request?

This PR fixes the following error when reading bit arrays from Postgres.

```
[info]   Cause: org.postgresql.util.PSQLException: Cannot cast to boolean: "10101"
[info]   at org.postgresql.jdbc.BooleanTypeUtil.cannotCoerceException(BooleanTypeUtil.java:99)
[info]   at org.postgresql.jdbc.BooleanTypeUtil.fromString(BooleanTypeUtil.java:67)
[info]   at org.postgresql.jdbc.ArrayDecoding$7.parseValue(ArrayDecoding.java:267)
[info]   at org.postgresql.jdbc.ArrayDecoding$AbstractObjectStringArrayDecoder.populateFromString(ArrayDecoding.java:128)
[info]   at org.postgresql.jdbc.ArrayDecoding.readStringArray(ArrayDecoding.java:763)
[info]   at org.postgresql.jdbc.PgArray.buildArray(PgArray.java:320)
[info]   at org.postgresql.jdbc.PgArray.getArrayImpl(PgArray.java:179)
[info]   at org.postgresql.jdbc.PgArray.getArray(PgArray.java:116)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$25(JdbcUtils.scala:548)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:561)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$24(JdbcUtils.scala:548)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$24$adapted(JdbcUtils.scala:545)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:365)
[info]   at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:346)
[info]   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
[info]   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
```

The issue is caused by both an upstream limitation and an improper type mapping on our side.

On the Postgres side, the pgjdbc driver does not distinguish bit(1) arrays from bit(n>1) arrays and decodes both as boolean arrays, which causes the cast error above on our task execution side.

Our own issue mirrors it: we map both bit(1)[] and bit(n>1)[] to `ArrayType(BinaryType)`, which is exactly the opposite of Postgres' behaviour.

This PR fixes the mapping and adds a special getter for bit(n>1)[] values, addressing both problems.
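
For context, a minimal reproduction sketch (the table mirrors the new integration test below; the SparkSession, connection URL, and credentials are placeholder assumptions, not part of this patch):

```scala
// Assumed setup in Postgres (same as the integration test added below):
//   CREATE TABLE test_bit_array (c1 bit(1)[], c2 bit(5)[]);
//   INSERT INTO test_bit_array VALUES (ARRAY[B'1', B'0'], ARRAY[B'00001', B'00010']);
// `spark` is an active SparkSession with the postgresql driver on the classpath.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb") // placeholder URL
  .option("dbtable", "test_bit_array")
  .load()
// Before this fix, decoding c2 failed at execution time with
// "PSQLException: Cannot cast to boolean".
df.collect()
```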

### Why are the changes needed?

Bug fix. Reading bit(n>1) array columns from Postgres currently fails with the `PSQLException` shown above.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #45751 from yaooqinn/SPARK-47628.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
yaooqinn authored and dongjoon-hyun committed Mar 28, 2024
Commit 4b58a63 (parent 8c4d676)
Showing 3 changed files with 36 additions and 9 deletions.
**PostgresIntegrationSuite.scala**

```diff
@@ -173,6 +173,9 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       "CREATE FUNCTION test_null() RETURNS VOID AS $$ BEGIN RETURN; END; $$ LANGUAGE plpgsql")
       .executeUpdate()
 
+    conn.prepareStatement("CREATE TABLE test_bit_array (c1 bit(1)[], c2 bit(5)[])").executeUpdate()
+    conn.prepareStatement("INSERT INTO test_bit_array VALUES (ARRAY[B'1', B'0'], " +
+      "ARRAY[B'00001', B'00010'])").executeUpdate()
   }
 
   test("Type mapping for various types") {
@@ -495,4 +498,11 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(df.schema.head.dataType === NullType)
     checkAnswer(df, Seq(Row(null)))
   }
+
+  test("SPARK-47628: Fix reading bit array type") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "test_bit_array", new Properties)
+    val expected = Row(Array(true, false), Array(
+      Array[Byte](48, 48, 48, 48, 49), Array[Byte](48, 48, 48, 49, 48)))
+    checkAnswer(df, expected)
+  }
 }
```
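
A note on the expected values above: with the fixed mapping, each bit(5) element surfaces as the ASCII bytes of its bit string rather than as packed bits, which is why the test expects 48s and 49s. A quick standalone check of that assumption:

```scala
// '0' is ASCII 48 and '1' is ASCII 49, so "00001" becomes the byte array below.
val bytes = "00001".getBytes(java.nio.charset.StandardCharsets.US_ASCII)
assert(bytes.sameElements(Array[Byte](48, 48, 48, 48, 49)))
```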
**JdbcUtils.scala**

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.math.{BigDecimal => JBigDecimal}
+import java.nio.charset.StandardCharsets
 import java.sql.{Connection, Date, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, Timestamp}
 import java.time.{Instant, LocalDate}
 import java.util
@@ -512,6 +513,21 @@ object JdbcUtils extends Logging with SQLConfHelper {
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         row.update(pos, rs.getBytes(pos + 1))
 
+    case _: ArrayType if metadata.contains("pg_bit_array_type") =>
+      // SPARK-47628: Handle PostgreSQL bit(n>1) array type ahead. As in the pgjdbc driver,
+      // bit(n>1)[] is not distinguishable from bit(1)[], and they are all recognized as
+      // boolean[]. This is wrong for bit(n>1)[], so we need to handle it first as byte array.
+      (rs: ResultSet, row: InternalRow, pos: Int) =>
+        val fieldString = rs.getString(pos + 1)
+        if (fieldString != null) {
+          val strArray = fieldString.substring(1, fieldString.length - 1).split(",")
+          // Charset is picked from the pgjdbc driver for consistency.
+          val bytesArray = strArray.map(_.getBytes(StandardCharsets.US_ASCII))
+          row.update(pos, new GenericArrayData(bytesArray))
+        } else {
+          row.update(pos, null)
+        }
+
     case ArrayType(et, _) =>
       val elementConversion = et match {
         case TimestampType => arrayConverter[Timestamp] {
```
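
To illustrate what the new getter does, here is a standalone sketch of the same parsing on a hard-coded value (assuming, as the getter does, that `ResultSet.getString` returns the Postgres array literal for a bit(n>1)[] column):

```scala
import java.nio.charset.StandardCharsets

// A bit(5)[] value as pgjdbc renders it in array-literal text form.
val fieldString = "{00001,00010}"
// Strip the surrounding braces and split on commas, exactly as the getter above does.
val strArray = fieldString.substring(1, fieldString.length - 1).split(",")
val bytesArray = strArray.map(_.getBytes(StandardCharsets.US_ASCII))
// bytesArray(0) is Array[Byte](48, 48, 48, 48, 49), matching the integration test.
```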
**PostgresDialect.scala**

```diff
@@ -70,21 +70,21 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
       case Types.OTHER => Some(StringType)
       case _ if "text".equalsIgnoreCase(typeName) => Some(StringType) // sqlType is Types.VARCHAR
       case Types.ARRAY =>
-        val scale = md.build().getLong("scale").toInt
-        val isTimestampNTZ = md.build().getBoolean("isTimestampNTZ")
         // postgres array type names start with underscore
-        toCatalystType(typeName.drop(1), size, scale, isTimestampNTZ).map(ArrayType(_))
+        toCatalystType(typeName.drop(1), size, md).map(ArrayType(_))
       case _ => None
     }
   }
 
   private def toCatalystType(
       typeName: String,
       precision: Int,
-      scale: Int,
-      isTimestampNTZ: Boolean): Option[DataType] = typeName match {
+      md: MetadataBuilder): Option[DataType] = typeName match {
     case "bool" => Some(BooleanType)
-    case "bit" => Some(BinaryType)
+    case "bit" if precision == 1 => Some(BooleanType)
+    case "bit" =>
+      md.putBoolean("pg_bit_array_type", value = true)
+      Some(BinaryType)
     case "int2" => Some(ShortType)
     case "int4" => Some(IntegerType)
     case "int8" | "oid" => Some(LongType)
@@ -99,10 +99,11 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
       Some(StringType)
     case "bytea" => Some(BinaryType)
     case "timestamptz" | "timetz" => Some(TimestampType)
-    case "timestamp" | "time" =>
-      Some(if (isTimestampNTZ) TimestampNTZType else TimestampType)
+    case "timestamp" | "time" => Some(getTimestampType(md.build()))
     case "date" => Some(DateType)
-    case "numeric" | "decimal" if precision > 0 => Some(DecimalType.bounded(precision, scale))
+    case "numeric" | "decimal" if precision > 0 =>
+      val scale = md.build().getLong("scale").toInt
+      Some(DecimalType.bounded(precision, scale))
     case "numeric" | "decimal" =>
       // SPARK-26538: handle numeric without explicit precision and scale.
       Some(DecimalType.SYSTEM_DEFAULT)
```
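
Net effect of the dialect change: bit(1) maps to BooleanType (so bit(1)[] becomes ArrayType(BooleanType)), while bit(n>1) stays BinaryType and is tagged with the "pg_bit_array_type" metadata flag that JdbcUtils keys on. A hedged sketch of that dispatch (`bitCatalystType` is a hypothetical name, not the actual Spark source):

```scala
import org.apache.spark.sql.types._

// Hypothetical helper mirroring the new `case "bit"` branches in toCatalystType.
def bitCatalystType(precision: Int): DataType =
  if (precision == 1) BooleanType // bit(1)[]   -> ArrayType(BooleanType)
  else BinaryType                 // bit(n>1)[] -> ArrayType(BinaryType) + metadata flag

assert(bitCatalystType(1) == BooleanType)
assert(bitCatalystType(5) == BinaryType)
```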
