diff --git a/blockstore/badger/blockstore.go b/blockstore/badger/blockstore.go index b634f9ef0c..83cb553926 100644 --- a/blockstore/badger/blockstore.go +++ b/blockstore/badger/blockstore.go @@ -1,6 +1,7 @@ package badgerbs import ( + "bytes" "context" "fmt" "io" @@ -19,6 +20,7 @@ import ( logger "github.com/ipfs/go-log/v2" pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-base32" + "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" "go.uber.org/zap" "golang.org/x/xerrors" @@ -30,11 +32,7 @@ const ( mhJournalFilename = "MultiHashes.bin" maxMhPrefixLen = 4 // 3-byte MhType varint, 1 byte len hashBits = 256 // anything not this size will be rejected, see 🧵 https://filecoinproject.slack.com/archives/CRK2LKYHW/p1711381656211189?thread_ts=1711264671.316169&cid=CRK2LKYHW -) - -var ( - // KeyPool is the buffer pool we use to compute storage keys. - KeyPool *pool.BufferPool = pool.GlobalPool + binkeyBits = 128 ) var ( @@ -63,6 +61,9 @@ type Options struct { // Prefix is an optional prefix to prepend to keys. Default: "". Prefix string + + // Whether to check for legacy base32-encoded keys on Get/Delete + QueryLegacyKeys bool } func DefaultOptions(path string) Options { @@ -468,6 +469,31 @@ func symlink(path, linkTo string) error { return os.Symlink(path, linkTo) } +// this is so bad... 🤮 +var cidMakers = []cid.Prefix{ + cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Blake2b256)), + cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256)), +} + +func findCidForPartialKV(kv *badgerstruct.KV) (c cid.Cid, err error) { + // this is so so SO nasty... 🤮 + for _, maker := range cidMakers { + c, err = maker.Sum(kv.Value) + if err != nil { + return + } + h := c.Hash() + if bytes.Equal(kv.Key, h[len(h)-hashLen:len(h)-hashLen+binkeyLen]) { + break + } + c = cid.Undef + } + if !c.Defined() { + err = xerrors.Errorf("none of the available mutihashers produced a hash starting with 0x%X", kv.Key) + } + return +} + // doCopy copies a badger blockstore to another func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io.Writer) (defErr error) { batch := to.NewWriteBatch() @@ -490,36 +516,53 @@ func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB, jrnlFh io. defer pool.Put(jrnlSlab) jrnl := jrnlSlab[:0] - mhBuf := pool.Get(maxMhPrefixLen + hashLen) - defer pool.Put(mhBuf) - var mhDec *multihash.DecodedMultihash for _, kv := range kvs { - n, err := base32.RawStdEncoding.Decode(mhBuf, kv.Key[b.prefixLen:]) - if err != nil { - return xerrors.Errorf("undecodeable key 0x%X: %s", kv.Key, err) - } - mhDec, err = multihash.Decode(mhBuf[:n]) - if err != nil { - return xerrors.Errorf("unexpected multihash 0x%X for key 0x%X: %s", mhBuf[:n], kv.Key, err) - } - if mhDec.Length != hashLen { - return xerrors.Errorf("unsupported hash length of %d bits for key 0x%X", mhDec.Length*8, kv.Key) - } - if mhDec.Code > maxMhType { - return xerrors.Errorf("unsupported mhtype %d for key 0x%X: varint encoding wider than %d bytes", mhDec.Code, kv.Key, maxMhPrefixLen-1) + k := kv.Key + var mh []byte + + if len(k) != binkeyLen { + // this is a legacy key: remake it regardless of b.opts.QueryLegacyKeys + // do not use a pooled buffer, as it messes with the badger batch + mh = make([]byte, maxMhPrefixLen+hashLen) + n, err := base32.RawStdEncoding.Decode(mh, k[b.prefixLen:]) + if err != nil { + return xerrors.Errorf("undecodeable key 0x%X: %s", k, err) + } + mh = mh[:n] + + mhDec, err = multihash.Decode(mh) + if err != nil { + return xerrors.Errorf("unexpected multihash 0x%X for key 0x%X: %s", mh, k, err) + } + if mhDec.Length != hashLen { + return xerrors.Errorf("unsupported hash length of %d bits for key 0x%X", mhDec.Length*8, k) + } + if mhDec.Code > maxMhType { + return xerrors.Errorf("unsupported mhtype %d for key 0x%X: varint encoding wider than %d bytes", mhDec.Code, k, maxMhPrefixLen-1) + } + + k = mh[n-hashLen : n-hashLen+binkeyLen : n-hashLen+binkeyLen] // we checked the multihash digest is hashLen (256bits) long + } else { + // nasty way to recreate the hash from the payload 🤮 + // worth it however for maintaining journal consistency + c, err := findCidForPartialKV(kv) + if err != nil { + return err + } + mh = c.Hash() } - if err := batch.Set(kv.Key, kv.Value); err != nil { + if err := batch.Set(k, kv.Value); err != nil { return err } // add a journal record - if pad := maxMhPrefixLen + hashLen - n; pad > 0 { + if pad := maxMhPrefixLen + hashLen - len(mh); pad > 0 { jrnl = append(jrnl, make([]byte, pad)...) } - jrnl = append(jrnl, mhBuf[:n]...) + jrnl = append(jrnl, mh...) } if _, err := jrnlFh.Write(jrnl); err != nil { @@ -735,12 +778,23 @@ func (b *Blockstore) Size() (int64, error) { } // badgerGet is a basic tri-state: value+nil nil+nil nil+err -func badgerGet(t *badger.Txn, k []byte) (*valueItem, error) { - switch item, err := t.Get(k); err { +func (b *Blockstore) badgerGet(t *badger.Txn, mk badgerMultiKey) (*valueItem, error) { + switch item, err := t.Get(mk.binKey()); err { case nil: - return &valueItem{item}, nil + return &valueItem{item, mk.binKey()}, nil case badger.ErrKeyNotFound: - return nil, nil + if !b.opts.QueryLegacyKeys { + return nil, nil + } + // try again with legacy ... + switch item, err := t.Get(mk.legacyKey()); err { + case nil: + return &valueItem{item, mk.legacyKey()}, nil + case badger.ErrKeyNotFound: + return nil, nil + default: + return nil, err + } default: return nil, err } @@ -748,6 +802,40 @@ func badgerGet(t *badger.Txn, k []byte) (*valueItem, error) { type valueItem struct { badgerItem *badger.Item + currentKey []byte +} + +func (vi *valueItem) size() (int, error) { + return int(vi.badgerItem.ValueSize()), nil +} +func (vi *valueItem) block(c cid.Cid) (blocks.Block, error) { + payload, err := vi.badgerItem.ValueCopy(nil) + if err != nil { + return nil, err + } + if err := checkHash(c, payload, vi.currentKey); err != nil { + return nil, err + } + return blocks.NewBlockWithCid(payload, c) +} +func (vi *valueItem) view(c cid.Cid, f func(val []byte) error) error { + return vi.badgerItem.Value(func(payload []byte) error { + if err := checkHash(c, payload, vi.currentKey); err != nil { + return err + } + return f(payload) + }) +} + +func checkHash(c cid.Cid, b, k []byte) error { + rehash, err := c.Prefix().Sum(b) + if err != nil { + return err + } + if !rehash.Equals(c) { + return xerrors.Errorf("multihash mismatch for cid %s (badger key 0x%X): value hashes to 0x%X, but expected multihash 0x%X", c, k, []byte(rehash.Hash()), []byte(c.Hash())) + } + return nil } // View implements blockstore.Viewer, which leverages zero-copy read-only @@ -761,19 +849,20 @@ func (b *Blockstore) View(ctx context.Context, c cid.Cid, fn func([]byte) error) b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return err } + defer pool.Put(mk) return b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to view block from badger blockstore: %w", err) } else if val == nil { return ipld.ErrNotFound{Cid: c} } - return val.badgerItem.Value(fn) + return val.view(c, fn) }) } @@ -818,14 +907,15 @@ func (b *Blockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return false, err } + defer pool.Put(mk) var canHaz bool - err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if val != nil { canHaz = true } @@ -852,27 +942,24 @@ func (b *Blockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return nil, err } + defer pool.Put(mk) - var buf []byte - - if err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + var blk blocks.Block + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to get block from badger blockstore: %w", err) } else if val == nil { return ipld.ErrNotFound{Cid: c} } - buf, err = val.badgerItem.ValueCopy(nil) + blk, err = val.block(c) return err - }); err != nil { - return nil, err - } - - return blocks.NewBlockWithCid(buf, c) + }) + return blk, err } // GetSize implements Blockstore.GetSize. @@ -885,14 +972,15 @@ func (b *Blockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { b.lockDB() defer b.unlockDB() - k, pooled := b.PooledStorageKey(c) - if pooled { - defer KeyPool.Put(k) + mk, err := b.pooledMultiKey(c) + if err != nil { + return -1, err } + defer pool.Put(mk) size := -1 - err := b.db.View(func(txn *badger.Txn) error { - val, err := badgerGet(txn, k) + err = b.db.View(func(txn *badger.Txn) error { + val, err := b.badgerGet(txn, mk) if err != nil { return fmt.Errorf("failed to get block size from badger blockstore: %w", err) @@ -900,8 +988,8 @@ func (b *Blockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { return ipld.ErrNotFound{Cid: c} } - size = int(val.badgerItem.ValueSize()) - return nil + size, err = val.size() + return err }) return size, err @@ -922,26 +1010,24 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { b.lockDB() defer b.unlockDB() - // toReturn tracks the byte slices to return to the pool, if we're using key - // prefixing. we can't return each slice to the pool after each Set, because - // badger holds on to the slice. - var toReturn [][]byte - if b.prefixing { - toReturn = make([][]byte, 0, len(blocks)) - defer func() { - for _, b := range toReturn { - KeyPool.Put(b) - } - }() + type kv struct { + mk badgerMultiKey + val []byte } - keys := make([][]byte, len(blocks)) - for i, block := range blocks { - k, pooled := b.PooledStorageKey(block.Cid()) - if pooled { - toReturn = append(toReturn, k) + // kvs/defer() must be declared before (thus happen after) the NewWriteBatch below + kvs := make([]kv, 0, len(blocks)) + defer func() { + for _, kv := range kvs { + pool.Put(kv.mk) } - keys[i] = k + }() + for i := range blocks { + mk, err := b.pooledMultiKey(blocks[i].Cid()) + if err != nil { + return err + } + kvs = append(kvs, kv{mk: mk}) } jrnlSlab := pool.Get(len(blocks) * (maxMhPrefixLen + hashLen)) @@ -949,34 +1035,19 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { jrnl := jrnlSlab[:0] if err := b.db.View(func(txn *badger.Txn) error { - for i, k := range keys { - val, err := badgerGet(txn, k) + for i := range kvs { + val, err := b.badgerGet(txn, kvs[i].mk) if err != nil { // Something is actually wrong return err - } else if val != nil { - // Already have it - keys[i] = nil - } else { - // Got to insert that, check its well-formedness - c := blocks[i].Cid() - mh := c.Hash() - mhDec, err := multihash.Decode(mh) - if err != nil { - return err - } - if mhDec.Length != hashLen { - return xerrors.Errorf("unsupported hash length of %d bits for cid %s", mhDec.Length*8, c) - } - if mhDec.Code > maxMhType { - return xerrors.Errorf("unsupported mhtype %d for cid %s: varint encoding wider than %d bytes", mhDec.Code, c, maxMhPrefixLen-1) - } - - // add a journal record + } else if val == nil { + // Got to insert that, add a journal record, prepare value + mh := blocks[i].Cid().Hash() if pad := maxMhPrefixLen + hashLen - len(mh); pad > 0 { jrnl = append(jrnl, make([]byte, pad)...) } jrnl = append(jrnl, mh...) + kvs[i].val = blocks[i].RawData() } } return nil @@ -988,14 +1059,11 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { batch := db.NewWriteBatch() defer batch.Cancel() - for i, block := range blocks { - k := keys[i] - if k == nil { - // skipped because we already have it. - continue - } - if err := batch.Set(k, block.RawData()); err != nil { - return err + for i := range kvs { + if kvs[i].val != nil { + if err := batch.Set(kvs[i].mk.binKey(), kvs[i].val); err != nil { + return err + } } } @@ -1037,88 +1105,90 @@ func (b *Blockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { b.lockDB() defer b.unlockDB() - // toReturn tracks the byte slices to return to the pool, if we're using key - // prefixing. we can't return each slice to the pool after each Set, because - // badger holds on to the slice. - var toReturn [][]byte - if b.prefixing { - toReturn = make([][]byte, 0, len(cids)) - defer func() { - for _, b := range toReturn { - KeyPool.Put(b) + // keys/defer() must be declared before (thus happen after) the NewWriteBatch below + multikeys := make([]badgerMultiKey, len(cids)) + defer func() { + for _, mk := range multikeys { + if mk != nil { + pool.Put(mk) } - }() + } + }() + var err error + for i := range cids { + multikeys[i], err = b.pooledMultiKey(cids[i]) + if err != nil { + return err + } } batch := b.db.NewWriteBatch() defer batch.Cancel() - for _, cid := range cids { - k, pooled := b.PooledStorageKey(cid) - if pooled { - toReturn = append(toReturn, k) - } - if err := batch.Delete(k); err != nil { - return err + // only delete keys we are known to have + // blindly calling batch.Delete() clutters the logs with delete markers + if err := b.db.View(func(txn *badger.Txn) error { + for _, mk := range multikeys { + + variants := append( + make([][]byte, 0, 2), + mk.binKey(), + ) + if b.opts.QueryLegacyKeys { + variants = append(variants, mk.legacyKey()) + } + + for _, k := range variants { + _, err := txn.Get(k) + if err == badger.ErrKeyNotFound { + continue + } else if err != nil { + return err + } + // key does exist + if err := batch.Delete(k); err != nil { + return err + } + } } - } - err := batch.Flush() - if err != nil { - err = fmt.Errorf("failed to delete blocks from badger blockstore: %w", err) + return batch.Flush() + }); err != nil { + return fmt.Errorf("failed to delete blocks from badger blockstore: %w", err) } - return err + + return nil } // AllKeysChan implements Blockstore.AllKeysChan. +var EnableHorriblyInefficientAllKeysChanMethod bool + func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - if err := b.access(); err != nil { - return nil, err + + if !EnableHorriblyInefficientAllKeysChanMethod { + return nil, xerrors.New("method AllKeysChan() is not supported, use ForEachKey() instead") } - b.lockDB() - defer b.unlockDB() + // reimplement to use strictly as test of the below + // this is useless in production anyway 🫠 + cids := make([]cid.Cid, 0, 1<<20) - txn := b.db.NewTransaction(false) - opts := badger.IteratorOptions{PrefetchSize: 100} - if b.prefixing { - opts.Prefix = b.prefix + if err := b.ForEachKey(func(c cid.Cid) error { + cids = append(cids, c) + return nil + }); err != nil { + return nil, err } - iter := txn.NewIterator(opts) ch := make(chan cid.Cid) go func() { - defer b.viewers.Done() defer close(ch) - defer iter.Close() - - // NewCidV1 makes a copy of the multihash buffer, so we can reuse it to - // contain allocs. - var buf []byte - for iter.Rewind(); iter.Valid(); iter.Next() { - if ctx.Err() != nil { - return // context has fired. - } - if !b.isOpen() { - // open iterators will run even after the database is closed... - return // closing, yield. - } - k := iter.Item().Key() - if b.prefixing { - k = k[b.prefixLen:] - } - - if reqlen := base32.RawStdEncoding.DecodedLen(len(k)); len(buf) < reqlen { - buf = make([]byte, reqlen) - } - if n, err := base32.RawStdEncoding.Decode(buf, k); err == nil { - select { - case ch <- cid.NewCidV1(cid.Raw, buf[:n]): - case <-ctx.Done(): - return - } - } else { - log.Warnf("failed to decode key %s in badger AllKeysChan; err: %s", k, err) + for _, c := range cids { + select { + case <-ctx.Done(): + return + case ch <- c: + // keep looping } } }() @@ -1128,6 +1198,10 @@ func (b *Blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // Implementation of BlockstoreIterator interface func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { + // unfortunately this is used in splitstore, so have to reimplement horribly 🫠 + mhBuf := pool.Get(maxMhPrefixLen + hashLen) + defer pool.Put(mhBuf) + if err := b.access(); err != nil { return err } @@ -1136,107 +1210,97 @@ func (b *Blockstore) ForEachKey(f func(cid.Cid) error) error { b.lockDB() defer b.unlockDB() - txn := b.db.NewTransaction(false) - defer txn.Discard() - - opts := badger.IteratorOptions{PrefetchSize: 100} - if b.prefixing { - opts.Prefix = b.prefix - } - - iter := txn.NewIterator(opts) - defer iter.Close() + var err error + var c cid.Cid - var buf []byte - for iter.Rewind(); iter.Valid(); iter.Next() { + return iterateBadger(context.Background(), b.db, func(kvs []*badgerstruct.KV) error { if !b.isOpen() { return ErrBlockstoreClosed } - k := iter.Item().Key() - if b.prefixing { - k = k[b.prefixLen:] - } + for _, kv := range kvs { + if len(kv.Key) != binkeyLen { + if !b.opts.QueryLegacyKeys { + continue + } + // this is a legacy key: just use it + n, err := base32.RawStdEncoding.Decode(mhBuf, kv.Key[b.prefixLen:]) + if err != nil { + return err + } + c = cid.NewCidV1(uint64(multicodec.Raw), mhBuf[:n]) + } else { + c, err = findCidForPartialKV(kv) + if err != nil { + return err + } + } - klen := base32.RawStdEncoding.DecodedLen(len(k)) - if klen > len(buf) { - buf = make([]byte, klen) + if err := f(c); err != nil { + return err + } } - n, err := base32.RawStdEncoding.Decode(buf, k) - if err != nil { - return err - } + return nil + }) +} - c := cid.NewCidV1(cid.Raw, buf[:n]) +// HashOnRead implements Blockstore.HashOnRead. It is not supported by this +// blockstore. +func (b *Blockstore) HashOnRead(t bool) { + if !t { + log.Warnf("attempt to disable HashOnRead on badger blockstore: this is always unconditionally enabled, ignoring") + } +} - err = f(c) - if err != nil { - return err - } +type badgerMultiKey []byte + +func (bk badgerMultiKey) binKey() []byte { return bk[:binkeyLen] } +func (bk badgerMultiKey) legacyKey() []byte { return bk[binkeyLen:] } + +// returns an amalgam binary/legacy key to be used for the necessary lookups +func (b *Blockstore) pooledMultiKey(c cid.Cid) (badgerMultiKey, error) { + mh := c.Hash() + mhDec, err := multihash.Decode(mh) + if err != nil { + return nil, err + } + if mhDec.Length != hashLen { + return nil, xerrors.Errorf("unsupported hash length of %d bits for cid %s", mhDec.Length*8, c) + } + if mhDec.Code > maxMhType { + return nil, xerrors.Errorf("unsupported mhtype %d for cid %s: varint encoding wider than %d bytes", mhDec.Code, c, maxMhPrefixLen-1) } - return nil -} + if !b.opts.QueryLegacyKeys { + mk := pool.Get(binkeyLen) + copy(mk, mhDec.Digest[:binkeyLen]) + return mk, nil + } -// HashOnRead implements Blockstore.HashOnRead. It is not supported by this -// blockstore. -func (b *Blockstore) HashOnRead(_ bool) { - log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring") + legacyLen := base32.RawStdEncoding.EncodedLen(len(mh)) + + if !b.prefixing { + mk := pool.Get(binkeyLen + legacyLen) + copy(mk, mhDec.Digest[:binkeyLen]) + base32.RawStdEncoding.Encode(mk[binkeyLen:], mh) + return mk, nil + } + + mk := pool.Get(binkeyLen + b.prefixLen + legacyLen) + copy(mk, mhDec.Digest[:binkeyLen]) + copy(mk[binkeyLen:], b.prefix) + base32.RawStdEncoding.Encode(mk[binkeyLen+b.prefixLen:], mh) + + return mk, nil } const ( + binkeyLen = binkeyBits / 8 hashLen = hashBits / 8 maxMhType = 1<<((maxMhPrefixLen-1)*7) - 1 ) -// PooledStorageKey returns the storage key under which this CID is stored. -// -// The key is: prefix + base32_no_padding(cid.Hash) -// -// This method may return pooled byte slice, which MUST be returned to the -// KeyPool if pooled=true, or a leak will occur. -func (b *Blockstore) PooledStorageKey(c cid.Cid) (key []byte, pooled bool) { - h := c.Hash() - size := base32.RawStdEncoding.EncodedLen(len(h)) - if !b.prefixing { // optimize for branch prediction. - k := pool.Get(size) - base32.RawStdEncoding.Encode(k, h) - return k, true // slicing upto length unnecessary; the pool has already done this. - } - - size += b.prefixLen - k := pool.Get(size) - copy(k, b.prefix) - base32.RawStdEncoding.Encode(k[b.prefixLen:], h) - return k, true // slicing upto length unnecessary; the pool has already done this. -} - -// Storage acts like PooledStorageKey, but attempts to write the storage key -// into the provided slice. If the slice capacity is insufficient, it allocates -// a new byte slice with enough capacity to accommodate the result. This method -// returns the resulting slice. -func (b *Blockstore) StorageKey(dst []byte, c cid.Cid) []byte { - h := c.Hash() - reqsize := base32.RawStdEncoding.EncodedLen(len(h)) + b.prefixLen - if reqsize > cap(dst) { - // passed slice is smaller than required size; create new. - dst = make([]byte, reqsize) - } else if reqsize > len(dst) { - // passed slice has enough capacity, but its length is - // restricted, expand. - dst = dst[:cap(dst)] - } - - if b.prefixing { // optimize for branch prediction. - copy(dst, b.prefix) - base32.RawStdEncoding.Encode(dst[b.prefixLen:], h) - } else { - base32.RawStdEncoding.Encode(dst, h) - } - return dst[:reqsize] -} - // this method is added for lotus-shed needs // WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE func (b *Blockstore) DB() *badger.DB { diff --git a/blockstore/badger/blockstore_test.go b/blockstore/badger/blockstore_test.go index d253f37d95..d96591f839 100644 --- a/blockstore/badger/blockstore_test.go +++ b/blockstore/badger/blockstore_test.go @@ -12,7 +12,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "github.com/filecoin-project/lotus/blockstore" @@ -38,40 +37,6 @@ func TestBadgerBlockstore(t *testing.T) { }).RunTests(t, "prefixed") } -func TestStorageKey(t *testing.T) { - //stm: @SPLITSTORE_BADGER_OPEN_001, @SPLITSTORE_BADGER_CLOSE_001 - //stm: @SPLITSTORE_BADGER_STORAGE_KEY_001 - bs, _ := newBlockstore(DefaultOptions)(t) - bbs := bs.(*Blockstore) - defer bbs.Close() //nolint:errcheck - - cid1 := blocks.NewBlock([]byte("some data")).Cid() - cid2 := blocks.NewBlock([]byte("more data")).Cid() - cid3 := blocks.NewBlock([]byte("a little more data")).Cid() - require.NotEqual(t, cid1, cid2) // sanity check - require.NotEqual(t, cid2, cid3) // sanity check - - // nil slice; let StorageKey allocate for us. - k1 := bbs.StorageKey(nil, cid1) - require.Len(t, k1, 55) - require.True(t, cap(k1) == len(k1)) - - // k1's backing array is reused. - k2 := bbs.StorageKey(k1, cid2) - require.Len(t, k2, 55) - require.True(t, cap(k2) == len(k1)) - - // bring k2 to len=0, and verify that its backing array gets reused - // (i.e. k1 and k2 are overwritten) - k3 := bbs.StorageKey(k2[:0], cid3) - require.Len(t, k3, 55) - require.True(t, cap(k3) == len(k3)) - - // backing array of k1 and k2 has been modified, i.e. memory is shared. - require.Equal(t, k3, k1) - require.Equal(t, k3, k2) -} - func newBlockstore(optsSupplier func(path string) Options) func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { return func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) { tb.Helper() diff --git a/blockstore/badger/blockstore_test_suite.go b/blockstore/badger/blockstore_test_suite.go index 480f5d793f..a50e550ba6 100644 --- a/blockstore/badger/blockstore_test_suite.go +++ b/blockstore/badger/blockstore_test_suite.go @@ -18,6 +18,10 @@ import ( "github.com/filecoin-project/lotus/blockstore" ) +func init() { + EnableHorriblyInefficientAllKeysChanMethod = true +} + // TODO: move this to go-ipfs-blockstore. type Suite struct { NewBlockstore func(tb testing.TB) (bs blockstore.BasicBlockstore, path string) @@ -214,7 +218,7 @@ func (s *Suite) TestAllKeysRespectsContext(t *testing.T) { cancel() // pull one value out to avoid race - _, _ = <-ch + <-ch v, ok = <-ch require.Equal(t, cid.Undef, v) diff --git a/node/modules/chain.go b/node/modules/chain.go index 70cb9b5854..520a6b8ccf 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" + "github.com/filecoin-project/lotus/system" ) // ChainBitswap uses a blockstore that bypasses all caches. @@ -87,7 +88,13 @@ func ChainStore(lc fx.Lifecycle, chain := store.NewChainStore(cbs, sbs, ds, weight, j) if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil { - return nil, xerrors.Errorf("loading chain state from disk: %w", err) + if !system.BadgerQueryLegacyKeys { + err = xerrors.Errorf("loading chain state from disk ( !!! PERHAPS YOU NEED TO SET `LOTUS_CHAIN_BADGERSTORE_QUERY_LEGACY_KEYS` TO TRUE !!! ): %w", err) + + } else { + err = xerrors.Errorf("loading chain state from disk: %w", err) + } + return nil, err } var startHook func(context.Context) error diff --git a/node/repo/blockstore_opts.go b/node/repo/blockstore_opts.go index f0f8d70784..f0de388e68 100644 --- a/node/repo/blockstore_opts.go +++ b/node/repo/blockstore_opts.go @@ -5,6 +5,7 @@ import ( "strconv" badgerbs "github.com/filecoin-project/lotus/blockstore/badger" + "github.com/filecoin-project/lotus/system" ) // BadgerBlockstoreOptions returns the badger options to apply for the provided @@ -61,6 +62,13 @@ func BadgerBlockstoreOptions(domain BlockstoreDomain, path string, readonly bool } } + if system.BadgerFsyncDisable { + opts.SyncWrites = false + } + if system.BadgerQueryLegacyKeys { + opts.QueryLegacyKeys = true + } + return opts, nil } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index d8e41fb2bd..cc72cbf8b3 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -26,7 +26,6 @@ import ( "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/storage/sealer/fsutil" "github.com/filecoin-project/lotus/storage/sealer/storiface" - "github.com/filecoin-project/lotus/system" ) const ( @@ -503,10 +502,6 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain return } - if system.BadgerFsyncDisable { - opts.SyncWrites = false - } - bs, err := badgerbs.Open(opts) if err != nil { fsr.bsErr = err diff --git a/system/io.go b/system/io.go index 80e2e16efd..82ba51df66 100644 --- a/system/io.go +++ b/system/io.go @@ -5,7 +5,10 @@ import ( "strings" ) -var BadgerFsyncDisable bool +var ( + BadgerFsyncDisable bool + BadgerQueryLegacyKeys bool +) func init() { // Do not fsync badgers, it does not add value at this stage @@ -25,4 +28,11 @@ func init() { BadgerFsyncDisable = true } } + + if legacy, isSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_QUERY_LEGACY_KEYS"); isSet { + legacy = strings.ToLower(legacy) + if legacy != "" && legacy != "0" && legacy != "false" && legacy != "no" { + BadgerQueryLegacyKeys = true + } + } }