Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
PapaCharlie committed Jul 23, 2024
1 parent be4d4ab commit 5624e37
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 62 deletions.
24 changes: 16 additions & 8 deletions mem/buffer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,21 @@ func DefaultBufferPool() BufferPool {

// SetDefaultBufferPoolForTesting updates the default buffer pool, for testing
// purposes.
//
// # Testing Only
//
// This function should ONLY be used for testing purposes.
func SetDefaultBufferPoolForTesting(pool BufferPool) {
defaultBufferPool.Store(&pool)

Check warning on line 67 in mem/buffer_pool.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_pool.go#L66-L67

Added lines #L66 - L67 were not covered by tests
}

// NewBufferPool returns a BufferPool implementation that uses multiple
// underlying pools of the given pool sizes. When a buffer is requested from the
// returned pool, it will have a
// underlying pools of the given pool sizes.
func NewBufferPool(poolSizes ...int) BufferPool {
sort.Ints(poolSizes)
pools := make([]*sizedBufferPool, len(poolSizes))
for i, s := range poolSizes {
pools[i] = newBufferPool(s)
pools[i] = newSizedBufferPool(s)
}
return &tieredBufferPool{
sizedPools: pools,
Expand Down Expand Up @@ -105,10 +108,13 @@ func (p *tieredBufferPool) getPool(size int) BufferPool {
}

// sizedBufferPool is a BufferPool implementation that is optimized for specific
// buffer sizes. For example, HTTP/2 frames within grpc are always 16kb and a
// sizedBufferPool can be configured to only return buffers with a capacity of
// 16kb. Note that however it does not support returning larger buffers and in
// fact panics if such a buffer is requested.
// buffer sizes. For example, HTTP/2 frames within gRPC have a default max size
// of 16kb and a sizedBufferPool can be configured to only return buffers with a
// capacity of 16kb. Note that however it does not support returning larger
// buffers and in fact panics if such a buffer is requested. Because of this,
// this BufferPool implementation is not meant to be used on its own and rather
// is intended to be embedded in a tieredBufferPool such that Get is only invoked
// when the required size is smaller than or equal to defaultSize.
type sizedBufferPool struct {
pool sync.Pool
defaultSize int
Expand All @@ -130,7 +136,7 @@ func (p *sizedBufferPool) Put(buf []byte) {
p.pool.Put(&buf)
}

func newBufferPool(size int) *sizedBufferPool {
func newSizedBufferPool(size int) *sizedBufferPool {
return &sizedBufferPool{
pool: sync.Pool{
New: func() any {
Expand Down Expand Up @@ -158,6 +164,8 @@ func (p *simpleBufferPool) Get(size int) []byte {
return (*bs)[:size]

Check warning on line 164 in mem/buffer_pool.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_pool.go#L164

Added line #L164 was not covered by tests
}

// A buffer was pulled from the pool, but it is tool small. Put it back in the
// pool and create one large enough.
if ok {
p.pool.Put(bs)

Check warning on line 170 in mem/buffer_pool.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_pool.go#L170

Added line #L170 was not covered by tests
}
Expand Down
29 changes: 15 additions & 14 deletions mem/buffer_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
*
*/

package mem
package mem_test

import "testing"
import (
"testing"

func TestSharedBufferPool(t *testing.T) {
pools := []BufferPool{
NopBufferPool{},
NewBufferPool(defaultBufferPoolSizes...),
"google.golang.org/grpc/mem"
)

func (s) TestBufferPool(t *testing.T) {
var poolSizes = []int{4, 8, 16, 32}
pools := []mem.BufferPool{
mem.NopBufferPool{},
mem.NewBufferPool(poolSizes...),
}

testSizes := append(defaultBufferPoolSizes, 1<<20+1)
testSizes := append([]int{1}, poolSizes...)
testSizes = append(testSizes, 64)

for _, p := range pools {
for _, l := range testSizes {
Expand All @@ -40,13 +46,8 @@ func TestSharedBufferPool(t *testing.T) {
}
}

func TestTieredBufferPool(t *testing.T) {
pool := &tieredBufferPool{
sizedPools: []*sizedBufferPool{
newBufferPool(10),
newBufferPool(20),
},
}
func (s) TestBufferPoolIgnoresShortBuffers(t *testing.T) {
pool := mem.NewBufferPool(10, 20)
buf := pool.Get(1)
if cap(buf) != 10 {
t.Fatalf("Unexpected buffer capacity: %d", cap(buf))
Expand Down
53 changes: 40 additions & 13 deletions mem/buffer_slice.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package mem

import (
"io"
)

// BufferSlice offers a means to represent data that spans one or more Buffer
// instances.
// instances. A BufferSlice is meant to be immutable after creation, and methods
// like Ref create and return copies of the slice. This is why all methods have
// value receivers rather than pointer receivers. Note that any of the methods
// that read the underlying buffers such as Ref, Len or CopyTo etc., will panic
// if any underlying buffers have already been freed. It is recommended to not
// directly interact with any of the underlying buffers directly, rather such
// interactions should be mediated through the various methods on this type.
type BufferSlice []*Buffer

// Len returns the sum of the length of all the Buffers in this slice.
Expand All @@ -32,7 +56,7 @@ func (s BufferSlice) Ref() BufferSlice {
return out

Check warning on line 56 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L56

Added line #L56 was not covered by tests
}

// Free invokes Buffer.Free on each Buffer in the slice.
// Free invokes Buffer.Free() on each Buffer in the slice.
func (s BufferSlice) Free() {
for _, b := range s {
b.Free()

Check warning on line 62 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L60-L62

Added lines #L60 - L62 were not covered by tests
Expand Down Expand Up @@ -82,14 +106,17 @@ var _ io.ReadCloser = (*Reader)(nil)

// Reader exposes a BufferSlice's data as an io.Reader, allowing it to interface
// with other parts systems. It also provides an additional convenience method
// Remaining which returns the number of unread bytes remaining in the slice.
// Remaining(), which returns the number of unread bytes remaining in the slice.
//
// Note that reading data from the reader does not free the underlying buffers!
// Only calling Close once all data is read will free the buffers.
type Reader struct {
data BufferSlice
len int
dataIdx, idx int
data BufferSlice
len int
// The index of the current Buffer being read in data.
dataIdx int
// The index into data[dataIdx].ReadOnlyData().
bufferIdx int
}

// Remaining returns the number of unread bytes remaining in the slice.
Expand All @@ -105,26 +132,26 @@ func (r *Reader) Close() error {
}

func (r *Reader) Read(buf []byte) (n int, _ error) {
if r.len == 0 {
return 0, io.EOF

Check warning on line 136 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L134-L136

Added lines #L134 - L136 were not covered by tests
}

for len(buf) != 0 && r.len != 0 {
data := r.data[r.dataIdx].ReadOnlyData()
copied := copy(buf, data[r.idx:])
copied := copy(buf, data[r.bufferIdx:])
r.len -= copied

Check warning on line 142 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L139-L142

Added lines #L139 - L142 were not covered by tests

buf = buf[copied:]

Check warning on line 144 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L144

Added line #L144 was not covered by tests

if copied == len(data) {
r.dataIdx++
r.idx = 0
r.bufferIdx = 0
} else {
r.idx += copied
r.bufferIdx += copied

Check warning on line 150 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L146-L150

Added lines #L146 - L150 were not covered by tests
}
n += copied

Check warning on line 152 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L152

Added line #L152 was not covered by tests
}

if n == 0 {
return 0, io.EOF
}

return n, nil

Check warning on line 155 in mem/buffer_slice.go

View check run for this annotation

Codecov / codecov/patch

mem/buffer_slice.go#L155

Added line #L155 was not covered by tests
}

Expand Down
32 changes: 13 additions & 19 deletions mem/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ import (
type Buffer struct {
data []byte
refs *atomic.Int32
free func([]byte)
free func()
freed bool
}

// NewBuffer creates a new buffer from the given data, initializing the reference
// counter to 1. Note that the given data is not copied.
func NewBuffer(data []byte, free func([]byte)) *Buffer {
b := &Buffer{data: data, refs: new(atomic.Int32), free: free}
b := &Buffer{data: data, refs: new(atomic.Int32)}
if free != nil {
b.free = func() { free(data) }
}
b.refs.Add(1)
return b
}
Expand Down Expand Up @@ -104,7 +107,7 @@ func (b *Buffer) Free() {
b.freed = true
refs := b.refs.Add(-1)
if refs == 0 && b.free != nil {
b.free(b.data)
b.free()
}
b.data = nil
}
Expand All @@ -130,27 +133,18 @@ func (b *Buffer) Split(n int) *Buffer {

b.refs.Add(1)

data := b.data
var free func([]byte)
if f := b.free; f != nil {
free = func(_ []byte) {
f(data)
}
split := &Buffer{
refs: b.refs,
free: b.free,
}

b.data = data[:n]
b.free = free
b.data, split.data = b.data[:n], b.data[n:]

return &Buffer{
data: data[n:],
refs: b.refs,
free: free,
}
return split
}

// String returns a string representation of the buffer that contains the address
// of the Buffer, the address of the first byte of ReadOnlyData (used to uniquely
// identify the underlying buffer) and the length of the data.
// String returns a string representation of the buffer. May be used for
// debugging purposes.
func (b *Buffer) String() string {
return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))

Check warning on line 149 in mem/buffers.go

View check run for this annotation

Codecov / codecov/patch

mem/buffers.go#L148-L149

Added lines #L148 - L149 were not covered by tests
}
50 changes: 42 additions & 8 deletions mem/buffers_test.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,73 @@
package mem
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package mem_test

import (
"bytes"
"testing"

"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/mem"
)

func TestSplit(t *testing.T) {
type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

func (s) TestSplit(t *testing.T) {
ready := false
freed := false
data := []byte{1, 2, 3, 4}
buf := NewBuffer(data, func(bytes []byte) {
buf := mem.NewBuffer(data, func(bytes []byte) {
if !ready {
t.Fatalf("Freed too early")
}
freed = true
})
checkBufData := func(b *Buffer, expected []byte) {
checkBufData := func(b *mem.Buffer, expected []byte) {
if !bytes.Equal(b.ReadOnlyData(), expected) {
t.Fatalf("Buffer did not contain expected data %v, got %v", expected, b.ReadOnlyData())
}
}

ref1 := buf.Ref()

split := buf.Split(2)
split1 := buf.Split(2)
checkBufData(buf, data[:2])
checkBufData(split, data[2:])
checkBufData(split1, data[2:])
checkBufData(ref1, data)

splitRef := split.Ref()
split2 := split1.Split(1)
checkBufData(split1, data[2:3])
checkBufData(split2, data[3:])

splitRef := split1.Ref()
ref2 := buf.Ref()
split.Free()
split1.Free()
buf.Free()
ref1.Free()
splitRef.Free()
split2.Free()

ready = true
ref2.Free()
Expand Down

0 comments on commit 5624e37

Please sign in to comment.