Rudimentary multihash journal: prep for partial keys
ribasushi committed Apr 21, 2024
1 parent d9212fd commit 093cb33
Showing 2 changed files with 191 additions and 19 deletions.
193 changes: 174 additions & 19 deletions blockstore/badger/blockstore.go
@@ -19,12 +19,19 @@ import (
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
"go.uber.org/zap"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
)

const (
mhJournalFilename = "MultiHashes.bin"
maxMhPrefixLen = 4 // 3-byte MhType varint, 1 byte len
hashBits = 256 // anything not this size will be rejected, see 🧵 https://filecoinproject.slack.com/archives/CRK2LKYHW/p1711381656211189?thread_ts=1711264671.316169&cid=CRK2LKYHW
)
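
A note on the record format (an editorial sketch, not part of the commit): the padding logic in doCopy and PutMany below implies a flat journal file of fixed-width records, maxMhPrefixLen + hashLen = 36 bytes each, with the multihash right-aligned and left-padded with zero bytes. The names recordLen and appendJournalRecord here are hypothetical:

	// One journal record per stored block: zero padding first, then the
	// multihash, so every record is exactly recordLen bytes wide.
	const recordLen = maxMhPrefixLen + hashLen // 4 + 32 = 36

	func appendJournalRecord(jrnl, mh []byte) []byte {
		if pad := recordLen - len(mh); pad > 0 {
			jrnl = append(jrnl, make([]byte, pad)...)
		}
		return append(jrnl, mh...)
	}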

var (
// KeyPool is the buffer pool we use to compute storage keys.
KeyPool *pool.BufferPool = pool.GlobalPool
@@ -103,6 +110,11 @@ const (
moveStateLock
)

type flushWriter interface {
io.WriteCloser
Sync() error
}
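
Since *os.File provides Write, Close and Sync with exactly these signatures, the journal handle returned by openJournal below satisfies this interface; a compile-time assertion (not present in the commit) would be:

	var _ flushWriter = (*os.File)(nil)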

// Blockstore is a badger-backed IPLD blockstore.
type Blockstore struct {
stateLk sync.RWMutex
@@ -114,9 +126,13 @@ type Blockstore struct {
moveState bsMoveState
rlock int

db *badger.DB
dbNext *badger.DB // when moving
opts Options
db *badger.DB
mhJournal flushWriter

dbNext *badger.DB // when moving
mhJournalNext flushWriter

opts Options

prefixing bool
prefix []byte
@@ -151,9 +167,37 @@ func Open(opts Options) (*Blockstore, error) {

bs.moveCond.L = &bs.moveMx

if !opts.ReadOnly {
bs.mhJournal, err = openJournal(opts.Dir)
if err != nil {
return nil, err
}
}

return bs, nil
}

var fadvWriter func(uintptr) error

func openJournal(dir string) (*os.File, error) {
fh, err := os.OpenFile(
dir+"/"+mhJournalFilename,
os.O_APPEND|os.O_WRONLY|os.O_CREATE,
0644,
)
if err != nil {
return nil, err
}

if fadvWriter != nil {
if err := fadvWriter(fh.Fd()); err != nil {
return nil, err
}
}

return fh, nil
}
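
For context, a minimal sketch of how a consumer might replay the journal, assuming the fixed-width record layout described above. replayJournal is a hypothetical name, and stripping the padding this way assumes the first byte of the multihash prefix is nonzero (true for sha2-256 and other common codes, though not for the identity hash):

	func replayJournal(r io.Reader, visit func(multihash.Multihash) error) error {
		rec := make([]byte, maxMhPrefixLen+hashLen)
		for {
			if _, err := io.ReadFull(r, rec); err != nil {
				if err == io.EOF {
					return nil // clean end of journal
				}
				return err // includes io.ErrUnexpectedEOF on a torn record
			}
			mh := rec
			for len(mh) > 0 && mh[0] == 0 {
				mh = mh[1:] // strip the left zero padding
			}
			// mh aliases rec and is overwritten next iteration: the
			// callback must copy it if it needs to retain the value.
			if err := visit(multihash.Multihash(mh)); err != nil {
				return err
			}
		}
	}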

// Close closes the store. If the store has already been closed, this noops and
// returns no error, even if the first closure resulted in error.
func (b *Blockstore) Close() error {
@@ -174,7 +218,25 @@ func (b *Blockstore) Close() error {
// wait for all accesses to complete
b.viewers.Wait()

return b.db.Close()
var err error

if errDb := b.db.Close(); errDb != nil {
errDb = xerrors.Errorf("failure closing the badger blockstore: %w", errDb)
log.Warn(errDb)
err = errDb
}

if b.mhJournal != nil {
if errMj := b.mhJournal.Close(); errMj != nil {
errMj = xerrors.Errorf("failure closing the multihash journal: %w", errMj)
log.Warn(errMj)
if err == nil {
err = errMj
}
}
}

return err
}

func (b *Blockstore) access() error {
@@ -267,6 +329,8 @@ func (b *Blockstore) movingGC(ctx context.Context) error {

dbNext := b.dbNext
b.dbNext = nil
mhJournalNext := b.mhJournalNext
b.mhJournalNext = nil

var state bsMoveState
if dbNext != nil {
@@ -279,10 +343,12 @@

if dbNext != nil {
// the move failed and we have a left-over db; delete it.
err := dbNext.Close()
if err != nil {
if err := dbNext.Close(); err != nil {
log.Warnf("error closing badger db: %s", err)
}
if err := mhJournalNext.Close(); err != nil {
log.Warnf("error closing multihash journal: %s", err)
}
b.deleteDB(newPath)

b.lockMove()
@@ -316,32 +382,45 @@ func (b *Blockstore) movingGC(ctx context.Context) error {
opts := b.opts
opts.Dir = newPath
opts.ValueDir = newPath
opts.ReadOnly = false // by definition the new copy is writable (we just wrote it)

dbNew, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore in %s: %w", newPath, err)
}
mhjNew, err := openJournal(opts.Dir)
if err != nil {
return err
}

b.lockMove()
b.dbNext = dbNew
b.mhJournalNext = mhjNew
b.unlockMove(moveStateMoving)

log.Info("copying blockstore")
err = b.doCopy(ctx, b.db, b.dbNext)
err = b.doCopy(ctx, b.db, b.dbNext, b.mhJournalNext)
if err != nil {
return fmt.Errorf("error moving badger blockstore to %s: %w", newPath, err)
}

b.lockMove()
dbOld := b.db
b.db = b.dbNext
mhjOld := b.mhJournal
b.mhJournal = b.mhJournalNext
b.dbNext = nil
b.mhJournalNext = nil
b.unlockMove(moveStateCleanup)

err = dbOld.Close()
if err != nil {
if err := dbOld.Close(); err != nil {
log.Warnf("error closing old badger db: %s", err)
}
if mhjOld != nil {
if err := mhjOld.Close(); err != nil {
log.Warnf("error closing old multihash journal: %s", err)
}
}

// this is the canonical db path; this is where our db lives.
dbPath := b.opts.Dir
@@ -390,7 +469,7 @@ func symlink(path, linkTo string) error {
}

// doCopy copies a badger blockstore to another
func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) {
func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.Writer) (defErr error) {
batch := to.NewWriteBatch()
defer func() {
if defErr == nil {
@@ -406,11 +485,47 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr er
if err := ctx.Err(); err != nil {
return err
}

jrnlSlab := pool.Get(len(kvs) * (maxMhPrefixLen + hashLen))
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

mhBuf := pool.Get(maxMhPrefixLen + hashLen)
defer pool.Put(mhBuf)

var mhDec *multihash.DecodedMultihash

for _, kv := range kvs {

n, err := base32.RawStdEncoding.Decode(mhBuf, kv.Key[b.prefixLen:])
if err != nil {
return xerrors.Errorf("undecodeable key 0x%X: %s", kv.Key, err)
}
mhDec, err = multihash.Decode(mhBuf[:n])
if err != nil {
return xerrors.Errorf("unexpected multihash 0x%X for key 0x%X: %s", mhBuf[:n], kv.Key, err)
}
if mhDec.Length != hashLen {
return xerrors.Errorf("unsupported hash length of %d bits for key 0x%X", mhDec.Length*8, kv.Key)
}
if mhDec.Code > maxMhType {
return xerrors.Errorf("unsupported mhtype %d for key 0x%X: varint encoding wider than %d bytes", mhDec.Code, kv.Key, maxMhPrefixLen-1)
}

if err := batch.Set(kv.Key, kv.Value); err != nil {
return err
}
// add a journal record
if pad := maxMhPrefixLen + hashLen - n; pad > 0 {
jrnl = append(jrnl, make([]byte, pad)...)
}
jrnl = append(jrnl, mhBuf[:n]...)
}

if _, err := jrnlFh.Write(jrnl); err != nil {
return xerrors.Errorf("failed to write multihashes to journal: %w", err)
}

return nil
})
}
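
The well-formedness gate above (decode, 256-bit digest, code varint no wider than 3 bytes) reappears verbatim in PutMany further down; a shared helper along these lines (hypothetical, not in the commit) would keep the two paths in sync:

	func checkJournalable(mhDec *multihash.DecodedMultihash) error {
		if mhDec.Length != hashLen {
			return xerrors.Errorf("unsupported hash length of %d bits", mhDec.Length*8)
		}
		if mhDec.Code > maxMhType {
			return xerrors.Errorf("unsupported mhtype %d: varint encoding wider than %d bytes", mhDec.Code, maxMhPrefixLen-1)
		}
		return nil
	}
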
@@ -676,11 +791,19 @@ func (b *Blockstore) Flush(context.Context) error {
if err := b.dbNext.Sync(); err != nil {
return err
}
if err := b.mhJournalNext.Sync(); err != nil {
return err
}
}

if err := b.db.Sync(); err != nil {
return err
}
if b.mhJournal != nil {
if err := b.mhJournal.Sync(); err != nil {
return err
}
}

return nil
}
@@ -812,15 +935,19 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
}()
}

keys := make([][]byte, 0, len(blocks))
for _, block := range blocks {
keys := make([][]byte, len(blocks))
for i, block := range blocks {
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
keys = append(keys, k)
keys[i] = k
}

jrnlSlab := pool.Get(len(blocks) * (maxMhPrefixLen + hashLen))
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

if err := b.db.View(func(txn *badger.Txn) error {
for i, k := range keys {
val, err := badgerGet(txn, k)
@@ -830,14 +957,34 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
} else if val != nil {
// Already have it
keys[i] = nil
} else {
// Need to insert this one; check its well-formedness first
c := blocks[i].Cid()
mh := c.Hash()
mhDec, err := multihash.Decode(mh)
if err != nil {
return err
}
if mhDec.Length != hashLen {
return xerrors.Errorf("unsupported hash length of %d bits for cid %s", mhDec.Length*8, c)
}
if mhDec.Code > maxMhType {
return xerrors.Errorf("unsupported mhtype %d for cid %s: varint encoding wider than %d bytes", mhDec.Code, c, maxMhPrefixLen-1)
}

// add a journal record
if pad := maxMhPrefixLen + hashLen - len(mh); pad > 0 {
jrnl = append(jrnl, make([]byte, pad)...)
}
jrnl = append(jrnl, mh...)
}
}
return nil
}); err != nil {
return err
}

put := func(db *badger.DB) error {
put := func(db *badger.DB, mhj flushWriter) error {
batch := db.NewWriteBatch()
defer batch.Cancel()

@@ -852,20 +999,23 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
}
}

err := batch.Flush()
if err != nil {
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
if err := batch.Flush(); err != nil {
return xerrors.Errorf("failed to put blocks in badger blockstore: %w", err)
}

if _, err := mhj.Write(jrnl); err != nil {
return xerrors.Errorf("failed to write multihashes to journal: %w", err)
}

return nil
}

if err := put(b.db); err != nil {
if err := put(b.db, b.mhJournal); err != nil {
return err
}

if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
if err := put(b.dbNext, b.mhJournalNext); err != nil {
return err
}
}
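
A usage sketch (editorial, not part of the commit; putAndFlush is a hypothetical name): blocks hashed with sha2-256 pass the 256-bit gate, and a Flush after PutMany makes both the badger write and the journal record durable:

	func putAndFlush(ctx context.Context, bs *Blockstore) error {
		blk := blocks.NewBlock([]byte("hello world")) // sha2-256: 32-byte digest passes the gate
		if err := bs.PutMany(ctx, []blocks.Block{blk}); err != nil {
			return err
		}
		// Flush also Sync()s the multihash journal(s), per the Flush hunk above.
		return bs.Flush(ctx)
	}
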
@@ -1035,6 +1185,11 @@ func (b *Blockstore) HashOnRead(_ bool) {
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
}

const (
hashLen = hashBits / 8
maxMhType = 1<<((maxMhPrefixLen-1)*7) - 1
)
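
To spell out the arithmetic behind these derived constants: one of the maxMhPrefixLen bytes is reserved for the digest-length varint (32 fits in a single byte), leaving 3 bytes for the code varint at 7 payload bits per byte:

	// hashLen   = 256 / 8                      = 32 bytes
	// maxMhType = 1<<((4-1)*7) - 1 = 1<<21 - 1 = 2,097,151
	//
	// e.g. sha2-256 (code 0x12) has the 2-byte prefix {0x12, 0x20}, well
	// inside the 4-byte budget; journal records zero-pad the difference.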

// PooledStorageKey returns the storage key under which this CID is stored.
//
// The key is: prefix + base32_no_padding(cid.Hash)
17 changes: 17 additions & 0 deletions blockstore/badger/blockstore_linux.go
@@ -0,0 +1,17 @@
package badgerbs

import (
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
)

func init() {
fadvWriter = func(fd uintptr) error {
for _, adv := range []int{unix.FADV_NOREUSE, unix.FADV_DONTNEED} {
if err := unix.Fadvise(int(fd), 0, 0, adv); err != nil {
return xerrors.Errorf("fadvise %d failed: %w", adv, err)
}
}
return nil
}
}
