Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support ApiVersionsRequest V3 protocol #2025

Merged
merged 1 commit into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions api_versions_request.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,69 @@
package sarama

// ApiVersionsRequest ...
type ApiVersionsRequest struct{}
// const defaultClientSoftwareName = "sarama"

type ApiVersionsRequest struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ClientSoftwareName contains the name of the client.
ClientSoftwareName string
// ClientSoftwareVersion contains the version of the client.
ClientSoftwareVersion string
}

func (r *ApiVersionsRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 3 {
if err := pe.putCompactString(r.ClientSoftwareName); err != nil {
return err
}
if err := pe.putCompactString(r.ClientSoftwareVersion); err != nil {
return err
}
pe.putEmptyTaggedFieldArray()
}

func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.Version >= 3 {
if r.ClientSoftwareName, err = pd.getCompactString(); err != nil {
return err
}
if r.ClientSoftwareVersion, err = pd.getCompactString(); err != nil {
return err
}
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

func (a *ApiVersionsRequest) key() int16 {
func (r *ApiVersionsRequest) key() int16 {
return 18
}

func (a *ApiVersionsRequest) version() int16 {
return 0
func (r *ApiVersionsRequest) version() int16 {
return r.Version
}

func (a *ApiVersionsRequest) headerVersion() int16 {
func (r *ApiVersionsRequest) headerVersion() int16 {
if r.Version >= 3 {
return 2
}
return 1
}

func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
return V0_10_0_0
}
}
18 changes: 17 additions & 1 deletion api_versions_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,25 @@ package sarama

import "testing"

var apiVersionRequest []byte
var (
apiVersionRequest []byte

apiVersionRequestV3 = []byte{
0x07, 's', 'a', 'r', 'a', 'm', 'a',
0x07, '0', '.', '1', '0', '.', '0',
0x00,
}
)

func TestApiVersionsRequest(t *testing.T) {
request := new(ApiVersionsRequest)
testRequest(t, "basic", request, apiVersionRequest)
}

func TestApiVersionsRequestV3(t *testing.T) {
request := new(ApiVersionsRequest)
request.Version = 3
request.ClientSoftwareName = "sarama"
request.ClientSoftwareVersion = "0.10.0"
testRequest(t, "v3", request, apiVersionRequestV3)
}
137 changes: 100 additions & 37 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,130 @@
package sarama

// ApiVersionsResponseBlock is an api version response block type
type ApiVersionsResponseBlock struct {
ApiKey int16
// ApiVersionsResponseKey contains the APIs supported by the broker.
type ApiVersionsResponseKey struct {
// Version defines the protocol version to use for encode and decode
Version int16
// ApiKey contains the API index.
ApiKey int16
// MinVersion contains the minimum supported version, inclusive.
MinVersion int16
// MaxVersion contains the maximum supported version, inclusive.
MaxVersion int16
}

func (b *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
pe.putInt16(b.ApiKey)
pe.putInt16(b.MinVersion)
pe.putInt16(b.MaxVersion)
func (a *ApiVersionsResponseKey) encode(pe packetEncoder, version int16) (err error) {
a.Version = version
pe.putInt16(a.ApiKey)

pe.putInt16(a.MinVersion)

pe.putInt16(a.MaxVersion)

if version >= 3 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
var err error

if b.ApiKey, err = pd.getInt16(); err != nil {
func (a *ApiVersionsResponseKey) decode(pd packetDecoder, version int16) (err error) {
a.Version = version
if a.ApiKey, err = pd.getInt16(); err != nil {
return err
}

if b.MinVersion, err = pd.getInt16(); err != nil {
if a.MinVersion, err = pd.getInt16(); err != nil {
return err
}

if b.MaxVersion, err = pd.getInt16(); err != nil {
if a.MaxVersion, err = pd.getInt16(); err != nil {
return err
}

if version >= 3 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}

return nil
}

// ApiVersionsResponse is an api version response type
type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
// Version defines the protocol version to use for encode and decode
Version int16
// ErrorCode contains the top-level error code.
ErrorCode int16
// ApiKeys contains the APIs supported by the broker.
ApiKeys []ApiVersionsResponseKey
// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
ThrottleTimeMs int32
}

func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
return err
func (r *ApiVersionsResponse) encode(pe packetEncoder) (err error) {
pe.putInt16(r.ErrorCode)

if r.Version >= 3 {
pe.putCompactArrayLength(len(r.ApiKeys))
} else {
if err := pe.putArrayLength(len(r.ApiKeys)); err != nil {
return err
}
}
for _, apiVersion := range r.ApiVersions {
if err := apiVersion.encode(pe); err != nil {
for _, block := range r.ApiKeys {
if err := block.encode(pe, r.Version); err != nil {
return err
}
}

if r.Version >= 1 {
pe.putInt32(r.ThrottleTimeMs)
}

if r.Version >= 3 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) error {
kerr, err := pd.getInt16()
if err != nil {
func (r *ApiVersionsResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ErrorCode, err = pd.getInt16(); err != nil {
return err
}

r.Err = KError(kerr)
var numApiKeys int
if r.Version >= 3 {
numApiKeys, err = pd.getCompactArrayLength()
if err != nil {
return err
}
} else {
numApiKeys, err = pd.getArrayLength()
if err != nil {
return err
}
}
r.ApiKeys = make([]ApiVersionsResponseKey, numApiKeys)
for i := 0; i < numApiKeys; i++ {
var block ApiVersionsResponseKey
if err = block.decode(pd, r.Version); err != nil {
return err
}
r.ApiKeys[i] = block
}

numBlocks, err := pd.getArrayLength()
if err != nil {
return err
if r.Version >= 1 {
if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
return err
}
}

r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
for i := 0; i < numBlocks; i++ {
block := new(ApiVersionsResponseBlock)
if err := block.decode(pd); err != nil {
if r.Version >= 3 {
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
r.ApiVersions[i] = block
}

return nil
Expand All @@ -81,13 +135,22 @@ func (r *ApiVersionsResponse) key() int16 {
}

func (r *ApiVersionsResponse) version() int16 {
return 0
return r.Version
}

func (a *ApiVersionsResponse) headerVersion() int16 {
func (r *ApiVersionsResponse) headerVersion() int16 {
// ApiVersionsResponse always includes a v0 header.
// See KIP-511 for details
return 0
}

func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
return V0_10_0_0
}
}
61 changes: 46 additions & 15 deletions api_versions_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,58 @@ package sarama

import "testing"

var apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}
var (
apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}

apiVersionResponseV3 = []byte{
0x00, 0x00, // no error
0x02, // compact array length 1
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
0x00, // tagged fields
0x00, 0x00, 0x00, 0x00, // throttle time
0x00, // tagged fields
}
)

func TestApiVersionsResponse(t *testing.T) {
response := new(ApiVersionsResponse)
testVersionDecodable(t, "no error", response, apiVersionResponse, 0)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
if response.ErrorCode != int16(ErrNoError) {
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
}
if response.ApiKeys[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
}
if response.ApiKeys[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
}
if response.ApiKeys[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
}
}

func TestApiVersionsResponseV3(t *testing.T) {
response := new(ApiVersionsResponse)
response.Version = 3
testVersionDecodable(t, "no error", response, apiVersionResponseV3, 3)
if response.ErrorCode != int16(ErrNoError) {
t.Error("Decoding error failed: no error expected but found", response.ErrorCode)
}
if response.ApiVersions[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
if response.ApiKeys[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiKeys[0].ApiKey)
}
if response.ApiVersions[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
if response.ApiKeys[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiKeys[0].MinVersion)
}
if response.ApiVersions[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
if response.ApiKeys[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiKeys[0].MaxVersion)
}
}
Loading