Skip to content

Commit

Permalink
Attempt to fix errors
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jul 16, 2019
1 parent 4073c3b commit 0a3b309
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 43 deletions.
58 changes: 34 additions & 24 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,22 @@ func New(config *Config) *Router {

// Serve starts transport listening loop.
func (r *Router) Serve(ctx context.Context) error {

go func() {
for tp := range r.tm.TrChan {
for tp := range r.tm.DataTpChan {
r.mu.Lock()
isAccepted, isSetup := tp.Accepted, r.IsSetupTransport(tp)
isAccepted := tp.Accepted
r.mu.Unlock()

r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", isAccepted, isSetup)
r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", isAccepted, false)

var serve func(io.ReadWriter) error
switch {
case isAccepted && isSetup:
serve = r.rm.Serve
case !isSetup:
serve = r.serveTransport
default:
continue
}
r.handleTransport(tp, isAccepted, false)
}
}()

go func(tp transport.Transport) {
defer tp.Close()
for {
if err := serve(tp); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
}
}(tp)
go func() {
for tp := range r.tm.SetupTpChan {
r.Logger.Infof("New transport: isAccepted: %v, isSetup: %v", true, true)
r.handleTransport(tp, true, false)
}
}()

Expand All @@ -122,6 +108,30 @@ func (r *Router) Serve(ctx context.Context) error {
return r.tm.Serve(ctx)
}

func (r *Router) handleTransport(tp transport.Transport, isAccepted, isSetup bool) {
var serve func(io.ReadWriter) error
switch {
case isAccepted && isSetup:
serve = r.rm.Serve
case !isSetup:
serve = r.serveTransport
default:
return
}

go func(tp transport.Transport) {
defer tp.Close()
for {
if err := serve(tp); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
}
}(tp)
}

// ServeApp handles App packets from the App connection on provided port.
func (r *Router) ServeApp(conn net.Conn, port uint16, appConf *app.Config) error {
r.wg.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func TestRouterSetupLoop(t *testing.T) {
errCh := make(chan error)
go func() {
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
for tr = range m2.DataTpChan {
if tr.Accepted {
break
}
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestRouterCloseLoop(t *testing.T) {
// acceptCh, _ := m2.Observe()
// tr := <-acceptCh
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
for tr = range m2.DataTpChan {
if tr.Accepted {
break
}
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) {
errCh := make(chan error)
go func() {
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
for tr = range m2.DataTpChan {
if tr.Accepted {
break
}
Expand Down
44 changes: 32 additions & 12 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ type Manager struct {
transports map[uuid.UUID]*ManagedTransport
entries map[Entry]struct{}

doneChan chan struct{}
TrChan chan *ManagedTransport
mu sync.RWMutex
doneChan chan struct{}
SetupTpChan chan Transport
DataTpChan chan *ManagedTransport
mu sync.RWMutex

mgrQty int32 // Count of spawned manageTransport goroutines

Expand All @@ -57,13 +58,14 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
}

return &Manager{
Logger: logging.MustGetLogger("trmanager"),
config: config,
factories: fMap,
transports: make(map[uuid.UUID]*ManagedTransport),
entries: mEntries,
TrChan: make(chan *ManagedTransport, 9), // TODO: eliminate or justify buffering here
doneChan: make(chan struct{}),
Logger: logging.MustGetLogger("trmanager"),
config: config,
factories: fMap,
transports: make(map[uuid.UUID]*ManagedTransport),
entries: mEntries,
SetupTpChan: make(chan Transport, 9), // TODO: eliminate or justify buffering here
DataTpChan: make(chan *ManagedTransport, 9), // TODO: eliminate or justify buffering here
doneChan: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -319,12 +321,23 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp
tm.transports[entry.ID] = mTr
tm.mu.Unlock()

var setupTpChan chan Transport
var dataTpChan chan *ManagedTransport

if tm.IsSetupTransport(tr) {
setupTpChan = tm.SetupTpChan
} else {
dataTpChan = tm.DataTpChan
}

select {
case <-tm.doneChan:
return nil, io.ErrClosedPipe
case tm.TrChan <- mTr:
case dataTpChan <- mTr:
go tm.manageTransport(ctx, mTr, factory, remote)
return mTr, nil
case setupTpChan <- mTr:
return mTr, nil
}
}

Expand All @@ -338,10 +351,15 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
return nil, errors.New("transport.Manager is closing. Skipping incoming transport")
}

var setupTpChan chan Transport
var dataTpChan chan *ManagedTransport

var entry *Entry
if tm.IsSetupTransport(tr) {
setupTpChan = tm.SetupTpChan
entry = makeEntry(tr, false)
} else {
dataTpChan = tm.DataTpChan
entry, err = settlementResponderHandshake().Do(tm, tr, 30*time.Second)
if err != nil {
tr.Close()
Expand Down Expand Up @@ -370,9 +388,11 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
select {
case <-tm.doneChan:
return nil, io.ErrClosedPipe
case tm.TrChan <- mTr:
case dataTpChan <- mTr:
go tm.manageTransport(ctx, mTr, factory, remote)
return mTr, nil
case setupTpChan <- mTr:
return mTr, nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/transport/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTransportManager(t *testing.T) {
var mu sync.Mutex
m1Observed := uint32(0)

acceptCh := m1.TrChan
acceptCh := m1.DataTpChan
go func() {
for tr := range acceptCh {
mu.Lock()
Expand All @@ -71,7 +71,7 @@ func TestTransportManager(t *testing.T) {
}()

m2Observed := uint32(0)
dialCh := m2.TrChan
dialCh := m2.DataTpChan
go func() {
for tr := range dialCh {
mu.Lock()
Expand Down
3 changes: 2 additions & 1 deletion vendor/github.com/skycoin/dmsg/disc/entry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/skycoin/dmsg/noise/net.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0a3b309

Please sign in to comment.