opt(rollup): change the way rollups are done #7253

Merged: 21 commits, Jan 11, 2021
8 changes: 0 additions & 8 deletions posting/list.go
@@ -330,14 +330,6 @@ func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)

// Keys are added to the rollup batches here instead of at the point at which the
// transaction is committed because the transaction context does not keep track
// of the badger keys touched by mutations. It's useful to roll up lists even if
// the transaction is eventually aborted.
if len(l.mutationMap) > 0 {
IncrRollup.addKeyToBatch(l.key)
}

// If we have a delete all, then we replace the map entry with just one.
if hasDeleteAll(mpost) {
plist := &pb.PostingList{}
85 changes: 58 additions & 27 deletions posting/mvcc.go
@@ -36,13 +36,19 @@ import (
"github.com/pkg/errors"
)

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
type pooledKeys struct {
// keysCh is populated with batches of 16 keys that need to be rolled up during reads
keysCh chan *[][]byte
// keysPool is a sync.Pool used to share the batched keys to be rolled up.
keysPool *sync.Pool
count uint64
}

// incrRollupi is used to batch keys for rollup incrementally.
type incrRollupi struct {
// We are using 2 priorities for now: idx 0 represents the high-priority keys to be rolled up,
// while idx 1 represents the low-priority keys to be rolled up.
priorityKeys []*pooledKeys
count uint64
}

var (
@@ -54,15 +60,24 @@ var (

// IncrRollup is used to batch keys for rollup incrementally.
IncrRollup = &incrRollupi{
keysCh: make(chan *[][]byte),
keysPool: &sync.Pool{
New: func() interface{} {
return new([][]byte)
},
},
priorityKeys: make([]*pooledKeys, 2),
}
)

func init() {
x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
for i := range IncrRollup.priorityKeys {
IncrRollup.priorityKeys[i] = &pooledKeys{
keysCh: make(chan *[][]byte, 16),
keysPool: &sync.Pool{
New: func() interface{} {
return new([][]byte)
},
},
}
}
}

// rollUpKey takes the given key's posting lists, rolls them up, and writes the result back to badger.
func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
l, err := GetNoStore(key, math.MaxUint64)
@@ -86,20 +101,23 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
return writer.Write(&bpb.KVList{Kv: kvs})
}

func (ir *incrRollupi) addKeyToBatch(key []byte) {
batch := ir.keysPool.Get().(*[][]byte)
// TODO: When opRollup is not running, the keys in ir's keysPool are dropped. Figure out a way to
// handle that.
func (ir *incrRollupi) addKeyToBatch(key []byte, priority int) {
rki := ir.priorityKeys[priority]
batch := rki.keysPool.Get().(*[][]byte)
*batch = append(*batch, key)
if len(*batch) < 16 {
ir.keysPool.Put(batch)
rki.keysPool.Put(batch)
return
}

select {
case ir.keysCh <- batch:
case rki.keysCh <- batch:
default:
// Drop keys and build the batch again. Lossy behavior.
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
rki.keysPool.Put(batch)
}
}
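
To make the batching pattern easier to follow in isolation, here is a minimal, self-contained sketch of what addKeyToBatch does for a single priority level: keys accumulate in a pooled slice, a full batch is handed to the channel, and the batch is dropped if the channel is full. This is illustrative only; keyBatcher, newKeyBatcher, and main are hypothetical stand-ins, while the real code keeps one such structure per priority in IncrRollup.priorityKeys.

```go
package main

import (
	"fmt"
	"sync"
)

const batchSize = 16

// keyBatcher mirrors one pooledKeys entry: a pool of reusable key slices plus
// a channel of full batches ready to be rolled up.
type keyBatcher struct {
	keysCh   chan *[][]byte
	keysPool *sync.Pool
}

func newKeyBatcher() *keyBatcher {
	return &keyBatcher{
		keysCh: make(chan *[][]byte, 16),
		keysPool: &sync.Pool{
			New: func() interface{} { return new([][]byte) },
		},
	}
}

// add appends key to a pooled batch. Once the batch holds batchSize keys it is
// handed to keysCh; if the channel is full, the batch is dropped (lossy), which
// is acceptable because a missed rollup only delays compaction.
func (b *keyBatcher) add(key []byte) {
	batch := b.keysPool.Get().(*[][]byte)
	*batch = append(*batch, key)
	if len(*batch) < batchSize {
		b.keysPool.Put(batch)
		return
	}
	select {
	case b.keysCh <- batch:
	default:
		*batch = (*batch)[:0]
		b.keysPool.Put(batch)
	}
}

func main() {
	b := newKeyBatcher()
	for i := 0; i < 32; i++ {
		b.add([]byte(fmt.Sprintf("key-%02d", i)))
	}
	fmt.Println("full batches queued:", len(b.keysCh)) // expect 2
}
```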

@@ -111,16 +129,14 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
defer writer.Flush()

m := make(map[uint64]int64) // map hash(key) to ts. Using a hash limits the size of the map.
limiter := time.NewTicker(100 * time.Millisecond)
limiter := time.NewTicker(time.Millisecond)
defer limiter.Stop()
cleanupTick := time.NewTicker(5 * time.Minute)
defer cleanupTick.Stop()
forceRollupTick := time.NewTicker(500 * time.Millisecond)
defer forceRollupTick.Stop()

var batch *[][]byte

doRollup := func() {
doRollup := func(batch *[][]byte, priority int) {
currTs := time.Now().Unix()
for _, key := range *batch {
hash := z.MemHash(key)
@@ -133,9 +149,8 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
}
}
}
// Clear the batch and put it back into the sync.Pool (keysPool).
*batch = (*batch)[:0]
ir.keysPool.Put(batch)
ir.priorityKeys[priority].keysPool.Put(batch)
}

for {
@@ -151,15 +166,18 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
}
}
case <-forceRollupTick.C:
batch = ir.keysPool.Get().(*[][]byte)
batch := ir.priorityKeys[0].keysPool.Get().(*[][]byte)
if len(*batch) > 0 {
doRollup()
doRollup(batch, 0)
} else {
ir.keysPool.Put(batch)
ir.priorityKeys[0].keysPool.Put(batch)
}
case batch = <-ir.keysCh:
doRollup()
// throttle to 1 batch = 64 rollups per 100 ms.
case batch := <-ir.priorityKeys[0].keysCh:
doRollup(batch, 0)
// We don't need a limiter here as we don't expect to call this function frequently.
case batch := <-ir.priorityKeys[1].keysCh:
doRollup(batch, 1)
// throttle to 1 batch = 16 rollups per 1 ms.
<-limiter.C
}
}
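
On the consumer side, Process drains these channels from a single goroutine. The sketch below (not the PR's code) shows the core of that loop: recently rolled-up keys are skipped by tracking hash(key) -> timestamp in a map, high-priority batches are drained without throttling, and low-priority batches are rate-limited with a ticker. The 10-second dedup window, memHash, and the rollUp stub are assumptions for illustration; the real loop also has a cleanup tick that prunes the map and a force-rollup tick that flushes partially filled high-priority batches.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"time"
)

// memHash is a stand-in for z.MemHash from the real code.
func memHash(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

// rollUp is a stand-in for rollUpKey: it would read, roll up, and rewrite the key.
func rollUp(key []byte) { fmt.Printf("rolled up %q\n", key) }

// process mimics incrRollupi.Process for two priority channels.
func process(highCh, lowCh chan *[][]byte, quit <-chan struct{}) {
	lastRollup := make(map[uint64]int64) // hash(key) -> unix ts of the last rollup
	limiter := time.NewTicker(time.Millisecond)
	defer limiter.Stop()

	doRollup := func(batch *[][]byte) {
		now := time.Now().Unix()
		for _, key := range *batch {
			h := memHash(key)
			if ts, ok := lastRollup[h]; ok && now-ts < 10 { // assumed 10s window
				continue // rolled up very recently; skip
			}
			lastRollup[h] = now
			rollUp(key)
		}
		*batch = (*batch)[:0] // the real code returns the batch to its sync.Pool here
	}

	for {
		select {
		case <-quit:
			return
		case batch := <-highCh: // high priority: no throttling
			doRollup(batch)
		case batch := <-lowCh: // low priority: at most one batch per tick
			doRollup(batch)
			<-limiter.C
		}
	}
}

func main() {
	high := make(chan *[][]byte, 16)
	low := make(chan *[][]byte, 16)
	quit := make(chan struct{})
	go process(high, low, quit)

	keys := [][]byte{[]byte("a"), []byte("b"), []byte("a")} // the second "a" is deduplicated
	low <- &keys
	time.Sleep(10 * time.Millisecond)
	close(quit)
}
```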
@@ -221,6 +239,14 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
keys = append(keys, key)
}

defer func() {
// Add these keys to be rolled up after we're done writing. This is the right place for them
// to be rolled up, because we just pushed these deltas over to Badger.
for _, key := range keys {
IncrRollup.addKeyToBatch([]byte(key), 1)
}
}()

var idx int
for idx < len(keys) {
// writer.update can return early from the loop in case we encounter badger.ErrTxnTooBig. On
@@ -328,7 +354,12 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
deltaCount := 0
defer func() {
if deltaCount > 0 {
IncrRollup.addKeyToBatch(key)
// If deltaCount is high, send it to the high-priority channel instead.
if deltaCount > 500 {
IncrRollup.addKeyToBatch(key, 0)
} else {
IncrRollup.addKeyToBatch(key, 1)
}
}
}()
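
Finally, the read path decides which queue a key goes to: posting lists that have accumulated many deltas are sent to the high-priority channel so they get compacted without waiting behind the rate-limited low-priority queue. Below is a small hedged sketch of that decision; pickRollupPriority is a hypothetical helper, while the 500-delta threshold mirrors the diff above.

```go
// pickRollupPriority chooses a bucket in IncrRollup.priorityKeys.
// Priority 0 is drained without throttling; priority 1 is rate-limited.
func pickRollupPriority(deltaCount int) int {
	if deltaCount > 500 { // many deltas: reads of this list are already expensive
		return 0
	}
	return 1
}
```

With a helper like this, the defer in ReadPostingList reduces to calling IncrRollup.addKeyToBatch(key, pickRollupPriority(deltaCount)) whenever deltaCount > 0.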
