Skip to content

Commit

Permalink
Merged with Evan fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ayuryshev committed May 29, 2019
2 parents d78d470 + 0cd5668 commit 307dfc7
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/messaging/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (mCh *msgChannel) readEncrypted(ctx context.Context, p []byte) (n int, err
}

if len(data) > len(p) {
if _, err := mCh.buf.Write(data[len(p):]); err != nil {
if _, err := mCh.buf.Write(data[len(p):]); err != nil { // TODO: data race.
return 0, io.ErrShortBuffer
}

Expand Down
54 changes: 23 additions & 31 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,37 +80,29 @@ func New(config *Config) *Router {
func (r *Router) Serve(ctx context.Context) error {

go func() {
for tr := range r.tm.TrChan {
if tr.Accepted {
go func(t *transport.ManagedTransport) {
for {
var err error
if r.IsSetupTransport(t) {
err = r.rm.Serve(t)
} else {
err = r.serveTransport(t)
}
for tp := range r.tm.TrChan {
isAccepted, isSetup := tp.Accepted, r.IsSetupTransport(tp)

var serve func(io.ReadWriter) error
switch {
case isAccepted && isSetup:
serve = r.rm.Serve
case !isSetup:
serve = r.serveTransport
default:
continue
}

if err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
go func(tp transport.Transport) {
for {
if err := serve(tp); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
}(tr)
} else {
go func(t *transport.ManagedTransport) {
for {
if err := r.serveTransport(t); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
}
}(tr)
}
}
}(tp)
}
}()

Expand Down Expand Up @@ -178,14 +170,14 @@ func (r *Router) Close() error {
return r.tm.Close()
}

func (r *Router) serveTransport(tr *transport.ManagedTransport) error {
func (r *Router) serveTransport(rw io.ReadWriter) error {
packet := make(routing.Packet, 6)
if _, err := io.ReadFull(tr, packet); err != nil {
if _, err := io.ReadFull(rw, packet); err != nil {
return err
}

payload := make([]byte, packet.Size())
if _, err := io.ReadFull(tr, payload); err != nil {
if _, err := io.ReadFull(rw, payload); err != nil {
return err
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,6 @@ func TestRouterSetupLoop(t *testing.T) {
r := New(conf)
errCh := make(chan error)
go func() {
// acceptCh, _ := m2.Observe()
// tr := <-acceptCh
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
if tr.Accepted {
Expand Down
1 change: 0 additions & 1 deletion pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (sn *Node) Serve(ctx context.Context) error {

go func() {
for tr := range sn.tm.TrChan {

if tr.Accepted {
go func(t transport.Transport) {
for {
Expand Down
6 changes: 3 additions & 3 deletions pkg/setup/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewSetupProtocol(rw io.ReadWriter) *Protocol {
// ReadPacket reads a single setup packet.
func (p *Protocol) ReadPacket() (PacketType, []byte, error) {
rawLen := make([]byte, 2)
if _, err := io.ReadFull(p.rw, rawLen); err != nil {
if _, err := io.ReadFull(p.rw, rawLen); err != nil { // TODO: data race.
return 0, nil, err
}
rawBody := make([]byte, binary.BigEndian.Uint16(rawLen))
Expand Down Expand Up @@ -146,7 +146,7 @@ func CreateLoop(p *Protocol, l *routing.Loop) error {
if err := p.WritePacket(PacketCreateLoop, l); err != nil {
return err
}
if err := readAndDecodePacket(p, nil); err != nil {
if err := readAndDecodePacket(p, nil); err != nil { // TODO: data race.
return err
}
return nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func LoopClosed(p *Protocol, l *LoopData) error {
}

func readAndDecodePacket(p *Protocol, v interface{}) error {
t, raw, err := p.ReadPacket()
t, raw, err := p.ReadPacket() // TODO: data race.
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newManagedTransport(id uuid.UUID, tr Transport, public bool, accepted bool)
// Read reads using underlying
func (tr *ManagedTransport) Read(p []byte) (n int, err error) {
tr.mu.RLock()
n, err = tr.Transport.Read(p)
n, err = tr.Transport.Read(p) // TODO: data race.
tr.mu.RUnlock()
if err == nil {
select {
Expand Down

0 comments on commit 307dfc7

Please sign in to comment.