Skip to content

Commit

Permalink
Add awareness of the gcBehavior_Block
Browse files Browse the repository at this point in the history
  • Loading branch information
macneale4 committed Feb 7, 2025
1 parent 04afcff commit 456d5c3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 20 deletions.
40 changes: 21 additions & 19 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, r
if data == nil {
foundAll = false
} else {
// NM4
if keeper != nil && keeper(h) {
return true, gcBehavior_Block, nil
}
Expand Down Expand Up @@ -171,32 +172,33 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
return archiveChunkSource{acs.file, rdr}, nil
}

func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getRecord, _ keeperF) (map[hash.Hash]Range, gcBehavior, error) {
func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
result := make(map[hash.Hash]Range, len(requests))
for _, req := range requests {
hAddr := *req.a
if acs.aRdr.has(hAddr) {
idx := acs.aRdr.search(hAddr)
if idx < 0 {
// Chunk not found.
continue
}

dictId, dataId := acs.aRdr.getChunkRef(idx)
dataSpan := acs.aRdr.getByteSpanByID(dataId)
dictSpan := acs.aRdr.getByteSpanByID(dictId)
idx := acs.aRdr.search(hAddr)
if idx < 0 {
// Chunk not found.
continue
}
if keeper != nil && keeper(hAddr) {
return nil, gcBehavior_Block, nil
}

rng := Range{
Offset: dataSpan.offset,
Length: uint32(dataSpan.length),
DictOffset: dictSpan.offset,
DictLength: uint32(dictSpan.length),
}
dictId, dataId := acs.aRdr.getChunkRef(idx)
dataSpan := acs.aRdr.getByteSpanByID(dataId)
dictSpan := acs.aRdr.getByteSpanByID(dictId)

result[hAddr] = rng
rng := Range{
Offset: dataSpan.offset,
Length: uint32(dataSpan.length),
DictOffset: dictSpan.offset,
DictLength: uint32(dictSpan.length),
}

result[hAddr] = rng
}
return result, gcBehavior_Continue, nil // NM4 - FIXME. Merging. This is wrong. Use the keeperF
return result, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
Expand Down
6 changes: 5 additions & 1 deletion go/store/nbs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type extractRecord struct {
err error
}

// Returned by read methods that take a |keeperFunc|, this lets a
// Returned by read methods that take a |keeperF|, this lets a
// caller know whether the operation was successful or if it needs to
// be retried. It may need to be retried if a GC is in progress but
// the dependencies indicated by the operation cannot be added to the
Expand All @@ -202,6 +202,10 @@ const (
gcBehavior_Block = true
)

// keeperF is a function that takes a hash.Hash and returns true if the hash is used by the GC system to indicate
// that the chunk requested may not be present in the future, and therefore |gcBehavior_Block| should be returned. This
// is used to allow read/write ops to the store by non-GC processes while GC is underway. The |keeperF| may be nil,
// in which case GC is not underway. If it's non-nil, and return false, it's ok to proceed with the operation (|gcBehavior_Continue|)
type keeperF func(hash.Hash) bool

type chunkReader interface {
Expand Down

0 comments on commit 456d5c3

Please sign in to comment.