Allow passing kwds to ProcessPool #252
Conversation
I hate to admit it... but there's a PR (#198) that's been open for a while on
it would be cool to merge both PRs, but I agree it would be good to test (for both PRs) that behaviour changes when some kwarg is passed. Apart from tests, is there anything else to do before merging?
Force-pushed from 4a8782a to b6bdb3c
added test
@mmckerns anything still needed from my side?
This is good... it's on my shortlist to test and review.
Most of these changes are fine. However, the code in `_serve` needs work. Essentially, a `Pool` instance gets cached in `__STATE`, and will be reused unless you make a change to the `Pool` configuration. So, if you call `Pool(4)` and then you call `Pool(4, maxtasksperchild=2)`... then as is, your code won't spawn a new pool with `maxtasksperchild=2` (because of line 117). Basically, if the `nodes` or any `kwds` change, you need to make sure it instantiates a new `Pool`.

You should also make the same changes for `ThreadPool`.
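The stale-cache pitfall described here can be sketched in isolation. This is a simplified stand-in for pathos' `__STATE` cache, not the real implementation; `FakePool` and `serve` are hypothetical names used only for illustration:

```python
# Illustration of the stale-cache problem: the cached pool is reused
# whenever the node count matches, silently dropping any new kwds.
_STATE = {}

class FakePool:
    def __init__(self, nodes, **kwds):
        self.nodes = nodes
        self.kwds = kwds

def serve(pool_id, nodes, **kwds):
    # Only compares node counts, like the original `_serve` check.
    cached = _STATE.get(pool_id)
    if cached is None or cached.nodes != nodes:
        cached = _STATE[pool_id] = FakePool(nodes, **kwds)
    return cached

p1 = serve('a', 4)
p2 = serve('a', 4, maxtasksperchild=2)
assert p1 is p2        # same cached pool comes back
assert p2.kwds == {}   # maxtasksperchild was silently ignored
```

The fix being requested is to make the cache key (or the comparison) include the kwds as well, so a configuration change forces a fresh pool.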
pathos/multiprocessing.py:

```diff
         return
     if AbstractWorkerPool.__init__.__doc__: __init__.__doc__ = AbstractWorkerPool.__init__.__doc__ + __init__.__doc__
     #def __exit__(self, *args):
     #    self._clear()
     #    return
-    def _serve(self, nodes=None): #XXX: should be STATE method; use id
+    def _serve(self, nodes=None, **kwds): #XXX: should be STATE method; use id
         """Create a new server if one isn't already initialized"""
         if nodes is None: nodes = self.__nodes
         _pool = __STATE.get(self._id, None)
         if not _pool or nodes != _pool.__nodes:
```
needs to also check `_pool._maxtasksperchild`, `_pool._initargs`, `_pool._initializer`
```diff
-        if not _pool or nodes != _pool.__nodes:
+        if (
+            _pool is None
+            or nodes != _pool.__nodes
+            or kwds.get('maxtasksperchild') != _pool._maxtasksperchild
+            or kwds.get('initargs') != _pool._initargs
+            or kwds.get('initializer') != _pool._initializer
+        ):
```
like this?
it's missing the leading underscores in the latter two.
👍 edited the comment
I believe you'll also need: (1) corresponding changes to `_clear`, and (2) when `_serve` is called by one of the `map` functions and `kwds={}`, then it does the expected thing by pulling the kwds from the existing pool.
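Points (1) and (2) together might look something like the following sketch: a kwds-aware cache check that treats an empty `kwds` as "keep the cached configuration" rather than "reset to defaults". All names here are illustrative stand-ins, though the attribute names mirror the real `_maxtasksperchild`, `_initializer`, and `_initargs` on the underlying pool:

```python
# Hypothetical kwds-aware cache (illustrative, not the pathos source).
_STATE = {}

class FakePool:
    def __init__(self, nodes, maxtasksperchild=None, initializer=None,
                 initargs=()):
        self.nodes = nodes
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

def serve(pool_id, nodes, **kwds):
    cached = _STATE.get(pool_id)
    if cached is not None and not kwds and nodes == cached.nodes:
        # Called (e.g. from map) with empty kwds: reuse the cached pool
        # instead of treating {} as "all defaults" and respawning.
        return cached
    if (cached is None
            or nodes != cached.nodes
            or kwds.get('maxtasksperchild') != cached._maxtasksperchild
            or kwds.get('initializer') != cached._initializer
            or kwds.get('initargs', ()) != cached._initargs):
        cached = _STATE[pool_id] = FakePool(nodes, **kwds)
    return cached

p1 = serve('a', 4, maxtasksperchild=2)
p2 = serve('a', 4)            # empty kwds, e.g. from a map() call
assert p1 is p2               # cached configuration is kept
```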
Added 5bdd7af which should take care of it: only add a new pool to state if kwds changed, and only clear a pool from state if kwds match. The (2) part I didn't fully get: pool kwds are e.g. `initializer`, whereas map kwds are e.g. `chunksize`. I think it should be OK with this last commit, as there is no overlap between those two kinds of kwds?
correct, there is no overlap between map and pool kwds.
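For reference, the two kwd families are disjoint in the stdlib `multiprocessing.Pool` API that pathos builds on (constructor kwds vs. `Pool.map` kwds):

```python
# Constructor kwds of multiprocessing.Pool vs. kwds of Pool.map.
POOL_KWDS = {'processes', 'initializer', 'initargs', 'maxtasksperchild'}
MAP_KWDS = {'chunksize'}
assert not (POOL_KWDS & MAP_KWDS)  # no overlap, as noted above
```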
```diff
 @@ -27,6 +28,15 @@ def test_mp():
     result = result_queue.get()
     assert result == _result

+    # test ProcessPool keyword argument propagation
+    pool.clear()
+    pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6))
```
```diff
 pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6))
+assert pool._pool.initializer, 'Subsequent pool with different kwds should propagate'
```
right? if not propagated, the default initializer should be falsy?
wondering why this test actually takes 0.6+ seconds, doesn't that mean propagation is working? 🤔
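The timing-based check under discussion can be reproduced standalone with the stdlib `multiprocessing.Pool` (pathos wraps an equivalent pool). This is a sketch, not the pathos test; `slow_init` and `run_check` are made-up names:

```python
import time
import multiprocessing as mp

def slow_init():
    # Each worker sleeps once at startup, simulating a slow initializer.
    time.sleep(0.3)

def run_check():
    start = time.monotonic()
    with mp.Pool(processes=2, initializer=slow_init) as pool:
        result = pool.map(abs, [-1, -2, -3])
    return result, time.monotonic() - start

if __name__ == '__main__':
    result, elapsed = run_check()
    assert result == [1, 2, 3]
    # If the initializer propagated, the run takes at least ~0.3s,
    # since every worker sleeps before consuming any task.
    assert elapsed >= 0.3
```

The same reasoning applies to the pathos test above: a map that takes 0.6+ seconds implies the `initializer` kwarg reached the workers.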
Default initializer is None, I believe. You can check the defaults with:

```python
>>> from pathos.pools import _ProcessPool as Pool
>>> p = Pool()
>>> p._initializer
```

and so on.
Is there a state bleed somewhere? Because by your theory, my test should be failing, as the test does exactly what you described: first a plain pool, then a pool with an initializer. By your theory, that initializer should be ignored and the old pool reused. But by the test, the map now takes much longer. So apparently the initializer is propagated?
I don't think you've made all the necessary edits. See comments above.
sure, I'll still make the changes, but I would still like to understand why the test is showing the expected results. L13 populates the state, and L33 calls `ProcessPool()` again but with the sleep initializer. And then `time.monotonic` shows that the call to map now indeed takes longer due to the sleep. So why does it currently work, if you say it should not work?
It was probably clearing the cached pool because `_serve` was being called with empty `kwds` in the `map` call (and the proper state handling wasn't done correctly).
LGTM. Needs documentation, but I'll add that.
super! thanks for the review 👍
I also handled if someone passes
Hi 👋
I would like to propagate the `maxtasksperchild` keyword, but for this I had to switch from `pathos` to `multiprocess`, ref ddelange/mapply#29. This however decreases stability/cleanup of workers, so I'd rather allow a pathos user to propagate it :)