Add workerPoolConcurrency as a configuration option #5636
Conversation
Will figure out the process to merge to v3.x before opening a new PR.
I realized that this is the correct way; changes are cherry-picked into v3.x as needed. Reopening this and making the builds pass.
@merceyz I may have created some confusion by closing and re-opening this PR. After conversing on the Discord channel, I realized that merging with …
```diff
@@ -39,11 +40,11 @@ export interface ExtractBufferOptions {

 let workerPool: WorkerPool<ConvertToZipPayload, PortablePath> | null;

-export async function convertToZip(tgz: Buffer, opts: ExtractBufferOptions) {
+export async function convertToZip(tgz: Buffer, limit: Limit, opts: ExtractBufferOptions) {
```
Can you move `limit` inside the `opts` object? And perhaps rename it to `poolSize`?
Moved it under `opts` and renamed it to `poolSize`.
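For clarity, here is a rough sketch of what the adjusted signature could look like after that change; the fields on `ExtractBufferOptions` other than `poolSize` are illustrative, and `Limit` is assumed to be the limiter type returned by `p-limit`:

```typescript
import pLimit from 'p-limit';

// The limiter type as produced by p-limit; derived here rather than
// relying on a named type export.
type Limit = ReturnType<typeof pLimit>;

// Sketch: `poolSize` joins the existing options instead of being a
// separate positional parameter.
export interface ExtractBufferOptions {
  compressionLevel?: number;
  poolSize?: Limit;
}

export async function convertToZip(tgz: Buffer, opts: ExtractBufferOptions) {
  // ...same conversion logic as before; the worker pool is sized from
  // opts.poolSize when provided, otherwise from the previous default.
}
```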
```diff
 const tmpFolder = await xfs.mktempPromise();
 const tmpFile = ppath.join(tmpFolder, `archive.zip`);

-workerPool ||= new WorkerPool(getZipWorkerSource());
+workerPool ||= new WorkerPool(getZipWorkerSource(), limit);
```
Similar to the previous point, I'd prefer if we used an option bag rather than a free parameter.
Added an optional option bag to `WorkerPool`.
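A minimal sketch of what an optional option bag on `WorkerPool` could look like; the `poolSize` field and the fallback shown here are assumptions for illustration, not necessarily the exact shape that landed:

```typescript
import os     from 'os';
import pLimit from 'p-limit';

type Limit = ReturnType<typeof pLimit>;

export interface WorkerPoolOptions {
  poolSize?: Limit;
}

export class WorkerPool<TIn, TOut> {
  private readonly limit: Limit;

  constructor(private readonly source: string, opts: WorkerPoolOptions = {}) {
    // Default preserves the previous behaviour: one worker per reported CPU.
    this.limit = opts.poolSize ?? pLimit(os.cpus().length);
  }

  // The dispatch logic (spawning worker_threads and funnelling each task
  // through this.limit) is unchanged and omitted from this sketch.
}
```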
`workerPoolConcurrency` seems a bit too implementation-detail specific; perhaps it should be called `availableParallelismOverride` to match what you're overriding.
For example, in `yarn workspaces foreach` we also use that function, and that isn't done by a worker pool.
Though as I mentioned in #5635 (comment), I'd prefer if this was fixed upstream in Node.js, then everything using that function would Just Work™.
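To illustrate that point, a single override applied where the parallelism is computed would cover every consumer, worker pool or not. This is a hypothetical helper with invented names, not code from the PR:

```typescript
import os from 'os';

// Hypothetical: consumers (worker pools, `yarn workspaces foreach`, ...)
// would read their parallelism through this helper instead of calling
// the os-level function directly.
export function getAvailableParallelism(override?: number): number {
  if (typeof override === `number` && override > 0)
    return override;

  // os.availableParallelism() exists on Node.js >= 18.14; fall back to
  // cpus().length on older runtimes.
  return os.availableParallelism?.() ?? os.cpus().length;
}
```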
.yarn/versions/85334687.yml (Outdated)
In its current state this PR is a `major` change.
Indeed - although once moved as optional parameters in the option bags, it'll be fine as a `minor` (only the modified packages + cli can be marked as `minor`, the rest can be declined).
Changed the version file to bump only the changed plugins, as well as `@yarnpkg/core` and `@yarnpkg/cli`, to `minor`.
Not sure I agree with that - the function name itself is an implementation detail, so I don't think it's required to use that name. Perhaps another name like …
I don't know how easy that'd be - from a quick look there's been resistance to that, and I'm not expert enough to know whether it should be reconsidered or not. In any case, having this setting feels reasonable all in all - various other tools provide configuration settings for this kind of thing (like Jest's …).
Ah, tests are failing! Let me fix them.
Fixed! It was because of sending …
```diff
@@ -10,7 +11,7 @@ describe(`tgzUtils`, () => {
     npath.join(__dirname, `fixtures/carbon-icons-svelte-10.21.0.tgz`),
   ),
 );
-await expect(tgzUtils.convertToZip(data, {compressionLevel: 0})).resolves.toBeTruthy();
+await expect(tgzUtils.convertToZip(data, {compressionLevel: 0, poolSize: pLimit(2)})).resolves.toBeTruthy();
```
Is there a reason to use an explicit pool size here rather than the default?
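For reference, dropping the explicit pool size would leave the assertion relying on the default, exactly as it read before this diff:

```typescript
await expect(tgzUtils.convertToZip(data, {compressionLevel: 0})).resolves.toBeTruthy();
```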
**What's the problem this PR addresses?**

This PR extends @anuragkalia's work in #5636 (it starts from the same commits and builds upon them).

- The original implementation was keeping a singleton worker pool. I felt this was problematic since we may enter situations (even if only in tests) where multiple calls to `convertToZip` would want to use different pools.
- Converting an archive without using a worker was only possible when the concurrency was completely disabled. This was my idea, but retrospectively it felt better to me to have two settings: one for the concurrency, and another to define whether the concurrency is enabled or not.
- Passing all those settings from the various fetchers would have been unwieldy.

**How did you fix it?**

- I created a `TaskPool` interface; `WorkerPool` implements it (using workers, same implementation as before), but so does `AsyncPool` (a new implementation that simply forwards the call to the `pLimit` limiter; see the sketch below).
- I feel like this also addresses @merceyz's comment regarding the setting name - it's now called `taskPoolConcurrency`, which doesn't directly refer to workers.
- A `getConfigurationWorker` function returns the proper task pool for the given configuration object. To make that possible, WeakMap instances can now be used as the storage argument for the `get*WithDefault` family of functions.

**Checklist**

- [x] I have read the [Contributing Guide](https://yarnpkg.com/advanced/contributing).
- [x] I have set the packages that need to be released for my changes to be effective.
- [x] I will check that all automated PR checks pass before the PR gets reviewed.

Co-authored-by: Anurag Kalia <[email protected]>
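A rough sketch of the shape described above; the names follow the PR description, but the exact signatures in #5764 may differ:

```typescript
import pLimit from 'p-limit';

type Limit = ReturnType<typeof pLimit>;

// Common interface implemented by both pool flavours.
export interface TaskPool<TIn, TOut> {
  run(data: TIn): Promise<TOut>;
}

// Forwards each task to a p-limit limiter on the current thread, so
// archives can be converted without spawning any workers.
export class AsyncPool<TIn, TOut> implements TaskPool<TIn, TOut> {
  constructor(
    private readonly fn: (data: TIn) => Promise<TOut>,
    private readonly limit: Limit,
  ) {}

  run(data: TIn) {
    return this.limit(() => this.fn(data));
  }
}

// WorkerPool keeps its existing worker_threads implementation and simply
// declares `implements TaskPool<TIn, TOut>` on top of it.
```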
Closing since it got superseded by #5764! Thanks @anuragkalia 🙏
**What's the problem this PR addresses?**
This PR introduces a configuration option to set a limit on the concurrency of the worker pool. Right now, it reads from `require('os').cpus()`, which can be overstated in certain CI systems that run many pods on the same machine; this, in turn, can mean the running container crashes. More generally, it is a good idea to let the user decide the CPU concurrency they want to employ.

Closes #5635
**How did you fix it?**
I added this option as a feature à la `networkConcurrency`, and sent it as a constructor arg to `new WorkerPool()`. The default value remains `nodeUtils.availableParallelism()`, so that this is opt-in and the current behaviour remains intact. The configuration also has the advantage of not exceeding the global number of workers if another part of the codebase needs worker threads for itself.

**Checklist**