Skip to content

Commit

Permalink
feat(hydroflow_plus): add core negation operators (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj authored Jan 20, 2024
1 parent 88a1796 commit 5a03ed4
Show file tree
Hide file tree
Showing 25 changed files with 1,145 additions and 19 deletions.
20 changes: 12 additions & 8 deletions hydroflow/src/compiled/pull/anti_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,21 @@ where
Ipos: 'a + Iterator<Item = (Key, V)>,
{
if new_tick {
for kv in input_pos {
if !state_neg.contains(&kv.0) {
state_pos.insert(kv);
}
}

Either::Left(
state_pos
.iter()
.filter(|(k, _)| !state_neg.contains(k))
.cloned(),
.filter(|kv| !state_neg.contains(&kv.0))
.cloned()
.collect::<Vec<_>>()
.into_iter()
.chain(input_pos.filter_map(|kv| {
if !state_neg.contains(&kv.0) {
state_pos.insert(kv.clone());
Some(kv)
} else {
None
}
})),
)
} else {
Either::Right(AntiJoin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ Received vertex: 3
Received vertex: 6
Received vertex: 11
Received vertex: 12
unreached_vertices vertex: 12
unreached_vertices vertex: 11
unreached_vertices vertex: 12

11 changes: 7 additions & 4 deletions hydroflow/tests/surface_codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ pub fn test_anti_join() {

flow.run_available();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(&[(1, 2), (2, 3), (3, 4), (4, 5), (5, 4), (6, 5)], &*out);
assert_eq!(
&[(1, 2), (1, 2), (2, 3), (3, 4), (4, 5), (5, 4), (6, 5)],
&*out
);
}

#[multiplatform_test]
Expand All @@ -339,7 +342,7 @@ pub fn test_anti_join_static() {
}
flow.run_tick();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(&[(1, 2), (5, 6), (400, 5)], &*out);
assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5)], &*out);

neg_send.send(400).unwrap();

Expand Down Expand Up @@ -369,15 +372,15 @@ pub fn test_anti_join_tick_static() {
}
flow.run_tick();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(&[(1, 2), (5, 6), (400, 5)], &*out);
assert_eq!(&[(1, 2), (1, 2), (5, 6), (400, 5)], &*out);

for x in [(10, 10), (10, 10), (200, 5)] {
pos_send.send(x).unwrap();
}

flow.run_available();
let out: Vec<_> = collect_ready(&mut out_recv);
assert_eq!(&[(10, 10)], &*out);
assert_eq!(&[(10, 10), (10, 10)], &*out);
}

