Skip to content

Commit

Permalink
Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source f…
Browse files Browse the repository at this point in the history
…or Kafka versions > V1_1_0_0 (#1594)

* Set describeConfigsRequest.Version in ListTopics for consistency with DescribeConfig

This breaks the output of ListTopics for newer request versions, it now includes default configuration settings.

* Set ConfigEntry.Default for KafkaVersions > 0

Clients can now rely on the `Default` flag again and don't have to check the `Source` for higher Kafka versions.

* Set ConfigEntry.Source to default for KafkaVersions <= 0 when applicable

* Add tests for default flag/source
  • Loading branch information
sladkoff authored Feb 21, 2020
1 parent 4ee86d9 commit 6d92277
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 2 deletions.
9 changes: 9 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,15 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
describeConfigsReq := &DescribeConfigsRequest{
Resources: describeConfigsResources,
}

if ca.conf.Version.IsAtLeast(V1_1_0_0) {
describeConfigsReq.Version = 1
}

if ca.conf.Version.IsAtLeast(V2_0_0_0) {
describeConfigsReq.Version = 2
}

describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestClusterAdminListTopics(t *testing.T) {
})

config := NewConfig()
config.Version = V1_0_0_0
config.Version = V1_1_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 4 additions & 0 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,16 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
return err
}
r.Default = defaultB
if defaultB {
r.Source = SourceDefault
}
} else {
source, err := pd.getInt8()
if err != nil {
return err
}
r.Source = ConfigSource(source)
r.Default = r.Source == SourceDefault
}

sensitive, err := pd.getBool()
Expand Down
104 changes: 104 additions & 0 deletions describe_configs_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ var (
0, // Sensitive
}

describeConfigsResponseWithDefaultv0 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
0, 0, 0, 1, //configs
0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4, '1', '0', '0', '0',
0, // ReadOnly
1, // Default
0, // Sensitive
}

describeConfigsResponsePopulatedv1 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
Expand Down Expand Up @@ -59,6 +74,22 @@ var (
0, 4, '1', '0', '0', '0',
4, // Source
}

describeConfigsResponseWithDefaultv1 = []byte{
0, 0, 0, 0, //throttle
0, 0, 0, 1, // response
0, 0, //errorcode
0, 0, //string
2, // topic
0, 3, 'f', 'o', 'o',
0, 0, 0, 1, //configs
0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
0, 4, '1', '0', '0', '0',
0, // ReadOnly
5, // Source
0, // Sensitive
0, 0, 0, 0, // No Synonym
}
)

func TestDescribeConfigsResponsev0(t *testing.T) {
Expand Down Expand Up @@ -86,6 +117,7 @@ func TestDescribeConfigsResponsev0(t *testing.T) {
ReadOnly: false,
Default: false,
Sensitive: false,
Source: SourceUnknown,
},
},
},
Expand All @@ -94,6 +126,40 @@ func TestDescribeConfigsResponsev0(t *testing.T) {
testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0)
}

func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) {
var response *DescribeConfigsResponse

response = &DescribeConfigsResponse{
Resources: []*ResourceResponse{},
}
testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}

response = &DescribeConfigsResponse{
Version: 0, Resources: []*ResourceResponse{
{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
Configs: []*ConfigEntry{
{
Name: "segment.ms",
Value: "1000",
ReadOnly: false,
Default: true,
Sensitive: false,
Source: SourceDefault,
},
},
},
},
}
testResponse(t, "response with default", response, describeConfigsResponseWithDefaultv0)
}

func TestDescribeConfigsResponsev1(t *testing.T) {
var response *DescribeConfigsResponse

Expand All @@ -119,6 +185,7 @@ func TestDescribeConfigsResponsev1(t *testing.T) {
Value: "1000",
ReadOnly: false,
Source: SourceStaticBroker,
Default: false,
Sensitive: false,
Synonyms: []*ConfigSynonym{},
},
Expand Down Expand Up @@ -154,6 +221,7 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
Value: "1000",
ReadOnly: false,
Source: SourceStaticBroker,
Default: false,
Sensitive: false,
Synonyms: []*ConfigSynonym{
{
Expand All @@ -169,3 +237,39 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
}
testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1)
}

func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) {
var response *DescribeConfigsResponse

response = &DescribeConfigsResponse{
Resources: []*ResourceResponse{},
}
testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
if len(response.Resources) != 0 {
t.Error("Expected no groups")
}

response = &DescribeConfigsResponse{
Version: 1,
Resources: []*ResourceResponse{
{
ErrorCode: 0,
ErrorMsg: "",
Type: TopicResource,
Name: "foo",
Configs: []*ConfigEntry{
{
Name: "segment.ms",
Value: "1000",
ReadOnly: false,
Source: SourceDefault,
Default: true,
Sensitive: false,
Synonyms: []*ConfigSynonym{},
},
},
},
},
}
testResponse(t, "response with error", response, describeConfigsResponseWithDefaultv1)
}
6 changes: 5 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
}

includeSynonyms := (req.Version > 0)
includeSource := (req.Version > 0)

for _, r := range req.Resources {
var configEntries []*ConfigEntry
Expand Down Expand Up @@ -770,9 +771,12 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
Value: "1000000",
ReadOnly: false,
Default: true,
Default: !includeSource,
Sensitive: false,
}
if includeSource {
maxMessageBytes.Source = SourceDefault
}
if includeSynonyms {
maxMessageBytes.Synonyms = []*ConfigSynonym{
{
Expand Down

0 comments on commit 6d92277

Please sign in to comment.