Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spatial_shuffle() can result in ArrowTypeError when using pyarrow 12 #256

Open
FlorisCalkoen opened this issue Jun 15, 2023 · 5 comments
Open

Comments

@FlorisCalkoen
Copy link

FlorisCalkoen commented Jun 15, 2023

When using a Dask distributed client and pyarrow=12.*, spatial shuffle can trigger a call to pa.Schema.from_pandas(df) which results in ArrowTypeError: Did not pass numpy.dtype object.

Reproducible example:

import dask
import dask.dataframe as dd
import dask_geopandas
import geodatasets
import geopandas as gpd
import pandas as pd
import pyarrow as pa
from distributed import Client
from geopandas.array import GeometryDtype

# sample data
rivers = gpd.read_file(geodatasets.get_path("eea large_rivers")).to_crs(4326)
grid = gpd.read_file(geodatasets.get_path("geoda grid100")).to_crs(4326)

# error doesn't happen when you don't use a distributed client.
client = Client()
print(client.dashboard_link)

# make data available to all workers
[scattered_grid] = client.scatter([grid[["geometry", "POLYID"]]], broadcast=True)

# do some work
def overlay_by_grid(df, grid):
    df = gpd.overlay(
        df,
        grid,
        keep_geom_type=False,
    ).explode(column="geometry", index_parts=False)
    return df


META = gpd.GeoDataFrame(
    {
        "NAME": pd.Series([], dtype=str),
        "Shape_Leng": pd.Series([], dtype="f8"),
        "POLYID": pd.Series([], dtype="i4"),
        "geometry": gpd.GeoSeries([], dtype=GeometryDtype),
    }
)

lazy_values = []
ddf = dask_geopandas.from_geopandas(rivers, npartitions=10)
for partition in ddf.to_delayed():
    partition = dask.delayed(overlay_by_grid)(partition, scattered_grid)
    lazy_values.append(partition)
ddf = dd.from_delayed(lazy_values, meta=META)
ddf.spatial_shuffle()  #  <-- this produces the error when using pyarrow=12.* and the `dask.distributed.Client`
Click to see traceback
  /Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
/Users/calkoen/mambaforge/envs/testenv/lib/python3.11/site-packages/pygeos/set_operations.py:129: RuntimeWarning: invalid value encountered in intersection
  return lib.intersection(a, b, **kwargs)
