Skip to content

Commit

Permalink
Add Describe + AlterConfigs (#1014)
Browse files Browse the repository at this point in the history
Add DescribeConfig and AlterConfig
  • Loading branch information
Mongey authored and eapache committed Feb 3, 2018
1 parent a5eaad6 commit f7466ea
Show file tree
Hide file tree
Showing 11 changed files with 820 additions and 0 deletions.
120 changes: 120 additions & 0 deletions alter_configs_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package sarama

type AlterConfigsRequest struct {
Resources []*AlterConfigsResource
ValidateOnly bool
}

type AlterConfigsResource struct {
Type ConfigResourceType
Name string
ConfigEntries map[string]*string
}

func (acr *AlterConfigsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(acr.Resources)); err != nil {
return err
}

for _, r := range acr.Resources {
if err := r.encode(pe); err != nil {
return err
}
}

pe.putBool(acr.ValidateOnly)
return nil
}

func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
resourceCount, err := pd.getArrayLength()
if err != nil {
return err
}

acr.Resources = make([]*AlterConfigsResource, resourceCount)
for i := range acr.Resources {
r := &AlterConfigsResource{}
err = r.decode(pd, version)
if err != nil {
return err
}
acr.Resources[i] = r
}

validateOnly, err := pd.getBool()
if err != nil {
return err
}

acr.ValidateOnly = validateOnly

return nil
}

func (ac *AlterConfigsResource) encode(pe packetEncoder) error {
pe.putInt8(int8(ac.Type))

if err := pe.putString(ac.Name); err != nil {
return err
}

if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil {
return err
}
for configKey, configValue := range ac.ConfigEntries {
if err := pe.putString(configKey); err != nil {
return err
}
if err := pe.putNullableString(configValue); err != nil {
return err
}
}

return nil
}

func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
ac.Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
ac.Name = name

n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
ac.ConfigEntries = make(map[string]*string, n)
for i := 0; i < n; i++ {
configKey, err := pd.getString()
if err != nil {
return err
}
if ac.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
return err
}
}
}
return err
}

func (acr *AlterConfigsRequest) key() int16 {
return 33
}

func (acr *AlterConfigsRequest) version() int16 {
return 0
}

func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
91 changes: 91 additions & 0 deletions alter_configs_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package sarama

import "testing"

var (
emptyAlterConfigsRequest = []byte{
0, 0, 0, 0, // 0 configs
0, // don't Validate
}

singleAlterConfigsRequest = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, //1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
0, // don't validate
}

doubleAlterConfigsRequest = []byte{
0, 0, 0, 2, // 2 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, //1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
2, // a topic
0, 3, 'b', 'a', 'r', // topic name: foo
0, 0, 0, 2, //2 config
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
0, 12, // 12 chars
'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
0, 4,
'1', '0', '0', '0',
0, // don't validate
}
)

func TestAlterConfigsRequest(t *testing.T) {
var request *AlterConfigsRequest

request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{},
}
testRequest(t, "no requests", request, emptyAlterConfigsRequest)

configValue := "1000"
request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{
&AlterConfigsResource{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]*string{
"segment.ms": &configValue,
},
},
},
}

testRequest(t, "one config", request, singleAlterConfigsRequest)

request = &AlterConfigsRequest{
Resources: []*AlterConfigsResource{
&AlterConfigsResource{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]*string{
"segment.ms": &configValue,
},
},
&AlterConfigsResource{
Type: TopicResource,
Name: "bar",
ConfigEntries: map[string]*string{
"segment.ms": &configValue,
"retention.ms": &configValue,
},
},
},
}

testRequest(t, "two configs", request, doubleAlterConfigsRequest)
}
95 changes: 95 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package sarama

import "time"

type AlterConfigsResponse struct {
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}

type AlterConfigsResourceResponse struct {
ErrorCode int16
ErrorMsg string
Type ConfigResourceType
Name string
}

func (ct *AlterConfigsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(ct.ThrottleTime / time.Millisecond))

if err := pe.putArrayLength(len(ct.Resources)); err != nil {
return err
}

for i := range ct.Resources {
pe.putInt16(ct.Resources[i].ErrorCode)
err := pe.putString(ct.Resources[i].ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(ct.Resources[i].Type))
err = pe.putString(ct.Resources[i].Name)
if err != nil {
return nil
}
}

return nil
}

func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

responseCount, err := pd.getArrayLength()
if err != nil {
return err
}

acr.Resources = make([]*AlterConfigsResourceResponse, responseCount)

for i := range acr.Resources {
acr.Resources[i] = new(AlterConfigsResourceResponse)

errCode, err := pd.getInt16()
if err != nil {
return err
}
acr.Resources[i].ErrorCode = errCode

e, err := pd.getString()
if err != nil {
return err
}
acr.Resources[i].ErrorMsg = e

t, err := pd.getInt8()
if err != nil {
return err
}
acr.Resources[i].Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
acr.Resources[i].Name = name
}

return nil
}

func (r *AlterConfigsResponse) key() int16 {
return 32
}

func (r *AlterConfigsResponse) version() int16 {
return 0
}

func (r *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
45 changes: 45 additions & 0 deletions alter_configs_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package sarama

import (
"testing"
)

var (
alterResponseEmpty = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 0, // no configs
}

alterResponsePopulated = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
}
)

func TestAlterConfigsResponse(t *testing.T) {
var response *AlterConfigsResponse

response = &AlterConfigsResponse{
Resources: []*AlterConfigsResourceResponse{},
}
testVersionDecodable(t, "empty", response, alterResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}

response = &AlterConfigsResponse{
Resources: []*AlterConfigsResourceResponse{
&AlterConfigsResourceResponse{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
},
},
}
testResponse(t, "response with error", response, alterResponsePopulated)
}
21 changes: 21 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,27 @@ func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCom
return response, nil
}

func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
response := new(DescribeConfigsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) {
response := new(AlterConfigsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}
func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
15 changes: 15 additions & 0 deletions config_resource_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package sarama

type ConfigResourceType int8

// Taken from :
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes

const (
UnknownResource ConfigResourceType = 0
AnyResource ConfigResourceType = 1
TopicResource ConfigResourceType = 2
GroupResource ConfigResourceType = 3
ClusterResource ConfigResourceType = 4
BrokerResource ConfigResourceType = 5
)
Loading

0 comments on commit f7466ea

Please sign in to comment.