Minor: Add more documentation about Partitioning (#8022)
* Minor: Add more documentation about Partitioning

* fix typo

* Apply suggestions from code review

Co-authored-by: comphead <[email protected]>

* Add more diagrams, improve text

* undo unintended changes

* undo unintended changes

* fix links

* Try and clarify

---------

Co-authored-by: comphead <[email protected]>
alamb and comphead authored Nov 5, 2023
1 parent e505cdd commit b54990d
Showing 1 changed file with 86 additions and 4 deletions.
90 changes: 86 additions & 4 deletions datafusion/physical-expr/src/partitioning.rs
@@ -15,14 +15,95 @@
// specific language governing permissions and limitations
// under the License.

//! [`Partitioning`] and [`Distribution`] for physical expressions
//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`
use std::fmt;
use std::sync::Arc;

use crate::{physical_exprs_equal, EquivalenceProperties, PhysicalExpr};

/// Partitioning schemes supported by operators.
/// Output partitioning supported by [`ExecutionPlan`]s.
///
/// When executed, `ExecutionPlan`s produce one or more independent streams of
/// data batches in parallel, referred to as partitions. The streams are Rust
/// `async` [`Stream`]s (the asynchronous analogue of an `Iterator`). The number
/// of output partitions varies based on the input and the operation performed.
///
/// For example, an `ExecutionPlan` that has output partitioning of 3 will
/// produce 3 distinct output streams as the result of calling
/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
/// `ExecutionPlan::execute(2)`, as shown below:
///
/// ```text
///                                                   ...         ...         ...
///               ...                                  ▲           ▲           ▲
///                                                    │           │           │
///                ▲                                   │           │           │
///                │                                   │           │           │
///                │                               ┌───┴────┐  ┌───┴────┐  ┌───┴────┐
///     ┌────────────────────┐                     │ Stream │  │ Stream │  │ Stream │
///     │   ExecutionPlan    │                     │  (0)   │  │  (1)   │  │  (2)   │
///     └────────────────────┘                     └────────┘  └────────┘  └────────┘
///                ▲                                   ▲           ▲           ▲
///                │                                   │           │           │
///     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///             Input        │                         │           │           │
///     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
///                ▲                               ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─
///                │                                 Input  │    Input  │    Input  │
///                │                               │ Stream    │ Stream    │ Stream
///                                                   (0)   │     (1)   │     (2)   │
///               ...                              └ ─ ▲ ─ ─   └ ─ ▲ ─ ─   └ ─ ▲ ─ ─
///                                                    │           │           │
///                                                    │           │           │
///                                                    │           │           │
///
/// ExecutionPlan with 1 input                     3 (async) streams, one for each
/// that has 3 partitions, which itself            output partition
/// has 3 output partitions
/// ```
///
/// It is common (but not required) that an `ExecutionPlan` has the same number
/// of input partitions as output partitions. However, some plans have different
/// numbers, such as `RepartitionExec`, which redistributes batches from some
/// number of inputs to some number of outputs:
///
/// ```text
///               ...                                     ...         ...         ...
///
///                                                        ▲           ▲           ▲
///                ▲                                       │           │           │
///                │                                       │           │           │
///       ┌────────┴───────────┐                           │           │           │
///       │  RepartitionExec   │                      ┌────┴───┐  ┌────┴───┐  ┌────┴───┐
///       └────────────────────┘                      │ Stream │  │ Stream │  │ Stream │
///                ▲                                  │  (0)   │  │  (1)   │  │  (2)   │
///                │                                  └────────┘  └────────┘  └────────┘
///                │                                       ▲           ▲           ▲
///               ...                                      │           │           │
///                                                        └──────────┐│┌──────────┘
///                                                                   │││
///                                                                   │││
/// RepartitionExec with 1 input                      3 (async) streams, that internally
/// partition and 3 output partitions                 pull from the same input stream
///                                                                   ...
/// ```
///
/// # Additional Examples
///
/// A simple `FileScanExec` might produce one output stream (partition) for each
/// file (note that the actual DataFusion file scanners can read individual files
/// in parallel, potentially producing multiple partitions per file).
///
/// Plans such as `SortPreservingMergeExec` produce a single output stream
/// (1 output partition) by combining some number of input streams (input partitions).
///
/// Plans such as `FilterExec` produce the same number of output streams
/// (partitions) as input streams (partitions).
///
/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
@@ -126,7 +207,8 @@ impl PartialEq for Partitioning {
    }
}

/// Distribution schemes
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Unspecified distribution
@@ -139,7 +221,7 @@ pub enum Distribution {
}

impl Distribution {
    /// Creates a Partitioning for this Distribution to satisfy itself
    /// Creates a `Partitioning` that satisfies this `Distribution`
    pub fn create_partitioning(&self, partition_count: usize) -> Partitioning {
        match self {
            Distribution::UnspecifiedDistribution => {
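
To make the partition model described in the `Partitioning` documentation concrete, here is a minimal, std-only sketch. It is not DataFusion code: `MockPlan`, `MockScan`, `MockFilter`, `is_even`, `collect_all`, and `example_plan` are hypothetical names, a batch is modeled as a `Vec<i64>`, and plain synchronous `Iterator`s stand in for the async `Stream`s that a real `ExecutionPlan` returns. It shows a source with three output partitions wrapped in a filter that, like `FilterExec`, has as many output partitions as its input.

```rust
/// One batch of rows, modeled as a plain vector (stand-in for a record batch).
type Batch = Vec<i64>;

/// Simplified, synchronous stand-in for `ExecutionPlan` (hypothetical trait).
trait MockPlan {
    /// Number of output partitions this plan produces.
    fn output_partition_count(&self) -> usize;
    /// Returns the independent sequence of batches for one output partition.
    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = Batch> + '_>;
}

/// Source plan holding a fixed list of batches per partition.
struct MockScan {
    partitions: Vec<Vec<Batch>>,
}

impl MockPlan for MockScan {
    fn output_partition_count(&self) -> usize {
        self.partitions.len()
    }

    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = Batch> + '_> {
        Box::new(self.partitions[partition].iter().cloned())
    }
}

/// Filter plan: output partition `i` is derived only from input partition `i`.
struct MockFilter<P: MockPlan> {
    input: P,
    predicate: fn(&i64) -> bool,
}

impl<P: MockPlan> MockPlan for MockFilter<P> {
    fn output_partition_count(&self) -> usize {
        self.input.output_partition_count()
    }

    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = Batch> + '_> {
        let predicate = self.predicate;
        Box::new(self.input.execute(partition).map(move |batch| {
            batch.into_iter().filter(|v| predicate(v)).collect::<Batch>()
        }))
    }
}

/// Predicate used by the example: keep even values.
fn is_even(v: &i64) -> bool {
    *v % 2 == 0
}

/// Drains every output partition in turn and returns the rows seen per partition.
fn collect_all<P: MockPlan>(plan: &P) -> Vec<Vec<i64>> {
    (0..plan.output_partition_count())
        .map(|p| plan.execute(p).flatten().collect::<Vec<i64>>())
        .collect()
}

/// Builds a three-partition scan wrapped in a filter that keeps even values.
fn example_plan() -> MockFilter<MockScan> {
    let scan = MockScan {
        partitions: vec![
            vec![vec![1, 2], vec![3, 4]],
            vec![vec![5, 6]],
            vec![vec![7], vec![8, 9, 10]],
        ],
    };
    MockFilter { input: scan, predicate: is_even }
}
```

Calling `collect_all(&example_plan())` drains partitions 0, 1, and 2 one after another. Each call to `execute(p)` only ever touches the batches of partition `p`, which is the property that lets an engine run the partitions in parallel.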

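Plans whose input and output partition counts differ can be sketched in the same spirit. The block below is an illustration under stated assumptions, not the implementation of `RepartitionExec` or of a sort-preserving merge: `round_robin_repartition` and `merge_sorted` are hypothetical helpers, they work eagerly on in-memory vectors rather than on async streams, and the merge assumes every input partition is already sorted in ascending order.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// One batch of rows, modeled as a plain vector (stand-in for a record batch).
type Batch = Vec<i64>;

/// Redistributes whole batches from any number of input partitions to
/// `output_partitions` outputs, assigning batches in round-robin order.
fn round_robin_repartition(
    inputs: Vec<Vec<Batch>>,
    output_partitions: usize,
) -> Vec<Vec<Batch>> {
    assert!(output_partitions > 0, "need at least one output partition");
    let mut outputs: Vec<Vec<Batch>> = vec![Vec::new(); output_partitions];
    // Visit the batches of input 0, then input 1, and so on.
    for (i, batch) in inputs.into_iter().flatten().enumerate() {
        outputs[i % output_partitions].push(batch);
    }
    outputs
}

/// Combines several sorted input partitions into one sorted output partition
/// with a k-way merge driven by a min-heap.
fn merge_sorted(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first value of every non-empty input.
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut out = Vec::new();
    // Repeatedly emit the smallest value and refill from the input it came from.
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}
```

With one input partition and `output_partitions = 3`, `round_robin_repartition` mirrors the 1-to-3 shape in the second diagram, while `merge_sorted` goes the other way and reduces several partitions to a single one.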