Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support arbitrary tuples #34

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 207 additions & 33 deletions src/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,56 @@ use std::ops::Deref;
/// because relations have no "recent" tuples, so the fn would be a
/// guaranteed no-op if both arguments were relations. See also
/// `join_into_relation`.
pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
input1: &Variable<(Key, Val1)>,
input2: impl JoinInput<'me, (Key, Val2)>,
pub(crate) fn join_into<'me, Key: Ord, Value1: Ord, Value2: Ord, Result: Ord>(
input1: &Variable<(Key, Value1)>,
input2: impl JoinInput<'me, (Key, Value2)>,
output: &Variable<Result>,
mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
mut logic: impl FnMut(&Key, &Value1, &Value2) -> Result,
) {
join_into_by_impl(
input1,
input2,
output,
|(key, _value)| key,
|(key, _value)| key,
|tuple1, tuple2| logic(&tuple1.0, &tuple1.1, &tuple2.1),
)
}

/// Joins `input1` with `input2` on a key projected out of each tuple.
///
/// `accessor1` and `accessor2` borrow the join key from a tuple of the respective
/// input. For every pair of tuples whose keys compare equal, `logic` builds a result
/// tuple, and the collected results are inserted into `output`. All of the work is
/// forwarded unchanged to `join_into_by_impl`.
///
/// NOTE(review): the merge join underneath advances with `gallop`, so it presumably
/// needs each input to be ordered by the *extracted* key and not merely by the whole
/// tuple -- confirm that callers only pass accessors that preserve the sort order.
pub(crate) fn join_into_by<'me, Key, Tuple1, Tuple2, Accessor1, Accessor2, Result>(
    input1: &Variable<Tuple1>,
    input2: impl JoinInput<'me, Tuple2>,
    output: &Variable<Result>,
    accessor1: Accessor1,
    accessor2: Accessor2,
    logic: impl FnMut(&Tuple1, &Tuple2) -> Result,
) where
    Key: Ord,
    Tuple1: Ord,
    Tuple2: Ord,
    Result: Ord,
    Accessor1: Fn(&Tuple1) -> &Key,
    Accessor2: Fn(&Tuple2) -> &Key,
{
    join_into_by_impl(input1, input2, output, accessor1, accessor2, logic);
}

#[inline(always)]
fn join_into_by_impl<'me, Key: Ord, Tuple1: Ord, Tuple2: Ord, Accessor1, Accessor2, Result: Ord>(
input1: &Variable<Tuple1>,
input2: impl JoinInput<'me, Tuple2>,
output: &Variable<Result>,
accessor1: Accessor1,
accessor2: Accessor2,
mut logic: impl FnMut(&Tuple1, &Tuple2) -> Result,
) where
Accessor1: Fn(&Tuple1) -> &Key,
Accessor2: Fn(&Tuple2) -> &Key,
{
let mut results = Vec::new();

let recent1 = input1.recent();
Expand All @@ -23,80 +67,210 @@ pub(crate) fn join_into<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
{
// scoped to let `closure` drop borrow of `results`.

let mut closure = |k: &Key, v1: &Val1, v2: &Val2| results.push(logic(k, v1, v2));
let mut closure = |tuple1: &Tuple1, tuple2: &Tuple2| results.push(logic(tuple1, tuple2));

for batch2 in input2.stable().iter() {
join_helper(&recent1, &batch2, &mut closure);
join_helper_by(&recent1, &batch2, &accessor1, &accessor2, &mut closure);
}

for batch1 in input1.stable().iter() {
join_helper(&batch1, &recent2, &mut closure);
join_helper_by(&batch1, &recent2, &accessor1, &accessor2, &mut closure);
}

join_helper(&recent1, &recent2, &mut closure);
join_helper_by(&recent1, &recent2, &accessor1, &accessor2, &mut closure);
}

output.insert(Relation::from_vec(results));
}

