Skip to content

Commit

Permalink
feat(events): define Observer intreface for events
Browse files Browse the repository at this point in the history
- allows tipset apply and revert to be observed
  • Loading branch information
frrist committed Mar 3, 2021
1 parent c9c26ee commit 3d68abc
Showing 1 changed file with 47 additions and 7 deletions.
54 changes: 47 additions & 7 deletions chain/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type Events struct {

heightEvents
*hcEvents
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
observers []TipSetObserver
}

func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)

e := &Events{
Expand All @@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},

hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}

go e.listenHeadChanges(ctx)
Expand All @@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
return e
}

func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}

func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
Expand Down Expand Up @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}

if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}

Expand All @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}

func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
Expand All @@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}

if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}

// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}

// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}

// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}

for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}

return nil
}

0 comments on commit 3d68abc

Please sign in to comment.