From fab646db792e4f2a74ce2927d820a106531fc968 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Wed, 15 Sep 2021 01:25:03 +0100 Subject: [PATCH] feat: support ApiVersionsRequest V3 protocol Add support for the ApiVersionsRequest V3 protocol which includes ClientSoftwareName and ClientSoftwareVersion as specified by KIP-511 Signed-off-by: Dominic Evans --- api_versions_request.go | 61 ++++++++++++--- api_versions_request_test.go | 18 ++++- api_versions_response.go | 137 +++++++++++++++++++++++++--------- api_versions_response_test.go | 61 +++++++++++---- mockresponses.go | 30 ++++++++ request.go | 2 +- 6 files changed, 245 insertions(+), 64 deletions(-) diff --git a/api_versions_request.go b/api_versions_request.go index bee92c0e7f..e5b3baf646 100644 --- a/api_versions_request.go +++ b/api_versions_request.go @@ -1,28 +1,69 @@ package sarama -// ApiVersionsRequest ... -type ApiVersionsRequest struct{} +const defaultClientSoftwareName = "sarama" + +type ApiVersionsRequest struct { + // Version defines the protocol version to use for encode and decode + Version int16 + // ClientSoftwareName contains the name of the client. + ClientSoftwareName string + // ClientSoftwareVersion contains the version of the client. + ClientSoftwareVersion string +} + +func (r *ApiVersionsRequest) encode(pe packetEncoder) (err error) { + if r.Version >= 3 { + if err := pe.putCompactString(r.ClientSoftwareName); err != nil { + return err + } + if err := pe.putCompactString(r.ClientSoftwareVersion); err != nil { + return err + } + pe.putEmptyTaggedFieldArray() + } -func (a *ApiVersionsRequest) encode(pe packetEncoder) error { return nil } -func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) { +func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.Version >= 3 { + if r.ClientSoftwareName, err = pd.getCompactString(); err != nil { + return err + } + if r.ClientSoftwareVersion, err = pd.getCompactString(); err != nil { + return err + } + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + return nil } -func (a *ApiVersionsRequest) key() int16 { +func (r *ApiVersionsRequest) key() int16 { return 18 } -func (a *ApiVersionsRequest) version() int16 { - return 0 +func (r *ApiVersionsRequest) version() int16 { + return r.Version } -func (a *ApiVersionsRequest) headerVersion() int16 { +func (r *ApiVersionsRequest) headerVersion() int16 { + if r.Version >= 3 { + return 2 + } return 1 } -func (a *ApiVersionsRequest) requiredVersion() KafkaVersion { - return V0_10_0_0 +func (r *ApiVersionsRequest) requiredVersion() KafkaVersion { + switch r.Version { + case 0: + return V0_10_0_0 + case 3: + return V2_4_0_0 + default: + return V0_10_0_0 + } } diff --git a/api_versions_request_test.go b/api_versions_request_test.go index 5fd21496a7..371ac96027 100644 --- a/api_versions_request_test.go +++ b/api_versions_request_test.go @@ -2,9 +2,25 @@ package sarama import "testing" -var apiVersionRequest []byte +var ( + apiVersionRequest []byte + + apiVersionRequestV3 = []byte{ + 0x07, 's', 'a', 'r', 'a', 'm', 'a', + 0x07, '0', '.', '1', '0', '.', '0', + 0x00, + } +) func TestApiVersionsRequest(t *testing.T) { request := new(ApiVersionsRequest) testRequest(t, "basic", request, apiVersionRequest) } + +func TestApiVersionsRequestV3(t *testing.T) { + request := new(ApiVersionsRequest) + request.Version = 3 + request.ClientSoftwareName = "sarama" + request.ClientSoftwareVersion = "0.10.0" + testRequest(t, "v3", request, apiVersionRequestV3) +} diff --git a/api_versions_response.go b/api_versions_response.go index 0e72e3926a..ade911c597 100644 --- a/api_versions_response.go +++ b/api_versions_response.go @@ -1,76 +1,130 @@ package sarama -// ApiVersionsResponseBlock is an api version response block type -type ApiVersionsResponseBlock struct { - ApiKey int16 +// ApiVersionsResponseKey contains the APIs supported by the broker. +type ApiVersionsResponseKey struct { + // Version defines the protocol version to use for encode and decode + Version int16 + // ApiKey contains the API index. + ApiKey int16 + // MinVersion contains the minimum supported version, inclusive. MinVersion int16 + // MaxVersion contains the maximum supported version, inclusive. MaxVersion int16 } -func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error { - pe.putInt16(b.ApiKey) - pe.putInt16(b.MinVersion) - pe.putInt16(b.MaxVersion) +func (a *ApiVersionsResponseKey) encode(pe packetEncoder, version int16) (err error) { + a.Version = version + pe.putInt16(a.ApiKey) + + pe.putInt16(a.MinVersion) + + pe.putInt16(a.MaxVersion) + + if version >= 3 { + pe.putEmptyTaggedFieldArray() + } + return nil } -func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error { - var err error - - if b.ApiKey, err = pd.getInt16(); err != nil { +func (a *ApiVersionsResponseKey) decode(pd packetDecoder, version int16) (err error) { + a.Version = version + if a.ApiKey, err = pd.getInt16(); err != nil { return err } - if b.MinVersion, err = pd.getInt16(); err != nil { + if a.MinVersion, err = pd.getInt16(); err != nil { return err } - if b.MaxVersion, err = pd.getInt16(); err != nil { + if a.MaxVersion, err = pd.getInt16(); err != nil { return err } + if version >= 3 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + return nil } -// ApiVersionsResponse is an api version response type type ApiVersionsResponse struct { - Err KError - ApiVersions []*ApiVersionsResponseBlock + // Version defines the protocol version to use for encode and decode + Version int16 + // ErrorCode contains the top-level error code. + ErrorCode int16 + // ApiKeys contains the APIs supported by the broker. + ApiKeys []ApiVersionsResponseKey + // ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. + ThrottleTimeMs int32 } -func (r *ApiVersionsResponse) encode(pe packetEncoder) error { - pe.putInt16(int16(r.Err)) - if err := pe.putArrayLength(len(r.ApiVersions)); err != nil { - return err +func (r *ApiVersionsResponse) encode(pe packetEncoder) (err error) { + pe.putInt16(r.ErrorCode) + + if r.Version >= 3 { + pe.putCompactArrayLength(len(r.ApiKeys)) + } else { + if err := pe.putArrayLength(len(r.ApiKeys)); err != nil { + return err + } } - for _, apiVersion := range r.ApiVersions { - if err := apiVersion.encode(pe); err != nil { + for _, block := range r.ApiKeys { + if err := block.encode(pe, r.Version); err != nil { return err } } + + if r.Version >= 1 { + pe.putInt32(r.ThrottleTimeMs) + } + + if r.Version >= 3 { + pe.putEmptyTaggedFieldArray() + } + return nil } -func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error { - kerr, err := pd.getInt16() - if err != nil { +func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + if r.ErrorCode, err = pd.getInt16(); err != nil { return err } - r.Err = KError(kerr) + var numApiKeys int + if r.Version >= 3 { + numApiKeys, err = pd.getCompactArrayLength() + if err != nil { + return err + } + } else { + numApiKeys, err = pd.getArrayLength() + if err != nil { + return err + } + } + r.ApiKeys = make([]ApiVersionsResponseKey, numApiKeys) + for i := 0; i < numApiKeys; i++ { + var block ApiVersionsResponseKey + if err = block.decode(pd, r.Version); err != nil { + return err + } + r.ApiKeys[i] = block + } - numBlocks, err := pd.getArrayLength() - if err != nil { - return err + if r.Version >= 1 { + if r.ThrottleTimeMs, err = pd.getInt32(); err != nil { + return err + } } - r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks) - for i := 0; i < numBlocks; i++ { - block := new(ApiVersionsResponseBlock) - if err := block.decode(pd); err != nil { + if r.Version >= 3 { + if _, err = pd.getEmptyTaggedFieldArray(); err != nil { return err } - r.ApiVersions[i] = block } return nil @@ -81,13 +135,22 @@ func (r *ApiVersionsResponse) key() int16 { } func (r *ApiVersionsResponse) version() int16 { - return 0 + return r.Version } -func (a *ApiVersionsResponse) headerVersion() int16 { +func (r *ApiVersionsResponse) headerVersion() int16 { + // ApiVersionsResponse always includes a v0 header. + // See KIP-511 for details return 0 } func (r *ApiVersionsResponse) requiredVersion() KafkaVersion { - return V0_10_0_0 + switch r.Version { + case 0: + return V0_10_0_0 + case 3: + return V2_4_0_0 + default: + return V0_10_0_0 + } } diff --git a/api_versions_response_test.go b/api_versions_response_test.go index e07bf785a3..0baaeb4041 100644 --- a/api_versions_response_test.go +++ b/api_versions_response_test.go @@ -2,27 +2,58 @@ package sarama import "testing" -var apiVersionResponse = []byte{ - 0x00, 0x00, - 0x00, 0x00, 0x00, 0x01, - 0x00, 0x03, - 0x00, 0x02, - 0x00, 0x01, -} +var ( + apiVersionResponse = []byte{ + 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, + 0x00, 0x03, + 0x00, 0x02, + 0x00, 0x01, + } + + apiVersionResponseV3 = []byte{ + 0x00, 0x00, // no error + 0x02, // compact array length 1 + 0x00, 0x03, + 0x00, 0x02, + 0x00, 0x01, + 0x00, // tagged fields + 0x00, 0x00, 0x00, 0x00, // throttle time + 0x00, // tagged fields + } +) func TestApiVersionsResponse(t *testing.T) { response := new(ApiVersionsResponse) testVersionDecodable(t, "no error", response, apiVersionResponse, 0) - if response.Err != ErrNoError { - t.Error("Decoding error failed: no error expected but found", response.Err) + if response.ErrorCode != int16(ErrNoError) { + t.Error("Decoding error failed: no error expected but found", response.ErrorCode) + } + if response.ApiKeys[0].ApiKey != 0x03 { + t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey) + } + if response.ApiKeys[0].MinVersion != 0x02 { + t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion) + } + if response.ApiKeys[0].MaxVersion != 0x01 { + t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion) + } +} + +func TestApiVersionsResponseV3(t *testing.T) { + response := new(ApiVersionsResponse) + response.Version = 3 + testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3) + if response.ErrorCode != int16(ErrNoError) { + t.Error("Decoding error failed: no error expected but found", response.ErrorCode) } - if response.ApiVersions[0].ApiKey != 0x03 { - t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey) + if response.ApiKeys[0].ApiKey != 0x03 { + t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey) } - if response.ApiVersions[0].MinVersion != 0x02 { - t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion) + if response.ApiKeys[0].MinVersion != 0x02 { + t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion) } - if response.ApiVersions[0].MaxVersion != 0x01 { - t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion) + if response.ApiKeys[0].MaxVersion != 0x01 { + t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion) } } diff --git a/mockresponses.go b/mockresponses.go index d9785753c7..a5e94fb39f 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -1300,3 +1300,33 @@ func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithH } return resp } + +type MockApiVersionsResponse struct { + t TestReporter +} + +func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse { + return &MockApiVersionsResponse{t: t} +} + +func (mr *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*ApiVersionsRequest) + res := &ApiVersionsResponse{ + Version: req.Version, + ApiKeys: []ApiVersionsResponseKey{ + { + Version: req.Version, + ApiKey: 0, + MinVersion: 5, + MaxVersion: 8, + }, + { + Version: req.Version, + ApiKey: 1, + MinVersion: 7, + MaxVersion: 11, + }, + }, + } + return res +} diff --git a/request.go b/request.go index f6c27c1070..ce90eb8c46 100644 --- a/request.go +++ b/request.go @@ -147,7 +147,7 @@ func allocateBody(key, version int16) protocolBody { case 17: return &SaslHandshakeRequest{} case 18: - return &ApiVersionsRequest{} + return &ApiVersionsRequest{Version: version} case 19: return &CreateTopicsRequest{} case 20: