Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: improve multiset_delta() docs #830

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions hydroflow/tests/surface_multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_multiset_delta() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<char>();

let mut flow = hydroflow_syntax! {
source_stream(input_recv)
Expand All @@ -14,19 +14,31 @@ pub fn test_multiset_delta() {
};
assert_graphvis_snapshots!(flow);

input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
input_send.send('a').unwrap();
input_send.send('b').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
assert_eq!(&[3, 4, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
// 'a', 'b', 'a'
assert_eq!(&['a', 'b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));

input_send.send(3).unwrap();
input_send.send(5).unwrap();
input_send.send(3).unwrap();
input_send.send(3).unwrap();
input_send.send('a').unwrap();
input_send.send('c').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
// 'c', 'a'
// First two 'a's are removed due to previous tick.
assert_eq!(&['c', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));

input_send.send('b').unwrap();
input_send.send('c').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
// First two "3"s are removed due to previous tick.
assert_eq!(&[5, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
// 3 'a's and the 'c' are removed due to previous tick.
assert_eq!(&['b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}

#[multiplatform_test]
Expand Down
50 changes: 37 additions & 13 deletions hydroflow_lang/src/graph/ops/multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,54 @@ use super::{
WriteContextArgs, RANGE_0, RANGE_1,
};

// TODO(mingwei): more doc
/// Multiset delta from the previous tick.
/// The multiset inverse of [`persist()`](#persist).
///
/// > 1 input stream of `T`, 1 output stream of `T`, where `T: Eq + Hash`
///
/// For set semantics, [`unique()`](#unique) can be thought of as a "delta" operator, the inverse
/// of [`persist()`](#persist). In `persist`, new items come in, and all items are repeatedly
/// released out. Conversely, `unique` take repeated items in, and only releases the new ones out.
///
/// This operator does a similar inversion but for multiset semantics, with some caveats. When it
/// receives duplicate items, instead of ignoring them, it "subtracts" them from the items received
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"subtracts" is confusing. Why not say "if we received k copies of an item in the previous tick, and we receive l > k copies in the current tick, we output l - k copies of the item."

/// in the previous tick: i.e. if we received `k` copies of an item in the previous tick, and we
/// receive `l > k` copies in the current tick, we output `l - k` copies of the item.
/// However unlike `unique`, this count is only maintained for the previous tick, not over all time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the proposed rewording above, maybe rephrase to match.

I'm actually confused as to whether it remembers the MAX number of copies over time, or just the last tick number? If it's MAX, then we should probably implement a generic lattice_delta, where this is a specific one for multiset lattices. If it's not MAX, then I'd like to discuss semantics --- it'd be weird for this to be the only 1-tick windowed operator (other than next_tick).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only the previous tick. The semantics become weird if it's not somewhere after a persist operator

///
/// In the example below, in the second tick two 'a's are removed because two 'a's were received in
/// the previous tick. The third 'a' is released though.
///
/// ```rustbook
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
/// let mut flow = hydroflow::hydroflow_syntax! {
/// source_stream(input_recv)
/// -> multiset_delta()
/// -> for_each(|n| println!("{}", n));
/// };
///
/// input_send.send(3).unwrap();
/// input_send.send(4).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('b').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 'a', 'b', 'a'
///
/// input_send.send('a').unwrap();
/// input_send.send('c').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 3, 4,
/// // 'c', 'a'
/// // First two 'a's are removed due to previous tick.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give an example of the 3rd tick to clarify what confuses me above about MAX over all time vs last tick?

///
/// input_send.send(3).unwrap();
/// input_send.send(5).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send('b').unwrap();
/// input_send.send('c').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 5, 3
/// // First two "3"s are removed due to previous tick.
/// // 'b', 'a'
/// // 3 'a's and the 'c' are removed due to previous tick.
/// ```
pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
name: "multiset_delta",
Expand Down
4 changes: 3 additions & 1 deletion hydroflow_lang/src/graph/ops/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use super::{
use crate::diagnostic::{Diagnostic, Level};
use crate::graph::{FlowProps, LatticeFlowType};

/// Stores each item as it passes through, and replays all item every tick.
/// Stores each item as it passes through, and replays all items every tick.
///
/// > 1 input stream, 1 output stream
///
/// ```hydroflow
/// // Normally `source_iter(...)` only emits once, but `persist()` will replay the `"hello"`
Expand Down
Loading