diff --git a/posting/list.go b/posting/list.go index 6b51b18d73e..1fc86cf4754 100644 --- a/posting/list.go +++ b/posting/list.go @@ -903,25 +903,7 @@ type rollupOutput struct { newMinTs uint64 } -// Merge all entries in mutation layer with commitTs <= l.commitTs into -// immutable layer. Note that readTs can be math.MaxUint64, so do NOT use it -// directly. It should only serve as the read timestamp for iteration. -func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { - l.AssertRLock() - - // Pick all committed entries - if l.minTs > readTs { - // If we are already past the readTs, then skip the rollup. - return nil, nil - } - - out := &rollupOutput{ - plist: &pb.PostingList{ - Splits: l.plist.Splits, - }, - parts: make(map[uint64]*pb.PostingList), - } - +func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error { var plist *pb.PostingList var startUid, endUid uint64 var splitIdx int @@ -968,19 +950,48 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { }) // Finish writing the last part of the list (or the whole list if not a multi-part list). if err != nil { - return nil, errors.Wrapf(err, "cannot iterate through the list") + return errors.Wrapf(err, "cannot iterate through the list") } plist.Pack = enc.Done() if plist.Pack != nil { if plist.Pack.BlockSize != uint32(blockSize) { - return nil, errors.Errorf("actual block size %d is different from expected value %d", + return errors.Errorf("actual block size %d is different from expected value %d", plist.Pack.BlockSize, blockSize) } } - - if len(l.plist.Splits) > 0 { + if split && len(l.plist.Splits) > 0 { out.parts[startUid] = plist } + return nil +} + +// Merge all entries in mutation layer with commitTs <= l.commitTs into +// immutable layer. Note that readTs can be math.MaxUint64, so do NOT use it +// directly. It should only serve as the read timestamp for iteration. +func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { + l.AssertRLock() + + // Pick all committed entries + if l.minTs > readTs { + // If we are already past the readTs, then skip the rollup. + return nil, nil + } + + out := &rollupOutput{ + plist: &pb.PostingList{ + Splits: l.plist.Splits, + }, + parts: make(map[uint64]*pb.PostingList), + } + + if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 { + if err := l.encode(out, readTs, split); err != nil { + return nil, errors.Wrapf(err, "while encoding") + } + } else { + // We already have a nicely packed posting list. Just use it. + out.plist = l.plist + } maxCommitTs := l.minTs { @@ -1354,7 +1365,7 @@ func shouldSplit(plist *pb.PostingList) bool { } func (out *rollupOutput) updateSplits() { - if out.plist == nil { + if out.plist == nil || len(out.parts) > 0 { out.plist = &pb.PostingList{} } out.plist.Splits = out.splits() @@ -1438,13 +1449,11 @@ func binSplit(lowUid uint64, plist *pb.PostingList) ([]uint64, []*pb.PostingList } // Add elements in plist.Postings to the corresponding list. - for _, posting := range plist.Postings { - if posting.Uid < midUid { - lowPl.Postings = append(lowPl.Postings, posting) - } else { - highPl.Postings = append(highPl.Postings, posting) - } - } + pidx := sort.Search(len(plist.Postings), func(idx int) bool { + return plist.Postings[idx].Uid >= midUid + }) + lowPl.Postings = plist.Postings[:pidx] + highPl.Postings = plist.Postings[pidx:] return []uint64{lowUid, midUid}, []*pb.PostingList{lowPl, highPl} } diff --git a/posting/list_test.go b/posting/list_test.go index a970bc6bd74..1e29d9f23f8 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -945,6 +945,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) + require.Nil(t, ol.plist.Pack) + require.Equal(t, 0, len(ol.plist.Postings)) require.True(t, len(ol.plist.Splits) > 0) verifySplits(t, ol.plist.Splits)