Skip to content

Commit

Permalink
Merge pull request #3494 from filecoin-project/feat/faster-blocksync-…
Browse files Browse the repository at this point in the history
…serving

load fewer messages from disk when serving blocksync requests
  • Loading branch information
magik6k authored Sep 2, 2020
2 parents 7536744 + ca8f586 commit d552803
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
39 changes: 24 additions & 15 deletions chain/blocksync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,43 +221,52 @@ func collectChainSegment(
func gatherMessages(cs *store.ChainStore, ts *types.TipSet) ([]*types.Message, [][]uint64, []*types.SignedMessage, [][]uint64, error) {
blsmsgmap := make(map[cid.Cid]uint64)
secpkmsgmap := make(map[cid.Cid]uint64)
var secpkmsgs []*types.SignedMessage
var blsmsgs []*types.Message
var secpkincl, blsincl [][]uint64

var blscids, secpkcids []cid.Cid
for _, block := range ts.Blocks() {
bmsgs, smsgs, err := cs.MessagesForBlock(block)
bc, sc, err := cs.ReadMsgMetaCids(block.Messages)
if err != nil {
return nil, nil, nil, nil, err
}

// FIXME: DRY. Use `chain.Message` interface.
bmi := make([]uint64, 0, len(bmsgs))
for _, m := range bmsgs {
i, ok := blsmsgmap[m.Cid()]
bmi := make([]uint64, 0, len(bc))
for _, m := range bc {
i, ok := blsmsgmap[m]
if !ok {
i = uint64(len(blsmsgs))
blsmsgs = append(blsmsgs, m)
blsmsgmap[m.Cid()] = i
i = uint64(len(blscids))
blscids = append(blscids, m)
blsmsgmap[m] = i
}

bmi = append(bmi, i)
}
blsincl = append(blsincl, bmi)

smi := make([]uint64, 0, len(smsgs))
for _, m := range smsgs {
i, ok := secpkmsgmap[m.Cid()]
smi := make([]uint64, 0, len(sc))
for _, m := range sc {
i, ok := secpkmsgmap[m]
if !ok {
i = uint64(len(secpkmsgs))
secpkmsgs = append(secpkmsgs, m)
secpkmsgmap[m.Cid()] = i
i = uint64(len(secpkcids))
secpkcids = append(secpkcids, m)
secpkmsgmap[m] = i
}

smi = append(smi, i)
}
secpkincl = append(secpkincl, smi)
}

blsmsgs, err := cs.LoadMessagesFromCids(blscids)
if err != nil {
return nil, nil, nil, nil, err
}

secpkmsgs, err := cs.LoadSignedMessagesFromCids(secpkcids)
if err != nil {
return nil, nil, nil, nil, err
}

return blsmsgs, blsincl, secpkmsgs, secpkincl, nil
}
15 changes: 12 additions & 3 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var chainHeadKey = dstore.NewKey("head")
var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")

var DefaultTipSetCacheSize = 8192
var DefaultMsgMetaCacheSize = 2048

func init() {
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
Expand All @@ -58,6 +59,14 @@ func init() {
}
DefaultTipSetCacheSize = tscs
}

if s := os.Getenv("LOTUS_CHAIN_MSGMETA_CACHE"); s != "" {
mmcs, err := strconv.Atoi(s)
if err != nil {
log.Errorf("failed to parse 'LOTUS_CHAIN_MSGMETA_CACHE' env var: %s", err)
}
DefaultMsgMetaCacheSize = mmcs
}
}

// ReorgNotifee represents a callback that gets called upon reorgs.
Expand Down Expand Up @@ -97,7 +106,7 @@ type ChainStore struct {
}

func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
c, _ := lru.NewARC(2048)
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
cs := &ChainStore{
bs: bs,
Expand Down Expand Up @@ -834,7 +843,7 @@ type mmCids struct {
secpk []cid.Cid
}

func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
o, ok := cs.mmCache.Get(mmc)
if ok {
mmcids := o.(*mmCids)
Expand Down Expand Up @@ -890,7 +899,7 @@ func (cs *ChainStore) GetPath(ctx context.Context, from types.TipSetKey, to type
}

func (cs *ChainStore) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
blscids, secpkcids, err := cs.readMsgMetaCids(b.Messages)
blscids, secpkcids, err := cs.ReadMsgMetaCids(b.Messages)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit d552803

Please sign in to comment.