diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 783e7720bd..cf0305b43f 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -21,6 +21,7 @@ import ( const ( defaultRouteGroupKeepAliveInterval = 1 * time.Minute + defaultRouteGroupIOTimeout = 3 * time.Second defaultReadChBufSize = 1024 ) @@ -45,6 +46,7 @@ func (timeoutError) Temporary() bool { return true } type RouteGroupConfig struct { ReadChBufSize int KeepAliveInterval time.Duration + IOTimeout time.Duration } // DefaultRouteGroupConfig returns default RouteGroup config. @@ -52,6 +54,7 @@ type RouteGroupConfig struct { func DefaultRouteGroupConfig() *RouteGroupConfig { return &RouteGroupConfig{ KeepAliveInterval: defaultRouteGroupKeepAliveInterval, + IOTimeout: defaultRouteGroupIOTimeout, ReadChBufSize: defaultReadChBufSize, } } @@ -122,13 +125,13 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe // Read reads the next packet payload of a RouteGroup. // The Router, via transport.Manager, is responsible for reading incoming packets and pushing it // to the appropriate RouteGroup via (*RouteGroup).readCh. -func (r *RouteGroup) Read(p []byte) (n int, err error) { - if r.isClosed() { +func (rg *RouteGroup) Read(p []byte) (n int, err error) { + if rg.isClosed() { return 0, io.ErrClosedPipe } - if r.readDeadline.Closed() { - r.logger.Infoln("TIMEOUT ERROR?") + if rg.readDeadline.Closed() { + rg.logger.Infoln("TIMEOUT ERROR?") return 0, timeoutError{} } @@ -137,41 +140,41 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { } // In case the read buffer is short. - r.mu.Lock() - if r.readBuf.Len() > 0 { - data, err := r.readBuf.Read(p) - r.mu.Unlock() + rg.mu.Lock() + if rg.readBuf.Len() > 0 { + data, err := rg.readBuf.Read(p) + rg.mu.Unlock() return data, err } - r.mu.Unlock() + rg.mu.Unlock() - timeout := time.NewTimer(5 * time.Second) + timeout := time.NewTimer(rg.cfg.IOTimeout) defer timeout.Stop() var data []byte select { - case <-r.readDeadline.Wait(): + case <-rg.readDeadline.Wait(): return 0, timeoutError{} case <-timeout.C: return 0, io.EOF - case data = <-r.readCh: + case data = <-rg.readCh: } - r.mu.Lock() - defer r.mu.Unlock() + rg.mu.Lock() + defer rg.mu.Unlock() - return ioutil.BufRead(&r.readBuf, data, p) + return ioutil.BufRead(&rg.readBuf, data, p) } // Write writes payload to a RouteGroup // For the first version, only the first ForwardRule (fwd[0]) is used for writing. -func (r *RouteGroup) Write(p []byte) (n int, err error) { - if r.isClosed() { +func (rg *RouteGroup) Write(p []byte) (n int, err error) { + if rg.isClosed() { return 0, io.ErrClosedPipe } - if r.writeDeadline.Closed() { + if rg.writeDeadline.Closed() { return 0, timeoutError{} } @@ -179,19 +182,19 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { return 0, nil } - r.mu.Lock() - defer r.mu.Unlock() + rg.mu.Lock() + defer rg.mu.Unlock() - if len(r.tps) == 0 { + if len(rg.tps) == 0 { return 0, ErrNoTransports } - if len(r.fwd) == 0 { + if len(rg.fwd) == 0 { return 0, ErrNoRules } - tp := r.tps[0] - rule := r.fwd[0] + tp := rg.tps[0] + rule := rg.fwd[0] if tp == nil { return 0, ErrBadTransport @@ -212,11 +215,11 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { close(errCh) }() - timeout := time.NewTimer(5 * time.Second) + timeout := time.NewTimer(rg.cfg.IOTimeout) defer timeout.Stop() select { - case <-r.writeDeadline.Wait(): + case <-rg.writeDeadline.Wait(): return 0, timeoutError{} case <-timeout.C: return 0, io.EOF @@ -225,7 +228,7 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { return 0, err } - atomic.StoreInt64(&r.lastSent, time.Now().UnixNano()) + atomic.StoreInt64(&rg.lastSent, time.Now().UnixNano()) return len(p), nil } @@ -235,99 +238,99 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { // - Send Close packet for all ForwardRules. // - Delete all rules (ForwardRules and ConsumeRules) from routing table. // - Close all go channels. -func (r *RouteGroup) Close() error { - r.mu.Lock() - defer r.mu.Unlock() +func (rg *RouteGroup) Close() error { + rg.mu.Lock() + defer rg.mu.Unlock() - if len(r.fwd) != len(r.tps) { + if len(rg.fwd) != len(rg.tps) { return ErrRuleTransportMismatch } - for i := 0; i < len(r.tps); i++ { - packet := routing.MakeClosePacket(r.fwd[i].KeyRouteID(), routing.CloseRequested) - if err := r.tps[i].WritePacket(context.Background(), packet); err != nil { + for i := 0; i < len(rg.tps); i++ { + packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), routing.CloseRequested) + if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil { return err } } - rules := r.rt.RulesWithDesc(r.desc) + rules := rg.rt.RulesWithDesc(rg.desc) routeIDs := make([]routing.RouteID, 0, len(rules)) for _, rule := range rules { routeIDs = append(routeIDs, rule.KeyRouteID()) } - r.rt.DelRules(routeIDs) + rg.rt.DelRules(routeIDs) - r.once.Do(func() { - close(r.done) - r.readChMu.Lock() - close(r.readCh) - r.readChMu.Unlock() + rg.once.Do(func() { + close(rg.done) + rg.readChMu.Lock() + close(rg.readCh) + rg.readChMu.Unlock() }) return nil } // LocalAddr returns destination address of underlying RouteDescriptor. -func (r *RouteGroup) LocalAddr() net.Addr { - return r.desc.Dst() +func (rg *RouteGroup) LocalAddr() net.Addr { + return rg.desc.Dst() } // RemoteAddr returns source address of underlying RouteDescriptor. -func (r *RouteGroup) RemoteAddr() net.Addr { - return r.desc.Src() +func (rg *RouteGroup) RemoteAddr() net.Addr { + return rg.desc.Src() } // SetDeadline sets both read and write deadlines. -func (r *RouteGroup) SetDeadline(t time.Time) error { - if err := r.SetReadDeadline(t); err != nil { +func (rg *RouteGroup) SetDeadline(t time.Time) error { + if err := rg.SetReadDeadline(t); err != nil { return err } - return r.SetWriteDeadline(t) + return rg.SetWriteDeadline(t) } // SetReadDeadline sets read deadline. -func (r *RouteGroup) SetReadDeadline(t time.Time) error { - r.readDeadline.Set(t) +func (rg *RouteGroup) SetReadDeadline(t time.Time) error { + rg.readDeadline.Set(t) return nil } // SetWriteDeadline sets write deadline. -func (r *RouteGroup) SetWriteDeadline(t time.Time) error { - r.writeDeadline.Set(t) +func (rg *RouteGroup) SetWriteDeadline(t time.Time) error { + rg.writeDeadline.Set(t) return nil } -func (r *RouteGroup) keepAliveLoop(interval time.Duration) { +func (rg *RouteGroup) keepAliveLoop(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for range ticker.C { - lastSent := time.Unix(0, atomic.LoadInt64(&r.lastSent)) + lastSent := time.Unix(0, atomic.LoadInt64(&rg.lastSent)) if time.Since(lastSent) < interval { continue } - if err := r.sendKeepAlive(); err != nil { - r.logger.Warnf("Failed to send keepalive: %v", err) + if err := rg.sendKeepAlive(); err != nil { + rg.logger.Warnf("Failed to send keepalive: %v", err) } } } -func (r *RouteGroup) sendKeepAlive() error { - r.mu.Lock() - defer r.mu.Unlock() +func (rg *RouteGroup) sendKeepAlive() error { + rg.mu.Lock() + defer rg.mu.Unlock() - if len(r.tps) == 0 || len(r.fwd) == 0 { + if len(rg.tps) == 0 || len(rg.fwd) == 0 { // if no transports, no rules, then no keepalive return nil } - tp := r.tps[0] - rule := r.fwd[0] + tp := rg.tps[0] + rule := rg.fwd[0] if tp == nil { return ErrBadTransport @@ -341,9 +344,9 @@ func (r *RouteGroup) sendKeepAlive() error { return nil } -func (r *RouteGroup) isClosed() bool { +func (rg *RouteGroup) isClosed() bool { select { - case <-r.done: + case <-rg.done: return true default: return false