diff --git a/mux.go b/mux.go index ae4655d..49bd51c 100644 --- a/mux.go +++ b/mux.go @@ -31,6 +31,10 @@ type Config struct { // an expectation that things will move along quickly. ConnectionWriteTimeout time.Duration + // InitialStreamWindowSize is used to control the initial + // window size that we allow for a stream. + InitialStreamWindowSize uint32 + // MaxStreamWindowSize is used to control the maximum // window size that we allow for a stream. MaxStreamWindowSize uint32 @@ -56,16 +60,17 @@ type Config struct { // DefaultConfig is used to return a default configuration func DefaultConfig() *Config { return &Config{ - AcceptBacklog: 256, - PingBacklog: 32, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: maxStreamWindow, - LogOutput: os.Stderr, - ReadBufSize: 4096, - MaxMessageSize: 64 * 1024, - WriteCoalesceDelay: 100 * time.Microsecond, + AcceptBacklog: 256, + PingBacklog: 32, + EnableKeepAlive: true, + KeepAliveInterval: 30 * time.Second, + ConnectionWriteTimeout: 10 * time.Second, + InitialStreamWindowSize: initialStreamWindow, + MaxStreamWindowSize: maxStreamWindow, + LogOutput: os.Stderr, + ReadBufSize: 4096, + MaxMessageSize: 64 * 1024, + WriteCoalesceDelay: 100 * time.Microsecond, } } @@ -77,8 +82,11 @@ func VerifyConfig(config *Config) error { if config.KeepAliveInterval == 0 { return fmt.Errorf("keep-alive interval must be positive") } - if config.MaxStreamWindowSize < initialStreamWindow { - return errors.New("MaxStreamWindowSize must be larger than the initialStreamWindow (256 kB)") + if config.InitialStreamWindowSize < initialStreamWindow { + return errors.New("InitialStreamWindowSize must be larger or equal 256 kB") + } + if config.MaxStreamWindowSize < config.InitialStreamWindowSize { + return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize") } if config.MaxMessageSize < 1024 { return fmt.Errorf("MaxMessageSize must be greater than a kilobyte") diff --git a/session_test.go b/session_test.go index 0cb9017..b857b14 100644 --- a/session_test.go +++ b/session_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net" "reflect" "runtime" @@ -1683,3 +1684,51 @@ func TestReadDeadlineInterrupt(t *testing.T) { } } } + +// Make sure that a transfer doesn't stall, no matter what values the peers use for their InitialStreamWindow. +func TestInitialStreamWindow(t *testing.T) { + for i := 0; i < 10; i++ { + const ( + maxWindow = 5 * initialStreamWindow + transferSize = 10 * maxWindow + ) + rand.Seed(time.Now().UnixNano()) + randomUint32 := func(min, max uint32) uint32 { return uint32(rand.Int63n(int64(max-min))) + min } + + cconf := DefaultConfig() + cconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow) + sconf := DefaultConfig() + sconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow) + + conn1, conn2 := testConn() + client, _ := Client(conn1, cconf) + server, _ := Server(conn2, sconf) + + errChan := make(chan error, 1) + go func() { + defer close(errChan) + str, err := client.OpenStream(context.Background()) + if err != nil { + errChan <- err + return + } + defer str.Close() + if _, err := str.Write(make([]byte, transferSize)); err != nil { + errChan <- err + return + } + }() + + str, err := server.AcceptStream() + if err != nil { + t.Fatal(err) + } + data, err := ioutil.ReadAll(str) + if err != nil { + t.Fatal(err) + } + if uint32(len(data)) != transferSize { + t.Fatalf("expected %d bytes to be transferred, got %d", transferSize, len(data)) + } + } +} diff --git a/stream.go b/stream.go index 06ae368..2cb015e 100644 --- a/stream.go +++ b/stream.go @@ -59,8 +59,11 @@ func newStream(session *Session, id uint32, state streamState) *Stream { sendWindow: initialStreamWindow, readDeadline: makePipeDeadline(), writeDeadline: makePipeDeadline(), + // Initialize the recvBuf with initialStreamWindow, not config.InitialStreamWindowSize. + // The peer isn't allowed to send more data than initialStreamWindow until we've sent + // the first window update (which will grant it up to config.InitialStreamWindowSize). recvBuf: newSegmentedBuffer(initialStreamWindow), - recvWindow: initialStreamWindow, + recvWindow: session.config.InitialStreamWindowSize, epochStart: time.Now(), recvNotifyCh: make(chan struct{}, 1), sendNotifyCh: make(chan struct{}, 1),