From d705f9f025a9124998e9e28e10ee850f3e51ee56 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Mon, 27 May 2024 16:38:23 +0300 Subject: [PATCH 1/6] multiple InstId --- api/ws/client.go | 23 +++++++++++++++------- api/ws/public.go | 12 ++++++++---- examples/main.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 examples/main.go diff --git a/api/ws/client.go b/api/ws/client.go index 34178bd..4321231 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -4,6 +4,7 @@ import ( "context" "crypto/hmac" "crypto/sha256" + "crypto/tls" "encoding/base64" "encoding/json" "fmt" @@ -131,17 +132,17 @@ func (c *ClientWs) Login() error { // Users can choose to subscribe to one or more channels, and the total length of multiple channels cannot exceed 4096 bytes. // // https://www.okex.com/docs-v5/en/#websocket-api-subscribe -func (c *ClientWs) Subscribe(p bool, ch []okex.ChannelName, args map[string]string) error { - count := 1 +func (c *ClientWs) Subscribe(p bool, ch []okex.ChannelName, args ...map[string]string) error { + count := len(args) if len(ch) != 0 { count = len(ch) } tmpArgs := make([]map[string]string, count) - tmpArgs[0] = args + tmpArgs[0] = args[0] for i, name := range ch { tmpArgs[i] = map[string]string{} tmpArgs[i]["channel"] = string(name) - for k, v := range args { + for k, v := range args[0] { tmpArgs[i][k] = v } } @@ -225,16 +226,24 @@ func (c *ClientWs) WaitForAuthorization() error { func (c *ClientWs) dial(p bool) error { c.mu[p].Lock() - conn, res, err := websocket.DefaultDialer.Dial(string(c.url[p]), nil) + defer c.mu[p].Unlock() + + dialer := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + + conn, res, err := dialer.Dial(string(c.url[p]), nil) if err != nil { var statusCode int if res != nil { statusCode = res.StatusCode } - c.mu[p].Unlock() return fmt.Errorf("error %d: %w", statusCode, err) } defer res.Body.Close() + go func() { err := c.receiver(p) if err != nil { @@ -247,8 +256,8 @@ func (c *ClientWs) dial(p bool) error { fmt.Printf("sender error: %v\n", err) } }() + c.conn[p] = conn - c.mu[p].Unlock() return nil } func (c *ClientWs) sender(p bool) error { diff --git a/api/ws/public.go b/api/ws/public.go index af13a23..543dca4 100644 --- a/api/ws/public.go +++ b/api/ws/public.go @@ -248,17 +248,21 @@ func (c *Public) UPriceLimit(req requests.PriceLimit, rCh ...bool) error { } // OrderBook -// Retrieve order book data. +// Retrieve order book data for multiple instruments. // // Use books for 400 depth levels, book5 for 5 depth levels, books50-l2-tbt tick-by-tick 50 depth levels, and books-l2-tbt for tick-by-tick 400 depth levels. // // https://www.okex.com/docs-v5/en/#websocket-api-public-channels-order-book-channel -func (c *Public) OrderBook(req requests.OrderBook, ch ...chan *public.OrderBook) error { - m := okex.S2M(req) +func (c *Public) OrderBook(reqs []requests.OrderBook, ch ...chan *public.OrderBook) error { if len(ch) > 0 { c.obCh = ch[0] } - return c.Subscribe(false, []okex.ChannelName{}, m) + var subscriptions []map[string]string + for _, req := range reqs { + m := okex.S2M(req) + subscriptions = append(subscriptions, m) + } + return c.Subscribe(false, []okex.ChannelName{}, subscriptions...) } // UOrderBook diff --git a/examples/main.go b/examples/main.go new file mode 100644 index 0000000..537dbe7 --- /dev/null +++ b/examples/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "github.com/amir-the-h/okex" + "github.com/amir-the-h/okex/api" + "github.com/amir-the-h/okex/events/public" + requests "github.com/amir-the-h/okex/requests/ws/public" + "log" +) + +func main() { + apiKey := "" + secretKey := "" + passphrase := "" + ctx := context.Background() + client, err := api.NewClient(ctx, apiKey, secretKey, passphrase, okex.NormalServer) + if err != nil { + log.Fatalln(err) + } + + orderBookRequests := []requests.OrderBook{ + {InstID: "BTC-USDT", Channel: "books"}, + {InstID: "ETH-USDT", Channel: "books"}, + {InstID: "LTC-USDT", Channel: "books"}, + {InstID: "XRP-USDT", Channel: "books"}, + {InstID: "EOS-USDT", Channel: "books"}, + {InstID: "BCH-USDT", Channel: "books"}, + {InstID: "ETC-USDT", Channel: "books"}, + {InstID: "BSV-USDT", Channel: "books"}, + {InstID: "TRX-USDT", Channel: "books"}, + {InstID: "LINK-USDT", Channel: "books"}, + {InstID: "ADA-USDT", Channel: "books"}, + {InstID: "DOT-USDT", Channel: "books"}, + {InstID: "UNI-USDT", Channel: "books"}, + } + + obCh := make(chan *public.OrderBook) + err = client.Ws.Public.OrderBook(orderBookRequests, obCh) + if err != nil { + log.Fatalln(err) + } + + // Listen for updates + for update := range obCh { + log.Printf("Received order book update: %+v\n", update) + insId, _ := update.Arg.Get("instId") + log.Printf("Instrument ID: %s\n", insId) + } +} From a60cbbe2ead728b477e6c7f753c430cc82eace01 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Tue, 28 May 2024 13:51:17 +0300 Subject: [PATCH 2/6] SetDialer added --- api/ws/client.go | 49 ++++++++++++++++++++++++++++-------------------- examples/main.go | 9 +++++++++ go.mod | 2 +- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/api/ws/client.go b/api/ws/client.go index 4321231..064d9df 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -4,13 +4,13 @@ import ( "context" "crypto/hmac" "crypto/sha256" - "crypto/tls" "encoding/base64" "encoding/json" "fmt" "github.com/amir-the-h/okex" "github.com/amir-the-h/okex/events" "github.com/gorilla/websocket" + "io" "net/http" "sync" "time" @@ -32,6 +32,7 @@ type ClientWs struct { sendChan map[bool]chan []byte url map[bool]okex.BaseURL conn map[bool]*websocket.Conn + dialer *websocket.Dialer apiKey string secretKey []byte passphrase string @@ -67,6 +68,7 @@ func NewClient(ctx context.Context, apiKey, secretKey, passphrase string, url ma StructuredEventChan: make(chan interface{}), RawEventChan: make(chan *events.Basic), conn: make(map[bool]*websocket.Conn), + dialer: websocket.DefaultDialer, lastTransmit: make(map[bool]*time.Time), mu: map[bool]*sync.RWMutex{true: {}, false: {}}, } @@ -133,19 +135,23 @@ func (c *ClientWs) Login() error { // // https://www.okex.com/docs-v5/en/#websocket-api-subscribe func (c *ClientWs) Subscribe(p bool, ch []okex.ChannelName, args ...map[string]string) error { - count := len(args) - if len(ch) != 0 { - count = len(ch) - } - tmpArgs := make([]map[string]string, count) - tmpArgs[0] = args[0] - for i, name := range ch { - tmpArgs[i] = map[string]string{} - tmpArgs[i]["channel"] = string(name) - for k, v := range args[0] { - tmpArgs[i][k] = v + chCount := max(len(ch), 1) + tmpArgs := make([]map[string]string, chCount*len(args)) + + n := 0 + for i := 0; i < chCount; i++ { + for _, arg := range args { + tmpArgs[n] = make(map[string]string) + for k, v := range arg { + tmpArgs[n][k] = v + } + if len(ch) > 0 { + tmpArgs[n]["channel"] = string(ch[i]) + } + n++ } } + return c.Send(p, okex.SubscribeOperation, tmpArgs) } @@ -206,6 +212,11 @@ func (c *ClientWs) SetChannels(errCh chan *events.Error, subCh chan *events.Subs c.SuccessChan = sCh } +// SetDialer sets a custom dialer for the WebSocket connection. +func (c *ClientWs) SetDialer(dialer *websocket.Dialer) { + c.dialer = dialer +} + // WaitForAuthorization waits for the auth response and try to log in if it was needed func (c *ClientWs) WaitForAuthorization() error { if c.Authorized { @@ -228,13 +239,7 @@ func (c *ClientWs) dial(p bool) error { c.mu[p].Lock() defer c.mu[p].Unlock() - dialer := websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: 45 * time.Second, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - - conn, res, err := dialer.Dial(string(c.url[p]), nil) + conn, res, err := c.dialer.Dial(string(c.url[p]), nil) if err != nil { var statusCode int if res != nil { @@ -242,8 +247,12 @@ func (c *ClientWs) dial(p bool) error { } return fmt.Errorf("error %d: %w", statusCode, err) } - defer res.Body.Close() + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + } + }(res.Body) go func() { err := c.receiver(p) if err != nil { diff --git a/examples/main.go b/examples/main.go index 537dbe7..a454a3e 100644 --- a/examples/main.go +++ b/examples/main.go @@ -2,11 +2,15 @@ package main import ( "context" + "crypto/tls" "github.com/amir-the-h/okex" "github.com/amir-the-h/okex/api" "github.com/amir-the-h/okex/events/public" requests "github.com/amir-the-h/okex/requests/ws/public" + "github.com/gorilla/websocket" "log" + "net/http" + "time" ) func main() { @@ -35,6 +39,11 @@ func main() { {InstID: "UNI-USDT", Channel: "books"}, } + client.Ws.Public.SetDialer(&websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }) obCh := make(chan *public.OrderBook) err = client.Ws.Public.OrderBook(orderBookRequests, obCh) if err != nil { diff --git a/go.mod b/go.mod index d4a64a0..68006a2 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,5 @@ module github.com/amir-the-h/okex -go 1.17 +go 1.21 require github.com/gorilla/websocket v1.4.2 From 33ca1546b161e6408bdf30f3267f6f220838f652 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Tue, 28 May 2024 17:13:43 +0300 Subject: [PATCH 3/6] Data race fix --- api/ws/client.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/api/ws/client.go b/api/ws/client.go index 064d9df..6b62ad0 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -70,7 +70,7 @@ func NewClient(ctx context.Context, apiKey, secretKey, passphrase string, url ma conn: make(map[bool]*websocket.Conn), dialer: websocket.DefaultDialer, lastTransmit: make(map[bool]*time.Time), - mu: map[bool]*sync.RWMutex{true: {}, false: {}}, + mu: map[bool]*sync.RWMutex{true: new(sync.RWMutex), false: new(sync.RWMutex)}, } c.Private = NewPrivate(c) c.Public = NewPublic(c) @@ -237,8 +237,6 @@ func (c *ClientWs) WaitForAuthorization() error { func (c *ClientWs) dial(p bool) error { c.mu[p].Lock() - defer c.mu[p].Unlock() - conn, res, err := c.dialer.Dial(string(c.url[p]), nil) if err != nil { var statusCode int @@ -247,10 +245,13 @@ func (c *ClientWs) dial(p bool) error { } return fmt.Errorf("error %d: %w", statusCode, err) } + c.conn[p] = conn + c.mu[p].Unlock() + defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { - + fmt.Printf("error closing body: %v\n", err) } }(res.Body) go func() { @@ -266,9 +267,9 @@ func (c *ClientWs) dial(p bool) error { } }() - c.conn[p] = conn return nil } + func (c *ClientWs) sender(p bool) error { ticker := time.NewTicker(time.Millisecond * 300) defer ticker.Stop() @@ -297,7 +298,11 @@ func (c *ClientWs) sender(p bool) error { return err } case <-ticker.C: - if c.conn[p] != nil && (c.lastTransmit[p] == nil || (c.lastTransmit[p] != nil && time.Since(*c.lastTransmit[p]) > PingPeriod)) { + c.mu[p].RLock() + conn := c.conn[p] + lastTransmit := c.lastTransmit[p] + c.mu[p].RUnlock() + if conn != nil && (lastTransmit == nil || (lastTransmit != nil && time.Since(*lastTransmit) > PingPeriod)) { go func() { c.sendChan[p] <- []byte("ping") }() @@ -307,6 +312,7 @@ func (c *ClientWs) sender(p bool) error { } } } + func (c *ClientWs) receiver(p bool) error { for { select { @@ -344,6 +350,7 @@ func (c *ClientWs) receiver(p bool) error { } } } + func (c *ClientWs) sign(method, path string) (string, string) { t := time.Now().UTC().Unix() ts := fmt.Sprint(t) From 7023d0ccd8402ff7d7f8d5e3ac8c933bcf622ed5 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Tue, 28 May 2024 17:15:35 +0300 Subject: [PATCH 4/6] mu init rollback --- api/ws/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/ws/client.go b/api/ws/client.go index 6b62ad0..91aedd6 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -70,7 +70,7 @@ func NewClient(ctx context.Context, apiKey, secretKey, passphrase string, url ma conn: make(map[bool]*websocket.Conn), dialer: websocket.DefaultDialer, lastTransmit: make(map[bool]*time.Time), - mu: map[bool]*sync.RWMutex{true: new(sync.RWMutex), false: new(sync.RWMutex)}, + mu: map[bool]*sync.RWMutex{true: {}, false: {}}, } c.Private = NewPrivate(c) c.Public = NewPublic(c) From 9058b2afe9456e7f4d88eb5d8604a443f3a9a613 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Mon, 3 Jun 2024 10:10:09 +0300 Subject: [PATCH 5/6] Fix goroutine leaks --- api/ws/client.go | 97 ++++++++----- api/ws/public.go | 252 +++++++++++++++++++-------------- examples/{main.go => books.go} | 10 ++ 3 files changed, 213 insertions(+), 146 deletions(-) rename examples/{main.go => books.go} (86%) diff --git a/api/ws/client.go b/api/ws/client.go index 91aedd6..fa40b8a 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -57,20 +57,18 @@ const ( func NewClient(ctx context.Context, apiKey, secretKey, passphrase string, url map[bool]okex.BaseURL) *ClientWs { ctx, cancel := context.WithCancel(ctx) c := &ClientWs{ - apiKey: apiKey, - secretKey: []byte(secretKey), - passphrase: passphrase, - ctx: ctx, - Cancel: cancel, - url: url, - sendChan: map[bool]chan []byte{true: make(chan []byte, 3), false: make(chan []byte, 3)}, - DoneChan: make(chan interface{}), - StructuredEventChan: make(chan interface{}), - RawEventChan: make(chan *events.Basic), - conn: make(map[bool]*websocket.Conn), - dialer: websocket.DefaultDialer, - lastTransmit: make(map[bool]*time.Time), - mu: map[bool]*sync.RWMutex{true: {}, false: {}}, + apiKey: apiKey, + secretKey: []byte(secretKey), + passphrase: passphrase, + ctx: ctx, + Cancel: cancel, + url: url, + sendChan: map[bool]chan []byte{true: make(chan []byte, 3), false: make(chan []byte, 3)}, + DoneChan: make(chan interface{}), + conn: make(map[bool]*websocket.Conn), + dialer: websocket.DefaultDialer, + lastTransmit: make(map[bool]*time.Time), + mu: map[bool]*sync.RWMutex{true: {}, false: {}}, } c.Private = NewPrivate(c) c.Public = NewPublic(c) @@ -217,6 +215,11 @@ func (c *ClientWs) SetDialer(dialer *websocket.Dialer) { c.dialer = dialer } +func (c *ClientWs) SetEventChannels(structuredEventCh chan interface{}, rawEventCh chan *events.Basic) { + c.StructuredEventChan = structuredEventCh + c.RawEventChan = rawEventCh +} + // WaitForAuthorization waits for the auth response and try to log in if it was needed func (c *ClientWs) WaitForAuthorization() error { if c.Authorized { @@ -360,6 +363,7 @@ func (c *ClientWs) sign(method, path string) (string, string) { h.Write(p) return ts, base64.StdEncoding.EncodeToString(h.Sum(nil)) } + func (c *ClientWs) handleCancel(msg string) error { go func() { c.DoneChan <- msg @@ -367,35 +371,44 @@ func (c *ClientWs) handleCancel(msg string) error { return fmt.Errorf("operation cancelled: %s", msg) } -// TODO: break each case into a separate function func (c *ClientWs) process(data []byte, e *events.Basic) bool { switch e.Event { case "error": e := events.Error{} _ = json.Unmarshal(data, &e) - go func() { - c.ErrChan <- &e - }() + if c.ErrChan != nil { + go func() { + c.ErrChan <- &e + }() + } return true case "subscribe": e := events.Subscribe{} _ = json.Unmarshal(data, &e) - go func() { - if c.SubscribeChan != nil { + if c.SubscribeChan != nil { + go func() { c.SubscribeChan <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "unsubscribe": e := events.Unsubscribe{} _ = json.Unmarshal(data, &e) - go func() { - if c.UnsubscribeCh != nil { + if c.UnsubscribeCh != nil { + go func() { c.UnsubscribeCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "login": if time.Since(*c.AuthRequested).Seconds() > 30 { @@ -406,12 +419,16 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool { c.Authorized = true e := events.Login{} _ = json.Unmarshal(data, &e) - go func() { - if c.LoginChan != nil { + if c.LoginChan != nil { + go func() { c.LoginChan <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } if c.Private.Process(data, e) { @@ -428,12 +445,16 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool { } e := events.Success{} _ = json.Unmarshal(data, &e) - go func() { - if c.SuccessChan != nil { + if c.SuccessChan != nil { + go func() { c.SuccessChan <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } go func() { c.RawEventChan <- e }() diff --git a/api/ws/public.go b/api/ws/public.go index 543dca4..4ee43e5 100644 --- a/api/ws/public.go +++ b/api/ws/public.go @@ -83,7 +83,7 @@ func (c *Public) UTickers(req requests.Tickers, rCh ...bool) error { } // OpenInterest -// Retrieve the open interest. Data will by pushed every 3 seconds. +// Retrieve the open interest. Data will be pushed every 3 seconds. // // https://www.okex.com/docs-v5/en/#websocket-api-public-channels-open-interest-channel func (c *Public) OpenInterest(req requests.OpenInterest, ch ...chan *public.OpenInterest) error { @@ -106,7 +106,7 @@ func (c *Public) UOpenInterest(req requests.OpenInterest, rCh ...bool) error { } // Candlesticks -// Retrieve the open interest. Data will by pushed every 3 seconds. +// Retrieve the open interest. Data will be pushed every 3 seconds. // // https://www.okex.com/docs-v5/en/#websocket-api-public-channels-candlesticks-channel func (c *Public) Candlesticks(req requests.Candlesticks, ch ...chan *public.Candlesticks) error { @@ -379,196 +379,232 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { switch ch { case "instruments": e := public.Instruments{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.iCh != nil { + if c.iCh != nil { + go func() { c.iCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "tickers": e := public.Tickers{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.tCh != nil { + if c.tCh != nil { + go func() { c.tCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "open-interest": e := public.OpenInterest{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.oiCh != nil { + if c.oiCh != nil { + go func() { c.oiCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "trades": e := public.Trades{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.trCh != nil { + if c.trCh != nil { + go func() { c.trCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "estimated-price": e := public.EstimatedDeliveryExercisePrice{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.edepCh != nil { + if c.edepCh != nil { + go func() { c.edepCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "mark-price": e := public.MarkPrice{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.mpCh != nil { + if c.mpCh != nil { + go func() { c.mpCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "price-limit": e := public.PriceLimit{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.plCh != nil { + if c.plCh != nil { + go func() { c.plCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "opt-summary": e := public.OPTIONSummary{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.osCh != nil { + if c.osCh != nil { + go func() { c.osCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "funding-rate": - e := public.OPTIONSummary{} - err := json.Unmarshal(data, &e) - if err != nil { + e := public.FundingRate{} + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.osCh != nil { - c.osCh <- &e - } - c.StructuredEventChan <- e - }() + if c.frCh != nil { + go func() { + c.frCh <- &e + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true case "index-tickers": e := public.IndexTickers{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.itCh != nil { + if c.itCh != nil { + go func() { c.itCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true default: - // special cases - // market price candlestick channel chName := fmt.Sprint(ch) - // market price channels if strings.Contains(chName, "mark-price-candle") { e := public.MarkPriceCandlesticks{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.mpcCh != nil { + if c.mpcCh != nil { + go func() { c.mpcCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } - // index chandlestick channels if strings.Contains(chName, "index-candle") { e := public.IndexCandlesticks{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.icCh != nil { + if c.icCh != nil { + go func() { c.icCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } - // candlestick channels if strings.Contains(chName, "candle") { e := public.Candlesticks{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.cCh != nil { + if c.cCh != nil { + go func() { c.cCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } - // order book channels if strings.Contains(chName, "books") { e := public.OrderBook{} - err := json.Unmarshal(data, &e) - if err != nil { + if err := json.Unmarshal(data, &e); err != nil { return false } - go func() { - if c.obCh != nil { + if c.obCh != nil { + go func() { c.obCh <- &e - } - c.StructuredEventChan <- e - }() + }() + } + if c.StructuredEventChan != nil { + go func() { + c.StructuredEventChan <- e + }() + } return true } } diff --git a/examples/main.go b/examples/books.go similarity index 86% rename from examples/main.go rename to examples/books.go index a454a3e..9e393b3 100644 --- a/examples/main.go +++ b/examples/books.go @@ -10,10 +10,20 @@ import ( "github.com/gorilla/websocket" "log" "net/http" + _ "net/http/pprof" "time" ) func main() { + + // Start the pprof server + go func() { + log.Println("Starting pprof server on localhost:6060") + if err := http.ListenAndServe("localhost:6060", nil); err != nil { + log.Fatalf("could not start pprof server: %v", err) + } + }() + apiKey := "" secretKey := "" passphrase := "" From 150184599e9934b5dffd88f74286f3d5719ff8a3 Mon Sep 17 00:00:00 2001 From: DanielYamin Date: Mon, 3 Jun 2024 17:38:07 +0300 Subject: [PATCH 6/6] The better design pattern would be to handle channel operations without launching unnecessary goroutines --- api/ws/client.go | 38 +++++----------- api/ws/public.go | 112 ++++++++++++----------------------------------- 2 files changed, 38 insertions(+), 112 deletions(-) diff --git a/api/ws/client.go b/api/ws/client.go index fa40b8a..81d1678 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -377,37 +377,27 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool { e := events.Error{} _ = json.Unmarshal(data, &e) if c.ErrChan != nil { - go func() { - c.ErrChan <- &e - }() + c.ErrChan <- &e } return true case "subscribe": e := events.Subscribe{} _ = json.Unmarshal(data, &e) if c.SubscribeChan != nil { - go func() { - c.SubscribeChan <- &e - }() + c.SubscribeChan <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "unsubscribe": e := events.Unsubscribe{} _ = json.Unmarshal(data, &e) if c.UnsubscribeCh != nil { - go func() { - c.UnsubscribeCh <- &e - }() + c.UnsubscribeCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "login": @@ -420,14 +410,10 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool { e := events.Login{} _ = json.Unmarshal(data, &e) if c.LoginChan != nil { - go func() { - c.LoginChan <- &e - }() + c.LoginChan <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true } @@ -446,17 +432,13 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool { e := events.Success{} _ = json.Unmarshal(data, &e) if c.SuccessChan != nil { - go func() { - c.SuccessChan <- &e - }() + c.SuccessChan <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true } - go func() { c.RawEventChan <- e }() + c.RawEventChan <- e return false } diff --git a/api/ws/public.go b/api/ws/public.go index 4ee43e5..45c201e 100644 --- a/api/ws/public.go +++ b/api/ws/public.go @@ -383,14 +383,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.iCh != nil { - go func() { - c.iCh <- &e - }() + c.iCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "tickers": @@ -399,14 +395,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.tCh != nil { - go func() { - c.tCh <- &e - }() + c.tCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "open-interest": @@ -415,14 +407,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.oiCh != nil { - go func() { - c.oiCh <- &e - }() + c.oiCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "trades": @@ -431,14 +419,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.trCh != nil { - go func() { - c.trCh <- &e - }() + c.trCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "estimated-price": @@ -447,14 +431,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.edepCh != nil { - go func() { - c.edepCh <- &e - }() + c.edepCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "mark-price": @@ -463,14 +443,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.mpCh != nil { - go func() { - c.mpCh <- &e - }() + c.mpCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "price-limit": @@ -479,14 +455,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.plCh != nil { - go func() { - c.plCh <- &e - }() + c.plCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "opt-summary": @@ -495,14 +467,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.osCh != nil { - go func() { - c.osCh <- &e - }() + c.osCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "funding-rate": @@ -511,14 +479,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.frCh != nil { - go func() { - c.frCh <- &e - }() + c.frCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true case "index-tickers": @@ -527,14 +491,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.itCh != nil { - go func() { - c.itCh <- &e - }() + c.itCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true default: @@ -545,14 +505,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.mpcCh != nil { - go func() { - c.mpcCh <- &e - }() + c.mpcCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true } @@ -562,14 +518,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.icCh != nil { - go func() { - c.icCh <- &e - }() + c.icCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true } @@ -579,14 +531,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.cCh != nil { - go func() { - c.cCh <- &e - }() + c.cCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true } @@ -596,14 +544,10 @@ func (c *Public) Process(data []byte, e *events.Basic) bool { return false } if c.obCh != nil { - go func() { - c.obCh <- &e - }() + c.obCh <- &e } if c.StructuredEventChan != nil { - go func() { - c.StructuredEventChan <- e - }() + c.StructuredEventChan <- e } return true }