Skip to content

Commit

Permalink
feat: add TxPipeline to rueidiscompat
Browse files Browse the repository at this point in the history
Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Aug 10, 2024
1 parent 7ae7359 commit 8f822c2
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 22 deletions.
34 changes: 30 additions & 4 deletions rueidiscompat/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,17 @@ const (
)

type Cmdable interface {
CoreCmdable
Cache(ttl time.Duration) CacheCompat

Subscribe(ctx context.Context, channels ...string) PubSub
PSubscribe(ctx context.Context, patterns ...string) PubSub
SSubscribe(ctx context.Context, channels ...string) PubSub

Watch(ctx context.Context, fn func(Tx) error, keys ...string) error
}

type CoreCmdable interface {
Command(ctx context.Context) *CommandsInfoCmd
CommandList(ctx context.Context, filter FilterBy) *StringSliceCmd
CommandGetKeys(ctx context.Context, commands ...any) *StringSliceCmd
Expand Down Expand Up @@ -470,12 +479,11 @@ type ProbabilisticCmdable interface {
TDigestRevRank(ctx context.Context, key string, values ...float64) *IntSliceCmd
TDigestTrimmedMean(ctx context.Context, key string, lowCutQuantile, highCutQuantile float64) *FloatCmd

Subscribe(ctx context.Context, channels ...string) PubSub
PSubscribe(ctx context.Context, patterns ...string) PubSub
SSubscribe(ctx context.Context, channels ...string) PubSub

Pipeline() Pipeliner
Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error)

TxPipeline() Pipeliner
TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error)
}

// Align with go-redis
Expand Down Expand Up @@ -4629,6 +4637,24 @@ func (c *Compat) Pipeline() Pipeliner {
return newPipeline(c.client)
}

func (c *Compat) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return newTxPipeline(c.client).Pipelined(ctx, fn)
}

func (c *Compat) TxPipeline() Pipeliner {
return newTxPipeline(c.client)
}

func (c *Compat) Watch(ctx context.Context, fn func(Tx) error, keys ...string) error {
dc, cancel := c.client.Dedicate()
defer cancel()
tx := newTx(dc, cancel)
if err := tx.Watch(ctx, keys...).Err(); err != nil {
return err
}
return fn(newTx(dc, cancel))
}

