From 3ae06e3992a5f5b5114c866deed472f557a0d6f5 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 10 Aug 2021 23:53:57 -0700 Subject: [PATCH] fix: atomically get head when registering an observer This lets us always call check (accurately). --- chain/events/events.go | 10 ++---- chain/events/events_called.go | 25 ++++++-------- chain/events/events_height.go | 21 ++++-------- chain/events/events_test.go | 4 --- chain/events/observer.go | 63 +++++++++++++++++++++++------------ 5 files changed, 61 insertions(+), 62 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index 1e39d364666..5c494fcb05c 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -50,17 +50,13 @@ func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi cache := newCache(api, gcConfidence) ob := newObserver(cache, gcConfidence) - he := newHeightEvents(cache, gcConfidence) - headChange := newHCEvents(cache) - - // Cache first. Observers are ordered and we always want to fill the cache first. - ob.Observe(cache.observer()) - ob.Observe(he.observer()) - ob.Observe(headChange.observer()) if err := ob.start(ctx); err != nil { return nil, err } + he := newHeightEvents(cache, ob, gcConfidence) + headChange := newHCEvents(cache, ob) + return &Events{ob, he, headChange}, nil } diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 091b4b31a66..026ad8a4e22 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -69,10 +69,9 @@ type queuedEvent struct { type hcEvents struct { cs EventAPI + lk sync.Mutex lastTs *types.TipSet - lk sync.Mutex - ctr triggerID // TODO: get rid of trigger IDs and just use pointers as keys. @@ -93,7 +92,7 @@ type hcEvents struct { watcherEvents } -func newHCEvents(api EventAPI) *hcEvents { +func newHCEvents(api EventAPI, obs *observer) *hcEvents { e := &hcEvents{ cs: api, confQueue: map[triggerH]map[msgH][]*queuedEvent{}, @@ -105,15 +104,16 @@ func newHCEvents(api EventAPI) *hcEvents { e.messageEvents = newMessageEvents(e, api) e.watcherEvents = newWatcherEvents(e, api) + // We need to take the lock as the observer could immediately try calling us. + e.lk.Lock() + e.lastTs = obs.Observe((*hcEventsObserver)(e)) + e.lk.Unlock() + return e } type hcEventsObserver hcEvents -func (e *hcEvents) observer() TipSetObserver { - return (*hcEventsObserver)(e) -} - func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error { e.lk.Lock() defer e.lk.Unlock() @@ -284,14 +284,9 @@ func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd Event defer e.lk.Unlock() // Check if the event has already occurred - more := true - done := false - if e.lastTs != nil { - var err error - done, more, err = check(ctx, e.lastTs) - if err != nil { - return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err) - } + done, more, err := check(ctx, e.lastTs) + if err != nil { + return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err) } if done { timeout = NoTimeout diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 02c252bc998..73df04be6fc 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -30,13 +30,17 @@ type heightEvents struct { lastGc abi.ChainEpoch //nolint:structcheck } -func newHeightEvents(api EventAPI, gcConfidence abi.ChainEpoch) *heightEvents { - return &heightEvents{ +func newHeightEvents(api EventAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents { + he := &heightEvents{ api: api, gcConfidence: gcConfidence, tsHeights: map[abi.ChainEpoch][]*heightHandler{}, triggerHeights: map[abi.ChainEpoch][]*heightHandler{}, } + he.lk.Lock() + he.head = obs.Observe((*heightEventsObserver)(he)) + he.lk.Unlock() + return he } // ChainAt invokes the specified `HeightHandler` when the chain reaches the @@ -69,15 +73,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever e.lk.Lock() for { head := e.head - - // If we haven't initialized yet, store the trigger and move on. - if head == nil { - e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler) - e.tsHeights[h] = append(e.tsHeights[h], handler) - e.lk.Unlock() - return nil - } - if head.Height() >= h { // Head is past the handler height. We at least need to stash the tipset to // avoid doing this from the main event loop. @@ -152,10 +147,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever } } -func (e *heightEvents) observer() TipSetObserver { - return (*heightEventsObserver)(e) -} - // Updates the head and garbage collects if we're 2x over our garbage collection confidence period. func (e *heightEventsObserver) updateHead(h *types.TipSet) { e.lk.Lock() diff --git a/chain/events/events_test.go b/chain/events/events_test.go index a0c09424407..0536b5ebbc9 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -875,8 +875,6 @@ func TestCalledTimeout(t *testing.T) { events, err = NewEvents(context.Background(), fcs) require.NoError(t, err) - fcs.advance(0, 1, 0, nil) - err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { @@ -1298,8 +1296,6 @@ func TestStateChangedTimeout(t *testing.T) { events, err = NewEvents(context.Background(), fcs) require.NoError(t, err) - fcs.advance(0, 1, 0, nil) - err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { diff --git a/chain/events/observer.go b/chain/events/observer.go index 52fc1de2546..c67d821b568 100644 --- a/chain/events/observer.go +++ b/chain/events/observer.go @@ -18,25 +18,26 @@ import ( type observer struct { api EventAPI - lk sync.Mutex gcConfidence abi.ChainEpoch ready chan struct{} + lk sync.Mutex head *types.TipSet maxHeight abi.ChainEpoch - observers []TipSetObserver } -func newObserver(api EventAPI, gcConfidence abi.ChainEpoch) *observer { - return &observer{ +func newObserver(api *cache, gcConfidence abi.ChainEpoch) *observer { + obs := &observer{ api: api, gcConfidence: gcConfidence, ready: make(chan struct{}), observers: []TipSetObserver{}, } + obs.Observe(api.observer()) + return obs } func (o *observer) start(ctx context.Context) error { @@ -100,12 +101,18 @@ func (o *observer) listenHeadChangesOnce(ctx context.Context) error { return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type) } - head := cur[0].Val + curHead := cur[0].Val + + o.lk.Lock() if o.head == nil { - o.head = head + o.head = curHead close(o.ready) - } else if !o.head.Equals(head) { - changes, err := o.api.ChainGetPath(ctx, o.head.Key(), head.Key()) + } + startHead := o.head + o.lk.Unlock() + + if !startHead.Equals(curHead) { + changes, err := o.api.ChainGetPath(ctx, startHead.Key(), curHead.Key()) if err != nil { return xerrors.Errorf("failed to get path from last applied tipset to head: %w", err) } @@ -152,18 +159,23 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err ctx, span := trace.StartSpan(ctx, "events.HeadChange") span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev)))) span.AddAttributes(trace.Int64Attribute("applies", int64(len(app)))) + + o.lk.Lock() + head := o.head + o.lk.Unlock() + defer func() { - span.AddAttributes(trace.Int64Attribute("endHeight", int64(o.head.Height()))) + span.AddAttributes(trace.Int64Attribute("endHeight", int64(head.Height()))) span.End() }() // NOTE: bailing out here if the head isn't what we expected is fine. We'll re-start the // entire process and handle any strange reorgs. for i, from := range rev { - if !from.Equals(o.head) { + if !from.Equals(head) { return xerrors.Errorf( "expected to revert %s (%d), reverting %s (%d)", - o.head.Key(), o.head.Height(), from.Key(), from.Height(), + head.Key(), head.Height(), from.Key(), from.Height(), ) } var to *types.TipSet @@ -171,7 +183,7 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err // If we have more reverts, the next revert is the next head. to = rev[i+1] } else { - // At the end of the revert sequenece, we need to looup the joint tipset + // At the end of the revert sequenece, we need to lookup the joint tipset // between the revert sequence and the apply sequence. var err error to, err = o.api.ChainGetTipSet(ctx, from.Parents()) @@ -181,9 +193,14 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err } } - // Get the observers late in case an observer registers/unregisters itself. + // Get the current observers and atomically set the head. + // + // 1. We need to get the observers every time in case some registered/deregistered. + // 2. We need to atomically set the head so new observers don't see events twice or + // skip them. o.lk.Lock() observers := o.observers + o.head = to o.lk.Unlock() for _, obs := range observers { @@ -196,39 +213,43 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err log.Errorf("reverted past finality, from %d to %d", o.maxHeight, to.Height()) } - o.head = to + head = to } for _, to := range app { - if to.Parents() != o.head.Key() { + if to.Parents() != head.Key() { return xerrors.Errorf( "cannot apply %s (%d) with parents %s on top of %s (%d)", - to.Key(), to.Height(), to.Parents(), o.head.Key(), o.head.Height(), + to.Key(), to.Height(), to.Parents(), head.Key(), head.Height(), ) } - // Get the observers late in case an observer registers/unregisters itself. o.lk.Lock() observers := o.observers + o.head = to o.lk.Unlock() for _, obs := range observers { - if err := obs.Apply(ctx, o.head, to); err != nil { + if err := obs.Apply(ctx, head, to); err != nil { log.Errorf("observer %T failed to revert tipset %s (%d) with: %s", obs, to.Key(), to.Height(), err) } } - o.head = to if to.Height() > o.maxHeight { o.maxHeight = to.Height() } + head = to } return nil } -// TODO: add a confidence level so we can have observers with difference levels of confidence -func (o *observer) Observe(obs TipSetObserver) { +// Observe registers the observer, and returns the current tipset. The observer is guaranteed to +// observe events starting at this tipset. +// +// Returns nil if the observer hasn't started yet (but still registers). +func (o *observer) Observe(obs TipSetObserver) *types.TipSet { o.lk.Lock() defer o.lk.Unlock() o.observers = append(o.observers, obs) + return o.head }