Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket/gateio: Support multi connection management and integrate with GateIO #1580

Merged
merged 91 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from 82 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
64c24ab
gateio: Add multi asset websocket support WIP.
Jul 14, 2024
f509399
meow
Jul 14, 2024
feed04e
Add tests and shenanigans
Jul 15, 2024
31a26c0
integrate flushing and for enabling/disabling pairs from rpc shenanigans
Jul 15, 2024
e1f2f7a
some changes
Jul 15, 2024
76524cc
linter: fixes strikes again.
Jul 15, 2024
640e82e
Change name ConnectionAssociation -> ConnectionCandidate for better c…
Jul 15, 2024
3a0440d
Add subscription tests (state functional)
Jul 16, 2024
c7d2b62
glorious:nits + proxy handling
Jul 16, 2024
fc281ee
Spelling
Jul 16, 2024
eaa44ba
linter: fixerino
Jul 16, 2024
a8debf9
instead of nil, dont do nil.
Jul 16, 2024
16b0e22
clean up nils
Jul 16, 2024
32252b2
cya nils
Jul 16, 2024
7c5d9c3
don't need to set URL or check if its running
Jul 17, 2024
2d2f872
stop ping handler routine leak
Jul 19, 2024
39191c8
* Fix bug where reader routine on error that is not a disconnection e…
Jul 19, 2024
f1c3895
Allow rollback on connect on any error across all connections
Jul 19, 2024
09bff6c
fix shadow jutsu
Jul 19, 2024
e66c9be
glorious/gk: nitters - adds in ws mock server
Jul 22, 2024
03de669
linter: fix
Jul 22, 2024
eddb58b
Merge branch 'master' into gateio_ws
Jul 23, 2024
8159b05
fix deadlock on connection as the previous channel had no reader and …
Jul 24, 2024
4f0b42f
Merge branch 'master' into gateio_ws
Jul 24, 2024
f98c3aa
gk: nits
Jul 24, 2024
d89a46a
Leak issue and edge case
Jul 25, 2024
51b9f17
gk: nits
Jul 26, 2024
6376a12
gk: drain brain
Jul 26, 2024
431c047
glorious: nits
Aug 15, 2024
d5bbd10
Merge branch 'master' into gateio_ws
Aug 19, 2024
d2f7c2e
Merge branch 'master' into gateio_ws
Aug 23, 2024
16d88cf
Merge branch 'master' into gateio_ws
Aug 24, 2024
0402bc7
Update exchanges/stream/websocket.go
shazbert Aug 30, 2024
0dfda95
glorious: nits
Aug 30, 2024
c58834e
add tests
Aug 30, 2024
d29893b
linter: fix
Aug 30, 2024
79e0eeb
Merge branch 'master' into gateio_ws
Aug 30, 2024
4f7bcd9
Merge branch 'master' into gateio_ws
Sep 2, 2024
45ff199
After merge
Sep 2, 2024
06acaac
Add error connection info
Sep 2, 2024
af986da
Fix edge case where it does not reconnect made by an already closed c…
Sep 9, 2024
1171278
stream coverage
Sep 12, 2024
4be6214
glorious: nits
Sep 12, 2024
a068ddb
Merge branch 'master' into gateio_ws
Sep 12, 2024
81cba36
glorious: nits removed asset error handling in stream package
Sep 12, 2024
bdc6954
linter: fix
Sep 13, 2024
3fe44ca
rm block
Sep 13, 2024
818584f
Add basic readme
Sep 13, 2024
289ac71
Merge branch 'master' into gateio_ws
Sep 13, 2024
dee32a4
Merge branch 'master' into gateio_ws
Sep 13, 2024
7063311
Merge branch 'master' into gateio_ws
Sep 16, 2024
0709ba3
fix asset enabled flush cycle for multi connection
Sep 16, 2024
b877b38
spella: fix
Sep 16, 2024
f7d1ec8
linter: fix
Sep 18, 2024
818432d
Merge branch 'master' into gateio_ws
Sep 18, 2024
bdc7afb
Add glorious suggestions, fix some race thing
Sep 19, 2024
3d6541e
reinstate name before any routine gets spawned
Sep 19, 2024
33c4128
stop on error in mock tests
Sep 19, 2024
ea25763
glorious: nits
Sep 19, 2024
ae92cd2
glorious: nits found in CI build
Sep 24, 2024
c49a75c
Add test for drain, bumped wait times as there seems to be something …
Sep 24, 2024
5551d54
mutex across shutdown and connect for protection
Sep 24, 2024
99c3c7b
lint: fix
Sep 24, 2024
f56cb78
test time withoffset, reinstate stop
Sep 24, 2024
b4bc7e2
fix whoops
Sep 24, 2024
ce4a5ce
const trafficCheckInterval; rm testmain
Sep 24, 2024
a745992
y
Sep 24, 2024
086fcc3
fix lint
Sep 24, 2024
a64e842
bump time check window
Sep 24, 2024
9e53313
stream: fix intermittant test failures while testing routines and rem…
Sep 25, 2024
10230b0
spells
Sep 25, 2024
4e02c4a
cant do what I did
Sep 25, 2024
7be80a0
protect race due to routine.
Sep 25, 2024
b51cf2f
update testURL
Sep 25, 2024
ff7ae03
use mock websocket connection instead of test URL's
Sep 25, 2024
1040153
linter: fix
Sep 25, 2024
f529bd2
remove url because its throwing errors on CI builds
Sep 25, 2024
5d0a7f7
connections drop all the time, don't need to worry about not being ab…
Sep 25, 2024
56cb431
remove another superfluous url thats not really set up for this
Sep 25, 2024
ca4999e
spawn overwatch routine when there is no errors, inline checker inste…
Sep 25, 2024
4240a0a
linter: fixerino uperino
Sep 26, 2024
8d6febc
glorious: panix
Sep 30, 2024
3df82ef
Merge branch 'master' into gateio_ws
Oct 1, 2024
7ff07d1
Merge branch 'master' into gateio_ws
Oct 1, 2024
3a24640
linter: things
Oct 1, 2024
1eef208
whoops
Oct 1, 2024
261b577
Merge branch 'master' into gateio_ws
Oct 1, 2024
1d0dc25
defer lock and use functions that don't require locking in SetProxyAd…
Oct 3, 2024
86cd5f9
Merge branch 'master' into gateio_ws
Oct 8, 2024
81bda15
lint: fix
Oct 8, 2024
2a62643
thrasher: nits
Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/ADD_NEW_EXCHANGE.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ channels:
continue
}
// When we have a successful subscription, we can alert our internal management system of the success.
f.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe[i])
f.Websocket.AddSuccessfulSubscriptions(f.Websocket.Conn, channelsToSubscribe[i])
}
return errs
}
Expand Down Expand Up @@ -1038,7 +1038,7 @@ channels:
continue
}
// When we have a successful unsubscription, we can alert our internal management system of the success.
f.Websocket.RemoveSubscriptions(channelsToUnsubscribe[i])
f.Websocket.RemoveSubscriptions(f.Websocket.Conn, channelsToUnsubscribe[i])
}
if errs != nil {
return errs
Expand Down Expand Up @@ -1098,7 +1098,7 @@ func (f *FTX) Setup(exch *config.Exchange) error {
return err
}
// Sets up a new connection for the websocket, there are two separate connections denoted by the ConnectionSetup struct auth bool.
return f.Websocket.SetupNewConnection(stream.ConnectionSetup{
return f.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
// RateLimit int64 rudimentary rate limit that sleeps connection in milliseconds before sending designated payload
Expand Down
7 changes: 7 additions & 0 deletions engine/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2935,6 +2935,13 @@ func (s *RPCServer) SetExchangeAsset(_ context.Context, r *gctrpc.SetExchangeAss
return nil, err
}

if base.IsWebsocketEnabled() && base.Websocket.IsConnected() {
err = exch.FlushWebsocketChannels()
if err != nil {
return nil, err
}
}

return &gctrpc.GenericResponse{Status: MsgStatusSuccess}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/subscription"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
testsubs "github.com/thrasher-corp/gocryptotrader/internal/testing/subscriptions"
mockws "github.com/thrasher-corp/gocryptotrader/internal/testing/websocket"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -1991,7 +1992,7 @@ func TestSubscribe(t *testing.T) {
require.ElementsMatch(t, req.Params, exp, "Params should have correct channels")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":null,"id":%d}`, req.ID)))
}
b = testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock))
b = testexch.MockWsInstance[Binance](t, mockws.CurryWsMockUpgrader(t, mock))
} else {
testexch.SetupWs(t, b)
}
Expand All @@ -2012,7 +2013,7 @@ func TestSubscribeBadResp(t *testing.T) {
require.NoError(t, err, "Unmarshal should not error")
return w.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf(`{"result":{"error":"carrots"},"id":%d}`, req.ID)))
}
b := testexch.MockWsInstance[Binance](t, testexch.CurryWsMockUpgrader(t, mock)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
b := testexch.MockWsInstance[Binance](t, mockws.CurryWsMockUpgrader(t, mock)) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
err := b.Subscribe(channels)
assert.ErrorIs(t, err, common.ErrUnknownError, "Subscribe should error correctly")
assert.ErrorContains(t, err, "carrots", "Subscribe should error containing the carrots")
Expand Down
6 changes: 3 additions & 3 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ func (b *Binance) Unsubscribe(channels subscription.List) error {
// manageSubs subscribes or unsubscribes from a list of subscriptions
func (b *Binance) manageSubs(op string, subs subscription.List) error {
if op == wsSubscribeMethod {
if err := b.Websocket.AddSubscriptions(subs...); err != nil { // Note: AddSubscription will set state to subscribing
if err := b.Websocket.AddSubscriptions(b.Websocket.Conn, subs...); err != nil { // Note: AddSubscription will set state to subscribing
return err
}
} else {
Expand Down Expand Up @@ -592,15 +592,15 @@ func (b *Binance) manageSubs(op string, subs subscription.List) error {
b.Websocket.DataHandler <- err

if op == wsSubscribeMethod {
if err2 := b.Websocket.RemoveSubscriptions(subs...); err2 != nil {
if err2 := b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...); err2 != nil {
err = common.AppendError(err, err2)
}
}
} else {
if op == wsSubscribeMethod {
err = common.AppendError(err, subs.SetStates(subscription.SubscribedState))
} else {
err = b.Websocket.RemoveSubscriptions(subs...)
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, subs...)
}
}

Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (b *Binance) Setup(exch *config.Exchange) error {
return err
}

return b.Websocket.SetupNewConnection(stream.ConnectionSetup{
return b.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
RateLimit: request.NewWeightedRateLimitByDuration(250 * time.Millisecond),
Expand Down
4 changes: 2 additions & 2 deletions exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ func (bi *Binanceus) Subscribe(channelsToSubscribe subscription.List) error {
return err
}
}
return bi.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...)
return bi.Websocket.AddSuccessfulSubscriptions(bi.Websocket.Conn, channelsToSubscribe...)
}

// Unsubscribe unsubscribes from a set of channels
Expand All @@ -614,7 +614,7 @@ func (bi *Binanceus) Unsubscribe(channelsToUnsubscribe subscription.List) error
return err
}
}
return bi.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return bi.Websocket.RemoveSubscriptions(bi.Websocket.Conn, channelsToUnsubscribe...)
}

