diff --git a/CHANGELOG.md b/CHANGELOG.md index 29dea9e..969c513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ Changelog ========= All notable changes to this project will be documented in this file. +v1.1.2-alpha +------------- + +### Changed + +- Combined mutexes into one for `Ws` client + v1.1.1-alpha ------------- diff --git a/README.md b/README.md index c8b17b3..76ffd3b 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Installation ----------------- ```bash -go get github.com/amir-the-h/okex@v1.1.1-alpha +go get github.com/amir-the-h/okex@v1.1.2-alpha ``` Usage diff --git a/api/ws/client.go b/api/ws/client.go index 64e20fe..1738b4c 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -35,8 +35,7 @@ type ClientWs struct { secretKey []byte passphrase string lastTransmit map[bool]*time.Time - mu map[bool]*sync.Mutex - rmu map[bool]*sync.Mutex + mu map[bool]*sync.RWMutex AuthRequested *time.Time Authorized bool Private *Private @@ -68,8 +67,7 @@ func NewClient(ctx context.Context, apiKey, secretKey, passphrase string, url ma RawEventChan: make(chan *events.Basic), conn: make(map[bool]*websocket.Conn), lastTransmit: make(map[bool]*time.Time), - mu: map[bool]*sync.Mutex{true: {}, false: {}}, - rmu: map[bool]*sync.Mutex{true: {}, false: {}}, + mu: map[bool]*sync.RWMutex{true: {}, false: {}}, } c.Private = NewPrivate(c) c.Public = NewPublic(c) @@ -109,9 +107,11 @@ func (c *ClientWs) Connect(p bool) error { func (c *ClientWs) Login() error { c.mu[true].Lock() if c.Authorized { + c.mu[true].Unlock() return nil } if c.AuthRequested != nil && time.Since(*c.AuthRequested).Seconds() < 30 { + c.mu[true].Unlock() return nil } now := time.Now() @@ -229,17 +229,13 @@ func (c *ClientWs) WaitForAuthorization() error { func (c *ClientWs) dial(p bool) error { c.mu[p].Lock() - c.rmu[p].Lock() - defer func() { - c.mu[p].Unlock() - c.rmu[p].Unlock() - }() conn, res, err := websocket.DefaultDialer.Dial(string(c.url[p]), nil) if err != nil { var statusCode int if res != nil { statusCode = res.StatusCode } + c.mu[p].Unlock() return fmt.Errorf("error %d: %w", statusCode, err) } defer res.Body.Close() @@ -256,6 +252,7 @@ func (c *ClientWs) dial(p bool) error { } }() c.conn[p] = conn + c.mu[p].Unlock() return nil } func (c *ClientWs) sender(p bool) error { @@ -304,21 +301,21 @@ func (c *ClientWs) receiver(p bool) error { case <-c.ctx.Done(): return c.handleCancel("receiver") default: - c.rmu[p].Lock() + c.mu[p].RLock() err := c.conn[p].SetReadDeadline(time.Now().Add(pongWait)) if err != nil { - c.rmu[p].Unlock() + c.mu[p].RUnlock() return err } mt, data, err := c.conn[p].ReadMessage() if err != nil { - c.rmu[p].Unlock() + c.mu[p].RUnlock() if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { return c.conn[p].Close() } return err } - c.rmu[p].Unlock() + c.mu[p].RUnlock() now := time.Now() c.mu[p].Lock() c.lastTransmit[p] = &now