From a5d764ece5ff7452882fde8cfe8445241e54acfc Mon Sep 17 00:00:00 2001
From: Travis Person
Date: Fri, 6 Jan 2023 23:02:36 +0000
Subject: [PATCH] feat: add toolshed commands to inspect statetree size

---
 cli/state.go                  |  10 +-
 cmd/lotus-shed/main.go        |   2 +
 cmd/lotus-shed/state-stats.go | 562 ++++++++++++++++++++++++++++++++++
 3 files changed, 572 insertions(+), 2 deletions(-)
 create mode 100644 cmd/lotus-shed/state-stats.go

diff --git a/cli/state.go b/cli/state.go
index 434fb1a1cd7..7ab322516c0 100644
--- a/cli/state.go
+++ b/cli/state.go
@@ -252,10 +252,16 @@ func ParseTipSetString(ts string) ([]cid.Cid, error) {
 	return cids, nil
 }
 
+type TipSetResolver interface {
+	ChainHead(context.Context) (*types.TipSet, error)
+	ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
+	ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
+}
+
 // LoadTipSet gets the tipset from the context, or the head from the API.
 //
 // It always gets the head from the API so commands use a consistent tipset even if time pases.
-func LoadTipSet(ctx context.Context, cctx *cli.Context, api v0api.FullNode) (*types.TipSet, error) {
+func LoadTipSet(ctx context.Context, cctx *cli.Context, api TipSetResolver) (*types.TipSet, error) {
 	tss := cctx.String("tipset")
 	if tss == "" {
 		return api.ChainHead(ctx)
@@ -264,7 +270,7 @@ func LoadTipSet(ctx context.Context, cctx *cli.Context, api v0api.FullNode) (*ty
 	return ParseTipSetRef(ctx, api, tss)
 }
 
-func ParseTipSetRef(ctx context.Context, api v0api.FullNode, tss string) (*types.TipSet, error) {
+func ParseTipSetRef(ctx context.Context, api TipSetResolver, tss string) (*types.TipSet, error) {
 	if tss[0] == '@' {
 		if tss == "@head" {
 			return api.ChainHead(ctx)
diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go
index 6f84739fa5c..a8ced92f834 100644
--- a/cmd/lotus-shed/main.go
+++ b/cmd/lotus-shed/main.go
@@ -20,6 +20,8 @@ func main() {
 
 	local := []*cli.Command{
 		addressCmd,
+		statActorCmd,
+		statObjCmd,
 		base64Cmd,
 		base32Cmd,
 		base16Cmd,
diff --git a/cmd/lotus-shed/state-stats.go b/cmd/lotus-shed/state-stats.go
new file mode 100644
index 00000000000..b33e4109588
--- /dev/null
+++ b/cmd/lotus-shed/state-stats.go
@@ -0,0 +1,562 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"reflect"
+	"sync"
+
+	"github.com/docker/go-units"
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-state-types/abi"
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-merkledag"
+	"github.com/urfave/cli/v2"
+
+	"github.com/ipfs/go-blockservice"
+	offline "github.com/ipfs/go-ipfs-exchange-offline"
+	format "github.com/ipfs/go-ipld-format"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/chain/actors/builtin"
+	"github.com/filecoin-project/lotus/chain/consensus/filcns"
+	"github.com/filecoin-project/lotus/chain/stmgr"
+	"github.com/filecoin-project/lotus/chain/store"
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/chain/vm"
+	lcli "github.com/filecoin-project/lotus/cli"
+	"github.com/filecoin-project/lotus/node/repo"
+	"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
+)
+
+// lockedVisitor serializes access to a cid.Set behind a mutex.
+type lockedVisitor struct {
+	setLk sync.Mutex
+	set   *cid.Set
+}
+
+// Visit takes the lock and forwards c to the wrapped cid.Set's Visit, returning its result.
+func (l *lockedVisitor) Visit(c cid.Cid) bool {
+	l.setLk.Lock()
+	defer l.setLk.Unlock()
+
+	return l.set.Visit(c)
+}
+
+// actorStats is the per-actor report: the actor, its CID-typed state fields, and the stats of the DAG under its head.
+type actorStats struct {
+	Address address.Address
+	Actor   *types.Actor
+	Fields  []fieldItem
+	Stats   api.ObjStat
+}
+
+// fieldItem describes one CID-typed field of an actor's state and the stats of the DAG it links to.
+type fieldItem struct {
+	Name  string
+	Cid   cid.Cid
+	Stats api.ObjStat
+}
+
+// cacheNodeGetter is a format.NodeGetter that keeps nodes fetched through Get in a 2Q cache.
+type cacheNodeGetter struct {
+	ds    format.NodeGetter
+	cache *lru.TwoQueueCache
+}
+
+// newCacheNodeGetter wraps d with a 2Q cache created with the given size.
+func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error) {
+	cng := &cacheNodeGetter{ds: d}
+
+	cache, err := lru.New2Q(size)
+	if err != nil {
+		return nil, err
+	}
+
+	cng.cache = cache
+
+	return cng, nil
+}
+
+// Get returns the node for c from the cache, falling back to the wrapped getter and caching what it returns.
+func (cng *cacheNodeGetter) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
+	if a, ok := cng.cache.Get(c); ok {
+		return a.(format.Node), nil
+	}
+
+	n, err := cng.ds.Get(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+
+	cng.cache.Add(c, n)
+
+	return n, nil
+}
+
+// GetMany delegates straight to the wrapped getter; it neither reads from nor fills the cache.
+func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan *format.NodeOption {
+	return cng.ds.GetMany(ctx, list)
+}
+
+// dagStatCollector accumulates api.ObjStat totals for every node passed through walkLinks; statsLk guards stats.
+type dagStatCollector struct {
+	ds   format.NodeGetter
+	walk func(format.Node) ([]*format.Link, error)
+
+	statsLk sync.Mutex
+	stats   api.ObjStat
+}
+
+// record adds nd's size to the running total and increments the Links counter by one, holding statsLk.
+func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error {
+	size, err := nd.Size()
+	if err != nil {
+		return err
+	}
+
+	dsc.statsLk.Lock()
+	defer dsc.statsLk.Unlock()
+
+	dsc.stats.Size = dsc.stats.Size + size
+	dsc.stats.Links = dsc.stats.Links + 1
+
+	return nil
+}
+
+// walkLinks fetches the node for c, records it, and returns the links produced by the configured walk function.
+func (dsc *dagStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
+	nd, err := dsc.ds.Get(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := dsc.record(ctx, nd); err != nil {
+		return nil, err
+	}
+
+	return dsc.walk(nd)
+}
+
+// ChainStoreTipSetResolver provides the lcli.TipSetResolver methods directly on top of a local ChainStore.
+type ChainStoreTipSetResolver struct {
+	Chain *store.ChainStore
+}
+
+// ChainHead returns the chain store's heaviest tipset.
+func (tsr *ChainStoreTipSetResolver) ChainHead(ctx context.Context) (*types.TipSet, error) {
+	return tsr.Chain.GetHeaviestTipSet(), nil
+}
+
+// ChainGetTipSetByHeight loads the tipset for tsk and uses it as the starting point to look up the tipset at height h.
+func (tsr *ChainStoreTipSetResolver) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
+	ts, err := tsr.Chain.GetTipSetFromKey(ctx, tsk)
+	if err != nil {
+		return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
+	}
+	return tsr.Chain.GetTipsetByHeight(ctx, h, ts, true)
+}
+
+// ChainGetTipSet loads the tipset identified by tsk from the chain store.
+func (tsr *ChainStoreTipSetResolver) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
+	return tsr.Chain.LoadTipSet(ctx, tsk)
+}
+
+// statObjCmd walks the DAG rooted at the CID given as the first argument and prints its accumulated stats as JSON.
+var statObjCmd = &cli.Command{
+	Name:  "stat-obj",
+	Usage: "calculates the size of any DAG in the blockstore",
+	Flags: []cli.Flag{},
+	Action: func(cctx *cli.Context) error {
+		ctx := lcli.ReqContext(cctx)
+
+		c, err := cid.Parse(cctx.Args().First())
+		if err != nil {
+			return err
+		}
+
+		r, err := repo.NewFS(cctx.String("repo"))
+		if err != nil {
+			return xerrors.Errorf("opening fs repo: %w", err)
+		}
+
+		exists, err := r.Exists()
+		if err != nil {
+			return err
+		}
+		if !exists {
+			return xerrors.Errorf("lotus repo doesn't exist")
+		}
+
+		lr, err := r.Lock(repo.FullNode)
+		if err != nil {
+			return err
+		}
+		defer lr.Close() //nolint:errcheck
+
+		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+		if err != nil {
+			return fmt.Errorf("failed to open blockstore: %w", err)
+		}
+
+		defer func() {
+			if c, ok := bs.(io.Closer); ok {
+				if err := c.Close(); err != nil {
+					log.Warnf("failed to close blockstore: %s", err)
+				}
+			}
+		}()
+
+		dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
+		dsc := &dagStatCollector{
+			ds:   dag,
+			walk: carWalkFunc,
+		}
+
+		if err := merkledag.Walk(ctx, dsc.walkLinks, c, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
+			return err
+		}
+
+		return DumpJSON(dsc.stats)
+	},
+}
+
+// statActorCmd reports DAG stats for the actor addresses given as arguments, or for every actor in the tipset's state when --all is set.
+var statActorCmd = &cli.Command{
+	Name:  "stat-actor",
+	Usage: "calculates the size of actors and their immediate structures",
+	Description: `Any DAG linked by the actor object (field) will have its size calculated independently of all
+other linked DAG. If an actor has two fields containing links to the same DAG the structure size will be counted
+twice, included in each fields size individually.
+
+The top level Stats reported for an actor is computed independently of all fields and is a more accurate
+accounting of the true size of the actor in the state datastore.`,
+	Flags: []cli.Flag{
+		&cli.StringFlag{
+			Name:  "tipset",
+			Usage: "specify tipset to call method on (pass comma separated array of cids)",
+		},
+		&cli.IntFlag{
+			Name:  "workers",
+			Usage: "number of workers to use when processing",
+			Value: 10,
+		},
+		&cli.BoolFlag{
+			Name:  "all",
+			Usage: "process all actors in stateroot of tipset",
+			Value: false,
+		},
+		&cli.BoolFlag{
+			Name:  "pretty",
+			Usage: "print formatted output instead of ldjson",
+			Value: false,
+		},
+	},
+	Action: func(cctx *cli.Context) error {
+		ctx := lcli.ReqContext(cctx)
+
+		var addrs []address.Address
+
+		if !cctx.Bool("all") {
+			for _, a := range cctx.Args().Slice() {
+				addr, err := address.NewFromString(a)
+				if err != nil {
+					return err
+				}
+
+				addrs = append(addrs, addr)
+			}
+		}
+
+		r, err := repo.NewFS(cctx.String("repo"))
+		if err != nil {
+			return xerrors.Errorf("opening fs repo: %w", err)
+		}
+
+		exists, err := r.Exists()
+		if err != nil {
+			return err
+		}
+		if !exists {
+			return xerrors.Errorf("lotus repo doesn't exist")
+		}
+
+		lr, err := r.Lock(repo.FullNode)
+		if err != nil {
+			return err
+		}
+		defer lr.Close() //nolint:errcheck
+
+		bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
+		if err != nil {
+			return fmt.Errorf("failed to open blockstore: %w", err)
+		}
+
+		defer func() {
+			if c, ok := bs.(io.Closer); ok {
+				if err := c.Close(); err != nil {
+					log.Warnf("failed to close blockstore: %s", err)
+				}
+			}
+		}()
+
+		mds, err := lr.Datastore(context.Background(), "/metadata")
+		if err != nil {
+			return err
+		}
+
+		cs := store.NewChainStore(bs, bs, mds, nil, nil)
+		if err := cs.Load(ctx); err != nil {
+			return xerrors.Errorf("loading chain store: %w", err)
+		}
+
+		tsExec := filcns.NewTipSetExecutor()
+		sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
+		if err != nil {
+			return err
+		}
+
+		tsr := &ChainStoreTipSetResolver{
+			Chain: cs,
+		}
+
+		ts, err := lcli.LoadTipSet(ctx, cctx, tsr)
+		if err != nil {
+			return err
+		}
+
+		log.Infow("tipset", "parentstate", ts.ParentState())
+
+		if len(addrs) == 0 && cctx.Bool("all") {
+			var err error
+			addrs, err = sm.ListAllActors(ctx, ts)
+			if err != nil {
+				return err
+			}
+		}
+
+		numWorkers := cctx.Int("workers")
+		if numWorkers < 1 {
+			return xerrors.Errorf("workers must be at least 1, got %d", numWorkers)
+		}
+
+		var wg sync.WaitGroup
+
+		errChan := make(chan error)
+		jobs := make(chan address.Address, numWorkers)
+		results := make(chan actorStats, numWorkers)
+
+		workersCtx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		worker := func(ctx context.Context, id int, wg *sync.WaitGroup, jobs <-chan address.Address, results chan<- actorStats) error {
+			completed := 0
+
+			defer wg.Done()
+			defer func() {
+				log.Infow("worker done", "id", id, "completed", completed)
+			}()
+
+			for {
+				select {
+				case addr, ok := <-jobs:
+					if !ok {
+						return nil
+					}
+
+					actor, err := sm.LoadActor(workersCtx, addr, ts)
+					if err != nil {
+						return err
+					}
+
+					dag, err := newCacheNodeGetter(merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))), 8096)
+					if err != nil {
+						return err
+					}
+
+					actStats, err := collectStats(workersCtx, addr, actor, dag)
+					if err != nil {
+						return err
+					}
+
+					select {
+					case results <- actStats:
+					case <-ctx.Done():
+						return ctx.Err()
+					}
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+
+				completed = completed + 1
+			}
+		}
+
+		for w := 0; w < numWorkers; w++ {
+			wg.Add(1)
+			go func(id int) {
+				if err := worker(workersCtx, id, &wg, jobs, results); err != nil {
+					select {
+					case errChan <- err:
+					case <-workersCtx.Done():
+					}
+
+					return
+				}
+			}(w)
+		}
+
+		go func() {
+			wg.Wait()
+			close(results)
+		}()
+
+		go func() {
+			// Close jobs on every exit path; stop feeding once the workers are cancelled so this goroutine cannot block forever.
+			defer close(jobs)
+
+			for _, addr := range addrs {
+				select {
+				case jobs <- addr:
+				case <-workersCtx.Done():
+					return
+				}
+			}
+		}()
+
+		for {
+			select {
+			case result, ok := <-results:
+				if !ok {
+					return nil
+				}
+
+				if cctx.Bool("pretty") {
+					DumpStats(result)
+				} else {
+					if err := DumpJSON(result); err != nil {
+						return err
+					}
+				}
+			case err := <-errChan:
+				return err
+			case <-ctx.Done():
+				return ctx.Err()
+
+			}
+		}
+	},
+}
+
+// collectStats computes stats for the DAG under actor.Head and, separately, for each cid.Cid-typed field of the dumped actor state.
+func collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
+	log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))
+
+	nd, err := dag.Get(ctx, actor.Head)
+	if err != nil {
+		return actorStats{}, err
+	}
+
+	// When it comes to fvm / evm actors this method of inspecting fields will probably not work
+	// and we may only be able to collect stats for the top level object. We might be able to iterate
+	// over the top level fields for the actors and identify field that are CIDs, but unsure if we would
+	// be able to identify a field name.
+
+	oif, err := vm.DumpActorState(filcns.NewTipSetExecutor().NewActorRegistry(), actor, nd.RawData())
+	if err != nil {
+		return actorStats{}, err
+	}
+
+	fields := []fieldItem{}
+
+	// Account actors return nil from DumpActorState as they have no state
+	if oif != nil {
+		v := reflect.Indirect(reflect.ValueOf(oif))
+		for i := 0; i < v.NumField(); i++ {
+			varName := v.Type().Field(i).Name
+			varType := v.Type().Field(i).Type
+			varValue := v.Field(i).Interface()
+
+			if varType == reflect.TypeOf(cid.Cid{}) {
+				fields = append(fields, fieldItem{
+					Name: varName,
+					Cid:  varValue.(cid.Cid),
+				})
+			}
+		}
+	}
+
+	actStats := actorStats{
+		Address: addr,
+		Actor:   actor,
+	}
+
+	dsc := &dagStatCollector{
+		ds:   dag,
+		walk: carWalkFunc,
+	}
+
+	if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
+		return actorStats{}, err
+	}
+
+	actStats.Stats = dsc.stats
+
+	for _, field := range fields {
+		dsc := &dagStatCollector{
+			ds:   dag,
+			walk: carWalkFunc,
+		}
+
+		if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, cid.NewSet().Visit, merkledag.Concurrent()); err != nil {
+			return actorStats{}, err
+		}
+
+		field.Stats = dsc.stats
+
+		actStats.Fields = append(actStats.Fields, field)
+	}
+
+	return actStats, nil
+}
+
+// DumpJSON marshals i to JSON and prints it as a single line on stdout.
+func DumpJSON(i interface{}) error {
+	bs, err := json.Marshal(i)
+	if err != nil {
+		return err
+	}
+
+	fmt.Println(string(bs))
+
+	return nil
+}
+
+// DumpStats prints an actor's details followed by a table of its overall and per-field stats to stdout.
+func DumpStats(actStats actorStats) {
+	strtype := builtin.ActorNameByCode(actStats.Actor.Code)
+	fmt.Printf("Address:\t%s\n", actStats.Address)
+	fmt.Printf("Balance:\t%s\n", types.FIL(actStats.Actor.Balance))
+	fmt.Printf("Nonce:\t\t%d\n", actStats.Actor.Nonce)
+	fmt.Printf("Code:\t\t%s (%s)\n", actStats.Actor.Code, strtype)
+	fmt.Printf("Head:\t\t%s\n", actStats.Actor.Head)
+	fmt.Println()
+
+	fmt.Printf("%-*s%-*s%-*s\n", 32, "Field", 24, "Size", 24, "\"Blocks\"")
+
+	stats := actStats.Stats
+	sizeStr := units.BytesSize(float64(stats.Size))
+	fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, "", 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links)
+
+	for _, s := range actStats.Fields {
+		stats := s.Stats
+		sizeStr := units.BytesSize(float64(stats.Size))
+		fmt.Printf("%-*s%-*s%-*s%-*d\n", 32, s.Name, 10, sizeStr, 14, fmt.Sprintf("(%d)", stats.Size), 24, stats.Links)
+	}
+
+	fmt.Println("--------------------------------------------------------------------------")
+}