Skip to content

Commit

Permalink
Fix rueidislock highload (#604)
Browse files Browse the repository at this point in the history
* fix: remove the busy retry bug of rueidislock.WithContext

Signed-off-by: Rueian <[email protected]>

* fix: allow ":" in the custom rueidislock.LockerOption.KeyPrefix

Signed-off-by: Rueian <[email protected]>

* fix: possible deadlocks and panics when calling rueidislock.Close

Signed-off-by: Rueian <[email protected]>

---------

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored Aug 10, 2024
1 parent 433debc commit d910fc6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 24 deletions.
101 changes: 78 additions & 23 deletions rueidislock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func NewLocker(option LockerOption) (Locker, error) {

if option.ClientOption.DisableCache {
impl.noloop = true
impl.nocsc = true
} else {
if option.NoLoopTracking {
option.ClientOption.ClientTrackingOptions = []string{"OPTOUT", "NOLOOP"}
Expand Down Expand Up @@ -115,6 +116,7 @@ type locker struct {
majority int32
totalcnt int32
noloop bool
nocsc bool
setpx bool
}

Expand Down Expand Up @@ -198,6 +200,10 @@ func (m *locker) waitgate(ctx context.Context, name string) (g *gate, err error)
g.w++
m.mu.Unlock()
}
var timeout <-chan time.Time
if m.nocsc {
timeout = time.After(m.timeout)
}
select {
case <-ctx.Done():
m.mu.Lock()
Expand All @@ -211,6 +217,8 @@ func (m *locker) waitgate(ctx context.Context, name string) (g *gate, err error)
return g, nil
}
return nil, ErrLockerClosed
case <-timeout:
return g, nil
}
}

Expand Down Expand Up @@ -248,22 +256,32 @@ func (m *locker) onInvalidations(messages []rueidis.RedisMessage) {
default:
}
}
select {
case g.ch <- struct{}{}:
default:
}
}
m.mu.RUnlock()
}
for _, msg := range messages {
k, _ := msg.ToString()
if ks := strings.SplitN(k, ":", 3); len(ks) == 3 {
m.mu.RLock()
g, ok := m.gates[ks[2]]
if ok {
n, _ := strconv.Atoi(ks[1])
select {
case g.csc[n] <- struct{}{}:
default:
if strings.HasPrefix(k, m.prefix) {
if ks := strings.SplitN(k[len(m.prefix)+1:], ":", 2); len(ks) == 2 {
m.mu.RLock()
g, ok := m.gates[ks[1]]
if ok {
n, _ := strconv.Atoi(ks[0])
select {
case g.csc[n] <- struct{}{}:
default:
}
select {
case g.ch <- struct{}{}:
default:
}
}
m.mu.RUnlock()
}
m.mu.RUnlock()
}
}
}
Expand All @@ -275,6 +293,8 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
deadline := time.Now().Add(m.validity)
cacneltm := time.AfterFunc(m.validity, cancel)
released := int32(0)
acquired := int32(0)
failures := int32(0)

done := make(chan struct{})
monitoring := func(err error, key string, deadline time.Time, csc chan struct{}) {
Expand All @@ -291,10 +311,14 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
<-csc
}
}
case <-csc:
if err = m.script(ctx, extend, key, val, deadline); err == nil {
if !m.noloop {
<-csc
case _, ok := <-csc:
if !ok {
err = ErrLockerClosed
} else {
if err = m.script(ctx, extend, key, val, deadline); err == nil {
if !m.noloop {
<-csc
}
}
}
}
Expand All @@ -305,8 +329,7 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
}
if released := atomic.AddInt32(&released, 1); released >= m.majority {
cancel()
if released == m.totalcnt {
close(done)
if released == m.totalcnt && atomic.LoadInt32(&failures) < m.majority {
m.mu.Lock()
if g.w--; g.w == 0 {
if m.gates[name] == g {
Expand All @@ -320,6 +343,9 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
}
m.mu.Unlock()
}
if released == m.totalcnt {
close(done)
}
}
}

Expand All @@ -330,22 +356,26 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
}
if err != ErrNotLocked {
if err = m.acquire(ctx, key, val, deadline, force); force && err == nil {
select {
case ch <- struct{}{}:
default:
m.mu.RLock()
if m.gates != nil {
select {
case ch <- struct{}{}:
default:
}
}
m.mu.RUnlock()
}
}
go monitoring(err, key, deadline, ch)
return err
}

var i, acquired, failures int32
for ; acquired < m.majority && failures < m.majority; i++ {
var i int32
for ; atomic.LoadInt32(&acquired) < m.majority && atomic.LoadInt32(&failures) < m.majority; i++ {
if err = acquire(err, keyname(m.prefix, name, i), g.csc[i], force); err == nil {
acquired++
atomic.AddInt32(&acquired, 1)
} else {
failures++
atomic.AddInt32(&failures, 1)
}
}
if i < m.totalcnt {
Expand All @@ -355,12 +385,13 @@ func (m *locker) try(ctx context.Context, cancel context.CancelFunc, name string
}
}(i, err)
}
if cacneltm.Stop() && failures < m.majority {
if cacneltm.Stop() && atomic.LoadInt32(&failures) < m.majority {
return func() {
cancel()
<-done
}
}
<-done
return nil
}

Expand All @@ -370,6 +401,13 @@ func (m *locker) ForceWithContext(ctx context.Context, name string) (context.Con
if cancel := m.try(ctx, cancel, name, g, true); cancel != nil {
return ctx, cancel, nil
}
m.mu.Lock()
if g.w--; g.w == 0 {
if m.gates[name] == g {
delete(m.gates, name)
}
}
m.mu.Unlock()
}
cancel()
return ctx, cancel, ErrNotLocked
Expand All @@ -381,6 +419,13 @@ func (m *locker) TryWithContext(ctx context.Context, name string) (context.Conte
if cancel := m.try(ctx, cancel, name, g, false); cancel != nil {
return ctx, cancel, nil
}
m.mu.Lock()
if g.w--; g.w == 0 {
if m.gates[name] == g {
delete(m.gates, name)
}
}
m.mu.Unlock()
}
cancel()
return ctx, cancel, ErrNotLocked
Expand All @@ -394,6 +439,13 @@ func (m *locker) WithContext(ctx context.Context, name string) (context.Context,
if cancel := m.try(ctx, cancel, name, g, false); cancel != nil {
return ctx, cancel, nil
}
m.mu.Lock()
if g.w--; g.w == 0 && err != nil { // delete g from m.gates only when exiting with an error.
if m.gates[name] == g {
delete(m.gates, name)
}
}
m.mu.Unlock()
}
if cancel(); err != nil {
return ctx, cancel, err
Expand All @@ -408,6 +460,9 @@ func (m *locker) Client() rueidis.Client {
func (m *locker) Close() {
m.mu.Lock()
for _, g := range m.gates {
for _, ch := range g.csc {
close(ch)
}
close(g.ch)
}
m.gates = nil
Expand Down
2 changes: 1 addition & 1 deletion rueidislock/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func TestLocker_Close(t *testing.T) {
t.Fatal(err)
}
if _, _, err := locker.WithContext(context.Background(), lck); err != ErrLockerClosed {
t.Error(err)
t.Fatal(err)
}
}
for _, nocsc := range []bool{false, true} {
Expand Down

0 comments on commit d910fc6

Please sign in to comment.