-
Hi all, I am trying to evaluate cuSpatial and the recent cuDF and Dask updates in RAPIDS for a geospatial workflow. I have a 2 billion row Parquet table that contains point data (68 GB as Parquet, 480 GB as uncompressed CSV), and I have also created a single polygon so I can test point-in-polygon across this data.

%%time
path = "/ssd/Vegas/datafiles/parquet/vegas/*"
#dask-sql
c = Context()
# create a table and register it in the context
c.create_table("parquet", glob(path), gpu=True)
xy = c.sql("""SELECT lon,lat FROM parquet""")
%%time
xy_cudf = xy.compute()
xy_cudf_interleaved = xy_cudf.interleave_columns() #OOM here
Is there an option in cuSpatial to not use interleaved columns for the spatial join? (I couldn't see one in the docs.) I appreciate it won't be as efficient, but I am just trying to get a proof of concept working right now and can optimise down the track. Or, if anyone has any suggestions as to how I can manually interleave the dataframe, that would also do for now!
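For a single in-memory chunk I imagine something like the sketch below would do the element-wise interleave (untested, written against cudf/CuPy as I understand them; interleave_lon_lat is just a name I made up and lon/lat are my column names), but I still don't see how to apply it across the full Dask dataframe without realising everything:

import cudf
import cupy as cp

def interleave_lon_lat(df: cudf.DataFrame) -> cudf.Series:
    # Build the flat x, y, x, y, ... layout that
    # cuspatial.GeoSeries.from_points_xy expects.
    lon = df["lon"].values   # CuPy view of the column
    lat = df["lat"].values
    out = cp.empty(lon.size * 2, dtype=lon.dtype)
    out[0::2] = lon          # even slots hold x (lon)
    out[1::2] = lat          # odd slots hold y (lat)
    return cudf.Series(out)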
-
This (interleaving data costing temporary storage) is a great point. We're discussing. Stay tuned!
-
I have tried several methods to get this working, but each time I get stuck when the points array needs to be realised. This is about as close as I have got so far; if anyone has any hints on how to improve it, I am all ears.

import cudf
import dask
import dask.dataframe as dd
import dask.array as da
import dask_cudf as dc
import json
import cuspatial
import pandas as pd
import geopandas as gpd
from shapely import from_wkb, from_wkt
import numpy as np
# Define the path to the parquet files
path_to_parquet_files = "/ssd/Vegas/datafiles/parquet/vegas/*"
# Read in the parquet files using dask_cudf
ddf = dc.read_parquet(path_to_parquet_files)
ddf.head(5)
5 rows × 33 columns

# Get the lon/lat point columns as Dask series
lon_arr = ddf['lon']
lat_arr = ddf['lat']
# NOTE: concatenating along axis 0 stacks all lons followed by all lats;
# it does not produce the element-wise x, y, x, y layout cuspatial expects
interleaved = dd.concat([lon_arr, lat_arr], axis=0, interleave_partitions=True)
# Combine the point arrays into a single array
# point_arr = da.stack([lon_arr, lat_arr], axis=1)

polygon_wkt = 'POLYGON((-115.074444 36.289153, -115.208314 36.325569, -115.208688 36.325646, -115.259397 36.335589, -115.260628 36.335652, -115.260845 36.335658, -115.276407 36.335967, -115.320842 36.33641, -115.333587 36.30627, -115.368573 36.170149, -115.3682 36.168344, -115.36794 36.16714, -115.353159 36.109493, -115.315922 36.023474, -115.298126 35.998029, -115.102856 35.918199, -115.100108 35.918038, -115.095546 35.917781, -115.084946 35.918409, -115.072079 35.923316, -114.918841 35.983522, -114.919047 36.0226, -114.919067 36.022821, -114.919108 36.022997, -114.93976 36.080677, -115.006009 36.219129, -115.007607 36.222047, -115.008162 36.222806, -115.010791 36.225883, -115.06011 36.27717, -115.068572 36.285772, -115.069297 36.286474, -115.069637 36.286803, -115.070046 36.287197, -115.071477 36.288191, -115.071736 36.288332, -115.074214 36.289087, -115.074444 36.289153))'
polygon_gdf = gpd.GeoSeries.from_wkt([polygon_wkt])
polygon_gdf
# Apply the point in polygon algorithm to the data
result = cuspatial.point_in_polygon(
cuspatial.GeoSeries.from_points_xy(interleaved),
cuspatial.from_geopandas(polygon_gdf)
)
result
# Convert the result to a Dask series and set the index
result_series = dc.from_dask_array(result)
result_series = result_series.set_index(ddf.index)
# Join the result series with the original dataframe
ddf_with_result = ddf.merge(result_series.to_frame('point_in_polygon'), how='left', left_index=True, right_index=True)
ddf_with_result.head(10)
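One thing I am also trying (sketch only; partition_size is a guess and I have not verified whether my dask_cudf version supports that argument) is pruning to just the two coordinate columns and repartitioning into smaller chunks before anything gets realised:

# Keep only the columns the join needs, then split into smaller partitions
# so each chunk should fit comfortably in GPU memory.
ddf_xy = ddf[["lon", "lat"]].repartition(partition_size="256MB")
print(ddf_xy.npartitions)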
-
Hi @voycey, we had some internal conversations about this. You may want to look at this dask_cudf feature request (closed, with a suggested workaround) filed by @isVoid: rapidsai/cudf#13308. Also, some guidance on reading Parquet in chunks:
If the row groups in your Parquet files are too large for GPU memory, then you would have a problem. Note that libcudf has a chunked Parquet reader that can read at a finer granularity, but AFAIK it is not exposed in Python yet.
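If you want to check whether you are in that situation, a quick way (pyarrow sketch; the file name below is just illustrative) is to inspect the row group layout of one of your files:

import pyarrow.parquet as pq

# Inspect one file's row groups to see how large each chunk will be
# when it lands on the GPU.
md = pq.ParquetFile("/ssd/Vegas/datafiles/parquet/vegas/part-0.parquet").metadata
print("row groups:", md.num_row_groups)
for i in range(md.num_row_groups):
    rg = md.row_group(i)
    print(i, rg.num_rows, "rows,", rg.total_byte_size, "bytes (uncompressed)")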
-
Ok, just to update this with something that is working (I appreciate the help everyone 🙂 - hopefully my pain helps others trying to get a PoC working). With a bit of help from ChatGPT I understood some of the limitations: primarily that, with Dask, the cuDF DataFrame needs to be built inside each worker, for both the polygon and the points. This meant I also had to wrap the function in a separate function so that the serialization using pickle would work internally too (it didn't like doing this with cudf in it directly, I think). Also, creating the interleaved column INSIDE the function being mapped was obvious in hindsight, as it would only work on partitioned data (which I think …). So the bulk of the code now looks like this and is working pretty efficiently!

%%time
def spatial_join(df_partition, polygon):
    # build the points GeoSeries from the partition's lon/lat columns
    i_ddf = cudf.DataFrame({"x": df_partition['lon'], "y": df_partition['lat']}).interleave_columns()
    points_gseries = cuspatial.GeoSeries.from_points_xy(i_ddf)
    # do the spatial join
    result = cuspatial.point_in_polygon(points_gseries, polygon)
    return result

def wrapped_spatial_join(df_partition):
    # build the polygon on the worker so pickling the mapped function works
    gpolygon = cuspatial.from_geopandas(polygon)
    return spatial_join(df_partition, gpolygon)

%%time
res = ddf.map_partitions(wrapped_spatial_join).compute()
# res = ddf.map_partitions(spatial_join, gpolygon, meta=ddf.head(), align_dataframes=False).compute()

%%time
res.value_counts()
One minute to run a spatial join across around 10 GB of points is pretty great on this GPU.
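As a possible follow-up (untested sketch: in_polygon is just a column name I picked, it reuses the spatial_join helper and polygon GeoSeries above, and Dask's empty-partition meta inference may need an explicit meta=), the flag could be attached to each partition directly instead of collecting a separate result:

def tag_partition(df_partition):
    # build the polygon on the worker, run the join, and attach the
    # single polygon's boolean column back onto the partition
    gpolygon = cuspatial.from_geopandas(polygon)
    pip = spatial_join(df_partition, gpolygon)
    out = df_partition.copy(deep=False)
    out["in_polygon"] = pip.iloc[:, 0].values
    return out

ddf_tagged = ddf.map_partitions(tag_partition)
ddf_tagged["in_polygon"].sum().compute()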
-
@voycey thanks for writing a positive review of cuSpatial and RAPIDS in your post here: https://towardsdatascience.com/data-engineering-fast-spatial-joins-across-2-billion-rows-on-a-single-old-gpu-c6cb531949ed - awesome! What's the next step?