Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] GC Index entries belonging to a closed shard #4094

Merged
merged 2 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions src/dbnode/storage/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
m3errors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"
Expand All @@ -57,9 +58,14 @@ type IndexWriter interface {

// EntryMetrics are metrics for an entry.
type EntryMetrics struct {
gcNoReconcile tally.Counter
gcNeedsReconcile tally.Counter

gcNoReconcile tally.Counter
gcNeedsReconcile tally.Counter
gcSuccessShardClosed tally.Counter
gcSuccessEmpty tally.Counter
noGcNil tally.Counter
noGcErr tally.Counter
noGcHasReaders tally.Counter
noGcNotEmptySeries tally.Counter
duplicateNoReconcile tally.Counter
duplicateNeedsReconcile tally.Counter
}
Expand All @@ -75,12 +81,35 @@ func NewEntryMetrics(scope tally.Scope) *EntryMetrics {
"reconcile": "needs_reconcile",
"path": "gc",
}).Counter("count"),
gcSuccessShardClosed: scope.Tagged(map[string]string{
"reason": "shard_closed",
"path": "gc",
}).Counter("gc_count"),
gcSuccessEmpty: scope.Tagged(map[string]string{
"reason": "empty",
"path": "gc",
}).Counter("gc_count"),
noGcNil: scope.Tagged(map[string]string{
"reason": "nil",
"path": "gc",
}).Counter("no_gc_count"),
noGcErr: scope.Tagged(map[string]string{
"reason": "error",
"path": "gc",
}).Counter("no_gc_count"),
noGcHasReaders: scope.Tagged(map[string]string{
"reason": "has_readers",
"path": "gc",
}).Counter("no_gc_count"),
noGcNotEmptySeries: scope.Tagged(map[string]string{
"reason": "not_empty_series",
"path": "gc",
}).Counter("no_gc_count"),

duplicateNoReconcile: scope.Tagged(map[string]string{
"reconcile": "no_reconcile",
"path": "duplicate",
}).Counter("count"),

duplicateNeedsReconcile: scope.Tagged(map[string]string{
"reconcile": "needs_reconcile",
"path": "duplicate",
Expand Down Expand Up @@ -352,7 +381,20 @@ func (entry *Entry) TryMarkIndexGarbageCollected() bool {
// a series to be in the index but not have data written yet, and so any series not in the
// lookup yet we cannot yet consider empty.
e, _, err := entry.Shard.TryRetrieveSeriesAndIncrementReaderWriterCount(entry.ID)
if err != nil || e == nil {
if m3errors.Is(err, errShardNotOpen) {
// Shard is closing, all entries which belonged to it should be gc'ed.
entry.metrics.gcSuccessShardClosed.Inc(1)
entry.IndexGarbageCollected.Store(true)
return true
}

if err != nil {
entry.metrics.noGcErr.Inc(1)
return false
}

if e == nil {
entry.metrics.noGcNil.Inc(1)
return false
}

Expand All @@ -370,18 +412,21 @@ func (entry *Entry) TryMarkIndexGarbageCollected() bool {
// Consider non-empty if the entry is still being held since this could indicate
// another thread holding a new series prior to writing to it.
if e.ReaderWriterCount() > 1 {
entry.metrics.noGcHasReaders.Inc(1)
return false
}

// Series must be empty to be GCed. This happens when the data and index are flushed to disk and
// so the series no longer has in-mem data.
if !e.Series.IsEmpty() {
entry.metrics.noGcNotEmptySeries.Inc(1)
return false
}

// Mark as GCed from index so the entry can be safely cleaned up in the shard.
// The reference to this entry from the index is removed by the code path that
// marks this GCed bool.
e.metrics.gcSuccessEmpty.Inc(1)
e.IndexGarbageCollected.Store(true)

if e != entry {
Expand Down Expand Up @@ -436,6 +481,12 @@ func (entry *Entry) NeedsIndexGarbageCollected() bool {
if entry.insertTime.Load() == 0 {
return false // Not inserted, does not need garbage collection.
}

// NB(antanas): Entries need to be GC'ed for closed shards.
// Orphan entries will cause problems if same shard returns to the same node.
if entry.Shard.Closed() {
return true
}
// Check that a write is not potentially pending and the series is empty.
return entry.ReaderWriterCount() == 0 && entry.Series.IsEmpty()
}
Expand Down
Loading