From 5e7495216ae139a6b09115427dddeb3c360f4407 Mon Sep 17 00:00:00 2001 From: linstohu Date: Wed, 20 Dec 2023 20:33:03 +0800 Subject: [PATCH] feat: htx spot ws --- htx/spot/marketws/client.go | 342 ++++++++++++++++++++++++++++ htx/spot/marketws/client_test.go | 132 +++++++++++ htx/spot/marketws/events.go | 34 +++ htx/spot/marketws/response.go | 118 ++++++++++ htx/spot/marketws/subscriptions.go | 96 ++++++++ htx/spot/marketws/topics.go | 99 ++++++++ htx/spot/marketws/types/messages.go | 83 +++++++ htx/spot/marketws/vars.go | 32 +++ htx/spot/rest/types/kline.go | 33 +++ htx/utils/gzip.go | 62 +++++ 10 files changed, 1031 insertions(+) create mode 100644 htx/spot/marketws/client.go create mode 100644 htx/spot/marketws/client_test.go create mode 100644 htx/spot/marketws/events.go create mode 100644 htx/spot/marketws/response.go create mode 100644 htx/spot/marketws/subscriptions.go create mode 100644 htx/spot/marketws/topics.go create mode 100644 htx/spot/marketws/types/messages.go create mode 100644 htx/spot/marketws/vars.go create mode 100644 htx/spot/rest/types/kline.go create mode 100644 htx/utils/gzip.go diff --git a/htx/spot/marketws/client.go b/htx/spot/marketws/client.go new file mode 100644 index 0000000..7367266 --- /dev/null +++ b/htx/spot/marketws/client.go @@ -0,0 +1,342 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "github.com/chuckpreslar/emission" + "github.com/go-playground/validator" + "github.com/gorilla/websocket" + htxutils "github.com/linstohu/nexapi/htx/utils" + cmap "github.com/orcaman/concurrent-map/v2" +) + +type MarketWsClient struct { + baseURL string + // debug mode + debug bool + // logger + logger *slog.Logger + + ctx context.Context + conn *websocket.Conn + mu sync.RWMutex + isConnected bool + + autoReconnect bool + disconnect chan struct{} + + sending sync.Mutex + subscriptions cmap.ConcurrentMap[string, struct{}] + + emitter *emission.Emitter +} + +type MarketWsClientCfg struct { + BaseURL string `validate:"required"` + Debug bool + // Logger + Logger *slog.Logger +} + +func NewMarketWsClient(ctx context.Context, cfg *MarketWsClientCfg) (*MarketWsClient, error) { + if err := validator.New().Struct(cfg); err != nil { + return nil, err + } + + cli := &MarketWsClient{ + baseURL: cfg.BaseURL, + debug: cfg.Debug, + logger: cfg.Logger, + + ctx: ctx, + autoReconnect: true, + + subscriptions: cmap.New[struct{}](), + emitter: emission.NewEmitter(), + } + + if cli.logger == nil { + cli.logger = slog.Default() + } + + err := cli.start() + if err != nil { + return nil, err + } + + return cli, nil +} + +func (m *MarketWsClient) start() error { + m.conn = nil + m.setIsConnected(false) + m.disconnect = make(chan struct{}) + + for i := 0; i < MaxTryTimes; i++ { + conn, _, err := m.connect() + if err != nil { + m.logger.Info(fmt.Sprintf("connect error, times(%v), error: %s", i, err.Error())) + tm := (i + 1) * 5 + time.Sleep(time.Duration(tm) * time.Second) + continue + } + m.conn = conn + break + } + if m.conn == nil { + return errors.New("connect failed") + } + + m.setIsConnected(true) + + m.resubscribe() + + if m.autoReconnect { + go m.reconnect() + } + + go m.readMessages() + + return nil +} + +func (m *MarketWsClient) connect() (*websocket.Conn, *http.Response, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn, resp, err := websocket.DefaultDialer.DialContext(ctx, m.baseURL, nil) + if err == nil { + conn.SetReadLimit(32768 * 64) + } + + return conn, resp, err +} + +func (m *MarketWsClient) reconnect() { + <-m.disconnect + + m.setIsConnected(false) + + m.logger.Info("disconnect, then reconnect...") + + time.Sleep(1 * time.Second) + + select { + case <-m.ctx.Done(): + m.logger.Info(fmt.Sprintf("never reconnect, %s", m.ctx.Err())) + return + default: + m.start() + } +} + +// close closes the websocket connection +func (m *MarketWsClient) close() error { + close(m.disconnect) + + err := m.conn.Close() + if err != nil { + return err + } + + return nil +} + +// setIsConnected sets state for isConnected +func (m *MarketWsClient) setIsConnected(state bool) { + m.mu.Lock() + defer m.mu.Unlock() + + m.isConnected = state +} + +// IsConnected returns the WebSocket connection state +func (m *MarketWsClient) IsConnected() bool { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.isConnected +} + +func (m *MarketWsClient) readMessages() { + for { + select { + case <-m.ctx.Done(): + m.logger.Info(fmt.Sprintf("context done, error: %s", m.ctx.Err().Error())) + + if err := m.close(); err != nil { + m.logger.Info(fmt.Sprintf("websocket connection closed error, %s", err.Error())) + } + + return + default: + msgType, buf, err := m.conn.ReadMessage() + if err != nil { + m.logger.Error(fmt.Sprintf("read message error, %s", err)) + time.Sleep(TimerIntervalSecond * time.Second) + continue + } + + // decompress gzip data if it is binary message + if msgType == websocket.BinaryMessage { + message, err := htxutils.GZipDecompress(buf) + if err != nil { + m.logger.Error(fmt.Sprintf("ungzip data error: %s", err)) + + if err := m.close(); err != nil { + m.logger.Error(fmt.Sprintf("websocket connection closed error, %s", err.Error())) + } + + return + } + + var msg AnyMessage + + err = json.Unmarshal([]byte(message), &msg) + if err != nil { + m.logger.Error(fmt.Sprintf("read object error, %s", err)) + + if err := m.close(); err != nil { + m.logger.Error(fmt.Sprintf("websocket connection closed error, %s", err.Error())) + } + + return + } + + switch { + case msg.Ping != nil: + err := m.pong(msg.Ping) + if err != nil { + m.logger.Error(fmt.Sprintf("handle ping error: %s", err.Error())) + } + case msg.Response != nil: + // todo + case msg.SubscribedMessage != nil: + err := m.handle(msg.SubscribedMessage) + if err != nil { + m.logger.Error(fmt.Sprintf("handle message error: %s", err.Error())) + } + } + } + } + } +} + +func (m *MarketWsClient) resubscribe() error { + topics := m.subscriptions.Keys() + + if len(topics) == 0 { + return nil + } + + redo := make([]string, 0) + + for _, v := range topics { + // do subscription + err := m.send(&Request{ + ID: fmt.Sprintf("%v", rand.Uint32()), + Sub: v, + }) + + if err != nil { + redo = append(redo, v) + continue + } + } + + if len(redo) != 0 { + return fmt.Errorf("resubscribe error: %s", strings.Join(redo, ",")) + } + + return nil +} + +func (m *MarketWsClient) subscribe(topic string) error { + if m.subscriptions.Has(topic) { + return nil + } + + // do subscription + + err := m.send(&Request{ + ID: fmt.Sprintf("%v", rand.Uint32()), + Sub: topic, + }) + + if err != nil { + return err + } + + m.subscriptions.Set(topic, struct{}{}) + + return nil +} + +func (m *MarketWsClient) unsubscribe(topic string) error { + err := m.send(&Request{ + ID: fmt.Sprintf("%v", rand.Uint32()), + UnSub: topic, + }) + + if err != nil { + return err + } + + m.subscriptions.Remove(topic) + + return nil +} + +func (m *MarketWsClient) send(req *Request) error { + m.sending.Lock() + + // Rate Limit: https://www.htx.com/en-us/opend/newApiPages/?id=662 + defer time.Sleep(100 * time.Millisecond) + defer m.sending.Unlock() + + if !m.IsConnected() { + return errors.New("connection is closed") + } + + return m.conn.WriteJSON(req) +} + +func (m *MarketWsClient) pong(ping *PingMessage) error { + m.sending.Lock() + + // Rate Limit: https://www.htx.com/en-us/opend/newApiPages/?id=662 + defer time.Sleep(100 * time.Millisecond) + defer m.sending.Unlock() + + if !m.IsConnected() { + return errors.New("connection is closed") + } + + return m.conn.WriteJSON(ping) +} diff --git a/htx/spot/marketws/client_test.go b/htx/spot/marketws/client_test.go new file mode 100644 index 0000000..1032188 --- /dev/null +++ b/htx/spot/marketws/client_test.go @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import ( + "context" + "fmt" + "testing" + + "github.com/linstohu/nexapi/htx/spot/marketws/types" + resttypes "github.com/linstohu/nexapi/htx/spot/rest/types" + "github.com/stretchr/testify/assert" +) + +func testNewMarketWsClient(ctx context.Context, t *testing.T, url string) *MarketWsClient { + cli, err := NewMarketWsClient(ctx, &MarketWsClientCfg{ + BaseURL: url, + Debug: true, + }) + + if err != nil { + t.Fatalf("Could not create websocket client, %s", err) + } + + return cli +} + +func TestSubscribeKline(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + cli := testNewMarketWsClient(ctx, t, GlobalWsBaseURL) + + topic, err := cli.GetKlineTopic(&KlineTopicParam{ + Symbol: "btcusdt", + Interval: resttypes.Minute1, + }) + assert.Nil(t, err) + + cli.AddListener(topic, func(e any) { + kline, ok := e.(*types.Kline) + if !ok { + return + } + + fmt.Printf("Topic: %s, Open: %v, Close: %v, Low: %v, High: %v, Amount: %v\n", + topic, kline.Open, kline.Close, kline.Low, kline.High, kline.Amount) + }) + + cli.Subscribe(topic) + + select {} +} + +func TestSubscribeMBPUpdateDepth(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + cli := testNewMarketWsClient(ctx, t, MBPWsBaseURL) + + topic, err := cli.GetMBPDepthUpdateTopic(&MBPDepthUpdateTopicParam{ + Symbol: "btcusdt", + Level: 5, + }) + assert.Nil(t, err) + + cli.AddListener(topic, func(e any) { + depth, ok := e.(*types.MBPRefreshDepth) + if !ok { + return + } + + fmt.Printf("Topic: %s, ", topic) + for _, v := range depth.Bids { + fmt.Printf("Bids: %v", v) + } + for _, v := range depth.Asks { + fmt.Printf("Bids: %v", v) + } + }) + + cli.Subscribe(topic) + + select {} +} + +func TestSubscribeMBPRefreshDepth(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + cli := testNewMarketWsClient(ctx, t, GlobalWsBaseURL) + + topic, err := cli.GetMBPRefreshDepthTopic(&MBPDepthRefreshTopicParam{ + Symbol: "btcusdt", + Level: 5, + }) + assert.Nil(t, err) + + cli.AddListener(topic, func(e any) { + depth, ok := e.(*types.MBPRefreshDepth) + if !ok { + return + } + + fmt.Printf("Topic: %s, ", topic) + for _, v := range depth.Bids { + fmt.Printf("Bids: %v", v) + } + for _, v := range depth.Asks { + fmt.Printf("Bids: %v", v) + } + }) + + cli.Subscribe(topic) + + select {} +} diff --git a/htx/spot/marketws/events.go b/htx/spot/marketws/events.go new file mode 100644 index 0000000..095fe50 --- /dev/null +++ b/htx/spot/marketws/events.go @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import "github.com/chuckpreslar/emission" + +type Listener func(any) + +func (m *MarketWsClient) AddListener(event string, listener Listener) *emission.Emitter { + return m.emitter.On(event, listener) +} + +func (m *MarketWsClient) RemoveListener(event string, listener Listener) *emission.Emitter { + return m.emitter.Off(m, listener) +} + +func (m *MarketWsClient) GetListeners(event string, argument any) *emission.Emitter { + return m.emitter.Emit(event, argument) +} diff --git a/htx/spot/marketws/response.go b/htx/spot/marketws/response.go new file mode 100644 index 0000000..00b9b83 --- /dev/null +++ b/htx/spot/marketws/response.go @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import ( + "encoding/json" + "errors" + + "github.com/valyala/fastjson" +) + +type Request struct { + ID string `json:"id,omitempty"` + Sub string `json:"sub,omitempty"` + UnSub string `json:"unsub,omitempty"` +} + +// AnyMessage represents either a JSON Response or SubscribedMessage. +type AnyMessage struct { + Ping *PingMessage + Response *Response + SubscribedMessage *SubscribedMessage +} + +type PingMessage struct { + Ping int64 `json:"ping,omitempty"` +} + +type Response struct { + ID string `json:"id,omitempty"` + Status string `json:"status,omitempty"` + Subbed string `json:"subbed,omitempty"` + Ts int64 `json:"ts,omitempty"` +} + +type SubscribedMessage struct { + Channel string `json:"ch,omitempty"` + Ts int64 `json:"ts,omitempty"` + Data json.RawMessage `json:"tick"` +} + +func (m AnyMessage) MarshalJSON() ([]byte, error) { + var v any + + switch { + case m.Response != nil && m.SubscribedMessage == nil: + v = m.Response + case m.Response == nil && m.SubscribedMessage != nil: + v = m.SubscribedMessage + } + + if v != nil { + return json.Marshal(v) + } + + return nil, errors.New("message must have exactly one of the Response or SubscribedMessage fields set") +} + +func (m *AnyMessage) UnmarshalJSON(data []byte) error { + var p fastjson.Parser + v, err := p.ParseBytes(data) + if err != nil { + return err + } + + if v.Exists("ping") { + var resp PingMessage + + if err := json.Unmarshal(data, &resp); err != nil { + return err + } + + m.Ping = &resp + + return nil + } + + if v.Exists("id") { + var resp Response + + if err := json.Unmarshal(data, &resp); err != nil { + return err + } + + m.Response = &resp + + return nil + } + + if v.Exists("ch") { + var msg = &SubscribedMessage{ + Channel: string(v.GetStringBytes("ch")), + Ts: v.GetInt64("ts"), + Data: v.Get("tick").MarshalTo(nil), + } + + m.SubscribedMessage = msg + + return nil + } + + return nil +} diff --git a/htx/spot/marketws/subscriptions.go b/htx/spot/marketws/subscriptions.go new file mode 100644 index 0000000..42c13a3 --- /dev/null +++ b/htx/spot/marketws/subscriptions.go @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/linstohu/nexapi/htx/spot/marketws/types" +) + +func (m *MarketWsClient) Subscribe(topic string) error { + return m.subscribe(topic) +} + +func (m *MarketWsClient) UnSubscribe(topic string) error { + return m.unsubscribe(topic) +} + +func (m *MarketWsClient) handle(msg *SubscribedMessage) error { + if m.debug { + m.logger.Info(fmt.Sprintf("subscribed message, channel: %s", msg.Channel)) + } + + if strings.Contains(msg.Channel, "mbp") { + switch { + case strings.Contains(msg.Channel, "refresh"): + var data types.MBPRefreshDepth + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + default: + return fmt.Errorf("unknown message, topic: %s", msg.Channel) + } + } else { + switch { + case strings.Contains(msg.Channel, "kline"): + var data types.Kline + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + case strings.Contains(msg.Channel, "bbo"): + var data types.BBO + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + case strings.Contains(msg.Channel, "depth"): + var data types.Depth + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + case strings.Contains(msg.Channel, "ticker"): + var data types.Ticker + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + case strings.Contains(msg.Channel, "trade"): + var data types.MarketTradeMsg + err := json.Unmarshal(msg.Data, &data) + if err != nil { + return err + } + m.GetListeners(msg.Channel, &data) + default: + return fmt.Errorf("unknown message, topic: %s", msg.Channel) + } + } + + return nil +} diff --git a/htx/spot/marketws/topics.go b/htx/spot/marketws/topics.go new file mode 100644 index 0000000..54cb38c --- /dev/null +++ b/htx/spot/marketws/topics.go @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +import ( + "fmt" + + "github.com/go-playground/validator" + spottypes "github.com/linstohu/nexapi/htx/spot/rest/types" +) + +type KlineTopicParam struct { + Symbol string `validate:"required"` + Interval spottypes.KlineInterval `validate:"required,oneof=1min 5min 15min 30min 60min 4hour 1day 1mon 1week 1year"` +} + +func (m *MarketWsClient) GetKlineTopic(params *KlineTopicParam) (string, error) { + err := validator.New().Struct(params) + if err != nil { + return "", err + } + + return fmt.Sprintf("market.%s.kline.%s", params.Symbol, params.Interval), nil +} + +func (m *MarketWsClient) GetBBOTopic(symbol string) (string, error) { + if symbol == "" { + return "", fmt.Errorf("the symbol field must be provided") + } + return fmt.Sprintf("market.%s.bbo", symbol), nil +} + +type DepthTopicParam struct { + Symbol string `validate:"required"` + Type string `validate:"required,oneof=step0 step1 step2 step3 step4 step5"` +} + +func (m *MarketWsClient) GetDepthTopic(params *DepthTopicParam) (string, error) { + err := validator.New().Struct(params) + if err != nil { + return "", err + } + return fmt.Sprintf("market.%s.depth.%s", params.Symbol, params.Type), nil +} + +func (m *MarketWsClient) GetTickerTopic(symbol string) (string, error) { + if symbol == "" { + return "", fmt.Errorf("the symbol field must be provided") + } + return fmt.Sprintf("market.%s.ticker", symbol), nil +} + +func (m *MarketWsClient) GetMarketTradeTopic(symbol string) (string, error) { + if symbol == "" { + return "", fmt.Errorf("the symbol field must be provided") + } + return fmt.Sprintf("market.%s.trade.detail", symbol), nil +} + +type MBPDepthUpdateTopicParam struct { + Symbol string `validate:"required"` + Level int `validate:"required,oneof=5 20 150 400"` +} + +func (m *MarketWsClient) GetMBPDepthUpdateTopic(params *MBPDepthUpdateTopicParam) (string, error) { + err := validator.New().Struct(params) + if err != nil { + return "", err + } + return fmt.Sprintf("market.%s.mbp.%v", params.Symbol, params.Level), nil +} + +type MBPDepthRefreshTopicParam struct { + Symbol string `validate:"required"` + Level int `validate:"required,oneof=5 10 20"` +} + +func (m *MarketWsClient) GetMBPRefreshDepthTopic(params *MBPDepthRefreshTopicParam) (string, error) { + err := validator.New().Struct(params) + if err != nil { + return "", err + } + return fmt.Sprintf("market.%s.mbp.refresh.%v", params.Symbol, params.Level), nil +} diff --git a/htx/spot/marketws/types/messages.go b/htx/spot/marketws/types/messages.go new file mode 100644 index 0000000..c76058a --- /dev/null +++ b/htx/spot/marketws/types/messages.go @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package types + +type Kline struct { + ID int `json:"id,omitempty"` + Open float64 `json:"open,omitempty"` + Close float64 `json:"close,omitempty"` + Low float64 `json:"low,omitempty"` + High float64 `json:"high,omitempty"` + Amount float64 `json:"amount,omitempty"` + Vol float64 `json:"vol,omitempty"` + Count int `json:"count,omitempty"` +} + +type BBO struct { + SeqID int64 `json:"seqId,omitempty"` + Ask float64 `json:"ask,omitempty"` + AskSize float64 `json:"askSize,omitempty"` + Bid float64 `json:"bid,omitempty"` + BidSize float64 `json:"bidSize,omitempty"` + QuoteTime int64 `json:"quoteTime,omitempty"` + Symbol string `json:"symbol,omitempty"` +} + +type Depth struct { + Bids [][]float64 `json:"bids,omitempty"` + Asks [][]float64 `json:"asks,omitempty"` + Version int64 `json:"version,omitempty"` + Ts int64 `json:"ts,omitempty"` +} + +type Ticker struct { + Open float64 `json:"open,omitempty"` + High float64 `json:"high,omitempty"` + Low float64 `json:"low,omitempty"` + Close float64 `json:"close,omitempty"` + Amount float64 `json:"amount,omitempty"` + Vol float64 `json:"vol,omitempty"` + Count int `json:"count,omitempty"` + Bid float64 `json:"bid,omitempty"` + BidSize float64 `json:"bidSize,omitempty"` + Ask float64 `json:"ask,omitempty"` + AskSize float64 `json:"askSize,omitempty"` + LastPrice float64 `json:"lastPrice,omitempty"` + LastSize float64 `json:"lastSize,omitempty"` +} + +type MarketTradeMsg struct { + ID int64 `json:"id,omitempty"` + Ts int64 `json:"ts,omitempty"` + Data []MarketTrade `json:"data,omitempty"` +} + +type MarketTrade struct { + ID int64 `json:"id,omitempty"` + Ts int64 `json:"ts,omitempty"` + TradeID int64 `json:"tradeId,omitempty"` + Amount float64 `json:"amount,omitempty"` + Price float64 `json:"price,omitempty"` + Direction string `json:"direction,omitempty"` +} + +type MBPRefreshDepth struct { + SeqNum int64 `json:"seqNum,omitempty"` + Bids [][]float64 `json:"bids,omitempty"` + Asks [][]float64 `json:"asks,omitempty"` +} diff --git a/htx/spot/marketws/vars.go b/htx/spot/marketws/vars.go new file mode 100644 index 0000000..a951eaf --- /dev/null +++ b/htx/spot/marketws/vars.go @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package marketws + +const ( + GlobalWsBaseURL = "wss://api.huobi.pro/ws" + GlobalWsBaseURLForAWS = "wss://api-aws.huobi.pro/ws" + + MBPWsBaseURL = "wss://api.huobi.pro/feed" + MBPWsBaseURLForAWS = "wss://api-aws.huobi.pro/feed" +) + +const ( + MaxTryTimes = 5 + + TimerIntervalSecond = 5 +) diff --git a/htx/spot/rest/types/kline.go b/htx/spot/rest/types/kline.go new file mode 100644 index 0000000..8af22e1 --- /dev/null +++ b/htx/spot/rest/types/kline.go @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package types + +type KlineInterval string + +var ( + Minute1 KlineInterval = "1min" + Minute5 KlineInterval = "5min" + Minute15 KlineInterval = "15min" + Minute30 KlineInterval = "30min" + Minute60 KlineInterval = "60min" + Hour4 KlineInterval = "4hour" + Day1 KlineInterval = "1day" + Month1 KlineInterval = "1mon" + Week1 KlineInterval = "1week" + Year1 KlineInterval = "1year" +) diff --git a/htx/utils/gzip.go b/htx/utils/gzip.go new file mode 100644 index 0000000..77f552e --- /dev/null +++ b/htx/utils/gzip.go @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2023, LinstoHu + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "bytes" + "compress/gzip" + "io" +) + +func GZipDecompress(input []byte) (string, error) { + buf := bytes.NewBuffer(input) + reader, gzipErr := gzip.NewReader(buf) + if gzipErr != nil { + return "", gzipErr + } + defer reader.Close() + + result, readErr := io.ReadAll(reader) + if readErr != nil { + return "", readErr + } + + return string(result), nil +} + +func GZipCompress(input string) ([]byte, error) { + var buf bytes.Buffer + gz := gzip.NewWriter(&buf) + + _, err := gz.Write([]byte(input)) + if err != nil { + return nil, err + } + + err = gz.Flush() + if err != nil { + return nil, err + } + + err = gz.Close() + if err != nil { + return nil, err + } + + return buf.Bytes(), nil +}