Use better dictionary cache
macneale4 committed Feb 6, 2025
1 parent 35cca92 commit 04afcff
Showing 3 changed files with 122 additions and 105 deletions.
209 changes: 117 additions & 92 deletions go/libraries/doltcore/remotestorage/chunk_fetcher.go
@@ -33,6 +33,8 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage/internal/reliable"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"

"github.com/hashicorp/golang-lru/v2"
)

// A remotestorage.ChunkFetcher is a pipelined chunk fetcher for fetching a
@@ -52,12 +54,12 @@ type ChunkFetcher struct {
egCtx context.Context

// toGetCh is the channel used to request chunks. This will be initially given a root,
// and as refs are found, they will be added to the channel for workers to batch and request. NM4.
// and as refs are found, they will be added to the channel for workers to batch and request.
toGetCh chan hash.HashSet

// resCh is the results channel for the fetcher. It is used both to return
// chunks themselves, and to indicate which chunks were requested but missing
// buy having a Hash, but are empty. NM4.
// by having a Hash, but are empty.
resCh chan nbs.ToChunker

abortCh chan struct{}
@@ -71,12 +73,12 @@ const (
reliableCallDeliverRespTimeout = 15 * time.Second
)

var globalDictCache *DictionaryCache
var globalDictCache *dictionaryCache
var once sync.Once

func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
once.Do(func() {
globalDictCache = NewDictionaryCache(newDownloads(), dcs.csClient)
globalDictCache = NewDictionaryCache(dcs.csClient)
})

eg, ctx := errgroup.WithContext(ctx)
@@ -271,7 +273,6 @@ func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.Ge
})
eg.Go(func() error {
for {
// NM4 - Where there responses come back - resp is an rpc struct. NM4.
resp, err := stream.Recv()
if err == io.EOF {
close(resCh)
@@ -317,7 +318,6 @@ func getMissingChunks(req *remotesapi.GetDownloadLocsRequest, resp *remotesapi.G
numRequested := len(req.ChunkHashes)
numResponded := 0
for _, loc := range resp.Locs {
// NM4 - Looky here.
hgr := loc.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange
numResponded += len(hgr.Ranges)
}
@@ -380,17 +380,10 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
// NM4 - Split at this point? Break the dictionary into its own request.
d.ranges.Insert(gr.Url, r.Hash[:], r.Offset, r.Length, r.DictionaryOffset, r.DictionaryLength)
// if r.DictionaryLength == 0 {
// // NM4 - maybe invert the hash, and add it to a set of..... not sure.
// d.ranges.Insert(gr.Url, r.Hash, r.DictionaryOffset, r.DictionaryLength)
// }
}
}

// NM4 - On the client side, we only request HttpRanges for raw bytes. The struct includes the dictionary offset and length,
// but those only make sense in the response of DownloadLocations.
func toGetRange(rs []*ranges.GetRange) *GetRange {
ret := new(GetRange)
for _, r := range rs {
@@ -615,111 +608,143 @@ func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, don
}
}

///////
// dictionaryCache caches dictionaries for the chunks in an archive store. When we fetch from a database with an archive,
// we get back the path/offset/length of the dictionary for each chunk. These, by definition, are repeatedly used
// and we don't want to request the same dictionary multiple times.
//
// Currently (Feb '25), archives generally have only a default dictionary, so this is somewhat overkill; we are mainly
// planning for the future, when chunk grouping is the default and we could have thousands of dictionaries.
type dictionaryCache struct {
cache *lru.TwoQueueCache[DictionaryKey, *gozstd.DDict]
pending sync.Map
client remotesapi.ChunkStoreServiceClient
dlds downloads
}

// DictionaryKey is a globally unique identifier for an archive dictionary.
type DictionaryKey struct {
url string
off uint64
len uint32
// This is the short URL of the resource, not including the query parameters, which are provided by the
// locationRefresh.
path string
off uint64
len uint32
}

type DictionaryCache struct {
mu sync.Mutex
cache map[DictionaryKey]*gozstd.DDict
client remotesapi.ChunkStoreServiceClient
dlds downloads
}
func NewDictionaryCache(client remotesapi.ChunkStoreServiceClient) *dictionaryCache {
c, err := lru.New2Q[DictionaryKey, *gozstd.DDict](1024)
if err != nil {
panic(err)
}

func NewDictionaryCache(downloads downloads, client remotesapi.ChunkStoreServiceClient) *DictionaryCache {
return &DictionaryCache{
mu: sync.Mutex{},
cache: make(map[DictionaryKey]*gozstd.DDict),
return &dictionaryCache{
cache: c,
client: client,
dlds: downloads,
dlds: newDownloads(),
}
}
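
The 2Q cache created above via lru.New2Q keeps separate queues for recently and frequently used entries, so one-off dictionaries cannot evict the hot default dictionary. A minimal sketch of the hashicorp/golang-lru v2 API, using stand-in key and value types rather than the real DictionaryKey / *gozstd.DDict:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// New2Q only returns an error for a non-positive size.
	cache, err := lru.New2Q[string, []byte](1024)
	if err != nil {
		panic(err)
	}

	cache.Add("path@off=100,len=64k", []byte("dictionary bytes"))

	if v, ok := cache.Get("path@off=100,len=64k"); ok {
		fmt.Printf("hit: %d bytes\n", len(v))
	}
}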

func (dc *DictionaryCache) Get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
// Way too granular... but I'll use a real cache for production. prototype maddddddneeesssss
dc.mu.Lock()
defer dc.mu.Unlock()

func (dc *dictionaryCache) get(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength

key := DictionaryKey{path, off, ln}
if v, ok := dc.cache[key]; ok {
return v, nil
} else {
if dict, ok := dc.cache.Get(key); ok {
return dict, nil
}

pathToUrl := dc.dlds.refreshes[path]
if pathToUrl == nil {
// Kinda do what Add does....
refresh := new(locationRefresh)
// Check for an in-flight request. The default dictionary will be requested many times, so we want to avoid
// making multiple requests for the same resource.
if ch, loaded := dc.pending.LoadOrStore(key, make(chan struct{})); loaded {
// There's an ongoing fetch, wait for its completion
<-ch.(chan struct{})
if dict, ok := dc.cache.Get(key); ok {
return dict, nil
}
return nil, errors.New("in-flight dictionary fetch failed to populate the cache")
}
// When the update is done, regardless of success or failure, we need to unblock anyone waiting.
defer func() {
if ch, found := dc.pending.LoadAndDelete(key); found {
close(ch.(chan struct{}))
}
}()

sRang := &remotesapi.HttpGetRange{}
sRang.Url = rang.Url
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
rang := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
dl := &remotesapi.DownloadLoc{Location: rang}
// Fetch the dictionary
ddict, err := dc.fetchDictionary(rang, idx, stats, recorder)
if err != nil {
return nil, err
}

refresh.Add(dl)
dc.dlds.refreshes[path] = refresh
// Store the dictionary in the cache
dc.cache.Add(key, ddict)

pathToUrl = refresh
}
return ddict, nil
}
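
The LoadOrStore/LoadAndDelete pair above is a lightweight singleflight: the first caller for a key performs the fetch while later callers block on the channel and then re-check the cache. The same pattern in isolation, with illustrative names and the golang-lru cache from above:

package example

import (
	"errors"
	"sync"

	lru "github.com/hashicorp/golang-lru/v2"
)

// fetchOnce deduplicates concurrent fetches of the same key.
func fetchOnce(pending *sync.Map, cache *lru.TwoQueueCache[string, []byte],
	key string, fetch func() ([]byte, error)) ([]byte, error) {
	if ch, loaded := pending.LoadOrStore(key, make(chan struct{})); loaded {
		// Another goroutine owns the fetch; wait for it, then re-check the cache.
		<-ch.(chan struct{})
		if v, ok := cache.Get(key); ok {
			return v, nil
		}
		return nil, errors.New("concurrent fetch failed")
	}
	// We won the race: fetch, publish, then unblock any waiters.
	defer func() {
		if ch, found := pending.LoadAndDelete(key); found {
			close(ch.(chan struct{}))
		}
	}()
	v, err := fetch()
	if err != nil {
		return nil, err
	}
	cache.Add(key, v)
	return v, nil
}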

ctx := context.Background()
fetcher := globalHttpFetcher
func (dc *dictionaryCache) fetchDictionary(rang *GetRange, idx int, stats StatsRecorder, recorder reliable.HealthRecorder) (*gozstd.DDict, error) {
path := rang.ResourcePath()
off := rang.Ranges[idx].DictionaryOffset
ln := rang.Ranges[idx].DictionaryLength

urlF := func(lastError error) (string, error) {
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
if err != nil {
return "", err
}
if earl == "" {
earl = path
}
return earl, nil
}
ctx := context.Background()
pathToUrl := dc.dlds.refreshes[path]
if pathToUrl == nil {
// We manually construct the RangeChunk and DownloadLoc in this case because we are retrieving the dictionary span.
// We'll make a single span request, and consume the entire response to create the dictionary.
sRang := &remotesapi.HttpGetRange{}
sRang.Url = rang.Url
sRang.Ranges = append(sRang.Ranges, &remotesapi.RangeChunk{Offset: off, Length: ln})
rang := &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: sRang}
dl := &remotesapi.DownloadLoc{Location: rang}

resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
Fetcher: fetcher,
Offset: off,
Length: uint64(ln),
UrlFact: urlF,
Stats: stats,
Health: recorder,
BackOffFact: func(ctx context.Context) backoff.BackOff {
return downloadBackOff(ctx, 3) // params.DownloadRetryCount)
},
Throughput: reliable.MinimumThroughputCheck{
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
},
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
})
defer resp.Close()
refresh := new(locationRefresh)
refresh.Add(dl)
dc.dlds.refreshes[path] = refresh
pathToUrl = refresh
}

buf := make([]byte, ln)
_, err := io.ReadFull(resp.Body, buf)
urlF := func(lastError error) (string, error) {
earl, err := pathToUrl.GetURL(ctx, lastError, dc.client)
if err != nil {
return nil, err
return "", err
}

rawDict, err := gozstd.Decompress(nil, buf)
if err != nil {
return nil, err
if earl == "" {
earl = path
}
return earl, nil
}

dict, err := gozstd.NewDDict(rawDict)
if err != nil {
return nil, err
}
resp := reliable.StreamingRangeDownload(ctx, reliable.StreamingRangeRequest{
Fetcher: globalHttpFetcher,
Offset: off,
Length: uint64(ln),
UrlFact: urlF,
Stats: stats,
Health: recorder,
BackOffFact: func(ctx context.Context) backoff.BackOff {
return downloadBackOff(ctx, 3) // params.DownloadRetryCount)
},
Throughput: reliable.MinimumThroughputCheck{
CheckInterval: defaultRequestParams.ThroughputMinimumCheckInterval,
BytesPerCheck: defaultRequestParams.ThroughputMinimumBytesPerCheck,
NumIntervals: defaultRequestParams.ThroughputMinimumNumIntervals,
},
RespHeadersTimeout: defaultRequestParams.RespHeadersTimeout,
})
defer resp.Close()

dc.cache[key] = dict
return dict, nil
buf := make([]byte, ln)
_, err := io.ReadFull(resp.Body, buf)
if err != nil {
return nil, err
}

// Dictionaries are themselves compressed, but with vanilla zstd, so no dictionary is needed to read them.
rawDict, err := gozstd.Decompress(nil, buf)
if err != nil {
return nil, err
}

return gozstd.NewDDict(rawDict)
}
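
fetchDictionary pulls the dictionary span from the archive, inflates it with plain zstd, and builds a DDict that every chunk sharing the dictionary reuses. A sketch of the gozstd calls involved, using made-up payloads (real archives ship trained dictionaries):

package main

import (
	"fmt"

	"github.com/valyala/gozstd"
)

func main() {
	// The dictionary blob itself travels compressed with vanilla zstd.
	dictBytes := []byte("shared dictionary content")
	blob := gozstd.Compress(nil, dictBytes)

	raw, err := gozstd.Decompress(nil, blob) // no dictionary needed here
	if err != nil {
		panic(err)
	}

	dd, err := gozstd.NewDDict(raw) // reusable decompression dictionary
	if err != nil {
		panic(err)
	}
	defer dd.Release()

	// Chunks compressed against the dictionary decompress with the DDict.
	cd, err := gozstd.NewCDict(raw)
	if err != nil {
		panic(err)
	}
	defer cd.Release()

	chunk := gozstd.CompressDict(nil, []byte("chunk payload"), cd)
	out, err := gozstd.DecompressDict(nil, chunk, dd)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // chunk payload
}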
15 changes: 4 additions & 11 deletions go/libraries/doltcore/remotestorage/chunk_store.go
@@ -372,7 +372,8 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha
return nil
}

// NM4 - Extending the protobuf isn't not really necesary. Possible split this out into a new struct.
// GetRange is structurally the same as remotesapi.HttpGetRange, but with added methods. Instances of GetRange
// are never sent over the wire, so reusing the remotesapi shape is not necessary, just convenient.
type GetRange remotesapi.HttpGetRange
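
Defining GetRange as a named type over the wire struct costs nothing at runtime: the two types share a memory layout, so pointers convert directly, and the named type can carry methods that the generated protobuf type cannot. A sketch with a stand-in struct (HttpGetRange below is not the real remotesapi type, and its ResourcePath is illustrative only):

package main

import (
	"fmt"
	"strings"
)

// Stand-in for the generated remotesapi.HttpGetRange.
type HttpGetRange struct {
	Url string
}

// A defined type with the same underlying struct gains methods for free.
type GetRange HttpGetRange

func (gr *GetRange) ResourcePath() string {
	// Simplified: drop query parameters, keeping only the resource path.
	path, _, _ := strings.Cut(gr.Url, "?")
	return path
}

func main() {
	wire := &HttpGetRange{Url: "https://example.com/archive.darc?sig=abc"}
	gr := (*GetRange)(wire) // zero-copy conversion between identical layouts
	fmt.Println(gr.ResourcePath())
}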

func (gr *GetRange) ResourcePath() string {
@@ -509,7 +510,6 @@ func (a ArchiveToChunker) ToChunk() (chunks.Chunk, error) {
}

return newChunk, err

}

func (a ArchiveToChunker) FullCompressedChunkLen() uint32 {
@@ -523,8 +523,7 @@ func (a ArchiveToChunker) IsEmpty() bool {
}

func (a ArchiveToChunker) IsGhost() bool {
//TODO implement me
// NM4 - yes, need to. Or maybe not????
// Archives are never ghosts. They are only instantiated when the chunk is found.
return false
}

Expand All @@ -538,7 +537,6 @@ type RangeChunkReader struct {
skip int
}

// NM4 - THis is the place where we need to intercept responses and conjour the "full" chunk.
func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.HealthRecorder) (nbs.ToChunker, error) {
if r.skip > 0 {
_, err := io.CopyN(io.Discard, r.Reader, int64(r.skip))
@@ -562,20 +560,15 @@ func (r *RangeChunkReader) ReadChunk(stats StatsRecorder, health reliable.Health
l := rang.Length
h := hash.New(rang.Hash)

if strings.HasPrefix(h.String(), "eh9e0b3ou") {
_ = h.String()
}

buf := make([]byte, l)
_, err := io.ReadFull(r.Reader, buf)
if err != nil {
return nbs.CompressedChunk{}, err
} else {
if rang.DictionaryLength == 0 {
// NOMS snappy compressed chunk.
return nbs.NewCompressedChunk(h, buf)
} else {
dict, err := globalDictCache.Get(r.GetRange, idx, stats, health)
dict, err := globalDictCache.get(r.GetRange, idx, stats, health)
if err != nil {
return nbs.CompressedChunk{}, err
}
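
The branch in ReadChunk above is the compatibility point between the two chunk encodings: a zero DictionaryLength marks a classic snappy-framed NOMS chunk, while a nonzero one marks a zstd archive chunk that needs the shared dictionary. A simplified sketch of that dispatch (the real code wraps results in nbs.CompressedChunk / ArchiveToChunker, and the snappy body carries a CRC suffix, both omitted here):

package example

import (
	"github.com/golang/snappy"
	"github.com/valyala/gozstd"
)

// decodeChunk picks the decompressor based on how the range was encoded.
func decodeChunk(buf []byte, dictLen uint32, dict *gozstd.DDict) ([]byte, error) {
	if dictLen == 0 {
		// Legacy NOMS chunk: snappy, no dictionary.
		return snappy.Decode(nil, buf)
	}
	// Archive chunk: zstd compressed against the shared dictionary.
	return gozstd.DecompressDict(nil, buf, dict)
}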
3 changes: 1 addition & 2 deletions go/store/nbs/store.go
@@ -162,7 +162,7 @@ func (nbs *NomsBlockStore) GetChunkLocationsWithPaths(ctx context.Context, hashe
if err != nil {
return nil, err
}
toret := make(map[string]map[hash.Hash]Range, len(locs)) // NM4
toret := make(map[string]map[hash.Hash]Range, len(locs))
for k, v := range locs {
toret[k] = v
}
@@ -469,7 +469,6 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
}
// Appendix table files should come first in specs
for h, c := range appendixTableFiles {
// NM4 - not sure on this one....
s := tableSpec{fileType: typeNoms, hash: h, chunkCount: c}
contents.appendix = append(contents.appendix, s)
contents.specs = append(contents.specs, s)