[FEA] Enable deterministic row-group decomposition in Dataset.to_parquet #1340

Open
rjzamora (Collaborator) opened this issue on Dec 21, 2021 · 0 comments
Labels: dataloader, enhancement (New feature or request), IO
Is your feature request related to a problem? Please describe.

Using NVTabular alone, it is not currently possible to write out a parquet dataset with a deterministic row count in each output file. Such determinism may be desired by users who wish to perform distributed multi-GPU training for many epochs (e.g. with Horovod).

Describe the solution you'd like

I feel that the determinism issue can be solved by introducing the following features within NVTabular:

  1. The user must be able to pass the following kwargs through to the underlying write engine in Dataset.to_parquet: row_group_size_rows (cudf-22.02 and later) and row_group_size (pyarrow). Currently, NVTabular always uses the default row-group size when it writes a DataFrame partition to disk. This is problematic for users who want their dataset to be perfectly divided between n processes at training time. For a "perfect" data balance to be possible, (almost) all row-groups should contain a deterministic row count. (See the usage sketch after this list.)
  2. Dataset must support a custom method (perhaps rebalance_rowcount) that both repartitions and redistributes data between partitions so that all partitions contain the same number of rows (with the possible exception of a single "residual" partition). Without this feature, (1) is of limited use, because the number of rows within a partition is not guaranteed to be divisible by the chosen/required row_group_size_rows value.
  3. On the "read" side of Dataset, we need a parameter (perhaps partition_factor) to specify an integer that the total number of partitions must be an exact multiple of. For example, if the user specifies partition_factor=8 and part_mem_fraction=0.1, then NVTabular should find the largest partition size (<=10% of total device memory) for which the total number of partitions is divisible by 8.
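
A rough usage sketch of how these three pieces could fit together is given below. This is only illustrative: the row_group_size_rows passthrough, rebalance_rowcount, and partition_factor are all proposed (not existing) API, and the rows_per_partition argument name is purely hypothetical.

```python
# Hypothetical usage of the three proposed features. None of these names
# exist in NVTabular today: the row_group_size_rows passthrough,
# rebalance_rowcount, and partition_factor are part of this proposal, and
# rows_per_partition is an illustrative argument name.
import nvtabular as nvt

# Read the raw dataset.
ds = nvt.Dataset("/raw/*.parquet", engine="parquet")

# (2) Proposed: repartition/redistribute so every partition holds the same
#     number of rows (aside from, at most, one residual partition).
ds = ds.rebalance_rowcount(rows_per_partition=1_000_000)

# (1) Proposed: forward row-group sizing kwargs to the underlying writer
#     (row_group_size_rows for cudf>=22.02, row_group_size for pyarrow).
ds.to_parquet("/balanced/", row_group_size_rows=250_000)

# (3) Proposed: require the partition count to be an exact multiple of the
#     number of distributed workers when the dataset is read back.
ds_train = nvt.Dataset(
    "/balanced/*.parquet",
    engine="parquet",
    part_mem_fraction=0.1,
    partition_factor=8,
)
```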

If the size of the global dataset happens to be perfectly divisible by the expected number of Horovod workers (say 8), the three features listed above should make it possible to produce a "perfectly" balanced dataset from which efficient distributed training can be performed.

This leaves us with the obvious problem that we cannot assume the dataset will always be divisible by the desired number of distributed workers, so we need to decide what features (2) and (3) should do when there is a residual row and/or row-group count. Perhaps it is sufficient if (2) is guaranteed to produce, at most, a single "misfit" partition (of minimum size), and (3) allows this final misfit partition to be treated specially (and optionally ignored)?
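
For concreteness, here is a minimal sketch of the residual arithmetic described above, assuming the rebalancing step targets a fixed number of fixed-size row-groups per partition and collects any leftover rows into a single trailing "misfit" partition. The plan_partitions helper is hypothetical, not part of NVTabular.

```python
# Sketch of the residual-handling arithmetic: every partition gets
# groups_per_partition row-groups of exactly row_group_size_rows rows,
# and any leftover rows form a single (optional-to-ignore) misfit partition.
def plan_partitions(total_rows, row_group_size_rows, groups_per_partition):
    """Return a list of per-partition row counts."""
    rows_per_partition = row_group_size_rows * groups_per_partition
    full, residual = divmod(total_rows, rows_per_partition)
    sizes = [rows_per_partition] * full
    if residual:
        sizes.append(residual)  # at most one smaller "misfit" partition
    return sizes

# Example: 10_000_100 total rows, 250k-row row-groups, 4 row-groups per
# partition -> ten 1M-row partitions plus one 100-row misfit partition.
print(plan_partitions(10_000_100, 250_000, 4))
```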

Notes on why we want deterministic and balanced row-groups/partitions:

  • Can avoid "spilling" partial batches between partitions during data-loading
  • Can minimize work imbalance (always bad for parallel performance!)
  • Can avoid the need for users to explicitly partition their data into a distinct file for each worker