func (c CacheCompat) BitCount(ctx context.Context, key string, bitCount *BitCount) *IntCmd {
var resp rueidis.RedisResult
if bitCount == nil {
Expand Down
20 changes: 2 additions & 18 deletions rueidiscompat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
// To avoid this: it is good idea to use reasonable bigger read/write timeouts
// depends on your batch size and/or use TxPipeline.
type Pipeliner interface {
Cmdable
CoreCmdable

// Len is to obtain the number of commands in the pipeline that have not yet been executed.
Len() int
Expand Down Expand Up @@ -96,10 +96,6 @@ type Pipeline struct {
rets []Cmder
}

func (c *Pipeline) Cache(ttl time.Duration) CacheCompat {
return c.comp.Cache(ttl)
}

func (c *Pipeline) Command(ctx context.Context) *CommandsInfoCmd {
ret := c.comp.Command(ctx)
c.rets = append(c.rets, ret)
Expand Down Expand Up @@ -2434,18 +2430,6 @@ func (c *Pipeline) TDigestTrimmedMean(ctx context.Context, key string, lowCutQua
return ret
}

func (c *Pipeline) Subscribe(ctx context.Context, channels ...string) PubSub {
return c.comp.Subscribe(ctx, channels...)
}

func (c *Pipeline) PSubscribe(ctx context.Context, patterns ...string) PubSub {
return c.comp.PSubscribe(ctx, patterns...)
}

func (c *Pipeline) SSubscribe(ctx context.Context, channels ...string) PubSub {
return c.comp.SSubscribe(ctx, channels...)
}

func (c *Pipeline) TSAdd(ctx context.Context, key string, timestamp interface{}, value float64) *IntCmd {
ret := c.comp.TSAdd(ctx, key, timestamp, value)
c.rets = append(c.rets, ret)
Expand Down Expand Up @@ -2788,7 +2772,7 @@ func (c *Pipeline) Len() int {
}

// Do queues the custom command for later execution.
func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
func (c *Pipeline) Do(_ context.Context, args ...interface{}) *Cmd {
ret := &Cmd{}
if len(args) == 0 {
ret.SetErr(errors.New("redis: please enter the command to be executed"))
Expand Down
145 changes: 145 additions & 0 deletions rueidiscompat/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package rueidiscompat

import (
"context"
"errors"
"time"
"unsafe"

"github.com/redis/rueidis"
)

var TxFailedErr = errors.New("redis: transaction failed")

var _ Pipeliner = (*TxPipeline)(nil)

type rePipeline = Pipeline

func newTxPipeline(real rueidis.Client) *TxPipeline {
return &TxPipeline{rePipeline: newPipeline(real)}
}

type TxPipeline struct {
*rePipeline
}

func (c *TxPipeline) Exec(ctx context.Context) ([]Cmder, error) {
p := c.comp.client.(*proxy)
if len(p.cmds) == 0 {
return nil, nil
}

rets := c.rets
cmds := p.cmds
c.rets = nil
p.cmds = nil

cmds = append(cmds, c.comp.client.B().Multi().Build(), c.comp.client.B().Exec().Build())
for i := len(cmds) - 2; i >= 1; i-- {
j := i - 1
cmds[j], cmds[i] = cmds[i], cmds[j]
}

resp := p.DoMulti(ctx, cmds...)
results, err := resp[len(resp)-1].ToArray()
if rueidis.IsRedisNil(err) {
err = TxFailedErr
}
for i, r := range results {
rets[i].from(*(*rueidis.RedisResult)(unsafe.Pointer(&proxyresult{
err: resp[i+1].NonRedisError(),
val: r,
})))
}
return rets, err
}

func (c *TxPipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
if err := fn(c); err != nil {
return nil, err
}
return c.Exec(ctx)
}

func (c *TxPipeline) Pipeline() Pipeliner {
return c
}

func (c *TxPipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipelined(ctx, fn)
}

func (c *TxPipeline) TxPipeline() Pipeliner {
return c
}

var _ rueidis.Client = (*txproxy)(nil)

type txproxy struct {
rueidis.CoreClient
}

func (p *txproxy) DoCache(_ context.Context, _ rueidis.Cacheable, _ time.Duration) (resp rueidis.RedisResult) {
panic("not implemented")
}

func (p *txproxy) DoMultiCache(_ context.Context, _ ...rueidis.CacheableTTL) (resp []rueidis.RedisResult) {
panic("not implemented")
}

func (p *txproxy) DoStream(_ context.Context, _ rueidis.Completed) rueidis.RedisResultStream {
panic("not implemented")
}

func (p *txproxy) DoMultiStream(_ context.Context, _ ...rueidis.Completed) rueidis.MultiRedisResultStream {
panic("not implemented")
}

func (p *txproxy) Dedicated(_ func(rueidis.DedicatedClient) error) (err error) {
panic("not implemented")
}

func (p *txproxy) Dedicate() (client rueidis.DedicatedClient, cancel func()) {
panic("not implemented")
}

func (p *txproxy) Nodes() map[string]rueidis.Client {
panic("not implemented")
}

type Tx interface {
CoreCmdable
Watch(ctx context.Context, keys ...string) *StatusCmd
Unwatch(ctx context.Context, keys ...string) *StatusCmd
Close(ctx context.Context) error
}

func newTx(client rueidis.DedicatedClient, cancel func()) *tx {
return &tx{CoreCmdable: NewAdapter(&txproxy{CoreClient: client}), cancel: cancel}
}

type tx struct {
CoreCmdable
cancel func()
}

func (t *tx) Watch(ctx context.Context, keys ...string) *StatusCmd {
ret := &StatusCmd{}
if len(keys) != 0 {
client := t.CoreCmdable.(*Compat).client
ret.from(client.Do(ctx, client.B().Watch().Key(keys...).Build()))
}
return ret
}

func (t *tx) Unwatch(ctx context.Context, _ ...string) *StatusCmd {
ret := &StatusCmd{}
client := t.CoreCmdable.(*Compat).client
ret.from(client.Do(ctx, client.B().Unwatch().Build()))
return ret
}

func (t *tx) Close(_ context.Context) error {
t.cancel()
return nil
}
Loading

0 comments on commit 8f822c2

Please sign in to comment.