Skip to content

Commit

Permalink
feat(sentinel): add sentinel module scaffolding and API
Browse files Browse the repository at this point in the history
- allows lotus to be started in sentinel mode and start a watch that logs
tipsets as they are received.
  • Loading branch information
frrist committed Feb 24, 2021
1 parent ccebc4f commit c671988
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 16 deletions.
1 change: 1 addition & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// FullNode API is a low-level interface to the Filecoin network full node
type FullNode interface {
Common
Sentinel

// MethodGroup: Chain
// The Chain method group contains methods for interacting with the
Expand Down
12 changes: 12 additions & 0 deletions api/api_sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package api

import (
"context"
)

type Sentinel interface {
// MethodGroup: Sentinel

// WatchStart start a watch against the chain
WatchStart(context.Context) error
}
1 change: 1 addition & 0 deletions api/apistruct/permissioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func PermissionedFullAPI(a api.FullNode) api.FullNode {
var out FullNodeStruct
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.Internal)
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.CommonStruct.Internal)
auth.PermissionedProxy(AllPermissions, DefaultPerms, a, &out.SentinelStruct.Internal)
return &out
}

Expand Down
12 changes: 12 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type CommonStruct struct {
// FullNodeStruct implements API passing calls to user-provided function values.
type FullNodeStruct struct {
CommonStruct
SentinelStruct

Internal struct {
ChainNotify func(context.Context) (<-chan []*api.HeadChange, error) `perm:"read"`
Expand Down Expand Up @@ -470,6 +471,12 @@ type WalletStruct struct {
}
}

type SentinelStruct struct {
Internal struct {
WatchStart func(ctx context.Context) error `perm:"admin"`
}
}

// CommonStruct

func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]auth.Permission, error) {
Expand Down Expand Up @@ -1854,9 +1861,14 @@ func (c *WalletStruct) WalletDelete(ctx context.Context, addr address.Address) e
return c.Internal.WalletDelete(ctx, addr)
}

func (s *SentinelStruct) WatchStart(ctx context.Context) error {
return s.Internal.WatchStart(ctx)
}

var _ api.Common = &CommonStruct{}
var _ api.FullNode = &FullNodeStruct{}
var _ api.StorageMiner = &StorageMinerStruct{}
var _ api.WorkerAPI = &WorkerStruct{}
var _ api.GatewayAPI = &GatewayStruct{}
var _ api.WalletAPI = &WalletStruct{}
var _ api.Sentinel = &SentinelStruct{}
1 change: 1 addition & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewFullNodeRPC(ctx context.Context, addr string, requestHeader http.Header)
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
[]interface{}{
&res.CommonStruct.Internal,
&res.SentinelStruct.Internal,
&res.Internal,
}, requestHeader)

Expand Down
51 changes: 46 additions & 5 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,15 @@ type Events struct {

heightEvents
*hcEvents

observers []TipSetObserver
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold)
}

func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)

e := &Events{
Expand All @@ -77,8 +81,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}

go e.listenHeadChanges(ctx)
Expand All @@ -90,6 +95,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
}

return e

}

func (e *Events) listenHeadChanges(ctx context.Context) {
Expand Down Expand Up @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}

if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

Expand All @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}

func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
Expand All @@ -189,5 +195,40 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}

return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
1 change: 1 addition & 0 deletions cli/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ var Commands = []*cli.Command{
WithCategory("developer", fetchParamCmd),
WithCategory("network", netCmd),
WithCategory("network", syncCmd),
WithCategory("sentinel", sentinelCmd),
pprofCmd,
VersionCmd,
}
Expand Down
39 changes: 39 additions & 0 deletions cli/sentinel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cli

import (
"github.com/urfave/cli/v2"
)

var sentinelCmd = &cli.Command{
Name: "sentinel",
Usage: "Interact with the sentinel module",
Subcommands: []*cli.Command{
sentinelStartWatchCmd,
},
}

var sentinelStartWatchCmd = &cli.Command{
Name: "watch",
Usage: "start a watch against the chain",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "confidence",
},
},
Action: func(cctx *cli.Context) error {
apic, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)

//confidence := abi.ChainEpoch(cctx.Int64("confidence"))

if err := apic.WatchStart(ctx); err != nil {
return err
}

return nil
},
}
45 changes: 38 additions & 7 deletions cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ const (
preTemplateFlag = "genesis-template"
)

type daemonMode int

const (
modeUnknown daemonMode = 0 // no valid mode could be determined
modeStandard daemonMode = 1 // standard mode
modeLite daemonMode = 2 // lite mode, backed by gateway
modeSentinel daemonMode = 3 // stats collection mode, analyses chain events
)

var daemonStopCmd = &cli.Command{
Name: "stop",
Usage: "Stop a running lotus daemon",
Expand Down Expand Up @@ -119,6 +128,10 @@ var DaemonCmd = &cli.Command{
Usage: "start lotus in lite mode",
Hidden: true,
},
&cli.BoolFlag{
Name: "sentinel",
Usage: "start lotus in sentinel mode.",
},
&cli.StringFlag{
Name: "pprof",
Usage: "specify name of file for writing cpu profile to",
Expand Down Expand Up @@ -154,13 +167,15 @@ var DaemonCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite")
daemonMode, err := getDaemonMode(cctx)
if err != nil {
return err
}

err := runmetrics.Enable(runmetrics.RunMetricOptions{
if err := runmetrics.Enable(runmetrics.RunMetricOptions{
EnableCPU: true,
EnableMemory: true,
})
if err != nil {
}); err != nil {
return xerrors.Errorf("enabling runtime metrics: %w", err)
}

Expand Down Expand Up @@ -217,7 +232,7 @@ var DaemonCmd = &cli.Command{
}
freshRepo := err != repo.ErrRepoExists

if !isLite {
if daemonMode != modeLite {
if err := paramfetch.GetParams(lcli.ReqContext(cctx), build.ParametersJSON(), 0); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
Expand Down Expand Up @@ -279,7 +294,7 @@ var DaemonCmd = &cli.Command{
// If the daemon is started in "lite mode", provide a GatewayAPI
// for RPC calls
liteModeDeps := node.Options()
if isLite {
if daemonMode == modeLite {
gapi, closer, err := lcli.GetGatewayAPI(cctx)
if err != nil {
return err
Expand All @@ -298,7 +313,7 @@ var DaemonCmd = &cli.Command{

var api api.FullNode
stop, err := node.New(ctx,
node.FullAPI(&api, node.Lite(isLite)),
node.FullAPI(&api, node.Lite(daemonMode == modeLite), node.Sentinel(daemonMode == modeSentinel)),

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdownChan),
Expand Down Expand Up @@ -499,3 +514,19 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)

return nil
}

func getDaemonMode(cctx *cli.Context) (daemonMode, error) {
isLite := cctx.Bool("lite")
isSentinel := cctx.Bool("sentinel")

switch {
case !isLite && !isSentinel:
return modeStandard, nil
case isLite && !isSentinel:
return modeLite, nil
case !isLite && isSentinel:
return modeSentinel, nil
default:
return modeUnknown, xerrors.Errorf("cannot specify more than one mode")
}
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,7 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
1 change: 1 addition & 0 deletions metrics/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func MetricedFullAPI(a api.FullNode) api.FullNode {
var out apistruct.FullNodeStruct
proxy(a, &out.Internal)
proxy(a, &out.CommonStruct.Internal)
proxy(a, &out.SentinelStruct.Internal)
return &out
}

Expand Down
Loading

0 comments on commit c671988

Please sign in to comment.