Skip to content

Commit

Permalink
Improve code quality
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Dec 13, 2019
1 parent 85f7412 commit cb815a0
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 216 deletions.
69 changes: 47 additions & 22 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,36 +185,21 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(rg.tps) == 0 {
return 0, ErrNoTransports
tp, err := rg.tp()
if err != nil {
return 0, err
}

if len(rg.fwd) == 0 {
return 0, ErrNoRules
}

tp := rg.tps[0]
rule := rg.fwd[0]

if tp == nil {
return 0, ErrBadTransport
rule, err := rg.rule()
if err != nil {
return 0, err
}

packet := routing.MakeDataPacket(rule.KeyRouteID(), p)

ctx, cancel := context.WithCancel(context.Background())
errCh, cancel := rg.writePacketAsync(tp, packet)
defer cancel()

errCh := make(chan error)

go func() {
select {
case <-ctx.Done():
case errCh <- tp.WritePacket(context.Background(), packet):
}
close(errCh)
}()

timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()

Expand All @@ -234,6 +219,46 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
}
}

func (rg *RouteGroup) writePacketAsync(tp *transport.ManagedTransport, packet routing.Packet) (chan error, func()) {
ctx, cancel := context.WithCancel(context.Background())

errCh := make(chan error)

go func() {
select {
case <-ctx.Done():
case errCh <- tp.WritePacket(context.Background(), packet):
}
close(errCh)
}()

return errCh, cancel
}

func (rg *RouteGroup) rule() (routing.Rule, error) {
if len(rg.fwd) == 0 {
return nil, ErrNoRules
}

rule := rg.fwd[0]

return rule, nil
}

func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) {
if len(rg.tps) == 0 {
return nil, ErrNoTransports
}

tp := rg.tps[0]

if tp == nil {
return nil, ErrBadTransport
}

return tp, nil
}

// Close closes a RouteGroup:
// - Send Close packet for all ForwardRules.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
Expand Down
17 changes: 13 additions & 4 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func TestRouteGroup_Read(t *testing.T) {

func TestRouteGroup_Write(t *testing.T) {
msg1 := []byte("hello1")
msg2 := []byte("hello2")

rg1 := createRouteGroup()
require.NotNil(t, rg1)
Expand All @@ -108,6 +107,16 @@ func TestRouteGroup_Write(t *testing.T) {
m1, m2, teardown := createTransports(t, rg1, rg2, stcp.Type)
defer teardown()

testWrite(t, rg1, rg2, m1, m2)

require.NoError(t, rg1.Close())
require.NoError(t, rg2.Close())
}

func testWrite(t *testing.T, rg1, rg2 *RouteGroup, m1, m2 *transport.Manager) {
msg1 := []byte("hello1")
msg2 := []byte("hello2")

n, err := rg1.Write([]byte{})
require.Equal(t, 0, n)
require.NoError(t, err)
Expand All @@ -134,22 +143,22 @@ func TestRouteGroup_Write(t *testing.T) {
rg1.tps[0] = nil
_, err = rg1.Write(msg1)
require.Equal(t, ErrBadTransport, err)

rg1.tps[0] = tpBackup

tpsBackup := rg1.tps
rg1.tps = nil
_, err = rg1.Write(msg1)
require.Equal(t, ErrNoTransports, err)

rg1.tps = tpsBackup

fwdBackup := rg1.fwd
rg1.fwd = nil
_, err = rg1.Write(msg1)
require.Equal(t, ErrNoRules, err)
rg1.fwd = fwdBackup

require.NoError(t, rg1.Close())
require.NoError(t, rg2.Close())
rg1.fwd = fwdBackup
}

func TestRouteGroup_ReadWrite(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *Config) SetDefaults() {
if c.Logger == nil {
c.Logger = logging.MustGetLogger("router")
}

if c.RouteGroupDialer == nil {
c.RouteGroupDialer = setupclient.NewSetupNodeDialer()
}
Expand Down Expand Up @@ -217,15 +218,26 @@ func (r *router) DialRoutes(
// - Save to routing.Table and internal RouteGroup map.
// - Return the RoutingGroup.
func (r *router) AcceptRoutes(ctx context.Context) (*RouteGroup, error) {
var rules routing.EdgeRules
var ok bool
var (
rules routing.EdgeRules
ok bool
)

select {
case <-ctx.Done():
return nil, ctx.Err()
case rules, ok = <-r.accept:
}

if !ok {
return nil, &net.OpError{Op: "accept", Net: "skynet", Source: nil, Err: errors.New("use of closed network connection")}
err := &net.OpError{
Op: "accept",
Net: "skynet",
Source: nil,
Err: errors.New("use of closed network connection"),
}

return nil, err
}

if err := r.SaveRoutingRules(rules.Forward, rules.Reverse); err != nil {
Expand Down
Loading

0 comments on commit cb815a0

Please sign in to comment.