From 799b6a98e65ce9133d1e63868ac4216720751eab Mon Sep 17 00:00:00 2001 From: frrist Date: Tue, 26 Apr 2022 19:39:44 -0700 Subject: [PATCH 01/10] refactor: simplify job cli --- chain/gap/fill.go | 4 +- chain/gap/find.go | 6 +- chain/gap/notify.go | 4 +- commands/chain.go | 41 +++--- commands/daemon.go | 26 ++-- commands/gap.go | 257 -------------------------------------- commands/index.go | 239 ----------------------------------- commands/job/gap.go | 110 ++++++++++++++++ commands/job/index.go | 178 ++++++++++++++++++++++++++ commands/{ => job}/job.go | 50 ++++++-- commands/job/options.go | 146 ++++++++++++++++++++++ commands/job/survey.go | 51 ++++++++ commands/job/walk.go | 85 +++++++++++++ commands/job/watch.go | 116 +++++++++++++++++ commands/job/worker.go | 58 +++++++++ commands/lily.go | 6 +- commands/log.go | 18 +-- commands/migrate.go | 5 +- commands/net.go | 30 ++--- commands/stop.go | 6 +- commands/survey.go | 88 ------------- commands/sync.go | 15 +-- commands/util.go | 7 +- commands/wait.go | 8 +- commands/walk.go | 178 -------------------------- commands/watch.go | 168 ------------------------- commands/worker.go | 98 --------------- lens/lily/api.go | 155 ++++++++++------------- lens/lily/impl.go | 182 +++++++++++++-------------- main.go | 15 +-- storage/sql.go | 4 +- 31 files changed, 1033 insertions(+), 1321 deletions(-) delete mode 100644 commands/gap.go delete mode 100644 commands/index.go create mode 100644 commands/job/gap.go create mode 100644 commands/job/index.go rename commands/{ => job}/job.go (75%) create mode 100644 commands/job/options.go create mode 100644 commands/job/survey.go create mode 100644 commands/job/walk.go create mode 100644 commands/job/watch.go create mode 100644 commands/job/worker.go delete mode 100644 commands/survey.go delete mode 100644 commands/walk.go delete mode 100644 commands/watch.go delete mode 100644 commands/worker.go diff --git a/chain/gap/fill.go b/chain/gap/fill.go index 32258e944..0cfb3b9f3 100644 --- a/chain/gap/fill.go +++ b/chain/gap/fill.go @@ -21,12 +21,12 @@ type Filler struct { DB *storage.Database node lens.API name string - minHeight, maxHeight uint64 + minHeight, maxHeight int64 tasks []string done chan struct{} } -func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Filler { +func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight int64, tasks []string) *Filler { return &Filler{ DB: db, node: node, diff --git a/chain/gap/find.go b/chain/gap/find.go index 3cbf0a713..3f58a27fd 100644 --- a/chain/gap/find.go +++ b/chain/gap/find.go @@ -16,12 +16,12 @@ type Finder struct { DB *storage.Database node lens.API name string - minHeight, maxHeight uint64 + minHeight, maxHeight int64 tasks []string done chan struct{} } -func NewFinder(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Finder { +func NewFinder(node lens.API, db *storage.Database, name string, minHeight, maxHeight int64, tasks []string) *Finder { return &Finder{ DB: db, node: node, @@ -89,7 +89,7 @@ func (g *Finder) Run(ctx context.Context) error { if err != nil { return err } - if uint64(head.Height()) < g.maxHeight { + if int64(head.Height()) < g.maxHeight { return xerrors.Errorf("cannot look for gaps beyond chain head height %d", head.Height()) } diff --git a/chain/gap/notify.go b/chain/gap/notify.go index 0f141923d..19c593535 100644 --- a/chain/gap/notify.go +++ b/chain/gap/notify.go @@ -18,12 +18,12 @@ type Notifier struct { queue *queue.AsynQ node lens.API name string - minHeight, maxHeight uint64 + minHeight, maxHeight int64 tasks []string done chan struct{} } -func NewNotifier(node lens.API, db *storage.Database, queue *queue.AsynQ, name string, minHeight, maxHeight uint64, tasks []string) *Notifier { +func NewNotifier(node lens.API, db *storage.Database, queue *queue.AsynQ, name string, minHeight, maxHeight int64, tasks []string) *Notifier { return &Notifier{ DB: db, queue: queue, diff --git a/commands/chain.go b/commands/chain.go index 016beab12..63edf3fa5 100644 --- a/commands/chain.go +++ b/commands/chain.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/lily/lens/lily" "github.com/filecoin-project/lotus/api" lotusbuild "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" @@ -17,6 +16,8 @@ import ( "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + + "github.com/filecoin-project/lily/lens/lily" ) var ChainCmd = &cli.Command{ @@ -36,12 +37,12 @@ var ChainCmd = &cli.Command{ var ChainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -63,8 +64,8 @@ var ChainGetBlock = &cli.Command{ Name: "getblock", Usage: "Get a block and print its details", ArgsUsage: "[blockCid]", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.BoolFlag{ Name: "raw", @@ -73,7 +74,7 @@ var ChainGetBlock = &cli.Command{ }), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -155,12 +156,12 @@ var ChainReadObjCmd = &cli.Command{ Name: "read-obj", Usage: "Read the raw bytes of an object", ArgsUsage: "[objectCid]", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -190,8 +191,8 @@ var ChainStatObjCmd = &cli.Command{ When a base is provided it will be walked first, and all links visisted will be ignored when the passed in object is walked. `, - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.StringFlag{ Name: "base", @@ -200,7 +201,7 @@ var ChainStatObjCmd = &cli.Command{ }), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -240,7 +241,7 @@ var ChainGetMsgCmd = &cli.Command{ } ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -282,8 +283,8 @@ var ChainListCmd = &cli.Command{ Name: "list", Aliases: []string{"love"}, Usage: "View a segment of the chain", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.Uint64Flag{Name: "height", DefaultText: "current head"}, &cli.IntFlag{Name: "count", Value: 30}, @@ -299,7 +300,7 @@ var ChainListCmd = &cli.Command{ }), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -413,8 +414,8 @@ var ChainSetHeadCmd = &cli.Command{ Name: "sethead", Usage: "manually set the local nodes head tipset (Caution: normally only used for recovery)", ArgsUsage: "[tipsetkey]", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.BoolFlag{ Name: "genesis", @@ -427,7 +428,7 @@ var ChainSetHeadCmd = &cli.Command{ }), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } diff --git a/commands/daemon.go b/commands/daemon.go index 25bcab1c9..0337a4176 100644 --- a/commands/daemon.go +++ b/commands/daemon.go @@ -31,31 +31,31 @@ import ( "github.com/filecoin-project/lily/storage" ) -var clientAPIFlags struct { - apiAddr string - apiToken string +var ClientAPIFlags struct { + ApiAddr string + ApiToken string } -var clientAPIFlag = &cli.StringFlag{ +var ClientAPIFlag = &cli.StringFlag{ Name: "api", Usage: "Address of lily api in multiaddr format.", EnvVars: []string{"LILY_API"}, Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &clientAPIFlags.apiAddr, + Destination: &ClientAPIFlags.ApiAddr, } -var clientTokenFlag = &cli.StringFlag{ +var ClientTokenFlag = &cli.StringFlag{ Name: "api-token", Usage: "Authentication token for lily api.", EnvVars: []string{"LILY_API_TOKEN"}, Value: "", - Destination: &clientAPIFlags.apiToken, + Destination: &ClientAPIFlags.ApiToken, } -// clientAPIFlagSet are used by commands that act as clients of a daemon's API -var clientAPIFlagSet = []cli.Flag{ - clientAPIFlag, - clientTokenFlag, +// ClientAPIFlagSet are used by commands that act as clients of a daemon's API +var ClientAPIFlagSet = []cli.Flag{ + ClientAPIFlag, + ClientTokenFlag, } type daemonOpts struct { @@ -125,7 +125,7 @@ Note that jobs are not persisted between restarts of the daemon. See `, Flags: []cli.Flag{ - clientAPIFlag, + ClientAPIFlag, &cli.StringFlag{ Name: "repo", Usage: "Specify path where lily should store chain state.", @@ -266,7 +266,7 @@ Note that jobs are not persisted between restarts of the daemon. See node.ApplyIf(func(s *node.Settings) bool { return c.IsSet("api") }, node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr(clientAPIFlags.apiAddr) + apima, err := multiaddr.NewMultiaddr(ClientAPIFlags.ApiAddr) if err != nil { return err } diff --git a/commands/gap.go b/commands/gap.go deleted file mode 100644 index 1f2e7a135..000000000 --- a/commands/gap.go +++ /dev/null @@ -1,257 +0,0 @@ -package commands - -import ( - "fmt" - "os" - "strings" - "time" - - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lily/chain/indexer/tasktype" - "github.com/filecoin-project/lily/lens/lily" - "github.com/filecoin-project/lily/schedule" -) - -type gapOps struct { - apiAddr string - apiToken string - storage string - tasks string - name string - from uint64 - to uint64 - queue string -} - -var gapFlags gapOps - -var GapCmd = &cli.Command{ - Name: "gap", - Usage: "Launch gap filling and finding jobs", - Subcommands: []*cli.Command{ - GapFillCmd, - GapFindCmd, - }, -} - -var GapFillCmd = &cli.Command{ - Name: "fill", - Usage: "Fill gaps in the database", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "api", - Usage: "Address of lily api in multiaddr format.", - EnvVars: []string{"LILY_API"}, - Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &gapFlags.apiAddr, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Authentication token for lily api.", - EnvVars: []string{"LILY_API_TOKEN"}, - Value: "", - Destination: &gapFlags.apiToken, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - EnvVars: []string{"LILY_STORAGE"}, - Value: "", - Destination: &gapFlags.storage, - }, - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of tasks to fill. Each task is reported separately in the database. If empty all task will be filled.", - EnvVars: []string{"LILY_TASKS"}, - Value: "", - Destination: &gapFlags.tasks, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - EnvVars: []string{"LILY_JOB_NAME"}, - Value: "", - Destination: &gapFlags.name, - }, - &cli.Uint64Flag{ - Name: "to", - Usage: "to epoch to search for gaps in", - EnvVars: []string{"LILY_TO"}, - Destination: &gapFlags.to, - Required: true, - }, - &cli.Uint64Flag{ - Name: "from", - Usage: "from epoch to search for gaps in", - EnvVars: []string{"LILY_FROM"}, - Destination: &gapFlags.from, - Required: true, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue that fill will write missing tipset tasks to.", - EnvVars: []string{"LILY_FILL_QUEUE"}, - Value: "", - Destination: &gapFlags.queue, - }, - }, - Before: func(cctx *cli.Context) error { - from, to := gapFlags.from, gapFlags.to - if to < from { - return xerrors.Errorf("value of --to (%d) should be >= --from (%d)", to, from) - } - - return nil - }, - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - var taskList []string - if gapFlags.tasks == "" { - taskList = tasktype.AllTableTasks - } else { - taskList = strings.Split(gapFlags.tasks, ",") - } - - fillName := fmt.Sprintf("fill_%d", time.Now().Unix()) - if gapFlags.name != "" { - fillName = gapFlags.name - } - - api, closer, err := GetAPI(ctx, gapFlags.apiAddr, gapFlags.apiToken) - if err != nil { - return err - } - defer closer() - - var res *schedule.JobSubmitResult - if gapFlags.queue == "" { - res, err = api.LilyGapFill(ctx, &lily.LilyGapFillConfig{ - RestartOnFailure: false, - RestartOnCompletion: false, - RestartDelay: 0, - Storage: gapFlags.storage, - Name: fillName, - Tasks: taskList, - To: gapFlags.to, - From: gapFlags.from, - }) - if err != nil { - return err - } - } else { - res, err = api.LilyGapFillNotify(ctx, &lily.LilyGapFillNotifyConfig{ - RestartOnFailure: false, - RestartOnCompletion: false, - RestartDelay: 0, - Storage: gapFlags.storage, - Name: fillName, - Tasks: taskList, - To: gapFlags.to, - From: gapFlags.from, - Queue: gapFlags.queue, - }) - if err != nil { - return err - } - } - - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - return nil - }, -} - -var GapFindCmd = &cli.Command{ - Name: "find", - Usage: "find gaps in the database", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "api", - Usage: "Address of lily api in multiaddr format.", - EnvVars: []string{"LILY_API"}, - Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &gapFlags.apiAddr, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Authentication token for lily api.", - EnvVars: []string{"LILY_API_TOKEN"}, - Value: "", - Destination: &gapFlags.apiToken, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - Value: "", - Destination: &gapFlags.storage, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - Value: "", - Destination: &gapFlags.name, - }, - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of tasks to fill. Each task is reported separately in the database. If empty all task will be filled.", - Value: "", - Destination: &gapFlags.tasks, - }, - &cli.Uint64Flag{ - Name: "to", - Usage: "to epoch to search for gaps in", - Destination: &gapFlags.to, - Required: true, - }, - &cli.Uint64Flag{ - Name: "from", - Usage: "from epoch to search for gaps in", - Destination: &gapFlags.from, - Required: true, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - api, closer, err := GetAPI(ctx, gapFlags.apiAddr, gapFlags.apiToken) - if err != nil { - return err - } - defer closer() - - findName := fmt.Sprintf("find_%d", time.Now().Unix()) - if gapFlags.name != "" { - findName = gapFlags.name - } - - var taskList []string - if gapFlags.tasks == "" { - taskList = tasktype.AllTableTasks - } else { - taskList = strings.Split(gapFlags.tasks, ",") - } - - res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ - RestartOnFailure: false, - RestartOnCompletion: false, - RestartDelay: 0, - Storage: gapFlags.storage, - Tasks: taskList, - Name: findName, - To: gapFlags.to, - From: gapFlags.from, - }) - if err != nil { - return err - } - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - return nil - }, -} diff --git a/commands/index.go b/commands/index.go deleted file mode 100644 index 72173b2ab..000000000 --- a/commands/index.go +++ /dev/null @@ -1,239 +0,0 @@ -package commands - -import ( - "fmt" - "strconv" - "strings" - "time" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/chain/types" - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/ipfs/go-cid" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lily/chain/actors/builtin" - "github.com/filecoin-project/lily/chain/indexer/tasktype" - "github.com/filecoin-project/lily/lens/lily" -) - -type indexOps struct { - tasks string - storage string - apiAddr string - apiToken string - name string - window time.Duration - queue string -} - -var indexFlags indexOps - -var IndexTipSetCmd = &cli.Command{ - Name: "tipset", - Usage: "Index the state of a tipset from the filecoin blockchain by tipset key", - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - indexName := fmt.Sprintf("index_%d", time.Now().Unix()) - if indexFlags.name != "" { - indexName = indexFlags.name - } - - var tsStr string - if tsStr = cctx.Args().First(); tsStr == "" { - return xerrors.Errorf("tipset argument required") - } - - tsk, err := parseTipSetKey(tsStr) - if err != nil { - return xerrors.Errorf("failed to parse tipset key: %w", err) - } - - taskList := strings.Split(indexFlags.tasks, ",") - if indexFlags.tasks == "*" { - taskList = tasktype.AllTableTasks - } - - api, closer, err := GetAPI(ctx, indexFlags.apiAddr, indexFlags.apiToken) - if err != nil { - return err - } - defer closer() - - if indexFlags.queue == "" { - cfg := &lily.LilyIndexConfig{ - TipSet: tsk, - Name: indexName, - Tasks: taskList, - Storage: indexFlags.storage, - Window: indexFlags.window, - } - - _, err = api.LilyIndex(ctx, cfg) - if err != nil { - return err - } - } else { - cfg := &lily.LilyIndexNotifyConfig{ - TipSet: tsk, - Name: indexName, - Tasks: taskList, - Queue: indexFlags.queue, - } - - _, err = api.LilyIndexNotify(ctx, cfg) - if err != nil { - return err - } - } - - return nil - }, -} - -var IndexHeightCmd = &cli.Command{ - Name: "height", - Usage: "Index the state of a tipset from the filecoin blockchain by height", - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - indexName := fmt.Sprintf("index_%d", time.Now().Unix()) - if indexFlags.name != "" { - indexName = indexFlags.name - } - - var tsStr string - if tsStr = cctx.Args().First(); tsStr == "" { - return xerrors.Errorf("height argument required") - } - - height, err := strconv.ParseInt(cctx.Args().First(), 10, 46) - if err != nil { - return err - } - - taskList := strings.Split(indexFlags.tasks, ",") - if indexFlags.tasks == "*" { - taskList = tasktype.AllTableTasks - } - - api, closer, err := GetAPI(ctx, indexFlags.apiAddr, indexFlags.apiToken) - if err != nil { - return err - } - defer closer() - - ts, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK) - if err != nil { - return err - } - - if height != int64(ts.Height()) { - log.Warnf("height (%d) is null round, indexing height %d", height, ts.Height()) - } - - if indexFlags.queue == "" { - cfg := &lily.LilyIndexConfig{ - TipSet: ts.Key(), - Name: indexName, - Tasks: taskList, - Storage: indexFlags.storage, - Window: indexFlags.window, - } - - _, err = api.LilyIndex(ctx, cfg) - if err != nil { - return err - } - } else { - cfg := &lily.LilyIndexNotifyConfig{ - TipSet: ts.Key(), - Name: indexName, - Tasks: taskList, - Queue: indexFlags.queue, - } - - _, err = api.LilyIndexNotify(ctx, cfg) - if err != nil { - return err - } - } - - return nil - }, -} - -var IndexCmd = &cli.Command{ - Name: "index", - Usage: "Index the state of a tipset from the filecoin blockchain.", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of tasks to run. Each task is reported separately in the database.", - EnvVars: []string{"LILY_TASKS"}, - Destination: &indexFlags.tasks, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - EnvVars: []string{"LILY_STORAGE"}, - Value: "", - Destination: &indexFlags.storage, - }, - &cli.StringFlag{ - Name: "api", - Usage: "Address of lily api in multiaddr format.", - EnvVars: []string{"LILY_API"}, - Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &indexFlags.apiAddr, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Authentication token for lily api.", - EnvVars: []string{"LILY_API_TOKEN"}, - Value: "", - Destination: &indexFlags.apiToken, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - EnvVars: []string{"LILY_JOB_NAME"}, - Value: "", - Destination: &indexFlags.name, - }, - &cli.DurationFlag{ - Name: "window", - Usage: "Duration after which any indexing work not completed will be marked incomplete", - EnvVars: []string{"LILY_WINDOW"}, - Value: builtin.EpochDurationSeconds * time.Second, - Destination: &indexFlags.window, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue that index will write tipsets to. If empty the node will index tipsets locally. If populated the node will write the tipset to the queue for tipset-workers to consume", - EnvVars: []string{"LILY_INDEX_QUEUE"}, - Value: "", - Destination: &indexFlags.queue, - }, - }, - Subcommands: []*cli.Command{ - IndexTipSetCmd, - IndexHeightCmd, - }, -} - -func parseTipSetKey(val string) (types.TipSetKey, error) { - tskStr := strings.Split(val, ",") - var cids []cid.Cid - for _, c := range tskStr { - blkc, err := cid.Decode(c) - if err != nil { - return types.EmptyTSK, err - } - cids = append(cids, blkc) - } - - return types.NewTipSetKey(cids...), nil -} diff --git a/commands/job/gap.go b/commands/job/gap.go new file mode 100644 index 000000000..eec2e2286 --- /dev/null +++ b/commands/job/gap.go @@ -0,0 +1,110 @@ +package job + +import ( + "os" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +var GapFindCmd = &cli.Command{ + Name: "find", + Usage: "find gaps in the database", + Flags: []cli.Flag{ + RangeFromFlag, + RangeToFlag, + }, + Before: func(cctx *cli.Context) error { + return rangeFlags.validate() + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ + JobConfig: RunFlags.ParseJobConfig(), + To: rangeFlags.to, + From: rangeFlags.from, + }) + if err != nil { + return err + } + return commands.PrintNewJob(os.Stdout, res) + }, +} + +var GapFillCmd = &cli.Command{ + Name: "fill", + Usage: "Fill gaps in the database", + Flags: []cli.Flag{ + RangeFromFlag, + RangeToFlag, + }, + Subcommands: []*cli.Command{ + GapFillNotifyCmd, + }, + Before: func(cctx *cli.Context) error { + return rangeFlags.validate() + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + res, err := api.LilyGapFill(ctx, &lily.LilyGapFillConfig{ + JobConfig: RunFlags.ParseJobConfig(), + To: rangeFlags.to, + From: rangeFlags.from, + }) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + }, +} + +var GapFillNotifyCmd = &cli.Command{ + Name: "notify", + Flags: []cli.Flag{ + NotifyQueueFlag, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + cfg := &lily.LilyGapFillNotifyConfig{ + GapFillConfig: lily.LilyGapFillConfig{ + JobConfig: RunFlags.ParseJobConfig(), + From: rangeFlags.from, + To: rangeFlags.to, + }, + Queue: notifyFlags.queue, + } + + res, err := api.LilyGapFillNotify(ctx, cfg) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + + }, +} diff --git a/commands/job/index.go b/commands/job/index.go new file mode 100644 index 000000000..138234b89 --- /dev/null +++ b/commands/job/index.go @@ -0,0 +1,178 @@ +package job + +import ( + "strings" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +type indexOps struct { + height int64 + tsKey string + // must be set in before function + tipsetKey types.TipSetKey +} + +var indexFlags indexOps + +var IndexCmd = &cli.Command{ + Name: "index", + Usage: "Index the state of a tipset from the filecoin blockchain.", + Subcommands: []*cli.Command{ + IndexTipSetCmd, + IndexHeightCmd, + }, +} + +var IndexTipSetCmd = &cli.Command{ + Name: "tipset", + Usage: "Index the state of a tipset from the filecoin blockchain by tipset key", + Subcommands: []*cli.Command{ + IndexNotifyCmd, + }, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "tipset", + Usage: "TipSetKey to index", + Destination: &indexFlags.tsKey, + Required: true, + }, + }, + Before: func(cctx *cli.Context) error { + tsk, err := parseTipSetKey(indexFlags.tsKey) + if err != nil { + return xerrors.Errorf("failed to parse tipset key: %w", err) + } + indexFlags.tipsetKey = tsk + + return nil + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ + JobConfig: RunFlags.ParseJobConfig(), + TipSet: indexFlags.tipsetKey, + }) + if err != nil { + return err + } + + return nil + }, +} + +var IndexHeightCmd = &cli.Command{ + Name: "height", + Usage: "Index the state of a tipset from the filecoin blockchain by height", + Flags: []cli.Flag{ + &cli.Int64Flag{ + Name: "height", + Usage: "Height to index", + Destination: &indexFlags.height, + Required: true, + }, + }, + Subcommands: []*cli.Command{ + IndexNotifyCmd, + }, + Before: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + ts, err := api.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(indexFlags.height), types.EmptyTSK) + if err != nil { + return err + } + + if indexFlags.height != int64(ts.Height()) { + return xerrors.Errorf("height (%d) is null round, next non-null round height: %d", indexFlags.height, ts.Height()) + } + indexFlags.tipsetKey = ts.Key() + + return nil + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ + JobConfig: RunFlags.ParseJobConfig(), + TipSet: indexFlags.tipsetKey, + }) + if err != nil { + return err + } + + return nil + }, +} + +var IndexNotifyCmd = &cli.Command{ + Name: "notify", + Flags: []cli.Flag{ + NotifyQueueFlag, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + cfg := &lily.LilyIndexNotifyConfig{ + IndexConfig: lily.LilyIndexConfig{ + JobConfig: RunFlags.ParseJobConfig(), + TipSet: indexFlags.tipsetKey, + }, + Queue: notifyFlags.queue, + } + + _, err = api.LilyIndexNotify(ctx, cfg) + if err != nil { + return err + } + + return nil + }, +} + +func parseTipSetKey(val string) (types.TipSetKey, error) { + tskStr := strings.Split(val, ",") + var cids []cid.Cid + for _, c := range tskStr { + blkc, err := cid.Decode(c) + if err != nil { + return types.EmptyTSK, err + } + cids = append(cids, blkc) + } + + return types.NewTipSetKey(cids...), nil +} diff --git a/commands/job.go b/commands/job/job.go similarity index 75% rename from commands/job.go rename to commands/job/job.go index 8cef5f607..55e9075c3 100644 --- a/commands/job.go +++ b/commands/job/job.go @@ -1,4 +1,4 @@ -package commands +package job import ( "encoding/json" @@ -8,6 +8,7 @@ import ( lotuscli "github.com/filecoin-project/lotus/cli" "github.com/urfave/cli/v2" + "github.com/filecoin-project/lily/commands" "github.com/filecoin-project/lily/schedule" ) @@ -15,6 +16,7 @@ var JobCmd = &cli.Command{ Name: "job", Usage: "Manage jobs being run by the daemon.", Subcommands: []*cli.Command{ + JobRunCmd, JobStartCmd, JobStopCmd, JobWaitCmd, @@ -22,6 +24,28 @@ var JobCmd = &cli.Command{ }, } +var JobRunCmd = &cli.Command{ + Name: "run", + Usage: "run a job", + Flags: []cli.Flag{ + RunWindowFlag, + RunTaskFlag, + RunStorageFlag, + RunNameFlag, + RunRestartDelayFlag, + RunRestartFailure, + RunRestartCompletion, + }, + Subcommands: []*cli.Command{ + WalkCmd, + WatchCmd, + IndexCmd, + SurveyCmd, + GapFillCmd, + GapFillCmd, + }, +} + var jobControlFlags struct { ID int } @@ -29,8 +53,8 @@ var jobControlFlags struct { var JobStartCmd = &cli.Command{ Name: "start", Usage: "start a job.", - Flags: flagSet( - clientAPIFlagSet, + Flags: commands.FlagSet( + commands.ClientAPIFlagSet, []cli.Flag{ &cli.IntFlag{ Name: "id", @@ -42,7 +66,7 @@ var JobStartCmd = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := commands.GetAPI(ctx) if err != nil { return err } @@ -55,8 +79,8 @@ var JobStartCmd = &cli.Command{ var JobStopCmd = &cli.Command{ Name: "stop", Usage: "stop a job.", - Flags: flagSet( - clientAPIFlagSet, + Flags: commands.FlagSet( + commands.ClientAPIFlagSet, []cli.Flag{ &cli.IntFlag{ Name: "id", @@ -68,7 +92,7 @@ var JobStopCmd = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := commands.GetAPI(ctx) if err != nil { return err } @@ -81,12 +105,12 @@ var JobStopCmd = &cli.Command{ var JobListCmd = &cli.Command{ Name: "list", Usage: "list all jobs and their status", - Flags: flagSet( - clientAPIFlagSet, + Flags: commands.FlagSet( + commands.ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := commands.GetAPI(ctx) if err != nil { return err } @@ -110,8 +134,8 @@ var JobListCmd = &cli.Command{ var JobWaitCmd = &cli.Command{ Name: "wait", Usage: "wait on a job to complete.", - Flags: flagSet( - clientAPIFlagSet, + Flags: commands.FlagSet( + commands.ClientAPIFlagSet, []cli.Flag{ &cli.IntFlag{ Name: "id", @@ -123,7 +147,7 @@ var JobWaitCmd = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := commands.GetAPI(ctx) if err != nil { return err } diff --git a/commands/job/options.go b/commands/job/options.go new file mode 100644 index 000000000..c58d3bf69 --- /dev/null +++ b/commands/job/options.go @@ -0,0 +1,146 @@ +package job + +import ( + "fmt" + "time" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lily/lens/lily" +) + +type runOpts struct { + Storage string + Name string + + Tasks *cli.StringSlice + + Window time.Duration + RestartDelay time.Duration + + RestartCompletion bool + RestartFailure bool +} + +func (r runOpts) ParseJobConfig() lily.LilyJobConfig { + if RunFlags.Name == "" { + RunFlags.Name = fmt.Sprintf("job_%d", time.Now().Unix()) + } + if len(RunFlags.Tasks.Value()) == 0 { + // TODO don't panic + panic("need tasks") + } + // TODO handle task wild card * + return lily.LilyJobConfig{ + Name: RunFlags.Name, + Storage: RunFlags.Storage, + Tasks: RunFlags.Tasks.Value(), + Window: RunFlags.Window, + RestartOnFailure: RunFlags.RestartFailure, + RestartOnCompletion: RunFlags.RestartCompletion, + RestartDelay: RunFlags.RestartDelay, + } +} + +var RunFlags runOpts + +var RunWindowFlag = &cli.DurationFlag{ + Name: "window", + Usage: "Duaration after which job execution will be cancled", + EnvVars: []string{"LILY_JOB_WINDOW"}, + Destination: &RunFlags.Window, +} + +var RunTaskFlag = &cli.StringSliceFlag{ + Name: "tasks", + Usage: "Comma separated list of tasks to run in job. Each task is reported separately in the storage backend.", + EnvVars: []string{"LILY_JOB_TASKS"}, + Destination: RunFlags.Tasks, +} + +var RunStorageFlag = &cli.StringFlag{ + Name: "storage", + Usage: "Name of storage that job will write result to.", + EnvVars: []string{"LILY_JOB_STORAGE"}, + Value: "", + Destination: &RunFlags.Storage, +} + +var RunNameFlag = &cli.StringFlag{ + Name: "name", + Usage: "Name of job for easy identification later.", + EnvVars: []string{"LILY_JOB_NAME"}, + Value: "", + Destination: &RunFlags.Name, +} + +var RunRestartDelayFlag = &cli.DurationFlag{ + Name: "restart-delay", + Usage: "Duration to wait before restarting job", + EnvVars: []string{"LILY_JOB_RESTART_DELAY"}, + Value: 0, + Destination: &RunFlags.RestartDelay, +} + +var RunRestartCompletion = &cli.BoolFlag{ + Name: "restart-on-completion", + Usage: "Restart the job after it completes", + EnvVars: []string{"LILY_JOB_RESTART_COMPLETION"}, + Value: false, + Destination: &RunFlags.RestartCompletion, +} + +var RunRestartFailure = &cli.BoolFlag{ + Name: "restart-on-failure", + Usage: "Restart the job if it fails", + EnvVars: []string{"LILY_JOB_RESTART_FAILURE"}, + Value: false, + Destination: &RunFlags.RestartFailure, +} + +type notifyOps struct { + queue string +} + +var notifyFlags notifyOps + +var NotifyQueueFlag = &cli.StringFlag{ + Name: "queue", + Usage: "Name of queue system that job will notify.", + EnvVars: []string{"LILY_JOB_QUEUE"}, + Value: "", + Destination: ¬ifyFlags.queue, +} + +type rangeOps struct { + from int64 + to int64 +} + +var rangeFlags rangeOps + +func (r rangeOps) validate() error { + from, to := rangeFlags.from, rangeFlags.to + if to < from { + return xerrors.Errorf("value of --to (%d) should be >= --from (%d)", to, from) + } + + return nil +} + +var RangeFromFlag = &cli.Int64Flag{ + Name: "from", + Usage: "Limit actor and message processing to tipsets at or above `HEIGHT`", + EnvVars: []string{"LILY_FROM"}, + Destination: &rangeFlags.from, + Required: true, +} + +var RangeToFlag = &cli.Int64Flag{ + Name: "to", + Usage: "Limit actor and message processing to tipsets at or below `HEIGHT`", + EnvVars: []string{"LILY_TO"}, + Destination: &rangeFlags.to, + Required: true, +} diff --git a/commands/job/survey.go b/commands/job/survey.go new file mode 100644 index 000000000..325a8e225 --- /dev/null +++ b/commands/job/survey.go @@ -0,0 +1,51 @@ +package job + +import ( + "os" + "time" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +var surveyFlags struct { + tasks string + storage string + name string + interval time.Duration +} + +var SurveyCmd = &cli.Command{ + Name: "survey", + Usage: "Start a daemon job to survey the node and its environment.", + Flags: []cli.Flag{ + &cli.DurationFlag{ + Name: "interval", + Usage: "Interval to wait between each survey", + Value: 10 * time.Minute, + Destination: &surveyFlags.interval, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + res, err := api.LilySurvey(ctx, &lily.LilySurveyConfig{ + JobConfig: RunFlags.ParseJobConfig(), + Interval: surveyFlags.interval, + }) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + }, +} diff --git a/commands/job/walk.go b/commands/job/walk.go new file mode 100644 index 000000000..e85817a04 --- /dev/null +++ b/commands/job/walk.go @@ -0,0 +1,85 @@ +package job + +import ( + "os" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +var WalkCmd = &cli.Command{ + Name: "walk", + Usage: "Start a daemon job to walk a range of the filecoin blockchain.", + Flags: []cli.Flag{ + RangeFromFlag, + RangeToFlag, + }, + Subcommands: []*cli.Command{ + WalkNotifyCmd, + }, + Before: func(cctx *cli.Context) error { + return rangeFlags.validate() + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + cfg := &lily.LilyWalkConfig{ + JobConfig: RunFlags.ParseJobConfig(), + From: rangeFlags.from, + To: rangeFlags.to, + } + + res, err := api.LilyWalk(ctx, cfg) + if err != nil { + return err + } + + if err := commands.PrintNewJob(os.Stdout, res); err != nil { + return err + } + + return nil + }, +} + +var WalkNotifyCmd = &cli.Command{ + Name: "notify", + Flags: []cli.Flag{ + NotifyQueueFlag, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + cfg := &lily.LilyWalkNotifyConfig{ + WalkConfig: lily.LilyWalkConfig{ + JobConfig: RunFlags.ParseJobConfig(), + From: rangeFlags.from, + To: rangeFlags.to, + }, + Queue: notifyFlags.queue, + } + + res, err := api.LilyWalkNotify(ctx, cfg) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + + }, +} diff --git a/commands/job/watch.go b/commands/job/watch.go new file mode 100644 index 000000000..adcd3cff7 --- /dev/null +++ b/commands/job/watch.go @@ -0,0 +1,116 @@ +package job + +import ( + "os" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" + "github.com/filecoin-project/lily/schedule" +) + +type watchOps struct { + confidence int + workers int + bufferSize int +} + +var watchFlags watchOps + +var WatchConfidenceFlag = &cli.IntFlag{ + Name: "confidence", + Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database", + EnvVars: []string{"LILY_CONFIDENCE"}, + Value: 2, + Destination: &watchFlags.confidence, +} +var WatchWorkersFlag = &cli.IntFlag{ + Name: "workers", + Usage: "Sets the number of tipsets that may be simultaneous indexed while watching", + EnvVars: []string{"LILY_WATCH_WORKERS"}, + Value: 2, + Destination: &watchFlags.workers, +} +var WatchBufferSizeFlag = &cli.IntFlag{ + Name: "buffer-size", + Usage: "Set the number of tipsets the watcher will buffer while waiting for a worker to accept the work", + EnvVars: []string{"LILY_WATCH_BUFFER"}, + Value: 5, + Destination: &watchFlags.bufferSize, +} + +var WatchCmd = &cli.Command{ + Name: "watch", + Usage: "Start a daemon job to watch the head of the filecoin blockchain.", + Flags: []cli.Flag{ + WatchConfidenceFlag, + WatchWorkersFlag, + WatchBufferSizeFlag, + }, + Subcommands: []*cli.Command{ + WatchNotifyCmd, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + var res *schedule.JobSubmitResult + cfg := &lily.LilyWatchConfig{ + JobConfig: RunFlags.ParseJobConfig(), + BufferSize: watchFlags.bufferSize, + Confidence: watchFlags.confidence, + Workers: watchFlags.workers, + } + + res, err = api.LilyWatch(ctx, cfg) + if err != nil { + return err + } + + if err := commands.PrintNewJob(os.Stdout, res); err != nil { + return err + } + + return nil + }, +} + +var WatchNotifyCmd = &cli.Command{ + Name: "notify", + Flags: []cli.Flag{ + NotifyQueueFlag, + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + cfg := &lily.LilyWatchNotifyConfig{ + JobConfig: RunFlags.ParseJobConfig(), + + Confidence: watchFlags.confidence, + BufferSize: watchFlags.bufferSize, + + Queue: notifyFlags.queue, + } + + res, err := api.LilyWatchNotify(ctx, cfg) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + + }, +} diff --git a/commands/job/worker.go b/commands/job/worker.go new file mode 100644 index 000000000..ebb28511d --- /dev/null +++ b/commands/job/worker.go @@ -0,0 +1,58 @@ +package job + +import ( + "os" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +var tipsetWorkerFlags struct { + queue string + concurrency int +} + +var TipSetWorkerCmd = &cli.Command{ + Name: "tipset-worker", + Flags: commands.FlagSet( + commands.ClientAPIFlagSet, + []cli.Flag{ + &cli.IntFlag{ + Name: "concurrency", + Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.", + Value: 1, + Destination: &tipsetWorkerFlags.concurrency, + }, + &cli.StringFlag{ + Name: "queue", + Usage: "Name of queue worker will consume work from.", + EnvVars: []string{"LILY_TSWORKER_QUEUE"}, + Value: "", + Destination: &tipsetWorkerFlags.queue, + }, + }, + ), + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{ + JobConfig: RunFlags.ParseJobConfig(), + Queue: tipsetWorkerFlags.queue, + Concurrency: tipsetWorkerFlags.concurrency, + }) + if err != nil { + return err + } + + return commands.PrintNewJob(os.Stdout, res) + }, +} diff --git a/commands/lily.go b/commands/lily.go index 6ce958832..dc39a9270 100644 --- a/commands/lily.go +++ b/commands/lily.go @@ -14,10 +14,10 @@ import ( "github.com/filecoin-project/lily/lens/lily" ) -func GetAPI(ctx context.Context, addrStr string, token string) (lily.LilyAPI, jsonrpc.ClientCloser, error) { - addrStr = strings.TrimSpace(addrStr) +func GetAPI(ctx context.Context) (lily.LilyAPI, jsonrpc.ClientCloser, error) { + addrStr := strings.TrimSpace(ClientAPIFlags.ApiAddr) - ainfo := cliutil.APIInfo{Addr: addrStr, Token: []byte(token)} + ainfo := cliutil.APIInfo{Addr: addrStr, Token: []byte(ClientAPIFlags.ApiToken)} addr, err := ainfo.DialArgs("v0") if err != nil { diff --git a/commands/log.go b/commands/log.go index 01b208d7b..e00465462 100644 --- a/commands/log.go +++ b/commands/log.go @@ -32,13 +32,13 @@ var LogCmd = &cli.Command{ var LogList = &cli.Command{ Name: "list", Usage: "List log systems", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := GetAPI(ctx) if err != nil { return err } @@ -75,8 +75,8 @@ var LogSetLevel = &cli.Command{ warn error `, - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.StringSliceFlag{ Name: "system", @@ -88,7 +88,7 @@ var LogSetLevel = &cli.Command{ Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := GetAPI(ctx) if err != nil { return err } @@ -132,13 +132,13 @@ var LogSetLevelRegex = &cli.Command{ warn error `, - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + api, closer, err := GetAPI(ctx) if err != nil { return err } diff --git a/commands/migrate.go b/commands/migrate.go index a9631f0b1..71c3e2a05 100644 --- a/commands/migrate.go +++ b/commands/migrate.go @@ -4,10 +4,11 @@ import ( "fmt" "os" - "github.com/filecoin-project/lily/version" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/lily/version" + "github.com/filecoin-project/lily/model" "github.com/filecoin-project/lily/storage" ) @@ -66,7 +67,7 @@ var dbConnectFlags = []cli.Flag{ var MigrateCmd = &cli.Command{ Name: "migrate", Usage: "Manage the schema version installed in a database.", - Flags: flagSet( + Flags: FlagSet( dbConnectFlags, []cli.Flag{ &cli.StringFlag{ diff --git a/commands/net.go b/commands/net.go index 1e06fcb1e..1af06e3f1 100644 --- a/commands/net.go +++ b/commands/net.go @@ -29,12 +29,12 @@ var NetCmd = &cli.Command{ var NetID = &cli.Command{ Name: "id", Usage: "Get peer ID of libp2p node used by daemon", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return xerrors.Errorf("get api: %w", err) } @@ -53,12 +53,12 @@ var NetID = &cli.Command{ var NetListen = &cli.Command{ Name: "listen", Usage: "List libp2p addresses daemon is listening on", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return xerrors.Errorf("get api: %w", err) } @@ -79,8 +79,8 @@ var NetListen = &cli.Command{ var NetPeers = &cli.Command{ Name: "peers", Usage: "List peers daemon is connected to", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.BoolFlag{ Name: "agent", @@ -96,7 +96,7 @@ var NetPeers = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return xerrors.Errorf("get api: %w", err) } @@ -157,12 +157,12 @@ var NetPeers = &cli.Command{ var NetReachability = &cli.Command{ Name: "reachability", Usage: "Print information about reachability from the Internet", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return xerrors.Errorf("get api: %w", err) } @@ -184,8 +184,8 @@ var NetReachability = &cli.Command{ var NetScores = &cli.Command{ Name: "scores", Usage: "List scores assigned to peers", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.BoolFlag{ Name: "extended", @@ -196,7 +196,7 @@ var NetScores = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return xerrors.Errorf("get api: %w", err) } diff --git a/commands/stop.go b/commands/stop.go index bbbddd416..dc9f7fe60 100644 --- a/commands/stop.go +++ b/commands/stop.go @@ -8,12 +8,12 @@ import ( var StopCmd = &cli.Command{ Name: "stop", Usage: "Stop a running lily daemon", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } diff --git a/commands/survey.go b/commands/survey.go deleted file mode 100644 index b0babe270..000000000 --- a/commands/survey.go +++ /dev/null @@ -1,88 +0,0 @@ -package commands - -import ( - "fmt" - "os" - "strings" - "time" - - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/urfave/cli/v2" - - "github.com/filecoin-project/lily/lens/lily" - "github.com/filecoin-project/lily/network" -) - -var surveyFlags struct { - tasks string - storage string - name string - interval time.Duration -} - -var SurveyCmd = &cli.Command{ - Name: "survey", - Usage: "Start a daemon job to survey the node and its environment.", - Flags: flagSet( - clientAPIFlagSet, - []cli.Flag{ - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of survey tasks to run. Each task is reported separately in the database.", - Value: strings.Join([]string{network.PeerAgentsTask}, ","), - Destination: &surveyFlags.tasks, - }, - &cli.DurationFlag{ - Name: "interval", - Usage: "Interval to wait between each survey", - Value: 10 * time.Minute, - Destination: &surveyFlags.interval, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - Value: "", - Destination: &surveyFlags.storage, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - Value: "", - Destination: &surveyFlags.name, - }, - }, - ), - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - survName := fmt.Sprintf("survey_%d", time.Now().Unix()) - if surveyFlags.name != "" { - survName = surveyFlags.name - } - - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) - if err != nil { - return err - } - defer closer() - - cfg := &lily.LilySurveyConfig{ - Name: survName, - Tasks: strings.Split(surveyFlags.tasks, ","), - Interval: surveyFlags.interval, - RestartDelay: 0, - RestartOnCompletion: false, - RestartOnFailure: true, - Storage: surveyFlags.storage, - } - - res, err := api.LilySurvey(ctx, cfg) - if err != nil { - return err - } - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - return nil - }, -} diff --git a/commands/sync.go b/commands/sync.go index fb087370c..d874f0605 100644 --- a/commands/sync.go +++ b/commands/sync.go @@ -6,12 +6,13 @@ import ( "time" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lily/chain/actors/builtin" "github.com/filecoin-project/lotus/api" lotuscli "github.com/filecoin-project/lotus/cli" cid "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" + "github.com/filecoin-project/lily/chain/actors/builtin" + "github.com/filecoin-project/lily/lens/lily" ) @@ -27,12 +28,12 @@ var SyncCmd = &cli.Command{ var SyncStatusCmd = &cli.Command{ Name: "status", Usage: "Report sync status of a running lily daemon", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } @@ -83,8 +84,8 @@ var SyncStatusCmd = &cli.Command{ var SyncWaitCmd = &cli.Command{ Name: "wait", Usage: "Wait for sync to be complete", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.BoolFlag{ Name: "watch", @@ -94,7 +95,7 @@ var SyncWaitCmd = &cli.Command{ ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) - lapi, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } diff --git a/commands/util.go b/commands/util.go index 1e0446b1b..8f1a0b484 100644 --- a/commands/util.go +++ b/commands/util.go @@ -5,11 +5,12 @@ import ( "fmt" "io" - "github.com/filecoin-project/lily/schedule" "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/schedule" ) -func flagSet(fs ...[]cli.Flag) []cli.Flag { +func FlagSet(fs ...[]cli.Flag) []cli.Flag { var flags []cli.Flag for _, f := range fs { @@ -19,7 +20,7 @@ func flagSet(fs ...[]cli.Flag) []cli.Flag { return flags } -func printNewJob(w io.Writer, res *schedule.JobSubmitResult) error { +func PrintNewJob(w io.Writer, res *schedule.JobSubmitResult) error { prettyJob, err := json.MarshalIndent(res, "", "\t") if err != nil { return err diff --git a/commands/wait.go b/commands/wait.go index 18d0d2b80..dd74622ae 100644 --- a/commands/wait.go +++ b/commands/wait.go @@ -13,8 +13,8 @@ import ( var WaitApiCmd = &cli.Command{ Name: "wait-api", Usage: "Wait for lily api to come online", - Flags: flagSet( - clientAPIFlagSet, + Flags: FlagSet( + ClientAPIFlagSet, []cli.Flag{ &cli.DurationFlag{ Name: "timeout", @@ -33,7 +33,7 @@ var WaitApiCmd = &cli.Command{ } for { - err := checkAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) + err := checkAPI(ctx, ClientAPIFlags.ApiAddr, ClientAPIFlags.ApiToken) if err == nil { return nil } @@ -51,7 +51,7 @@ var WaitApiCmd = &cli.Command{ } func checkAPI(ctx context.Context, addrStr string, token string) error { - lapi, closer, err := GetAPI(ctx, addrStr, token) + lapi, closer, err := GetAPI(ctx) if err != nil { return err } diff --git a/commands/walk.go b/commands/walk.go deleted file mode 100644 index 075c98250..000000000 --- a/commands/walk.go +++ /dev/null @@ -1,178 +0,0 @@ -package commands - -import ( - "fmt" - "os" - "strings" - "time" - - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lily/chain/actors/builtin" - "github.com/filecoin-project/lily/chain/indexer/tasktype" - "github.com/filecoin-project/lily/lens/lily" - "github.com/filecoin-project/lily/schedule" -) - -type walkOps struct { - from int64 - to int64 - tasks string - window time.Duration - storage string - apiAddr string - apiToken string - name string - workers int - queue string -} - -var walkFlags walkOps - -var WalkCmd = &cli.Command{ - Name: "walk", - Usage: "Start a daemon job to walk a range of the filecoin blockchain.", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of tasks to run. Each task is reported separately in the database.", - EnvVars: []string{"LILY_TASKS"}, - Destination: &walkFlags.tasks, - }, - &cli.DurationFlag{ - Name: "window", - Usage: "Duration after which any indexing work not completed will be marked incomplete", - EnvVars: []string{"LILY_WINDOW"}, - Value: builtin.EpochDurationSeconds * time.Second * 10, // walks don't need to complete within a single epoch - Destination: &walkFlags.window, - }, - &cli.Int64Flag{ - Name: "from", - Usage: "Limit actor and message processing to tipsets at or above `HEIGHT`", - EnvVars: []string{"LILY_FROM"}, - Destination: &walkFlags.from, - Required: true, - }, - &cli.Int64Flag{ - Name: "to", - Usage: "Limit actor and message processing to tipsets at or below `HEIGHT`", - EnvVars: []string{"LILY_TO"}, - Destination: &walkFlags.to, - Required: true, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - EnvVars: []string{"LILY_STORAGE"}, - Value: "", - Destination: &walkFlags.storage, - }, - &cli.StringFlag{ - Name: "api", - Usage: "Address of lily api in multiaddr format.", - EnvVars: []string{"LILY_API"}, - Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &walkFlags.apiAddr, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Authentication token for lily api.", - EnvVars: []string{"LILY_API_TOKEN"}, - Value: "", - Destination: &walkFlags.apiToken, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - EnvVars: []string{"LILY_JOB_NAME"}, - Value: "", - Destination: &walkFlags.name, - }, - &cli.IntFlag{ - Name: "workers", - Usage: "Sets the number of tipsets that may be simultaneous indexed while walking", - EnvVars: []string{"LILY_WALK_WORKERS"}, - Value: 1, - Destination: &walkFlags.workers, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue that walked will write tipsets to. If empty the node will walk and index tipsets locally. If populated the node will write tipsets to the queue for tipset-workers to consume", - EnvVars: []string{"LILY_WALK_QUEUE"}, - Value: "", - Destination: &walkFlags.queue, - }, - }, - Before: func(cctx *cli.Context) error { - from, to := walkFlags.from, walkFlags.to - if to < from { - return xerrors.Errorf("value of --to (%d) should be >= --from (%d)", to, from) - } - - return nil - }, - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - walkName := fmt.Sprintf("walk_%d", time.Now().Unix()) - if walkFlags.name != "" { - walkName = walkFlags.name - } - - taskList := strings.Split(walkFlags.tasks, ",") - if walkFlags.tasks == "*" { - taskList = tasktype.AllTableTasks - } - - api, closer, err := GetAPI(ctx, walkFlags.apiAddr, walkFlags.apiToken) - if err != nil { - return err - } - defer closer() - - var res *schedule.JobSubmitResult - if walkFlags.queue == "" { - cfg := &lily.LilyWalkConfig{ - Name: walkName, - Tasks: taskList, - Window: walkFlags.window, - From: walkFlags.from, - To: walkFlags.to, - RestartDelay: 0, - RestartOnCompletion: false, - RestartOnFailure: false, - Storage: walkFlags.storage, - Workers: walkFlags.workers, - } - - res, err = api.LilyWalk(ctx, cfg) - if err != nil { - return err - } - } else { - cfg := &lily.LilyWalkNotifyConfig{ - Name: walkName, - Tasks: taskList, - From: walkFlags.from, - To: walkFlags.to, - RestartDelay: 0, - RestartOnCompletion: false, - RestartOnFailure: false, - Queue: walkFlags.queue, - } - - res, err = api.LilyWalkNotify(ctx, cfg) - if err != nil { - return err - } - } - - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - - return nil - }, -} diff --git a/commands/watch.go b/commands/watch.go deleted file mode 100644 index daad55c9b..000000000 --- a/commands/watch.go +++ /dev/null @@ -1,168 +0,0 @@ -package commands - -import ( - "fmt" - "os" - "strings" - "time" - - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/specs-actors/actors/builtin" - "github.com/urfave/cli/v2" - - "github.com/filecoin-project/lily/chain/indexer/tasktype" - "github.com/filecoin-project/lily/lens/lily" - "github.com/filecoin-project/lily/schedule" -) - -type watchOps struct { - confidence int - tasks string - window time.Duration - storage string - apiAddr string - apiToken string - name string - workers int - bufferSize int - queue string -} - -var watchFlags watchOps - -var WatchCmd = &cli.Command{ - Name: "watch", - Usage: "Start a daemon job to watch the head of the filecoin blockchain.", - Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "confidence", - Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database", - EnvVars: []string{"LILY_CONFIDENCE"}, - Value: 2, - Destination: &watchFlags.confidence, - }, - &cli.IntFlag{ - Name: "workers", - Usage: "Sets the number of tipsets that may be simultaneous indexed while watching", - EnvVars: []string{"LILY_WATCH_WORKERS"}, - Value: 2, - Destination: &watchFlags.workers, - }, - &cli.IntFlag{ - Name: "buffer-size", - Usage: "Set the number of tipsets the watcher will buffer while waiting for a worker to accept the work", - EnvVars: []string{"LILY_WATCH_BUFFER"}, - Value: 5, - Destination: &watchFlags.bufferSize, - }, - &cli.StringFlag{ - Name: "tasks", - Usage: "Comma separated list of tasks to run. Each task is reported separately in the database.", - EnvVars: []string{"LILY_TASKS"}, - Destination: &watchFlags.tasks, - }, - &cli.DurationFlag{ - Name: "window", - Usage: "Duration after which any indexing work not completed will be marked incomplete", - EnvVars: []string{"LILY_WINDOW"}, - Value: builtin.EpochDurationSeconds * time.Second, - Destination: &watchFlags.window, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - EnvVars: []string{"LILY_STORAGE"}, - Value: "", - Destination: &watchFlags.storage, - }, - &cli.StringFlag{ - Name: "api", - Usage: "Address of lily api in multiaddr format.", - EnvVars: []string{"LILY_API"}, - Value: "/ip4/127.0.0.1/tcp/1234", - Destination: &watchFlags.apiAddr, - }, - &cli.StringFlag{ - Name: "api-token", - Usage: "Authentication token for lily api.", - EnvVars: []string{"LILY_API_TOKEN"}, - Value: "", - Destination: &watchFlags.apiToken, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - EnvVars: []string{"LILY_JOB_NAME"}, - Value: "", - Destination: &watchFlags.name, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue that watcher will write tipsets to. If empty the node will watch and index tipsets locally. If populated the node will write tipsets to the queue for tipset-workers to consume", - EnvVars: []string{"LILY_WATCH_QUEUE"}, - Value: "", - Destination: &watchFlags.queue, - }, - }, - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - watchName := fmt.Sprintf("watch_%d", time.Now().Unix()) - if watchFlags.name != "" { - watchName = watchFlags.name - } - - taskList := strings.Split(watchFlags.tasks, ",") - if watchFlags.tasks == "*" { - taskList = tasktype.AllTableTasks - } - - api, closer, err := GetAPI(ctx, watchFlags.apiAddr, watchFlags.apiToken) - if err != nil { - return err - } - defer closer() - - var res *schedule.JobSubmitResult - if watchFlags.queue == "" { - cfg := &lily.LilyWatchConfig{ - Name: watchName, - Tasks: taskList, - Window: watchFlags.window, - Confidence: watchFlags.confidence, - RestartDelay: 0, - RestartOnCompletion: false, - RestartOnFailure: true, - Storage: watchFlags.storage, - Workers: watchFlags.workers, - BufferSize: watchFlags.bufferSize, - } - - res, err = api.LilyWatch(ctx, cfg) - if err != nil { - return err - } - } else { - cfg := &lily.LilyWatchNotifyConfig{ - Name: watchName, - Tasks: taskList, - Confidence: watchFlags.confidence, - RestartDelay: 0, - RestartOnCompletion: false, - RestartOnFailure: true, - BufferSize: watchFlags.bufferSize, - Queue: watchFlags.queue, - } - - res, err = api.LilyWatchNotify(ctx, cfg) - if err != nil { - return err - } - } - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - - return nil - }, -} diff --git a/commands/worker.go b/commands/worker.go deleted file mode 100644 index 4afc77a0d..000000000 --- a/commands/worker.go +++ /dev/null @@ -1,98 +0,0 @@ -package commands - -import ( - "os" - - lotuscli "github.com/filecoin-project/lotus/cli" - "github.com/urfave/cli/v2" - - "github.com/filecoin-project/lily/lens/lily" -) - -var WorkerCmd = &cli.Command{ - Name: "worker-start", - Subcommands: []*cli.Command{ - TipSetWorkerCmd, - }, -} - -var tipsetWorkerFlags struct { - queue string - name string - storage string - concurrency int -} - -var TipSetWorkerCmd = &cli.Command{ - Name: "tipset-processor", - Flags: flagSet( - clientAPIFlagSet, - []cli.Flag{ - &cli.IntFlag{ - Name: "concurrency", - Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.", - Value: 1, - Destination: &tipsetWorkerFlags.concurrency, - }, - &cli.StringFlag{ - Name: "storage", - Usage: "Name of storage that results will be written to.", - EnvVars: []string{"LILY_STORAGE"}, - Value: "", - Destination: &tipsetWorkerFlags.storage, - }, - &cli.StringFlag{ - Name: "name", - Usage: "Name of job for easy identification later.", - EnvVars: []string{"LILY_JOB_NAME"}, - Value: "", - Destination: &tipsetWorkerFlags.name, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue worker will consume work from.", - EnvVars: []string{"LILY_TSWORKER_QUEUE"}, - Value: "", - Destination: &tipsetWorkerFlags.queue, - }, - }, - ), - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - api, closer, err := GetAPI(ctx, clientAPIFlags.apiAddr, clientAPIFlags.apiToken) - if err != nil { - return err - } - defer closer() - - if tipsetWorkerFlags.name == "" { - id, err := api.ID(ctx) - if err != nil { - return err - } - tipsetWorkerFlags.name = id.ShortString() - } - - cfg := &lily.LilyTipSetWorkerConfig{ - Concurrency: tipsetWorkerFlags.concurrency, - Storage: tipsetWorkerFlags.storage, - Name: tipsetWorkerFlags.name, - RestartOnFailure: true, - RestartOnCompletion: false, - RestartDelay: 0, - Queue: tipsetWorkerFlags.queue, - } - - res, err := api.StartTipSetWorker(ctx, cfg) - if err != nil { - return err - } - - if err := printNewJob(os.Stdout, res); err != nil { - return err - } - - return nil - }, -} diff --git a/lens/lily/api.go b/lens/lily/api.go index 5ca37ac46..023a6e226 100644 --- a/lens/lily/api.go +++ b/lens/lily/api.go @@ -72,126 +72,97 @@ type LilyAPI interface { StartTipSetWorker(ctx context.Context, cfg *LilyTipSetWorkerConfig) (*schedule.JobSubmitResult, error) } - -type LilyIndexConfig struct { - TipSet types.TipSetKey - Name string - Tasks []string - Storage string // name of storage system to use, may be empty - Window time.Duration -} - -type LilyIndexNotifyConfig struct { - TipSet types.TipSetKey - Name string - Tasks []string - Queue string +type LilyJobConfig struct { + // Name is the name of the job. + Name string + // Tasks are executed by the job. + Tasks []string + // Window after which if an execution of the job is not complete it will be canceled. + Window time.Duration + // RestartOnFailure when true will restart the job if it encounters an error. + RestartOnFailure bool + // RestartOnCompletion when true will restart the job when it completes. + RestartOnCompletion bool + // RestartDelay configures how long to wait before restarting the job. + RestartDelay time.Duration + // Storage is the name of the storage system the job will use, may be empty. + Storage string } type LilyWatchConfig struct { - Name string - Tasks []string - Window time.Duration - Confidence int - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, may be empty - Workers int // number of indexing jobs that can run in parallel - BufferSize int // number of tipsets to buffer from notifier service + JobConfig LilyJobConfig + + BufferSize int // number of tipsets to buffer from notifier service + Confidence int + Workers int // number of indexing jobs that can run in parallel } type LilyWatchNotifyConfig struct { - Name string - Tasks []string - Confidence int - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - BufferSize int // number of tipsets to buffer from notifier service - Queue string + JobConfig LilyJobConfig + + BufferSize int // number of tipsets to buffer from notifier service + Confidence int + Queue string } type LilyWalkConfig struct { - From int64 - To int64 - Name string - Tasks []string - Window time.Duration - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, may be empty - Workers int // number of indexing jobs that can run in parallel + JobConfig LilyJobConfig + + From int64 + To int64 } type LilyWalkNotifyConfig struct { - From int64 - To int64 - Name string - Tasks []string - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Queue string + WalkConfig LilyWalkConfig + + Queue string } type LilyGapFindConfig struct { - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, cannot be empty and must be Database storage. - Name string - To uint64 - From uint64 - Tasks []string // name of tasks to fill gaps for + JobConfig LilyJobConfig + + To int64 + From int64 } type LilyGapFillConfig struct { - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, cannot be empty and must be Database storage. - Name string - To uint64 - From uint64 - Tasks []string // name of tasks to fill gaps for + JobConfig LilyJobConfig + + To int64 + From int64 } type LilyGapFillNotifyConfig struct { - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, cannot be empty and must be Database storage. - Name string - To uint64 - From uint64 - Tasks []string // name of tasks to fill gaps for - Queue string -} + GapFillConfig LilyGapFillConfig -type LilySurveyConfig struct { - Name string - Tasks []string - Interval time.Duration - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration - Storage string // name of storage system to use, may be empty + Queue string } type LilyTipSetWorkerConfig struct { - Queue string + JobConfig LilyJobConfig + // Queue is the name of the queueing system the worker will consume work from. + Queue string // Concurrency sets the maximum number of concurrent processing of tasks. // If set to a zero or negative value, NewServer will overwrite the value // to the number of CPUs usable by the current process. Concurrency int - // Storage sets the name of storage system to use, may be empty - Storage string - // Name sets the job name - Name string - RestartOnFailure bool - RestartOnCompletion bool - RestartDelay time.Duration +} + +type LilySurveyConfig struct { + JobConfig LilyJobConfig + + Interval time.Duration +} + +type LilyIndexConfig struct { + JobConfig LilyJobConfig + + TipSet types.TipSetKey +} + +type LilyIndexNotifyConfig struct { + IndexConfig LilyIndexConfig + + Queue string } diff --git a/lens/lily/impl.go b/lens/lily/impl.go index f9a8a0094..13236e584 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -75,13 +75,13 @@ func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.C func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorkerConfig) (*schedule.JobSubmitResult, error) { ctx := context.Background() - log.Infow("starting TipSetWorker", "name", cfg.Name) + log.Infow("starting TipSetWorker", "name", cfg.JobConfig.Name) md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - strg, err := m.StorageCatalog.Connect(ctx, cfg.Storage, md) + strg, err := m.StorageCatalog.Connect(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } @@ -96,41 +96,41 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker return nil, err } - im, err := integrated.NewManager(taskAPI, strg, cfg.Name) + im, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name) if err != nil { return nil, err } - db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.Storage, md) + db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "tipset-worker", Params: map[string]string{ "queue": cfg.Queue, - "storage": cfg.Storage, + "storage": cfg.JobConfig.Storage, "concurrency": strconv.Itoa(cfg.Concurrency), }, - Job: queue.NewAsynqWorker(im, db, cfg.Name, 1, qcfg), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Job: queue.NewAsynqWorker(im, db, cfg.JobConfig.Name, 1, qcfg), + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil } func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interface{}, error) { md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } // the context's passed to these methods live for the duration of the clients request, so make a new one. ctx := context.Background() // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - strg, err := m.StorageCatalog.Connect(ctx, cfg.Storage, md) + strg, err := m.StorageCatalog.Connect(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } @@ -141,7 +141,7 @@ func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interf } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - im, err := integrated.NewManager(taskAPI, strg, cfg.Name, integrated.WithWindow(cfg.Window)) + im, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interf return nil, err } - success, err := im.TipSet(ctx, ts, indexer.WithTasks(cfg.Tasks)) + success, err := im.TipSet(ctx, ts, indexer.WithTasks(cfg.JobConfig.Tasks)) return success, err } @@ -165,14 +165,14 @@ func (m *LilyNodeAPI) LilyIndexNotify(_ context.Context, cfg *LilyIndexNotifyCon return nil, err } - ts, err := m.ChainGetTipSet(ctx, cfg.TipSet) + ts, err := m.ChainGetTipSet(ctx, cfg.IndexConfig.TipSet) if err != nil { return nil, err } idx := distributed.NewTipSetIndexer(queue.NewAsynq(qcfg)) - return idx.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Index), indexer.WithTasks(cfg.Tasks)) + return idx.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Index), indexer.WithTasks(cfg.IndexConfig.JobConfig.Tasks)) } type watcherAPIWrapper struct { @@ -185,7 +185,7 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched ctx := context.Background() md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } wapi := &watcherAPIWrapper{ @@ -194,7 +194,7 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - strg, err := m.StorageCatalog.Connect(ctx, cfg.Storage, md) + strg, err := m.StorageCatalog.Connect(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } @@ -205,33 +205,33 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(taskAPI, strg, cfg.Name, integrated.WithWindow(cfg.Window)) + idx, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } - watchJob := watch.NewWatcher(wapi, idx, cfg.Name, - watch.WithTasks(cfg.Tasks...), + watchJob := watch.NewWatcher(wapi, idx, cfg.JobConfig.Name, + watch.WithTasks(cfg.JobConfig.Tasks...), watch.WithConfidence(cfg.Confidence), watch.WithConcurrentWorkers(cfg.Workers), watch.WithBufferSize(cfg.BufferSize), ) res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "watch", Params: map[string]string{ - "window": cfg.Window.String(), - "storage": cfg.Storage, + "window": cfg.JobConfig.Window.String(), + "storage": cfg.JobConfig.Storage, "confidence": strconv.Itoa(cfg.Confidence), "worker": strconv.Itoa(cfg.Workers), "buffer": strconv.Itoa(cfg.BufferSize), }, - Tasks: cfg.Tasks, + Tasks: cfg.JobConfig.Tasks, Job: watchJob, - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil @@ -249,25 +249,25 @@ func (m *LilyNodeAPI) LilyWatchNotify(_ context.Context, cfg *LilyWatchNotifyCon } idx := distributed.NewTipSetIndexer(queue.NewAsynq(qcfg)) - watchJob := watch.NewWatcher(wapi, idx, cfg.Name, - watch.WithTasks(cfg.Tasks...), + watchJob := watch.NewWatcher(wapi, idx, cfg.JobConfig.Name, + watch.WithTasks(cfg.JobConfig.Tasks...), watch.WithConfidence(cfg.Confidence), watch.WithBufferSize(cfg.BufferSize), ) res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "watch-notify", Params: map[string]string{ "confidence": strconv.Itoa(cfg.Confidence), "buffer": strconv.Itoa(cfg.BufferSize), "queue": cfg.Queue, }, - Tasks: cfg.Tasks, + Tasks: cfg.JobConfig.Tasks, Job: watchJob, - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, err @@ -278,11 +278,11 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul ctx := context.Background() md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - strg, err := m.StorageCatalog.Connect(ctx, cfg.Storage, md) + strg, err := m.StorageCatalog.Connect(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } @@ -293,25 +293,25 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(taskAPI, strg, cfg.Name, integrated.WithWindow(cfg.Window)) + idx, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "walk", Params: map[string]string{ - "window": cfg.Window.String(), + "window": cfg.JobConfig.Window.String(), "minHeight": fmt.Sprintf("%d", cfg.From), "maxHeight": fmt.Sprintf("%d", cfg.To), - "storage": cfg.Storage, + "storage": cfg.JobConfig.Storage, }, - Tasks: cfg.Tasks, - Job: walk.NewWalker(idx, m, cfg.Name, cfg.Tasks, cfg.From, cfg.To), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Tasks: cfg.JobConfig.Tasks, + Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To), + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil @@ -325,18 +325,18 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi idx := distributed.NewTipSetIndexer(queue.NewAsynq(qcfg)) res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.WalkConfig.JobConfig.Name, Type: "walk-notify", Params: map[string]string{ - "minHeight": fmt.Sprintf("%d", cfg.From), - "maxHeight": fmt.Sprintf("%d", cfg.To), + "minHeight": fmt.Sprintf("%d", cfg.WalkConfig.From), + "maxHeight": fmt.Sprintf("%d", cfg.WalkConfig.To), "queue": cfg.Queue, }, - Tasks: cfg.Tasks, - Job: walk.NewWalker(idx, m, cfg.Name, cfg.Tasks, cfg.From, cfg.To), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Tasks: cfg.WalkConfig.JobConfig.Tasks, + Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To), + RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion, + RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay, }) return res, nil @@ -347,28 +347,28 @@ func (m *LilyNodeAPI) LilyGapFind(_ context.Context, cfg *LilyGapFindConfig) (*s ctx := context.Background() md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.Storage, md) + db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "find", - Tasks: cfg.Tasks, + Tasks: cfg.JobConfig.Tasks, Params: map[string]string{ "minHeight": fmt.Sprintf("%d", cfg.From), "maxHeight": fmt.Sprintf("%d", cfg.To), - "storage": cfg.Storage, + "storage": cfg.JobConfig.Storage, }, - Job: gap.NewFinder(m, db, cfg.Name, cfg.From, cfg.To, cfg.Tasks), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Job: gap.NewFinder(m, db, cfg.JobConfig.Name, cfg.From, cfg.To, cfg.JobConfig.Tasks), + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil @@ -379,28 +379,28 @@ func (m *LilyNodeAPI) LilyGapFill(_ context.Context, cfg *LilyGapFillConfig) (*s ctx := context.Background() md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.JobConfig.Name, } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.Storage, md) + db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.JobConfig.Storage, md) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.JobConfig.Name, Type: "fill", Params: map[string]string{ "minHeight": fmt.Sprintf("%d", cfg.From), "maxHeight": fmt.Sprintf("%d", cfg.To), - "storage": cfg.Storage, + "storage": cfg.JobConfig.Storage, }, - Tasks: cfg.Tasks, - Job: gap.NewFiller(m, db, cfg.Name, cfg.From, cfg.To, cfg.Tasks), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Tasks: cfg.JobConfig.Tasks, + Job: gap.NewFiller(m, db, cfg.JobConfig.Name, cfg.From, cfg.To, cfg.JobConfig.Tasks), + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil @@ -411,7 +411,7 @@ func (m *LilyNodeAPI) LilyGapFillNotify(_ context.Context, cfg *LilyGapFillNotif ctx := context.Background() md := storage.Metadata{ - JobName: cfg.Name, + JobName: cfg.GapFillConfig.JobConfig.Name, } qcfg, err := m.QueueCatalog.AsynqConfig(cfg.Queue) @@ -420,24 +420,24 @@ func (m *LilyNodeAPI) LilyGapFillNotify(_ context.Context, cfg *LilyGapFillNotif } // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.Storage, md) + db, err := m.StorageCatalog.ConnectAsDatabase(ctx, cfg.GapFillConfig.JobConfig.Storage, md) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, + Name: cfg.GapFillConfig.JobConfig.Name, Type: "fill-notify", Params: map[string]string{ - "minHeight": fmt.Sprintf("%d", cfg.From), - "maxHeight": fmt.Sprintf("%d", cfg.To), - "storage": cfg.Storage, + "minHeight": fmt.Sprintf("%d", cfg.GapFillConfig.From), + "maxHeight": fmt.Sprintf("%d", cfg.GapFillConfig.To), + "storage": cfg.GapFillConfig.JobConfig.Storage, "queue": cfg.Queue, }, - Tasks: cfg.Tasks, - Job: gap.NewNotifier(m, db, queue.NewAsynq(qcfg), cfg.Name, cfg.From, cfg.To, cfg.Tasks), - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + Tasks: cfg.GapFillConfig.JobConfig.Tasks, + Job: gap.NewNotifier(m, db, queue.NewAsynq(qcfg), cfg.GapFillConfig.JobConfig.Name, cfg.GapFillConfig.From, cfg.GapFillConfig.To, cfg.GapFillConfig.JobConfig.Tasks), + RestartOnFailure: cfg.GapFillConfig.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.GapFillConfig.JobConfig.RestartOnCompletion, + RestartDelay: cfg.GapFillConfig.JobConfig.RestartDelay, }) return res, nil @@ -587,27 +587,27 @@ func (m *LilyNodeAPI) LilySurvey(_ context.Context, cfg *LilySurveyConfig) (*sch ctx := context.Background() // create a database connection for this watch, ensure its pingable, and run migrations if needed/configured to. - strg, err := m.StorageCatalog.Connect(ctx, cfg.Storage, storage.Metadata{JobName: cfg.Name}) + strg, err := m.StorageCatalog.Connect(ctx, cfg.JobConfig.Storage, storage.Metadata{JobName: cfg.JobConfig.Name}) if err != nil { return nil, err } // instantiate a new surveyer. - surv, err := network.NewSurveyer(m, strg, cfg.Interval, cfg.Name, cfg.Tasks) + surv, err := network.NewSurveyer(m, strg, cfg.Interval, cfg.JobConfig.Name, cfg.JobConfig.Tasks) if err != nil { return nil, err } res := m.Scheduler.Submit(&schedule.JobConfig{ - Name: cfg.Name, - Tasks: cfg.Tasks, + Name: cfg.JobConfig.Name, + Tasks: cfg.JobConfig.Tasks, Job: surv, Params: map[string]string{ "interval": cfg.Interval.String(), }, - RestartOnFailure: cfg.RestartOnFailure, - RestartOnCompletion: cfg.RestartOnCompletion, - RestartDelay: cfg.RestartDelay, + RestartOnFailure: cfg.JobConfig.RestartOnFailure, + RestartOnCompletion: cfg.JobConfig.RestartOnCompletion, + RestartDelay: cfg.JobConfig.RestartDelay, }) return res, nil diff --git a/main.go b/main.go index 6a95888c2..5f9060bb4 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "github.com/urfave/cli/v2" "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/commands/job" "github.com/filecoin-project/lily/version" ) @@ -69,13 +70,15 @@ func main() { }) } - cli.AppHelpTemplate = commands.AppHelpTemplate + //cli.AppHelpTemplate = commands.AppHelpTemplate app := &cli.App{ Name: "lily", Usage: "a tool for capturing on-chain state from the filecoin network", Version: fmt.Sprintf("VisorVersion: \t%s\n NewestNetworkVersion: \t%d\n GenesisFile: \t%s\n DevNet: \t%t\n UserVersion: \t%s\n UpgradeSchedule: \n%s", version.String(), build.NewestNetworkVersion, build.GenesisFile, build.Devnet, build.UserVersion(), up.String()), Flags: []cli.Flag{ + commands.ClientAPIFlag, + commands.ClientTokenFlag, &cli.StringFlag{ Name: "log-level", EnvVars: []string{"GOLOG_LOG_LEVEL"}, @@ -153,27 +156,21 @@ func main() { Destination: &commands.VisorMetricFlags.RedisDB, }, }, - HideHelp: true, + HideHelp: false, Metadata: commands.Metadata(), Commands: []*cli.Command{ commands.ChainCmd, commands.DaemonCmd, commands.ExportChainCmd, - commands.GapCmd, commands.HelpCmd, - commands.IndexCmd, commands.InitCmd, - commands.JobCmd, commands.LogCmd, commands.MigrateCmd, commands.NetCmd, - commands.SurveyCmd, commands.StopCmd, commands.SyncCmd, commands.WaitApiCmd, - commands.WalkCmd, - commands.WatchCmd, - commands.WorkerCmd, + job.JobCmd, }, } app.Setup() diff --git a/storage/sql.go b/storage/sql.go index 695e2a03e..24f05b60c 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -492,7 +492,7 @@ func GenerateUpsertStrings(model interface{}) (string, string) { } // returns a map of heights to missing tasks, and a list of heights to iterate the map in order with. -func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight uint64, tasks ...string) (map[int64][]string, []int64, error) { +func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) (map[int64][]string, []int64, error) { gaps, err := d.QueryGaps(ctx, minHeight, maxHeight, tasks...) if err != nil { return nil, nil, err @@ -512,7 +512,7 @@ func (d *Database) ConsolidateGaps(ctx context.Context, minHeight, maxHeight uin return out, heights, nil } -func (d *Database) QueryGaps(ctx context.Context, minHeight, maxHeight uint64, tasks ...string) ([]*visor.GapReport, error) { +func (d *Database) QueryGaps(ctx context.Context, minHeight, maxHeight int64, tasks ...string) ([]*visor.GapReport, error) { var out []*visor.GapReport if len(tasks) != 0 { if err := d.AsORM().ModelContext(ctx, &out). From 051fc5e9e145b6f2db156e14514e15c4ddffc31f Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 11:51:06 -0700 Subject: [PATCH 02/10] fix tests --- chain/gap/find_test.go | 6 ++--- commands/job/{gap.go => fill.go} | 31 ----------------------- commands/job/find.go | 42 ++++++++++++++++++++++++++++++++ commands/job/job.go | 2 +- itests/builder.go | 22 +++++++++-------- 5 files changed, 58 insertions(+), 45 deletions(-) rename commands/job/{gap.go => fill.go} (70%) create mode 100644 commands/job/find.go diff --git a/chain/gap/find_test.go b/chain/gap/find_test.go index e5f0143fa..e85b9d35b 100644 --- a/chain/gap/find_test.go +++ b/chain/gap/find_test.go @@ -19,8 +19,8 @@ import ( ) var ( - minHeight = uint64(0) - maxHeight = uint64(10) + minHeight = int64(0) + maxHeight = int64(10) ) func TestFind(t *testing.T) { @@ -459,7 +459,7 @@ func (e *PREditor) truncate() { require.NoError(e.t, err, "visor_processing_report") } -func (e *PREditor) initialize(count uint64, tasks ...string) { +func (e *PREditor) initialize(count int64, tasks ...string) { // build the task array // uncomment to see all query // db.AddQueryHook(&LoggingQueryHook{}) diff --git a/commands/job/gap.go b/commands/job/fill.go similarity index 70% rename from commands/job/gap.go rename to commands/job/fill.go index eec2e2286..b3b7e2597 100644 --- a/commands/job/gap.go +++ b/commands/job/fill.go @@ -10,37 +10,6 @@ import ( "github.com/filecoin-project/lily/lens/lily" ) -var GapFindCmd = &cli.Command{ - Name: "find", - Usage: "find gaps in the database", - Flags: []cli.Flag{ - RangeFromFlag, - RangeToFlag, - }, - Before: func(cctx *cli.Context) error { - return rangeFlags.validate() - }, - Action: func(cctx *cli.Context) error { - ctx := lotuscli.ReqContext(cctx) - - api, closer, err := commands.GetAPI(ctx) - if err != nil { - return err - } - defer closer() - - res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ - JobConfig: RunFlags.ParseJobConfig(), - To: rangeFlags.to, - From: rangeFlags.from, - }) - if err != nil { - return err - } - return commands.PrintNewJob(os.Stdout, res) - }, -} - var GapFillCmd = &cli.Command{ Name: "fill", Usage: "Fill gaps in the database", diff --git a/commands/job/find.go b/commands/job/find.go new file mode 100644 index 000000000..a3ba15c38 --- /dev/null +++ b/commands/job/find.go @@ -0,0 +1,42 @@ +package job + +import ( + "os" + + lotuscli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lily/commands" + "github.com/filecoin-project/lily/lens/lily" +) + +var GapFindCmd = &cli.Command{ + Name: "find", + Usage: "find gaps in the database", + Flags: []cli.Flag{ + RangeFromFlag, + RangeToFlag, + }, + Before: func(cctx *cli.Context) error { + return rangeFlags.validate() + }, + Action: func(cctx *cli.Context) error { + ctx := lotuscli.ReqContext(cctx) + + api, closer, err := commands.GetAPI(ctx) + if err != nil { + return err + } + defer closer() + + res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ + JobConfig: RunFlags.ParseJobConfig(), + To: rangeFlags.to, + From: rangeFlags.from, + }) + if err != nil { + return err + } + return commands.PrintNewJob(os.Stdout, res) + }, +} diff --git a/commands/job/job.go b/commands/job/job.go index 55e9075c3..a52b6a7a1 100644 --- a/commands/job/job.go +++ b/commands/job/job.go @@ -42,7 +42,7 @@ var JobRunCmd = &cli.Command{ IndexCmd, SurveyCmd, GapFillCmd, - GapFillCmd, + GapFindCmd, }, } diff --git a/itests/builder.go b/itests/builder.go index 95bb36770..9ec987db7 100644 --- a/itests/builder.go +++ b/itests/builder.go @@ -149,20 +149,22 @@ func (vw *VectorWalkValidator) Run(ctx context.Context) node.StopFunc { // create a walk config from the builder values walkCfg := &lily.LilyWalkConfig{ - From: vw.from, - To: vw.to, - Name: vw.t.Name(), - Tasks: vw.tasks, - Window: 0, - RestartOnFailure: false, - RestartOnCompletion: false, - RestartDelay: 0, - Storage: "TestDatabase1", + From: vw.from, + To: vw.to, + JobConfig: lily.LilyJobConfig{ + Name: vw.t.Name(), + Tasks: vw.tasks, + Window: 0, + RestartOnFailure: false, + RestartOnCompletion: false, + RestartDelay: 0, + Storage: "TestDatabase1", + }, } walkStart := time.Now() // walk that walk - vw.t.Logf("starting walk from %d to %d with tasks %s", walkCfg.From, walkCfg.To, walkCfg.Tasks) + vw.t.Logf("starting walk from %d to %d with tasks %s", walkCfg.From, walkCfg.To, walkCfg.JobConfig.Tasks) res, err := vw.api.LilyWalk(ctx, walkCfg) require.NoError(vw.t, err) require.NotEmpty(vw.t, res) From 4928cad85e62852eb1b6682311ecc577be0d1012 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 12:16:42 -0700 Subject: [PATCH 03/10] docs: add find description --- commands/job/find.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/commands/job/find.go b/commands/job/find.go index a3ba15c38..9a9b7af06 100644 --- a/commands/job/find.go +++ b/commands/job/find.go @@ -12,7 +12,22 @@ import ( var GapFindCmd = &cli.Command{ Name: "find", - Usage: "find gaps in the database", + Usage: "find gaps in the database for a given range and a set of tasks.", + Description: ` +The find job searches for gaps in a database storage system by executing the SQL gap_find() function over the visor_processing_reports table. +find will query the database for gaps based on the list of tasks (--tasks) provided over the specified range (--to --from). +An epoch is considered to have gaps iff: +- a task specified by the --task flag is not present at each epoch within the specified range. +- a task specified by the --task flag does not have status 'OK' at each epoch within the specified range. +The results of the find job are written to the visor_gap_reports table. + +As an example, the below command: + $ lily job run --tasks=block_headers,messages find --from=10 --to=20 +searches for gaps in block_headers and messages tasks from epoch 10 to 20 (inclusive). + +Note: the find job should NOT be executed against heights that were imported from historical data dumps: https://lilium.sh/data/dumps/ +since visor_processing_report entries will not be present for imported data. +`, Flags: []cli.Flag{ RangeFromFlag, RangeToFlag, From 3a95eb724bda09958c1a9c59449c447d58c248f2 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 13:13:47 -0700 Subject: [PATCH 04/10] add docs --- commands/job/fill.go | 27 +++++++++++++++++--- commands/job/find.go | 12 +++++---- commands/job/index.go | 23 ++++++++++++----- commands/job/job.go | 1 + commands/job/options.go | 17 +++++++------ commands/job/survey.go | 2 +- commands/job/walk.go | 22 +++++++++++++--- commands/job/watch.go | 56 +++++++++++++++++++++++++++++++++++------ commands/job/worker.go | 7 +++--- 9 files changed, 129 insertions(+), 38 deletions(-) diff --git a/commands/job/fill.go b/commands/job/fill.go index b3b7e2597..fd0580228 100644 --- a/commands/job/fill.go +++ b/commands/job/fill.go @@ -12,7 +12,21 @@ import ( var GapFillCmd = &cli.Command{ Name: "fill", - Usage: "Fill gaps in the database", + Usage: "fill gaps in the database for a given range and set of tasks.", + Description: ` +The fill job queries the visor_gap_reports table for gaps to fill and indexes the data reported to have gaps. +A gap in the visor_gap_reports table is any row with status 'GAP'. +fill will index gaps based on the list of tasks (--tasks) provided over the specified range (--from --to). +Each epoch and its corresponding list of tasks found in the visor_gap_reports table will be indexed independently. +When the gap is successfully filled its corresponding entry in the visor_gap_reports table will be updated with status 'FILLED'. + +As an example, the below command: + $ lily job run --tasks=block_headers,message fill --from=10 --to=20 +fills gaps for block_headers and messages tasks from epoch 10 to 20 (inclusive) + +Constraints: +- the fill job must be executed AFTER a find job. These jobs must NOT be executed simultaneously. +`, Flags: []cli.Flag{ RangeFromFlag, RangeToFlag, @@ -33,7 +47,7 @@ var GapFillCmd = &cli.Command{ defer closer() res, err := api.LilyGapFill(ctx, &lily.LilyGapFillConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), To: rangeFlags.to, From: rangeFlags.from, }) @@ -46,7 +60,12 @@ var GapFillCmd = &cli.Command{ } var GapFillNotifyCmd = &cli.Command{ - Name: "notify", + Name: "notify", + Usage: "notify the provided queueing system of gaps to index allowing tipset-workers to perform the indexing.", + Description: ` +The notify command will insert tasks into the provided queueing system for consumption by tipset-workers. +This command should be used when lily is configured to perform distributed indexing. +`, Flags: []cli.Flag{ NotifyQueueFlag, }, @@ -61,7 +80,7 @@ var GapFillNotifyCmd = &cli.Command{ cfg := &lily.LilyGapFillNotifyConfig{ GapFillConfig: lily.LilyGapFillConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig("fill-" + cctx.Command.Name), From: rangeFlags.from, To: rangeFlags.to, }, diff --git a/commands/job/find.go b/commands/job/find.go index 9a9b7af06..a1691d1ce 100644 --- a/commands/job/find.go +++ b/commands/job/find.go @@ -15,18 +15,20 @@ var GapFindCmd = &cli.Command{ Usage: "find gaps in the database for a given range and a set of tasks.", Description: ` The find job searches for gaps in a database storage system by executing the SQL gap_find() function over the visor_processing_reports table. -find will query the database for gaps based on the list of tasks (--tasks) provided over the specified range (--to --from). +find will query the database for gaps based on the list of tasks (--tasks) provided over the specified range (--from --to). An epoch is considered to have gaps iff: - a task specified by the --task flag is not present at each epoch within the specified range. - a task specified by the --task flag does not have status 'OK' at each epoch within the specified range. -The results of the find job are written to the visor_gap_reports table. +The results of the find job are written to the visor_gap_reports table with status 'GAP'. As an example, the below command: $ lily job run --tasks=block_headers,messages find --from=10 --to=20 searches for gaps in block_headers and messages tasks from epoch 10 to 20 (inclusive). -Note: the find job should NOT be executed against heights that were imported from historical data dumps: https://lilium.sh/data/dumps/ -since visor_processing_report entries will not be present for imported data. +Constraints: +- the find job must NOT be executed against heights that were imported from historical data dumps: https://lilium.sh/data/dumps/ +since visor_processing_report entries will not be present for imported data (meaning the entire range will be considered to have gaps). +- the find job must be executed BEFORE a fill job. These jobs must NOT be executed simultaneously. `, Flags: []cli.Flag{ RangeFromFlag, @@ -45,7 +47,7 @@ since visor_processing_report entries will not be present for imported data. defer closer() res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), To: rangeFlags.to, From: rangeFlags.from, }) diff --git a/commands/job/index.go b/commands/job/index.go index 138234b89..785c30a5e 100644 --- a/commands/job/index.go +++ b/commands/job/index.go @@ -26,6 +26,9 @@ var indexFlags indexOps var IndexCmd = &cli.Command{ Name: "index", Usage: "Index the state of a tipset from the filecoin blockchain.", + Description: ` +The index command may be used to index a single tipset from the filecoin blockchain specified either by height or by tipset key. +`, Subcommands: []*cli.Command{ IndexTipSetCmd, IndexHeightCmd, @@ -34,7 +37,7 @@ var IndexCmd = &cli.Command{ var IndexTipSetCmd = &cli.Command{ Name: "tipset", - Usage: "Index the state of a tipset from the filecoin blockchain by tipset key", + Usage: "Index the state of a tipset from the filecoin blockchain by tipset key.", Subcommands: []*cli.Command{ IndexNotifyCmd, }, @@ -65,7 +68,7 @@ var IndexTipSetCmd = &cli.Command{ defer closer() _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), TipSet: indexFlags.tipsetKey, }) if err != nil { @@ -78,7 +81,10 @@ var IndexTipSetCmd = &cli.Command{ var IndexHeightCmd = &cli.Command{ Name: "height", - Usage: "Index the state of a tipset from the filecoin blockchain by height", + Usage: "Index the state of a tipset from the filecoin blockchain by height.", + Description: ` + Index the state of a tipset from the filecoin blockchain by height. If the provided height is a null-round and error will be returned. +`, Flags: []cli.Flag{ &cli.Int64Flag{ Name: "height", @@ -121,7 +127,7 @@ var IndexHeightCmd = &cli.Command{ defer closer() _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), TipSet: indexFlags.tipsetKey, }) if err != nil { @@ -133,7 +139,12 @@ var IndexHeightCmd = &cli.Command{ } var IndexNotifyCmd = &cli.Command{ - Name: "notify", + Name: "notify", + Usage: "notify the provided queueing system of the tipset to index allowing tipset-workers to perform the indexing.", + Description: ` +The notify command will insert tasks into the provided queueing system for consumption by tipset-workers. +This command should be used when lily is configured to perform distributed indexing. +`, Flags: []cli.Flag{ NotifyQueueFlag, }, @@ -148,7 +159,7 @@ var IndexNotifyCmd = &cli.Command{ cfg := &lily.LilyIndexNotifyConfig{ IndexConfig: lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig("index-" + cctx.Command.Name), TipSet: indexFlags.tipsetKey, }, Queue: notifyFlags.queue, diff --git a/commands/job/job.go b/commands/job/job.go index a52b6a7a1..aa85af831 100644 --- a/commands/job/job.go +++ b/commands/job/job.go @@ -43,6 +43,7 @@ var JobRunCmd = &cli.Command{ SurveyCmd, GapFillCmd, GapFindCmd, + TipSetWorkerCmd, }, } diff --git a/commands/job/options.go b/commands/job/options.go index c58d3bf69..6061666de 100644 --- a/commands/job/options.go +++ b/commands/job/options.go @@ -23,9 +23,9 @@ type runOpts struct { RestartFailure bool } -func (r runOpts) ParseJobConfig() lily.LilyJobConfig { +func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { if RunFlags.Name == "" { - RunFlags.Name = fmt.Sprintf("job_%d", time.Now().Unix()) + RunFlags.Name = fmt.Sprintf("%s_%d", kind, time.Now().Unix()) } if len(RunFlags.Tasks.Value()) == 0 { // TODO don't panic @@ -47,8 +47,9 @@ var RunFlags runOpts var RunWindowFlag = &cli.DurationFlag{ Name: "window", - Usage: "Duaration after which job execution will be cancled", + Usage: "Duaration after which job execution will be canceled", EnvVars: []string{"LILY_JOB_WINDOW"}, + Value: 0, Destination: &RunFlags.Window, } @@ -61,7 +62,7 @@ var RunTaskFlag = &cli.StringSliceFlag{ var RunStorageFlag = &cli.StringFlag{ Name: "storage", - Usage: "Name of storage that job will write result to.", + Usage: "Name of storage backend the job will write result to.", EnvVars: []string{"LILY_JOB_STORAGE"}, Value: "", Destination: &RunFlags.Storage, @@ -69,7 +70,7 @@ var RunStorageFlag = &cli.StringFlag{ var RunNameFlag = &cli.StringFlag{ Name: "name", - Usage: "Name of job for easy identification later.", + Usage: "Name of job for easy identification later. Will appear as 'reporter' in the visor_processing_reports table.", EnvVars: []string{"LILY_JOB_NAME"}, Value: "", Destination: &RunFlags.Name, @@ -77,7 +78,7 @@ var RunNameFlag = &cli.StringFlag{ var RunRestartDelayFlag = &cli.DurationFlag{ Name: "restart-delay", - Usage: "Duration to wait before restarting job", + Usage: "Duration to wait before restarting job after it ends execution", EnvVars: []string{"LILY_JOB_RESTART_DELAY"}, Value: 0, Destination: &RunFlags.RestartDelay, @@ -85,7 +86,7 @@ var RunRestartDelayFlag = &cli.DurationFlag{ var RunRestartCompletion = &cli.BoolFlag{ Name: "restart-on-completion", - Usage: "Restart the job after it completes", + Usage: "Restart the job after it completes.", EnvVars: []string{"LILY_JOB_RESTART_COMPLETION"}, Value: false, Destination: &RunFlags.RestartCompletion, @@ -93,7 +94,7 @@ var RunRestartCompletion = &cli.BoolFlag{ var RunRestartFailure = &cli.BoolFlag{ Name: "restart-on-failure", - Usage: "Restart the job if it fails", + Usage: "Restart the job if it fails.", EnvVars: []string{"LILY_JOB_RESTART_FAILURE"}, Value: false, Destination: &RunFlags.RestartFailure, diff --git a/commands/job/survey.go b/commands/job/survey.go index 325a8e225..ff8da6f8a 100644 --- a/commands/job/survey.go +++ b/commands/job/survey.go @@ -39,7 +39,7 @@ var SurveyCmd = &cli.Command{ defer closer() res, err := api.LilySurvey(ctx, &lily.LilySurveyConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), Interval: surveyFlags.interval, }) if err != nil { diff --git a/commands/job/walk.go b/commands/job/walk.go index e85817a04..14aa1f74e 100644 --- a/commands/job/walk.go +++ b/commands/job/walk.go @@ -12,7 +12,16 @@ import ( var WalkCmd = &cli.Command{ Name: "walk", - Usage: "Start a daemon job to walk a range of the filecoin blockchain.", + Usage: "walk and index a range of the filecoin blockchain.", + Description: ` +The walk command will index state based on the list of tasks (--tasks) provided over the specified range (--from --to). +Each epoch will be indexed serially starting from the heaviest tipset at the upper height (--to) to the lower height (--to). + +As and example, the below command: + $ lily job run --tasks=block_headers,messages walk --from=10 --to=20 +walks epochs 20 through 10 (inclusive) executing the block_headers and messages task for each epoch. +The status of each epoch and its set of tasks can be observed in the visor_processing_reports table. +`, Flags: []cli.Flag{ RangeFromFlag, RangeToFlag, @@ -33,7 +42,7 @@ var WalkCmd = &cli.Command{ defer closer() cfg := &lily.LilyWalkConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), From: rangeFlags.from, To: rangeFlags.to, } @@ -52,7 +61,12 @@ var WalkCmd = &cli.Command{ } var WalkNotifyCmd = &cli.Command{ - Name: "notify", + Name: "notify", + Usage: "notify the provided queueing system of epochs to index allowing tipset-workers to perform the indexing.", + Description: ` +The notify command will insert tasks into the provided queueing system for consumption by tipset-workers. +This command should be used when lily is configured to perform distributed indexing. +`, Flags: []cli.Flag{ NotifyQueueFlag, }, @@ -67,7 +81,7 @@ var WalkNotifyCmd = &cli.Command{ cfg := &lily.LilyWalkNotifyConfig{ WalkConfig: lily.LilyWalkConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig("walk-" + cctx.Command.Name), From: rangeFlags.from, To: rangeFlags.to, }, diff --git a/commands/job/watch.go b/commands/job/watch.go index adcd3cff7..79e8acb38 100644 --- a/commands/job/watch.go +++ b/commands/job/watch.go @@ -21,21 +21,21 @@ var watchFlags watchOps var WatchConfidenceFlag = &cli.IntFlag{ Name: "confidence", - Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database", + Usage: "Sets the size of the cache used to hold tipsets for possible reversion before being committed to the database.", EnvVars: []string{"LILY_CONFIDENCE"}, Value: 2, Destination: &watchFlags.confidence, } var WatchWorkersFlag = &cli.IntFlag{ Name: "workers", - Usage: "Sets the number of tipsets that may be simultaneous indexed while watching", + Usage: "Sets the number of tipsets that may be simultaneous indexed while watching.", EnvVars: []string{"LILY_WATCH_WORKERS"}, Value: 2, Destination: &watchFlags.workers, } var WatchBufferSizeFlag = &cli.IntFlag{ Name: "buffer-size", - Usage: "Set the number of tipsets the watcher will buffer while waiting for a worker to accept the work", + Usage: "Set the number of tipsets the watcher will buffer while waiting for a worker to accept the work.", EnvVars: []string{"LILY_WATCH_BUFFER"}, Value: 5, Destination: &watchFlags.bufferSize, @@ -43,7 +43,44 @@ var WatchBufferSizeFlag = &cli.IntFlag{ var WatchCmd = &cli.Command{ Name: "watch", - Usage: "Start a daemon job to watch the head of the filecoin blockchain.", + Usage: "watch the head of the filecoin blockchain and index each new head as it becomes avaiable", + Description: ` +The watch command subscribes to incoming tipsets from the filecoin blockchain and indexes them as the arrive. + +Since it may be the case that tipsets arrive at a rate greater than their rate of indexing the watch job maintains a +queue of tipsets to index. Consumption of this queue can be configured via the --workers flag. Increasing the value provided +to the --workers flag will allow the watch job to index tipsets simultaneously (Note: this will use a significant amount of system resources). + +Since it may be the case that lily experiences a reorg while the watch job is observing the head of the chain +the --confidence flag may be used to buffed the amount of tipsets observed before it begins indexing - illustrated by the below diagram: + + *unshift* *unshift* *unshift* *unshift* + │ │ │ │ │ │ │ │ + ┌──▼──▼──┐ ┌──▼──▼──┐ ┌──▼──▼──┐ ┌──▼──▼──┐ + │ │ │ ts10 │ │ ts11 │ │ ts12 │ + ... ---> ├────────┤ ---> ├────────┤ ---> ├────────┤ ---> ├────────┤ ---> ... + │ ts09 │ │ ts09 │ │ ts10 │ │ ts11 │ + ├────────┤ ├────────┤ ├────────┤ ├────────┤ + │ ts08 │ │ ts08 │ │ ts09 │ │ ts10 │ + ├────────┤ ├────────┤ ├────────┤ ├────────┤ + │ ... │ │ ... │ │ ... │ │ ... │ + ├────────┤ ├────────┤ ├────────┤ ├────────┤ + │ ts02 │ │ ts02 │ │ ts03 │ │ ts04 │ + ├────────┤ ├────────┤ ├────────┤ ├────────┤ + │ ts01 │ │ ts01 │ │ ts02 │ │ ts03 │ + ├────────┤ ├────────┤ ├────────┤ ├────────┤ + │ ts00 │ │ ts00 │ │ ts01 │ │ ts02 │ + └────────┘ └────────┘ └──│──│──┘ └──│──│──┘ + ▼ ▼ *pop* ▼ ▼ *pop* + ┌────────┐ ┌────────┐ + (confidence=10 :: length=10) │ ts00 │ │ ts01 │ + └────────┘ └────────┘ + (process) (process) + +As and example, the below command: + $ lily job run --tasks-block_headers,messages watch --confidence=10 --workers=2 +watches the chain head and only indexes a tipset after observing 10 subsequent tipsets indexing at most two tipset simultaneously. +`, Flags: []cli.Flag{ WatchConfidenceFlag, WatchWorkersFlag, @@ -63,7 +100,7 @@ var WatchCmd = &cli.Command{ var res *schedule.JobSubmitResult cfg := &lily.LilyWatchConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), BufferSize: watchFlags.bufferSize, Confidence: watchFlags.confidence, Workers: watchFlags.workers, @@ -83,7 +120,12 @@ var WatchCmd = &cli.Command{ } var WatchNotifyCmd = &cli.Command{ - Name: "notify", + Name: "notify", + Usage: "notify the provided queueing system of epochs to index allowing tipset-workers to perform the indexing.", + Description: ` +The notify command will insert tasks into the provided queueing system for consumption by tipset-workers. +This command should be used when lily is configured to perform distributed indexing. +`, Flags: []cli.Flag{ NotifyQueueFlag, }, @@ -97,7 +139,7 @@ var WatchNotifyCmd = &cli.Command{ defer closer() cfg := &lily.LilyWatchNotifyConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig("watch-" + cctx.Command.Name), Confidence: watchFlags.confidence, BufferSize: watchFlags.bufferSize, diff --git a/commands/job/worker.go b/commands/job/worker.go index ebb28511d..f852704b8 100644 --- a/commands/job/worker.go +++ b/commands/job/worker.go @@ -16,7 +16,8 @@ var tipsetWorkerFlags struct { } var TipSetWorkerCmd = &cli.Command{ - Name: "tipset-worker", + Name: "tipset-worker", + Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing", Flags: commands.FlagSet( commands.ClientAPIFlagSet, []cli.Flag{ @@ -28,7 +29,7 @@ var TipSetWorkerCmd = &cli.Command{ }, &cli.StringFlag{ Name: "queue", - Usage: "Name of queue worker will consume work from.", + Usage: "Name of queue system worker will consume work from.", EnvVars: []string{"LILY_TSWORKER_QUEUE"}, Value: "", Destination: &tipsetWorkerFlags.queue, @@ -45,7 +46,7 @@ var TipSetWorkerCmd = &cli.Command{ defer closer() res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{ - JobConfig: RunFlags.ParseJobConfig(), + JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), Queue: tipsetWorkerFlags.queue, Concurrency: tipsetWorkerFlags.concurrency, }) From c23834cc300a142f6565a0098a731626c4cdc6ed Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 17:05:37 -0700 Subject: [PATCH 05/10] remove help topics --- commands/help.go | 219 ----------------------------------------- commands/job/worker.go | 31 +++--- main.go | 4 - 3 files changed, 14 insertions(+), 240 deletions(-) diff --git a/commands/help.go b/commands/help.go index 8751bf735..aa8a9ce6a 100644 --- a/commands/help.go +++ b/commands/help.go @@ -60,223 +60,4 @@ var HelpCmd = &cli.Command{ HelpModelsListCmd, HelpModelsDescribeCmd, }, - Action: func(c *cli.Context) error { - args := c.Args() - if args.Present() { - return ShowCommandHelp(c, args.First()) - } - - _ = cli.ShowAppHelp(c) - return nil - }, -} - -func ShowCommandHelp(ctx *cli.Context, command string) error { - if command == "" { - cli.HelpPrinter(ctx.App.Writer, cli.SubcommandHelpTemplate, ctx.App) - return nil - } - - for _, c := range ctx.App.Commands { - if c.HasName(command) { - templ := c.CustomHelpTemplate - if templ == "" { - templ = cli.CommandHelpTemplate - } - - cli.HelpPrinter(ctx.App.Writer, templ, c) - - return nil - } - } - - for _, t := range helpTopics { - if t.Name == command { - fmt.Fprintln(ctx.App.Writer, t.Text) - return nil - } - } - - if ctx.App.CommandNotFound == nil { - return cli.Exit(fmt.Sprintf("No help topic for '%v'", command), 3) - } - - ctx.App.CommandNotFound(ctx, command) - return nil -} - -func Metadata() map[string]interface{} { - return map[string]interface{}{ - "Topics": helpTopics, - } -} - -var AppHelpTemplate = `{{.Name}}{{if .Usage}} - {{.Usage}}{{end}} - -Usage: - - {{.HelpName}} [global options] [arguments...] - -The commands are: -{{range .VisibleCategories}}{{if .Name}} - {{.Name}}:{{range .VisibleCommands}} - {{join .Names ", "}}{{"\t"}}{{.Usage}}{{end}}{{else}}{{range .VisibleCommands}} - {{join .Names ", "}}{{"\t"}}{{.Usage}}{{end}}{{end}}{{end}} - -Use "{{.HelpName}} help " for more information about a command. - -Additional help topics: -{{range .Metadata.Topics}} - {{.Name}}{{"\t"}}{{.Description}}{{end}} - -Use "{{.HelpName}} help " for more information about that topic. -` - -type helpTopic struct { - Name string - Description string - Text string -} - -// ---------------------------------------------------------------------------- -// 80 characters --> -var helpTopics = []helpTopic{ - { - Name: "overview", - Description: "Overview of visor", - Text: `Visor is an application for capturing on-chain state from the filecoin network. -It extracts data from the blocks and messages contained in each tipset and -captures the effects those messages have on actor states. Visor can 'watch' -the head of the filecoin chain for incoming tipsets or 'walk' the chain to -analyze historic tipsets. - -A watch is intended to follow the growth of the chain and operates by -subscribing to incoming tipsets and processing them as they arrive. A -confidence level may be specified which determines how many epochs visor -should wait before processing the tipset. This is to allow for chain -reorganisation near the head. A low confidence level risks extracting data from -tipsets that do not form part of the consensus chain. - -A walk takes a range of heights and will walk from the heaviest tipset at the -upper height to the lower height using the parent state root present in each -tipset. - -The type of data extracted by lily is controlled by 'tasks' that focus on -particular parts of the chain. For more information about available tasks -see 'visor help tasks'. - -Data is extracted into models that represent chain objects, components of actor -state and derived statistics. Visor can insert these extracted models into a -TimescaleDB database as separate tables or emit them as csv files. - -Visor requires access to a filecoin blockstore that holds the state of the -chain. For watching incoming tipsets the blockstore must be connected and in -sync with the filecoin network. Historic walks can be performed against an -offline store. - -While running, lily will maintain its own local blockstore and attempt -to synchronise it with the filecoin network. For more information on running -visor as a daemon, including how to initialise the blockstore, see -'visor help daemon'. - -`, - }, - - { - Name: "monitoring", - Description: "Monitoring lily operation", - Text: `Visor may be monitored during operation using logfiles, metrics and tracing. -The lily command recognizes environment variables and provides options to -control the behaviour of each type of monitoring output. Options should be -supplied before any sub command: - - lily [global options] - -Visor uses the IPFS logging library (https://github.com/ipfs/go-ipfs) to write -application logs. By default logs are written to STDERR in JSON format. Log -lines are labeled with one of seven levels to indicate severity of the message -(DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL). Line are also labeled with -named systems which indicate the area of function that produced the log. Each -system may be configured to only emit log messages of a specific level or -higher. By default all log levels are set to debug and above. - -A number of environment variables may be used to control the format and -destination of the logs. - - GOLOG_LOG_LEVEL Set the default log level for all log systems. - - GOLOG_LOG_FMT Set the output log format. By default logs will be - colorized and text format. Use 'json' to specify - JSON formatted logs and 'nocolor' to log in text - format without colors. - - GOLOG_FILE Specify the name of the file that logs should be - written to. Only used if GOLOG_OUTPUT contains the - 'file' keyword. - - GOLOG_OUTPUT Specify whether to output to file, stderr, stdout or - a combination. Separate each keyword with a '+', for - example: file+stderr - - LILY_LOG_LEVEL_NAMED Set the log level of specific loggers. The value - should be a comma delimited list of log systems and - log levels formatted as name:level, for example - 'logger1:debug,logger2:info'. - -In addition, lily supports some global options for controlling logging: - - --log-level LEVEL Set the default log level for all loggers to LEVEL. - This option overrides any value set using the - GOLOG_LOG_LEVEL environment variable. - - --log-level-named value Set the log level of specific loggers. This option - overrides any value set using the - LILY_LOG_LEVEL_NAMED environment variable. - -To control logging output while the lily daemon is running see 'visor help log'. - -During operation lily exposes metrics and debugging information on port 9991 -by default. The address used by this http server can be changed using the -'--prometheus-port' option which expects an IP address and port number. The -address may be omitted to run the server on all interfaces, for example: ':9991'. - -The following paths can be accessed using a standard web browser. - - /metrics Metrics published in prometheus format - - /debug/pprof/ Access to standard Go profiling and debugging information - memory allocations, cpu profile and active goroutines dumps. - -Visor can publish function level tracing to a Jaeger compatible service. By -default tracing is disabled. - -Environment variables for controlling function level tracing: - - LILY_TRACING Enable tracing. Set to 'true' to enable tracing. - - JAEGER_AGENT_HOST, Hostname and port of a Jaeger compatible agent that - JAEGER_AGENT_PORT lily should send traces to. - - JAEGER_SERVICE_NAME The name lily should use when reporting traces. - - JAEGER_SAMPLER_TYPE, Control the type of sampling used to capture traces. - JAEGER_SAMPLER_PARAM The type may be either 'const' or 'probabilistic'. - The behaviour of the sampler is controlled by the - value of param. For a 'const' sampler a value of 1 - indicates that every function call should be traced, - while 0 means none should be traced. No intermediate - values are accepted. For a 'probabilistic' sampler - the param indicates the fraction of function calls - that should be sampled. - -The following options may be used to override the tracing environment variables: - - --tracing - --jaeger-agent-host - --jaeger-agent-port - --jaeger-service-name - --jaeger-sampler-type - --jaeger-sampler-param -`, - }, } diff --git a/commands/job/worker.go b/commands/job/worker.go index f852704b8..c6c0aa806 100644 --- a/commands/job/worker.go +++ b/commands/job/worker.go @@ -18,24 +18,21 @@ var tipsetWorkerFlags struct { var TipSetWorkerCmd = &cli.Command{ Name: "tipset-worker", Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing", - Flags: commands.FlagSet( - commands.ClientAPIFlagSet, - []cli.Flag{ - &cli.IntFlag{ - Name: "concurrency", - Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.", - Value: 1, - Destination: &tipsetWorkerFlags.concurrency, - }, - &cli.StringFlag{ - Name: "queue", - Usage: "Name of queue system worker will consume work from.", - EnvVars: []string{"LILY_TSWORKER_QUEUE"}, - Value: "", - Destination: &tipsetWorkerFlags.queue, - }, + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "concurrency", + Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.", + Value: 1, + Destination: &tipsetWorkerFlags.concurrency, }, - ), + &cli.StringFlag{ + Name: "queue", + Usage: "Name of queue system worker will consume work from.", + EnvVars: []string{"LILY_TSWORKER_QUEUE"}, + Value: "", + Destination: &tipsetWorkerFlags.queue, + }, + }, Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) diff --git a/main.go b/main.go index 5f9060bb4..4bd723770 100644 --- a/main.go +++ b/main.go @@ -70,8 +70,6 @@ func main() { }) } - //cli.AppHelpTemplate = commands.AppHelpTemplate - app := &cli.App{ Name: "lily", Usage: "a tool for capturing on-chain state from the filecoin network", @@ -156,8 +154,6 @@ func main() { Destination: &commands.VisorMetricFlags.RedisDB, }, }, - HideHelp: false, - Metadata: commands.Metadata(), Commands: []*cli.Command{ commands.ChainCmd, commands.DaemonCmd, From d058508d0246982ef6712dbd3cd145d3ed0d95ce Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 17:51:08 -0700 Subject: [PATCH 06/10] fix: panics in job parse --- commands/job/fill.go | 4 ++-- commands/job/find.go | 2 +- commands/job/index.go | 6 +++--- commands/job/options.go | 14 +++++++------- commands/job/survey.go | 2 +- commands/job/walk.go | 4 ++-- commands/job/watch.go | 4 ++-- commands/job/worker.go | 2 +- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/commands/job/fill.go b/commands/job/fill.go index fd0580228..214df3ed5 100644 --- a/commands/job/fill.go +++ b/commands/job/fill.go @@ -47,7 +47,7 @@ Constraints: defer closer() res, err := api.LilyGapFill(ctx, &lily.LilyGapFillConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("fill"), To: rangeFlags.to, From: rangeFlags.from, }) @@ -80,7 +80,7 @@ This command should be used when lily is configured to perform distributed index cfg := &lily.LilyGapFillNotifyConfig{ GapFillConfig: lily.LilyGapFillConfig{ - JobConfig: RunFlags.ParseJobConfig("fill-" + cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("fill-notify"), From: rangeFlags.from, To: rangeFlags.to, }, diff --git a/commands/job/find.go b/commands/job/find.go index a1691d1ce..87677f317 100644 --- a/commands/job/find.go +++ b/commands/job/find.go @@ -47,7 +47,7 @@ since visor_processing_report entries will not be present for imported data (mea defer closer() res, err := api.LilyGapFind(ctx, &lily.LilyGapFindConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("find"), To: rangeFlags.to, From: rangeFlags.from, }) diff --git a/commands/job/index.go b/commands/job/index.go index 785c30a5e..1798855f3 100644 --- a/commands/job/index.go +++ b/commands/job/index.go @@ -68,7 +68,7 @@ var IndexTipSetCmd = &cli.Command{ defer closer() _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("index-tipset"), TipSet: indexFlags.tipsetKey, }) if err != nil { @@ -127,7 +127,7 @@ var IndexHeightCmd = &cli.Command{ defer closer() _, err = api.LilyIndex(ctx, &lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("index-height"), TipSet: indexFlags.tipsetKey, }) if err != nil { @@ -159,7 +159,7 @@ This command should be used when lily is configured to perform distributed index cfg := &lily.LilyIndexNotifyConfig{ IndexConfig: lily.LilyIndexConfig{ - JobConfig: RunFlags.ParseJobConfig("index-" + cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("index-notify"), TipSet: indexFlags.tipsetKey, }, Queue: notifyFlags.queue, diff --git a/commands/job/options.go b/commands/job/options.go index 6061666de..eedde1074 100644 --- a/commands/job/options.go +++ b/commands/job/options.go @@ -4,17 +4,21 @@ import ( "fmt" "time" + logging "github.com/ipfs/go-log/v2" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "github.com/filecoin-project/lily/chain/indexer/tasktype" "github.com/filecoin-project/lily/lens/lily" ) +var log = logging.Logger("lily/commands/job") + type runOpts struct { Storage string Name string - Tasks *cli.StringSlice + Tasks cli.StringSlice Window time.Duration RestartDelay time.Duration @@ -27,11 +31,6 @@ func (r runOpts) ParseJobConfig(kind string) lily.LilyJobConfig { if RunFlags.Name == "" { RunFlags.Name = fmt.Sprintf("%s_%d", kind, time.Now().Unix()) } - if len(RunFlags.Tasks.Value()) == 0 { - // TODO don't panic - panic("need tasks") - } - // TODO handle task wild card * return lily.LilyJobConfig{ Name: RunFlags.Name, Storage: RunFlags.Storage, @@ -57,7 +56,8 @@ var RunTaskFlag = &cli.StringSliceFlag{ Name: "tasks", Usage: "Comma separated list of tasks to run in job. Each task is reported separately in the storage backend.", EnvVars: []string{"LILY_JOB_TASKS"}, - Destination: RunFlags.Tasks, + Value: cli.NewStringSlice(tasktype.AllTableTasks...), + Destination: &RunFlags.Tasks, } var RunStorageFlag = &cli.StringFlag{ diff --git a/commands/job/survey.go b/commands/job/survey.go index ff8da6f8a..5c8bb8739 100644 --- a/commands/job/survey.go +++ b/commands/job/survey.go @@ -39,7 +39,7 @@ var SurveyCmd = &cli.Command{ defer closer() res, err := api.LilySurvey(ctx, &lily.LilySurveyConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("survey"), Interval: surveyFlags.interval, }) if err != nil { diff --git a/commands/job/walk.go b/commands/job/walk.go index 14aa1f74e..aee454488 100644 --- a/commands/job/walk.go +++ b/commands/job/walk.go @@ -42,7 +42,7 @@ The status of each epoch and its set of tasks can be observed in the visor_proce defer closer() cfg := &lily.LilyWalkConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("walk"), From: rangeFlags.from, To: rangeFlags.to, } @@ -81,7 +81,7 @@ This command should be used when lily is configured to perform distributed index cfg := &lily.LilyWalkNotifyConfig{ WalkConfig: lily.LilyWalkConfig{ - JobConfig: RunFlags.ParseJobConfig("walk-" + cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("walk-notify"), From: rangeFlags.from, To: rangeFlags.to, }, diff --git a/commands/job/watch.go b/commands/job/watch.go index 79e8acb38..c96e316a8 100644 --- a/commands/job/watch.go +++ b/commands/job/watch.go @@ -100,7 +100,7 @@ watches the chain head and only indexes a tipset after observing 10 subsequent t var res *schedule.JobSubmitResult cfg := &lily.LilyWatchConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("watch"), BufferSize: watchFlags.bufferSize, Confidence: watchFlags.confidence, Workers: watchFlags.workers, @@ -139,7 +139,7 @@ This command should be used when lily is configured to perform distributed index defer closer() cfg := &lily.LilyWatchNotifyConfig{ - JobConfig: RunFlags.ParseJobConfig("watch-" + cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("watch-notify"), Confidence: watchFlags.confidence, BufferSize: watchFlags.bufferSize, diff --git a/commands/job/worker.go b/commands/job/worker.go index c6c0aa806..97052d355 100644 --- a/commands/job/worker.go +++ b/commands/job/worker.go @@ -43,7 +43,7 @@ var TipSetWorkerCmd = &cli.Command{ defer closer() res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{ - JobConfig: RunFlags.ParseJobConfig(cctx.Command.Name), + JobConfig: RunFlags.ParseJobConfig("tipset-worker"), Queue: tipsetWorkerFlags.queue, Concurrency: tipsetWorkerFlags.concurrency, }) From d6cec2a3105ab4a1977fb83f8e82fceb43dda39a Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 27 Apr 2022 17:59:19 -0700 Subject: [PATCH 07/10] remove ClientAPIFlagSet from remainder of commands --- commands/chain.go | 84 +++++++++++++++++++-------------------------- commands/job/job.go | 54 ++++++++++++----------------- commands/log.go | 21 ++++-------- commands/net.go | 49 +++++++++----------------- commands/stop.go | 3 -- commands/sync.go | 16 +++------ commands/wait.go | 15 ++++---- 7 files changed, 90 insertions(+), 152 deletions(-) diff --git a/commands/chain.go b/commands/chain.go index 63edf3fa5..984685dae 100644 --- a/commands/chain.go +++ b/commands/chain.go @@ -37,9 +37,6 @@ var ChainCmd = &cli.Command{ var ChainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", - Flags: FlagSet( - ClientAPIFlagSet, - ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) lapi, closer, err := GetAPI(ctx) @@ -64,14 +61,12 @@ var ChainGetBlock = &cli.Command{ Name: "getblock", Usage: "Get a block and print its details", ArgsUsage: "[blockCid]", - Flags: FlagSet( - ClientAPIFlagSet, - []cli.Flag{ - &cli.BoolFlag{ - Name: "raw", - Usage: "print just the raw block header", - }, - }), + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "raw", + Usage: "print just the raw block header", + }, + }, Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) lapi, closer, err := GetAPI(ctx) @@ -156,9 +151,6 @@ var ChainReadObjCmd = &cli.Command{ Name: "read-obj", Usage: "Read the raw bytes of an object", ArgsUsage: "[objectCid]", - Flags: FlagSet( - ClientAPIFlagSet, - ), Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) lapi, closer, err := GetAPI(ctx) @@ -191,14 +183,12 @@ var ChainStatObjCmd = &cli.Command{ When a base is provided it will be walked first, and all links visisted will be ignored when the passed in object is walked. `, - Flags: FlagSet( - ClientAPIFlagSet, - []cli.Flag{ - &cli.StringFlag{ - Name: "base", - Usage: "ignore links found in this obj", - }, - }), + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "base", + Usage: "ignore links found in this obj", + }, + }, Action: func(cctx *cli.Context) error { ctx := lotuscli.ReqContext(cctx) lapi, closer, err := GetAPI(ctx) @@ -283,21 +273,19 @@ var ChainListCmd = &cli.Command{ Name: "list", Aliases: []string{"love"}, Usage: "View a segment of the chain", - Flags: FlagSet( - ClientAPIFlagSet, - []cli.Flag{ - &cli.Uint64Flag{Name: "height", DefaultText: "current head"}, - &cli.IntFlag{Name: "count", Value: 30}, - &cli.StringFlag{ - Name: "format", - Usage: "specify the format to print out tipsets", - Value: ": (