-
Notifications
You must be signed in to change notification settings - Fork 799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KIP-546: Add Client Quota APIs #1119
Changes from 17 commits
2956635
89c534e
c7dd87c
05dd93c
65cefed
f697fff
21f84b0
13ba79e
7450047
af7dc5d
441539d
edbbbe6
15c6979
8b47e27
47269fe
db01413
66ec903
f8535d3
032aac5
80b8082
35535cf
5b5b590
259b170
10b8107
b5875a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/alterclientquotas" | ||
) | ||
|
||
// AlterClientQuotasRequest represents a request sent to a kafka broker to | ||
// alter client quotas. | ||
type AlterClientQuotasRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// List of client quotas entries to alter. | ||
Entries []AlterClientQuotaEntry | ||
|
||
// Whether the alteration should be validated, but not performed. | ||
ValidateOnly bool | ||
} | ||
|
||
type AlterClientQuotaEntry struct { | ||
// The quota entities to alter. | ||
Entities []AlterClientQuotaEntity | ||
|
||
// An individual quota configuration entry to alter. | ||
Ops []AlterClientQuotaOps | ||
} | ||
|
||
type AlterClientQuotaEntity struct { | ||
// The quota entity type. | ||
EntityType string | ||
|
||
// The name of the quota entity, or null if the default. | ||
EntityName string | ||
} | ||
|
||
type AlterClientQuotaOps struct { | ||
// The quota configuration key. | ||
Key string | ||
|
||
// The quota configuration value to set, otherwise ignored if the value is to be removed. | ||
Value float64 | ||
|
||
// Whether the quota configuration value should be removed, otherwise set. | ||
Remove bool | ||
} | ||
|
||
type AlterClientQuotaResponseQuotas struct { | ||
// The error code, or `0` if the quota alteration succeeded. | ||
ErrorCode int16 | ||
rhansen2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// The error message, or `nil` if the quota alteration succeeded. | ||
ErrorMessage string | ||
|
||
// The altered quota entities. | ||
Entities []AlterClientQuotaEntity | ||
} | ||
|
||
// AlterClientQuotasResponse represents a response from a kafka broker to an alter client | ||
// quotas request. | ||
type AlterClientQuotasResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// List of altered client quotas responses. | ||
Entries []AlterClientQuotaResponseQuotas | ||
} | ||
|
||
// AlterClientQuotas sends client quotas alteration request to a kafka broker and returns | ||
// the response. | ||
func (c *Client) AlterClientQuotas(ctx context.Context, req *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) { | ||
entries := make([]alterclientquotas.Entry, len(req.Entries)) | ||
|
||
for entryIdx, entry := range req.Entries { | ||
entities := make([]alterclientquotas.Entity, len(entry.Entities)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I followed the pattern used elsewhere where we choose not to reuse the types defined in the protocol package and create new types in the kafka package |
||
for entityIdx, entity := range entry.Entities { | ||
entities[entityIdx] = alterclientquotas.Entity{ | ||
EntityType: entity.EntityType, | ||
EntityName: entity.EntityName, | ||
} | ||
} | ||
|
||
ops := make([]alterclientquotas.Ops, len(entry.Ops)) | ||
for opsIdx, op := range entry.Ops { | ||
ops[opsIdx] = alterclientquotas.Ops{ | ||
Key: op.Key, | ||
Value: op.Value, | ||
Remove: op.Remove, | ||
} | ||
} | ||
|
||
entries[entryIdx] = alterclientquotas.Entry{ | ||
Entities: entities, | ||
Ops: ops, | ||
} | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &alterclientquotas.Request{ | ||
Entries: entries, | ||
ValidateOnly: req.ValidateOnly, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).AlterClientQuotas: %w", err) | ||
} | ||
|
||
res := m.(*alterclientquotas.Response) | ||
responseEntries := make([]AlterClientQuotaResponseQuotas, len(res.Results)) | ||
|
||
for responseEntryIdx, responseEntry := range res.Results { | ||
responseEntities := make([]AlterClientQuotaEntity, len(responseEntry.Entities)) | ||
for responseEntityIdx, responseEntity := range responseEntry.Entities { | ||
responseEntities[responseEntityIdx] = AlterClientQuotaEntity{ | ||
EntityType: responseEntity.EntityType, | ||
EntityName: responseEntity.EntityName, | ||
} | ||
} | ||
|
||
responseEntries[responseEntryIdx] = AlterClientQuotaResponseQuotas{ | ||
ErrorCode: responseEntry.ErrorCode, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looked like elsewhere we created a new errors map field in the return and grouped all the errors there instead of keeping them nested under the individual entries. Do we want that instead of what I have here? |
||
ErrorMessage: responseEntry.ErrorMessage, | ||
Entities: responseEntities, | ||
} | ||
} | ||
ret := &AlterClientQuotasResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Entries: responseEntries, | ||
} | ||
|
||
return ret, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestClientAlterClientQuotas(t *testing.T) { | ||
// Added in Version 2.6.0 https://issues.apache.org/jira/browse/KAFKA-7740 | ||
if !ktesting.KafkaIsAtLeast("2.6.0") { | ||
return | ||
} | ||
|
||
const ( | ||
entityType = "client-id" | ||
entityName = "my-client-id" | ||
key = "producer_byte_rate" | ||
value = 500000.0 | ||
) | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
alterResp, err := client.AlterClientQuotas(context.Background(), &AlterClientQuotasRequest{ | ||
Entries: []AlterClientQuotaEntry{ | ||
{ | ||
Entities: []AlterClientQuotaEntity{ | ||
{ | ||
EntityType: entityType, | ||
EntityName: entityName, | ||
}, | ||
}, | ||
Ops: []AlterClientQuotaOps{ | ||
{ | ||
Key: key, | ||
Value: value, | ||
Remove: false, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
expectedAlterResp := AlterClientQuotasResponse{ | ||
Throttle: 0, | ||
Entries: []AlterClientQuotaResponseQuotas{ | ||
{ | ||
ErrorCode: 0, | ||
Entities: []AlterClientQuotaEntity{ | ||
{ | ||
EntityName: entityName, | ||
EntityType: entityType, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
assert.Equal(t, expectedAlterResp, *alterResp) | ||
|
||
describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{ | ||
Components: []DescribeClientQuotasRequestComponent{ | ||
{ | ||
EntityType: entityType, | ||
MatchType: 0, | ||
Match: entityName, | ||
}, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
expectedDescribeResp := DescribeClientQuotasResponse{ | ||
Throttle: 0, | ||
ErrorCode: 0, | ||
Entries: []DescribeClientQuotasResponseQuotas{ | ||
{ | ||
Entities: []DescribeClientQuotasEntity{ | ||
{ | ||
EntityType: entityType, | ||
EntityName: entityName, | ||
}, | ||
}, | ||
Values: []DescribeClientQuotasValue{ | ||
{ | ||
Key: key, | ||
Value: value, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
assert.Equal(t, expectedDescribeResp, *describeResp) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/describeclientquotas" | ||
) | ||
|
||
// DescribeClientQuotasRequest represents a request sent to a kafka broker to | ||
// describe client quotas. | ||
type DescribeClientQuotasRequest struct { | ||
// Address of the kafka broker to send the request to | ||
Addr net.Addr | ||
|
||
// List of quota components to describe. | ||
Components []DescribeClientQuotasRequestComponent `kafka:"min=v0,max=v1"` | ||
rhansen2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Whether the match is strict, i.e. should exclude entities with | ||
// unspecified entity types. | ||
Strict bool `kafka:"min=v0,max=v1"` | ||
} | ||
|
||
type DescribeClientQuotasRequestComponent struct { | ||
// The entity type that the filter component applies to. | ||
EntityType string `kafka:"min=v0,max=v1"` | ||
|
||
// How to match the entity (0 = exact name, 1 = default name, | ||
// 2 = any specified name). | ||
MatchType int8 `kafka:"min=v0,max=v1"` | ||
|
||
// The string to match against, or null if unused for the match type. | ||
Match string `kafka:"min=v0,max=v1,nullable"` | ||
} | ||
|
||
// DescribeClientQuotasReesponse represents a response from a kafka broker to a describe client quota request. | ||
type DescribeClientQuotasResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration `kafka:"min=v0,max=v1"` | ||
|
||
// The error code, or `0` if the quota description succeeded. | ||
ErrorCode int16 `kafka:"min=v0,max=v1"` | ||
|
||
// The error message, or `null` if the quota description succeeded. | ||
ErrorMessage string `kafka:"min=v0,max=v1,nullable"` | ||
|
||
// List of describe client quota responses. | ||
Entries []DescribeClientQuotasResponseQuotas `kafka:"min=v0,max=v1"` | ||
} | ||
|
||
type DescribeClientQuotasEntity struct { | ||
// The quota entity type. | ||
EntityType string `kafka:"min=v0,max=v1"` | ||
|
||
// The name of the quota entity, or null if the default. | ||
EntityName string `kafka:"min=v0,max=v1,nullable"` | ||
} | ||
|
||
type DescribeClientQuotasValue struct { | ||
// The quota configuration key. | ||
Key string `kafka:"min=v0,max=v1"` | ||
|
||
// The quota configuration value. | ||
Value float64 `kafka:"min=v0,max=v1"` | ||
} | ||
|
||
type DescribeClientQuotasResponseQuotas struct { | ||
// List of client quota entities and their descriptions. | ||
Entities []DescribeClientQuotasEntity `kafka:"min=v0,max=v1"` | ||
|
||
// The client quota configuration values. | ||
Values []DescribeClientQuotasValue `kafka:"min=v0,max=v1"` | ||
} | ||
|
||
// DescribeClientQuotas sends a describe client quotas request to a kafka broker and returns | ||
// the response. | ||
func (c *Client) DescribeClientQuotas(ctx context.Context, req *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) { | ||
components := make([]describeclientquotas.Component, len(req.Components)) | ||
|
||
for componentIdx, component := range req.Components { | ||
components[componentIdx] = describeclientquotas.Component{ | ||
EntityType: component.EntityType, | ||
MatchType: component.MatchType, | ||
Match: component.Match, | ||
} | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &describeclientquotas.Request{ | ||
Components: components, | ||
Strict: req.Strict, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).DescribeClientQuotas: %w", err) | ||
} | ||
|
||
res := m.(*describeclientquotas.Response) | ||
responseEntries := make([]DescribeClientQuotasResponseQuotas, len(res.Entries)) | ||
|
||
for responseEntryIdx, responseEntry := range res.Entries { | ||
responseEntities := make([]DescribeClientQuotasEntity, len(responseEntry.Entities)) | ||
for responseEntityIdx, responseEntity := range responseEntry.Entities { | ||
responseEntities[responseEntityIdx] = DescribeClientQuotasEntity{ | ||
EntityType: responseEntity.EntityType, | ||
EntityName: responseEntity.EntityName, | ||
} | ||
} | ||
|
||
responseValues := make([]DescribeClientQuotasValue, len(responseEntry.Values)) | ||
for responseValueIdx, responseValue := range responseEntry.Values { | ||
responseValues[responseValueIdx] = DescribeClientQuotasValue{ | ||
Key: responseValue.Key, | ||
Value: responseValue.Value, | ||
} | ||
} | ||
responseEntries[responseEntryIdx] = DescribeClientQuotasResponseQuotas{ | ||
Entities: responseEntities, | ||
Values: responseValues, | ||
} | ||
} | ||
ret := &DescribeClientQuotasResponse{ | ||
Throttle: time.Duration(res.ThrottleTimeMs), | ||
Entries: responseEntries, | ||
} | ||
|
||
return ret, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a word missing at the end there ("false" maybe)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of these doc strings (including this one) are just quotes from the API docs. This verbiage seems common in the Kafka API docs so I think it is ok. I can change it if you feel otherwise