-
Notifications
You must be signed in to change notification settings - Fork 820
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: shift websocket reader and dialer code from wrapper to stream package #1493
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the concepts. Will need some more work, but defining functions at the Connection Setup level is nice. The removal of needing duplicate ReadMessage functions is nice
Handler: ok.WsHandleData, | ||
Bootstrap: ok.WsUnAuthBootstrap, | ||
ReadBufferSize: 8192, | ||
WriteBufferSize: 8192, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a comment consideration for subscription limit being passed in. Would something overlooking the subscriptions handle subscription count? Or can a connection itself understand that you're trying to oversubscribe?
I understand that the next step is about subscriptions, so I understand it won't be addressed here, but putting out the consideration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that is something that I want to think about implementing after shifting subscription handling over to individual connections. So we can hit a max ceiling and start generating new connections to handle everything.
exchanges/stream/websocket.go
Outdated
if w.GenerateSubs == nil { | ||
return errors.New("generate subscriptions function not set") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be much sooner, otherwise everything gets up and running, but it still errors out
// InitialiseConnection sets up a websocket connection | ||
func (w *Websocket) InitialiseConnection(conn Connection, bootstrap func(Connection) error, handler func([]byte) error) error { | ||
dialer := *websocket.DefaultDialer | ||
if w.proxyAddr != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With proxyAddr now being a direct url, and that SetProxy
can be called at any time via RPC, I think this will require mutexs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I un exported it and only call it in Websocket Connect
method which is locked down.
exchanges/stream/websocket.go
Outdated
if w.connector != nil { | ||
err := w.connector() | ||
if err != nil { | ||
w.setState(disconnected) | ||
return fmt.Errorf("%v Error connecting %w", w.exchangeName, err) | ||
} | ||
} | ||
|
||
if w.Conn != nil { | ||
err := w.InitialiseConnection(w.Conn, w.UnAuthBootstrap, w.UnAuthHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You will definitely need to expand on these details for differentiating these two ways of setting up connections. At present it is quite confusion which one is what and why. No using "new" 😄
Docs/commentary required about the changes youre introducing
exchanges/stream/websocket.go
Outdated
} | ||
} | ||
|
||
if w.Conn != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This setup doesn't work. Non-new websockets will panic here as w.Conn is defined, but the functions below are not
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x104adf5a8]
goroutine 120 [running]:
github.com/thrasher-corp/gocryptotrader/exchanges/stream.(*Websocket).listen(0x140001ae3c0, {0x1058830d8, 0x140008e0100}, 0x0)
/Users/scottg/go/src/github.com/thrasher-corp/gocryptotrader/exchanges/stream/websocket.go:992 +0x78
created by github.com/thrasher-corp/gocryptotrader/exchanges/stream.(*Websocket).InitialiseConnection in goroutine 146
/Users/scottg/go/src/github.com/thrasher-corp/gocryptotrader/exchanges/stream/websocket.go:1022 +0x1a8
This is for Kraken, it shouldn'tbe hitting this new code for Kraken, right? No errors are thrown from nil functions in InistialiseConnection
exchanges/stream/websocket_types.go
Outdated
// Authenticated stream connection | ||
AuthConn Connection | ||
AuthConn Connection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Down the line, I would love for AuthConn to be removed. An auth connection is just another connection. At present, any connection issues for unauth/auth affect the other and it doesn't make much sense for the direction we're heading with multiple websocket connection types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work 🎉
Subjective comments mostly
I think it moves us in the right direction, definitely.
I'm mindful of what our overall websockets structure will look like, and the current mixing of naming stream
, websocket
and connection
, but I think this improves, generally.
exchanges/stream/websocket.go
Outdated
return w.Shutdown() | ||
} | ||
return nil | ||
} | ||
|
||
// GetWebsocketURL returns the running websocket URL | ||
func (w *Websocket) GetWebsocketURL() string { | ||
return w.runningURL | ||
func (w *Websocket) GetWebsocketURL(auth bool) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two issues with this:
- I don't think we should go further down the "websocket is a websocket but maybe actually 2 and one of them is auth" route.
- I don't think there's any cleaner API here in passing a boolean than having 2 methods
So in both cases I vote for GetWebsocketAuthURL() which has convenience handling for AuthConn is nil.
exchanges/stream/websocket.go
Outdated
@@ -677,76 +680,70 @@ func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool { | |||
|
|||
// SetWebsocketURL sets websocket URL and can refresh underlying connections | |||
func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Back to front-review-comments 🤦
See my comment below on GetWebsocketURL.
I actually vote for 2 methods on this too, even though it changes the API.
Trying to think about how this pattern would look like for a router, but I'm about a week away from putting that up on the drawing board. Still mulling it over.
exchanges/stream/websocket.go
Outdated
func (w *Websocket) IsInitialised() bool { | ||
return w.state.Load() != uninitialised | ||
} | ||
func (w *Websocket) IsInitialised() bool { return w.state.Load() != uninitialised } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note on all of these one-linifications:
I've often considered them non-idiomatic, since I've never seen them in the core lib or written by any of the trend setters.
I also find them difficult to parse by scanning, because the content indentation varies if it has a comment, which public funcs need to have.
So I guess I'm saying there should be a space after 644 to make checkAndSetMonitorRunning
body parsable, and that 649-677 should be multi-liners.
This is subjective. But if you were on the fence or debating it, that's my taste on it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't enjoy them either, if that helps sway you Shazbert 😄 Doesn't help readability, doesn't really save much space. If any changes to the func occur, it has to change anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exchanges/bitfinex/bitfinex.go
Outdated
@@ -1688,7 +1688,7 @@ func (b *Bitfinex) CancelMultipleOrdersV2(ctx context.Context, orderID, clientOr | |||
cancelledOrder.AuxLimitPrice = f | |||
} | |||
} | |||
cancelledOrders[y] = cancelledOrder | |||
cancelledOrders = append(cancelledOrders, cancelledOrder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this fix something?
In any case should be in it's own commit, and if it's a fix PR'd separately.
If it's fixing something, should have a test to prove it was broken before too 😄
exchanges/exchange_test.go
Outdated
@@ -208,7 +208,7 @@ func TestSetClientProxyAddress(t *testing.T) { | |||
t.Error("SetClientProxyAddress parsed invalid URL") | |||
} | |||
|
|||
if newBase.Websocket.GetProxyAddress() != "" { | |||
if newBase.Websocket.GetProxyAddress().String() != "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think that this would work with assert.NotEmpty
And we should be explicit about why we're expecting this, e.g. "SetClientProxyAddress should not set ProxyAddress on error"
exchanges/stream/websocket_test.go
Outdated
mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
upgrader := websocket.Upgrader{} | ||
conn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
log.Fatalf("Failed to upgrade connection to WebSocket: %v", err) | ||
} | ||
go func() { // Echo the message back to the client for future use. | ||
defer conn.Close() | ||
_, msg, err := conn.ReadMessage() | ||
if err != nil { | ||
log.Fatalf("Failed to read message from WebSocket connection: %v", err) | ||
} | ||
err = conn.WriteMessage(websocket.TextMessage, msg) | ||
if err != nil { | ||
log.Fatalf("Failed to write message to WebSocket connection: %v", err) | ||
} | ||
}() | ||
})) | ||
mockServerURL += strings.Split(mockServer.URL, "//")[1] | ||
|
||
mockProxy = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.Method == http.MethodConnect { | ||
w.WriteHeader(http.StatusOK) | ||
return | ||
} | ||
|
||
upgrader := websocket.Upgrader{} | ||
conn, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
http.Error(w, "Failed to upgrade connection to WebSocket", http.StatusInternalServerError) | ||
return | ||
} | ||
actualServerConn, _, err := websocket.DefaultDialer.Dial("ws://"+r.Host, nil) | ||
if err != nil { | ||
err = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "Failed to dial actual server from proxy")) | ||
log.Printf("Failed to dial actual server from proxy: %v", err) | ||
conn.Close() | ||
return | ||
} | ||
go func() { | ||
defer conn.Close() | ||
for { | ||
messageType, message, err := conn.ReadMessage() | ||
if err != nil { | ||
return | ||
} | ||
if err := actualServerConn.WriteMessage(messageType, message); err != nil { | ||
return | ||
} | ||
} | ||
}() | ||
go func() { | ||
defer actualServerConn.Close() | ||
for { | ||
messageType, message, err := actualServerConn.ReadMessage() | ||
if err != nil { | ||
return | ||
} | ||
if err := conn.WriteMessage(messageType, message); err != nil { | ||
return | ||
} | ||
} | ||
}() | ||
})) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be a func. Or 2.
exchanges/stream/websocket_test.go
Outdated
ExchangeName: "test3", | ||
Verbose: true, | ||
URL: websocketTestURL, | ||
// ProxyURL: proxyURL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
☝️ Feels like maybe these were leaked, so just highlighting.
exchanges/stream/websocket_test.go
Outdated
@@ -699,7 +789,7 @@ func TestSendMessage(t *testing.T) { | |||
testData := &testCases[i] | |||
t.Run(testData.WC.ExchangeName, func(t *testing.T) { | |||
t.Parallel() | |||
if testData.WC.ProxyURL != "" && !useProxyTests { | |||
if /*testData.WC.ProxyURL != "" &&*/ !useProxyTests { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
☝️ Feels like maybe these were leaked, so just highlighting.
exchanges/stream/websocket_types.go
Outdated
ExchangeConfig *config.Exchange | ||
DefaultURL string | ||
ExchangeConfig *config.Exchange | ||
// DefaultURL string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
☝️ Feels like maybe these were leaked, so just highlighting.
Co-authored-by: Gareth Kirwan <[email protected]>
Co-authored-by: Gareth Kirwan <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #1493 +/- ##
==========================================
+ Coverage 35.89% 37.78% +1.88%
==========================================
Files 411 411
Lines 177595 147708 -29887
==========================================
- Hits 63752 55815 -7937
+ Misses 106058 84092 -21966
- Partials 7785 7801 +16
|
w.setConnectedStatus(false) | ||
return w.Connection.UnderlyingConn().Close() | ||
w.connected.Store(false) | ||
return w.Connection.NetConn().Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnderlyingConn was deprecated so its now NetConn()
Closing in favour of #1580. |
PR Description
Opened as draft for initial inspection and critique.This attempts to reduce code duplication for websocket wrappers and gets it another step forward for multi-connection handling.ErrCurrencyPairRequired
SetWebsocketURL
into two functionsSetWebsocketURL
&&SetWebsocketAuthURL
GetWebsocketURL
into two functionsGetWebsocketURL
&&GetWebsocketAuthURL
DefaultURL
setState
,setEnabled
,setDataMonitorRunning
,dataMonitorRunning
,checkAndSetMonitorRunning
,setTrafficMonitorRunning
,setConnectionMonitorRunning
and use the field methods directly.parseBinaryResponse()
so as to make sure ifio.ReadAll
fails it will still close.Websocket_Connection
setConnectedStatus
andIsConnected
methods in favour of field methods.Fixes # (issue)
Type of change
Please delete options that are not relevant and add an
x
in[]
as item is complete.How has this been tested
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration and
also consider improving test coverage whilst working on a certain feature or package.
Checklist