Skip to content

Commit

Permalink
autobatch: batch deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Apr 13, 2019
1 parent 5525660 commit 301890e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 21 deletions.
54 changes: 34 additions & 20 deletions autobatch/autobatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,48 @@ type Datastore struct {
child ds.Batching

// TODO: discuss making ds.Batch implement the full ds.Datastore interface
buffer map[ds.Key][]byte
buffer map[ds.Key]op
maxBufferEntries int
}

// op is a single buffered operation awaiting the next flush: either a
// pending put (value holds the bytes to write) or a pending delete
// (delete is true and value is ignored).
type op struct {
	// delete marks the key for removal from the child datastore on flush.
	delete bool
	// value is the data to store for the key when delete is false.
	value []byte
}

// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
	return &Datastore{
		child: d,
		// Pre-size the buffer to its maximum expected occupancy so it
		// does not need to grow between flushes.
		buffer:           make(map[ds.Key]op, size),
		maxBufferEntries: size,
	}
}

// Delete deletes a key/value.
//
// The delete is buffered rather than applied immediately: it is recorded
// as a pending delete op and only hits the child datastore on Flush (or
// when a Put overflows the buffer). Deleting a key that does not exist is
// therefore not an error here; missing keys are tolerated at flush time,
// making Delete idempotent.
func (d *Datastore) Delete(k ds.Key) error {
	// Overwrites any buffered put for the same key.
	d.buffer[k] = op{delete: true}
	return nil
}

// Get retrieves a value given a key.
//
// The in-memory buffer shadows the child datastore: a buffered put returns
// the buffered bytes, and a buffered delete reports ds.ErrNotFound even if
// the key still exists in the child (the delete simply has not been flushed
// yet). Only on a buffer miss is the child consulted.
func (d *Datastore) Get(k ds.Key) ([]byte, error) {
	if o, ok := d.buffer[k]; ok {
		if o.delete {
			// Pending delete hides any value still present in the child.
			return nil, ds.ErrNotFound
		}
		return o.value, nil
	}

	return d.child.Get(k)
}

// Put stores a key/value.
func (d *Datastore) Put(k ds.Key, val []byte) error {
d.buffer[k] = val
d.buffer[k] = op{value: val}
if len(d.buffer) > d.maxBufferEntries {
return d.Flush()
}
Expand All @@ -66,33 +68,45 @@ func (d *Datastore) Flush() error {
return err
}

for k, v := range d.buffer {
err := b.Put(k, v)
for k, o := range d.buffer {
var err error
if o.delete {
err = b.Delete(k)
if err == ds.ErrNotFound {
// Ignore these, let delete be idempotent.
err = nil
}
} else {
err = b.Put(k, o.value)
}
if err != nil {
return err
}
}
// clear out buffer
d.buffer = make(map[ds.Key][]byte)
d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

return b.Commit()
}

// Has checks if a key is stored.
//
// Buffered operations take precedence over the child datastore: a buffered
// put answers true, a buffered (unflushed) delete answers false, and only a
// buffer miss falls through to the child.
func (d *Datastore) Has(k ds.Key) (bool, error) {
	if o, ok := d.buffer[k]; ok {
		// A pending delete means the key is logically absent.
		return !o.delete, nil
	}

	return d.child.Has(k)
}

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(k ds.Key) (int, error) {
v, ok := d.buffer[k]
o, ok := d.buffer[k]
if ok {
return len(v), nil
if o.delete {
return -1, ds.ErrNotFound
}
return len(o.value), nil
}

return d.child.GetSize(k)
Expand Down
53 changes: 52 additions & 1 deletion autobatch/autobatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,48 @@ func TestFlushing(t *testing.T) {
}
}

// Get works normally.
for _, k := range keys {
val, err := d.Get(k)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}

// Not flushed
_, err := child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete works.
err = d.Delete(keys[14])
if err != nil {
t.Fatal(err)
}
_, err = d.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal(err)
}

// Still not flushed
_, err = child.Get(keys[0])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Final put flushes.
err = d.Put(ds.NewKey("test16"), v)
if err != nil {
t.Fatal(err)
}

// should be flushed now, try to get keys from child datastore
for _, k := range keys {
for _, k := range keys[:14] {
val, err := child.Get(k)
if err != nil {
t.Fatal(err)
Expand All @@ -51,4 +81,25 @@ func TestFlushing(t *testing.T) {
t.Fatal("wrong value")
}
}

// Never flushed the deleted key.
_, err = child.Get(keys[14])
if err != ds.ErrNotFound {
t.Fatal("shouldnt have found value")
}

// Delete doesn't flush
err = d.Delete(keys[0])
if err != nil {
t.Fatal(err)
}

val, err := child.Get(keys[0])
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(val, v) {
t.Fatal("wrong value")
}
}

0 comments on commit 301890e

Please sign in to comment.