Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Event observer and Settings for 3rd party dep injection #5693

Merged
merged 2 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Events struct {

heightEvents
*hcEvents
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
observers []TipSetObserver
}

func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)

e := &Events{
Expand All @@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}

go e.listenHeadChanges(ctx)
Expand All @@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
return e
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}

func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
Expand Down Expand Up @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}

if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

Expand All @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}

func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
Expand All @@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}
16 changes: 16 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,3 +729,19 @@ func Test() Option {
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
)
}

// For 3rd party dep injection.

func WithRepoType(repoType repo.RepoType) func(s *Settings) error {
return func(s *Settings) error {
s.nodeType = repoType
return nil
}
}

func WithInvokesKey(i invoke, resApi interface{}) func(s *Settings) error {
return func(s *Settings) error {
s.invokes[i] = fx.Populate(resApi)
return nil
}
}