
Broken pipe error in long-lived write-only workload #315

Closed
eric-czech opened this issue Dec 3, 2020 · 2 comments

Comments

@eric-czech

In attempting to run many workloads that write continuously to zarr archives on Google Storage (GS) for many hours (6-12 hrs for each), I noticed that a substantial fraction of them, maybe as high as 20%, are failing with an error like this:

Traceback (most recent call last):
  File "scripts/convert_genetic_data.py", line 312, in <module>
    fire.Fire()
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fire/core.py", line 138, in Fire
    component_trace = _Fire(component, args, parsed_flag_args, context, name)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fire/core.py", line 463, in _Fire
    component, remaining_args = _CallAndUpdateTrace(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fire/core.py", line 672, in _CallAndUpdateTrace
    component = fn(*varargs, **kwargs)
  File "scripts/convert_genetic_data.py", line 296, in bgen_to_zarr
    ds = rechunk_dataset(
  File "scripts/convert_genetic_data.py", line 217, in rechunk_dataset
    res = fn(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/sgkit/io/bgen/bgen_reader.py", line 519, in rechunk_bgen
    rechunked.execute()
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/rechunker/api.py", line 76, in execute
    self._executor.execute_plan(self._plan, **kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/rechunker/executors/dask.py", line 24, in execute_plan
    return plan.compute(**kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/threaded.py", line 76, in get
    results = get_async(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/array/core.py", line 3724, in store_chunk
    return load_store_chunk(x, out, index, lock, return_stored, False)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/dask/array/core.py", line 3713, in load_store_chunk
    out[index] = np.asanyarray(x)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1115, in __setitem__
    self.set_basic_selection(selection, value, fields=fields)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1210, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1501, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1550, in _set_selection
    self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1664, in _chunk_setitem
    self._chunk_setitem_nosync(chunk_coords, chunk_selection, value,
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/zarr/core.py", line 1729, in _chunk_setitem_nosync
    self.chunk_store[ckey] = cdata
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fsspec/mapping.py", line 154, in __setitem__
    self.fs.pipe_file(key, value)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fsspec/asyn.py", line 121, in wrapper
    return maybe_sync(func, self, *args, **kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fsspec/asyn.py", line 100, in maybe_sync
    return sync(loop, func, *args, **kwargs)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
    raise exc.with_traceback(tb)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/fsspec/asyn.py", line 55, in f
    result[0] = await future
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/gcsfs/core.py", line 1007, in _pipe_file
    return await simple_upload(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/gcsfs/core.py", line 1523, in simple_upload
    j = await fs._call(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/gcsfs/core.py", line 487, in _call
    async with self.session.request(
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/aiohttp/client.py", line 1117, in __aenter__
    self._resp = await self._coro
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/aiohttp/client.py", line 544, in _request
    await resp.start(conn)
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 890, in start
    message, payload = await self._protocol.read()  # type: ignore
  File "/workdir/.snakemake/conda/3c6331d1/lib/python3.8/site-packages/aiohttp/streams.py", line 604, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [Errno 32] Broken pipe

I could provide more details on what I'm doing, but I'm not sure how to make this easily reproducible anyway. I would assume this is a common experience for anybody writing to GS via dask and zarr through gcsfs/fsspec for a long period of time.

Has this already been reported? Are the aiohttp utilities in use here common enough to other file system implementations that this is a bug better logged against fsspec instead? Or perhaps Zarr? I assume the underlying issue is unavoidable but that some library in this call stack ought to be more resilient to it.
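Pending a fix in whichever layer owns this, one stop-gap I've considered (not something this thread confirms; `RetryingStore` and its parameters are hypothetical names) is to wrap the chunk store in a mapping that retries transient `OSError`s with exponential backoff, since zarr only requires the `MutableMapping` interface from its store. A minimal sketch, with a flaky in-memory dict standing in for gcsfs's `FSMap`:

```python
import time
from collections.abc import MutableMapping


class RetryingStore(MutableMapping):
    """Retry writes to an underlying zarr-style store on transient OSErrors.

    Hypothetical workaround sketch -- not part of zarr/gcsfs/fsspec.
    """

    def __init__(self, store, retries=5, backoff=1.0):
        self.store = store
        self.retries = retries
        self.backoff = backoff

    def __setitem__(self, key, value):
        # Retry the write with exponential backoff; re-raise on final failure.
        for attempt in range(self.retries):
            try:
                self.store[key] = value
                return
            except OSError:
                if attempt == self.retries - 1:
                    raise
                time.sleep(self.backoff * 2 ** attempt)

    def __getitem__(self, key):
        return self.store[key]

    def __delitem__(self, key):
        del self.store[key]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)


class FlakyStore(dict):
    """Stand-in for a remote store whose first two writes hit a broken pipe."""

    def __init__(self):
        super().__init__()
        self.fails = 2

    def __setitem__(self, key, value):
        if self.fails > 0:
            self.fails -= 1
            raise OSError(32, "Broken pipe")
        super().__setitem__(key, value)


store = RetryingStore(FlakyStore(), backoff=0.01)
store["chunk/0.0"] = b"data"  # succeeds on the third attempt
print(store["chunk/0.0"])  # b'data'
```

In real use you would pass something like `RetryingStore(fsspec.get_mapper("gs://bucket/path"))` to `zarr.open`; whether that is enough here depends on whether the broken pipe leaves the aiohttp session in a usable state for the retry.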

@anish749

I have been facing this error as well when using gcsfs directly and writing files very slowly. The files aren't big in my case (around 200-700 MB), but we were writing a few bytes every few seconds through the gcsfs API (which buffers internally). That also meant the connection stayed open for a long time, and we noticed the same failures after 10 hrs or so.

I haven't been able to fix this specific issue, but resorted to using the google-storage-client and writing smaller files periodically. That survived longer, but ran into a different set of connection errors.

@martindurant
Member

aio-libs/aiohttp#6912 ?
