From b54990d7aaab38163d9666ffa4304f9e3060fd2d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 5 Nov 2023 07:01:31 -0500 Subject: [PATCH] Minor: Add more documentation about Partitioning (#8022) * Minor: Add more documentation about Partitioning * fix typo * Apply suggestions from code review Co-authored-by: comphead * Add more diagrams, improve text * undo unintended changes * undo unintended changes * fix links * Try and clarify --------- Co-authored-by: comphead --- datafusion/physical-expr/src/partitioning.rs | 90 +++++++++++++++++++- 1 file changed, 86 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 6a8fca4a1543..cbacb7a8a906 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -15,14 +15,95 @@ // specific language governing permissions and limitations // under the License. -//! [`Partitioning`] and [`Distribution`] for physical expressions +//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans` use std::fmt; use std::sync::Arc; use crate::{physical_exprs_equal, EquivalenceProperties, PhysicalExpr}; -/// Partitioning schemes supported by operators. +/// Output partitioning supported by [`ExecutionPlan`]s. +/// +/// When `executed`, `ExecutionPlan`s produce one or more independent streams of +/// data batches in parallel, referred to as partitions. The streams are Rust +/// `async` [`Stream`]s (a special kind of future). The number of output +/// partitions varies based on the input and the operation performed. +/// +/// For example, an `ExecutionPlan` that has output partitioning of 3 will +/// produce 3 distinct output streams as the result of calling +/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and +/// `ExecutionPlan::execute(2)`, as shown below: +/// +/// ```text +/// ... ... ... +/// ...
▲ ▲ ▲ +/// │ │ │ +/// ▲ │ │ │ +/// │ │ │ │ +/// │ ┌───┴────┐ ┌───┴────┐ ┌───┴────┐ +/// ┌────────────────────┐ │ Stream │ │ Stream │ │ Stream │ +/// │ ExecutionPlan │ │ (0) │ │ (1) │ │ (2) │ +/// └────────────────────┘ └────────┘ └────────┘ └────────┘ +/// ▲ ▲ ▲ ▲ +/// │ │ │ │ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ +/// Input │ │ │ │ +/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │ +/// ▲ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─ +/// │ Input │ Input │ Input │ +/// │ │ Stream │ Stream │ Stream +/// (0) │ (1) │ (2) │ +/// ... └ ─ ▲ ─ ─ └ ─ ▲ ─ ─ └ ─ ▲ ─ ─ +/// │ │ │ +/// │ │ │ +/// │ │ │ +/// +/// ExecutionPlan with 1 input 3 (async) streams, one for each +/// that has 3 partitions, which itself output partition +/// has 3 output partitions +/// ``` +/// +/// It is common (but not required) that an `ExecutionPlan` has the same number +/// of input partitions as output partitions. However, some plans have different +/// numbers such as the `RepartitionExec` that redistributes batches from some +/// number of inputs to some number of outputs. +/// +/// ```text +/// ... ... ... ... +/// +/// ▲ ▲ ▲ +/// ▲ │ │ │ +/// │ │ │ │ +/// ┌────────┴───────────┐ │ │ │ +/// │ RepartitionExec │ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐ +/// └────────────────────┘ │ Stream │ │ Stream │ │ Stream │ +/// ▲ │ (0) │ │ (1) │ │ (2) │ +/// │ └────────┘ └────────┘ └────────┘ +/// │ ▲ ▲ ▲ +/// ... │ │ │ +/// └──────────┐│┌──────────┘ +/// │││ +/// │││ +/// RepartitionExec with 1 input +/// partition and 3 output partitions 3 (async) streams, that internally +/// pull from the same input stream +/// ...
+/// ``` +/// +/// # Additional Examples +/// +/// A simple `FileScanExec` might produce one output stream (partition) for each +/// file (note the actual DataFusion file scanners can read individual files in +/// parallel, potentially producing multiple partitions per file). +/// +/// Plans such as `SortPreservingMerge` produce a single output stream +/// (1 output partition) by combining some number of input streams (input partitions). +/// +/// Plans such as `FilterExec` produce the same number of output streams +/// (partitions) as input streams (partitions). +/// +/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html #[derive(Debug, Clone)] pub enum Partitioning { /// Allocate batches using a round-robin algorithm and the specified number of partitions @@ -126,7 +207,8 @@ impl PartialEq for Partitioning { } } -/// Distribution schemes +/// How data is distributed amongst partitions. See [`Partitioning`] for more +/// details. #[derive(Debug, Clone)] pub enum Distribution { /// Unspecified distribution @@ -139,7 +221,7 @@ pub enum Distribution { } impl Distribution { - /// Creates a Partitioning for this Distribution to satisfy itself + /// Creates a `Partitioning` that satisfies this `Distribution` pub fn create_partitioning(&self, partition_count: usize) -> Partitioning { match self { Distribution::UnspecifiedDistribution => {