Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bitfinex: Fix WS trade processing #1754

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ issues:
- text: "Expect WriteFile permissions to be 0600 or less"
linters:
- gosec
- text: 'shadow: declaration of "err" shadows declaration at'
linters: [ govet ]


exclude-dirs:
- vendor
Expand Down
135 changes: 76 additions & 59 deletions exchanges/bitfinex/bitfinex_test.go

Large diffs are not rendered by default.

25 changes: 14 additions & 11 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package bitfinex

import (
"encoding/json"
"errors"
"sync"
"time"

"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/types"
)

var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
Expand Down Expand Up @@ -488,16 +489,18 @@ type WebsocketBook struct {
Period int64
}

// WebsocketTrade holds trade information
type WebsocketTrade struct {
// wsTrade holds trade information
type wsTrade struct {
ID int64
Timestamp int64
Price float64
Timestamp types.Time
Amount float64
// Funding rate of the trade
Rate float64
// Funding offer period in days
Period int64
Price float64
Period int64 // Funding offer period in days
}

// UnmarshalJSON unmarshals json bytes into a wsTrade
func (t *wsTrade) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &[5]any{&t.ID, &t.Timestamp, &t.Amount, &t.Price, &t.Period})
}

// Candle holds OHLC data
Expand Down Expand Up @@ -625,7 +628,7 @@ const (
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeExecutionUpdate = "tu"
wsTradeUpdated = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
Expand All @@ -636,7 +639,7 @@ const (
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdate = "ftu"
wsFundingTradeUpdated = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
Expand Down
224 changes: 94 additions & 130 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"hash/crc32"
"math"
"net/http"
"sort"
"strconv"
Expand Down Expand Up @@ -33,6 +34,10 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var (
errParsingWSField = errors.New("error parsing WS field")
)
gbjk marked this conversation as resolved.
Show resolved Hide resolved

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
Expand Down Expand Up @@ -162,8 +167,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
eventType, hasEventType := d[1].(string)

if chanID != 0 {
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
if s := b.Websocket.GetSubscription(chanID); s != nil {
return b.handleWSChannelUpdate(s, respRaw, eventType, d)
}
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw)
Expand Down Expand Up @@ -201,8 +206,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return b.handleWSPositionSnapshot(d)
case wsPositionNew, wsPositionUpdate, wsPositionClose:
return b.handleWSPositionUpdate(d)
case wsTradeExecuted, wsTradeExecutionUpdate:
return b.handleWSTradeUpdate(d, eventType)
case wsTradeExecuted, wsTradeUpdated:
return b.handleWSMyTradeUpdate(d, eventType)
case wsFundingOfferSnapshot:
if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 {
if _, ok := snapBundle[0].([]interface{}); ok {
Expand Down Expand Up @@ -398,7 +403,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
b.Websocket.DataHandler <- fundingInfo
}
}
case wsFundingTradeExecuted, wsFundingTradeUpdate:
case wsFundingTradeExecuted, wsFundingTradeUpdated:
if data, ok := d[2].([]interface{}); ok && len(data) > 0 {
var wsFundingTrade WsFundingTrade
tradeID, ok := data[0].(float64)
Expand Down Expand Up @@ -544,16 +549,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}

if eventType == wsChecksum {
switch eventType {
case wsChecksum:
return b.handleWSChecksum(s, d)
}

if eventType == wsHeartbeat {
case wsHeartbeat:
return nil
}

Expand All @@ -569,7 +573,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
return b.handleWSAllTrades(s, respRaw)
}

return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
Expand Down Expand Up @@ -869,139 +873,99 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf
return nil
}

func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSAllTrades(s *subscription.Subscription, respRaw []byte) error {
feedEnabled := b.IsTradeFeedEnabled()
if !feedEnabled && !b.IsSaveTradeDataEnabled() {
return nil
}
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
if !b.IsSaveTradeDataEnabled() {
return nil
}
if c.Asset == asset.MarginFunding {
return nil
_, valueType, _, err := jsonparser.Get(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err)
}
var tradeHolder []WebsocketTrade
switch len(d) {
case 2:
snapshot, ok := d[1].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot data")
}
for i := range snapshot {
elem, ok := snapshot[i].([]interface{})
if !ok {
return errors.New("unable to type assert trade snapshot element data")
}
tradeID, ok := elem[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
}
timestamp, ok := elem[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
}
amount, ok := elem[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
}
if len(elem) == 5 {
rate, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
wsTrade.Rate = rate
period, ok := elem[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Period = int64(period)
} else {
price, ok := elem[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
}
wsTrade.Rate = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
case 3:
if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate {
return fmt.Errorf("unhandled WS trade update event: %s", eventType)
}
data, ok := d[2].([]interface{})
if !ok {
return errors.New("trade data type assertion error")
var wsTrades []*wsTrade
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this shadowed var name/the original string var name

switch valueType {
case jsonparser.String:
if t, err := b.handleWSPublicTradeUpdate(respRaw); err != nil {
return err
} else {
wsTrades = []*wsTrade{t}
}

tradeID, ok := data[0].(float64)
if !ok {
return errors.New("unable to type assert trade ID")
case jsonparser.Array:
if wsTrades, err = b.handleWSPublicTradesSnapshot(respRaw); err != nil {
return err
}
timestamp, ok := data[1].(float64)
if !ok {
return errors.New("unable to type assert trade timestamp")
default:
return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
trades := make([]trade.Data, len(wsTrades))
for i, w := range wsTrades {
t := trade.Data{
Exchange: b.Name,
AssetType: s.Asset,
CurrencyPair: s.Pairs[0],
TID: strconv.FormatInt(w.ID, 10),
Timestamp: w.Timestamp.Time().UTC(),
Side: order.Buy,
Amount: w.Amount,
Price: w.Price,
}
amount, ok := data[2].(float64)
if !ok {
return errors.New("unable to type assert trade amount")
if w.Period != 0 {
t.AssetType = asset.MarginFunding
}
wsTrade := WebsocketTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
if t.Amount < 0 {
t.Side = order.Sell
t.Amount = math.Abs(t.Amount)
}
if len(data) == 5 {
rate, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade rate")
}
period, ok := data[4].(float64)
if !ok {
return errors.New("unable to type assert trade period")
}
wsTrade.Rate = rate
wsTrade.Period = int64(period)
trades[i] = t
}
if feedEnabled {
b.Websocket.DataHandler <- trades
}
if b.IsSaveTradeDataEnabled() {
err = trade.AddTradesToBuffer(b.GetName(), trades...)
}
return err
}

func (b *Bitfinex) handleWSPublicTradesSnapshot(respRaw []byte) (trades []*wsTrade, errs error) {
handleTrade := func(v []byte, valueType jsonparser.ValueType, _ int, _ error) {
if valueType != jsonparser.Array {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesSnapshot[1][*]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType))
Comment on lines +938 to +939
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little bit extra given that there is only one caller to this and its behind a gate checking if its an array. I am normally one for all checks should be within the function, but its that you just checked to get to this function

Copy link
Collaborator

@gloriousCode gloriousCode Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this simplification?

func (b *Bitfinex) handleWSPublicTradesSnapshot(respRaw []byte) ([]*wsTrade, error) {
	var trades []*wsTrade
	err := json.Unmarshal(respRaw, &trades)
	return trades, err
}

Where handleWSAllTrades changes with:

k, valueType, _, err := jsonparser.Get(respRaw, "[1]")
...
if wsTrades, err = b.handleWSPublicTradesSnapshot(k); err != nil {

A lot less tangling of additional jsonparser functions, with added bonus of naked return removal

} else {
price, ok := data[3].(float64)
if !ok {
return errors.New("unable to type assert trade price")
t := &wsTrade{}
if err := json.Unmarshal(v, t); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesSnapshot[1][*]`: %w", errParsingWSField, err))
} else {
trades = append(trades, t)
}
wsTrade.Price = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
}
}

return b.AddTradesToBuffer(trades...)
if _, err := jsonparser.ArrayEach(respRaw, handleTrade, "[1]"); err != nil {
errs = common.AppendError(errs, err)
}
return
}

func (b *Bitfinex) handleWSPublicTradeUpdate(respRaw []byte) (*wsTrade, error) {
v, valueType, _, err := jsonparser.Get(respRaw, "[2]")
if err != nil {
return nil, fmt.Errorf("%w `tradesUpdate[2]`: %w", errParsingWSField, err)
}
if valueType != jsonparser.Array {
return nil, fmt.Errorf("%w `tradesUpdate[2]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
t := &wsTrade{}
if err := json.Unmarshal(v, t); err != nil {
return nil, fmt.Errorf("%w `tradeUpdate[2]`: %w", errParsingWSField, err)
}
return t, nil
}

func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error {
Expand Down Expand Up @@ -1173,7 +1137,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error {
return nil
}

func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error {
func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error {
tradeData, ok := d[2].([]interface{})
if !ok {
return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate")
Expand Down
5 changes: 5 additions & 0 deletions exchanges/bitfinex/testdata/wsAllTrades.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[18788,[[412685577,1580268444802,11.1998,176.3],[412685578,1580268484802,-5,176.29952759],[412685579,1580269005757,4.2,0.1244,12]],1]
[18788,"te",[5690221201,1734237017719,0.00991467,102570],2]
[18788,"tu",[5690221202,1734237017704,-0.01925285,102560],3]
[18788,"fte",[5690221203,1734237018019,0.00991467,102550,30],4]
[18788,"ftu",[5690221204,1734237018094,-0.01925285,102540,30],5]
2 changes: 1 addition & 1 deletion exchanges/btse/btse_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,7 @@ func (b *BTSE) UpdateOrderExecutionLimits(ctx context.Context, a asset.Item) err
var errs error
limits := make([]order.MinMaxLevel, 0, len(summary))
for _, marketInfo := range summary {
p, err := marketInfo.Pair() //nolint:govet // Deliberately shadow err
p, err := marketInfo.Pair()
if err != nil {
errs = common.AppendError(err, fmt.Errorf("%s: %w", p, err))
continue
Expand Down
Loading
Loading