Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect output from prints originating from different processes #604

Merged
merged 4 commits into from
Sep 19, 2024

Conversation

basnijholt
Copy link
Contributor

@basnijholt basnijholt commented May 31, 2024

In the PipeFunc documentation I have the following problem when executing code with a ProcessPoolExecutor:

image

With this fix it becomes:
image

The root of the issue is that nbconvert --execute produces this output:

  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },

In the PipeFunc documentation I got:
```
```
  {
   "cell_type": "code",
   "execution_count": 47,
   "id": "92",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2024-05-31T05:42:39.297713Z",
     "iopub.status.busy": "2024-05-31T05:42:39.297474Z",
     "iopub.status.idle": "2024-05-31T05:42:40.477462Z",
     "shell.execute_reply": "2024-05-31T05:42:40.475729Z"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410279 - Running double_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.408318 - Running double_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.410888 - Running double_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.416024 - Running double_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.431485 - Running half_it for x=0"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.434285 - Running half_it for x=1"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.433559 - Running half_it for x=2"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:39.439223 - Running half_it for x=3"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2024-05-30 22:42:40.459668 - Running take_sum"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "14\n"
     ]
    }
   ],
   "source": [
    "from concurrent.futures import ProcessPoolExecutor\n",
    "import datetime\n",
    "import numpy as np\n",
    "import time\n",
    "from pipefunc import Pipeline, pipefunc\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"double\", mapspec=\"x[i] -> double[i]\")\n",
    "def double_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running double_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return 2 * x\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"half\", mapspec=\"x[i] -> half[i]\")\n",
    "def half_it(x: int) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running half_it for x={x}\")\n",
    "    time.sleep(1)\n",
    "    return x // 2\n",
    "\n",
    "\n",
    "@pipefunc(output_name=\"sum\")\n",
    "def take_sum(half: np.ndarray, double: np.ndarray) -> int:\n",
    "    print(f\"{datetime.datetime.now()} - Running take_sum\")\n",
    "    return sum(half + double)\n",
    "\n",
    "\n",
    "pipeline = Pipeline([double_it, half_it, take_sum])\n",
    "inputs = {\"x\": [0, 1, 2, 3]}\n",
    "run_folder = \"my_run_folder\"\n",
    "executor = ProcessPoolExecutor(max_workers=8)  # use 8 processes\n",
    "results = pipeline.map(\n",
    "    inputs,\n",
    "    run_folder=run_folder,\n",
    "    parallel=True,\n",
    "    executor=executor,\n",
    "    storage=\"shared_memory_dict\",\n",
    ")\n",
    "print(results[\"sum\"].output)"
   ]
  },
```
Copy link

welcome bot commented May 31, 2024

Thanks for submitting your first pull request! You are awesome! 🤗

If you haven't done so already, check out EBP's Code of Conduct and our Contributing Guide, as this will greatly help the review process.

Welcome to the EBP community! 🎉

@basnijholt
Copy link
Contributor Author

@agoose77 the failing CI check is unrelated to these changes

@basnijholt
Copy link
Contributor Author

@agoose77, friendly ping. Could you take a look at this?

@bsipocz
Copy link
Collaborator

bsipocz commented Sep 19, 2024

I have a very similar problem in one of my repos, but this fix doesn't seem to work, I still get the fragmented output in the rendered HTML.

@basnijholt
Copy link
Contributor Author

@bsipocz, did you set nb_merge_streams = True?

@bsipocz
Copy link
Collaborator

bsipocz commented Sep 19, 2024

@bsipocz, did you set nb_merge_streams = True?

Yeap, I didn't have that, but discovered the option by following the link to your pipefunc/pipefunc#125 PR. So thank you.

So now my issue is fixed even without this PR.

While this PR does pass all tests, and doesn't seem to break anything, I suppose it would be nice to add your failing case to the tests, too.

else:
new_outputs.append(output)
output["text"] = output["text"].strip() + "\n"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this line necessary?

Copy link
Contributor Author

@basnijholt basnijholt Sep 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, perhaps it would be clearer to move it one line up. We're mutating the dict we just added to new_outputs.

edit: done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I was just nitpicking that you stripe off whitespace and add a newline; practically the same, so is this really needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is in case there are multiple newlines at the end which can happen after merging the cells.

See the example in my first post.

I am hard at work on trying to write a test. Turns out that on MacOS the issue does not exist ... took me a good while to realize that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsipocz, just added a test!

See my comment here #604 (comment)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@basnijholt @bsipocz just a hint, you should be using rstrip not strip, because now you have removed any possible indentation at the start of the streams 🤷

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll open a follow-up to fix that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I just opened #640

bsipocz
bsipocz previously approved these changes Sep 19, 2024
Copy link
Collaborator

@bsipocz bsipocz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem to break anything, and thus I would go ahead and merge it, however it would be nicer to have a test covering it, too.

@basnijholt
Copy link
Contributor Author

I added a test. Without the changes here the test will fail:

________________________________________________ test_merge_streams_parallel ________________________________________________

sphinx_run = <conftest.SphinxFixture object at 0x7e810491f9b0>
file_regression = <conftest.FileRegression object at 0x7e80b65fe3f0>

    @pytest.mark.sphinx_params(
        "merge_streams_parallel.ipynb",
        conf={"nb_execution_mode": "off", "nb_merge_streams": True},
    )
    def test_merge_streams_parallel(sphinx_run, file_regression):
        """Test configuring multiple concurrent stdout/stderr outputs to be merged."""
        sphinx_run.build()
        assert sphinx_run.warnings() == ""
        doctree = sphinx_run.get_resolved_doctree("merge_streams_parallel")
>       file_regression.check(doctree.pformat(), extension=".xml", encoding="utf-8")

/home/bas.nijholt/repos/MyST-NB/tests/test_render_outputs.py:116:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <conftest.FileRegression object at 0x7e80b65fe3f0>
data = '<document source="merge_streams_parallel" translation_progress="{\'total\': 0, \'translated\': 0}">\n    <container c...        \n                \n                \n                \n                \n                \n                \n'
kwargs = {'encoding': 'utf-8', 'extension': '.xml'}

    def check(self, data, **kwargs):
>       return self.file_regression.check(self._strip_ignores(data), **kwargs)
E       AssertionError: FILES DIFFER:
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.xml
E       /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.xml
E       HTML DIFF: /home/bas.nijholt/.tmp/pytest-of-bas.nijholt/pytest-49/test_merge_streams_parallel0/test_render_outputs/test_merge_streams_parallel.obtained.diff.html
E       ---
E       +++
E       @@ -9,13 +9,13 @@
E                                pass
E                <container classes="cell_output" nb_element="cell_code_output">
E                    <literal_block classes="output stream" language="myst-ansi" linenos="False" xml:space="preserve">
E       +                000000000
E                        0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       -                0
E       +
E       +
E       +
E       +
E       +
E       +
E       +
E       +

The notebook is executed via jupyter nbconvert --execute merge_streams_parallel.ipynb --to ipynb and I committed the executed ipynb.

@bsipocz bsipocz added the bug Something isn't working label Sep 19, 2024
Copy link
Collaborator

@bsipocz bsipocz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @basnijholt!

@bsipocz bsipocz merged commit 25fa81d into executablebooks:master Sep 19, 2024
13 checks passed
basnijholt added a commit to pipefunc/pipefunc that referenced this pull request Sep 27, 2024
basnijholt added a commit to pipefunc/pipefunc that referenced this pull request Sep 27, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants