Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Oct 4, 2019
1 parent e31643d commit 28c5ed2
Show file tree
Hide file tree
Showing 20 changed files with 613 additions and 585 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
replace github.com/skycoin/dmsg => ../dmsg
// replace github.com/skycoin/dmsg => ../dmsg
24 changes: 0 additions & 24 deletions pkg/app/packet_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/app2/client_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package app2

import (
"errors"
"testing"

"github.com/pkg/errors"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/stretchr/testify/require"
Expand Down
4 changes: 3 additions & 1 deletion pkg/app2/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app2

import "github.com/pkg/errors"
import (
"errors"
)

var (
// ErrPortAlreadyBound is being returned when trying to bind to the port
Expand Down
3 changes: 1 addition & 2 deletions pkg/app2/id_manager.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package app2

import (
"errors"
"fmt"
"sync"

"github.com/pkg/errors"
)

var (
Expand Down
3 changes: 1 addition & 2 deletions pkg/app2/id_manager_util.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package app2

import (
"errors"
"net"

"github.com/pkg/errors"
)

// assertListener asserts that `v` is of type `net.Listener`.
Expand Down
2 changes: 1 addition & 1 deletion pkg/app2/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package app2

import (
"errors"
"testing"

"github.com/pkg/errors"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
"github.com/stretchr/testify/require"
Expand Down
6 changes: 3 additions & 3 deletions pkg/app2/rpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package app2

import (
"context"
"errors"
"net"
"net/rpc"
"testing"

"github.com/pkg/errors"
"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
Expand Down Expand Up @@ -47,7 +47,7 @@ func TestRPCClient_Dial(t *testing.T) {

dialCtx := context.Background()
dialConn := dmsg.NewTransport(&MockConn{}, logging.MustGetLogger("dmsg_tp"),
dmsgLocal, dmsgRemote, 0, func() {})
dmsgLocal, dmsgRemote, 0, func(_ uint16) {})
var noErr error

n := &network.MockNetworker{}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestRPCClient_Accept(t *testing.T) {
Port: remotePort,
}
lisConn := dmsg.NewTransport(&MockConn{}, logging.MustGetLogger("dmsg_tp"),
dmsgLocal, dmsgRemote, 0, func() {})
dmsgLocal, dmsgRemote, 0, func(_ uint16) {})
var noErr error

lis := &MockListener{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app2/rpc_gateway.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package app2

import (
"errors"
"fmt"
"net"

"github.com/pkg/errors"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/pkg/app2/network"
Expand Down
4 changes: 2 additions & 2 deletions pkg/app2/rpc_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package app2

import (
"context"
"errors"
"math"
"net"
"strings"
"testing"

"github.com/pkg/errors"
"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
Expand All @@ -29,7 +29,7 @@ func TestRPCGateway_Dial(t *testing.T) {
localPort := routing.Port(100)

dialCtx := context.Background()
dialConn := dmsg.NewTransport(nil, nil, dmsg.Addr{Port: uint16(localPort)}, dmsg.Addr{}, 0, func() {})
dialConn := dmsg.NewTransport(nil, nil, dmsg.Addr{Port: uint16(localPort)}, dmsg.Addr{}, 0, func(_ uint16) {})
var dialErr error

n := &network.MockNetworker{}
Expand Down
16 changes: 13 additions & 3 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"io"
"net"
"sync"
"time"
Expand Down Expand Up @@ -41,6 +42,8 @@ type RouteGroup struct {
// and push to the appropriate '(RouteGroup).readCh'.
readCh chan []byte // push reads from Router
readBuf bytes.Buffer // for read overflow
done chan struct{}
once sync.Once

rt routing.Table
}
Expand Down Expand Up @@ -68,7 +71,12 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
return r.readBuf.Read(p)
}

return ioutil.BufRead(&r.readBuf, <-r.readCh, p)
data, ok := <-r.readCh
if !ok {
return 0, io.ErrClosedPipe
}

return ioutil.BufRead(&r.readBuf, data, p)
}

// Write writes payload to a RouteGroup
Expand Down Expand Up @@ -102,7 +110,6 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
func (r *RouteGroup) Close() error {

r.mu.Lock()
defer r.mu.Unlock()

Expand All @@ -124,7 +131,10 @@ func (r *RouteGroup) Close() error {
}
r.rt.DelRules(routeIDs)

close(r.readCh) // TODO(nkryuchkov): close readCh properly
r.once.Do(func() {
close(r.done)
close(r.readCh)
})

return nil
}
Expand Down
41 changes: 18 additions & 23 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func New(n *snet.Network, config *Config) (*router, error) {
trustedNodes: trustedNodes,
}

if err := r.rpcSrv.Register(NewGateway(r)); err != nil {
if err := r.rpcSrv.Register(NewRPCGateway(r)); err != nil {
return nil, fmt.Errorf("failed to register RPC server")
}

Expand Down Expand Up @@ -276,7 +276,7 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe
}

desc := rule.RouteDescriptor()
rg, ok := r.rgs[desc]
rg, ok := r.routeGroup(desc)
if !ok {
return errors.New("route descriptor does not exist")
}
Expand All @@ -288,10 +288,23 @@ func (r *router) handleTransportPacket(ctx context.Context, packet routing.Packe
switch t := rule.Type(); t {
case routing.RuleForward, routing.RuleIntermediaryForward:
return r.forwardPacket(ctx, packet.Payload(), rule)
default:
rg.readCh <- packet.Payload()
return nil
default: // TODO(nkryuchkov): try to simplify
select {
case <-rg.done:
return io.ErrClosedPipe
default:
rg.mu.Lock()
defer rg.mu.Unlock()
select {
case rg.readCh <- packet.Payload():
return nil
case <-rg.done:
return io.ErrClosedPipe
}

}
}

}

// GetRule gets routing rule.
Expand Down Expand Up @@ -342,24 +355,6 @@ func (r *router) forwardPacket(ctx context.Context, payload []byte, rule routing
return nil
}

// func (r *router) consumePacket(payload []byte, rule routing.Rule) error {
// laddr := routing.Addr{Port: rule.RouteDescriptor().SrcPort()}
// raddr := routing.Addr{PubKey: rule.RouteDescriptor().DstPK(), Port: rule.RouteDescriptor().DstPort()}
//
// route := routing.Route{Desc: routing.NewRouteDescriptor(laddr.PubKey, raddr.PubKey, laddr.Port, raddr.Port)}
// p := &app.Packet{Desc: route.Desc, Payload: payload}
// b, err := r.pm.Get(rule.RouteDescriptor().SrcPort())
// if err != nil {
// return err
// }
// if err := b.conn.Send(app.FrameSend, p, nil); err != nil { // TODO: Stuck here.
// return err
// }
//
// r.logger.Infof("Forwarded packet to App on Port %d", rule.RouteDescriptor().SrcPort())
// return nil
// }

// RemoveRouteDescriptor removes loop rule.
func (r *router) RemoveRouteDescriptor(desc routing.RouteDescriptor) {
rules := r.rt.AllRules()
Expand Down
Loading

0 comments on commit 28c5ed2

Please sign in to comment.