Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefix scans #393

Open
jaupe opened this issue Jul 2, 2017 · 14 comments
Open

Prefix scans #393

jaupe opened this issue Jul 2, 2017 · 14 comments

Comments

@jaupe
Copy link

jaupe commented Jul 2, 2017

I have a use-case to do prefix scans to gather cumulative totals/products of various quantitative measures. This is how I envisage how to do prefix scans from an API perspective:

// prefix sum
let seq = vec![1i64, 2, 3, 4, 5];
let sums = seq.par_iter().sums(); 
assert_eq!(sums vec![1, 3, 6, 10, 15]);

// prefix product
let seq = vec![1i64, 2, 3, 4, 5];
let prods= seq.par_iter().products(); 
assert_eq!(prods, vec![1, 2, 6, 24, 120]);

Would you like a PR for this or should I implement it as a separate crate?

@cuviper
Copy link
Member

cuviper commented Jul 2, 2017

Is that API on any ParallelIterator, or only IndexedParallelIterator? Does it always return Vec, or is it more general like collect()?

I'd also like to hear how you would parallelize this. It seems inherently sequential, although we had some ideas for scan() (#329), which is a more general case.

@jaupe
Copy link
Author

jaupe commented Jul 2, 2017

Hey @cuviper,

Looking at the Rayon API, I think it would good to make it as generic as possible; so ideally all iterators and using collect().

The algorithm in a pseudo-vector-algebra notation:

  1. Break the data into sub-slices and each sub-slice is processed in parallel.
[1,2,3,4,5,6,7,8,9,10]= [1,2,3],[4,5,6],[7,8,9],[10]
  1. A prefix scan is processed on each sub-slice and waits for all prefix scans to be completed.
[1,2,3],[4,5,6],[7,8,9],[10]=> [1,3,6],[4,9,15],[7,15,24],[10]
  1. Once all scans have been processed, sequentially process each sub-slice to compute the offset for each sub-slice. Once the offset is computed, adding the offset to each element in every sub-slice can be done in parallel.
[1,3,6],[4,9,15],[7,15,24],[10]  
=> [1,3,6],6+[4,9,15],21+[7,15,24],45+[10] 
=> [1,3,6],[10,15,21],[28,36,45],[55] 
  1. Fold each sub-slice to merge them together.
[1,3,6],[10,15,21],[28,36,45],[55] => [1,3,6,10,15,21,28,36,45,55]

@cbarrick
Copy link

cbarrick commented Jul 3, 2017

Adding the offset can be done in parallel

Is this true? It seems to me that you have to compute the final result for one group before you know what offset to add to the next. If so, you're back to the sequential algorithm.

For example, in this step:

[1,3,6],[4,9,15],[7,15,24],[10]  
=> [1,3,6],6+[4,9,15],21+[7,15,24],45+[10] 
=> [1,3,6],[10,15,21],[28,36,45],[55] 

How do you know to add 21 to the third group before you compute 6 + 15 in the second group?

@jaupe
Copy link
Author

jaupe commented Jul 3, 2017

Computing the offset/delta is sequential but the actual vector/scalar addition is parallelized. I can re-word it if it's not so clear.

@cuviper
Copy link
Member

cuviper commented Jul 3, 2017

That sounds pretty reasonable.

  • 1&2 can be done with a parallel fold that accumulates a local Vec<T>, then collect those into a simple temporary LinkedList<Vec<T>> to minimize further data movement.
  • Compute the group offsets sequentially.
  • For 3&4, tmp.into_par_iter().zip(offsets).flat_map(/* apply offsets */).collect().

Whether this belongs in rayon proper or its own crate is unclear.

@jaupe
Copy link
Author

jaupe commented Jul 6, 2017

I think a generic scan function definitely deserves a home within the rayon api so one can create cumulative sums and products. I'm on holiday at the moment but I can submit a PR when I get back?

@Qqwy
Copy link
Contributor

Qqwy commented Nov 11, 2019

We are two years down the road, but as I am also facing a situation in which a (parallel) scan operation is required, I'd love to contribute with a PR for this.

@cuviper would that be acceptable?

@cuviper
Copy link
Member

cuviper commented Nov 11, 2019

Sure, you're welcome to work on this. Don't forget #329 if you plan to make this more general than just sums and products.

@Qqwy
Copy link
Contributor

Qqwy commented Nov 11, 2019

👍 All right!
And I will keep #329 in mind.

An question about the implementation suggestion you mentioned above:

