diff --git a/swarm/shed/db.go b/swarm/shed/db.go index 987c89dcff..325aebabd7 100644 --- a/swarm/shed/db.go +++ b/swarm/shed/db.go @@ -40,12 +40,19 @@ type DB struct { ldb *leveldb.DB } +var leveldbOptions = &opt.Options{ + Compression: opt.NoCompression, + BlockSize: 1 << 16, + // Default max open file descriptors (ulimit -n) is 256 on OS + // X, and >=1024 on (most?) Linux machines. So set to a low + // number since we have multiple leveldb instances. + OpenFilesCacheCapacity: 10, +} + // NewDB constructs a new DB and validates the schema // if it exists in database on the given path. func NewDB(path string) (db *DB, err error) { - ldb, err := leveldb.OpenFile(path, &opt.Options{ - OpenFilesCacheCapacity: openFileLimit, - }) + ldb, err := leveldb.OpenFile(path, leveldbOptions) if err != nil { return nil, err } diff --git a/swarm/shed/rushed/db.go b/swarm/shed/rushed/db.go new file mode 100644 index 0000000000..69d330a5be --- /dev/null +++ b/swarm/shed/rushed/db.go @@ -0,0 +1,163 @@ +package rushed + +import ( + "context" + "errors" + "sync" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + "github.com/syndtr/goleveldb/leveldb" +) + +// Mode is an enum for modes of access/update +type Mode = int + +var ( + errDBClosed = errors.New("DB closed") +) + +// Batch wraps leveldb.Batch extending it with a waitgroup and a done channel +type Batch struct { + *leveldb.Batch + Done chan struct{} // to signal when batch is written + Err error // error resulting from write +} + +// NewBatch constructs a new batch +func NewBatch() *Batch { + return &Batch{ + Batch: new(leveldb.Batch), + Done: make(chan struct{}), + } +} + +// DB extends shed DB with batch execution, garbage collection and iteration support with subscriptions +type DB struct { + *shed.DB // underlying shed.DB + update func(*Batch, Mode, *shed.IndexItem) error // mode-dependent update method + access func(Mode, *shed.IndexItem) error // mode-dependent access method + batch *Batch // current batch + mu sync.RWMutex // mutex for accessing current batch + c chan struct{} // channel to signal writes on + closed chan struct{} // closed when writeBatches loop quits +} + +// New constructs a new DB +func New(sdb *shed.DB, update func(*Batch, Mode, *shed.IndexItem) error, access func(Mode, *shed.IndexItem) error) *DB { + db := &DB{ + DB: sdb, + update: update, + access: access, + batch: NewBatch(), + c: make(chan struct{}, 1), + closed: make(chan struct{}), + } + go db.writeBatches() + return db +} + +// Close terminates loops by closing the quit channel +func (db *DB) Close() { + // signal quit to writeBatches loop + close(db.c) + // wait for last batch to be written + <-db.closed + db.DB.Close() +} + +// Accessor is a wrapper around DB where Put/Get is overwritten to apply the +// update/access method for the mode +// using Mode(mode) the DB implements the ChunkStore interface +type Accessor struct { + mode Mode + *DB +} + +// Mode returns the ChunkStore interface for the mode of update on a multimode update DB +func (db *DB) Mode(mode Mode) *Accessor { + return &Accessor{ + mode: mode, + DB: db, + } +} + +// Put overwrites the underlying DB Put method for the specific mode of update +func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error { + return u.Update(ctx, u.mode, newItemFromChunk(ch)) +} + +// Get overwrites the underlying DB Get method for the specific mode of access +func (u *Accessor) Get(_ context.Context, addr storage.Address) (storage.Chunk, error) { + item := newItemFromAddress(addr) + if err := u.access(u.mode, item); err != nil { + return nil, err + } + return storage.NewChunk(item.Address, item.Data), nil +} + +// Update calls the update method for the specific mode with items +func (db *DB) Update(ctx context.Context, mode Mode, item *shed.IndexItem) error { + // obtain the current batch + // b := <-db.batch + db.mu.RLock() + b := db.batch + db.mu.RUnlock() + log.Debug("obtained batch") + if b == nil { + return errDBClosed + } + // call the update with the access mode + err := db.update(b, mode, item) + if err != nil { + return err + } + // wait for batch to be written and return batch error + // this is in order for Put calls to be synchronous + select { + case db.c <- struct{}{}: + default: + } + select { + case <-b.Done: + case <-ctx.Done(): + return ctx.Err() + } + return b.Err +} + +// writeBatches is a forever loop handing out the current batch to updaters +// and apply the batch when the db is free +// if the db is quit, the last batch is written out and batch channel is closed +func (db *DB) writeBatches() { + defer close(db.closed) + for range db.c { + db.mu.Lock() + b := db.batch + db.batch = NewBatch() + db.mu.Unlock() + db.writeBatch(b) + } +} + +// writeBatch writes out the batch, sets the error and closes the done channel +func (db *DB) writeBatch(b *Batch) { + // apply the batch + b.Err = db.DB.WriteBatch(b.Batch) + // signal batch write to callers + close(b.Done) +} + +func newItemFromChunk(ch storage.Chunk) *shed.IndexItem { + return &shed.IndexItem{ + Address: ch.Address(), + Data: ch.Data(), + } +} + +func newItemFromAddress(addr storage.Address) *shed.IndexItem { + return &shed.IndexItem{ + Address: addr, + } +} diff --git a/swarm/shed/rushed/db_test.go b/swarm/shed/rushed/db_test.go new file mode 100644 index 0000000000..5529368301 --- /dev/null +++ b/swarm/shed/rushed/db_test.go @@ -0,0 +1,179 @@ +package rushed + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "flag" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/swarm/chunk" + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/ethereum/go-ethereum/swarm/storage" + colorable "github.com/mattn/go-colorable" +) + +var ( + loglevel = flag.Int("loglevel", 3, "verbosity of logs") +) + +func init() { + flag.Parse() + log.PrintOrigins(true) + log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))) +} + +type tester struct { + index shed.Index + db *DB +} + +func (t *tester) access(mode Mode, item *shed.IndexItem) error { + it, err := t.index.Get(*item) + if err != nil { + return err + } + *item = it + return nil +} + +// update defines set accessors for different modes +func (t *tester) update(b *Batch, mode Mode, item *shed.IndexItem) error { + if mode != 0 { + return errors.New("no such mode") + } + return t.index.PutInBatch(b.Batch, *item) +} + +func newTester(path string) (*tester, error) { + tester := new(tester) + sdb, err := shed.NewDB(path) + if err != nil { + return nil, err + } + tester.db = New(sdb, tester.update, tester.access) + tester.index, err = sdb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Address, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Address = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.Data = value[16:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + return tester, nil +} + +func TestPutGet(t *testing.T) { + path, err := ioutil.TempDir("", "rushed-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(path) + tester, err := newTester(path) + if err != nil { + t.Fatal(err) + } + defer tester.db.Close() + s := tester.db.Mode(0) + ch := storage.GenerateRandomChunk(chunk.DefaultSize) + log.Debug("put") + err = s.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + log.Debug("get") + got, err := s.Get(context.Background(), ch.Address()) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got.Data(), ch.Data()) { + t.Fatal("chunk data mismatch after retrieval") + } +} + +type putter interface { + Put(context.Context, storage.Chunk) error +} + +func (t *tester) Put(_ context.Context, ch storage.Chunk) error { + return t.index.Put(*(newItemFromChunk(ch))) +} +func BenchmarkPut(b *testing.B) { + n := 128 + for j := 0; j < 2; j++ { + n *= 2 + chunks := make([]storage.Chunk, n) + for k := 0; k < n; k++ { + chunks[k] = storage.GenerateRandomChunk(chunk.DefaultSize) + } + in := 1 * time.Nanosecond + for i := 0; i < 4; i++ { + for _, name := range []string{"shed", "rushed"} { + b.Run(fmt.Sprintf("N=%v Interval=%v, DB=%v", n, in, name), func(t *testing.B) { + benchmarkPut(t, chunks, in, name) + }) + } + in *= time.Duration(100) + } + } +} + +func benchmarkPut(b *testing.B, chunks []storage.Chunk, in time.Duration, name string) { + for i := 0; i < b.N; i++ { + b.StopTimer() + path, err := ioutil.TempDir("", "rushed-test") + if err != nil { + b.Fatal(err) + } + tester, err := newTester(path) + if err != nil { + os.RemoveAll(path) + b.Fatal(err) + } + var db putter + if name == "shed" { + db = tester + } else { + db = tester.db.Mode(0) + } + var wg sync.WaitGroup + wg.Add(len(chunks)) + ctx := context.Background() + b.StartTimer() + for _, ch := range chunks { + time.Sleep(in) + go func(chu storage.Chunk) { + defer wg.Done() + db.Put(ctx, chu) + }(ch) + } + wg.Wait() + b.StopTimer() + tester.db.Close() + os.RemoveAll(path) + } +} diff --git a/swarm/shed/rushed/subscribe.go b/swarm/shed/rushed/subscribe.go new file mode 100644 index 0000000000..0307936675 --- /dev/null +++ b/swarm/shed/rushed/subscribe.go @@ -0,0 +1,121 @@ +package rushed + +import ( + "errors" + + "github.com/ethereum/go-ethereum/swarm/shed" +) + +const ( + iterBatchSize = 128 +) + +var ( + errCancelled = errors.New("cancelled") +) + +type Subscription struct { + cancel chan struct{} // cancel the subscription + err error +} + +func Subscribe(index *shed.Index, buffer chan *shed.IndexItem, from *shed.IndexItem, trigger chan struct{}) *Subscription { + cancel := make(chan struct{}) + f := func(item *shed.IndexItem) (bool, error) { + select { + case buffer <- item: + return false, nil + case <-cancel: + return false, errCancelled + } + } + s := &Subscription{ + cancel: cancel, + } + wait := func() (bool, error) { + select { + case <-trigger: + return false, nil + case <-cancel: + return false, errCancelled + } + } + go func() { + defer close(buffer) + s.err = Iterate(index, from, f, wait) + }() + return s +} + +// iterate is a wrapper to shed.IterateFrom that periodically iterates starting from 'from' +// and remembering the last item on each round and continue the iteration from this on the +// following round +// once the items are retrieved in a fixed slice of max iterBatchSize elements +// it iterates over this slice and applies f to each element +// f returns a bool which when true terminates the iteration +// error returned from f result in terminating the iteration and returning the error +// if the iterator reached the last item in the index it calls the wait function +func Iterate(index *shed.Index, from *shed.IndexItem, f func(*shed.IndexItem) (bool, error), wait func() (bool, error)) error { + items := make([]*shed.IndexItem, iterBatchSize) + pos := 0 + cur := from + size := 0 + // define feed function that populates the items slice + feed := func(item shed.IndexItem) (bool, error) { + // assign the item at pos + items[pos] = &item + pos++ + cur = &item + // if reached the end, stop + if pos == len(items) { + return true, nil + } + return false, nil + } + // read when called triggers an IterateFrom on the index, populates the items slice + read := func() (int, error) { + defer func() { pos = 0 }() + for { + if err := index.IterateFrom(*cur, feed); err != nil { + return size, err + } + if size > 0 { + break + } + // if no items are available it calls wait and returns if stop or error + stop, err := wait() + if err != nil { + return 0, err + } + if stop { + return 0, nil + } + } + return size, nil + } + cnt := 0 + for { + if cnt == size { + // retrieved items are all fed to buffer + // get a new batch + // if c is buffered channel, it can still get items while batch is read from disk + // size items read, last is set if after size element no more needed + var err error + size, err = read() + if err != nil { + return err + } + cnt = 0 + } + // calls f on the item + stop, err := f(items[cnt]) + if err != nil { + return err + } + if stop { + break + } + cnt++ + } + return nil +} diff --git a/swarm/storage/localstore/db.go b/swarm/storage/localstore/db.go new file mode 100644 index 0000000000..c4f8ad3e00 --- /dev/null +++ b/swarm/storage/localstore/db.go @@ -0,0 +1,230 @@ +package localstore + +import ( + "context" + "encoding/binary" + "errors" + "time" + + "github.com/ethereum/go-ethereum/swarm/shed" + "github.com/syndtr/goleveldb/leveldb" +) + +/* + types of access: + - just get the data + - increment access index + + - when uploaded or pull synced + - when delivered + - when push synced + - when accessed +*/ + +var ( + errInvalidMode = errors.New("invalid mode") +) + +// Modes of access/update +const ( + SYNCING rushed.Mode = iota + UPLOAD + REQUEST + SYNCED + ACCESS + REMOVAL +) + +// DB is a local chunkstore using disk storage +type DB struct { + *rushed.DB + // fields and indexes + schemaName shed.StringField + size shed.Uint64Field + retrieval shed.Index + push shed.Index + pull shed.Index + gc shed.Index +} + +// NewDB constructs a local chunks db +func NewDB(path string) (*DB, error) { + db := new(DB) + sdb := shed.NewDB(path) + db.DB = rushed.NewDB(sdb, db.update, db.access) + db.schemaName, err = idb.NewStringField("schema-name") + if err != nil { + return nil, err + } + db.size, err = idb.NewUint64Field("size") + if err != nil { + return nil, err + } + db.retrieval, err = idb.NewIndex("Hash->StoredTimestamp|AccessTimestamp|Data", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + return fields.Hash, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + b := make([]byte, 16) + binary.BigEndian.PutUint64(b[:8], uint64(fields.StoreTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.AccessTimestamp)) + value = append(b, fields.Data...) + return value, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.StoredTimestamp = int64(binary.BigEndian.Uint64(value[:8])) + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value[8:16])) + e.Data = value[16:] + return e, nil + }, + }) + if err != nil { + return nil, err + } + // pull index allows history and live syncing per po bin + db.pull, err = idb.NewIndex("PO|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + key = make([]byte, 41) + key[0] = byte(uint8(db.po(fields.Hash))) + binary.BigEndian.PutUint64(key[1:9], fields.StoredTimestamp) + copy(key[9:], fields.Hash[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key[9:] + e.StoredTimestamp = int64(binary.BigEndian.Uint64(key[1:9])) + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + // push index contains as yet unsynced chunks + db.push, err = idb.NewIndex("StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + key = make([]byte, 40) + binary.BigEndian.PutUint64(key[:8], fields.StoredTimestamp) + copy(key[8:], fields.Hash[:]) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.Hash = key[8:] + e.StoredTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(value)) + return e, nil + }, + }) + if err != nil { + return nil, err + } + // gc index for removable chunk ordered by ascending last access time + db.gcIndex, err = idb.NewIndex("AccessTimestamp|StoredTimestamp|Hash->nil", shed.IndexFuncs{ + EncodeKey: func(fields shed.IndexItem) (key []byte, err error) { + b := make([]byte, 16, 16+len(fields.Hash)) + binary.BigEndian.PutUint64(b[:8], uint64(fields.AccessTimestamp)) + binary.BigEndian.PutUint64(b[8:16], uint64(fields.StoreTimestamp)) + key = append(b, fields.Hash...) + return key, nil + }, + DecodeKey: func(key []byte) (e shed.IndexItem, err error) { + e.AccessTimestamp = int64(binary.BigEndian.Uint64(key[:8])) + e.StoreTimestamp = int64(binary.BigEndian.Uint64(key[8:16])) + e.Hash = key[16:] + return e, nil + }, + EncodeValue: func(fields shed.IndexItem) (value []byte, err error) { + return nil, nil + }, + DecodeValue: func(value []byte) (e shed.IndexItem, err error) { + return e, nil + }, + }) + if err != nil { + return nil, err + } + return db, nil +} + +// access defines get accessors for different modes +func (db *DB) access(b *leveldb.Batch, mode rushed.Mode, item *shed.IndexItem) error { + err := db.retrieve.Get(item) + switch mode { + case SYNCING: + case TESTSYNCING: + case REQUEST: + return db.Update(context.TODO(), REQUEST, item) + default: + return errInvalidMode + } + return nil +} + +// update defines set accessors for different modes +func (db *DB) update(b *rushed.Batch, mode rushed.Mode, item *shed.IndexItem) error { + switch mode { + case SYNCING: + // put to indexes: retrieve, pull + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.pull.PutInBatch(b, item) + db.size.IncInBatch(b) + + case UPLOAD: + // put to indexes: retrieve, push, pull + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.pull.PutInBatch(b, item) + db.push.PutInBatch(b, item) + + case REQUEST: + // put to indexes: retrieve, gc + item.StoredTimestamp = now() + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.gc.PutInBatch(b, item) + + case SYNCED: + // delete from push, insert to gc + db.push.DeleteInBatch(b, item) + db.gc.PutInBatch(b, item) + + case ACCESS: + // update accessTimeStamp in retrieve, gc + db.gc.DeleteInBatch(b, item) + item.AccessTimestamp = now() + db.retrieve.PutInBatch(b, item) + db.gc.PutInBatch(b, item) + + case REMOVAL: + // delete from retrieve, pull, gc + db.retrieve.DeleteInBatch(b, item) + db.pull.DeleteInBatch(b, item) + db.gc.DeleteInBatch(b, item) + + default: + return errInvalidMode + } + return nil +} + +func now() uint64 { + return uint64(time.Now().UnixNano()) +}