Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-339: Add Incremental Config updates API #1966

Merged
merged 2 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 43 additions & 28 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,9 @@ func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
return err
}

for i := range a.Resources {
pe.putInt16(a.Resources[i].ErrorCode)
err := pe.putString(a.Resources[i].ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(a.Resources[i].Type))
err = pe.putString(a.Resources[i].Name)
if err != nil {
return nil
for _, v := range a.Resources {
if err := v.encode(pe); err != nil {
return err
}
}

Expand All @@ -56,30 +49,52 @@ func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
for i := range a.Resources {
a.Resources[i] = new(AlterConfigsResourceResponse)

errCode, err := pd.getInt16()
if err != nil {
if err := a.Resources[i].decode(pd, version); err != nil {
return err
}
a.Resources[i].ErrorCode = errCode
}

e, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].ErrorMsg = e
return nil
}

t, err := pd.getInt8()
if err != nil {
return err
}
a.Resources[i].Type = ConfigResourceType(t)
func (a *AlterConfigsResourceResponse) encode(pe packetEncoder) error {
pe.putInt16(a.ErrorCode)
err := pe.putString(a.ErrorMsg)
if err != nil {
return nil
}
pe.putInt8(int8(a.Type))
err = pe.putString(a.Name)
if err != nil {
return nil
}
return nil
}

name, err := pd.getString()
if err != nil {
return err
}
a.Resources[i].Name = name
func (a *AlterConfigsResourceResponse) decode(pd packetDecoder, version int16) error {
errCode, err := pd.getInt16()
if err != nil {
return err
}
a.ErrorCode = errCode

e, err := pd.getString()
if err != nil {
return err
}
a.ErrorMsg = e

t, err := pd.getInt8()
if err != nil {
return err
}
a.Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
a.Name = name

return nil
}
Expand Down
12 changes: 12 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,18 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
return response, nil
}

// IncrementalAlterConfigs sends a request to incremental alter config and return a response or error
func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) {
response := new(IncrementalAlterConfigsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// DeleteGroups sends a request to delete groups and returns a response or error
func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
response := new(DeleteGroupsResponse)
Expand Down
173 changes: 173 additions & 0 deletions incremental_alter_configs_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package sarama

type IncrementalAlterConfigsOperation int8

const (
IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
IncrementalAlterConfigsOperationDelete
IncrementalAlterConfigsOperationAppend
IncrementalAlterConfigsOperationSubtract
)

// IncrementalAlterConfigsRequest is an incremental alter config request type
type IncrementalAlterConfigsRequest struct {
Resources []*IncrementalAlterConfigsResource
ValidateOnly bool
}

type IncrementalAlterConfigsResource struct {
Type ConfigResourceType
Name string
ConfigEntries map[string]IncrementalAlterConfigsEntry
}

type IncrementalAlterConfigsEntry struct {
Operation IncrementalAlterConfigsOperation
Value *string
}

func (a *IncrementalAlterConfigsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(a.Resources)); err != nil {
return err
}

for _, r := range a.Resources {
if err := r.encode(pe); err != nil {
return err
}
}

pe.putBool(a.ValidateOnly)
return nil
}

func (a *IncrementalAlterConfigsRequest) decode(pd packetDecoder, version int16) error {
resourceCount, err := pd.getArrayLength()
if err != nil {
return err
}

a.Resources = make([]*IncrementalAlterConfigsResource, resourceCount)
for i := range a.Resources {
r := &IncrementalAlterConfigsResource{}
err = r.decode(pd, version)
if err != nil {
return err
}
a.Resources[i] = r
}

validateOnly, err := pd.getBool()
if err != nil {
return err
}

a.ValidateOnly = validateOnly

return nil
}

func (a *IncrementalAlterConfigsResource) encode(pe packetEncoder) error {
pe.putInt8(int8(a.Type))

if err := pe.putString(a.Name); err != nil {
return err
}

if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
return err
}

for name, e := range a.ConfigEntries {
if err := pe.putString(name); err != nil {
return err
}

if err := e.encode(pe); err != nil {
return err
}
}

return nil
}

func (a *IncrementalAlterConfigsResource) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
a.Type = ConfigResourceType(t)

name, err := pd.getString()
if err != nil {
return err
}
a.Name = name

n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
a.ConfigEntries = make(map[string]IncrementalAlterConfigsEntry, n)
for i := 0; i < n; i++ {
name, err := pd.getString()
if err != nil {
return err
}

var v IncrementalAlterConfigsEntry

if err := v.decode(pd, version); err != nil {
return err
}

a.ConfigEntries[name] = v
}
}
return err
}

func (a *IncrementalAlterConfigsEntry) encode(pe packetEncoder) error {
pe.putInt8(int8(a.Operation))

if err := pe.putNullableString(a.Value); err != nil {
return err
}

return nil
}

func (a *IncrementalAlterConfigsEntry) decode(pd packetDecoder, version int16) error {
t, err := pd.getInt8()
if err != nil {
return err
}
a.Operation = IncrementalAlterConfigsOperation(t)

s, err := pd.getNullableString()
if err != nil {
return err
}

a.Value = s

return nil
}

func (a *IncrementalAlterConfigsRequest) key() int16 {
return 44
}

func (a *IncrementalAlterConfigsRequest) version() int16 {
return 0
}

func (a *IncrementalAlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *IncrementalAlterConfigsRequest) requiredVersion() KafkaVersion {
return V2_3_0_0
}
98 changes: 98 additions & 0 deletions incremental_alter_configs_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package sarama

import "testing"

var (
emptyIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 0, // 0 configs
0, // don't Validate
}

singleIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 1, // 1 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, // 1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, // OperationSet
0, 4,
'1', '0', '0', '0',
0, // don't validate
}

doubleIncrementalAlterConfigsRequest = []byte{
0, 0, 0, 2, // 2 config
2, // a topic
0, 3, 'f', 'o', 'o', // topic name: foo
0, 0, 0, 1, // 1 config name
0, 10, // 10 chars
's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, // OperationSet
0, 4,
'1', '0', '0', '0',
2, // a topic
0, 3, 'b', 'a', 'r', // topic name: foo
0, 0, 0, 1, // 2 config
0, 12, // 12 chars
'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
1, // OperationDelete
0, 4,
'1', '0', '0', '0',
0, // don't validate
}
)

func TestIncrementalAlterConfigsRequest(t *testing.T) {
var request *IncrementalAlterConfigsRequest

request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{},
}
testRequest(t, "no requests", request, emptyIncrementalAlterConfigsRequest)

configValue := "1000"
request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{
{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"segment.ms": {
Operation: IncrementalAlterConfigsOperationSet,
Value: &configValue,
},
},
},
},
}

testRequest(t, "one config", request, singleIncrementalAlterConfigsRequest)

request = &IncrementalAlterConfigsRequest{
Resources: []*IncrementalAlterConfigsResource{
{
Type: TopicResource,
Name: "foo",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"segment.ms": {
Operation: IncrementalAlterConfigsOperationSet,
Value: &configValue,
},
},
},
{
Type: TopicResource,
Name: "bar",
ConfigEntries: map[string]IncrementalAlterConfigsEntry{
"retention.ms": {
Operation: IncrementalAlterConfigsOperationDelete,
Value: &configValue,
},
},
},
},
}

testRequest(t, "two configs", request, doubleIncrementalAlterConfigsRequest)
}
Loading