func (bi *Binanceus) setupOrderbookManager() {
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binanceus/binanceus_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (bi *Binanceus) Setup(exch *config.Exchange) error {
return err
}

return bi.Websocket.SetupNewConnection(stream.ConnectionSetup{
return bi.Websocket.SetupNewConnection(&stream.ConnectionSetup{
ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
ResponseMaxLimit: exch.WebsocketResponseMaxLimit,
RateLimit: request.NewWeightedRateLimitByDuration(300 * time.Millisecond),
Expand Down
16 changes: 8 additions & 8 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ func TestWsSubscribedResponse(t *testing.T) {
assert.ErrorContains(t, err, "waiter1", "Should error containing subID if")
}

err = b.Websocket.AddSubscriptions(&subscription.Subscription{Key: "waiter1"})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: "waiter1"})
require.NoError(t, err, "AddSubscriptions must not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
assert.NoError(t, err, "wsHandleData should not error")
Expand All @@ -1337,7 +1337,7 @@ func TestWsSubscribedResponse(t *testing.T) {
}

func TestWsOrderBook(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Key: 23405, Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsBook})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[23405,[[38334303613,9348.8,0.53],[38334308111,9348.8,5.98979404],[38331335157,9344.1,1.28965787],[38334302803,9343.8,0.08230094],[38334279092,9343,0.8],[38334307036,9342.938663676,0.8],[38332749107,9342.9,0.2],[38332277330,9342.8,0.85],[38329406786,9342,0.1432012],[38332841570,9341.947288638,0.3],[38332163238,9341.7,0.3],[38334303384,9341.6,0.324],[38332464840,9341.4,0.5],[38331935870,9341.2,0.5],[38334312082,9340.9,0.02126899],[38334261292,9340.8,0.26763],[38334138680,9340.625455254,0.12],[38333896802,9339.8,0.85],[38331627527,9338.9,1.57863959],[38334186713,9338.9,0.26769],[38334305819,9338.8,2.999],[38334211180,9338.75285796,3.999],[38334310699,9337.8,0.10679883],[38334307414,9337.5,1],[38334179822,9337.1,0.26773],[38334306600,9336.659955102,1.79],[38334299667,9336.6,1.1],[38334306452,9336.6,0.13979771],[38325672859,9336.3,1.25],[38334311646,9336.2,1],[38334258509,9336.1,0.37],[38334310592,9336,1.79],[38334310378,9335.6,1.43],[38334132444,9335.2,0.26777],[38331367325,9335,0.07],[38334310703,9335,0.10680562],[38334298209,9334.7,0.08757301],[38334304857,9334.456899462,0.291],[38334309940,9334.088390727,0.0725],[38334310377,9333.7,1.2868],[38334297615,9333.607784,0.1108],[38334095188,9333.3,0.26785],[38334228913,9332.7,0.40861186],[38334300526,9332.363996604,0.3884],[38334310701,9332.2,0.10680562],[38334303548,9332.005382871,0.07],[38334311798,9331.8,0.41285228],[38334301012,9331.7,1.7952],[38334089877,9331.4,0.2679],[38321942150,9331.2,0.2],[38334310670,9330,1.069],[38334063096,9329.6,0.26796],[38334310700,9329.4,0.10680562],[38334310404,9329.3,1],[38334281630,9329.1,6.57150597],[38334036864,9327.7,0.26801],[38334310702,9326.6,0.10680562],[38334311799,9326.1,0.50220625],[38334164163,9326,0.219638],[38334309722,9326,1.5],[38333051682,9325.8,0.26807],[38334302027,9325.7,0.75],[38334203435,9325.366592,0.32397696],[38321967613,9325,0.05],[38334298787,9324.9,0.3],[38334301719,9324.8,3.6227592],[38331316716,9324.763454646,0.71442],[38334310698,9323.8,0.10680562],[38334035499,9323.7,0.23431017],[38334223472,9322.670551788,0.42150603],[38334163459,9322.560399006,0.143967],[38321825171,9320.8,2],[38334075805,9320.467496148,0.30772633],[38334075800,9319.916732238,0.61457592],[38333682302,9319.7,0.0011],[38331323088,9319.116771762,0.12913],[38333677480,9319,0.0199],[38334277797,9318.6,0.89],[38325235155,9318.041088,1.20249],[38334310910,9317.82382938,1.79],[38334311811,9317.2,0.61079138],[38334311812,9317.2,0.71937652],[38333298214,9317.1,50],[38334306359,9317,1.79],[38325531545,9316.382823951,0.21263],[38333727253,9316.3,0.02316372],[38333298213,9316.1,45],[38333836479,9316,2.135],[38324520465,9315.9,2.7681],[38334307411,9315.5,1],[38330313617,9315.3,0.84455],[38334077770,9315.294024,0.01248397],[38334286663,9315.294024,1],[38325533762,9315.290315394,2.40498],[38334310018,9315.2,3],[38333682617,9314.6,0.0011],[38334304794,9314.6,0.76364676],[38334304798,9314.3,0.69242113],[38332915733,9313.8,0.0199],[38334084411,9312.8,1],[38334311893,9350.1,-1.015],[38334302734,9350.3,-0.26737],[38334300732,9350.8,-5.2],[38333957619,9351,-0.90677089],[38334300521,9351,-1.6457],[38334301600,9351.012829557,-0.0523],[38334308878,9351.7,-2.5],[38334299570,9351.921544,-0.1015],[38334279367,9352.1,-0.26732],[38334299569,9352.411802928,-0.4036],[38334202773,9353.4,-0.02139404],[38333918472,9353.7,-1.96412776],[38334278782,9354,-0.26731],[38334278606,9355,-1.2785],[38334302105,9355.439221251,-0.79191542],[38313897370,9355.569409242,-0.43363],[38334292995,9355.584296,-0.0979],[38334216989,9355.8,-0.03686414],[38333894025,9355.9,-0.26721],[38334293798,9355.936691952,-0.4311],[38331159479,9356,-0.4204022],[38333918888,9356.1,-1.10885563],[38334298205,9356.4,-0.20124428],[38328427481,9356.5,-0.1],[38333343289,9356.6,-0.41034213],[38334297205,9356.6,-0.08835018],[38334277927,9356.741101161,-0.0737],[38334311645,9356.8,-0.5],[38334309002,9356.9,-5],[38334309736,9357,-0.10680107],[38334306448,9357.4,-0.18645275],[38333693302,9357.7,-0.2672],[38332815159,9357.8,-0.0011],[38331239824,9358.2,-0.02],[38334271608,9358.3,-2.999],[38334311971,9358.4,-0.55],[38333919260,9358.5,-1.9972841],[38334265365,9358.5,-1.7841],[38334277960,9359,-3],[38334274601,9359.020969848,-3],[38326848839,9359.1,-0.84],[38334291080,9359.247048,-0.16199869],[38326848844,9359.4,-1.84],[38333680200,9359.6,-0.26713],[38331326606,9359.8,-0.84454],[38334309738,9359.8,-0.10680107],[38331314707,9359.9,-0.2],[38333919803,9360.9,-1.41177599],[38323651149,9361.33417827,-0.71442],[38333656906,9361.5,-0.26705],[38334035500,9361.5,-0.40861586],[38334091886,9362.4,-6.85940815],[38334269617,9362.5,-4],[38323629409,9362.545858872,-2.40497],[38334309737,9362.7,-0.10680107],[38334312380,9362.7,-3],[38325280830,9362.8,-1.75123],[38326622800,9362.8,-1.05145],[38333175230,9363,-0.0011],[38326848745,9363.2,-0.79],[38334308960,9363.206775564,-0.12],[38333920234,9363.3,-1.25318113],[38326848843,9363.4,-1.29],[38331239823,9363.4,-0.02],[38333209613,9363.4,-0.26719],[38334299964,9364,-0.05583123],[38323470224,9364.161816648,-0.12912],[38334284711,9365,-0.21346019],[38334299594,9365,-2.6757062],[38323211816,9365.073132585,-0.21262],[38334312456,9365.1,-0.11167861],[38333209612,9365.2,-0.26719],[38327770474,9365.3,-0.0073],[38334298788,9365.3,-0.3],[38334075803,9365.409831204,-0.30772637],[38334309740,9365.5,-0.10680107],[38326608767,9365.7,-2.76809],[38333920657,9365.7,-1.25848083],[38329594226,9366.6,-0.02587],[38334311813,9366.7,-4.72290945],[38316386301,9367.39258128,-2.37581],[38334302026,9367.4,-4.5],[38334228915,9367.9,-0.81725458],[38333921381,9368.1,-1.72213641],[38333175678,9368.2,-0.0011],[38334301150,9368.2,-2.654604],[38334297208,9368.3,-0.78036466],[38334309739,9368.3,-0.10680107],[38331227515,9368.7,-0.02],[38331184470,9369,-0.003975],[38334203436,9369.319616,-0.32397695],[38334269964,9369.7,-0.5],[38328386732,9370,-4.11759935],[38332719555,9370,-0.025],[38333921935,9370.5,-1.2224398],[38334258511,9370.5,-0.35],[38326848842,9370.8,-0.34],[38333985038,9370.9,-0.8551502],[38334283018,9370.9,-1],[38326848744,9371,-1.34]],5]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1355,7 +1355,7 @@ func TestWsOrderBook(t *testing.T) {
}

func TestWsTradeResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTrades, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1365,7 +1365,7 @@ func TestWsTradeResponse(t *testing.T) {
}

func TestWsTickerResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsTicker, Key: 11534})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[11534,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1376,7 +1376,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123412})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123412,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1387,7 +1387,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123413})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123413,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1398,7 +1398,7 @@ func TestWsTickerResponse(t *testing.T) {
if err != nil {
t.Error(err)
}
err = b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414})
err = b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{pair}, Channel: wsTicker, Key: 123414})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON = `[123414,[61.304,2228.36155358,61.305,1323.2442970500003,0.395,0.0065,61.371,50973.3020771,62.5,57.421]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand All @@ -1408,7 +1408,7 @@ func TestWsTickerResponse(t *testing.T) {
}

func TestWsCandleResponse(t *testing.T) {
err := b.Websocket.AddSubscriptions(&subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351})
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: wsCandles, Key: 343351})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[343351,[[1574698260000,7379.785503,7383.8,7388.3,7379.785503,1.68829482]]]`
err = b.wsHandleData([]byte(pressXToJSON))
Expand Down
10 changes: 5 additions & 5 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
c.Key = int(chanID)

