Skip to content

Commit

Permalink
feat(expr): Implement approx_count_distinct for stream processing (#3121)
Browse files Browse the repository at this point in the history

* Add approx_count_distinct

* Add ApproxCountDistinct to AggKind

* Add binding for approx_count_distinct

* feat(expr): Made small optimizations and fix tests

* feat(stream): Add approx_count_distinct to stream

* feat(expr): Add support for deletion on Approx_count_distinct

* feat(expr): Fix bug

* feat(expr): Add some comments and fix issue with registers

* feat(expr): Change number of registers and buckets, change count hash method

* feat(expr):
* Update number of registers in both batch and stream implementation of
  approx_count_distinct to 2^14
* Change storage of RegisterBucket
* Add documentation of estimation error and # of rows that can be counted
* Add error handling for register overflow and invalid registers

* feat(expr): Adjust implementation of approx_count_distinct
* Update number of registers in both batch and stream implementation of
  approx_count_distinct to 2^14
* Change storage of RegisterBucket
* Add documentation of estimation error and # of rows that can be counted
* Add error handling for register overflow and invalid registers

* feat(expr): Change name from register to bucket

* feat(expr): Add tests for RegisterBucket

* feat(expr): Add tests for RegisterBucket
  • Loading branch information
Graphcalibur authored Jun 13, 2022
1 parent 8286446 commit 725c459
Show file tree
Hide file tree
Showing 3 changed files with 395 additions and 13 deletions.
28 changes: 16 additions & 12 deletions src/expr/src/vector_op/agg/approx_count_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,29 @@ use risingwave_common::types::*;
use crate::vector_op::agg::aggregator::Aggregator;
use crate::vector_op::agg::general_sorted_grouper::EqGroups;

const INDEX_BITS: u8 = 10; // number of bits used for finding the index of each 64-bit hash
const INDICES: usize = 1 << INDEX_BITS; // number of indices available
const INDEX_BITS: u8 = 14; // number of bits used for finding the index of each 64-bit hash
const NUM_OF_REGISTERS: usize = 1 << INDEX_BITS; // number of indices available
const COUNT_BITS: u8 = 64 - INDEX_BITS; // number of non-index bits in each 64-bit hash

// Approximation for bias correction for 16384 registers. See "HyperLogLog: the analysis of a
// near-optimal cardinality estimation algorithm" by Philippe Flajolet et al.
const BIAS_CORRECTION: f64 = 0.72125;

/// `ApproxCountDistinct` approximates the number of distinct non-null values using
/// `HyperLogLog`. The standard estimation error for `HyperLogLog` is
/// 1.04/sqrt(number of registers). With 2^14 registers this is 1.04/128, i.e. ~0.81%.
pub struct ApproxCountDistinct {
return_type: DataType,
input_col_idx: usize,
registers: [u8; INDICES],
registers: [u8; NUM_OF_REGISTERS],
}

impl ApproxCountDistinct {
pub fn new(return_type: DataType, input_col_idx: usize) -> Self {
Self {
return_type,
input_col_idx,
registers: [0; INDICES],
registers: [0; NUM_OF_REGISTERS],
}
}

Expand All @@ -51,7 +58,7 @@ impl ApproxCountDistinct {
let scalar_impl = datum_ref.unwrap().into_scalar_impl();
let hash = self.get_hash(scalar_impl);

let index = (hash as usize) & (INDICES - 1); // Index is based on last few bits
let index = (hash as usize) & (NUM_OF_REGISTERS - 1); // Index is based on last few bits
let count = self.count_hash(hash);

if count > self.registers[index] {
Expand All @@ -77,18 +84,15 @@ impl ApproxCountDistinct {

/// Calculates the bias-corrected harmonic mean of the registers to get the approximate count
fn calculate_result(&self) -> i64 {
// Approximation for bias correction. See "HyperLogLog: the analysis of a near-optimal
// cardinality estimation algorithm" by Philippe Flajolet et al.
let bias_correction = 0.72134;
let m = INDICES as f64;
let m = NUM_OF_REGISTERS as f64;
let mut mean = 0.0;

// Get harmonic mean of all the counts in results
for count in self.registers.iter() {
mean += 1.0 / ((1 << *count) as f64);
}

let raw_estimate = bias_correction * m * m / mean;
let raw_estimate = BIAS_CORRECTION * m * m / mean;

// If raw_estimate is not much bigger than m and some registers have value 0, set answer to
// m * log(m/V) where V is the number of registers with value 0
Expand Down Expand Up @@ -168,14 +172,14 @@ impl Aggregator for ApproxCountDistinct {
groups_iter.next();
group_cnt += 1;
builder.append(Some(self.calculate_result()))?;
self.registers = [0; INDICES];
self.registers = [0; NUM_OF_REGISTERS];
}

self.add_datum(datum_ref);

// reset state and exit when reach limit
if groups.is_reach_limit(group_cnt) {
self.registers = [0; INDICES];
self.registers = [0; NUM_OF_REGISTERS];
break;
}
}
Expand Down
Loading

0 comments on commit 725c459

Please sign in to comment.