2023-06-15 13:34:24,661 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 0)
Function:  shuffle_transfer
args:      (     NAME    Shape_Leng  ...  _partitions      _index
0  Danube  2.770357e+06  ...            2   539448662
1  Danube  2.770357e+06  ...            1   452134766
2  Danube  2.770357e+06  ...            8  3782464245
2  Danube  2.770357e+06  ...            9  4146295380
2  Danube  2.770357e+06  ...            9  4145483446
2  Danube  2.770357e+06  ...            9  4145105290
2  Danube  2.770357e+06  ...            9  4145180995

[7 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 0, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,708 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 5)
Function:  shuffle_transfer
args:      (    NAME     Shape_Leng  ...  _partitions      _index
0  Rhone  910617.937531  ...            1   224597375
0  Rhone  910617.937531  ...            1   224461416
0  Rhone  910617.937531  ...            0   142653391
0  Rhone  910617.937531  ...            0   171455062
0  Rhone  910617.937531  ...            1   171476924
0  Rhone  910617.937531  ...            1   181956632
1   Sava  625964.046091  ...            9  3864776142
2   Sava  625964.046091  ...            9  3889637540

[8 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 5, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,709 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 2)
Function:  shuffle_transfer
args:      (Empty GeoDataFrame
Columns: [NAME, Shape_Leng, POLYID, geometry, _partitions, _index]
Index: [], '6ccb0d6d714273c1b356ecef0cf2ba3d', 2, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,709 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 3)
Function:  shuffle_transfer
args:      (      NAME     Shape_Leng  ...  _partitions      _index
0    Loire  944755.381131  ...            0   104588529
1  Nemunas  689209.621531  ...            4  2235773386

[2 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 3, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,710 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 9)
Function:  shuffle_transfer
args:      (       NAME     Shape_Leng  ...  _partitions      _index
0    Vuoksa  153155.546336  ...            5  2706767300
1  Kemijoki  465164.358679  ...            5  2768818853
1  Kemijoki  465164.358679  ...            5  2604172295
1  Kemijoki  465164.358679  ...            5  2600927167
1  Kemijoki  465164.358679  ...            5  2599495452
1  Kemijoki  465164.358679  ...            6  2789728699
1  Kemijoki  465164.358679  ...            6  2782712749
1  Kemijoki  465164.358679  ...            6  2795155147
1  Kemijoki  465164.358679  ...            6  2795062494
2    Vuoksa  153155.546336  ...            6  3016224678

[10 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 9, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,711 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 8)
Function:  shuffle_transfer
args:      (         NAME     Shape_Leng  ...  _partitions      _index
0  Zap. Dvina  806083.181154  ...            6  3119987828
1  Zap. Dvina  806083.181154  ...            6  3210229576
2          Po  624045.842969  ...            1   515795029
3          Po  624045.842969  ...            1   500383478

[4 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 8, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,714 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 6)
Function:  shuffle_transfer
args:      (    NAME     Shape_Leng  ...  _partitions     _index
0  Seine  673648.436798  ...            4  969640100

[1 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 6, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,715 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 7)
Function:  shuffle_transfer
args:      (      NAME     Shape_Leng  ...  _partitions      _index
0     Tisa  624533.651205  ...            8  3689965283
0     Tisa  624533.651205  ...            9  3832754910
0     Tisa  624533.651205  ...            8  3689967583
0     Tisa  624533.651205  ...            8  3705278923
1  Vistula  895680.167951  ...            7  3569398882
1  Vistula  895680.167951  ...            7  3637816901
1  Vistula  895680.167951  ...            7  3637217724
2  Vistula  895680.167951  ...            7  3620047341
3  Vistula  895680.167951  ...            7  3637743105

[9 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 7, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

2023-06-15 13:34:24,718 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-6ccb0d6d714273c1b356ecef0cf2ba3d', 1)
Function:  shuffle_transfer
args:      (   NAME    Shape_Leng  ...  _partitions     _index
0  Ebro  8.269909e+05  ...            0    3871157
0  Ebro  8.269909e+05  ...            0     645733
1  Elbe  1.087288e+06  ...            3  801920486
2  Elbe  1.087288e+06  ...            3  798473106
2  Elbe  1.087288e+06  ...            3  745151648
2  Elbe  1.087288e+06  ...            3  751031899
2  Elbe  1.087288e+06  ...            2  673496756
2  Elbe  1.087288e+06  ...            2  653474055

[8 rows x 6 columns], '6ccb0d6d714273c1b356ecef0cf2ba3d', 1, 10, '_partitions', {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
kwargs:    {}
Exception: "RuntimeError('shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d')"

---------------------------------------------------------------------------
ArrowTypeError                            Traceback (most recent call last)
File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_shuffle.py:63, in shuffle_transfer()
     62 try:
---> 63     return _get_worker_extension().add_partition(
     64         input,
     65         shuffle_id=id,
     66         type=ShuffleType.DATAFRAME,
     67         input_partition=input_partition,
     68         npartitions=npartitions,
     69         column=column,
     70         parts_out=parts_out,
     71     )
     72 except Exception as e:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_worker_extension.py:654, in add_partition()
    653     kwargs["empty"] = data
--> 654 shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
    655 return sync(
    656     self.worker.loop,
    657     shuffle.add_partition,
    658     data=data,
    659     input_partition=input_partition,
    660 )

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_worker_extension.py:910, in get_or_create_shuffle()
    904 def get_or_create_shuffle(
    905     self,
    906     shuffle_id: ShuffleId,
    907     type: ShuffleType,
    908     **kwargs: Any,
    909 ) -> ShuffleRun:
--> 910     return sync(
    911         self.worker.loop,
    912         self._get_or_create_shuffle,
    913         shuffle_id,
    914         type,
    915         **kwargs,
    916     )

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/utils.py:418, in sync()
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/utils.py:391, in f()
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/tornado/gen.py:767, in run()
    766 try:
--> 767     value = future.result()
    768 except Exception as e:
    769     # Save the exception for later. It's important that
    770     # gen.throw() not be called inside this try/except block
    771     # because that makes sys.exc_info behave unexpectedly.

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_worker_extension.py:735, in _get_or_create_shuffle()
    734 if shuffle is None:
--> 735     shuffle = await self._refresh_shuffle(
    736         shuffle_id=shuffle_id,
    737         type=type,
    738         kwargs=kwargs,
    739     )
    741 if self.closed:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_worker_extension.py:784, in _refresh_shuffle()
    779     assert kwargs is not None
    780     result = await self.worker.scheduler.shuffle_get_or_create(
    781         id=shuffle_id,
    782         type=type,
    783         spec={
--> 784             "schema": pa.Schema.from_pandas(kwargs["empty"])
    785             .serialize()
    786             .to_pybytes(),
    787             "npartitions": kwargs["npartitions"],
    788             "column": kwargs["column"],
    789             "parts_out": kwargs["parts_out"],
    790         },
    791         worker=self.worker.address,
    792     )
    793 elif type == ShuffleType.ARRAY_RECHUNK:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/types.pxi:2574, in pyarrow.lib.Schema.from_pandas()
   2573 from pyarrow.pandas_compat import dataframe_to_types
-> 2574 names, types, metadata = dataframe_to_types(
   2575     df,

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/pandas_compat.py:546, in dataframe_to_types()
    544     empty = c.head(0) if isinstance(
    545         c, _pandas_api.pd.Series) else c[:0]
--> 546     type_ = pa.array(empty, from_pandas=True).type
    547 else:

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/array.pxi:323, in pyarrow.lib.array()
    322         values, obj.dtype, type)
--> 323 result = _ndarray_to_array(values, mask, type, c_from_pandas, safe,
    324                            pool)

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/array.pxi:79, in pyarrow.lib._ndarray_to_array()
     78 shared_ptr[CChunkedArray] chunked_out
---> 79 shared_ptr[CDataType] c_type = _ndarray_to_type(values, type)
     80 CCastOptions cast_options = CCastOptions(safe)

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/array.pxi:67, in pyarrow.lib._ndarray_to_type()
     66 with nogil:
---> 67     check_status(NumPyDtypeToArrow(dtype, &c_type))
     68 

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/pyarrow/error.pxi:123, in pyarrow.lib.check_status()
    122 elif status.IsTypeError():
--> 123     raise ArrowTypeError(message)
    124 elif status.IsCapacityError():

ArrowTypeError: Did not pass numpy.dtype object

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
Cell In[6], line 1
----> 1 ddf.spatial_shuffle()

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask_geopandas/core.py:807, in GeoDataFrame.spatial_shuffle(self, by, level, calculate_partitions, npartitions, divisions, **kwargs)
    797 sorted_ddf = self.set_index(
    798     by,
    799     sorted=False,
   (...)
    803     **kwargs,
    804 )
    806 if calculate_partitions:
--> 807     sorted_ddf.calculate_spatial_partitions()
    809 return sorted_ddf

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask_geopandas/core.py:186, in _Frame.calculate_spatial_partitions(self)
    178 else:
    179     import pygeos  # noqa
    181     parts = geopandas.GeoSeries(
    182         self.map_partitions(
    183             lambda part: pygeos.convex_hull(
    184                 pygeos.geometrycollections(part.geometry.values.data)
    185             )
--> 186         ).compute(),
    187         crs=self.crs,
    188     )
    189 self.spatial_partitions = parts

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask/base.py:310, in DaskMethodsMixin.compute(self, **kwargs)
    286 def compute(self, **kwargs):
    287     """Compute this dask collection
    288 
    289     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    308     dask.compute
    309     """
--> 310     (result,) = compute(self, traverse=False, **kwargs)
    311     return result

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/client.py:3227, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3225         should_rejoin = False
   3226 try:
-> 3227     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3228 finally:
   3229     for f in futures.values():

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/client.py:2361, in Client.gather(self, futures, errors, direct, asynchronous)
   2359 else:
   2360     local_worker = None
-> 2361 return self.sync(
   2362     self._gather,
   2363     futures,
   2364     errors=errors,
   2365     direct=direct,
   2366     local_worker=local_worker,
   2367     asynchronous=asynchronous,
   2368 )

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/client.py:2224, in Client._gather(self, futures, errors, direct, local_worker)
   2222         exc = CancelledError(key)
   2223     else:
-> 2224         raise exception.with_traceback(traceback)
   2225     raise exc
   2226 if errors == "skip":

File ~/mambaforge/envs/testenv/lib/python3.11/site-packages/distributed/shuffle/_shuffle.py:73, in shuffle_transfer()
     63     return _get_worker_extension().add_partition(
     64         input,
     65         shuffle_id=id,
   (...)
     70         parts_out=parts_out,
     71     )
     72 except Exception as e:
---> 73     raise RuntimeError(f"shuffle_transfer failed during shuffle {id}") from e

RuntimeError: shuffle_transfer failed during shuffle 6ccb0d6d714273c1b356ecef0cf2ba3d

The error can also be reproduced like this:

pa.Schema.from_pandas(ddf.partitions[0].compute())

The same code works when using pyarrow=11, although it's slower.

Environment:

  • Dask version: '2023.5.1'
  • Dask-geopandas version: 'v0.3.1'
  • Pyarrow version: '12.0.0'
  • Python version: 3.11
  • Operating System: Mac m1
  • Install method (conda, pip, source): mamba
@jorisvandenbossche
Copy link
Member

@FlorisCalkoen thanks for the report!

I can reproduce the error, but I am not sure it is related to the pyarrow version (I get the error with pyarrow 10 or 11 as well), but maybe rather related to the dask/distributed version?

My current understanding is that this error comes from the new shuffle implementation in distributed (https://blog.coiled.io/blog/shuffling-large-data-at-constant-memory.html, starting with dask 2023.2.1), which now uses Arrow IPC to serialize the data and send them between workers. But converting a geopandas.GeoDataFrame to pyarrow.Table doesn't work out of the box, because arrow doesn't know what to do with the geometry column.

And I can confirm this by specifying to use the older task-based shuffling:

ddf.spatial_shuffle(shuffle="tasks")

That works without error for me.

@jorisvandenbossche
Copy link
Member

We should of course ensure this works with the new P2P shuffle as well, as that brings many benefits.
I have to look a bit closer into it, but essentially we have to make the following work:

In [7]: rivers = gpd.read_file(geodatasets.get_path("eea large_rivers")).to_crs(4326)

In [8]: import pyarrow as pa

In [9]: pa.Table.from_pandas(rivers)
...
ArrowTypeError: ('Did not pass numpy.dtype object', 'Conversion failed for column geometry with type geometry')

This is something that could be fixed on the GeoPandas side by defining an arrow extension type (to control how the geometry column gets converted to arrow and back). However, I am not fully sure how dask/distributed could know we want back a GeoDataFrame and not a DataFrame (something to try out).

Or dask/distributed needs to give us some way to register a method to override this default conversion, similarly as we did for just dask's to_parquet to register a pyarrow_schema_dispatch (82da8f1)

@martinfleis
Copy link
Member

cc @hendrikmakait, you were interested how P2P works with dask-geopandas. It doesn't at the moment :).

@jorisvandenbossche
Copy link
Member

With the following patch to geopandas, the above example works:

--- a/geopandas/array.py
+++ b/geopandas/array.py
@@ -1257,7 +1257,10 @@ class GeometryArray(ExtensionArray):
         # GH 1413
         if isinstance(scalars, BaseGeometry):
             scalars = [scalars]
-        return from_shapely(scalars)
+        try:
+            return from_shapely(scalars)
+        except TypeError:
+            return from_wkb(scalars)
 
     def _values_for_factorize(self):
         # type: () -> Tuple[np.ndarray, Any]
@@ -1454,6 +1457,11 @@ class GeometryArray(ExtensionArray):
         """
         return to_shapely(self)
 
+    def __arrow_array__(self, type=None):
+        # convert the underlying array values to a pyarrow Array
+        import pyarrow
+        return pyarrow.array(to_wkb(self), type=type)
+
     def _binop(self, other, op):
         def convert_values(param):
             if not _is_scalar_geometry(param) and (

Explanation:

  • Adding __arrow_array__ ensures that the conversion to an Arrow table works automatically (which is done in distributed's P2P shuffle, it calls pa.Table.from_pandas on each chunk.
  • When only adding __arrow_array__, the conversion back from arrow to pandas fails: distributed will try to cast the pyarrow->pandas converted DataFrame to the original dtypes (df.astype(meta.dtypes, copy=False) at https://github.com/dask/distributed/blob/9beab9a06a7777cb8d6bb2d90ae961b69de2e532/distributed/shuffle/_arrow.py#L71-L72). This fails, because geopandas currently doesn't support doing Series[binary WKB values].astype("geometry") (while we probably should?)
  • Overriding _from_sequence to call from_wkb if from_shapely fails is a quick workaround to get .astype("geometry") working.

@jorisvandenbossche
Copy link
Member

While the points I raise are things we should address in geopandas anyway at some point (although there are some questions about which default representation to use when converting to arrow), there are also other solutions in dask and distributed itself: dask added a dispatch method for pyarrow<->pandas conversion (dask/dask#10312) which we can implement, and I think that should also fix this issue when that dispatch method is used in distributed (WIP PR for this is at dask/distributed#7743)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants