
Store data in dense chunks #6066

Closed · emilk opened this issue Apr 22, 2024 · 0 comments · Fixed by #6570
Labels: enhancement (New feature or request) · 🚀 performance (Optimization, memory use, etc) · ⛃ re_datastore (affects the datastore itself)

emilk commented Apr 22, 2024

Summary

Store the data in the datastore as dense chunks to drastically reduce both the cost of ingestion and the RAM overhead, at the cost of slower queries in the unhappy path (amortized by a query cache).

For the happy path (in-order data), queries should be about as fast as in the current store.
For the unhappy path (out-of-order data), we have a parameter to tweak to optimize either for query time or for memory use. By default we optimize for the latter, and rely on the query cache to amortize the former.

It should be pretty straightforward to implement, taking maybe 2-4 weeks.

Details

The proposal is to arrange the data into dense chunks (similar to e.g. Parquet).
Each chunk contains N rows of data for a single Entity.

  • Each temporal batch becomes a single chunk
  • The SDK-side batcher outputs chunks (e.g. every 8ms)
  • The DataStore contains an index of chunks

During ingestion, each chunk is just inserted straight into the DataStore, which keeps a per-timeline index of the chunks.
The chunks can have overlapping time ranges, and each chunk may itself be unsorted; this is what makes queries slower.

Because chunks can overlap in time, we need something like an interval tree to index them. We should still get O(log N) insertions and queries into this tree.
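For illustration only, here is a minimal sketch of such a chunk index. Everything in it (the ChunkIndex name, the BTreeMap-based layout) is hypothetical, not the actual store: a BTreeMap keyed by start time gives O(log N) insertion, but the overlap query shown scans every chunk that starts before the query's end, so a proper interval tree is what keeps queries O(log N + K) when time ranges overlap heavily.

use std::collections::BTreeMap;
use std::ops::Range;

type TimeInt = i64;
type ChunkId = u64;

/// Hypothetical stand-in for the `IntervalTree<TimeInt, ChunkId>` in the pseudo-code below:
/// chunks keyed by start time, each entry carrying its end time.
#[derive(Default)]
struct ChunkIndex {
    /// start time -> (end time, chunk) pairs for chunks starting at that time.
    by_start: BTreeMap<TimeInt, Vec<(TimeInt, ChunkId)>>,
}

impl ChunkIndex {
    /// O(log N) insertion; overlapping and unsorted chunks are fine.
    fn insert(&mut self, time_range: Range<TimeInt>, chunk: ChunkId) {
        self.by_start
            .entry(time_range.start)
            .or_default()
            .push((time_range.end, chunk));
    }

    /// All chunks whose half-open time range overlaps `query`.
    fn overlapping(&self, query: Range<TimeInt>) -> Vec<ChunkId> {
        let (q_start, q_end) = (query.start, query.end);
        self.by_start
            .range(..q_end) // every chunk that starts before the query ends...
            .flat_map(|(_start, chunks)| chunks.iter().copied())
            .filter(|&(end, _)| end > q_start) // ...and ends after it starts
            .map(|(_, id)| id)
            .collect()
    }
}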

In the future we can add compaction and chunk splitting to the data store in order to optimize storage and/or lookups, but we don't need it for the first iteration.

The SDK-side batcher appends data to a chunk until one of the following conditions is met (a sketch of this split predicate follows the list):

  • The chunk is old enough (e.g. 8ms, our current default)
  • It gets too large (e.g. >1MB)
  • If any timeline is unsorted (out-of-order), we split after ~256 rows to minimize the cost of linear searches at query time
  • If the chunk is large and an insertion would make a timeline unsorted, we split before the new insert
  • If a timeline has been added or removed
  • If any component has a different datatype (e.g. it was a TensorData, but is now a Promise)
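A hypothetical sketch of that predicate, combining the conditions above. All names and thresholds here are illustrative, not the actual SDK API:

use std::time::{Duration, Instant};

// Illustrative thresholds; the real defaults may differ.
const MAX_CHUNK_AGE: Duration = Duration::from_millis(8);
const MAX_CHUNK_BYTES: usize = 1_000_000; // ~1 MB
const MAX_UNSORTED_ROWS: usize = 256;

/// Hypothetical summary of the chunk currently being built by the batcher.
struct PendingChunk {
    created_at: Instant,
    num_bytes: usize,
    num_rows: usize,
    any_timeline_unsorted: bool,
}

impl PendingChunk {
    /// Should we flush this chunk before appending the next row?
    fn should_split_before(
        &self,
        row_would_unsort_a_timeline: bool,
        row_changes_timeline_set: bool,
        row_changes_component_datatype: bool,
    ) -> bool {
        self.created_at.elapsed() >= MAX_CHUNK_AGE
            || self.num_bytes >= MAX_CHUNK_BYTES
            || (self.any_timeline_unsorted && self.num_rows >= MAX_UNSORTED_ROWS)
            // "large" is a stand-in here; the proposal doesn't pin down a threshold:
            || (self.num_rows >= MAX_UNSORTED_ROWS && row_would_unsort_a_timeline)
            || row_changes_timeline_set
            || row_changes_component_datatype
    }
}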

Pseudo-code

struct DataStore {
    entity_streams: Map<EntityPath, EntityStream>
}

struct EntityStream {
    /// Each chunk is found at most once on each timeline.
    timelines: Map<TimeLine, IntervalTree<TimeInt, ChunkId>>,
    
    chunks: Map<ChunkId, Arc<Chunk>>,
}

/// Dense storage of N rows of data for a specific EntityPath
struct Chunk {
    // Always sorted.
    row_ids: Vec<RowId>,

    // The time columns.
    timelines: Map<TimeLine, ChunkTimeline>,

    /// Each `Box<dyn Array>` is effectively a `Vec<Option<Vec<T>>>` backed as raw Arrow data.
    /// The outer option here is so that we can log only `Position` one time, but not a `Color`.
    /// The `dyn Array` also holds the data type.
    components: BTreeMap<ComponentName, Box<dyn Array>>,
}

struct ChunkTimeline {
    /// Hopefully sorted, but not necessarily.
    times: Vec<TimeInt>,
    
    /// Is [`Self::times`] sorted?
    sorted: bool,
    
    /// (min, max) of [`Self::times`]
    range: TimeRange,
    
    // Future improvement: keep a shuffle-index if unsorted
}

---------------------------------------------------

type RangeQueryOutput = Vec<(Arc<Chunk>, Range<usize>)>;

impl EntityStream {
    fn range_query(&self, component: ComponentName, timeline: TimelineName, range: TimeRange) -> RangeQueryOutput {
        self.timelines[timeline].overlaps(range)
            .flat_map(|chunk_id| self.chunks[chunk_id].range_query(component, timeline, range))
            .sorted_by_key(|(_, row_range)| row_range.start)
            .collect()
    }
}

impl Chunk {
    fn range_query(self: &Arc<Self>, component: ComponentName, timeline: TimelineName, range: TimeRange) -> RangeQueryOutput {
        let timeline = &self.timelines[timeline];
        if timeline.sorted {
            // Fast path: binary-search for the first and one-past-last rows in `range`.
            let start = timeline.times.partition_point(|t| *t < range.min);
            let end = timeline.times.partition_point(|t| *t <= range.max);
            vec![(self.clone(), start..end)]
        } else {
            // Slow path: linear scan, emitting one single-row range per matching row.
            timeline.times.iter().enumerate()
                .filter_map(|(row_idx, row_time)| {
                    range.contains(row_time).then(|| (self.clone(), row_idx..row_idx + 1))
                })
                .collect()
        }
    }
}
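For illustration, a caller could drive this as follows (assuming the types above; TimeRange::new and the literal names are hypothetical):

let results = entity_stream.range_query(
    "Position3D".into(),      // component
    "frame_nr".into(),        // timeline
    TimeRange::new(100, 200), // hypothetical constructor
);
for (chunk, row_range) in results {
    // Decode only `row_range` from `chunk.components["Position3D"]`;
    // the Arc<Chunk> keeps the backing data alive without copying it.
}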

Future improvements

  • More efficient encoding of RowId and TimeInt, using some sort of time-series encoding, e.g. delta-encoding + RLE (a sketch follows this list)
  • Chunk compaction in the DataStore
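To make the delta-encoding + RLE bullet concrete, here is a generic sketch (not Rerun's actual encoding): a mostly regularly-sampled time column turns into long runs of identical deltas, which RLE then collapses.

/// Delta-encode, then run-length-encode, a time column.
/// Returns (first_value, Vec<(delta, run_length)>), or None if `times` is empty.
fn delta_rle_encode(times: &[i64]) -> Option<(i64, Vec<(i64, u32)>)> {
    let (&first, rest) = times.split_first()?;
    let mut runs: Vec<(i64, u32)> = Vec::new();
    let mut prev = first;
    for &t in rest {
        let delta = t - prev;
        prev = t;
        match runs.last_mut() {
            // Extend the current run if the delta repeats...
            Some((d, n)) if *d == delta => *n += 1,
            // ...otherwise start a new run.
            _ => runs.push((delta, 1)),
        }
    }
    Some((first, runs))
}

// E.g. times [100, 110, 120, 130, 200] encode as (100, [(10, 3), (70, 1)]):
// five 8-byte values become one base value plus two short runs.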
@emilk emilk added enhancement New feature or request ⛃ re_datastore affects the datastore itself 🚀 performance Optimization, memory use, etc labels Apr 22, 2024
emilk added a commit that referenced this issue Apr 22, 2024
emilk added a commit that referenced this issue Apr 22, 2024
emilk added a commit that referenced this issue Apr 23, 2024
Part of #6066

Shows that the datastore currently uses 936 bytes to store a 16-byte RowId,
an 8-byte TimeInt, and an 8-byte f64, i.e. around 29x the memory it
should need.

@teh-cmc teh-cmc self-assigned this May 27, 2024
@emilk emilk added this to the 0.18 milestone Jun 24, 2024
@teh-cmc teh-cmc closed this as completed in ed20cba Jul 4, 2024