Skip to content

Commit

Permalink
Make RouteGroup i/o timeout configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Dec 13, 2019
1 parent 639bf8f commit 85f7412
Showing 1 changed file with 67 additions and 64 deletions.
131 changes: 67 additions & 64 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
defaultRouteGroupKeepAliveInterval = 1 * time.Minute
defaultRouteGroupIOTimeout = 3 * time.Second
defaultReadChBufSize = 1024
)

Expand All @@ -45,13 +46,15 @@ func (timeoutError) Temporary() bool { return true }
type RouteGroupConfig struct {
ReadChBufSize int
KeepAliveInterval time.Duration
IOTimeout time.Duration
}

// DefaultRouteGroupConfig returns default RouteGroup config.
// Used by default if config is nil.
func DefaultRouteGroupConfig() *RouteGroupConfig {
return &RouteGroupConfig{
KeepAliveInterval: defaultRouteGroupKeepAliveInterval,
IOTimeout: defaultRouteGroupIOTimeout,
ReadChBufSize: defaultReadChBufSize,
}
}
Expand Down Expand Up @@ -122,13 +125,13 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
// Read reads the next packet payload of a RouteGroup.
// The Router, via transport.Manager, is responsible for reading incoming packets and pushing it
// to the appropriate RouteGroup via (*RouteGroup).readCh.
func (r *RouteGroup) Read(p []byte) (n int, err error) {
if r.isClosed() {
func (rg *RouteGroup) Read(p []byte) (n int, err error) {
if rg.isClosed() {
return 0, io.ErrClosedPipe
}

if r.readDeadline.Closed() {
r.logger.Infoln("TIMEOUT ERROR?")
if rg.readDeadline.Closed() {
rg.logger.Infoln("TIMEOUT ERROR?")
return 0, timeoutError{}
}

Expand All @@ -137,61 +140,61 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
}

// In case the read buffer is short.
r.mu.Lock()
if r.readBuf.Len() > 0 {
data, err := r.readBuf.Read(p)
r.mu.Unlock()
rg.mu.Lock()
if rg.readBuf.Len() > 0 {
data, err := rg.readBuf.Read(p)
rg.mu.Unlock()

return data, err
}
r.mu.Unlock()
rg.mu.Unlock()

timeout := time.NewTimer(5 * time.Second)
timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()

var data []byte
select {
case <-r.readDeadline.Wait():
case <-rg.readDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
return 0, io.EOF
case data = <-r.readCh:
case data = <-rg.readCh:
}

r.mu.Lock()
defer r.mu.Unlock()
rg.mu.Lock()
defer rg.mu.Unlock()

return ioutil.BufRead(&r.readBuf, data, p)
return ioutil.BufRead(&rg.readBuf, data, p)
}

// Write writes payload to a RouteGroup
// For the first version, only the first ForwardRule (fwd[0]) is used for writing.
func (r *RouteGroup) Write(p []byte) (n int, err error) {
if r.isClosed() {
func (rg *RouteGroup) Write(p []byte) (n int, err error) {
if rg.isClosed() {
return 0, io.ErrClosedPipe
}

if r.writeDeadline.Closed() {
if rg.writeDeadline.Closed() {
return 0, timeoutError{}
}

if len(p) == 0 {
return 0, nil
}

r.mu.Lock()
defer r.mu.Unlock()
rg.mu.Lock()
defer rg.mu.Unlock()

if len(r.tps) == 0 {
if len(rg.tps) == 0 {
return 0, ErrNoTransports
}

if len(r.fwd) == 0 {
if len(rg.fwd) == 0 {
return 0, ErrNoRules
}

tp := r.tps[0]
rule := r.fwd[0]
tp := rg.tps[0]
rule := rg.fwd[0]

if tp == nil {
return 0, ErrBadTransport
Expand All @@ -212,11 +215,11 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
close(errCh)
}()

timeout := time.NewTimer(5 * time.Second)
timeout := time.NewTimer(rg.cfg.IOTimeout)
defer timeout.Stop()

select {
case <-r.writeDeadline.Wait():
case <-rg.writeDeadline.Wait():
return 0, timeoutError{}
case <-timeout.C:
return 0, io.EOF
Expand All @@ -225,7 +228,7 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
return 0, err
}

atomic.StoreInt64(&r.lastSent, time.Now().UnixNano())
atomic.StoreInt64(&rg.lastSent, time.Now().UnixNano())

return len(p), nil
}
Expand All @@ -235,99 +238,99 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
// - Send Close packet for all ForwardRules.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
func (r *RouteGroup) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
func (rg *RouteGroup) Close() error {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(r.fwd) != len(r.tps) {
if len(rg.fwd) != len(rg.tps) {
return ErrRuleTransportMismatch
}

for i := 0; i < len(r.tps); i++ {
packet := routing.MakeClosePacket(r.fwd[i].KeyRouteID(), routing.CloseRequested)
if err := r.tps[i].WritePacket(context.Background(), packet); err != nil {
for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), routing.CloseRequested)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
return err
}
}

rules := r.rt.RulesWithDesc(r.desc)
rules := rg.rt.RulesWithDesc(rg.desc)
routeIDs := make([]routing.RouteID, 0, len(rules))

for _, rule := range rules {
routeIDs = append(routeIDs, rule.KeyRouteID())
}

r.rt.DelRules(routeIDs)
rg.rt.DelRules(routeIDs)

r.once.Do(func() {
close(r.done)
r.readChMu.Lock()
close(r.readCh)
r.readChMu.Unlock()
rg.once.Do(func() {
close(rg.done)
rg.readChMu.Lock()
close(rg.readCh)
rg.readChMu.Unlock()
})

return nil
}

// LocalAddr returns destination address of underlying RouteDescriptor.
func (r *RouteGroup) LocalAddr() net.Addr {
return r.desc.Dst()
func (rg *RouteGroup) LocalAddr() net.Addr {
return rg.desc.Dst()
}

// RemoteAddr returns source address of underlying RouteDescriptor.
func (r *RouteGroup) RemoteAddr() net.Addr {
return r.desc.Src()
func (rg *RouteGroup) RemoteAddr() net.Addr {
return rg.desc.Src()
}

// SetDeadline sets both read and write deadlines.
func (r *RouteGroup) SetDeadline(t time.Time) error {
if err := r.SetReadDeadline(t); err != nil {
func (rg *RouteGroup) SetDeadline(t time.Time) error {
if err := rg.SetReadDeadline(t); err != nil {
return err
}

return r.SetWriteDeadline(t)
return rg.SetWriteDeadline(t)
}

// SetReadDeadline sets read deadline.
func (r *RouteGroup) SetReadDeadline(t time.Time) error {
r.readDeadline.Set(t)
func (rg *RouteGroup) SetReadDeadline(t time.Time) error {
rg.readDeadline.Set(t)
return nil
}

// SetWriteDeadline sets write deadline.
func (r *RouteGroup) SetWriteDeadline(t time.Time) error {
r.writeDeadline.Set(t)
func (rg *RouteGroup) SetWriteDeadline(t time.Time) error {
rg.writeDeadline.Set(t)
return nil
}

func (r *RouteGroup) keepAliveLoop(interval time.Duration) {
func (rg *RouteGroup) keepAliveLoop(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for range ticker.C {
lastSent := time.Unix(0, atomic.LoadInt64(&r.lastSent))
lastSent := time.Unix(0, atomic.LoadInt64(&rg.lastSent))

if time.Since(lastSent) < interval {
continue
}

if err := r.sendKeepAlive(); err != nil {
r.logger.Warnf("Failed to send keepalive: %v", err)
if err := rg.sendKeepAlive(); err != nil {
rg.logger.Warnf("Failed to send keepalive: %v", err)
}
}
}

func (r *RouteGroup) sendKeepAlive() error {
r.mu.Lock()
defer r.mu.Unlock()
func (rg *RouteGroup) sendKeepAlive() error {
rg.mu.Lock()
defer rg.mu.Unlock()

if len(r.tps) == 0 || len(r.fwd) == 0 {
if len(rg.tps) == 0 || len(rg.fwd) == 0 {
// if no transports, no rules, then no keepalive
return nil
}

tp := r.tps[0]
rule := r.fwd[0]
tp := rg.tps[0]
rule := rg.fwd[0]

if tp == nil {
return ErrBadTransport
Expand All @@ -341,9 +344,9 @@ func (r *RouteGroup) sendKeepAlive() error {
return nil
}

func (r *RouteGroup) isClosed() bool {
func (rg *RouteGroup) isClosed() bool {
select {
case <-r.done:
case <-rg.done:
return true
default:
return false
Expand Down

0 comments on commit 85f7412

Please sign in to comment.