Skip to content

Commit

Permalink
Rudimentary multihash journal: prep for partial keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ribasushi committed Apr 19, 2024
1 parent d9212fd commit d33ea05
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 16 deletions.
145 changes: 129 additions & 16 deletions blockstore/badger/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ import (
logger "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
"go.uber.org/zap"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/blockstore"
)

// Parameters of the multihash journal that is kept next to the badger store.
const (
	// hashBits is the only digest width accepted for journalling.
	// anything not this size will be rejected, see 🧵 https://filecoinproject.slack.com/archives/CRK2LKYHW/p1711381656211189?thread_ts=1711264671.316169&cid=CRK2LKYHW
	hashBits = 256

	// maxMhPrefixLen is the widest multihash prefix a journal record has room
	// for: 3-byte MhType varint, 1 byte len
	maxMhPrefixLen = 4

	// mhJournalFilename is the name of the journal file; openJournal looks
	// for it directly inside the directory it is given.
	mhJournalFilename = "MultiHashes.bin"
)

var (
// KeyPool is the buffer pool we use to compute storage keys.
KeyPool *pool.BufferPool = pool.GlobalPool
Expand Down Expand Up @@ -103,6 +110,11 @@ const (
moveStateLock
)

// flushWriter is the set of operations the blockstore needs from its
// multihash journal: appending bytes, closing the journal, and an explicit
// Sync. The *os.File returned by openJournal satisfies it.
type flushWriter interface {
	io.Writer
	io.Closer

	// Sync is invoked when the blockstore is flushed.
	Sync() error
}

// Blockstore is a badger-backed IPLD blockstore.
type Blockstore struct {
stateLk sync.RWMutex
Expand All @@ -114,9 +126,13 @@ type Blockstore struct {
moveState bsMoveState
rlock int

db *badger.DB
dbNext *badger.DB // when moving
opts Options
db *badger.DB
mhJournal flushWriter

dbNext *badger.DB // when moving
mhJournalNext flushWriter

opts Options

prefixing bool
prefix []byte
Expand Down Expand Up @@ -151,9 +167,35 @@ func Open(opts Options) (*Blockstore, error) {

bs.moveCond.L = &bs.moveMx

bs.mhJournal, err = openJournal(opts.Dir)
if err != nil {
return nil, err
}

return bs, nil
}

// fadvWriter, when non-nil, is invoked by openJournal with the descriptor of
// the freshly opened journal file. It stays nil unless platform-specific code
// assigns it.
var fadvWriter func(uintptr) error

// openJournal opens the multihash journal located in dir for appending,
// creating the file with mode 0644 if it does not exist yet.
//
// If fadvWriter is set, it is applied to the new descriptor. Should the hook
// fail, the file is closed again before the hook's error is returned, so the
// error path does not leak a descriptor.
func openJournal(dir string) (*os.File, error) {
	fh, err := os.OpenFile(
		dir+"/"+mhJournalFilename,
		os.O_APPEND|os.O_WRONLY|os.O_CREATE,
		0644,
	)
	if err != nil {
		return nil, err
	}

	if fadvWriter != nil {
		if err := fadvWriter(fh.Fd()); err != nil {
			// Nothing has been written through this handle yet, so a close
			// failure cannot lose data; the hook's error is the one to report.
			_ = fh.Close()
			return nil, err
		}
	}

	return fh, nil
}

// Close closes the store. If the store has already been closed, this noops and
// returns an error, even if the first closure resulted in error.
func (b *Blockstore) Close() error {
Expand All @@ -174,7 +216,23 @@ func (b *Blockstore) Close() error {
// wait for all accesses to complete
b.viewers.Wait()

return b.db.Close()
var err error

if errDb := b.db.Close(); errDb != nil {
errDb = xerrors.Errorf("failure closing the badger blockstore: %w", errDb)
log.Warn(errDb)
err = errDb
}

if errMj := b.mhJournal.Close(); errMj != nil {
errMj = xerrors.Errorf("failure closing the multihash journal: %w", errMj)
log.Warn(errMj)
if err == nil {
err = errMj
}
}

return err
}

func (b *Blockstore) access() error {
Expand Down Expand Up @@ -267,6 +325,8 @@ func (b *Blockstore) movingGC(ctx context.Context) error {

dbNext := b.dbNext
b.dbNext = nil
mhJournalNext := b.mhJournalNext
b.mhJournalNext = nil

var state bsMoveState
if dbNext != nil {
Expand All @@ -279,10 +339,12 @@ func (b *Blockstore) movingGC(ctx context.Context) error {

if dbNext != nil {
// the move failed and we have a left-over db; delete it.
err := dbNext.Close()
if err != nil {
if err := dbNext.Close(); err != nil {
log.Warnf("error closing badger db: %s", err)
}
if err := mhJournalNext.Close(); err != nil {
log.Warnf("error closing multihash journal: %s", err)
}
b.deleteDB(newPath)

b.lockMove()
Expand Down Expand Up @@ -321,9 +383,14 @@ func (b *Blockstore) movingGC(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to open badger blockstore in %s: %w", newPath, err)
}
mhjNew, err := openJournal(opts.Dir)
if err != nil {
return err
}

b.lockMove()
b.dbNext = dbNew
b.mhJournalNext = mhjNew
b.unlockMove(moveStateMoving)

log.Info("copying blockstore")
Expand All @@ -335,13 +402,18 @@ func (b *Blockstore) movingGC(ctx context.Context) error {
b.lockMove()
dbOld := b.db
b.db = b.dbNext
mhjOld := b.mhJournal
b.mhJournal = b.mhJournalNext
b.dbNext = nil
b.mhJournalNext = nil
b.unlockMove(moveStateCleanup)

err = dbOld.Close()
if err != nil {
if err := dbOld.Close(); err != nil {
log.Warnf("error closing old badger db: %s", err)
}
if err := mhjOld.Close(); err != nil {
log.Warnf("error closing old multihash journal: %s", err)
}

// this is the canonical db path; this is where our db lives.
dbPath := b.opts.Dir
Expand Down Expand Up @@ -401,6 +473,9 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr er
}
}()

// not doing anything to preserve insertion into new MultiHashJournal
// this is *intentional* - the copy is indiscriminate
// TODO: fix when https://github.com/filecoin-project/lotus/issues/11778 lands
return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error {
// check whether context is closed on every kv group
if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -676,11 +751,17 @@ func (b *Blockstore) Flush(context.Context) error {
if err := b.dbNext.Sync(); err != nil {
return err
}
if err := b.mhJournalNext.Sync(); err != nil {
return err
}
}

if err := b.db.Sync(); err != nil {
return err
}
if err := b.mhJournal.Sync(); err != nil {
return err
}

return nil
}
Expand Down Expand Up @@ -812,15 +893,19 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
}()
}

keys := make([][]byte, 0, len(blocks))
for _, block := range blocks {
keys := make([][]byte, len(blocks))
for i, block := range blocks {
k, pooled := b.PooledStorageKey(block.Cid())
if pooled {
toReturn = append(toReturn, k)
}
keys = append(keys, k)
keys[i] = k
}

jrnlSlab := pool.Get(len(blocks) * (maxMhPrefixLen + hashLen))
defer pool.Put(jrnlSlab)
jrnl := jrnlSlab[:0]

if err := b.db.View(func(txn *badger.Txn) error {
for i, k := range keys {
val, err := badgerGet(txn, k)
Expand All @@ -830,14 +915,34 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
} else if val != nil {
// Already have it
keys[i] = nil
} else {
// Got to insert that, check its well-formedness
c := blocks[i].Cid()
mh := c.Hash()
mhDec, err := multihash.Decode(mh)
if err != nil {
return err
}
if mhDec.Length != hashLen {
return xerrors.Errorf("unsupported hash length of %d bits for cid %s", mhDec.Length*8, c)
}
if mhDec.Code > maxMhType {
return xerrors.Errorf("unsupported codec %d for cid %s: varint encoding wider than %d bytes", mhDec.Code, c, maxMhPrefixLen-1)
}

// add a journal record
if pad := maxMhPrefixLen + hashLen - len(mh); pad > 0 {
jrnl = append(jrnl, make([]byte, pad)...)
}
jrnl = append(jrnl, mh...)
}
}
return nil
}); err != nil {
return err
}

put := func(db *badger.DB) error {
put := func(db *badger.DB, mhj flushWriter) error {
batch := db.NewWriteBatch()
defer batch.Cancel()

Expand All @@ -852,20 +957,23 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
}
}

err := batch.Flush()
if err != nil {
if err := batch.Flush(); err != nil {
return fmt.Errorf("failed to put blocks in badger blockstore: %w", err)
}

if _, err := mhj.Write(jrnl); err != nil {
return fmt.Errorf("failed to write multihashes to journal: %w", err)
}

return nil
}

if err := put(b.db); err != nil {
if err := put(b.db, b.mhJournal); err != nil {
return err
}

if b.dbNext != nil {
if err := put(b.dbNext); err != nil {
if err := put(b.dbNext, b.mhJournalNext); err != nil {
return err
}
}
Expand Down Expand Up @@ -1035,6 +1143,11 @@ func (b *Blockstore) HashOnRead(_ bool) {
log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring")
}

const (
hashLen = hashBits / 8
maxMhType = 1<<((maxMhPrefixLen-1)*7) - 1
)

// PooledStorageKey returns the storage key under which this CID is stored.
//
// The key is: prefix + base32_no_padding(cid.Hash)
Expand Down
17 changes: 17 additions & 0 deletions blockstore/badger/blockstore_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package badgerbs

import (
"golang.org/x/sys/unix"
"golang.org/x/xerrors"
)

func init() {
fadvWriter = func(fd uintptr) error {
for _, adv := range []int{unix.FADV_NOREUSE, unix.FADV_DONTNEED} {
if err := unix.Fadvise(int(fd), 0, 0, adv); err != nil {
return xerrors.Errorf("fadvise %d failed: %w", adv, err)
}
}
return nil
}
}

0 comments on commit d33ea05

Please sign in to comment.