Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autotuning low and high bins' limits. #13

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
7 changes: 5 additions & 2 deletions bytebuffer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package bytebufferpool

import "io"
import (
"io"
"unsafe"
)

// ByteBuffer provides byte buffer, which can be used for minimizing
// memory allocations.
Expand Down Expand Up @@ -102,7 +105,7 @@ func (b *ByteBuffer) SetString(s string) {

// String returns string representation of ByteBuffer.B.
func (b *ByteBuffer) String() string {
return string(b.B)
return *(*string)(unsafe.Pointer(&b.B))
}

// Reset makes ByteBuffer.B empty.
Expand Down
2 changes: 1 addition & 1 deletion bytebuffer_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package bytebufferpool_test
import (
"fmt"

"github.com/valyala/bytebufferpool"
"github.com/gallir/bytebufferpool"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this

)

func ExampleByteBuffer() {
Expand Down
41 changes: 41 additions & 0 deletions bytebuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func TestByteBufferGetPutSerial(t *testing.T) {
testByteBufferGetPut(t)
}

func TestByteBufferGetLenPutSerial(t *testing.T) {
testByteBufferGetLenPut(t)
}

func TestByteBufferGetPutConcurrent(t *testing.T) {
concurrency := 10
ch := make(chan struct{}, concurrency)
Expand All @@ -89,6 +93,25 @@ func TestByteBufferGetPutConcurrent(t *testing.T) {
}
}

func TestByteBufferGetLenPutConcurrent(t *testing.T) {
concurrency := 10
ch := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
go func() {
testByteBufferGetLenPut(t)
ch <- struct{}{}
}()
}

for i := 0; i < concurrency; i++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Fatalf("timeout!")
}
}
}

