Skip to content

Commit

Permalink
Improve logging, refactor code a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Jan 30, 2020
1 parent 46879d8 commit d30b41d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 105 deletions.
182 changes: 90 additions & 92 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func (rg *RouteGroup) Read(p []byte) (n int, err error) {
}

if rg.readDeadline.Closed() {
rg.logger.Infoln("TIMEOUT ERROR?")
return 0, timeoutError{}
}

Expand All @@ -147,37 +146,6 @@ func (rg *RouteGroup) Read(p []byte) (n int, err error) {
return rg.read(p)
}

// read reads incoming data. It tries to fetch the data from the internal buffer.
// If buffer is empty it blocks on receiving from the data channel
func (rg *RouteGroup) read(p []byte) (int, error) {
// first try the buffer for any already received data
rg.mu.Lock()
if rg.readBuf.Len() > 0 {
n, err := rg.readBuf.Read(p)
rg.mu.Unlock()

return n, err
}
rg.mu.Unlock()

select {
case <-rg.readDeadline.Wait():
return 0, timeoutError{}
case data, ok := <-rg.readCh:
if !ok || len(data) == 0 {
// route group got closed or empty data received. Behavior on the empty
// data is equivalent to the behavior of `read()` unix syscall as described here:
// https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.4.0/com.ibm.zos.v2r4.bpxbd00/rtrea.htm
return 0, io.EOF
}

rg.mu.Lock()
defer rg.mu.Unlock()

return ioutil.BufRead(&rg.readBuf, data, p)
}
}

// Write writes payload to a RouteGroup
// For the first version, only the first ForwardRule (fwd[0]) is used for writing.
func (rg *RouteGroup) Write(p []byte) (n int, err error) {
Expand Down Expand Up @@ -211,6 +179,89 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
return rg.write(p, tp, rule)
}

// Close closes a RouteGroup.
func (rg *RouteGroup) Close() error {
if rg.isClosed() {
return io.ErrClosedPipe
}

if rg.isRemoteClosed() {
// remote already closed, everything is cleaned up,
// we just need to close signal channel at this point
close(rg.closed)
return nil
}

atomic.StoreInt32(&rg.closeInitiated, 1)

rg.mu.Lock()
defer rg.mu.Unlock()

return rg.close(routing.CloseRequested)
}

// LocalAddr returns destination address of underlying RouteDescriptor.
func (rg *RouteGroup) LocalAddr() net.Addr {
return rg.desc.Dst()
}

// RemoteAddr returns source address of underlying RouteDescriptor.
func (rg *RouteGroup) RemoteAddr() net.Addr {
return rg.desc.Src()
}

// SetDeadline sets both read and write deadlines.
func (rg *RouteGroup) SetDeadline(t time.Time) error {
if err := rg.SetReadDeadline(t); err != nil {
return err
}

return rg.SetWriteDeadline(t)
}

// SetReadDeadline sets read deadline.
func (rg *RouteGroup) SetReadDeadline(t time.Time) error {
rg.readDeadline.Set(t)
return nil
}

// SetWriteDeadline sets write deadline.
func (rg *RouteGroup) SetWriteDeadline(t time.Time) error {
rg.writeDeadline.Set(t)
return nil
}

// read reads incoming data. It tries to fetch the data from the internal buffer.
// If buffer is empty it blocks on receiving from the data channel
func (rg *RouteGroup) read(p []byte) (int, error) {
// first try the buffer for any already received data
rg.mu.Lock()
if rg.readBuf.Len() > 0 {
n, err := rg.readBuf.Read(p)
rg.mu.Unlock()

return n, err
}
rg.mu.Unlock()

select {
case <-rg.readDeadline.Wait():
return 0, timeoutError{}
case data, ok := <-rg.readCh:
if !ok || len(data) == 0 {
// route group got closed or empty data received. Behavior on the empty
// data is equivalent to the behavior of `read()` unix syscall as described here:
// https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.4.0/com.ibm.zos.v2r4.bpxbd00/rtrea.htm
return 0, io.EOF
}

rg.mu.Lock()
defer rg.mu.Unlock()

return ioutil.BufRead(&rg.readBuf, data, p)
}
}

func (rg *RouteGroup) write(data []byte, tp *transport.ManagedTransport, rule routing.Rule) (int, error) {
packet := routing.MakeDataPacket(rule.KeyRouteID(), data)

Expand Down Expand Up @@ -251,6 +302,8 @@ func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.Manage
return errCh
}

// rule fetches first available forward rule.
// NOTE: not thread-safe.
func (rg *RouteGroup) rule() (routing.Rule, error) {
if len(rg.fwd) == 0 {
return nil, ErrNoRules
Expand All @@ -261,6 +314,8 @@ func (rg *RouteGroup) rule() (routing.Rule, error) {
return rule, nil
}

// tp fetches first available transport.
// NOTE: not thread-safe.
func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) {
if len(rg.tps) == 0 {
return nil, ErrNoTransports
Expand All @@ -275,60 +330,6 @@ func (rg *RouteGroup) tp() (*transport.ManagedTransport, error) {
return tp, nil
}

// Close closes a RouteGroup.
func (rg *RouteGroup) Close() error {
if rg.isClosed() {
return io.ErrClosedPipe
}

if rg.isRemoteClosed() {
// remote already closed, everything is cleaned up,
// we just need to close signal channel at this point
close(rg.closed)
return nil
}

atomic.StoreInt32(&rg.closeInitiated, 1)

fmt.Println("BEFORE MU")
rg.mu.Lock()
defer rg.mu.Unlock()
fmt.Println("MU ACQUIRED")

return rg.close(routing.CloseRequested)
}

// LocalAddr returns destination address of underlying RouteDescriptor.
func (rg *RouteGroup) LocalAddr() net.Addr {
return rg.desc.Dst()
}

// RemoteAddr returns source address of underlying RouteDescriptor.
func (rg *RouteGroup) RemoteAddr() net.Addr {
return rg.desc.Src()
}

// SetDeadline sets both read and write deadlines.
func (rg *RouteGroup) SetDeadline(t time.Time) error {
if err := rg.SetReadDeadline(t); err != nil {
return err
}

return rg.SetWriteDeadline(t)
}

// SetReadDeadline sets read deadline.
func (rg *RouteGroup) SetReadDeadline(t time.Time) error {
rg.readDeadline.Set(t)
return nil
}

// SetWriteDeadline sets write deadline.
func (rg *RouteGroup) SetWriteDeadline(t time.Time) error {
rg.writeDeadline.Set(t)
return nil
}

func (rg *RouteGroup) keepAliveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand Down Expand Up @@ -370,7 +371,6 @@ func (rg *RouteGroup) sendKeepAlive() error {

packet := routing.MakeKeepAlivePacket(rule.KeyRouteID())
if err := tp.WritePacket(context.Background(), packet); err != nil {
rg.logger.WithError(err).Error("Failed to send keep-alive packet")
return err
}

Expand All @@ -382,7 +382,6 @@ func (rg *RouteGroup) sendKeepAlive() error {
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
func (rg *RouteGroup) close(code routing.CloseCode) error {
fmt.Println("GOT IN close")
if rg.isClosed() {
return nil
}
Expand Down Expand Up @@ -431,11 +430,11 @@ func (rg *RouteGroup) close(code routing.CloseCode) error {
}

func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
rg.logger.Infof("Got close packet with code %d on %s", code, rg.LocalAddr().String())
rg.logger.Infof("Got close packet with code %d", code)

if rg.isCloseInitiator() {
// this route group initiated close loop and got response
rg.logger.Debugf("Handling response close packet with code %d on %s", code, rg.LocalAddr().String())
rg.logger.Debugf("Handling response close packet with code %d", code)

rg.closeDone.Done()
return nil
Expand All @@ -446,11 +445,10 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
}

func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) {
rg.logger.Infof("Broadcasting Close packets to %d addresses", len(rg.tps))
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
rg.logger.WithError(err).Error("Failed to send close packet")
rg.logger.WithError(err).Errorf("Failed to send close packet to %s", rg.tps[i].Remote())
}
}
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
"testing"
"time"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/stcp"
"github.com/SkycoinProject/skywire-mainnet/pkg/transport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewRouteGroup(t *testing.T) {
Expand All @@ -29,7 +29,7 @@ func TestNewRouteGroup(t *testing.T) {
}

func TestRouteGroup_Close(t *testing.T) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -60,7 +60,7 @@ func TestRouteGroup_Close(t *testing.T) {
}

func TestRouteGroup_Read(t *testing.T) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestRouteGroup_Read(t *testing.T) {
}

func TestRouteGroup_Write(t *testing.T) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

testWrite(t, rg1, rg2, m1, m2)

Expand Down Expand Up @@ -178,7 +178,7 @@ func TestRouteGroup_ReadWrite(t *testing.T) {
}

func testReadWrite(t *testing.T, iterations int) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -378,7 +378,7 @@ func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) {
}

func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -438,7 +438,7 @@ func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) {
}

func testArbitrarySizeOneMessage(t *testing.T, size int) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -507,7 +507,7 @@ func TestRouteGroup_RemoteAddr(t *testing.T) {

func TestRouteGroup_TestConn(t *testing.T) {
mp := func() (c1, c2 net.Conn, stop func(), err error) {
rg1, rg2, m1, m2, _, _, teardown := setupEnv(t)
rg1, rg2, m1, m2, teardown := setupEnv(t)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -602,8 +602,7 @@ func createRouteGroup(cfg *RouteGroupConfig) *RouteGroup {
return rg
}

func setupEnv(t *testing.T) (rg1, rg2 *RouteGroup, m1, m2 *transport.Manager,
tp1, tp2 *transport.ManagedTransport, teardown func()) {
func setupEnv(t *testing.T) (rg1, rg2 *RouteGroup, m1, m2 *transport.Manager, teardown func()) {
keys := snettest.GenKeyPairs(2)

pk1 := keys[0].PK
Expand Down Expand Up @@ -660,5 +659,5 @@ func setupEnv(t *testing.T) (rg1, rg2 *RouteGroup, m1, m2 *transport.Manager,
nEnv.Teardown()
}

return rg1, rg2, m1, m2, tp1, tp2, teardown
return rg1, rg2, m1, m2, teardown
}

0 comments on commit d30b41d

Please sign in to comment.