---
title: BucketFile for reducing memory usage
type: proposal
menu: proposals
status:
owner: zhulongcheng
---

### Related Tickets

* https://github.com/thanos-io/thanos/issues/1471 (main)
* https://github.com/thanos-io/thanos/issues/448 (store gateway OOM)

## Summary

This document describes the motivation and design of the `BucketFile` in the store component.

Our goals here are:

- A) Reduce memory usage
- B) Reuse chunks and pieces of the index fetched from object storage

## Motivation

Currently, the store gateway loads all [index.cache.json](https://github.com/thanos-io/thanos/blob/865d5ec710b5651ead08a7dc73864a6f5dbeeaa9/pkg/block/index.go#L40) files into memory on startup, which easily leads to OOM situations. On the other hand, some indexes are rarely or never queried, so we do not have to keep them in memory.

The store gateway also fetches chunks from object storage for each query request, but the fetched chunks are not reused by subsequent requests.

## Proposal

Add a helper struct that wraps a remote object storage file. We can temporarily name it `BucketFile`.

`BucketFile` looks like `ByteSlice` in prometheus/tsdb, but with some download-with-cache functions. It fetches part or all of the file content from object storage, writes that content to a local file, and records which data has been fetched in a bitmap.

`BucketFile` always returns a slice of the mmapped file. This avoids copying file data from kernel space to user space.

### BucketFile in a block

![BucketBlock](./img/bucketfile_in_a_block.jpg)

The above sequence diagram shows how the BucketFile works with IndexReader and ChunksReader in a block
(BucketIndexFile and BucketChunksFile are both represented by `BucketFile`):

1. BucketStore calls the `blockSeries` method to retrieve series from a block.
2. BucketBlock calls the `ExpandedPostings` method to retrieve expanded postings from BucketIndexReader.
3. BucketIndexReader fetches posting entries from BucketIndexFile.
4. BucketIndexFile returns posting entries.
5. BucketIndexReader decodes the posting entries into postings and returns expanded postings.
6. BucketBlock retrieves chunk metas from BucketIndexReader.
7. BucketIndexReader returns chunk metas.
8. BucketBlock calls BucketBlockChunksReader to prefetch chunks by chunk metas.
9. BucketBlockChunksReader calls BucketBlockChunksFile to prefetch ranges.
10. BucketBlockChunksFile notifies BucketBlockChunksReader that the prefetch is done.
11. BucketBlockChunksReader notifies BucketBlock that the prefetch is done.
12. BucketBlock retrieves chunks by metas.
13. BucketBlockChunksReader returns chunks.
14. BucketBlock converts the chunks to a `bucketSeriesSet` and returns it.

### BucketFile internals

![BucketFile](./img/bucketfile_internals.jpg)

The above sequence diagram shows the BucketFile internals:

1. The `Fetch` method is called.
2. BucketFile mmaps the local file if it is not mmapped yet.
3. BucketFile creates the pages bitmap if it does not exist.
4. BucketFile creates the file descriptor if it does not exist.
5. BucketFile converts the range to page ids.
6. BucketFile retrieves the new page ids that have not been fetched yet.
7. New page ids are retrieved.
8. If there are new page ids, partition them into multiple partitions; otherwise, go to step 20.
9. For each partition, convert the partition page ids to a range.
10. Fetch the range via BucketClient.
11. BucketClient retrieves bytes from ObjectStorage.
12. ObjectStorage returns an `io.ReadCloser`.
13. BucketClient returns the `io.ReadCloser` to BucketFile.
14. BucketFile reads bytes from the `io.ReadCloser` and writes them to FileDescriptor.
15. FileDescriptor writes the bytes into PageCache.
16. Bytes are written successfully.
17. Bytes are written successfully.
18. BucketFile adds the partition page ids to PageBitmap.
19. PageBitmap is updated.
20. BucketFile retrieves a slice of FileMmap.
21. A slice is retrieved.
22. BucketFile returns the slice to the caller.

### BucketFile struct

```
// Range represents a byte range.
type Range struct {
	start int
	end   int
}

// BucketFile represents a block index file or
// chunks file in remote object storage.
type BucketFile struct {
	mtx   sync.RWMutex
	file  *os.File        // file descriptor
	data  []byte          // mmapped bytes, shared & read-only
	pages *roaring.Bitmap // records pages that have been fetched
	size  int             // file size

	bkt  objstore.BucketReader
	name string // bucket object name
	path string // local file path

	pendingDuration time.Duration   // duration to wait before fetching pending pages
	pendingPages    *roaring.Bitmap // pending pages to be fetched
	pendingReaders  int             // number of pending callers
	pendingChan     chan error      // chan to notify callers of the prefetch result
}

func (f *BucketFile) Fetch(start, end int) ([]byte, error)         {...}
func (f *BucketFile) Prefetch(ranges []*Range) (chan error, error) {...}
func (f *BucketFile) Range(start, end int) ([]byte, error)         {...}

func (f *BucketFile) CloseMmap() error {...}
func (f *BucketFile) CloseFile() error {...}
func (f *BucketFile) Close() error     {...}
```

Exported methods:

* Fetch:

  `Fetch` returns a slice of the mmapped file (`f.data[start:end]`). If some pages in the range have not been fetched yet, it fetches those pages from object storage, writes them to the local file, and updates the pages bitmap (`f.pages`).

  `Fetch` will not combine object storage requests.

  It will split a big range request into multiple small requests if needed.

  `Fetch` internals (see the sketch after this list):

  * ensure `end` is not greater than `f.size`
  * convert the range to page ids
  * get the new page ids (not in `f.pages`)
    * if there are no new page ids, return a slice of the mmapped file
  * split the new page ids into multiple partitions
  * convert each partition's page ids to a range and fetch the ranges concurrently
  * return a slice of the mmapped file

* Prefetch:

  `Prefetch` will combine small object storage requests. It returns a channel to pending callers; the channel is used to notify them of the result of the prefetch.

  Callers need to check the value from the channel to know whether the prefetch was successful. Once the prefetch is done, callers can call the `Range` method to retrieve bytes.

  `Prefetch` also returns an error directly if the error can be detected quickly (e.g. `end` is greater than `f.size`).

  `Prefetch` internals:

  - ensure `end` is not greater than `f.size`
  - convert the ranges to page ids
  - get the new page ids (not in `f.pages`)
    - if there are no new page ids (everything has been fetched), return a slice of the mmapped file (`f.data[start:end]`)
  - add the new pages to `f.pendingPages`
  - increase `f.pendingReaders`
  - return `f.pendingChan`

* Range:

  `Range` returns a slice of the mmapped file (`f.data[start:end]`).

  It will not try to fetch data from object storage, and it returns an error if some pages in the range have not been fetched.

  `Range` internals:

  - convert the range to page ids
  - if all page ids have been fetched, return a slice of the mmapped file
  - otherwise return an error

* CloseMmap: `CloseMmap` unmaps `f.data`.

* CloseFile: `CloseFile` closes `f.file`.

* Close: `Close` closes `f.file`, unmaps `f.data`, and sets `f.pages` to nil.

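To make the steps above concrete, here is a minimal sketch of how `Fetch` could be structured. It is not the final implementation: `rangeToPages`, `newPageIDs`, `partition` and `fetchPartition` are hypothetical helpers used only for illustration, and the concurrent partition fetch uses `golang.org/x/sync/errgroup`.

```
// Sketch only: follows the "Fetch internals" steps above; the helper functions
// (rangeToPages, newPageIDs, partition, fetchPartition) are hypothetical.
func (f *BucketFile) Fetch(start, end int) ([]byte, error) {
	// Ensure end is not greater than the file size.
	if end > f.size {
		return nil, fmt.Errorf("end %d is greater than file size %d", end, f.size)
	}

	// Convert the range to page ids and keep only the pages
	// that are not yet recorded in the f.pages bitmap.
	pageIDs := rangeToPages(start, end)
	newIDs := newPageIDs(f.pages, pageIDs)

	if len(newIDs) > 0 {
		// Split the missing pages into partitions and fetch them concurrently.
		var g errgroup.Group
		for _, part := range partition(newIDs) {
			part := part
			g.Go(func() error {
				// fetchPartition converts the partition back to a byte range,
				// calls f.bkt.GetRange, writes the bytes to f.file and
				// finally adds the pages to f.pages.
				return f.fetchPartition(part)
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
	}

	// All pages covering [start, end) are on disk now; return the mmapped slice.
	return f.data[start:end], nil
}
```
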
### Range to pages

Convert a range to pages:

```
const pageSize = 4 * 1024 // 4 KiB

minPageID := uint64(start) / pageSize                // floor(start / pageSize)
maxPageID := (uint64(end) + pageSize - 1) / pageSize // ceil(end / pageSize)

pages := roaring.NewBitmap()
pages.AddRange(minPageID, maxPageID) // marks pages [minPageID, maxPageID)
```

For example, with `start=5000` and `end=13000`, `minPageID=1` and `maxPageID=4`, so pages 1, 2 and 3 are marked as fetched.

### Combine queries

```
type BucketFile struct {
	// ...
	pendingDuration time.Duration   // duration to wait before fetching pending pages
	pendingPages    *roaring.Bitmap // pending pages to be fetched
	pendingReaders  int             // number of pending callers
	pendingChan     chan error      // chan to notify callers of the prefetch result
}

func (f *BucketFile) Prefetch(ranges []*Range) (chan error, error) {...}
```

BucketFile appends new pages to `pendingPages` and waits until the next fetch cycle, so that multiple queries can be combined into one.

If the combined prefetch fails, all sub-prefetches fail. There must be error handling for failed prefetches.

Each time it fetches the pending pages, BucketFile resets `pendingPages`, `pendingReaders` and `pendingChan`.

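Below is a minimal sketch, under the assumptions of this proposal, of how the combining could work: a background goroutine flushes the pending pages every `pendingDuration` and fans the result out to all waiting callers. `fetchPages` is a hypothetical helper, and the loop assumes each pending caller performs exactly one receive on the channel returned by `Prefetch`.

```
// Sketch only: a background loop that flushes pending pages every
// f.pendingDuration and notifies all waiting callers through one channel.
func (f *BucketFile) flushPendingLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(f.pendingDuration)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			f.mtx.Lock()
			if f.pendingReaders == 0 {
				f.mtx.Unlock()
				continue
			}
			pages, ch, readers := f.pendingPages, f.pendingChan, f.pendingReaders

			// Reset the pending state so that new Prefetch calls start a fresh batch.
			f.pendingPages = roaring.NewBitmap()
			f.pendingChan = make(chan error)
			f.pendingReaders = 0
			f.mtx.Unlock()

			// Fetch the combined pages once and fan the result out to every
			// waiting caller; each caller receives exactly one value.
			err := f.fetchPages(pages) // hypothetical helper: GetRange + write to file + update f.pages
			for i := 0; i < readers; i++ {
				ch <- err
			}
			close(ch)
		}
	}
}
```
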
### In-memory objects

Each IndexReader decodes the [postings offset block](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/index.md#postings-offset-table) and [label offset block](https://github.com/prometheus/prometheus/blob/master/tsdb/docs/format/index.md#label-offset-table) into in-memory objects for sequential and random access.

Each BucketFile has a `pages` bitmap, but the bitmap uses little memory.

For inactive IndexReaders and BucketFiles, if the inactive duration (`now.Sub(lastQueryTime)`) is greater than a configured duration (e.g. 15 mins), then IndexReaders remove their in-memory objects, and BucketFiles remove their in-memory bitmaps, close their file descriptors, and unmap their local files. This reduces memory usage.

> **Review comment:** Nice, what about disk space? I would say we should try to have limited space as well. BTW, can we remove pieces of e.g. the index from memory, or only the full index? Or from disk? E.g. the question is whether we can make it sparse for things that are not used (probably too much effort).
>
> **Reply:** BucketFiles will also remove local files (will update the doc).
>
> **Reply:** IndexReaders decode the index into in-memory objects. The on-disk index is represented by BucketFile; if a BucketFile is inactive, it will remove the on-disk index.

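A minimal sketch of the eviction check, assuming a hypothetical `lastQueryTime` field on `BucketFile` (not part of the struct above) that is updated on every query:

```
// Sketch only: periodically close BucketFiles that have not been queried for
// longer than the configured inactivity duration. The lastQueryTime field and
// the inactiveDuration parameter are hypothetical.
func evictInactive(files []*BucketFile, inactiveDuration time.Duration) {
	now := time.Now()
	for _, f := range files {
		f.mtx.RLock()
		last := f.lastQueryTime
		f.mtx.RUnlock()

		if now.Sub(last) > inactiveDuration {
			// Close releases the file descriptor, unmaps the local file
			// and drops the pages bitmap, as described above.
			_ = f.Close()
		}
	}
}
```
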
### Preload blocks on startup

The latest blocks will be queried the most.

Preload the blocks of `the last N hours` (configurable) on startup. This will fetch index files from object storage and decode the bytes into in-memory objects.

> **Review comment:** Interesting idea.
>
> **Review comment:** Preload what? The full block into memory? The last N hours of it? That sounds scary for large objects. Even downloading to disk will take ages. 🤔
>
> **Reply:** Preload the block index. The latest blocks are usually small.

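For illustration, a hedged sketch of how the blocks to preload could be selected, assuming Thanos block metadata (`metadata.Meta`, whose `MinTime`/`MaxTime` are in milliseconds); the function name is illustrative only:

```
// Sketch only: pick blocks whose data overlaps the last n hours.
func blocksToPreload(metas []*metadata.Meta, n time.Duration) []*metadata.Meta {
	cutoff := time.Now().Add(-n).Unix() * 1000 // block timestamps are in milliseconds

	var out []*metadata.Meta
	for _, m := range metas {
		if m.MaxTime >= cutoff {
			out = append(out, m)
		}
	}
	return out
}
```
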
### Local files

The max size of an index file is 64 GiB, and the default size of a chunks file is 512 MiB.

So we can set the size of a local index file to 64 GiB and the size of a local chunks file to 768 (512 * 1.5) MiB.

> **Review comment:** Do we really need to know/set the max size of files here? Why?

The size is used to create a sparse file and to mmap the file.

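A minimal sketch of the sparse-file-plus-mmap idea on Linux, using `os.File.Truncate` and `golang.org/x/sys/unix`; the path and size are example values only:

```
package main

import (
	"log"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	const size = 768 << 20 // e.g. 768 MiB for a local chunks file.

	// Create a sparse file: Truncate extends the file to `size` bytes
	// without allocating disk blocks for the holes.
	f, err := os.OpenFile("/tmp/chunks-000001", os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if err := f.Truncate(size); err != nil {
		log.Fatal(err)
	}

	// Mmap the whole file; reads of not-yet-fetched pages return zeros,
	// while downloaded bytes are written through the file descriptor.
	data, err := unix.Mmap(int(f.Fd()), 0, size, unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		log.Fatal(err)
	}
	defer unix.Munmap(data)

	log.Printf("mmapped %d bytes", len(data))
}
```
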
### Object storage byte ranges

Currently, different provider clients implement byte ranges differently. We need to wrap their implementations and expose the same API.

I think the byte-range handling in [minio-go](https://github.com/minio/minio-go/blob/master/api-get-options.go#L99-L128) is a good example:

1. Fetch the last N bytes:
   `Range{start: 0, end: -N}` -> request header `Range: bytes=-N`
   **NOTE**: this will not be written to the local file because we do not know the offset.
2. Fetch everything starting from `start`:
   `Range{start: N, end: 0}` -> request header `Range: bytes=N-`
3. Fetch everything starting at `start` up to `end`:
   `Range{start: N, end: M}` -> request header `Range: bytes=N-M`

Now all providers (Azure, COS, GCS, S3, Swift) support `Fetch last N bytes`, `Fetch everything starting from offset` and `Fetch everything starting at start till end`.

> **Review comment:** Thanks for the research. Let's see first if we really need this working; maybe we can do it without those 3 special range-fetching features? Essentially, the index TOC knows exactly where things are in the first place, so we need just that. 🤔
>
> **Reply:** We do not know the size of the remote index file, so we need feature 1. We know the start of …
>
> **Review comment:** TODO: check overfetch.
>
> **Reply:** Yes, you are right here.

In order to support `Fetch last N bytes` on the client side, there are some required changes:

1. Upgrade the GCS client package to version [v0.45.0+](https://github.com/googleapis/google-cloud-go/blob/master/CHANGES.md#v0450)
2. Reimplement Swift [GetRange](https://github.com/thanos-io/thanos/blob/master/pkg/objstore/swift/swift.go#L121)

See https://tools.ietf.org/html/rfc7233#section-2.1 for reference.

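For illustration, a hedged sketch (not the actual objstore client code) of how the three `Range` forms above could be mapped to HTTP `Range` headers; it uses only the `Range` struct defined earlier and `fmt`:

```
// Sketch only: mapping the three Range forms above to HTTP Range headers.
func rangeHeader(r Range) (string, error) {
	switch {
	case r.start == 0 && r.end < 0:
		// 1. Fetch the last N bytes (end is -N).
		return fmt.Sprintf("bytes=%d", r.end), nil // e.g. "bytes=-N"
	case r.start > 0 && r.end == 0:
		// 2. Fetch everything starting from start.
		return fmt.Sprintf("bytes=%d-", r.start), nil
	case r.start >= 0 && r.end > r.start:
		// 3. Fetch everything from start to end (inclusive in RFC 7233).
		return fmt.Sprintf("bytes=%d-%d", r.start, r.end), nil
	default:
		return "", fmt.Errorf("invalid range: start=%d end=%d", r.start, r.end)
	}
}
```
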
### Work plan

- A) Reimplement `BucketReader.GetRange`
  - Upgrade the GCS client package
  - Reimplement Swift `GetRange`
- B) Implement `BucketFile`
- C) Use `BucketFile` for `BucketIndexReader` and `BucketChunkReader`
- D) Add flags to enable `BucketFile`

> **Review comment:** If we mmap everything and create sparse files, then won't this technically mean that there is kind of an assumption that we have "infinite" RAM and address space? How will `BucketFile` ensure that we won't pass a certain threshold? What will we do in situations under memory pressure? I think that ATM having a common pool of RAM for responses serves us well as a capacity-planning mechanism, but this could potentially mean that someone will just ask for random data from a bucket that could be terabytes in size, and we will hold all of that in memory?
>
> **Reply:** It will not hold all of that in memory. `BucketFile` will read a small amount of data (e.g. 1 MiB) from the `io.Reader` and write it to a local file each time, until the `io.Reader` hits EOF. If we fetch many blocks (> 1000 blocks) from object storage, it is possible that the size of all mmaps exceeds the virtual address space (e.g. 128 TiB) for the store gateway process. I think Thanos sharding is a good way to resolve this.
>
> **Review comment:** Yeah, but typically resilient systems gracefully shed their load, no? I would rather have Thanos Store unmap some files instead of crashing in the middle of the night when someone queries a bunch of disparate data. I am fairly confident that we should probably have some kind of strategy on how to deal with this kind of situation instead of crashing and telling our users that maybe they should split up the Thanos Store nodes.
>
> **Review comment:** Fully agree with @GiedriusS, we need to think about that case.
>
> **Reply:** Add a helper struct `MmapTracker` to manage the size of all mmaps. The `MmapTracker` is a global object for the store gateway. For each `BucketFile`: call `MmapTracker.Allocate(size)` before mmapping a file and `MmapTracker.Free(size)` after unmapping a file. `MmapTracker.Allocate` returns an error if the current size is greater than the configurable `MaxMmapSize`. How about this way? @bwplotka @GiedriusS
>
> **Review comment:** Again, for the design it's really enough to say "We will track mmapped files globally per store." And then, knowing that, there are important design questions to ask, e.g. …
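
A minimal sketch of the `MmapTracker` idea from the reply above, assuming only the `Allocate`/`Free` methods and the configurable `MaxMmapSize` mentioned there (it needs `sync` and `fmt`):

```
// Sketch only: tracks the total size of all mmapped files in the store gateway.
type MmapTracker struct {
	mtx     sync.Mutex
	current int64 // total bytes currently mmapped
	max     int64 // configurable MaxMmapSize
}

// Allocate reserves `size` bytes before mmapping a file. It returns an error
// if the reservation would exceed the configured maximum.
func (t *MmapTracker) Allocate(size int64) error {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	if t.current+size > t.max {
		return fmt.Errorf("mmap size limit exceeded: %d + %d > %d", t.current, size, t.max)
	}
	t.current += size
	return nil
}

// Free releases `size` bytes after unmapping a file.
func (t *MmapTracker) Free(size int64) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	t.current -= size
}
```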