Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Optimize block range scan in queryWithSpan #3813

Merged
merged 13 commits into from
Oct 6, 2021
33 changes: 25 additions & 8 deletions src/dbnode/storage/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

const (
maxUint64 = ^uint64(0)
maxInt64 = int64(maxUint64 >> 1)
)

// IndexWriter accepts index inserts.
type IndexWriter interface {
// WritePending indexes the provided pending entries.
Expand Down Expand Up @@ -138,14 +133,24 @@ func (entry *Entry) IndexedBlockCount() int {
}

// IndexedForBlockStart returns a bool to indicate if the Entry has been successfully
// indexed for the given index blockstart.
// indexed for the given index blockStart.
func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool {
	// Hold the reverse index read lock for the lookup; the deferred unlock
	// releases it after the result has been computed.
	entry.reverseIndex.RLock()
	defer entry.reverseIndex.RUnlock()
	return entry.reverseIndex.indexedWithRLock(indexBlockStart)
}

// IndexedRange returns minimum and maximum blockStart values covered by index entry.
// The range is inclusive. Note that there may be uncovered gaps within the range.
// Returns (0, 0) for an empty range.
func (entry *Entry) IndexedRange() (xtime.UnixNano, xtime.UnixNano) {
	// Both bounds are read under a single read lock so they are observed
	// together; the deferred unlock runs once the pair has been returned.
	entry.reverseIndex.RLock()
	defer entry.reverseIndex.RUnlock()
	return entry.reverseIndex.indexedRangeWithRLock()
}

// NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed
// for the provided blockStart. It only allows a single index attempt at a time
// for a single entry.
Expand All @@ -154,7 +159,7 @@ func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool {
// is going to be sent to the index, and other go routines should not attempt the
// same write. Callers are expected to ensure they follow this guideline.
// Further, every call to NeedsIndexUpdate which returns true needs to have a corresponding
// OnIndexFinalze() call. This is required for correct lifecycle maintenance.
// OnIndexFinalize() call. This is required for correct lifecycle maintenance.
func (entry *Entry) NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool {
// first we try the low-cost path: acquire a RLock and see if the given block start
// has been marked successful or that we've attempted it.
Expand Down Expand Up @@ -370,7 +375,8 @@ func (entry *Entry) ReleaseRef() error {
// have a write for the 12-2p block from the 2-4p block, or we'd drop the late write.
type entryIndexState struct {
// The embedded lock is acquired by the Entry accessors before they call the
// *WithRLock / *WithWLock helpers defined on this type.
sync.RWMutex
states map[xtime.UnixNano]entryIndexBlockState
states map[xtime.UnixNano]entryIndexBlockState
// minIndexedT and maxIndexedT hold the smallest and largest block starts
// recorded by setSuccessWithWLock; both remain zero until the first success.
minIndexedT, maxIndexedT xtime.UnixNano
}

// entryIndexBlockState is used to capture the state of indexing for a single shard
Expand All @@ -387,6 +393,10 @@ func newEntryIndexState() entryIndexState {
}
}

func (s *entryIndexState) indexedRangeWithRLock() (xtime.UnixNano, xtime.UnixNano) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why the separate method instead of inlining it in IndexedRange?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IndexedRange is on Entry struct, indexedRangeWithRLock is on entryIndexState.

return s.minIndexedT, s.maxIndexedT
}

func (s *entryIndexState) indexedWithRLock(t xtime.UnixNano) bool {
v, ok := s.states[t]
if ok {
Expand Down Expand Up @@ -414,6 +424,13 @@ func (s *entryIndexState) setSuccessWithWLock(t xtime.UnixNano) {
s.states[t] = entryIndexBlockState{
success: true,
}

if t > s.maxIndexedT {
s.maxIndexedT = t
}
if t < s.minIndexedT || s.minIndexedT == 0 {
s.minIndexedT = t
}
}

func (s *entryIndexState) setAttemptWithWLock(t xtime.UnixNano, attempt bool) {
Expand Down
31 changes: 31 additions & 0 deletions src/dbnode/storage/entry_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/fortytw2/leaktest"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/dbnode/storage/series"
Expand Down Expand Up @@ -165,3 +166,33 @@ func TestEntryTryMarkIndexGarbageCollectedAfterSeriesClose(t *testing.T) {
require.False(t, entry.TryMarkIndexGarbageCollected())
})
}

// TestEntryIndexedRange verifies that Entry.IndexedRange reports the minimum and
// maximum block starts that have been passed to OnIndexSuccess.
func TestEntryIndexedRange(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

entry := NewEntry(NewEntryOptions{Series: newMockSeries(ctrl)})

// assertRange compares the entry's current IndexedRange with the expected bounds.
assertRange := func(expectedMin, expectedMax xtime.UnixNano) {
min, max := entry.IndexedRange()
assert.Equal(t, expectedMin, min)
assert.Equal(t, expectedMax, max)
}

// A fresh entry reports the empty range.
assertRange(0, 0)

// Preparing an index attempt alone is expected to leave the range empty.
entry.OnIndexPrepare(2)
assertRange(0, 0)

// The first success sets both bounds to that block start.
entry.OnIndexSuccess(2)
assertRange(2, 2)

// A larger block start extends the upper bound.
entry.OnIndexSuccess(5)
assertRange(2, 5)

// A smaller block start extends the lower bound.
entry.OnIndexSuccess(1)
assertRange(1, 5)

// A block start inside the current bounds leaves the range unchanged.
entry.OnIndexSuccess(3)
assertRange(1, 5)
}
29 changes: 22 additions & 7 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,15 +544,30 @@ func (b *block) queryWithSpan(
doc := iter.Current()
if md, ok := doc.Metadata(); ok && md.OnIndexSeries != nil {
var (
inBlock bool
currentBlock = opts.StartInclusive.Truncate(b.blockSize)
inBlock bool
currentBlock = opts.StartInclusive.Truncate(b.blockSize)
endExclusive = opts.EndExclusive
minIndexed, maxIndexed = md.OnIndexSeries.IndexedRange()
)
for !inBlock {

if maxIndexed == 0 {
// Empty range.
continue
}

// Narrow down the range of blocks to scan because the client could have
// queried for an arbitrary wide range.
if currentBlock.Before(minIndexed) {
currentBlock = minIndexed
}
maxIndexedExclusive := maxIndexed.Add(time.Nanosecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the nanosecond?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To convert the inclusive timestamp into an exclusive one, so that we can compare it with endExclusive and, if needed, replace endExclusive with maxIndexedExclusive in the subsequent if.

if endExclusive.After(maxIndexedExclusive) {
endExclusive = maxIndexedExclusive
}

for !inBlock && currentBlock.Before(endExclusive) {
Copy link
Collaborator

@nbroyles nbroyles Oct 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I don't quite follow this for loop. queryWithSpan is called on a block and takes a queryIter which iterates over index results within the same block. Given that, why do we need to do this loop over every block in the query range to check and see if the doc is indexed? Don't we only need to check that the block itself is within the query range and that doc is indexed for this same block (which it should be since it's in the queryIter)? I'm not super familiar with the intricacies of the read path, so could be misunderstanding here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I'm not really familiar with this aspect, either. I saw the opportunity to optimize this code without affecting its semantics, in which case I don't have to fully understand the context of it.
I think the best bet is to ask @robskillington who wrote the original code to shed some light on the purpose of this loop.

Copy link
Collaborator

@vpranckaitis vpranckaitis Oct 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, this loop behaves slightly differently than before. Previously, the loop body always ran at least once; now it might not run at all. While the current implementation is more correct, maybe the less correct version was necessary for proper functioning? (Though the conditions under which the behavior would differ seem too rare for this to matter.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean the case when start == endInclusive? I think this was a bug, and it would have been difficult to replicate with the optimized version, so I chose to fix it. I believe this is not a realistic edge case, also this is certainly not the issue that we could have seen.

inBlock = md.OnIndexSeries.IndexedForBlockStart(currentBlock)
currentBlock = currentBlock.Add(b.blockSize)
if !currentBlock.Before(opts.EndExclusive) {
break
}
}

if !inBlock {
Expand Down Expand Up @@ -628,7 +643,7 @@ func (b *block) addQueryResults(
return batch, size, docsCount, err
}

// AggIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the
// AggregateIter acquires a read lock on the block to get the set of segments for the returned iterator. However, the
// segments are searched and results are processed lazily in the returned iterator. The segments are finalized when
// the ctx is finalized to ensure the mmaps are not freed until the ctx closes. This allows the returned results to
// reference data in the mmap without copying.
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/index/block_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ func (m mockOnIndexSeries) IndexedForBlockStart(_ xtime.UnixNano) bool { return
func (m mockOnIndexSeries) IndexedOrAttemptedAny() bool { return false }
func (m mockOnIndexSeries) TryMarkIndexGarbageCollected() bool { return false }
func (m mockOnIndexSeries) NeedsIndexGarbageCollected() bool { return false }
func (m mockOnIndexSeries) IndexedRange() (xtime.UnixNano, xtime.UnixNano) {
	// The mock never records an indexed block, so it always reports the
	// zero-valued (0, 0) range.
	var none xtime.UnixNano
	return none, none
}
2 changes: 1 addition & 1 deletion src/dbnode/storage/index/block_prop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestAggregateDocLimits(t *testing.T) {
SetDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}).
SetBytesReadLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute}).
SetAggregateDocsLimitOpts(limits.LookbackLimitOptions{Lookback: time.Minute})
queryLimits, err := limits.NewQueryLimits((limitOpts))
queryLimits, err := limits.NewQueryLimits(limitOpts)
require.NoError(t, err)
testOpts = testOpts.SetInstrumentOptions(iOpts).SetQueryLimits(queryLimits)

Expand Down
Loading