func testByteBufferGetPut(t *testing.T) {
for i := 0; i < 10; i++ {
expectedS := fmt.Sprintf("num %d", i)
Expand All @@ -102,6 +125,24 @@ func testByteBufferGetPut(t *testing.T) {
}
}

func testByteBufferGetLenPut(t *testing.T) {
bytes := []byte("test len ")
for i := 0; i < 10; i++ {
expectedS := fmt.Sprintf("%s num %d", string(bytes), i)
b := GetLen(len(bytes))
if len(b.B) != len(bytes) {
t.Fatalf("unexpected len: %d. Expecting %d", len(b.B), len(bytes))
}
copy(b.B, bytes)
b.B = append(b.B, " num "...)
b.B = append(b.B, fmt.Sprintf("%d", i)...)
if string(b.B) != expectedS {
t.Fatalf("unexpected result: %q. Expecting %q", b.B, expectedS)
}
Put(b)
}
}

func testByteBufferGetString(t *testing.T) {
for i := 0; i < 10; i++ {
expectedS := fmt.Sprintf("num %d", i)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/valyala/bytebufferpool
module github.com/gallir/bytebufferpool

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert this

go 1.12
91 changes: 82 additions & 9 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
)

const (
minBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20
defaultMinBitSize = 6 // 2**6=64 is a CPU cache line size
steps = 20

minSize = 1 << minBitSize
maxSize = 1 << (minBitSize + steps - 1)
defaultMinSize = 1 << defaultMinBitSize
defaultMaxSize = 1 << (defaultMinBitSize + steps - 1)

calibrateCallsThreshold = 42000
maxPercentile = 0.95
Expand All @@ -29,6 +29,9 @@ type Pool struct {
defaultSize uint64
maxSize uint64

minBitSize uint64
minSize uint64

pool sync.Pool
}

Expand All @@ -48,13 +51,56 @@ func Get() *ByteBuffer { return defaultPool.Get() }
func (p *Pool) Get() *ByteBuffer {
v := p.pool.Get()
if v != nil {
return v.(*ByteBuffer)
b := v.(*ByteBuffer)
b.Reset()
return b
}
return &ByteBuffer{
B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
}
}

// GetLen returns a buufer with its

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer

// []byte slice of the exact len as specified
//
// The byte buffer may be returned to the pool via Put after the use
// in order to minimize GC overhead.
func GetLen(s int) *ByteBuffer { return defaultPool.GetLen(s) }

// GetLen return a buufer with its

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buffer

// []byte slice of the exact len as specified
//
// The byte buffer may be returned to the pool via Put after the use
// in order to minimize GC overhead.
func (p *Pool) GetLen(s int) *ByteBuffer {
v := p.pool.Get()
if v == nil {
size := int(p.minSize << uint(index(p.minBitSize, s)))
if size < s {
size = s
}
return &ByteBuffer{
B: make([]byte, s, size),
}
}

b := v.(*ByteBuffer)
if cap(b.B) >= s {
b.B = b.B[:s]
return b
}

// The size is smaller, return it to the pool and create another one
p.pool.Put(b)
size := int(p.minSize << uint(index(p.minBitSize, s)))
if size < s {
size = s
}
return &ByteBuffer{
B: make([]byte, s, size),
}
}

// Put returns byte buffer to the pool.
//
// ByteBuffer.B mustn't be touched after returning it to the pool.
Expand All @@ -65,15 +111,18 @@ func Put(b *ByteBuffer) { defaultPool.Put(b) }
//
// The buffer mustn't be accessed after returning to the pool.
func (p *Pool) Put(b *ByteBuffer) {
idx := index(len(b.B))
if p.minBitSize == 0 {
p.initBins()
}

idx := index(p.minBitSize, len(b.B))

if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
p.calibrate()
}

maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize == 0 || cap(b.B) <= maxSize {
b.Reset()
p.pool.Put(b)
}
}
Expand All @@ -83,16 +132,30 @@ func (p *Pool) calibrate() {
return
}

if p.minBitSize == 0 {
p.initBins()
}

a := make(callSizes, 0, steps)
var callsSum uint64
for i := uint64(0); i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
a = append(a, callSize{
calls: calls,
size: minSize << i,
size: p.minSize << i,
})
}
if p.minBitSize+steps < 32 && a[steps-1].calls > a[0].calls {
// Increase the first bin's size
p.resizeBins(p.minBitSize + 1)
} else if p.minBitSize > defaultMinBitSize &&
a[0].calls > 0 &&
a[steps-2].calls == 0 &&
a[steps-1].calls == 0 {
// Decrease the size of first bin's size
p.resizeBins(p.minBitSize - 1)
}
sort.Sort(a)

defaultSize := a[0].size
Expand All @@ -117,6 +180,16 @@ func (p *Pool) calibrate() {
atomic.StoreUint64(&p.calibrating, 0)
}

func (p *Pool) resizeBins(minBitSize uint64) {
atomic.StoreUint64(&p.minBitSize, minBitSize)
atomic.StoreUint64(&p.minSize, 1<<minBitSize)
}

func (p *Pool) initBins() {
atomic.StoreUint64(&p.minBitSize, defaultMinBitSize)
atomic.StoreUint64(&p.minSize, 1<<defaultMinBitSize)
}

type callSize struct {
calls uint64
size uint64
Expand All @@ -136,7 +209,7 @@ func (ci callSizes) Swap(i, j int) {
ci[i], ci[j] = ci[j], ci[i]
}

func index(n int) int {
func index(minBitSize uint64, n int) int {
n--
n >>= minBitSize
idx := 0
Expand Down
65 changes: 55 additions & 10 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ func TestIndex(t *testing.T) {
testIndex(t, 0, 0)
testIndex(t, 1, 0)

testIndex(t, minSize-1, 0)
testIndex(t, minSize, 0)
testIndex(t, minSize+1, 1)
testIndex(t, defaultMinSize-1, 0)
testIndex(t, defaultMinSize, 0)
testIndex(t, defaultMinSize+1, 1)

testIndex(t, 2*minSize-1, 1)
testIndex(t, 2*minSize, 1)
testIndex(t, 2*minSize+1, 2)
testIndex(t, 2*defaultMinSize-1, 1)
testIndex(t, 2*defaultMinSize, 1)
testIndex(t, 2*defaultMinSize+1, 2)

testIndex(t, maxSize-1, steps-1)
testIndex(t, maxSize, steps-1)
testIndex(t, maxSize+1, steps-1)
testIndex(t, defaultMaxSize-1, steps-1)
testIndex(t, defaultMaxSize, steps-1)
testIndex(t, defaultMaxSize+1, steps-1)
}

func testIndex(t *testing.T, n, expectedIdx int) {
idx := index(n)
idx := index(defaultMinBitSize, n)
if idx != expectedIdx {
t.Fatalf("unexpected idx for n=%d: %d. Expecting %d", n, idx, expectedIdx)
}
Expand Down Expand Up @@ -92,3 +92,48 @@ func allocNBytes(dst []byte, n int) []byte {
}
return append(dst, make([]byte, diff)...)
}

func TestPoolGetLenVariousSizesSerial(t *testing.T) {
testPoolGetLenVariousSizesSerial(t)
}

func testPoolGetLenVariousSizesSerial(t *testing.T) {
for i := 0; i < steps+1; i++ {
n := (1 << uint32(i))

testGetLenPut(t, n)
testGetLenPut(t, n+1)
testGetLenPut(t, n-1)

for j := 0; j < 10; j++ {
testGetLenPut(t, j+n)
}
}
}

func testGetLenPut(t *testing.T, n int) {
bb := GetLen(n)
if len(bb.B) != n {
t.Fatalf("wrong len returned by GetLen %d", n)
}
bb.B = allocNBytes(bb.B, n)
Put(bb)
}

func TestPoolGetLenVariousSizesConcurrent(t *testing.T) {
concurrency := 5
ch := make(chan struct{})
for i := 0; i < concurrency; i++ {
go func() {
testPoolGetLenVariousSizesSerial(t)
ch <- struct{}{}
}()
}
for i := 0; i < concurrency; i++ {
select {
case <-ch:
case <-time.After(3 * time.Second):
t.Fatalf("timeout")
}
}
}