Skip to content

Commit

Permalink
feat(consumer): use buffer pools for decompression
Browse files Browse the repository at this point in the history
Signed-off-by: Ronan Harmegnies <[email protected]>
  • Loading branch information
ronanh authored and dnwe committed Jul 24, 2023
1 parent 63ff8d1 commit cf5b296
Showing 1 changed file with 43 additions and 6 deletions.
49 changes: 43 additions & 6 deletions decompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io"
"sync"

snappy "github.com/eapache/go-xerial-snappy"
Expand All @@ -19,6 +18,19 @@ var (
}

gzipReaderPool sync.Pool

bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}

bytesPool = sync.Pool{
New: func() interface{} {
res := make([]byte, 0, 4096)
return &res
},
}
)

func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
Expand All @@ -38,9 +50,17 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
return nil, err
}

defer gzipReaderPool.Put(reader)
buffer := bufferPool.Get().(*bytes.Buffer)
_, err = buffer.ReadFrom(reader)
// copy the buffer to a new slice with the correct length
// reuse gzipReader and buffer
gzipReaderPool.Put(reader)
res := make([]byte, buffer.Len())
copy(res, buffer.Bytes())
buffer.Reset()
bufferPool.Put(buffer)

return io.ReadAll(reader)
return res, err
case CompressionSnappy:
return snappy.Decode(data)
case CompressionLZ4:
Expand All @@ -50,11 +70,28 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
} else {
reader.Reset(bytes.NewReader(data))
}
defer lz4ReaderPool.Put(reader)
buffer := bufferPool.Get().(*bytes.Buffer)
_, err := buffer.ReadFrom(reader)
// copy the buffer to a new slice with the correct length
// reuse lz4Reader and buffer
lz4ReaderPool.Put(reader)
res := make([]byte, buffer.Len())
copy(res, buffer.Bytes())
buffer.Reset()
bufferPool.Put(buffer)

return io.ReadAll(reader)
return res, err
case CompressionZSTD:
return zstdDecompress(ZstdDecoderParams{}, nil, data)
buffer := *bytesPool.Get().(*[]byte)
var err error
buffer, err = zstdDecompress(ZstdDecoderParams{}, buffer, data)
// copy the buffer to a new slice with the correct length and reuse buffer
res := make([]byte, len(buffer))
copy(res, buffer)
buffer = buffer[:0]
bytesPool.Put(&buffer)

return res, err
default:
return nil, PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", cc)}
}
Expand Down

0 comments on commit cf5b296

Please sign in to comment.