Skip to content

Commit

Permalink
Release: v1.0.23-alpha
Browse files Browse the repository at this point in the history
  • Loading branch information
amir-the-h committed Oct 15, 2021
1 parent 1448a67 commit e72850e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 47 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@ Changelog
=========
All notable changes to this project will be documented in this file.

v1.0.23-alpha
------------

### Changed

- Changed login mechanism

v1.0.22-alpha
------------

Expand Down
31 changes: 27 additions & 4 deletions api/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type ClientWs struct {
lastTransmit map[bool]*time.Time
mu map[bool]*sync.Mutex
rmu map[bool]*sync.Mutex
AuthorizedUntil *time.Time
AuthRequested *time.Time
Authorized bool
Private *Private
Public *Public
Trade *Trade
Expand Down Expand Up @@ -106,7 +107,7 @@ func (c *ClientWs) Connect(p bool) error {
//
// https://www.okex.com/docs-v5/en/#websocket-api-login
func (c *ClientWs) Login() error {
if c.AuthorizedUntil != nil && time.Since(*c.AuthorizedUntil) < PingPeriod {
if c.AuthRequested != nil || time.Since(*c.AuthRequested) < PingPeriod {
return nil
}
method := http.MethodGet
Expand Down Expand Up @@ -191,6 +192,21 @@ func (c *ClientWs) SetChannels(errCh chan *events.Error, subCh chan *events.Subs
c.SuccessChan = sCh
}

// WaitForAuthorization waits for the auth response and try to log in if it was needed
func (c *ClientWs) WaitForAuthorization() error {
ticker := time.NewTicker(time.Millisecond * 300)
defer ticker.Stop()
if err := c.Login(); err != nil {
return err
}
for range ticker.C {
if c.Authorized {
return nil
}
}
return nil
}

func (c *ClientWs) dial(p bool) error {
c.mu[p].Lock()
c.rmu[p].Lock()
Expand Down Expand Up @@ -309,6 +325,8 @@ func (c *ClientWs) handleCancel(msg string) error {
}()
return fmt.Errorf("operation cancelled: %s", msg)
}

// TODO: break each case into a separate function
func (c *ClientWs) process(data []byte, e *events.Basic) bool {
switch e.Event {
case "error":
Expand Down Expand Up @@ -339,8 +357,13 @@ func (c *ClientWs) process(data []byte, e *events.Basic) bool {
}()
return true
case "login":
au := time.Now().Add(time.Second * 30)
c.AuthorizedUntil = &au
au := time.Now().Add(time.Second * -30)
if au.After(*c.AuthRequested) {
c.AuthRequested = nil
_ = c.Login()
break
}
c.Authorized = true
e := events.Login{}
_ = json.Unmarshal(data, &e)
go func() {
Expand Down
35 changes: 12 additions & 23 deletions api/ws/private.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/amir-the-h/okex/events"
"github.com/amir-the-h/okex/events/private"
requests "github.com/amir-the-h/okex/requests/ws/private"
"time"
)

// Private
Expand Down Expand Up @@ -34,10 +33,10 @@ func (c *Private) Account(req requests.Account, ch ...chan *private.Account) err
if len(ch) > 0 {
c.aCh = ch[0]
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Subscribe(true, []okex.ChannelName{"account"}, m)
}

Expand All @@ -61,10 +60,10 @@ func (c *Private) Position(req requests.Position, ch ...chan *private.Position)
if len(ch) > 0 {
c.pCh = ch[0]
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Subscribe(true, []okex.ChannelName{"positions"}, m)
}

Expand All @@ -76,10 +75,10 @@ func (c *Private) UPosition(req requests.Position, rCh ...bool) error {
if len(rCh) > 0 && rCh[0] {
c.pCh = nil
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Unsubscribe(true, []okex.ChannelName{"positions"}, m)
}

Expand All @@ -92,10 +91,10 @@ func (c *Private) BalanceAndPosition(ch ...chan *private.BalanceAndPosition) err
if len(ch) > 0 {
c.bnpCh = ch[0]
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Subscribe(true, []okex.ChannelName{"balance_and_position"}, m)
}

Expand All @@ -119,10 +118,10 @@ func (c *Private) Order(req requests.Order, ch ...chan *private.Order) error {
if len(ch) > 0 {
c.oCh = ch[0]
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Subscribe(true, []okex.ChannelName{"orders"}, m)
}

Expand All @@ -134,10 +133,10 @@ func (c *Private) UOrder(req requests.Order, rCh ...bool) error {
if len(rCh) > 0 && rCh[0] {
c.oCh = nil
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Unsubscribe(true, []okex.ChannelName{"orders"}, m)
}

Expand Down Expand Up @@ -204,13 +203,3 @@ func (c *Private) Process(data []byte, e *events.Basic) bool {
}
return false
}

func (c *Private) waitForAuthorization() {
ticker := time.NewTicker(time.Millisecond * 300)
defer ticker.Stop()
for range ticker.C {
if c.AuthorizedUntil != nil && time.Since(*c.AuthorizedUntil) < PingPeriod {
return
}
}
}
26 changes: 6 additions & 20 deletions api/ws/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ws
import (
"github.com/amir-the-h/okex"
requests "github.com/amir-the-h/okex/requests/rest/trade"
"time"
)

// Trade
Expand Down Expand Up @@ -35,10 +34,10 @@ func (c *Trade) PlaceOrder(req ...requests.PlaceOrder) error {
for i, order := range req {
tmpArgs[i] = okex.S2M(order)
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Send(true, op, tmpArgs, map[string]string{"id": req[0].ID})
}

Expand All @@ -59,10 +58,10 @@ func (c *Trade) CancelOrder(req ...requests.CancelOrder) error {
for i, order := range req {
tmpArgs[i] = okex.S2M(order)
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Send(true, op, tmpArgs, map[string]string{"id": req[0].ID})
}

Expand All @@ -83,22 +82,9 @@ func (c *Trade) AmendOrder(req ...requests.AmendOrder) error {
for i, order := range req {
tmpArgs[i] = okex.S2M(order)
}
if err := c.Login(); err != nil {
err := c.WaitForAuthorization()
if err != nil {
return err
}
c.waitForAuthorization()
return c.Send(true, op, tmpArgs, map[string]string{"id": req[0].ID})
}

func (c *Trade) waitForAuthorization() {
if c.AuthorizedUntil != nil && time.Since(*c.AuthorizedUntil) < PingPeriod {
return
}
ticker := time.NewTicker(time.Millisecond * 300)
defer ticker.Stop()
for range ticker.C {
if c.AuthorizedUntil != nil && time.Since(*c.AuthorizedUntil) < PingPeriod {
return
}
}
}

0 comments on commit e72850e

Please sign in to comment.