diff --git a/compress.go b/compress.go index 94b716e4b..9247c3553 100644 --- a/compress.go +++ b/compress.go @@ -68,7 +68,7 @@ func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) { } return buf.Bytes(), nil case CompressionZSTD: - return zstdCompressLevel(nil, data, level) + return zstdCompress(nil, data) default: return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)} } diff --git a/go.mod b/go.mod index d072c8e78..8ceacd9d6 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/Shopify/sarama require ( - github.com/DataDog/zstd v1.4.0 github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/davecgh/go-spew v1.1.1 github.com/eapache/go-resiliency v1.1.0 @@ -12,6 +11,7 @@ require ( github.com/golang/snappy v0.0.1 // indirect github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 // indirect + github.com/klauspost/compress v1.8.1 github.com/pierrec/lz4 v2.2.6+incompatible github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.3.0 diff --git a/zstd.go b/zstd.go new file mode 100644 index 000000000..bdb21b33c --- /dev/null +++ b/zstd.go @@ -0,0 +1,28 @@ +package sarama + +import ( + "github.com/klauspost/compress/zstd" + "sync" +) + +var ( + zstdDec *zstd.Decoder + zstdEnc *zstd.Encoder + + zstdEncOnce, zstdDecOnce sync.Once +) + + +func zstdDecompress(dst, src []byte) ([]byte, error) { + zstdDecOnce.Do(func() { + zstdDec, _ = zstd.NewReader(nil) + }) + return zstdDec.DecodeAll(src, dst) +} + +func zstdCompress(dst, src []byte) ([]byte, error) { + zstdEncOnce.Do(func() { + zstdEnc, _ = zstd.NewWriter(nil) + }) + return zstdEnc.EncodeAll(src, dst), nil +} diff --git a/zstd_cgo.go b/zstd_cgo.go deleted file mode 100644 index f5ccb31a1..000000000 --- a/zstd_cgo.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build cgo - -package sarama - -import "github.com/DataDog/zstd" - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return zstd.Decompress(dst, src) -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return zstd.CompressLevel(dst, src, level) -} diff --git a/zstd_fallback.go b/zstd_fallback.go deleted file mode 100644 index 381a56bdc..000000000 --- a/zstd_fallback.go +++ /dev/null @@ -1,17 +0,0 @@ -// +build !cgo - -package sarama - -import ( - "errors" -) - -var errZstdCgo = errors.New("zstd compression requires building with cgo enabled") - -func zstdDecompress(dst, src []byte) ([]byte, error) { - return nil, errZstdCgo -} - -func zstdCompressLevel(dst, src []byte, level int) ([]byte, error) { - return nil, errZstdCgo -}