diff --git a/posting/list.go b/posting/list.go index 90881dd5b6f..068d4505d06 100644 --- a/posting/list.go +++ b/posting/list.go @@ -114,7 +114,7 @@ type pIterator struct { deleteBelowTs uint64 } -func (it *pIterator) init(l *List, afterUid, deleteBelowTs uint64) error { +func (it *pIterator) seek(l *List, afterUid, deleteBelowTs uint64) error { if deleteBelowTs > 0 && deleteBelowTs <= l.minTs { return errors.Errorf("deleteBelowTs (%d) must be greater than the minTs in the list (%d)", deleteBelowTs, l.minTs) @@ -209,24 +209,19 @@ func (it *pIterator) moveToNextValidPart() error { return nil } - // If there are no more UIDs to iterate over, move to the next part of the - // list that contains valid data. - if len(it.uids) == 0 { - for it.splitIdx <= len(it.l.plist.Splits)-2 { - // moveToNextPart will increment it.splitIdx. Therefore, the for loop must only - // continue until len(splits) - 2. - if err := it.moveToNextPart(); err != nil { - return err - } - - if len(it.uids) > 0 { - return nil - } + // Iterate while there are no UIDs, and while we have more splits to iterate over. + for len(it.uids) == 0 && it.splitIdx < len(it.l.plist.Splits)-1 { + // moveToNextPart will increment it.splitIdx. Therefore, the for loop must only + // continue until len(splits)-1. + if err := it.moveToNextPart(); err != nil { + return err } } + return nil } +// next advances pIterator to the next valid part. func (it *pIterator) next() error { if it.deleteBelowTs > 0 { it.uids = nil @@ -244,7 +239,14 @@ func (it *pIterator) next() error { hex.EncodeToString(it.l.key)) } +// valid asserts that pIterator has valid uids, or advances it to the next valid part. +// It returns false if there are no more valid parts. func (it *pIterator) valid() (bool, error) { + if it.deleteBelowTs > 0 { + it.uids = nil + return false, nil + } + if len(it.uids) > 0 { return true, nil } @@ -566,7 +568,8 @@ func (l *List) setMutation(startTs uint64, data []byte) { l.Unlock() } -// Iterate will allow you to iterate over this posting List, while having acquired a read lock. +// Iterate will allow you to iterate over the mutable and immutable layers of +// this posting List, while having acquired a read lock. // So, please keep this iteration cheap, otherwise mutations would get stuck. // The iteration will start after the provided UID. The results would not include this uid. // The function will loop until either the posting List is fully iterated, or you return a false @@ -649,6 +652,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { l.AssertRLock() + // mposts is the list of mutable postings deleteBelowTs, mposts := l.pickPostings(readTs) if readTs < l.minTs { return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key) @@ -668,7 +672,9 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e prevUid uint64 err error ) - err = pitr.init(l, afterUid, deleteBelowTs) + + // pitr iterates through immutable postings + err = pitr.seek(l, afterUid, deleteBelowTs) if err != nil { return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate") } @@ -1433,6 +1439,7 @@ func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string, return fcs, nil } +// readListPart reads one split of a posting list from Badger. func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) { key, err := x.SplitKey(l.key, startUid) if err != nil { diff --git a/posting/list_test.go b/posting/list_test.go index a7c06d1655e..14488469dbc 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -36,6 +36,10 @@ import ( "github.com/dgraph-io/dgraph/x" ) +func setMaxListSize(newMaxListSize int) { + maxListSize = newMaxListSize +} + func (l *List) PostingList() *pb.PostingList { l.RLock() defer l.RUnlock() @@ -451,6 +455,7 @@ func TestAddMutation_mrjn1(t *testing.T) { func TestMillion(t *testing.T) { // Ensure list is stored in a single part. + defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 key := x.DataKey("bal", 1331) @@ -907,10 +912,8 @@ func verifySplits(t *testing.T, splits []uint64) { func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) @@ -955,10 +958,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { // For testing, set the max list size to a lower threshold. - maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() + defer setMaxListSize(maxListSize) + maxListSize = 10000 key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) @@ -1007,6 +1008,41 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { return ol, commits } +func TestDeleteStarMultiPartList(t *testing.T) { + numEdges := 10000 + + list, _ := createMultiPartList(t, numEdges, false) + parsedKey, err := x.Parse(list.key) + require.NoError(t, err) + + validateCount := func(expected int) { + count := 0 + list.Iterate(math.MaxUint64, 0, func(posting *pb.Posting) error { + count++ + return nil + }) + require.Equal(t, expected, count) + } + validateCount(numEdges) + + readTs := list.maxTs + 1 + commitTs := readTs + 1 + + txn := NewTxn(readTs) + edge := &pb.DirectedEdge{ + ValueId: parsedKey.Uid, + Attr: parsedKey.Attr, + Value: []byte(x.Star), + Op: pb.DirectedEdge_DEL, + } + err = list.addMutation(context.Background(), txn, edge) + require.NoError(t, err) + + err = list.commitMutation(readTs, commitTs) + require.NoError(t, err) + validateCount(0) +} + func writePostingListToDisk(kvs []*bpb.KV) error { writer := NewTxnWriter(pstore) for _, kv := range kvs { @@ -1147,10 +1183,8 @@ func TestMultiPartListDelete(t *testing.T) { func TestMultiPartListDeleteAndAdd(t *testing.T) { size := int(1e5) // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = 5000 - defer func() { - maxListSize = math.MaxInt32 - }() // Add entries to the maps. key := x.DataKey(uuid.New().String(), 1331) @@ -1285,10 +1319,8 @@ func TestSingleListRollup(t *testing.T) { func TestRecursiveSplits(t *testing.T) { // For testing, set the max list size to a lower threshold. + defer setMaxListSize(maxListSize) maxListSize = mb / 2 - defer func() { - maxListSize = math.MaxInt32 - }() // Create a list that should be split recursively. size := int(1e5)