Skip to content

Commit

Permalink
Merge branch 'fix/app2-router2-tests' of https://github.com/Darkren/s…
Browse files Browse the repository at this point in the history
…kywire-mainnet into fix/app2-router2-tests
  • Loading branch information
Darkren committed Nov 18, 2019
2 parents 49ce993 + f388188 commit 8583100
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 251 deletions.
61 changes: 44 additions & 17 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/SkycoinProject/dmsg/ioutil"
"github.com/SkycoinProject/skycoin/src/util/logging"

"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/transport"
Expand All @@ -20,39 +22,51 @@ const (
readChBufSize = 1024
)

var (
// ErrNoTransports is returned when RouteGroup has no transports.
ErrNoTransports = errors.New("no transports")
// ErrNoTransport is returned when RouteGroup has no rules.
ErrNoRules = errors.New("no rules")
// ErrNoTransport is returned when transport is nil.
ErrBadTransport = errors.New("bad transport")
)

// RouteGroup should implement 'io.ReadWriteCloser'.
// It implements 'net.Conn'.
type RouteGroup struct {
mu sync.RWMutex

desc routing.RouteDescriptor // describes the route group
fwd []routing.Rule // forward rules (for writing)
rvs []routing.Rule // reverse rules (for reading)
logger *logging.Logger

// The following fields are used for writing:
// - fwd/tps should have the same number of elements.
// - the corresponding element of tps should have tpID of the corresponding rule in fwd.
// - rg.fwd references 'ForwardRule' rules for writes.
desc routing.RouteDescriptor // describes the route group
rt routing.Table

// 'tps' is transports used for writing/forward rules.
// It should have the same number of elements as 'fwd'
// where each element corresponds with the adjacent element in 'fwd'.
tps []*transport.ManagedTransport

// The following fields are used for writing:
// - fwd/tps should have the same number of elements.
// - the corresponding element of tps should have tpID of the corresponding rule in fwd.
// - fwd references 'ForwardRule' rules for writes.
fwd []routing.Rule // forward rules (for writing)
rvs []routing.Rule // reverse rules (for reading)

lastSent int64

// 'readCh' reads in incoming packets of this route group.
// - Router should serve call '(*transport.Manager).ReadPacket' in a loop,
// and push to the appropriate '(RouteGroup).readCh'.
readCh chan []byte // push reads from Router
readBuf bytes.Buffer // for read overflow
done chan struct{}
once sync.Once

rt routing.Table

lastSent int64
}

func NewRouteGroup(rt routing.Table, desc routing.RouteDescriptor) *RouteGroup {
rg := &RouteGroup{
logger: logging.MustGetLogger(fmt.Sprintf("RouteGroup %v", desc)),
desc: desc,
fwd: make([]routing.Rule, 0),
rvs: make([]routing.Rule, 0),
Expand Down Expand Up @@ -89,21 +103,25 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
// Write writes payload to a RouteGroup
// For the first version, only the first ForwardRule (fwd[0]) is used for writing.
func (r *RouteGroup) Write(p []byte) (n int, err error) {
if r.isClosing() {
return 0, io.ErrClosedPipe
}

r.mu.Lock()
defer r.mu.Unlock()

if len(r.tps) == 0 {
return 0, errors.New("no transports") // TODO(nkryuchkov): proper error
return 0, ErrNoTransports
}
if len(r.fwd) == 0 {
return 0, errors.New("no rules") // TODO(nkryuchkov): proper error
return 0, ErrNoRules
}

tp := r.tps[0]
rule := r.fwd[0]

if tp == nil {
return 0, errors.New("unknown transport")
return 0, ErrBadTransport
}

packet := routing.MakeDataPacket(rule.KeyRouteID(), p)
Expand Down Expand Up @@ -187,7 +205,7 @@ func (r *RouteGroup) keepAliveLoop() {
}

if err := r.sendKeepAlive(); err != nil {
// TODO: handle error
r.logger.Warnf("Failed to send keepalive: %v", err)
}
}
}
Expand All @@ -197,10 +215,10 @@ func (r *RouteGroup) sendKeepAlive() error {
defer r.mu.Unlock()

if len(r.tps) == 0 {
return errors.New("no transports") // TODO(nkryuchkov): proper error
return ErrNoTransports
}
if len(r.fwd) == 0 {
return errors.New("no rules") // TODO(nkryuchkov): proper error
return ErrNoRules
}

tp := r.tps[0]
Expand All @@ -216,3 +234,12 @@ func (r *RouteGroup) sendKeepAlive() error {
}
return nil
}

func (r *RouteGroup) isClosing() bool {
select {
case <-r.done:
return true
default:
return false
}
}
32 changes: 16 additions & 16 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,23 +318,23 @@ func (r *router) handleDataPacket(ctx context.Context, packet routing.Packet) er
}

r.logger.Infof("Got new remote packet with route ID %d. Using rule: %s", packet.RouteID(), rule)
switch t := rule.Type(); t {
case routing.RuleForward, routing.RuleIntermediaryForward:

if t := rule.Type(); t == routing.RuleForward || t == routing.RuleIntermediaryForward {
return r.forwardPacket(ctx, packet.Payload(), rule)
default: // TODO(nkryuchkov): try to simplify
select {
case <-rg.done:
return io.ErrClosedPipe
default:
rg.mu.Lock()
defer rg.mu.Unlock()
select {
case rg.readCh <- packet.Payload():
return nil
case <-rg.done:
return io.ErrClosedPipe
}
}
}

if rg.isClosing() {
return io.ErrClosedPipe
}

rg.mu.Lock()
defer rg.mu.Unlock()

select {
case <-rg.done:
return io.ErrClosedPipe
case rg.readCh <- packet.Payload():
return nil
}
}

Expand Down
Loading

0 comments on commit 8583100

Please sign in to comment.