Skip to content

Commit

Permalink
fix: single slot DoMulti writes should be retried on MOVED / ASK (#697)
Browse files Browse the repository at this point in the history
Co-authored-by: wuyuxiang <[email protected]>
  • Loading branch information
wyxloading and wuyuxiang authored Dec 13, 2024
1 parent 2246f91 commit 383a2a1
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 55 deletions.
84 changes: 29 additions & 55 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,35 +555,54 @@ func (c *clusterClient) _pickMulti(multi []Completed) (retries *connretry, last
return retries, last, toReplica
}

inits := 0
for _, cmd := range multi {
if cmd.Slot() == cmds.InitSlot {
inits++
continue
}
if last == cmds.InitSlot {
last = cmd.Slot()
} else if init && last != cmd.Slot() {
panic(panicMixCxSlot)
}
p := c.pslots[cmd.Slot()]
if p == nil {
return nil, 0, false
}
count.m[p]++
}

if last == cmds.InitSlot {
// if all commands have no slots, such as INFO, we pick a non-nil slot.
for i, p := range c.pslots {
if p != nil {
last = uint16(i)
count.m[p] = inits
break
}
}
if last == cmds.InitSlot {
return nil, 0, false
}
}

retries = connretryp.Get(len(count.m), len(count.m))
for cc, n := range count.m {
retries.m[cc] = retryp.Get(0, n)
}
conncountp.Put(count)

for i, cmd := range multi {
var cc conn
if cmd.Slot() != cmds.InitSlot {
if last == cmds.InitSlot {
last = cmd.Slot()
} else if init && last != cmd.Slot() {
panic(panicMixCxSlot)
}
cc := c.pslots[cmd.Slot()]
re := retries.m[cc]
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
cc = c.pslots[cmd.Slot()]
} else {
cc = c.pslots[last]
}
re := retries.m[cc]
re.commands = append(re.commands, cmd)
re.cIndexes = append(re.cIndexes, i)
}
return retries, last, false
}
Expand Down Expand Up @@ -669,19 +688,12 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...Completed) []Redis
return nil
}

retries, slot, toReplica, err := c.pickMulti(ctx, multi)
retries, _, _, err := c.pickMulti(ctx, multi)
if err != nil {
return fillErrs(len(multi), err)
}
defer connretryp.Put(retries)

if len(retries.m) <= 1 {
for _, re := range retries.m {
retryp.Put(re)
}
return c.doMulti(ctx, slot, multi, toReplica)
}

var wg sync.WaitGroup
var mu sync.Mutex

Expand Down Expand Up @@ -730,44 +742,6 @@ func fillErrs(n int, err error) (results []RedisResult) {
return results
}

func (c *clusterClient) doMulti(ctx context.Context, slot uint16, multi []Completed, toReplica bool) []RedisResult {
attempts := 1

retry:
cc, err := c.pick(ctx, slot, toReplica)
if err != nil {
return fillErrs(len(multi), err)
}
resps := cc.DoMulti(ctx, multi...)
process:
for i, resp := range resps.s {
switch addr, mode := c.shouldRefreshRetry(resp.Error(), ctx); mode {
case RedirectMove:
if c.retry && allReadOnly(multi) {
resultsp.Put(resps)
resps = c.redirectOrNew(addr, cc, multi[i].Slot(), mode).DoMulti(ctx, multi...)
goto process
}
case RedirectAsk:
if c.retry && allReadOnly(multi) {
resultsp.Put(resps)
resps = askingMulti(c.redirectOrNew(addr, cc, multi[i].Slot(), mode), ctx, multi)
goto process
}
case RedirectRetry:
if c.retry && allReadOnly(multi) {
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, multi[i], resp.Error())
if shouldRetry {
resultsp.Put(resps)
attempts++
goto retry
}
}
}
}
return resps.s
}

func (c *clusterClient) doCache(ctx context.Context, cmd Cacheable, ttl time.Duration) (resp RedisResult) {
attempts := 1

Expand Down
69 changes: 69 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5239,3 +5239,72 @@ func TestClusterClientLoadingRetry(t *testing.T) {
}
})
}

func TestClusterClientMovedRetry(t *testing.T) {
defer ShouldNotLeaked(SetupLeakDetection())

setup := func() (*clusterClient, *mockConn) {
m := &mockConn{
DoFn: func(cmd Completed) RedisResult {
if strings.Join(cmd.Commands(), " ") == "CLUSTER SLOTS" {
return slotsMultiResp
}
return RedisResult{}
},
}
client, err := newClusterClient(
&ClientOption{InitAddress: []string{":0"}},
func(dst string, opt *ClientOption) conn { return m },
newRetryer(defaultRetryDelayFn),
)
if err != nil {
t.Fatalf("unexpected err %v", err)
}
return client, m
}

t.Run("DoMulti Retry on MOVED", func(t *testing.T) {
client, m := setup()

attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "MOVED 0 127.0.0.1"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Set().Key("test").Value(`test`).Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

t.Run("DoMulti Retry on ASK", func(t *testing.T) {
client, m := setup()

attempts := 0
m.DoMultiFn = func(multi ...Completed) *redisresults {
attempts++
if attempts == 1 {
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '-', string: "ASK 0 127.0.0.1"}, nil)}}
}
return &redisresults{s: []RedisResult{newResult(RedisMessage{typ: '+', string: "OK"}, nil), newResult(RedisMessage{typ: '+', string: "OK"}, nil)}}
}

cmd := client.B().Set().Key("test").Value(`test`).Build()
resps := client.DoMulti(context.Background(), cmd)
if len(resps) != 1 {
t.Fatalf("unexpected response length %v", len(resps))
}
if v, err := resps[0].ToString(); err != nil || v != "OK" {
t.Fatalf("unexpected response %v %v", v, err)
}
})

}

0 comments on commit 383a2a1

Please sign in to comment.