
"Recursive" futures are deleted from the cluster #8854

Closed
josephnowak opened this issue Sep 3, 2024 · 5 comments


josephnowak commented Sep 3, 2024

Describe the issue:
My regular process consists of running an SQL command on my cluster to get a relatively big DataFrame, which is then scattered among all the workers to avoid running the query multiple times. I have found that with the latest version of Dask this is no longer possible, because the scattered data (and any future) is being deleted; this is probably related to this PR (dask/dask#11348).

Note: I tried the same example with Dask 2024.08.1 and everything works as expected

Minimal Complete Verifiable Example:

import pandas as pd
from dask import distributed

dask_client = distributed.Client()


def f():
    c = distributed.get_client()
    # Both options cause the future to be deleted from the cluster, which means it is lost
    # return c.submit(lambda : 1)
    return c.scatter(pd.DataFrame([[1, 2, 3, 4]]))

a = dask_client.submit(f).result()
# a will contain some kind of None future which cannot even be printed in my Jupyter notebook.
a

Anything else we need to know?:
When I try to use the future as a parameter for another call of the submit method, I get the following error:

RuntimeError("<class 'distributed.client.Future'> object not properly initialized. This can happen if the object is being deserialized outside of the context of a Client or Worker.")

Environment:

  • Dask version: 2024.08.2
  • Python version: 3.11
  • Operating System: Windows 11 and also Linux (I do not remember the distro, sorry; it is a Jupyter deployment on Amazon).
  • Install method (conda, pip, source): pip

fjetter commented Sep 3, 2024

I suspect what you're seeing is caused by #8827, not by dask/dask#11348.

The exceptions you're seeing are unfortunate and we should fix that. However, the behavior itself is pretty much expected now. This only worked in earlier versions due to a race condition that we have since removed, i.e. it worked by pure chance.

Can you elaborate on why you need this complex construct? It is a bit unusual for a function to return a future. Typically, your code would just look like this:

def f():
    return pd.DataFrame([[1, 2, 3, 4]])

fut = dask_client.submit(f)
# fut is an ordinary Future; call fut.result() only when you actually need the data

As long as you are holding a reference to fut, the function will not be recomputed and you can reuse the result. There is no need to scatter anything or to submit the data again. Just return it and dask will take care of the rest.
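A minimal sketch of this pattern (not from the thread, assuming only that `distributed` is installed): submit the expensive function once, keep the Future, and pass it to downstream tasks instead of scattering. The names `expensive_query` and `downstream` are illustrative stand-ins.

```python
import distributed

def expensive_query():
    # Stand-in for the slow SQL query
    return list(range(5))

def downstream(data, offset):
    return [x + offset for x in data]

with distributed.Client(processes=False) as client:
    fut = client.submit(expensive_query)
    # submit is "pure" by default: the same function + arguments map to the
    # same key, so resubmitting does not recompute while the Future is alive.
    assert client.submit(expensive_query).key == fut.key
    # Downstream tasks take the Future directly; Dask substitutes the result.
    a = client.submit(downstream, fut, 10).result()
    b = client.submit(downstream, fut, 100).result()
```

Holding `fut` is what keeps the result on the cluster; once the last reference is dropped, the scheduler is free to release it.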


josephnowak commented Sep 3, 2024

I'm trying to execute a DAG of tasks that loads certain data into Zarr. I scatter some DataFrames to use them in the creation of multiple delayed DataArrays via the map_blocks function; if I do not scatter them, I get a warning about a huge graph size. Here is an example of what I'm trying to achieve:

import pandas as pd
import xarray as xr
from dask import distributed
import dask.array as da

dask_client = distributed.Client()


def f():
    c = distributed.get_client()
    # Both options cause the future to be deleted from the cluster
    # return c.submit(lambda : 1)
    # Get the DataFrame from a slow SQL DB 
    return c.scatter(pd.DataFrame([[1, 2, 3, 4]]))


def slow_function(x, df):
    # Apply some logic using the DF and x.coords, then transform it into a DataArray
    return x


def array_creation1(scattered_df):
    # The coords are retrieved from a query,
    # but I will hardcode them to simplify the example
    coords = {"a": [1, 2], "b": [5, 7]}
    arr = xr.DataArray(
        da.zeros(shape=(len(coords["a"]), len(coords["b"])), chunks=(1, 2)),
        coords=coords,
        dims=["a", "b"]
    )
    # If the data is not scattered, I get a
    # warning that the graph size is too big
    arr = arr.map_blocks(
        slow_function,
        args=(scattered_df, )
    )
    # Store it in Zarr and return the xr.open_zarr delayed array
    # but as part of the example just compute the array to check that 
    # everything is ok and return the delayed version of it
    arr.compute()
    return arr


scatter_data = dask_client.submit(f).result()
# There can be more than one array_creation function
dask_client.submit(array_creation1, scatter_data).result()

Is there any workaround for a workflow like that?


fjetter commented Sep 3, 2024

If you replace the scatter code with the example I provided, you should be fine.

import pandas as pd
import xarray as xr
from dask import distributed
import dask.array as da

dask_client = distributed.Client()


def f():
    return pd.DataFrame([[1, 2, 3, 4]])

...

# Note: I am not fetching the result but forwarding the Future object returned by submit
scatter_data = dask_client.submit(f)

dask_client.submit(array_creation1, scatter_data).result()


josephnowak commented Sep 5, 2024

Sorry for the delay in my response.

I already tried your approach, and it produced a big increase in memory use (from less than 1 GB to more than 8 GB, as shown in the attached example code and image). I think this happens because a copy of the complete DataFrame is sent to every task, and Dask does not release the copies until the whole computation (to_zarr) is done (which I think is the expected behavior based on what you told me in #8460).

What I did to solve the memory issue was to scatter the data inside the function that runs on the cluster, before saving the array with Zarr. From my perspective this is not ideal, because I'm scattering the same data multiple times, but probably thanks to the hashing logic inside that function it does not consume more resources, so for now this works fine for me.
Would this be the recommended way to proceed with Dask for use cases like mine?

In any case, I think you have already clarified that this is not a bug, so feel free to close the issue.

import xarray as xr
import dask.array as da
import pandas as pd

from dask import distributed

dask_client = distributed.Client()

coords = {
    "date": pd.date_range("2005-01-01", "2008-01-01").to_numpy(),
    "security": list(range(30000))
}

dataset = xr.Dataset(
    {
        str(v): xr.DataArray(
            da.zeros(
                shape=(len(coords["date"]), len(coords["security"])),
                chunks=(30, 6000)
            ),
            coords=coords,
            dims=["date", "security"]
        )
        for v in list(range(64))
    }
)


def useless_func(x, d):
    return x

ms = distributed.diagnostics.MemorySampler()

def generate_data():
    d = pd.Series(list(range(10000000)))
    return d

d = dask_client.submit(generate_data)

def write_data(d, scatter_the_data_first: bool):
    if scatter_the_data_first:
        c = distributed.get_client()
        d = c.scatter(d, broadcast=True)

    dataset.map_blocks(
        useless_func, 
        kwargs={"d": d},
        template=dataset
    ).to_zarr("path", mode="w")

with ms.sample("No Scatter"):
    dask_client.submit(write_data, d, scatter_the_data_first=False).result()


with ms.sample("Scatter"):
    dask_client.submit(write_data, d, scatter_the_data_first=True).result()

ms.plot(align=True)

[Memory usage plot comparing the "No Scatter" and "Scatter" runs]


fjetter commented Sep 5, 2024

The issue is that you are using xarray inside of a task. This is typically not necessary. The code below works with the plain Dask API without using any Client futures. This makes the code easier to reason about, and it does what you want:

import dask

...

@dask.delayed
def generate_data():
    d = pd.Series(list(range(10000000)))
    return d

with distributed.Client(), ms.sample():
    d = generate_data()
    dataset.map_blocks(
        useless_func, 
        (d,),
        template=dataset
    ).to_zarr("path", mode="w")

ms.plot(align=True)
