Skip to content

Commit

Permalink
feat(tiup-worker): add distribute publishing lock (#208)
Browse files Browse the repository at this point in the history
Implement the lock with redlock on redis

Signed-off-by: wuhuizuo <[email protected]>

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Dec 1, 2024
1 parent fe46963 commit f0b4200
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 4 deletions.
4 changes: 2 additions & 2 deletions publisher/cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func startWorker(ctx context.Context, wg *sync.WaitGroup, reader *kafka.Reader,
return
}

(*wg).Add(1)
wg.Add(1)
go func() {
defer (*wg).Done()
defer wg.Done()
defer reader.Close()

for {
Expand Down
3 changes: 3 additions & 0 deletions publisher/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/PingCAP-QE/ee-apps/dl v0.0.0-20241104081507-4ebce35328e3
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redsync/redsync/v4 v4.13.0
github.com/google/uuid v1.6.0
github.com/rs/zerolog v1.33.0
github.com/segmentio/kafka-go v0.4.47
Expand All @@ -30,6 +31,8 @@ require (
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand Down
19 changes: 19 additions & 0 deletions publisher/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,29 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.13.0 h1:49X6GJfnbLGaIpBBREM/zA4uIMDXKAh1NDkvQ1EkZKA=
github.com/go-redsync/redsync/v4 v4.13.0/go.mod h1:HMW4Q224GZQz6x1Xc7040Yfgacukdzu7ifTDAKiyErQ=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI=
github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -92,6 +105,10 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/rueidis v1.0.19 h1:s65oWtotzlIFN8eMPhyYwxlwLR1lUdhza2KtWprKYSo=
github.com/redis/rueidis v1.0.19/go.mod h1:8B+r5wdnjwK3lTFml5VtxjzGOQAC+5UmujoD12pDrEo=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
Expand All @@ -112,6 +129,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
Expand Down
33 changes: 31 additions & 2 deletions publisher/pkg/impl/tiup_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,22 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/go-redis/redis/v8"
redsync "github.com/go-redsync/redsync/v4"
goredis "github.com/go-redsync/redsync/v4/redis/goredis/v8"
"github.com/rs/zerolog"
)

const (
tiupPublishingMutexName = "global/mutex/tiup-publishing"
tiupPublishingMutexExpiry = 10 * time.Minute
tiupPublishingMutexTries = 32
tiupPublishingMutexRetryDelay = time.Second
)

type tiupWorker struct {
logger zerolog.Logger
redisClient redis.Cmdable
redisClient *redis.Client
mutex *redsync.Mutex
options struct {
LarkWebhookURL string
MirrorURL string
Expand All @@ -26,7 +36,7 @@ type tiupWorker struct {
}
}

func NewTiupWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options map[string]string) (*tiupWorker, error) {
func NewTiupWorker(logger *zerolog.Logger, redisClient *redis.Client, options map[string]string) (*tiupWorker, error) {
handler := tiupWorker{redisClient: redisClient}
if logger == nil {
handler.logger = zerolog.New(os.Stderr).With().Timestamp().Logger()
Expand All @@ -47,6 +57,17 @@ func NewTiupWorker(logger *zerolog.Logger, redisClient redis.Cmdable, options ma
handler.options.PublicServiceURL = "http://publisher-<env>-mirror.<namespace>.svc"
}

pool := goredis.NewPool(redisClient)
rs := redsync.New(pool)

// Obtain a new mutex by using the same name for all instances wanting the
// same lock.
handler.mutex = rs.NewMutex(tiupPublishingMutexName,
redsync.WithExpiry(tiupPublishingMutexExpiry),
redsync.WithTries(tiupPublishingMutexTries),
redsync.WithRetryDelay(tiupPublishingMutexRetryDelay),
)

return &handler, nil
}

Expand Down Expand Up @@ -178,6 +199,14 @@ func (p *tiupWorker) notifyLark(req *PublishRequest, err error) {
}

func (p *tiupWorker) publish(file string, info *PublishInfo) error {
// Obtain a lock for our given global TiUP mirrors mutex.
// After this is successful, no one else can obtain the same
// lock (the same mutex name) until we unlock it.
if err := p.mutex.Lock(); err != nil {
return fmt.Errorf("failed to obtain lock: %v", err)
}
defer p.mutex.Unlock()

args := []string{"mirror", "publish", info.Name, info.Version, file, info.EntryPoint, "--os", info.OS, "--arch", info.Arch, "--desc", info.Description}
if info.Standalone {
args = append(args, "--standalone")
Expand Down

0 comments on commit f0b4200

Please sign in to comment.