Skip to content

Commit

Permalink
make the initial stream receive window configurable
Browse files Browse the repository at this point in the history
While the initial window size is defined by the yamux specification
(256 kB), we can just send a window update as soon as a stream is
opened / accepted. In fact, that's exactly what we do already.
  • Loading branch information
marten-seemann committed May 21, 2021
1 parent 42482e3 commit 017b014
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 13 deletions.
32 changes: 20 additions & 12 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Config struct {
// an expectation that things will move along quickly.
ConnectionWriteTimeout time.Duration

// InitialStreamWindowSize is used to control the initial
// window size that we allow for a stream.
InitialStreamWindowSize uint32

// MaxStreamWindowSize is used to control the maximum
// window size that we allow for a stream.
MaxStreamWindowSize uint32
Expand All @@ -56,16 +60,17 @@ type Config struct {
// DefaultConfig is used to return a default configuration
func DefaultConfig() *Config {
return &Config{
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
AcceptBacklog: 256,
PingBacklog: 32,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
InitialStreamWindowSize: initialStreamWindow,
MaxStreamWindowSize: maxStreamWindow,
LogOutput: os.Stderr,
ReadBufSize: 4096,
MaxMessageSize: 64 * 1024,
WriteCoalesceDelay: 100 * time.Microsecond,
}
}

Expand All @@ -77,8 +82,11 @@ func VerifyConfig(config *Config) error {
if config.KeepAliveInterval == 0 {
return fmt.Errorf("keep-alive interval must be positive")
}
if config.MaxStreamWindowSize < initialStreamWindow {
return errors.New("MaxStreamWindowSize must be larger than the initialStreamWindow (256 kB)")
if config.InitialStreamWindowSize < initialStreamWindow {
return errors.New("InitialStreamWindowSize must be larger or equal 256 kB")
}
if config.MaxStreamWindowSize < config.InitialStreamWindowSize {
return errors.New("MaxStreamWindowSize must be larger than the InitialStreamWindowSize")
}
if config.MaxMessageSize < 1024 {
return fmt.Errorf("MaxMessageSize must be greater than a kilobyte")
Expand Down
49 changes: 49 additions & 0 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"reflect"
"runtime"
Expand Down Expand Up @@ -1683,3 +1684,51 @@ func TestReadDeadlineInterrupt(t *testing.T) {
}
}
}

// Make sure that a transfer doesn't stall, no matter what values the peers use for their InitialStreamWindow.
func TestInitialStreamWindow(t *testing.T) {
for i := 0; i < 10; i++ {
const (
maxWindow = 5 * initialStreamWindow
transferSize = 10 * maxWindow
)
rand.Seed(time.Now().UnixNano())
randomUint32 := func(min, max uint32) uint32 { return uint32(rand.Int63n(int64(max-min))) + min }

cconf := DefaultConfig()
cconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)
sconf := DefaultConfig()
sconf.InitialStreamWindowSize = randomUint32(initialStreamWindow, maxWindow)

conn1, conn2 := testConn()
client, _ := Client(conn1, cconf)
server, _ := Server(conn2, sconf)

errChan := make(chan error, 1)
go func() {
defer close(errChan)
str, err := client.OpenStream(context.Background())
if err != nil {
errChan <- err
return
}
defer str.Close()
if _, err := str.Write(make([]byte, transferSize)); err != nil {
errChan <- err
return
}
}()

str, err := server.AcceptStream()
if err != nil {
t.Fatal(err)
}
data, err := ioutil.ReadAll(str)
if err != nil {
t.Fatal(err)
}
if uint32(len(data)) != transferSize {
t.Fatalf("expected %d bytes to be transferred, got %d", transferSize, len(data))
}
}
}
5 changes: 4 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
sendWindow: initialStreamWindow,
readDeadline: makePipeDeadline(),
writeDeadline: makePipeDeadline(),
// Initialize the recvBuf with initialStreamWindow, not config.InitialStreamWindowSize.
// The peer isn't allowed to send more data than initialStreamWindow until we've sent
// the first window update (which will grant it up to config.InitialStreamWindowSize).
recvBuf: newSegmentedBuffer(initialStreamWindow),
recvWindow: initialStreamWindow,
recvWindow: session.config.InitialStreamWindowSize,
epochStart: time.Now(),
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
Expand Down

0 comments on commit 017b014

Please sign in to comment.