// subscribeToChan removes the old subID keyed Subscription
if err := b.Websocket.AddSuccessfulSubscriptions(c); err != nil {
if err := b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, c); err != nil {
return fmt.Errorf("%w: %w subID: %s", stream.ErrSubscriptionFailure, err, subID)
}

Expand Down Expand Up @@ -1661,7 +1661,7 @@ func (b *Bitfinex) resubOrderbook(c *subscription.Subscription) error {

// Resub will block so we have to do this in a goro
go func() {
if err := b.Websocket.ResubscribeToChannel(c); err != nil {
if err := b.Websocket.ResubscribeToChannel(b.Websocket.Conn, c); err != nil {
log.Errorf(log.ExchangeSys, "%s error resubscribing orderbook: %v", b.Name, err)
}
}()
Expand Down Expand Up @@ -1748,13 +1748,13 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
// Add a temporary Key so we can find this Sub when we get the resp without delay or context switch
// Otherwise we might drop the first messages after the subscribed resp
c.Key = subID // Note subID string type avoids conflicts with later chanID key
if err = b.Websocket.AddSubscriptions(c); err != nil {
if err = b.Websocket.AddSubscriptions(b.Websocket.Conn, c); err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pairs, err)
}

// Always remove the temporary subscription keyed by subID
defer func() {
_ = b.Websocket.RemoveSubscriptions(c)
_ = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
}()

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "subscribe:"+subID, req)
Expand Down Expand Up @@ -1861,7 +1861,7 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
return wErr
}

return b.Websocket.RemoveSubscriptions(c)
return b.Websocket.RemoveSubscriptions(b.Websocket.Conn, c)
}

// getErrResp takes a json response string and looks for an error event type
Expand Down
Loading
Loading