diff --git a/bytebuffer.go b/bytebuffer.go index 07a055a..3154920 100644 --- a/bytebuffer.go +++ b/bytebuffer.go @@ -1,6 +1,9 @@ package bytebufferpool -import "io" +import ( + "io" + "unsafe" +) // ByteBuffer provides byte buffer, which can be used for minimizing // memory allocations. @@ -102,7 +105,7 @@ func (b *ByteBuffer) SetString(s string) { // String returns string representation of ByteBuffer.B. func (b *ByteBuffer) String() string { - return string(b.B) + return *(*string)(unsafe.Pointer(&b.B)) } // Reset makes ByteBuffer.B empty. diff --git a/bytebuffer_example_test.go b/bytebuffer_example_test.go index 1cbaaf5..a38d7fb 100644 --- a/bytebuffer_example_test.go +++ b/bytebuffer_example_test.go @@ -3,7 +3,7 @@ package bytebufferpool_test import ( "fmt" - "github.com/valyala/bytebufferpool" + "github.com/gallir/bytebufferpool" ) func ExampleByteBuffer() { diff --git a/bytebuffer_test.go b/bytebuffer_test.go index 7bb658f..d83b4dc 100644 --- a/bytebuffer_test.go +++ b/bytebuffer_test.go @@ -70,6 +70,10 @@ func TestByteBufferGetPutSerial(t *testing.T) { testByteBufferGetPut(t) } +func TestByteBufferGetLenPutSerial(t *testing.T) { + testByteBufferGetLenPut(t) +} + func TestByteBufferGetPutConcurrent(t *testing.T) { concurrency := 10 ch := make(chan struct{}, concurrency) @@ -89,6 +93,25 @@ func TestByteBufferGetPutConcurrent(t *testing.T) { } } +func TestByteBufferGetLenPutConcurrent(t *testing.T) { + concurrency := 10 + ch := make(chan struct{}, concurrency) + for i := 0; i < concurrency; i++ { + go func() { + testByteBufferGetLenPut(t) + ch <- struct{}{} + }() + } + + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(time.Second): + t.Fatalf("timeout!") + } + } +} + func testByteBufferGetPut(t *testing.T) { for i := 0; i < 10; i++ { expectedS := fmt.Sprintf("num %d", i) @@ -102,6 +125,24 @@ func testByteBufferGetPut(t *testing.T) { } } +func testByteBufferGetLenPut(t *testing.T) { + bytes := []byte("test len ") + for i := 0; i < 
10; i++ { + expectedS := fmt.Sprintf("%s num %d", string(bytes), i) + b := GetLen(len(bytes)) + if len(b.B) != len(bytes) { + t.Fatalf("unexpected len: %d. Expecting %d", len(b.B), len(bytes)) + } + copy(b.B, bytes) + b.B = append(b.B, " num "...) + b.B = append(b.B, fmt.Sprintf("%d", i)...) + if string(b.B) != expectedS { + t.Fatalf("unexpected result: %q. Expecting %q", b.B, expectedS) + } + Put(b) + } +} + func testByteBufferGetString(t *testing.T) { for i := 0; i < 10; i++ { expectedS := fmt.Sprintf("num %d", i) diff --git a/go.mod b/go.mod index be783c1..d498416 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/valyala/bytebufferpool +module github.com/gallir/bytebufferpool go 1.12 diff --git a/pool.go b/pool.go index 8bb4134..12915fb 100644 --- a/pool.go +++ b/pool.go @@ -7,11 +7,11 @@ import ( ) const ( - minBitSize = 6 // 2**6=64 is a CPU cache line size - steps = 20 + defaultMinBitSize = 6 // 2**6=64 is a CPU cache line size + steps = 20 - minSize = 1 << minBitSize - maxSize = 1 << (minBitSize + steps - 1) + defaultMinSize = 1 << defaultMinBitSize + defaultMaxSize = 1 << (defaultMinBitSize + steps - 1) calibrateCallsThreshold = 42000 maxPercentile = 0.95 @@ -29,6 +29,9 @@ type Pool struct { defaultSize uint64 maxSize uint64 + minBitSize uint64 + minSize uint64 + pool sync.Pool } @@ -48,13 +51,56 @@ func Get() *ByteBuffer { return defaultPool.Get() } func (p *Pool) Get() *ByteBuffer { v := p.pool.Get() if v != nil { - return v.(*ByteBuffer) + b := v.(*ByteBuffer) + b.Reset() + return b } return &ByteBuffer{ B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)), } } +// GetLen returns a buffer with its +// []byte slice of the exact len as specified +// +// The byte buffer may be returned to the pool via Put after the use +// in order to minimize GC overhead. 
+func GetLen(s int) *ByteBuffer { return defaultPool.GetLen(s) } + +// GetLen returns a buffer with its +// []byte slice of the exact len as specified +// +// The byte buffer may be returned to the pool via Put after the use +// in order to minimize GC overhead. +func (p *Pool) GetLen(s int) *ByteBuffer { + v := p.pool.Get() + if v == nil { + size := int(p.minSize << uint(index(p.minBitSize, s))) + if size < s { + size = s + } + return &ByteBuffer{ + B: make([]byte, s, size), + } + } + + b := v.(*ByteBuffer) + if cap(b.B) >= s { + b.B = b.B[:s] + return b + } + + // The size is smaller, return it to the pool and create another one + p.pool.Put(b) + size := int(p.minSize << uint(index(p.minBitSize, s))) + if size < s { + size = s + } + return &ByteBuffer{ + B: make([]byte, s, size), + } +} + // Put returns byte buffer to the pool. // // ByteBuffer.B mustn't be touched after returning it to the pool. @@ -65,7 +111,11 @@ func Put(b *ByteBuffer) { defaultPool.Put(b) } // // The buffer mustn't be accessed after returning to the pool. 
func (p *Pool) Put(b *ByteBuffer) { - idx := index(len(b.B)) + if p.minBitSize == 0 { + p.initBins() + } + + idx := index(p.minBitSize, len(b.B)) if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold { p.calibrate() @@ -73,7 +123,6 @@ func (p *Pool) Put(b *ByteBuffer) { maxSize := int(atomic.LoadUint64(&p.maxSize)) if maxSize == 0 || cap(b.B) <= maxSize { - b.Reset() p.pool.Put(b) } } @@ -83,6 +132,10 @@ func (p *Pool) calibrate() { return } + if p.minBitSize == 0 { + p.initBins() + } + a := make(callSizes, 0, steps) var callsSum uint64 for i := uint64(0); i < steps; i++ { @@ -90,9 +143,19 @@ func (p *Pool) calibrate() { callsSum += calls a = append(a, callSize{ calls: calls, - size: minSize << i, + size: p.minSize << i, }) } + if p.minBitSize+steps < 32 && a[steps-1].calls > a[0].calls { + // Increase the first bin's size + p.resizeBins(p.minBitSize + 1) + } else if p.minBitSize > defaultMinBitSize && + a[0].calls > 0 && + a[steps-2].calls == 0 && + a[steps-1].calls == 0 { + // Decrease the first bin's size + p.resizeBins(p.minBitSize - 1) + } sort.Sort(a) defaultSize := a[0].size @@ -117,6 +180,16 @@ func (p *Pool) calibrate() { atomic.StoreUint64(&p.calibrating, 0) } +func (p *Pool) resizeBins(minBitSize uint64) { + atomic.StoreUint64(&p.minBitSize, minBitSize) + atomic.StoreUint64(&p.minSize, 1<<minBitSize) +} + +func (p *Pool) initBins() { + atomic.StoreUint64(&p.minBitSize, defaultMinBitSize) + atomic.StoreUint64(&p.minSize, defaultMinSize) +} + -func index(n int) int { +func index(minBitSize uint64, n int) int { n-- n >>= minBitSize idx := 0 diff --git a/pool_test.go b/pool_test.go index 6d3bcb8..00a0fcd 100644 --- a/pool_test.go +++ b/pool_test.go @@ -10,21 +10,21 @@ func TestIndex(t *testing.T) { testIndex(t, 0, 0) testIndex(t, 1, 0) - testIndex(t, minSize-1, 0) - testIndex(t, minSize, 0) - testIndex(t, minSize+1, 1) + testIndex(t, defaultMinSize-1, 0) + testIndex(t, defaultMinSize, 0) + testIndex(t, defaultMinSize+1, 1) - testIndex(t, 2*minSize-1, 1) - testIndex(t, 2*minSize, 1) - testIndex(t, 2*minSize+1, 2) + testIndex(t, 2*defaultMinSize-1, 1) + testIndex(t, 2*defaultMinSize, 1) + testIndex(t, 2*defaultMinSize+1, 2) - testIndex(t, maxSize-1, steps-1) - 
testIndex(t, maxSize, steps-1) - testIndex(t, maxSize+1, steps-1) + testIndex(t, defaultMaxSize-1, steps-1) + testIndex(t, defaultMaxSize, steps-1) + testIndex(t, defaultMaxSize+1, steps-1) } func testIndex(t *testing.T, n, expectedIdx int) { - idx := index(n) + idx := index(defaultMinBitSize, n) if idx != expectedIdx { t.Fatalf("unexpected idx for n=%d: %d. Expecting %d", n, idx, expectedIdx) } @@ -92,3 +92,48 @@ func allocNBytes(dst []byte, n int) []byte { } return append(dst, make([]byte, diff)...) } + +func TestPoolGetLenVariousSizesSerial(t *testing.T) { + testPoolGetLenVariousSizesSerial(t) +} + +func testPoolGetLenVariousSizesSerial(t *testing.T) { + for i := 0; i < steps+1; i++ { + n := (1 << uint32(i)) + + testGetLenPut(t, n) + testGetLenPut(t, n+1) + testGetLenPut(t, n-1) + + for j := 0; j < 10; j++ { + testGetLenPut(t, j+n) + } + } +} + +func testGetLenPut(t *testing.T, n int) { + bb := GetLen(n) + if len(bb.B) != n { + t.Fatalf("wrong len returned by GetLen %d", n) + } + bb.B = allocNBytes(bb.B, n) + Put(bb) +} + +func TestPoolGetLenVariousSizesConcurrent(t *testing.T) { + concurrency := 5 + ch := make(chan struct{}) + for i := 0; i < concurrency; i++ { + go func() { + testPoolGetLenVariousSizesSerial(t) + ch <- struct{}{} + }() + } + for i := 0; i < concurrency; i++ { + select { + case <-ch: + case <-time.After(3 * time.Second): + t.Fatalf("timeout") + } + } +}