/// Join, but for two relations.
pub(crate) fn join_into_relation<'me, Key: Ord, Val1: Ord, Val2: Ord, Result: Ord>(
input1: &Relation<(Key, Val1)>,
input2: &Relation<(Key, Val2)>,
mut logic: impl FnMut(&Key, &Val1, &Val2) -> Result,
pub(crate) fn join_into_relation<'me, Key: Ord, Value1: Ord, Value2: Ord, Result: Ord>(
input1: &Relation<(Key, Value1)>,
input2: &Relation<(Key, Value2)>,
mut logic: impl FnMut(&Key, &Value1, &Value2) -> Result,
) -> Relation<Result> {
join_into_relation_by(
input1,
input2,
|(key, _value)| key,
|(key, _value)| key,
|tuple1, tuple2| logic(&tuple1.0, &tuple1.1, &tuple2.1),
)
}

/// Join, but for two relations, matching tuples on the keys extracted by `accessor1` and `accessor2`.
pub(crate) fn join_into_relation_by<
'me,
Key: Ord,
Tuple1: Ord,
Tuple2: Ord,
Accessor1,
Accessor2,
Result: Ord,
>(
input1: &Relation<Tuple1>,
input2: &Relation<Tuple2>,
accessor1: Accessor1,
accessor2: Accessor2,
mut logic: impl FnMut(&Tuple1, &Tuple2) -> Result,
) -> Relation<Result>
where
Accessor1: Fn(&Tuple1) -> &Key,
Accessor2: Fn(&Tuple2) -> &Key,
{
let mut results = Vec::new();

join_helper(&input1.elements, &input2.elements, |k, v1, v2| {
results.push(logic(k, v1, v2));
});
join_helper_by(
&input1.elements,
&input2.elements,
&accessor1,
&accessor2,
|tuple1, tuple2| {
results.push(logic(tuple1, tuple2));
},
);

Relation::from_vec(results)
}

/// Applies `logic` to every recent tuple of `input1` whose key is not present in `input2`, and returns the results as a new relation.
pub(crate) fn antijoin<'me, Key: Ord, Val: Ord, Result: Ord>(
input1: impl JoinInput<'me, (Key, Val)>,
pub(crate) fn antijoin<'me, Key: Ord, Value1: Ord, Result: Ord>(
input1: impl JoinInput<'me, (Key, Value1)>,
input2: &Relation<Key>,
mut logic: impl FnMut(&Key, &Val) -> Result,
mut logic: impl FnMut(&Key, &Value1) -> Result,
) -> Relation<Result> {
antijoin_by_impl(
input1,
input2,
|(key, _value)| key,
|key| key,
|(key, value)| logic(key, value),
)
}

/// Anti-joins the recent tuples of `input1` against `input2` on a projected key.
///
/// `accessor1` borrows the key from a tuple of `input1`, `accessor2` from a tuple of
/// `input2`. The returned relation holds `logic(tuple)` for each recent tuple of
/// `input1` whose key has no equal key in `input2`. All of the work is forwarded
/// unchanged to `antijoin_by_impl`.
///
/// NOTE(review): the lookup into `input2` uses `gallop`, so it presumably needs both
/// inputs to be ordered by the *extracted* key -- confirm that callers only pass
/// accessors that preserve the sort order.
pub(crate) fn antijoin_by<'me, Tuple1, Tuple2, Key, Accessor1, Accessor2, Result>(
    input1: impl JoinInput<'me, Tuple1>,
    input2: &Relation<Tuple2>,
    accessor1: Accessor1,
    accessor2: Accessor2,
    logic: impl FnMut(&Tuple1) -> Result,
) -> Relation<Result>
where
    Tuple1: Ord,
    Tuple2: Ord,
    Key: Ord,
    Result: Ord,
    Accessor1: Fn(&Tuple1) -> &Key,
    Accessor2: Fn(&Tuple2) -> &Key,
{
    antijoin_by_impl(input1, input2, accessor1, accessor2, logic)
}

