diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 1be72f05f..d7addcb36 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -22,7 +22,7 @@ import ( const ( defaultRouteGroupKeepAliveInterval = 1 * time.Minute defaultReadChBufSize = 1024 - closeRoutineTimeout = 5 * time.Second + closeRoutineTimeout = 2 * time.Second ) var ( @@ -164,6 +164,20 @@ func (rg *RouteGroup) Read(p []byte) (n int, err error) { return 0, io.ErrClosedPipe } + select { + case data, ok = <-rg.readCh: + if !ok || len(data) == 0 { + // route group got closed + return 0, io.EOF + } + + rg.mu.Lock() + defer rg.mu.Unlock() + + return ioutil.BufRead(&rg.readBuf, data, p) + default: + } + return 0, io.EOF case <-rg.readDeadline.Wait(): return 0, timeoutError{} @@ -428,7 +442,7 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { } func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error { - time.Sleep(2 * time.Second) + //time.Sleep(2 * time.Second) for i := 0; i < len(rg.tps); i++ { packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code) if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {