Datafusion table provider: issues with timestamp types #441

Closed
dispanser opened this issue Sep 21, 2021 · 12 comments · Fixed by #1266
Labels
binding/rust Issues for the Rust crate bug Something isn't working

Comments

@dispanser
Contributor

In a recent attempt to write a where clause on a timestamp column in one of my favorite delta tables, I was not able to express that condition in delta-rs / datafusion. I believe this is due to a mismatch between the logical field type recorded in the delta log (and delivered to datafusion by delta-rs) and datafusion's expectation of finding the actual physical column type in the schema.

Concretely, given a delta table with a column that, according to (java) parquet tools, has the following type:

optional int64 endOfDsegTime (TIMESTAMP_MILLIS);

and according to delta-rs, the type:

SchemaField {
	name: "endOfDsegTime",
	type: primitive(
		"timestamp",
	),
	nullable: true,
	metadata: {},
},

When trying to use that field in a datafusion query endOfDsegTime >= TO_TIMESTAMP_MILLIS('2021-03-19T00:00:00'), I see: Error during planning: 'Timestamp(Nanosecond, None) >= Timestamp(Millisecond, None)' can't be evaluated because there isn't a common type to coerce the types to. So datafusion seems to think that endOfDsegTime has nanosecond precision, which it doesn't.

If I rewrite the where clause to endOfDsegTime >= TO_TIMESTAMP('2021-03-19T00:00:00'), datafusion accepts it but then fails somewhere deep in arrow compute territory:

Cannot evaluate binary expression GtEq with types Timestamp(Millisecond, None) and Timestamp(Nanosecond, None)

I rewrote the table to int96 / nanoseconds; in that scenario, the second where clause works. I also noticed that according to delta-rs (and also _delta_log/0.json), the schema is identical between these two versions of the table: delta does not distinguish between the different physical parquet types, and the concrete type only manifests in the parquet files.

Notes:

  • not sure how spark does it, but there "everything just works" irrespective of the column type in parquet
  • reading either of the two delta tables using datafusion::datasource::parquet also just works, because the schema derived from the datasource is based on the parquet file, not on the metadata in the delta log
  • I'm not sure this can be solved solely on the delta side - it seems that there's no way to distinguish the different timestamp types from the metadata alone.
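
To make the mismatch above concrete, here is a minimal sketch in plain Python (not delta-rs or datafusion code) of what happens when the same raw int64 value is read under two different units. The helper interpret and the TICKS_PER_SECOND table are hypothetical names introduced for illustration; only the unit names mirror the Arrow TimeUnit variants that appear in the error messages.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Ticks per second for each unit (hypothetical table; names mirror Arrow's TimeUnit).
TICKS_PER_SECOND = {
    "Millisecond": 1_000,
    "Microsecond": 1_000_000,
    "Nanosecond": 1_000_000_000,
}


def interpret(raw: int, unit: str) -> datetime:
    """Interpret a raw int64 tick count as an instant, assuming the given unit."""
    ticks = TICKS_PER_SECOND[unit]
    # Integer arithmetic only; anything finer than a microsecond is floored away.
    micros = raw * 1_000_000 // ticks
    return EPOCH + timedelta(microseconds=micros)


# 2021-03-19T00:00:00Z as it would be stored in a TIMESTAMP_MILLIS column.
raw = int((datetime(2021, 3, 19, tzinfo=timezone.utc) - EPOCH).total_seconds()) * 1_000

as_written = interpret(raw, "Millisecond")   # unit the parquet file actually uses
as_declared = interpret(raw, "Nanosecond")   # unit the schema claimed

print(as_written.isoformat())
print(as_declared.isoformat())
```

Under these assumptions, the value lands on 2021-03-19 when read as milliseconds and about 27 minutes after the epoch when read as nanoseconds, which is why the declared unit has to match the physical one before a comparison can be meaningful.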
@dispanser added the bug label Sep 21, 2021
@dispanser changed the title from "Subject: Timestamp column schema delta vs parquet" to "Datafusion table provider: issues with timestamp types" Sep 21, 2021
@houqp
Member

houqp commented Sep 22, 2021

There are two problems here. The first is that datafusion is not doing automatic type casting for your queries, which it really should. So please file an upstream ticket for that :)

The bigger problem is, like you said, that our datafusion/arrow integration is not implemented correctly. We have the schema hardcoded to int96 nanosecond because that's what spark writes out by default: 3d3a32c.

However, I think the deltalake reader implementation should do the conversion to millisecond when reading them into memory so it's conforming to the deltatable metadata. @fvaleye also asked a similar question in the scala repo a long time ago: delta-io/delta#643. I confirmed with @zsxwing last week that individual implementations should be able to support parquet files with different physical timestamp types.

@mosyp
Contributor

mosyp commented Sep 22, 2021

From the Delta protocol spec: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#primitive-types

timestamp | Microsecond precision timestamp without a timezone

In my own delta-rs / spark interop experience, microsecond is the correct precision for timestamps, and it works with spark.

@houqp
Member

houqp commented Sep 23, 2021

Microsecond is the right type to use for a delta table's schema, but we need to update our pyarrow and datafusion integrations to work with parquet files written with either nanosecond or microsecond timestamp types.

@gopinathcs

I'm doing a POC with datafusion-ext. I loaded a Delta Lake table with NYC Taxi data using PySpark and tried to query it with datafusion, and I hit the timestamp problem discussed here.

I tried to store the column in a different timestamp format, but Spark converts it to microseconds.

"Error: ArrowError(ExternalError(Execution("Failed to map column projection for field tpep_dropoff_datetime. Incompatible data types Timestamp(Nanosecond, None) and Timestamp(Microsecond, None)")))"

@roeap
Collaborator

roeap commented Oct 12, 2022

thanks @gopinathcs for the report.

Currently I am working on some updates of the datafusion APIs - #852. This also includes updates to latest datafusion and arrow.

That being said, the time representation in spark and arrow has been an issue, and I can likely take a closer look at this once the PR above is done. That PR is quite a significant change to the datafusion APIs (though not the TableProvider), so if you have any feedback, we'd be happy to hear it.

@gopinathcs

Thanks @roeap. We are planning to use delta-rs and datafusion with caching in our platform. I will continue with my exploration and let you know of any feedback.

@houqp
Member

houqp commented Oct 13, 2022

If spark is writing the timestamp in microsecond units, then it should work; see:

Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))

@gopinathcs are you sure your pyspark code is not producing timestamps in int96 instead?

@gopinathcs

OK, this is what I did in the POC: I loaded the table in Arrow, cast the column to microsecond timestamps, and wrote it to Parquet. Then I read the Parquet file into Spark and created the Delta table. Then I tried to run a "Select * from" query using datafusion-ext.

Let me recheck. Thanks @houqp for the pointer.

@gopinathcs

@houqp @roeap
Here is the table schema from Datafusion's "Show columns From demo". It seems both Delta Lake and Datafusion report the column as microseconds.

[image: screenshot of the schema output]

I get the same error if I run the query "Select * from demo limit 5".

Error: ArrowError(ExternalError(Execution("Failed to map column projection for field tpep_dropoff_datetime. Incompatible data types Timestamp(Nanosecond, None) and Timestamp(Microsecond, None)")))

@davidvesp

Same error converting from a pyarrow table with the following schema:

id_articulo: int64
id_color: int64
es_imagen: decimal128(1, 0)
id_coleccion: int64
fecha_alta: timestamp[ms]
usuario_alta: string
fecha_baja: timestamp[ms]
usuario_baja: string
fecha_modificacion: timestamp[ms]
usuario_modificacion: string

deltalake.PyDeltaTableError: Schema error: Invalid data type for Delta Lake: Timestamp(Millisecond, None)

@wjones127
Collaborator

Delta Lake format only supports microsecond precision timestamps. So it should work if you convert your columns to that. In the future, we might be able to try automatically casting timestamps for you (but would fail if out of representable bounds).
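
As a rough illustration of why an automatic cast to microseconds can fail, here is a minimal plain-Python sketch of a checked unit conversion on raw int64 values. It is not the deltalake or pyarrow implementation; the function to_micros, the TO_MICROS table, and the meaning given to the safe flag here are assumptions made for this example.

```python
INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1

# (multiplier, divisor) that turns a value in the given unit into microseconds.
TO_MICROS = {
    "s": (1_000_000, 1),
    "ms": (1_000, 1),
    "us": (1, 1),
    "ns": (1, 1_000),
}


def to_micros(value: int, unit: str, safe: bool = True) -> int:
    """Convert a raw timestamp to microseconds, checking the int64 bounds.

    With safe=True (an assumption of this sketch), a conversion that would
    drop sub-microsecond digits raises instead of silently flooring.
    """
    mul, div = TO_MICROS[unit]
    if safe and value % div != 0:
        raise ValueError(f"{value} {unit} is not a whole number of microseconds")
    result = value * mul // div  # floors when safe=False
    if not INT64_MIN <= result <= INT64_MAX:
        raise OverflowError(f"{value} {unit} does not fit in int64 microseconds")
    return result


# Milliseconds widen cleanly as long as the result stays inside int64.
print(to_micros(1_616_112_000_000, "ms"))
# Nanoseconds may lose precision, so the lossy path has to be requested explicitly.
print(to_micros(1_500, "ns", safe=False))
```

Coarser units (seconds, milliseconds) are the ones that can overflow on the way to microseconds, while nanoseconds always fit but may lose digits.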

@davidvesp

Yes, I finally managed to change the schema to use pa.timestamp('us'), because the field was in nanoseconds, and then we must cast the table:
table.cast(target_schema=schema_cast, safe=False)
Thanks

wjones127 added a commit that referenced this issue Apr 14, 2023
# Description

This PR updates table scans with datafusion to read the file schema from
the parquet file within the latest add action of the table. This is to
work around some issues, where the schema we derive from metadata does
not match the data in the parquet files - e.g. nanosecond timestamps vs.
microsecond.

We also update the `Load` command to handle column selections and make
it more consistent with the other operations.

# Related Issue(s)

closes #441

---------

Co-authored-by: Will Jones
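
The workaround described in the commit message above (trusting the parquet file's schema where it disagrees with the schema derived from table metadata) can be sketched roughly as follows. This is a simplified plain-Python illustration, not the delta-rs code: ArrowType, from_delta, and scan_schema are hypothetical names, and the sketch only overrides timestamp fields, which is an assumption of this example.

```python
from typing import Dict, NamedTuple, Optional


class ArrowType(NamedTuple):
    """Tiny stand-in for an Arrow data type (hypothetical, for illustration)."""
    name: str
    unit: Optional[str] = None


def from_delta(primitive: str) -> ArrowType:
    """Map a Delta primitive type name to the type the metadata implies."""
    if primitive == "timestamp":
        # The Delta protocol defines timestamps with microsecond precision.
        return ArrowType("Timestamp", "Microsecond")
    return ArrowType({"long": "Int64", "string": "Utf8"}[primitive])


def scan_schema(
    delta_schema: Dict[str, str], file_schema: Dict[str, ArrowType]
) -> Dict[str, ArrowType]:
    """Resolve the schema to scan with, preferring the file's timestamp unit."""
    resolved = {}
    for field, primitive in delta_schema.items():
        logical = from_delta(primitive)
        physical = file_schema.get(field)
        if physical is not None and physical.name == logical.name == "Timestamp":
            resolved[field] = physical  # keep what the parquet file really holds
        else:
            resolved[field] = logical   # fall back to the metadata-derived type
    return resolved


delta_schema = {"id": "long", "tpep_dropoff_datetime": "timestamp"}
file_schema = {
    "id": ArrowType("Int64"),
    "tpep_dropoff_datetime": ArrowType("Timestamp", "Nanosecond"),
}
resolved = scan_schema(delta_schema, file_schema)
print(resolved["tpep_dropoff_datetime"])
```

With a file that stores nanosecond timestamps, the resolved scan schema keeps Nanosecond for that column instead of the Microsecond unit implied by the metadata, so the projection no longer sees two incompatible timestamp types.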

7 participants