#[multiplatform_test]
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/tests/surface_difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn test_diff_static() {

df.run_tick();

assert_eq!(&[1], &*collect_ready::<Vec<_>, _>(&mut output_recv));
assert_eq!(&[1, 1], &*collect_ready::<Vec<_>, _>(&mut output_recv));

pos_send.send(1).unwrap();
pos_send.send(1).unwrap();
Expand All @@ -82,7 +82,7 @@ pub fn test_diff_static() {

df.run_tick();

assert_eq!(&[1, 3], &*collect_ready::<Vec<_>, _>(&mut output_recv));
assert_eq!(&[1, 1, 3], &*collect_ready::<Vec<_>, _>(&mut output_recv));
}

#[multiplatform_test]
Expand Down
5 changes: 3 additions & 2 deletions hydroflow_lang/src/graph/ops/anti_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ use crate::graph::{GraphEdgeType, OpInstGenerics, OperatorInstance, PortIndexVal
///
/// For a given tick, computes the anti-join of the items in the input
/// streams, returning unique items in the `pos` input that do not have matching keys
/// in the `neg` input. Note this is set semantics, so duplicate items in the `pos` input
/// are output 0 or 1 times (if they do/do-not have a match in `neg` respectively.)
/// in the `neg` input. Note this is set semantics only for the `neg element`. Order
/// is preserved for new elements in a given tick, but not for elements processed
/// in a previous tick with `'static`.
///
/// ```hydroflow
/// source_iter(vec![("dog", 1), ("cat", 2), ("elephant", 3)]) -> [pos]diff;
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_lang/src/graph/ops/difference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use crate::graph::{GraphEdgeType, OperatorInstance, PortIndexValue};
/// `difference` can be provided with one or two generic lifetime persistence arguments
/// in the same way as [`join`](#join), see [`join`'s documentation](#join) for more info.
///
/// Note set semantics here: duplicate items in the `pos` input
/// are output 0 or 1 times (if they do/do-not have a match in `neg` respectively.)
/// Note set semantics only for the `neg` input.
///
/// ```hydroflow
/// source_iter(vec!["dog", "cat", "elephant"]) -> [pos]diff;
Expand Down
168 changes: 168 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,90 @@ impl<'a, T, N: Location<'a>> Stream<'a, T, Windowed, N> {
.pipeline_op(parse_quote!(unique::<'tick>()), false)
}

pub fn filter_not_in(&self, other: &Stream<'a, T, Windowed, N>) -> Stream<'a, T, Windowed, N>
where
T: Eq + Hash,
{
if self.node.id() != other.node.id() {
panic!("union must be called on streams on the same node");
}

let next_id = {
let mut next_id = self.next_id.borrow_mut();
let id = *next_id;
*next_id += 1;
id
};

let self_ident = if (self.is_delta, other.is_delta) == (true, false) {
self.ensure_concrete().ident
} else {
self.ident.clone()
};

let other_ident = &other.ident;
let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site());

let mut builders = self.builders.borrow_mut();
let builder = builders
.as_mut()
.unwrap()
.entry(self.node.id())
.or_default();

let output_delta = match (self.is_delta, other.is_delta) {
(true, true) => {
// we don't gain any performance by having the first be 'tick,
// just persist the second one and we get deltas out
builder.add_statement(parse_quote! {
#ident = difference::<'tick, 'static>() -> tee();
});

true
}
(true, false) => {
// difference/anti_join<'static, _> does not replay
// but we need to re-filter every tick
builder.add_statement(parse_quote! {
#ident = difference::<'tick, 'tick>() -> tee();
});

false
}
(false, true) => {
builder.add_statement(parse_quote! {
#ident = difference::<'tick, 'static>() -> tee();
});

false
}
(false, false) => {
builder.add_statement(parse_quote! {
#ident = difference::<'tick, 'tick>() -> tee();
});

false
}
};

builder.add_statement(parse_quote! {
#self_ident -> [pos]#ident;
});

builder.add_statement(parse_quote! {
#other_ident -> [neg]#ident;
});

Stream {
ident,
node: self.node.clone(),
next_id: self.next_id,
builders: self.builders,
is_delta: output_delta,
_phantom: PhantomData,
}
}

pub fn sample_every(
&self,
duration: impl Quoted<'a, std::time::Duration> + Copy + 'a,
Expand Down Expand Up @@ -469,6 +553,90 @@ impl<'a, K, V1, W, N: Location<'a>> Stream<'a, (K, V1), W, N> {
_phantom: PhantomData,
}
}

pub fn anti_join<W2>(&self, n: &Stream<'a, K, W2, N>) -> Stream<'a, (K, V1), W, N>
where
K: Eq + Hash,
{
if self.node.id() != n.node.id() {
panic!("anti_join must be called on streams on the same node");
}

let next_id = {
let mut next_id = self.next_id.borrow_mut();
let id = *next_id;
*next_id += 1;
id
};

let self_ident = if (self.is_delta, n.is_delta) == (true, false) {
self.ensure_concrete().ident
} else {
self.ident.clone()
};

let other_ident = &n.ident;
let ident = syn::Ident::new(&format!("stream_{}", next_id), Span::call_site());

let mut builders = self.builders.borrow_mut();
let builder = builders
.as_mut()
.unwrap()
.entry(self.node.id())
.or_default();

let output_delta = match (self.is_delta, n.is_delta) {
(true, true) => {
// we don't gain any performance by having the first be 'tick,
// just persist the second one and we get deltas out
builder.add_statement(parse_quote! {
#ident = anti_join::<'tick, 'static>() -> tee();
});

true
}
(true, false) => {
// difference/anti_join<'static, _> does not replay
// but we need to re-filter every tick
builder.add_statement(parse_quote! {
#ident = anti_join::<'tick, 'tick>() -> tee();
});

false
}
(false, true) => {
builder.add_statement(parse_quote! {
#ident = anti_join::<'tick, 'static>() -> tee();
});

false
}
(false, false) => {
builder.add_statement(parse_quote! {
#ident = anti_join::<'tick, 'tick>() -> tee();
});

false
}
};

builder.add_statement(parse_quote! {
#self_ident -> [pos]#ident;
});

builder.add_statement(parse_quote! {
#other_ident -> [neg]#ident;
});

Stream {
ident,
node: self.node.clone(),
next_id: self.next_id,
builders: self.builders,
is_delta: output_delta,
_phantom: PhantomData,
}
}
}

impl<'a, K: Eq + Hash, V, N: Location<'a>> Stream<'a, (K, V), Windowed, N> {
Expand Down
1 change: 1 addition & 0 deletions hydroflow_plus_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use stageleft::{q, Quoted, RuntimeData};

pub mod cluster;
pub mod first_ten;
pub mod negation;
pub mod networked;

#[stageleft::entry(UnboundedReceiverStream<u32>)]
Expand Down
Loading

0 comments on commit 5a03ed4

Please sign in to comment.