/// Applies `logic` to every recent tuple of `input1` whose key (via `accessor1`) matches no key (via `accessor2`) in `input2`, and returns the results as a new relation.
#[inline(always)]
pub(crate) fn antijoin_by_impl<
'me,
Tuple1: Ord,
Tuple2: Ord,
Key: Ord,
Accessor1,
Accessor2,
Result: Ord,
>(
input1: impl JoinInput<'me, Tuple1>,
input2: &Relation<Tuple2>,
accessor1: Accessor1,
accessor2: Accessor2,
mut logic: impl FnMut(&Tuple1) -> Result,
) -> Relation<Result>
where
Accessor1: Fn(&Tuple1) -> &Key,
Accessor2: Fn(&Tuple2) -> &Key,
{
let mut tuples2 = &input2[..];

let results = input1
.recent()
.iter()
.filter(|(ref key, _)| {
tuples2 = gallop(tuples2, |k| k < key);
tuples2.first() != Some(key)
.filter(|tuple| {
let key = accessor1(tuple);
tuples2 = gallop(tuples2, |k| accessor2(k) < key);
tuples2.first().map(|tuple2| accessor2(tuple2)) != Some(key)
})
.map(|(ref key, ref val)| logic(key, val))
.map(|tuple| logic(tuple))
.collect::<Vec<_>>();

Relation::from_vec(results)
}

fn join_helper<K: Ord, V1, V2>(
mut slice1: &[(K, V1)],
mut slice2: &[(K, V2)],
mut result: impl FnMut(&K, &V1, &V2),
#[allow(dead_code)]
fn join_helper<Key: Ord, Value1, Value2>(
slice1: &[(Key, Value1)],
slice2: &[(Key, Value2)],
mut result: impl FnMut(&Key, &Value1, &Value2),
) {
join_helper_by_impl(
slice1,
slice2,
|(key, _value)| key,
|(key, _value)| key,
|tuple1, tuple2| result(&tuple1.0, &tuple1.1, &tuple2.1),
)
}

/// Merge-joins two slices on the keys borrowed by `accessor1` / `accessor2`.
///
/// `result` is invoked for every pair of tuples (one from each slice) whose
/// extracted keys compare equal. The join itself is delegated to
/// `join_helper_by_impl`.
///
/// # Preconditions
///
/// Both slices must be ordered by their *extracted* key. Being sorted as whole
/// tuples is not enough: an accessor that picks anything other than a leading
/// component can yield keys in arbitrary order, and the join would then silently
/// miss matches instead of failing.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if either slice is not
/// ordered by its extracted key. Builds without debug assertions skip the check.
fn join_helper_by<Key: Ord, Tuple1, Tuple2, Accessor1, Accessor2>(
    slice1: &[Tuple1],
    slice2: &[Tuple2],
    accessor1: Accessor1,
    accessor2: Accessor2,
    result: impl FnMut(&Tuple1, &Tuple2),
) where
    Accessor1: Fn(&Tuple1) -> &Key,
    Accessor2: Fn(&Tuple2) -> &Key,
{
    // Checked here, before the accessors are moved into the implementation, so that a
    // violated precondition surfaces as a panic rather than as a silently wrong join.
    debug_assert!(
        slice1
            .windows(2)
            .all(|pair| accessor1(&pair[0]) <= accessor1(&pair[1])),
        "join_helper_by: `slice1` is not sorted by the key extracted by `accessor1`"
    );
    debug_assert!(
        slice2
            .windows(2)
            .all(|pair| accessor2(&pair[0]) <= accessor2(&pair[1])),
        "join_helper_by: `slice2` is not sorted by the key extracted by `accessor2`"
    );

    join_helper_by_impl(slice1, slice2, accessor1, accessor2, result)
}

#[inline(always)]
fn join_helper_by_impl<Key: Ord, Tuple1, Tuple2, Accessor1, Accessor2>(
mut slice1: &[Tuple1],
mut slice2: &[Tuple2],
accessor1: Accessor1,
accessor2: Accessor2,
mut result: impl FnMut(&Tuple1, &Tuple2),
) where
Accessor1: Fn(&Tuple1) -> &Key,
Accessor2: Fn(&Tuple2) -> &Key,
{
while !slice1.is_empty() && !slice2.is_empty() {
use std::cmp::Ordering;

let ordering = { accessor1(&slice1[0]).cmp(&accessor2(&slice2[0])) };

// If the keys match produce tuples, else advance the smaller key until they might.
match slice1[0].0.cmp(&slice2[0].0) {
match ordering {
Ordering::Less => {
slice1 = gallop(slice1, |x| x.0 < slice2[0].0);
slice1 = gallop(slice1, |x| accessor1(x) < accessor2(&slice2[0]));
Copy link
Contributor

@ecstatic-morse ecstatic-morse Aug 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To join efficiently, tuples need to be in sorted order; this is a precondition of `gallop`/`binary_search`. In the current implementation, both `slice1` and `slice2` are sorted because they come from a `Relation`, but the mapped values (the keys obtained once an accessor is applied) may not be.

It would be possible to make this a contract of the accessors (probably checked at runtime?), but that would be a pretty big footgun, and you would still have to re-index variables when their elements are in the wrong order. Accessors only help when tuple elements aren't grouped properly (e.g. (A, B, C) vs. ((A, B), C)). However, there are type-safe ways of handling that particular case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no, I feared I'd be missing something like that. 😟

I'm familiar with h-lists, but I'm not exactly sure how one would apply them here. 🤔

Copy link
Contributor

@ecstatic-morse ecstatic-morse Aug 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafrog expects leapers to use an interface like (Key, Value), which becomes awkward when Key or Value contain multiple elements themselves. That's why you see variables with types like ((Origin, Point), Origin) or ((Loan, Point), ()) over in Polonius. However, the only prerequisite for an efficient join is that variables/relations have a common prefix: You should be able to join (Loan, Origin, X) with (Loan, Origin, Y, Z) directly, without having to re-index them as ((Loan, Origin), ...).

I think it would be simplest to express this constraint on top of h-lists (with the typical ordering reversed, so (((A), B), C) instead of (A, (B, (C)))), since you can take a reference to a valid h-list representing any prefix of that type. You could also do something similar with extension traits on top of tuples (impl Prefix<(A, B)> for (A, B, C)), but everything would have to be Copy. This is fine for Polonius I suppose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the detailed explanation!

}
Ordering::Equal => {
// Determine the number of matching keys in each slice.
let count1 = slice1.iter().take_while(|x| x.0 == slice1[0].0).count();
let count2 = slice2.iter().take_while(|x| x.0 == slice2[0].0).count();
let count1 = slice1
.iter()
.take_while(|x| accessor1(x) == accessor1(&slice1[0]))
.count();
let count2 = slice2
.iter()
.take_while(|x| accessor2(x) == accessor2(&slice2[0]))
.count();

// Produce results from the cross-product of matches.
for index1 in 0..count1 {
for s2 in slice2[..count2].iter() {
result(&slice1[0].0, &slice1[index1].1, &s2.1);
result(&slice1[index1], s2);
}
}

Expand All @@ -105,13 +279,13 @@ fn join_helper<K: Ord, V1, V2>(
slice2 = &slice2[count2..];
}
Ordering::Greater => {
slice2 = gallop(slice2, |x| x.0 < slice1[0].0);
slice2 = gallop(slice2, |x| accessor2(x) < accessor1(&slice1[0]));
}
}
}
}

pub(crate) fn gallop<T>(mut slice: &[T], mut cmp: impl FnMut(&T) -> bool) -> &[T] {
pub(crate) fn gallop<Tuple>(mut slice: &[Tuple], mut cmp: impl FnMut(&Tuple) -> bool) -> &[Tuple] {
// if empty slice, or already >= element, return
if !slice.is_empty() && cmp(&slice[0]) {
let mut step = 1;
Expand Down
10 changes: 5 additions & 5 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

use super::{Relation, Variable};

pub(crate) fn map_into<T1: Ord, T2: Ord>(
input: &Variable<T1>,
output: &Variable<T2>,
logic: impl FnMut(&T1) -> T2,
pub(crate) fn map_into<Tuple1: Ord, Tuple2: Ord>(
input: &Variable<Tuple1>,
output: &Variable<Tuple2>,
logic: impl FnMut(&Tuple1) -> Tuple2,
) {
let results: Vec<T2> = input.recent.borrow().iter().map(logic).collect();
let results: Vec<Tuple2> = input.recent.borrow().iter().map(logic).collect();

output.insert(Relation::from_vec(results));
}
Loading