apply_ufunc support for chunks on input_core_dims #1995

Open
crusaderky opened this issue Mar 15, 2018 · 13 comments

@crusaderky
Contributor

I am trying to optimize the following function:

c = (a * b).sum('x', skipna=False)

where a and b are xarray.DataArray objects, both with dimension x and both backed by dask.

I successfully obtained a 5.5x speedup with the following:

import numba

@numba.guvectorize(['void(float64[:], float64[:], float64[:])'], '(n),(n)->()', nopython=True, cache=True)
def mulsum(a, b, res):
    # accumulate the elementwise product: a dot product along 'x'
    acc = 0
    for i in range(a.size):
        acc += a[i] * b[i]
    res.flat[0] = acc

c = xarray.apply_ufunc(
    mulsum, a, b,
    input_core_dims=[['x'], ['x']],
    dask='parallelized', output_dtypes=[float])

The problem is that this introduces a constraint (quite problematic, in my case) that a and b can't be chunked on dimension x. That constraint is theoretically avoidable as long as the kernel function doesn't need interaction between x[i] and x[j] (so it can't work for e.g. an interpolator, which would need to rely on dask ghosting).
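For concreteness, a minimal sketch of the failure mode (the chunked arrays are illustrative, and the error text is paraphrased from xarray's check rather than quoted):

a_chunked = a.chunk({'x': 2})
b_chunked = b.chunk({'x': 2})

# Raises ValueError: with dask='parallelized', apply_ufunc rejects core
# dimensions that span multiple chunks and asks for a rechunk along 'x'.
c = xarray.apply_ufunc(
    mulsum, a_chunked, b_chunked,
    input_core_dims=[['x'], ['x']],
    dask='parallelized', output_dtypes=[float])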

Proposal

Add a parameter to apply_ufunc, reduce_func=None. reduce_func is a function that takes two parameters a and b, which are outputs of func. apply_ufunc will invoke it whenever there's chunking on an input_core_dim.

e.g. my use case above would simply become:

c = xarray.apply_ufunc(
    mulsum, a, b,
    input_core_dims=[['x'], ['x']],
    dask='parallelized', output_dtypes=[float], reduce_func=operator.add)

So if I have 2 chunks in a and b on dimension x, apply_ufunc will internally do

c1 = mulsum(a1, b1)
c2 = mulsum(a2, b2)
c = operator.add(c1, c2)

Note that reduce_func will be invoked exclusively in the presence of dask='parallelized' and when there's chunking on one or more of the input_core_dims. If reduce_func is left as None, apply_ufunc will keep raising an error, as it does now.
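A minimal sketch of the proposed semantics (apply_with_reduce and the per-chunk lists are hypothetical illustrations, not part of any existing API):

import functools
import operator

def apply_with_reduce(func, a_chunks, b_chunks, reduce_func=operator.add):
    # Apply the kernel to each pair of chunks along the chunked core
    # dimension, then fold the partial results together with reduce_func.
    partials = [func(a, b) for a, b in zip(a_chunks, b_chunks)]
    return functools.reduce(reduce_func, partials)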

@shoyer
Member

shoyer commented Mar 16, 2018

Have you tried the new xarray.dot()? That might be even faster for this case.
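For reference, a minimal xarray.dot call for this case (note the keyword was dims in the xarray of this era; recent releases spell it dim):

import xarray as xr

# Contracts over 'x' while broadcasting all remaining dimensions;
# equivalent to (a * b).sum('x') for float inputs.
c = xr.dot(a, b, dims='x')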

@fujiisoup
Member

I think if a and b have common dimensions other than x, even xarray.dot() does not allow chunking along x (because it internally uses apply_ufunc with dask='parallelized').

I think it would be nice if we could have a way to allow chunking along input_core_dims, though I cannot yet picture what it should look like.

@shoyer
Member

shoyer commented Mar 16, 2018

For two inputs, don't we use dask.array.tensordot?

@fujiisoup
Member

If a.dims=('x', 'y', 'z') and b.dims=('x', 'y', 'w'), then we can't use tensordot, as we need to multiply along dimension y.
Maybe we can use matmul in some limited cases, but generally no.
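A small numpy illustration of the point (shapes invented for the example):

import numpy as np

a = np.ones((2, 3, 4))  # dims ('x', 'y', 'z')
b = np.ones((2, 3, 5))  # dims ('x', 'y', 'w')

# tensordot contracts 'x' but takes an outer product over the two 'y'
# axes instead of aligning them elementwise:
out = np.tensordot(a, b, axes=([0], [0]))  # shape (3, 4, 3, 5)

# The desired result keeps 'y' aligned, which einsum can express:
desired = np.einsum('xyz,xyw->yzw', a, b)  # shape (3, 4, 5)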

@shoyer
Member

shoyer commented Mar 16, 2018

One way to allow chunking across x would be to finish up dask.array.einsum: dask/dask#732
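dask.array.einsum does exist in modern dask; a sketch of the contraction it would enable (assuming a and b are dask arrays with x as the last axis):

import dask.array as da

# Per-chunk partial products plus the cross-chunk reduction are handled
# internally by dask's einsum implementation.
c = da.einsum('...i,...i->...', a, b)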

I'm reluctant to add reduce_func to xarray because it isn't clear to me exactly what the underlying abstraction is. It's something like a gufunc, but does a little bit more. Also, ideally we'd like this to be in dask.array, maybe as part of dask.array.apply_gufunc (dask/dask#3109).

For this specific problem, I think you could solve it with xarray.apply_ufunc by writing something like a gufunc that keeps the reduced axis as size 1, applying it to each chunk, and afterwards summing along that dimension.

@crusaderky
Contributor Author

For this specific problem, I think you could solve it with xarray.apply_ufunc by writing something like a gufunc that keeps the reduced axis as size 1, applying it to each chunk, and afterwards summing along that dimension.

@shoyer could you give an example? That was my first thought, but I could not figure out how to make apply_ufunc do it.

@shoyer
Member

shoyer commented Mar 16, 2018

could you give an example? That was my first thought, but I could not figure out how to make apply_ufunc do it.

OK, thinking a little more about it, this would not work with dask='parallelized', which does not allow chunking over core dimensions. You would have to parallelize the function with dask yourself, e.g. with dask.array.map_blocks, and then use apply_ufunc with dask='allowed'.

@shoyer
Member

shoyer commented Mar 16, 2018

Try:

import dask.array
import numpy as np

def mulsum_chunk(a, b):
  # partial dot product over the last axis, kept as a size-1 axis
  return np.einsum('...i,...i', a, b)[..., np.newaxis]

def mulsum(a, b):
  # needs broadcasting/rechunking for a,b
  mapped = dask.array.map_blocks(mulsum_chunk, a, b, dtype=float,
                                 chunks=a.chunks[:-1] + (tuple(1 for _ in a.chunks[-1]),))
  return dask.array.sum(mapped, axis=-1)
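A usage sketch for the snippet above (the random inputs are illustrative; both arrays must share chunking along the reduced axis):

a = dask.array.random.random(10, chunks=5)
b = dask.array.random.random(10, chunks=5)

# One size-1 partial per chunk pair, then summed: same value as
# np.dot(a, b), computed chunk by chunk.
print(mulsum(a, b).compute())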

@crusaderky
Contributor Author

crusaderky commented Mar 16, 2018

[EDIT] drastically simplified chunking algorithm

@shoyer , close, but your version doesn't work in case of broadcasting.
I think I fixed it, although it won't work correctly if only one of a and b has a dask backend, and I'm not sure how to fix that:

import xarray
import numpy
import dask.array

coefficients = xarray.DataArray(
    dask.array.random.random((106, 99), chunks=(25, 25)),
    dims=['formula', 'time'])
components = xarray.DataArray(
    dask.array.random.random((106, 512 * 1024), chunks=(25, 65536)),
    dims=['formula', 'scenario'])

def mulsum(a, b, dim):
    return xarray.apply_ufunc(
        _mulsum_xarray_kernel, a, b,
        input_core_dims=[[dim], [dim]],
        dask='allowed', output_dtypes=[float])


def _mulsum_xarray_kernel(a, b):
    if isinstance(a, dask.array.Array) and isinstance(b, dask.array.Array):
        chunks = dask.array.core.broadcast_chunks(a.chunks, b.chunks)
        chunks = chunks[:-1] + (tuple(1 for _ in chunks[-1]), )

        mapped = dask.array.map_blocks(
            _mulsum_dask_kernel, a, b,
            dtype=float, chunks=chunks)
        return dask.array.sum(mapped, axis=-1)
    else:
        return _mulsum_dask_kernel(a, b)


def _mulsum_dask_kernel(a, b):
    a = numpy.ascontiguousarray(a)
    b = numpy.ascontiguousarray(b)
    res = numpy.einsum('...i,...i', a, b, optimize='optimal')
    return res[..., numpy.newaxis]

mulsum(coefficients, components, dim='formula')
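One possible way to handle the mixed numpy/dask case mentioned above (an editorial sketch, not from the thread: promote the numpy operand to dask with matching chunks along the core axis):

def _mulsum_xarray_kernel_mixed(a, b):
    # If exactly one operand is a dask array, wrap the other so both take
    # the chunked code path; chunk the shared last (core) axis to match.
    if isinstance(b, dask.array.Array) and not isinstance(a, dask.array.Array):
        a = dask.array.from_array(a, chunks=a.shape[:-1] + (b.chunks[-1],))
    elif isinstance(a, dask.array.Array) and not isinstance(b, dask.array.Array):
        b = dask.array.from_array(b, chunks=b.shape[:-1] + (a.chunks[-1],))
    return _mulsum_xarray_kernel(a, b)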

Proposal 2

Modify apply_ufunc:

  • remove the check that the input_core_dims must not be chunked
  • add parameter output_chunks

My initial example would become:

def mulsum_kernel(a, b):
    return numpy.einsum('...i,...i', a, b)[..., numpy.newaxis]

c = xarray.apply_ufunc(
    mulsum_kernel, a, b,
    dask='parallelized', 
    input_core_dims=[['x'], ['x']],
    output_dtypes=[float],
    output_core_dims=[['__partial']],
    output_chunks={'__partial': [1 for _ in a.chunks[a.dims.index('x')]]}
).sum('__partial')

Although I'm not sure this approach would be unambiguous when there's more than one core_dim...

@shoyer
Member

shoyer commented Mar 16, 2018

Modify apply_ufunc:
remove the check that the input_core_dims must not be chunked
add parameter output_chunks

My main concern is ensuring that someone does not inadvertently apply a function not designed for multiple chunks to dask arrays. For example, suppose the function being applied is np.median.
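A concrete illustration of the hazard (numbers invented for the example):

import numpy as np

x = np.array([1, 2, 3, 10, 20, 30])
print(np.median(x))  # 6.5

# A per-chunk median followed by a combine step gives a different answer:
chunk_medians = [np.median(x[:3]), np.median(x[3:])]  # [2.0, 20.0]
print(np.median(chunk_medians))  # 11.0, not 6.5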

Some loud flag that makes it very obvious what's going on seems like a good idea, e.g., possibly_chunked_core_dims=['x']?

Then we also need some sort of guarantee that chunked core dimensions aren't entirely removed, or else xarray/dask won't know how to stack them back up. I guess we could check to make sure that at least as many output core dimensions appear as input core dimensions?

@crusaderky
Contributor Author

crusaderky commented Apr 24, 2018

@shoyer , you don't really need a parameter possibly_chunked_core_dims=['x']; you are already specifying output_chunks, without which apply_ufunc won't know what to do and will crash...

@stale

stale bot commented Mar 24, 2020

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity.

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically.

@TomNicholas
Member

Has this not been solved by the argument allow_rechunk?

@crusaderky isn't this effectively what you were trying to achieve?

import xarray as xr

def mulsum(a, b):
    acc = 0
    for i in range(a.size):
        acc += a[i] * b[i]
    return acc

a = xr.DataArray(data=[1, 2, 3], dims=['x']).chunk({"x": 1})

b = xr.DataArray(data=[4, 5, 6], dims=['x']).chunk({"x": 1})

c = xr.apply_ufunc(
    mulsum, a, b,
    input_core_dims=[['x'], ['x']],
    dask='parallelized', output_dtypes=[float],
    dask_gufunc_kwargs={'allow_rechunk': True})

print(c.compute())

returns

<xarray.DataArray ()>
array(32)

I think this has only been possible since the implementation of xarray.apply_ufunc was switched from dask.array.blockwise to dask.array.apply_gufunc in #4060.
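Roughly the dask-level call this maps onto (an editorial sketch; as I understand apply_gufunc, allow_rechunk=True lets it rechunk each core dimension into a single chunk rather than applying the kernel per chunk):

import dask.array as da

result = da.apply_gufunc(
    mulsum, "(n),(n)->()",
    a.data, b.data,
    output_dtypes=float,
    allow_rechunk=True)
print(result.compute())  # 32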

If this is actually doing what I think it's doing then we should document this possibility!