If I understand the source code for believe that the into_par_iter() for linked lists currently turns it back into a vector, since it uses the into_par_vec macro. As such, maybe we need a smarter way than the tmp.into_par_vec() you suggested above, since it seems (correct me if I'm wrong!) that we thus destroy the original 'nice split' only to rebuild it later.

Are there ways to let the threads communicate? (e.g. using a crossbeam channel?)
What I have in mind is the following:

  1. The original iterator's elements are approx. evenly divided over our threads. (precondition)
  2. Each of the threads/tasks does a local fold to accumulate into a local Vec<T>.
  3. Each of the threads/tasks communicates the vector's last value over a channel. (as an Option<T> to accomodate for empty vectors). The task then blocks on a channel receive.
  4. In one thread, we read out these channels from the receiving end.
  5. In this one thread, we (sequentially) fold over the received values. In every iteration, we send the value back over the appropriate channel, so that the respective task can continue by starting (6).
  6. All tasks in parallel update their local accumulated vec<T> with the received values.

The things I am unsure about here are thus:

  • Is it okay in Rayon to use a blocking receive, or will this create a potential for deadlocks? (i.e. is the threadpool 'smart' enough to store a task for later if it is waiting)
  • Is the abstraction of channels the right one to use here? I believe we can keep track of the number and order of channels by writing an appropriate implementation for split_at/split_off_left. Is that correct?

@cuviper
Copy link
Member

cuviper commented Nov 11, 2019

I'm not able to dig into this right now, but...

Is it okay in Rayon to use a blocking receive, or will this create a potential for deadlocks?

You'd have the potential for deadlocks like #592 -- thread stealing could cause your receiver to be deeper on the same stack as the sender that would want to fill it.

(i.e. is the threadpool 'smart' enough to store a task for later if it is waiting)

We don't have runtime hooks into external blocking things like mutexes or channels. Even if we did, you'd have to actually suspend the whole stack frame to "store" it for later, or else switch to a new stack to execute stolen tasks, so that stored task can be resumed independently.

@Qqwy
Copy link
Contributor

Qqwy commented Nov 11, 2019

Thank you for the information! That means that using channels is probably not the right way to go.

If I understand the source code for believe that the into_par_iter() for linked lists currently turns it back into a vector, since it uses the into_par_vec macro. As such, maybe we need a smarter way than the tmp.into_par_vec() you suggested above, since it seems (correct me if I'm wrong!) that we thus destroy the original 'nice split' only to rebuild it later.

After digging a bit deeper, it seems that I was incorrect:

  • LinkedList<Vec<T>>::into_par_vec() as called does not turn a long linked list of vectors back into a long vector.
  • Instead, it turns them into a Vec<Vec<T>> where the outer vector has the number of elements of threads in our pool.

Does into_par_vec() then by default spread these over all cores, giving each a single element? Or is there some heuristic we have to pass?
I found the LengthSplitter that seems to go down to 1 but its documentation mentions that "Rayon automatically normally attempts to adjust the size [...] to reduce overhead".

@Qqwy
Copy link
Contributor

Qqwy commented Nov 15, 2019

Another interesting note is that implementing something akin to the example code @cuviper gave above, we lose the indexability of our iterator, since flat_map is used. So I think that indeed (see #329 ) a VecTree abstraction (which can be converted to/from a nested iteration across multiple threads) is required. This also means that the implementation of drive will be different than those of the other ParallelIterator instances that use/are either a Producer or Consumer, since for the scan, we first have to consume everything, then perform a tiny amount of sequential work, and then need to be able to produce a new (indexable) parallel iterator.

I will probably start writing actual code and submitting a WIP PR next week (time permitting).

@gnzlbg
Copy link

gnzlbg commented Jan 23, 2020

A different way to implement this would be to use decoupled look-back: Merril et al., Single-pass Parallel Prefix Scan with Decoupled Look-back, 2016.

EDIT: I needed a couple of prefix sums in the past last weeks, and ended up doing them in a more naive way, but I'll try to find the time to dig more into this. They are quite useful, I needed them in the context of implementing parallel algorithms for maximum subsequence (e.g. see Perumalla et al., Parallel algorithms for maximum subsequence and maximum subarray, 1995) where the interesting building blocks are parallel prefix and postfix sums.

@arieluy
Copy link

arieluy commented Jan 27, 2024

I heard from someone who wanted to use my above PR in their project, so I've released it as a separate crate: https://crates.io/crates/rayon-scan
I hope this will be helpful to anyone else in this thread who is looking for this feature!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants