This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

[WIP] rushed - shed extension #1006

Closed · wants to merge 3 commits

13 changes: 10 additions & 3 deletions swarm/shed/db.go
@@ -40,12 +40,19 @@ type DB struct {
 	ldb *leveldb.DB
 }

+var leveldbOptions = &opt.Options{
+	Compression: opt.NoCompression,
+	BlockSize:   1 << 16,
+	// Default max open file descriptors (ulimit -n) is 256 on OS
+	// X, and >=1024 on (most?) Linux machines. So set to a low
+	// number since we have multiple leveldb instances.
+	OpenFilesCacheCapacity: 10,
+}
+
 // NewDB constructs a new DB and validates the schema
 // if it exists in database on the given path.
 func NewDB(path string) (db *DB, err error) {
-	ldb, err := leveldb.OpenFile(path, &opt.Options{
-		OpenFilesCacheCapacity: openFileLimit,
-	})
+	ldb, err := leveldb.OpenFile(path, leveldbOptions)
 	if err != nil {
 		return nil, err
 	}

163 changes: 163 additions & 0 deletions swarm/shed/rushed/db.go
@@ -0,0 +1,163 @@
package rushed

import (
"context"
"errors"
"sync"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/syndtr/goleveldb/leveldb"
)

// Mode is an enum for modes of access/update
type Mode = int

var (
errDBClosed = errors.New("DB closed")
)

// Batch wraps leveldb.Batch, extending it with a done channel that signals when the batch is written and the resulting write error
type Batch struct {
Contributor comment: We don't expose newBatch(). Do we need to expose Batch?
(or vice-versa, if we expose Batch maybe we should expose newBatch, too)

*leveldb.Batch
Done chan struct{} // to signal when batch is written
Err error // error resulting from write
}

// NewBatch constructs a new batch
func NewBatch() *Batch {
return &Batch{
Batch: new(leveldb.Batch),
Done: make(chan struct{}),
}
}

// DB extends shed DB with batch execution, garbage collection and iteration support with subscriptions
type DB struct {
*shed.DB // underlying shed.DB
update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method
Contributor comment: On update/access I would be happy to see more comments :) At this point, as someone just getting familiar with the code, it's not at all clear what their purpose is. (See the sketch after this struct.)

access func(Mode, *shed.IndexItem) error // mode-dependent access method
batch *Batch // current batch
mu sync.RWMutex // mutex for accessing current batch
c chan struct{} // channel to signal writes on
closed chan struct{} // closed when writeBatches loop quits
}
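
For illustration of the reviewer's note above, a sketch of the doc comments these hooks could carry, inferred from how db_test.go wires them up; the type names updateFunc and accessFunc are hypothetical and not part of this PR:

// updateFunc is the mode-dependent write hook: it queues whatever index
// mutations the given mode requires for item into the shared batch b
// (the test's tester.update, for example, calls index.PutInBatch(b.Batch, *item)).
type updateFunc func(b *Batch, mode Mode, item *shed.IndexItem) error

// accessFunc is the mode-dependent read hook: given an item carrying at least
// its Address, it fills in the remaining fields from the relevant index
// (the test's tester.access calls index.Get and copies the result back).
type accessFunc func(mode Mode, item *shed.IndexItem) error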

// New constructs a new DB
func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access func(Mode, *shed.IndexItem) error) *DB {
Contributor comment: Maybe extract update and access into named (private) function types; it would keep the method signature clearer, and their definition is already duplicated (see the DB struct). (See the sketch after this constructor.)

db := &DB{
DB: sdb,
update: update,
access: access,
batch: NewBatch(),
c: make(chan struct{}, 1),
closed: make(chan struct{}),
}
go db.writeBatches()
return db
}
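
A minimal sketch of the constructor signature the reviewer suggests, assuming the hypothetical updateFunc/accessFunc type names from the sketch above (the DB struct fields could use the same types):

// New with named hook types: the hook shape is declared in one place and the
// signature stays short. Behaviour is identical to the constructor above.
func New(sdb *shed.DB, update updateFunc, access accessFunc) *DB {
	db := &DB{
		DB:     sdb,
		update: update,
		access: access,
		batch:  NewBatch(),
		c:      make(chan struct{}, 1),
		closed: make(chan struct{}),
	}
	go db.writeBatches()
	return db
}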

// Close terminates the writeBatches loop by closing the signal channel and waits for the last batch to be written
func (db *DB) Close() {
// signal quit to writeBatches loop
close(db.c)
// wait for last batch to be written
<-db.closed
db.DB.Close()
}

// Accessor is a wrapper around DB where Put/Get are overridden to apply the
// update/access method for the given mode.
// Via Mode(mode), the DB implements the ChunkStore interface.
type Accessor struct {
mode Mode
*DB
}

// Mode returns the ChunkStore interface for the mode of update on a multimode update DB
func (db *DB) Mode(mode Mode) *Accessor {
return &Accessor{
mode: mode,
DB: db,
}
}

// Put overrides the underlying DB Put method for the specific mode of update
func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error {
return u.Update(ctx, u.mode, newItemFromChunk(ch))
}

// Get overrides the underlying DB Get method for the specific mode of access
func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, error) {
item := newItemFromAddress(addr)
if err := u.access(u.mode, item); err != nil {
return nil, err
}
return storage.NewChunk(item.Address, item.Data), nil
}
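
For context, a brief usage sketch mirroring TestPutGet in db_test.go: bind a mode once, then use the accessor like a ChunkStore. The helper name storeAndLoad is made up for illustration.

// storeAndLoad writes a chunk through a mode-bound accessor and reads it back
// by address; Put blocks until the shared batch has been written out.
func storeAndLoad(ctx context.Context, db *DB, ch storage.Chunk) (storage.Chunk, error) {
	store := db.Mode(0) // 0 is whatever mode the caller's update/access hooks define
	if err := store.Put(ctx, ch); err != nil {
		return nil, err
	}
	return store.Get(ctx, ch.Address())
}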

// Update calls the update method for the specific mode with items
func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error {
// obtain the current batch
// b := <-db.batch
db.mu.RLock()
b := db.batch
db.mu.RUnlock()
log.Debug("obtained batch")
if b == nil {
return errDBClosed
}
// call the update with the access mode
err := db.update(b, mode, item)
if err != nil {
Contributor comment: Am I the only one who favors the "simple statement" if format? I think it makes it clearer that the variable is only needed for a limited scope and does not pollute the enclosing scope. (For me it's also easier to understand, as I immediately know the scope I have to consider is just a few lines.)
=> if err := foo(); err != nil { <3
(A sketch of Update rewritten in this form follows right after this method.)

return err
}
// wait for batch to be written and return batch error
// this is in order for Put calls to be synchronous
select {
case db.c <- struct{}{}:
default:
}
select {
case <-b.Done:
case <-ctx.Done():
return ctx.Err()
}
return b.Err
}
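
As a follow-up to the reviewer's style note inside this method, a sketch of Update rewritten with the "simple statement" if; the flow is the same as above, only err is scoped to the if statement:

func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error {
	// obtain the current batch
	db.mu.RLock()
	b := db.batch
	db.mu.RUnlock()
	if b == nil {
		return errDBClosed
	}
	// call the update with the access mode; err does not leak past the if
	if err := db.update(b, mode, item); err != nil {
		return err
	}
	// signal the write loop, then wait for the batch to be written
	// so that Put calls stay synchronous
	select {
	case db.c <- struct{}{}:
	default:
	}
	select {
	case <-b.Done:
	case <-ctx.Done():
		return ctx.Err()
	}
	return b.Err
}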

// writeBatches is a forever loop handing out the current batch to updaters
// and applying the batch when the db is free;
// when the db is closed, the last batch is written out and the closed channel is closed
func (db *DB) writeBatches() {
defer close(db.closed)
for range db.c {
db.mu.Lock()
b := db.batch
db.batch = NewBatch()
db.mu.Unlock()
db.writeBatch(b)
}
}

// writeBatch writes out the batch, sets the error and closes the done channel
func (db *DB) writeBatch(b *Batch) {
// apply the batch
b.Err = db.DB.WriteBatch(b.Batch)
// signal batch write to callers
close(b.Done)
}

func newItemFromChunk(ch storage.Chunk) *shed.IndexItem {
return &shed.IndexItem{
Address: ch.Address(),
Data: ch.Data(),
}
}

func newItemFromAddress(addr storage.Address) *shed.IndexItem {
return &shed.IndexItem{
Address: addr,
}
}
179 changes: 179 additions & 0 deletions swarm/shed/rushed/db_test.go
@@ -0,0 +1,179 @@
package rushed

import (
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/storage"
colorable "github.com/mattn/go-colorable"
)

var (
loglevel = flag.Int("loglevel", 3, "verbosity of logs")
)

func init() {
flag.Parse()
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
}

type tester struct {
index shed.Index
db *DB
}

func (t *tester) access(mode Mode, item *shed.IndexItem) error {
it, err := t.index.Get(*item)
if err != nil {
return err
}
*item = it
return nil
}

// update defines set accessors for different modes
func (t *tester) update(b *Batch, mode Mode, item *shed.IndexItem) error {
if mode != 0 {
return errors.New("no such mode")
}
return t.index.PutInBatch(b.Batch, *item)
}

func newTester(path string) (*tester, error) {
tester := new(tester)
sdb, err := shed.NewDB(path)
if err != nil {
return nil, err
}
tester.db = New(sdb, tester.update, tester.access)
tester.index, err = sdb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{
EncodeKey: func(fields shed.IndexItem) (key []byte, err error) {
return fields.Address, nil
},
DecodeKey: func(key []byte) (e shed.IndexItem, err error) {
e.Address = key
return e, nil
},
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
b := make([]byte, 16)
binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp))
binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp))
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16]))
e.Data = value[16:]
return e, nil
},
})
if err != nil {
return nil, err
}
return tester, nil
}

func TestPutGet(t *testing.T) {
path, err := ioutil.TempDir("", "rushed-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(path)
tester, err := newTester(path)
if err != nil {
t.Fatal(err)
}
defer tester.db.Close()
s := tester.db.Mode(0)
ch := storage.GenerateRandomChunk(chunk.DefaultSize)
log.Debug("put")
err = s.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
log.Debug("get")
got, err := s.Get(context.Background(), ch.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), ch.Data()) {
t.Fatal("chunk data mismatch after retrieval")
}
}

type putter interface {
Put(context.Context, storage.Chunk) error
}

func (t *tester) Put(_ context.Context, ch storage.Chunk) error {
return t.index.Put(*(newItemFromChunk(ch)))
}
func BenchmarkPut(b *testing.B) {
n := 128
for j := 0; j < 2; j++ {
n *= 2
chunks := make([]storage.Chunk, n)
for k := 0; k < n; k++ {
chunks[k] = storage.GenerateRandomChunk(chunk.DefaultSize)
}
in := 1 * time.Nanosecond
for i := 0; i < 4; i++ {
for _, name := range []string{"shed", "rushed"} {
b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) {
benchmarkPut(t, chunks, in, name)
})
}
in *= time.Duration(100)
}
}
}

func benchmarkPut(b *testing.B, chunks []storage.Chunk, in time.Duration, name string) {
for i := 0; i < b.N; i++ {
Contributor comment:
  • Before this for loop I would generate the random chunks, so we don't benchmark the generate function, too.
  • Actually, I would try to start only X goroutines, where X correlates with the number of CPUs (we cannot run more in parallel anyway).
  • Each goroutine would get its own slice and would try to iterate over it with Put as fast as possible.
(A sketch of this restructuring follows after benchmarkPut.)

b.StopTimer()
path, err := ioutil.TempDir("", "rushed-test")
if err != nil {
b.Fatal(err)
}
tester, err := newTester(path)
if err != nil {
os.RemoveAll(path)
b.Fatal(err)
}
var db putter
if name == "shed" {
db = tester
} else {
db = tester.db.Mode(0)
}
var wg sync.WaitGroup
wg.Add(len(chunks))
ctx := context.Background()
b.StartTimer()
for _, ch := range chunks {
time.Sleep(in)
go func(chu storage.Chunk) {
defer wg.Done()
db.Put(ctx, chu)
}(ch)
}
wg.Wait()
b.StopTimer()
tester.db.Close()
os.RemoveAll(path)
}
}
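
Sketching the reviewer's suggestion above: chunks stay pre-generated by the caller (as BenchmarkPut already does), and the work is fanned out to a fixed number of goroutines tied to the CPU count, each iterating over its own slice. The function name benchmarkPutWorkers and the partitioning are illustrative only, and the sketch assumes an extra "runtime" import.

// benchmarkPutWorkers measures Put throughput using runtime.NumCPU() workers,
// each walking its own non-overlapping share of the pre-generated chunks.
func benchmarkPutWorkers(b *testing.B, db putter, chunks []storage.Chunk) {
	ctx := context.Background()
	workers := runtime.NumCPU()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var wg sync.WaitGroup
		for w := 0; w < workers; w++ {
			part := chunks[w*len(chunks)/workers : (w+1)*len(chunks)/workers]
			wg.Add(1)
			go func(part []storage.Chunk) {
				defer wg.Done()
				for _, ch := range part {
					if err := db.Put(ctx, ch); err != nil {
						b.Error(err) // Error is safe to call from goroutines
					}
				}
			}(part)
		}
		wg.Wait()
	}
}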