Get rid of mutex in matching/liveness and reduce test duration #5917

Merged
70 changes: 28 additions & 42 deletions service/matching/liveness.go
@@ -36,90 +36,76 @@ type (
status int32
timeSource clock.TimeSource
ttl time.Duration
// internal shutdown channel
shutdownChan chan struct{}

// stopCh is used to signal the liveness to stop
stopCh chan struct{}
// wg is used to wait for the liveness to stop
wg sync.WaitGroup

// broadcast shutdown functions
broadcastShutdownFn func()

sync.Mutex
lastEventTime time.Time
lastEventTimeNano int64
}
)

var _ common.Daemon = (*liveness)(nil)

func newLiveness(
timeSource clock.TimeSource,
ttl time.Duration,
broadcastShutdownFn func(),
) *liveness {
func newLiveness(timeSource clock.TimeSource, ttl time.Duration, broadcastShutdownFn func()) *liveness {
return &liveness{
status: common.DaemonStatusInitialized,
timeSource: timeSource,
ttl: ttl,
shutdownChan: make(chan struct{}),

status: common.DaemonStatusInitialized,
timeSource: timeSource,
ttl: ttl,
stopCh: make(chan struct{}),
broadcastShutdownFn: broadcastShutdownFn,

lastEventTime: timeSource.Now().UTC(),
lastEventTimeNano: timeSource.Now().UnixNano(),
}
}

func (l *liveness) Start() {
if !atomic.CompareAndSwapInt32(
&l.status,
common.DaemonStatusInitialized,
common.DaemonStatusStarted,
) {
if !atomic.CompareAndSwapInt32(&l.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

l.wg.Add(1)
go l.eventLoop()
}

func (l *liveness) Stop() {
if !atomic.CompareAndSwapInt32(
&l.status,
common.DaemonStatusStarted,
common.DaemonStatusStopped,
) {
if !atomic.CompareAndSwapInt32(&l.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

close(l.shutdownChan)
close(l.stopCh)
l.broadcastShutdownFn()
l.wg.Wait()
}

func (l *liveness) eventLoop() {
ttlTimer := time.NewTicker(l.ttl)
defer ttlTimer.Stop()
defer l.wg.Done()
checkTimer := time.NewTicker(l.ttl / 2)
defer checkTimer.Stop()

for {
select {
case <-ttlTimer.C:
case <-checkTimer.C:
if !l.isAlive() {
l.Stop()
}

case <-l.shutdownChan:
case <-l.stopCh:
return
}
}
}

func (l *liveness) isAlive() bool {
l.Lock()
defer l.Unlock()
return l.lastEventTime.Add(l.ttl).After(l.timeSource.Now())
now := l.timeSource.Now().UnixNano()
lastUpdate := atomic.LoadInt64(&l.lastEventTimeNano)
return now-lastUpdate < l.ttl.Nanoseconds()
}

func (l *liveness) markAlive(
now time.Time,
) {
l.Lock()
defer l.Unlock()
if l.lastEventTime.Before(now) {
l.lastEventTime = now.UTC()
}
func (l *liveness) markAlive() {
now := l.timeSource.Now().UnixNano()
atomic.StoreInt64(&l.lastEventTimeNano, now)
}
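
Taken together, the liveness.go changes replace the mutex-guarded lastEventTime time.Time with an atomically accessed lastEventTimeNano int64, rename shutdownChan to stopCh, add a WaitGroup so Stop can wait for the event loop to exit, and probe at ttl/2 instead of ttl. Below is a minimal standalone sketch of that lock-free pattern. It is an illustration, not the merged Cadence code: it reads time.Now directly instead of the injected clock.TimeSource, uses local status constants in place of common.DaemonStatus*, and has the event loop finish its own shutdown in place rather than calling Stop. The key property is that markAlive and isAlive are single atomic operations, so the request paths that call them never contend on a lock.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Local stand-ins for common.DaemonStatus* in the real code.
const (
	statusInitialized int32 = iota
	statusStarted
	statusStopped
)

// liveness shuts itself down (and tells its owner via broadcastShutdownFn)
// when markAlive has not been called within ttl. All shared state is either
// an atomic or a channel, so markAlive/isAlive never take a lock.
type liveness struct {
	status              int32
	ttl                 time.Duration
	stopCh              chan struct{}
	wg                  sync.WaitGroup
	broadcastShutdownFn func()
	lastEventTimeNano   int64
}

func newLiveness(ttl time.Duration, broadcastShutdownFn func()) *liveness {
	return &liveness{
		ttl:                 ttl,
		stopCh:              make(chan struct{}),
		broadcastShutdownFn: broadcastShutdownFn,
		lastEventTimeNano:   time.Now().UnixNano(),
	}
}

func (l *liveness) Start() {
	if !atomic.CompareAndSwapInt32(&l.status, statusInitialized, statusStarted) {
		return
	}
	l.wg.Add(1)
	go l.eventLoop()
}

// Stop is intended for external callers; in this sketch the event loop
// finishes its own shutdown in place so it never waits on its own
// WaitGroup entry.
func (l *liveness) Stop() {
	if !atomic.CompareAndSwapInt32(&l.status, statusStarted, statusStopped) {
		return
	}
	close(l.stopCh)
	l.broadcastShutdownFn()
	l.wg.Wait()
}

func (l *liveness) eventLoop() {
	defer l.wg.Done()
	checkTimer := time.NewTicker(l.ttl / 2) // probe twice per TTL window
	defer checkTimer.Stop()
	for {
		select {
		case <-checkTimer.C:
			if !l.isAlive() {
				if atomic.CompareAndSwapInt32(&l.status, statusStarted, statusStopped) {
					close(l.stopCh)
					l.broadcastShutdownFn()
				}
				return
			}
		case <-l.stopCh:
			return
		}
	}
}

func (l *liveness) isAlive() bool {
	return time.Now().UnixNano()-atomic.LoadInt64(&l.lastEventTimeNano) < l.ttl.Nanoseconds()
}

func (l *liveness) markAlive() {
	atomic.StoreInt64(&l.lastEventTimeNano, time.Now().UnixNano())
}

func main() {
	lv := newLiveness(200*time.Millisecond, func() { fmt.Println("idle: broadcasting shutdown") })
	lv.Start()
	lv.markAlive()                     // request paths refresh the timestamp
	time.Sleep(600 * time.Millisecond) // nothing refreshes it, so the loop shuts down
	lv.Stop()                          // no-op: status is already Stopped
}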
74 changes: 30 additions & 44 deletions service/matching/liveness_test.go
@@ -41,6 +41,7 @@ type (
timeSource clock.MockedTimeSource
ttl time.Duration
shutdownFlag int32
liveness *liveness
}
)

@@ -49,75 +50,60 @@ func TestLivenessSuite(t *testing.T) {
suite.Run(t, s)
}

func (s *livenessSuite) SetupSuite() {
}

func (s *livenessSuite) TearDownSuite() {
}

func (s *livenessSuite) SetupTest() {
s.Assertions = require.New(s.T())

s.ttl = 2 * time.Second
s.ttl = 500 * time.Millisecond
s.timeSource = clock.NewMockedTimeSource()

s.shutdownFlag = 0
}

func (s *livenessSuite) TearDownTest() {

s.liveness = newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
}

func (s *livenessSuite) TestIsAlive_No() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
s.timeSource.Advance(s.ttl * 2)
alive := liveness.isAlive()
s.timeSource.Advance(s.ttl)
alive := s.liveness.isAlive()
s.False(alive)
}

func (s *livenessSuite) TestIsAlive_Yes() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
s.timeSource.Advance(s.ttl / 2)
alive := liveness.isAlive()
alive := s.liveness.isAlive()
s.True(alive)
}

func (s *livenessSuite) TestMarkAlive_Noop() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
lastEventTime := liveness.lastEventTime
newEventTime := s.timeSource.Now().Add(-1)
liveness.markAlive(newEventTime)
s.True(lastEventTime.Equal(liveness.lastEventTime))
lastEventTime := s.liveness.lastEventTimeNano
// not advancing time, so markAlive will be a noop
s.liveness.markAlive()
s.Equal(lastEventTime, s.liveness.lastEventTimeNano)
}

func (s *livenessSuite) TestMarkAlive_Updated() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
newEventTime := s.timeSource.Now().Add(1)
liveness.markAlive(newEventTime)
s.True(newEventTime.Equal(liveness.lastEventTime))
s.timeSource.Advance(time.Duration(1))
s.liveness.markAlive()
s.Equal(s.timeSource.Now().UnixNano(), s.liveness.lastEventTimeNano)
}

func (s *livenessSuite) TestEventLoop_Noop() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
liveness.Start()

s.timeSource.Advance(s.ttl * 4)
liveness.markAlive(s.timeSource.Now())

timer := time.NewTimer(s.ttl * 2)
select {
case <-liveness.shutdownChan:
s.Fail("should not shutdown")
case <-timer.C:
s.Equal(int32(0), atomic.LoadInt32(&s.shutdownFlag))
}
s.liveness.Start()
defer s.liveness.Stop()

// advance time ttl/2 and mark alive
s.timeSource.Advance(s.ttl / 2)
s.liveness.markAlive()
s.True(s.liveness.isAlive())

// advance time ttl/2 more and validate still alive
s.timeSource.Advance(s.ttl / 2)
time.Sleep(100 * time.Millisecond) // give event loop time to run
s.True(s.liveness.isAlive())
s.Equal(int32(0), atomic.LoadInt32(&s.shutdownFlag))
}

func (s *livenessSuite) TestEventLoop_Shutdown() {
liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
liveness.Start()
s.liveness.Start()
defer s.liveness.Stop()

s.timeSource.Advance(s.ttl * 4)
<-liveness.shutdownChan
s.timeSource.Advance(s.ttl)
<-s.liveness.stopCh
s.Equal(int32(1), atomic.LoadInt32(&s.shutdownFlag))
}
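
The test rewrite gets its shorter runtime from the smaller TTL (500ms instead of 2s) and from driving expiry with timeSource.Advance on a mocked clock rather than waiting in real time; the one remaining time.Sleep only yields to the event loop. The following self-contained sketch shows that testing technique; fakeClock and tracker are hypothetical stand-ins for Cadence's clock.MockedTimeSource and the liveness struct, written only to show how a controllable clock makes a TTL check deterministic.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// fakeClock only moves when the test calls Advance, so TTL assertions are
// deterministic and never depend on wall-clock sleeps.
type fakeClock struct {
	nowNano int64
}

func (c *fakeClock) Now() time.Time          { return time.Unix(0, atomic.LoadInt64(&c.nowNano)) }
func (c *fakeClock) Advance(d time.Duration) { atomic.AddInt64(&c.nowNano, int64(d)) }

// tracker is a minimal markAlive/isAlive pair driven by the injected clock.
type tracker struct {
	clk      *fakeClock
	ttl      time.Duration
	lastNano int64
}

func newTracker(clk *fakeClock, ttl time.Duration) *tracker {
	return &tracker{clk: clk, ttl: ttl, lastNano: clk.Now().UnixNano()}
}

func (t *tracker) markAlive() { atomic.StoreInt64(&t.lastNano, t.clk.Now().UnixNano()) }

func (t *tracker) isAlive() bool {
	return t.clk.Now().UnixNano()-atomic.LoadInt64(&t.lastNano) < t.ttl.Nanoseconds()
}

func main() {
	clk := &fakeClock{}
	tr := newTracker(clk, 500*time.Millisecond)

	clk.Advance(250 * time.Millisecond) // half the TTL elapsed
	fmt.Println(tr.isAlive())           // true

	clk.Advance(250 * time.Millisecond) // full TTL elapsed without markAlive
	fmt.Println(tr.isAlive())           // false

	tr.markAlive()            // refresh the timestamp at the fake "now"
	fmt.Println(tr.isAlive()) // true
}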
11 changes: 8 additions & 3 deletions service/matching/taskListManager.go
@@ -192,7 +192,12 @@ func newTaskListManager(
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
})
tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)

livenessInterval := taskListConfig.IdleTasklistCheckInterval()
tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), livenessInterval, func() {
tlMgr.logger.Info("Task list manager stopping because no recent events", tag.Dynamic("interval", livenessInterval))
tlMgr.Stop()
})
var isolationGroups []string
if tlMgr.isIsolationMatcherEnabled() {
isolationGroups = config.AllIsolationGroups
@@ -261,7 +266,7 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams)
}
if params.forwardedFrom == "" {
// request sent by history service
c.liveness.markAlive(time.Now())
c.liveness.markAlive()
}
var syncMatch bool
_, err := c.executeWithRetry(func() (interface{}, error) {
@@ -347,7 +352,7 @@ func (c *taskListManagerImpl) GetTask(
c.Stop()
return nil, ErrNoTasks
}
c.liveness.markAlive(time.Now())
c.liveness.markAlive()
task, err := c.getTask(ctx, maxDispatchPerSecond)
if err != nil {
return nil, fmt.Errorf("couldn't get task: %w", err)
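
In taskListManager.go the bare tlMgr.Stop callback is replaced with a closure that logs why the manager is going away before stopping it. A small hypothetical sketch of that wrapping pattern follows; manager, newIdleShutdownFn, and the log wording are illustrative, not the project's API (the real code uses the manager's tagged logger and the configured IdleTasklistCheckInterval).

package main

import (
	"log"
	"time"
)

// manager is a hypothetical stand-in for taskListManagerImpl: the component
// that owns a liveness and must be torn down once it has been idle.
type manager struct{ name string }

func (m *manager) Stop() { log.Printf("%s stopped", m.name) }

// newIdleShutdownFn wraps the manager's Stop in a closure that also records
// why the shutdown happened, mirroring the callback handed to newLiveness in
// the diff above.
func newIdleShutdownFn(m *manager, idleInterval time.Duration) func() {
	return func() {
		log.Printf("%s stopping because no events were seen for %v", m.name, idleInterval)
		m.Stop()
	}
}

func main() {
	m := &manager{name: "tasklist:orders"}
	shutdown := newIdleShutdownFn(m, 5*time.Minute)
	shutdown() // in the real code this is invoked by the liveness event loop
}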