Skip to content

Commit

Permalink
websocket/gateio: Support multi connection management and integrate w…
Browse files Browse the repository at this point in the history
…ith GateIO (#1580)

* gateio: Add multi asset websocket support WIP.

* meow

* Add tests and shenanigans

* integrate flushing and for enabling/disabling pairs from rpc shenanigans

* some changes

* linter: fixes strikes again.

* Change name ConnectionAssociation -> ConnectionCandidate for better clarity on purpose. Change connections map to point to candidate to track subscriptions for future dynamic connections holder and drop struct ConnectionDetails.

* Add subscription tests (state functional)

* glorious:nits + proxy handling

* Spelling

* linter: fixerino

* instead of nil, dont do nil.

* clean up nils

* cya nils

* don't need to set URL or check if its running

* stop ping handler routine leak

* * Fix bug where reader routine on error that is not a disconnection error but websocket frame error or anything really makes the reader routine return and then connection never cycles and the buffer gets filled.
* Handle reconnection via an errors.Is check which is simpler and in that scope allow for quick disconnect reconnect without waiting for connection cycle.
* Dial now uses code from DialContext but just calls context.Background()
* Don't allow reader to return on parse binary response error. Just output error and return a non nil response

* Allow rollback on connect on any error across all connections

* fix shadow jutsu

* glorious/gk: nitters - adds in ws mock server

* linter: fix

* fix deadlock on connection as the previous channel had no reader and would hang connection reader for eternity.

* gk: nits

* Leak issue and edge case

* gk: nits

* gk: drain brain

* glorious: nits

* Update exchanges/stream/websocket.go

Co-authored-by: Scott <[email protected]>

* glorious: nits

* add tests

* linter: fix

* After merge

* Add error connection info

* Fix edge case where it does not reconnect made by an already closed connection

* stream coverage

* glorious: nits

* glorious: nits removed asset error handling in stream package

* linter: fix

* rm block

* Add basic readme

* fix asset enabled flush cycle for multi connection

* spella: fix

* linter: fix

* Add glorious suggestions, fix some race thing

* reinstate name before any routine gets spawned

* stop on error in mock tests

* glorious: nits

* glorious: nits found in CI build

* Add test for drain, bumped wait times as there seems to be something happening on macos CI builds, used context.WithTimeout because its instant.

* mutex across shutdown and connect for protection

* lint: fix

* test time withoffset, reinstate stop

* fix whoops

* const trafficCheckInterval; rm testmain

* y

* fix lint

* bump time check window

* stream: fix intermittant test failures while testing routines and remove code that is not needed.

* spells

* cant do what I did

* protect race due to routine.

* update testURL

* use mock websocket connection instead of test URL's

* linter: fix

* remove url because its throwing errors on CI builds

* connections drop all the time, don't need to worry about not being able to echo back ws data as it can be easily reviewed _test file side.

* remove another superfluous url thats not really set up for this

* spawn overwatch routine when there is no errors, inline checker instead of waiting for a time period, add sleep inline with echo handler as this is really quick and wanted to ensure that latency is handing correctly

* linter: fixerino uperino

* glorious: panix

* linter: things

* whoops

* defer lock and use functions that don't require locking in SetProxyAddress

* lint: fix

* thrasher: nits

---------

Co-authored-by: shazbert <[email protected]>
Co-authored-by: Scott <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent c2dfb37 commit ac731ce
Show file tree
Hide file tree
Showing 58 changed files with 1,990 additions and 1,469 deletions.
6 changes: 3 additions & 3 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ channels:
continue
}
// When we have a successful subscription, we can alert our internal management system of the success.
f.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
f.Websocket.AddSuccessfulSubscriptions(f.Websocket.Conn, channelsToSubscribe[i])
}
return errs
}
Expand Down Expand Up @@ -1038,7 +1038,7 @@ channels:
continue
}
// When we have a successful unsubscription, we can alert our internal management system of the success.
f.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
f.Websocket.RemoveSubscriptions(f.Websocket.Conn, channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func (f *FTX) Setup(exch *config.Exchange) error {
return err
}
// Sets up a new connection for the websocket, there are two separate connections denoted by the ConnectionSetup struct auth bool.
return f.Websocket.SetupNewConnection(stream.ConnectionSetup{
return f.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
// RateLimit int64 rudimentary rate limit that sleeps connection in milliseconds before sending designated payload
Expand Down
7 changes: 7 additions & 0 deletions engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2935,6 +2935,13 @@ func (s *RPCServer) SetExchangeAsset(_ context.Context, r *gctrpc.SetExchangeAss
return nil, err
}

if base.IsWebsocketEnabled() && base.Websocket.IsConnected() {
err = exch.FlushWebsocketChannels()
if err != nil {
return nil, err
}
}

return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -1989,7 +1990,7 @@ func TestSubscribe(t *testing.T) {
require.ElementsMatch(tb, req.Params, exp, "Params should have correct channels")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":null,"id":%d}`, req.ID)))
}
b = testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock))
b = testexch.MockWsInstance[Binance](t, mockws.CurryWsMockUpgrader(t, mock))
} else {
testexch.SetupWs(t, b)
}
Expand All @@ -2011,7 +2012,7 @@ func TestSubscribeBadResp(t *testing.T) {
require.NoError(tb, err, "Unmarshal should not error")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":{"error":"carrots"},"id":%d}`, req.ID)))
}
b := testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
b := testexch.MockWsInstance[Binance](t, mockws.CurryWsMockUpgrader(t, mock)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
err := b.Subscribe(channels)
assert.ErrorIs(t, err, common.ErrUnknownError, "Subscribe should error correctly")
assert.ErrorContains(t, err, "carrots", "Subscribe should error containing the carrots")
Expand Down
6 changes: 3 additions & 3 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (b *Binance) Unsubscribe(channels subscription.List) error {
// manageSubs subscribes or unsubscribes from a list of subscriptions
func (b *Binance) manageSubs(op string, subs subscription.List) error {
if op == wsSubscribeMethod {
if err := b.Websocket.AddSubscriptions(subs...); err != nil { // Note: AddSubscription will set state to subscribing
if err := b.Websocket.AddSubscriptions(b.Websocket.Conn, subs...); err != nil { // Note: AddSubscription will set state to subscribing
return err
}
} else {
Expand Down Expand Up @@ -592,15 +592,15 @@ func (b *Binance) manageSubs(op string, subs subscription.List) error {
b.Websocket.DataHandler <- err

if op == wsSubscribeMethod {
if err2 := b.Websocket.RemoveSubscriptions(subs...); err2 != nil {
if err2 := b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...); err2 != nil {
err = common.AppendError(err, err2)
}
}
} else {
if op == wsSubscribeMethod {
err = common.AppendError(err, subs.SetStates(subscription.SubscribedState))
} else {
err = b.Websocket.RemoveSubscriptions(subs...)
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...)
}
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (b *Binance) Setup(exch *config.Exchange) error {
return err
}

return b.Websocket.SetupNewConnection(stream.ConnectionSetup{
return b.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
RateLimit: request.NewWeightedRateLimitByDuration(250 * time.Millisecond),
Expand Down
4 changes: 2 additions & 2 deletions exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error {
return err
}
}
return bi.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...)
return bi.Websocket.AddSuccessfulSubscriptions(bi.Websocket.Conn, channelsToSubscribe...)
}

// Unsubscribe unsubscribes from a set of channels
Expand All @@ -614,7 +614,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error
return err
}
}
return bi.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return bi.Websocket.RemoveSubscriptions(bi.Websocket.Conn, channelsToUnsubscribe...)
}

func (bi *Binanceus) setupOrderbookManager() {
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binanceus/binanceus_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (bi *Binanceus) Setup(exch *config.Exchange) error {
return err
}

return bi.Websocket.SetupNewConnection(stream.ConnectionSetup{
return bi.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
RateLimit: request.NewWeightedRateLimitByDuration(300 * time.Millisecond),
Expand Down
16 changes: 8 additions & 8 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ func TestWsSubscribedResponse(t *testing.T) {
assert.ErrorContains(t, err, "waiter1", "Should error containing subID if")
}

err = b.Websocket.AddSubscriptions(&subscription.Subscription{Key: "waiter1"})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: "waiter1"})
require.NoError(t, err, "AddSubscriptions must not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
assert.NoError(t, err, "wsHandleData should not error")
Expand All @@ -1337,7 +1337,7 @@ func TestWsSubscribedResponse(t *testing.T) {
}

func TestWsOrderBook(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1355,7 +1355,7 @@ func TestWsOrderBook(t *testing.T) {
}

func TestWsTradeResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1365,7 +1365,7 @@ func TestWsTradeResponse(t *testing.T) {
}

func TestWsTickerResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[11534,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1376,7 +1376,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123412,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1387,7 +1387,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123413,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1398,7 +1398,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123414,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1408,7 +1408,7 @@ func TestWsTickerResponse(t *testing.T) {
}

func TestWsCandleResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[343351,[[1574698260000,7379.785503,7383.8,7388.3,7379.785503,1.68829482]]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand Down
10 changes: 5 additions & 5 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
c.Key = int(chanID)

// subscribeToChan removes the old subID keyed Subscription
if err := b.Websocket.AddSuccessfulSubscriptions(c); err != nil {
if err := b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, c); err != nil {
return fmt.Errorf("%w: %w subID: %s", stream.ErrSubscriptionFailure, err, subID)
}

Expand Down Expand Up @@ -1661,7 +1661,7 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error {

// Resub will block so we have to do this in a goro
go func() {
if err := b.Websocket.ResubscribeToChannel(c); err != nil {
if err := b.Websocket.ResubscribeToChannel(b.Websocket.Conn, c); err != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, err)
}
}()
Expand Down Expand Up @@ -1748,13 +1748,13 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
if err = b.Websocket.AddSubscriptions(c); err != nil {
if err = b.Websocket.AddSubscriptions(b.Websocket.Conn, c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
}

// Always remove the temporary subscription keyed by subID
defer func() {
_ = b.Websocket.RemoveSubscriptions(c)
_ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
}()

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "subscribe:"+subID, req)
Expand Down Expand Up @@ -1861,7 +1861,7 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
return wErr
}

return b.Websocket.RemoveSubscriptions(c)
return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
}

// getErrResp takes a json response string and looks for an error event type
Expand Down
Loading

0 comments on commit ac731ce

Please sign in to comment.