
feat(expr): Implement approx_count_distinct for stream processing #3121

Merged · 19 commits · Jun 13, 2022

Conversation

Graphcalibur
Contributor

What's changed and what's your intention?

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Refer to a related PR or issue link (optional)

Fixes #2727


codecov bot commented Jun 10, 2022

Codecov Report

Merging #3121 (adb165b) into main (97177a9) will increase coverage by 0.03%.
The diff coverage is 88.79%.

@@            Coverage Diff             @@
##             main    #3121      +/-   ##
==========================================
+ Coverage   73.51%   73.54%   +0.03%     
==========================================
  Files         737      738       +1     
  Lines      101665   101887     +222     
==========================================
+ Hits        74735    74937     +202     
- Misses      26930    26950      +20     
Flag Coverage Δ
rust 73.54% <88.79%> (+0.03%) ⬆️


Impacted Files Coverage Δ
.../src/executor/aggregation/approx_count_distinct.rs 88.44% <88.44%> (ø)
...rc/expr/src/vector_op/agg/approx_count_distinct.rs 87.83% <100.00%> (+3.73%) ⬆️
src/stream/src/executor/aggregation/mod.rs 94.29% <100.00%> (ø)


@jon-chuang
Contributor

jon-chuang commented Jun 10, 2022

Could you provide the estimates for error rate and max number of rows for the chosen parameters?

For instance, since we have 2^10 registers, we can estimate a maximum of about ~16 trillion distinct rows (16T / 2^10 ≈ 16B; a u32 can count 4B elements, and the probability of the first bit being 0 and the second being 1 is 1/4, so we can count ~16B distinct elements in a register). Of course, this assumes there is zero chance of overflowing a count, so it's just a rough estimate.

This is useful info for our users and developers as 16T rows, about 1.6PB, is not unheard of for some users...
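To make the parameter discussion concrete, here is a sketch of how a hash could split into a bucket index and a count position; the helper name and the 2^10 setting are assumptions for illustration, not the PR's actual code:

```rust
// Hypothetical sketch: the low INDEX_BITS of the hash pick the bucket,
// and the number of trailing zeros in the remaining bits picks which
// per-bucket count to bump (1-based, saturating at the highest position).
const INDEX_BITS: u32 = 10; // 2^10 buckets, as discussed above

fn bucket_and_count_index(hash: u64) -> (usize, u32) {
    let bucket = (hash & ((1 << INDEX_BITS) - 1)) as usize;
    let rest = hash >> INDEX_BITS;
    // An all-zero remainder would report 64 trailing zeros, so clamp to
    // the number of bits actually left after the bucket index.
    let count_index = rest.trailing_zeros().min(63 - INDEX_BITS) + 1;
    (bucket, count_index)
}

fn main() {
    // hash 1024 = 0b100_0000_0000: bucket 0, remainder 1 -> position 1
    assert_eq!(bucket_and_count_index(1024), (0, 1));
    // hash 3: bucket 3, remainder 0 -> clamped to the last position
    assert_eq!(bucket_and_count_index(3), (3, 54));
}
```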

Also, what happens if we overflow a count?


Further, the original spec is also wrong.

For instance, if the max count for the first bit is 2^32, the second bit should be 2^31, ..., and the 17th bit should be 2^16, since we successively halve the counts.

So the struct should be:

struct RegisterBucket {
    count_1_to_16: [u32; 16],
    count_17_to_24: [u16; 8],
    count_25_to_32: [u8; 8],
}

So the total cost per bucket is 704 bits. 2^10 buckets => ~90KB, 2^16 buckets => ~5.8MB.
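The arithmetic above can be sanity-checked with a few constants, assuming the RegisterBucket layout sketched in this comment (16 u32 + 8 u16 + 8 u8 counts):

```rust
// 16 u32 + 8 u16 + 8 u8 counts per bucket.
const BITS_PER_BUCKET: usize = 16 * 32 + 8 * 16 + 8 * 8; // 704 bits

fn total_bytes(buckets: usize) -> usize {
    buckets * BITS_PER_BUCKET / 8
}

fn main() {
    assert_eq!(BITS_PER_BUCKET, 704);
    println!("2^10 buckets: {} bytes (~90 KB)", total_bytes(1 << 10)); // 90112
    println!("2^16 buckets: {} bytes (~5.8 MB)", total_bytes(1 << 16)); // 5767168
}
```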

We could add further buckets for count > 32, but I'm not sure it would be appropriate, since if we were in that regime of magnitude, then we would be overflowing buckets everywhere and things would stop working..


2^10 also doesn't seem that good, it has, according to the paper, a ~1/32 (1.04 / sqrt(2^10)) error rate. Why not stick to 2^16 (1/128 error rate) like in Redis? 5.8MB seems quite reasonable for a better estimate.

I guess larger number of buckets also helps increase the max distinct count. (2^16 buckets & u32 for first bit => 1 quadrillion rows)

Maybe we could add some notes about future improvements (sparse representation, better estimate for small number of elements)?


Finally, I'd like to see some results or tests that show that the estimates are within the error bounds for some large number of distinct elements.

Something that can run in reasonable time for unit test but still demonstrates it works as expected.
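A test along those lines could look like this toy dense HLL (illustrative only, not the PR's RegisterBucket code): insert N distinct values, then assert the estimate lands well within the expected error band.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

const INDEX_BITS: u32 = 14;
const M: usize = 1 << INDEX_BITS;

// Toy dense HLL used only to demonstrate the bound-checking test.
struct Hll {
    registers: Vec<u8>, // max "position of first one" seen per register
}

impl Hll {
    fn new() -> Self {
        Hll { registers: vec![0; M] }
    }

    fn insert<T: Hash>(&mut self, value: &T) {
        let mut h = DefaultHasher::new();
        value.hash(&mut h);
        let hash = h.finish();
        let idx = (hash & (M as u64 - 1)) as usize;
        let rest = hash >> INDEX_BITS;
        let rank = (rest.trailing_zeros() + 1).min(64 - INDEX_BITS) as u8;
        if rank > self.registers[idx] {
            self.registers[idx] = rank;
        }
    }

    fn estimate(&self) -> f64 {
        // Raw HLL estimator with the paper's bias-correction constant;
        // valid here because n is well above 2.5 * M.
        let m = M as f64;
        let alpha = 0.7213 / (1.0 + 1.079 / m);
        let sum: f64 = self.registers.iter().map(|&r| 2f64.powi(-(r as i32))).sum();
        alpha * m * m / sum
    }
}

fn main() {
    let mut hll = Hll::new();
    let n = 100_000u64;
    for i in 0..n {
        hll.insert(&i);
    }
    let est = hll.estimate();
    let err = (est - n as f64).abs() / n as f64;
    println!("estimate = {est:.0}, relative error = {:.3}%", err * 100.0);
    // With m = 2^14 the standard error is ~0.81%, so 5% is a generous bound.
    assert!(err < 0.05);
}
```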

Contributor

@lmatz lmatz left a comment


  1. We may put the optimization, i.e. counting huge datasets and sparse-dense transition, into future work.

  2. A benchmark can be set up in a future PR to show empirically how each configuration affects the error bounds, verify correctness, and help us understand the performance.

  3. The number of bits for each register can be corrected.

  4. Since we may use more registers to deal with the high-cardinality situation, calculating the bias correction accurately is helpful.

rest LGTM

@Graphcalibur
Contributor Author

2^10 also doesn't seem that good, it has, according to the paper, a ~1/32 (1.04 / sqrt(2^10)) error rate. Why not stick to 2^16 (1/128 error rate) like in Redis? 5.8MB seems quite reasonable for a better estimate.

I can update the implementation to use 2^14 registers, which is the same as Redis's implementation of HyperLogLog. This gives it an error rate of ~1/128 and it only uses about 1.44 MB.
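Checking those figures, assuming 2^14 registers, the paper's 1.04/sqrt(m) error formula, and the 704-bit RegisterBucket layout discussed earlier in this thread:

```rust
fn main() {
    let m = (1u32 << 14) as f64;
    // Standard HLL relative-error formula from the paper.
    let err = 1.04 / m.sqrt();
    println!("error ≈ {:.4} (≈ 1/{:.0})", err, 1.0 / err); // ≈ 0.0081 (≈ 1/123)
    // 704 bits per bucket, 2^14 buckets.
    let bytes = (1usize << 14) * 704 / 8;
    println!("memory ≈ {:.2} MB", bytes as f64 / 1e6); // ≈ 1.44 MB
}
```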

After updating the RegisterBucket to your suggestion, the maximum number of distinct rows that can be stored should be about 2^47 (~141 trillion). There are 2^14 RegisterBuckets and each RegisterBucket can store ~2^33 distinct rows (since 1/2 of the rows will have 1 as their last bit).

Should this information be documented within the code itself?

Also, what happens if we overflow a count?

One idea I had is to simply stop incrementing a counter once it reaches its maximum value (and likewise stop decrementing once it reaches zero). This makes it less accurate for extremely large volumes of data, but I believe the error should be negligible for most use cases.
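The saturating behaviour described here can be sketched with the standard library's saturating arithmetic; the helper name is hypothetical, not the PR's code:

```rust
// The counter pins at its bounds instead of wrapping, trading a small
// bias at the extremes for safety.
fn update_count(count: &mut u32, retract: bool) {
    *count = if retract {
        count.saturating_sub(1) // stays at 0 once empty
    } else {
        count.saturating_add(1) // stays at u32::MAX once full
    };
}

fn main() {
    let mut c = u32::MAX;
    update_count(&mut c, false);
    assert_eq!(c, u32::MAX); // no overflow
    let mut z = 0u32;
    update_count(&mut z, true);
    assert_eq!(z, 0); // no underflow
}
```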

Also, would it be a good idea to store bools as the "count" for 33 to 64?

@jon-chuang
Contributor

jon-chuang commented Jun 11, 2022

Also, would it be a good idea to store bools as the "count" for 33 to 64?

Well, bool takes the same space as u8. It could be a bitmap I guess if we want to save space?

One idea I had is to simply not increment a counter once it reaches its maximum value

Perhaps we can throw an error? I think it is behaviour outside the bounds of the design.

So we can say: HyperLogLog: count exceeds maximum value. You may be trying to run approx_count_distinct on a stream that has too high overall cardinality, or too many repeated values.
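The error-on-overflow alternative can be sketched with checked arithmetic; the error type here is a stand-in (a real implementation would use the project's own error enum):

```rust
// Surface overflow as an error instead of saturating or wrapping.
#[derive(Debug, PartialEq)]
struct HllOverflow;

fn try_increment(count: &mut u32) -> Result<(), HllOverflow> {
    *count = count.checked_add(1).ok_or(HllOverflow)?;
    Ok(())
}

fn main() {
    let mut c = u32::MAX - 1;
    assert!(try_increment(&mut c).is_ok()); // now at u32::MAX
    assert_eq!(try_increment(&mut c), Err(HllOverflow)); // overflow is reported
}
```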


I think it's also worth considering (this could be done at a future validation step): if we have a value that is repeated quite a lot, i.e. has a high count, but which happens to hash with a large number of trailing zeros for a particular bucket, we will face this error. In other words, some of the assumptions about the counts aren't sound, since repeated values are correlated.

So it's probably worth taking another look at the bounds for the counts.

Personally, I am in favour of increasing the count max so we don't have to think about the issue, and we can deal with both high row cardinality (independent counts) and high duplicate cardinality (correlated counts).

So we could start by simply having all the counts be u32 or u64 (2048/4096 bits per bucket).

Even with all counts using u64 and 2^16 registers, the cost is 33 MB, at an error rate of ~0.4%. And we have a guarantee that the algorithm works for any total cardinality < 2^64. The memory cost is < 6x of:

struct RegisterBucket {
    count_1_to_16: [u32; 16],
    count_17_to_24: [u16; 8],
    count_25_to_32: [u8; 8],
}

@fuyufjh @lmatz what do you guys think? Is this memory cost acceptable to have a deterministic guarantee?


To optimize the memory cost without giving up a deterministic guarantee for inputs, we can utilize a sparse representation:

  1. Resizeable count i.e. bigint.
  2. Have a sparse representation of each register:
struct Register {
    counts_1_to_16: [u64; 16],
    counts_17_to_64: Option<Vec<(u8, u64)>>,
}

(u8, u64): the first number represents the number of trailing zeros, the second is the count.
Since with 2^16 buckets, for total cardinality < 2^32, we should reasonably expect most buckets to have max count < 2^16, we will not have to deal with the vec representation in most cases.
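Updating the sparse tail might look like the following sketch; the field names come from the struct above, but the logic is an assumption, not PR code:

```rust
struct Register {
    counts_1_to_16: [u64; 16],
    counts_17_to_64: Option<Vec<(u8, u64)>>,
}

impl Register {
    // `pos` is the 1-based "position of first one" (1..=64). Positions
    // 1..=16 use the dense array; higher positions lazily allocate the
    // sparse (position, count) vec.
    fn increment(&mut self, pos: u8) {
        if pos <= 16 {
            self.counts_1_to_16[(pos - 1) as usize] += 1;
        } else {
            let tail = self.counts_17_to_64.get_or_insert_with(Vec::new);
            match tail.iter_mut().find(|(p, _)| *p == pos) {
                Some((_, c)) => *c += 1,
                None => tail.push((pos, 1)),
            }
        }
    }
}

fn main() {
    let mut r = Register { counts_1_to_16: [0; 16], counts_17_to_64: None };
    r.increment(3);
    r.increment(3);
    r.increment(20);
    assert_eq!(r.counts_1_to_16[2], 2);
    assert_eq!(r.counts_17_to_64, Some(vec![(20, 1)]));
}
```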


But I think for this PR, we can just have the simple model:

struct RegisterBucket {
    count_1_to_16: [u32; 16],
    count_17_to_24: [u16; 8],
    count_25_to_32: [u8; 8],
}

Maybe we should open an issue on choosing appropriate max counts and discuss further there.

Should this information be documented within the code itself?

I think you should put a descriptive doc comment replacing:

/// `StreamingApproxCountDistinct` approximates the count of non-null rows using `HyperLogLog`.

We should probably document in developer/user facing docs in the future as well.

@jon-chuang
Contributor

jon-chuang commented Jun 11, 2022

I think the statement

each RegisterBucket can store ~2^33 distinct rows (since 1/2 of the rows will have 1 as its last bit).

is closer to:

each RegisterBucket can store at most ~2^33 non-distinct rows

So we have an upper bound, but nothing like a practical bound for what cardinality our choice of parameters can handle.


Technically, any bound we give if we want to optimize the size of counts based on probability would be probabilistic. The deterministic bound (worst-case) is the size of the smallest count.

So if our size of the smallest count is 1 bit, we can only guarantee a deterministic result if the total number of rows counted is at most 1.

* Update number of registers in both batch and stream implementation of
  approx_count_distinct to 2^14
* Change storage of RegisterBucket
* Add documentation of estimation error and # of rows that can be counted
* Add error handling for register overflow and invalid registers
@Graphcalibur Graphcalibur requested review from jon-chuang and lmatz June 13, 2022 02:50
@lmatz
Contributor

lmatz commented Jun 13, 2022

HyperLogLog: count exceeds maximum value.

We may throw a warning in non-strict mode, as the user may not want to fail the entire job.

Is this memory cost acceptable to have a deterministic guarantee?

It is good.

Maybe we should open an issue on choosing appropriate max counts and discuss further there.

Great, we may revisit this sort of extreme case later if we don't have a strong and clear motivation to choose either way for the moment.

Successfully merging this pull request may close these issues.

feature: support approx_count_distinct