Skip to content

Commit

Permalink
Merge pull request #2025 from Shopify/dnwe/apiversionsv3
Browse files Browse the repository at this point in the history
feat: support ApiVersionsRequest V3 protocol
  • Loading branch information
dnwe authored Sep 15, 2021
2 parents 12c9785 + 0b24192 commit 74aaffe
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 64 deletions.
61 changes: 51 additions & 10 deletions api_versions_request.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,69 @@
package sarama

// ApiVersionsRequest ...
type ApiVersionsRequest struct{}
// const defaultClientSoftwareName = "sarama"

type ApiVersionsRequest struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ClientSoftwareName contains the name of the client.
ClientSoftwareName string
// ClientSoftwareVersion contains the version of the client.
ClientSoftwareVersion string
}

func (r *ApiVersionsRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 3 {
if err := pe.putCompactString(r.ClientSoftwareName); err != nil {
return err
}
if err := pe.putCompactString(r.ClientSoftwareVersion); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.Version >= 3 {
if r.ClientSoftwareName, err = pd.getCompactString(); err != nil {
return err
}
if r.ClientSoftwareVersion, err = pd.getCompactString(); err != nil {
return err
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

func (a *ApiVersionsRequest) key() int16 {
func (r *ApiVersionsRequest) key() int16 {
return 18
}

func (a *ApiVersionsRequest) version() int16 {
return 0
func (r *ApiVersionsRequest) version() int16 {
return r.Version
}

func (a *ApiVersionsRequest) headerVersion() int16 {
func (r *ApiVersionsRequest) headerVersion() int16 {
if r.Version >= 3 {
return 2
}
return 1
}

func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
return V0_10_0_0
}
}
18 changes: 17 additions & 1 deletion api_versions_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,25 @@ package sarama

import "testing"

var apiVersionRequest []byte
var (
apiVersionRequest []byte

apiVersionRequestV3 = []byte{
0x07, 's', 'a', 'r', 'a', 'm', 'a',
0x07, '0', '.', '1', '0', '.', '0',
0x00,
}
)

func TestApiVersionsRequest(t *testing.T) {
request := new(ApiVersionsRequest)
testRequest(t, "basic", request, apiVersionRequest)
}

func TestApiVersionsRequestV3(t *testing.T) {
request := new(ApiVersionsRequest)
request.Version = 3
request.ClientSoftwareName = "sarama"
request.ClientSoftwareVersion = "0.10.0"
testRequest(t, "v3", request, apiVersionRequestV3)
}
137 changes: 100 additions & 37 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,130 @@
package sarama

// ApiVersionsResponseBlock is an api version response block type
type ApiVersionsResponseBlock struct {
ApiKey int16
// ApiVersionsResponseKey contains the APIs supported by the broker.
type ApiVersionsResponseKey struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ApiKey contains the API index.
ApiKey int16
// MinVersion contains the minimum supported version, inclusive.
MinVersion int16
// MaxVersion contains the maximum supported version, inclusive.
MaxVersion int16
}

func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
pe.putInt16(b.ApiKey)
pe.putInt16(b.MinVersion)
pe.putInt16(b.MaxVersion)
func (a *ApiVersionsResponseKey) encode(pe packetEncoder, version int16) (err error) {
a.Version = version
pe.putInt16(a.ApiKey)

pe.putInt16(a.MinVersion)

pe.putInt16(a.MaxVersion)

if version >= 3 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
var err error

if b.ApiKey, err = pd.getInt16(); err != nil {
func (a *ApiVersionsResponseKey) decode(pd packetDecoder, version int16) (err error) {
a.Version = version
if a.ApiKey, err = pd.getInt16(); err != nil {
return err
}

if b.MinVersion, err = pd.getInt16(); err != nil {
if a.MinVersion, err = pd.getInt16(); err != nil {
return err
}

if b.MaxVersion, err = pd.getInt16(); err != nil {
if a.MaxVersion, err = pd.getInt16(); err != nil {
return err
}

if version >= 3 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

// ApiVersionsResponse is an api version response type
type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
// Version defines the protocol version to use for encode and decode
Version int16
// ErrorCode contains the top-level error code.
ErrorCode int16
// ApiKeys contains the APIs supported by the broker.
ApiKeys []ApiVersionsResponseKey
// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
ThrottleTimeMs int32
}

func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
return err
func (r *ApiVersionsResponse) encode(pe packetEncoder) (err error) {
pe.putInt16(r.ErrorCode)

if r.Version >= 3 {
pe.putCompactArrayLength(len(r.ApiKeys))
} else {
if err := pe.putArrayLength(len(r.ApiKeys)); err != nil {
return err
}
}
for _, apiVersion := range r.ApiVersions {
if err := apiVersion.encode(pe); err != nil {
for _, block := range r.ApiKeys {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}

if r.Version >= 1 {
pe.putInt32(r.ThrottleTimeMs)
}

if r.Version >= 3 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
kerr, err := pd.getInt16()
if err != nil {
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ErrorCode, err = pd.getInt16(); err != nil {
return err
}

r.Err = KError(kerr)
var numApiKeys int
if r.Version >= 3 {
numApiKeys, err = pd.getCompactArrayLength()
if err != nil {
return err
}
} else {
numApiKeys, err = pd.getArrayLength()
if err != nil {
return err
}
}
r.ApiKeys = make([]ApiVersionsResponseKey, numApiKeys)
for i := 0; i < numApiKeys; i++ {
var block ApiVersionsResponseKey
if err = block.decode(pd, r.Version); err != nil {
return err
}
r.ApiKeys[i] = block
}

numBlocks, err := pd.getArrayLength()
if err != nil {
return err
if r.Version >= 1 {
if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
return err
}
}

r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
for i := 0; i < numBlocks; i++ {
block := new(ApiVersionsResponseBlock)
if err := block.decode(pd); err != nil {
if r.Version >= 3 {
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
r.ApiVersions[i] = block
}

return nil
Expand All @@ -81,13 +135,22 @@ func (r *ApiVersionsResponse) key() int16 {
}

func (r *ApiVersionsResponse) version() int16 {
return 0
return r.Version
}

func (a *ApiVersionsResponse) headerVersion() int16 {
func (r *ApiVersionsResponse) headerVersion() int16 {
// ApiVersionsResponse always includes a v0 header.
// See KIP-511 for details
return 0
}

func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
return V0_10_0_0
}
}
61 changes: 46 additions & 15 deletions api_versions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,58 @@ package sarama

import "testing"

var apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}
var (
apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}

apiVersionResponseV3 = []byte{
0x00, 0x00, // no error
0x02, // compact array length 1
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
0x00, // tagged fields
0x00, 0x00, 0x00, 0x00, // throttle time
0x00, // tagged fields
}
)

func TestApiVersionsResponse(t *testing.T) {
response := new(ApiVersionsResponse)
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
if response.ErrorCode != int16(ErrNoError) {
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
}
if response.ApiKeys[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
}
if response.ApiKeys[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
}
if response.ApiKeys[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
}
}

func TestApiVersionsResponseV3(t *testing.T) {
response := new(ApiVersionsResponse)
response.Version = 3
testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3)
if response.ErrorCode != int16(ErrNoError) {
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
}
if response.ApiVersions[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
if response.ApiKeys[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
}
if response.ApiVersions[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
if response.ApiKeys[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
}
if response.ApiVersions[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
if response.ApiKeys[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
}
}
Loading

0 comments on commit 74aaffe

Please sign in to comment.