diff --git a/admin.go b/admin.go index 3bac6208e..07c0053c8 100644 --- a/admin.go +++ b/admin.go @@ -116,6 +116,14 @@ type ClusterAdmin interface { // Upsert SCRAM users UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) + // Get client quota configurations corresponding to the specified filter. + // This operation is supported by brokers with version 2.6.0.0 or higher. + DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) + + // Alters client quota configurations with the specified alterations. + // This operation is supported by brokers with version 2.6.0.0 or higher. + AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error + // Close shuts down the admin and closes underlying client. Close() error } @@ -1035,3 +1043,62 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU return rsp.Results, nil } + +// Describe All : use an empty/nil components slice + strict = false +// Contains components: strict = false +// Contains only components: strict = true +func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) { + request := &DescribeClientQuotasRequest{ + Components: components, + Strict: strict, + } + + b, err := ca.Controller() + if err != nil { + return nil, err + } + + rsp, err := b.DescribeClientQuotas(request) + if err != nil { + return nil, err + } + + if rsp.ErrorMsg != nil { + return nil, errors.New(*rsp.ErrorMsg) + } + if rsp.ErrorCode != ErrNoError { + return nil, rsp.ErrorCode + } + + return rsp.Entries, nil +} + +func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error { + entry := AlterClientQuotasEntry{ + Entity: entity, + Ops: []ClientQuotasOp{op}, + } + + request := &AlterClientQuotasRequest{ + Entries: []AlterClientQuotasEntry{entry}, + ValidateOnly: validateOnly, + } + + b, err := ca.Controller() + if err != nil { + return err + } + + rsp, err := b.AlterClientQuotas(request) + if err != nil { + return err + } + + for _, entry := range rsp.Entries { + if entry.ErrorCode != ErrNoError { + return entry.ErrorCode + } + } + + return nil +} diff --git a/alter_client_quotas_request.go b/alter_client_quotas_request.go new file mode 100644 index 000000000..f528512d0 --- /dev/null +++ b/alter_client_quotas_request.go @@ -0,0 +1,194 @@ +package sarama + +// AlterClientQuotas Request (Version: 0) => [entries] validate_only +// entries => [entity] [ops] +// entity => entity_type entity_name +// entity_type => STRING +// entity_name => NULLABLE_STRING +// ops => key value remove +// key => STRING +// value => FLOAT64 +// remove => BOOLEAN +// validate_only => BOOLEAN + +type AlterClientQuotasRequest struct { + Entries []AlterClientQuotasEntry // The quota configuration entries to alter. + ValidateOnly bool // Whether the alteration should be validated, but not performed. +} + +type AlterClientQuotasEntry struct { + Entity []QuotaEntityComponent // The quota entity to alter. + Ops []ClientQuotasOp // An individual quota configuration entry to alter. +} + +type ClientQuotasOp struct { + Key string // The quota configuration key. + Value float64 // The value to set, otherwise ignored if the value is to be removed. + Remove bool // Whether the quota configuration value should be removed, otherwise set. +} + +func (a *AlterClientQuotasRequest) encode(pe packetEncoder) error { + // Entries + if err := pe.putArrayLength(len(a.Entries)); err != nil { + return err + } + for _, e := range a.Entries { + if err := e.encode(pe); err != nil { + return err + } + } + + // ValidateOnly + pe.putBool(a.ValidateOnly) + + return nil +} + +func (a *AlterClientQuotasRequest) decode(pd packetDecoder, version int16) error { + // Entries + entryCount, err := pd.getArrayLength() + if err != nil { + return err + } + if entryCount > 0 { + a.Entries = make([]AlterClientQuotasEntry, entryCount) + for i := range a.Entries { + e := AlterClientQuotasEntry{} + if err = e.decode(pd, version); err != nil { + return err + } + a.Entries[i] = e + } + } else { + a.Entries = []AlterClientQuotasEntry{} + } + + // ValidateOnly + validateOnly, err := pd.getBool() + if err != nil { + return err + } + a.ValidateOnly = validateOnly + + return nil +} + +func (a *AlterClientQuotasEntry) encode(pe packetEncoder) error { + // Entity + if err := pe.putArrayLength(len(a.Entity)); err != nil { + return err + } + for _, component := range a.Entity { + if err := component.encode(pe); err != nil { + return err + } + } + + // Ops + if err := pe.putArrayLength(len(a.Ops)); err != nil { + return err + } + for _, o := range a.Ops { + if err := o.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (a *AlterClientQuotasEntry) decode(pd packetDecoder, version int16) error { + // Entity + componentCount, err := pd.getArrayLength() + if err != nil { + return err + } + if componentCount > 0 { + a.Entity = make([]QuotaEntityComponent, componentCount) + for i := 0; i < componentCount; i++ { + component := QuotaEntityComponent{} + if err := component.decode(pd, version); err != nil { + return err + } + a.Entity[i] = component + } + } else { + a.Entity = []QuotaEntityComponent{} + } + + // Ops + opCount, err := pd.getArrayLength() + if err != nil { + return err + } + if opCount > 0 { + a.Ops = make([]ClientQuotasOp, opCount) + for i := range a.Ops { + c := ClientQuotasOp{} + if err = c.decode(pd, version); err != nil { + return err + } + a.Ops[i] = c + } + } else { + a.Ops = []ClientQuotasOp{} + } + + return nil +} + +func (c *ClientQuotasOp) encode(pe packetEncoder) error { + // Key + if err := pe.putString(c.Key); err != nil { + return err + } + + // Value + pe.putFloat64(c.Value) + + // Remove + pe.putBool(c.Remove) + + return nil +} + +func (c *ClientQuotasOp) decode(pd packetDecoder, version int16) error { + // Key + key, err := pd.getString() + if err != nil { + return err + } + c.Key = key + + // Value + value, err := pd.getFloat64() + if err != nil { + return err + } + c.Value = value + + // Remove + remove, err := pd.getBool() + if err != nil { + return err + } + c.Remove = remove + + return nil +} + +func (a *AlterClientQuotasRequest) key() int16 { + return 49 +} + +func (a *AlterClientQuotasRequest) version() int16 { + return 0 +} + +func (a *AlterClientQuotasRequest) headerVersion() int16 { + return 1 +} + +func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion { + return V2_6_0_0 +} diff --git a/alter_client_quotas_request_test.go b/alter_client_quotas_request_test.go new file mode 100644 index 000000000..46f92b9cb --- /dev/null +++ b/alter_client_quotas_request_test.go @@ -0,0 +1,154 @@ +package sarama + +import "testing" + +var ( + alterClientQuotasRequestSingleOp = []byte{ + 0, 0, 0, 1, // entries len + 0, 0, 0, 1, // entity len + 0, 4, 'u', 's', 'e', 'r', // entity type + 255, 255, // entity value + 0, 0, 0, 1, // ops len + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 65, 46, 132, 128, 0, 0, 0, 0, // op value (1000000) + 0, // remove + 0, // validate only + } + + alterClientQuotasRequestRemoveSingleOp = []byte{ + 0, 0, 0, 1, // entries len + 0, 0, 0, 1, // entity len + 0, 4, 'u', 's', 'e', 'r', // entity type + 255, 255, // entity value + 0, 0, 0, 1, // ops len + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 0, 0, 0, 0, 0, 0, 0, 0, // op value (ignored) + 1, // remove + 1, // validate only + } + + alterClientQuotasRequestMultipleOps = []byte{ + 0, 0, 0, 1, // entries len + 0, 0, 0, 1, // entity len + 0, 4, 'u', 's', 'e', 'r', // entity type + 255, 255, // entity value + 0, 0, 0, 2, // ops len + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 65, 46, 132, 128, 0, 0, 0, 0, // op value (1000000) + 0, // remove + 0, 18, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 65, 46, 132, 128, 0, 0, 0, 0, // op value (1000000) + 0, // remove + 0, // validate only + } + + alterClientQuotasRequestMultipleQuotasEntries = []byte{ + 0, 0, 0, 2, // entries len + 0, 0, 0, 1, // entity len + 0, 4, 'u', 's', 'e', 'r', // entity type + 255, 255, // entity value + 0, 0, 0, 1, // ops len + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 65, 46, 132, 128, 0, 0, 0, 0, // op value (1000000) + 0, // remove + 0, 0, 0, 1, // entity len + 0, 9, 'c', 'l', 'i', 'e', 'n', 't', '-', 'i', 'd', // entity type + 255, 255, // entity value + 0, 0, 0, 1, // ops len + 0, 18, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', // op key + 65, 46, 132, 128, 0, 0, 0, 0, // op value (1000000) + 0, // remove + 0, // validate only + } +) + +func TestAlterClientQuotasRequest(t *testing.T) { + // default user + defaultUserComponent := QuotaEntityComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchDefault, + } + + // default client-id + defaultClientIDComponent := QuotaEntityComponent{ + EntityType: QuotaEntityClientID, + MatchType: QuotaMatchDefault, + } + + // Add Quota to default user + op := ClientQuotasOp{ + Key: "producer_byte_rate", + Value: 1000000, + Remove: false, + } + entry := AlterClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Ops: []ClientQuotasOp{op}, + } + req := &AlterClientQuotasRequest{ + Entries: []AlterClientQuotasEntry{entry}, + ValidateOnly: false, + } + testRequest(t, "Add single Quota op", req, alterClientQuotasRequestSingleOp) + + // Remove Quota from default user + op = ClientQuotasOp{ + Key: "producer_byte_rate", + Remove: true, + } + entry = AlterClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Ops: []ClientQuotasOp{op}, + } + req = &AlterClientQuotasRequest{ + Entries: []AlterClientQuotasEntry{entry}, + ValidateOnly: true, + } + testRequest(t, "Remove single Quota op", req, alterClientQuotasRequestRemoveSingleOp) + + // Add multiple Quotas ops + op1 := ClientQuotasOp{ + Key: "producer_byte_rate", + Value: 1000000, + Remove: false, + } + op2 := ClientQuotasOp{ + Key: "consumer_byte_rate", + Value: 1000000, + Remove: false, + } + entry = AlterClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Ops: []ClientQuotasOp{op1, op2}, + } + req = &AlterClientQuotasRequest{ + Entries: []AlterClientQuotasEntry{entry}, + ValidateOnly: false, + } + testRequest(t, "Add multiple Quota ops", req, alterClientQuotasRequestMultipleOps) + + // Add multiple Quotas Entries + op1 = ClientQuotasOp{ + Key: "producer_byte_rate", + Value: 1000000, + Remove: false, + } + entry1 := AlterClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Ops: []ClientQuotasOp{op1}, + } + op2 = ClientQuotasOp{ + Key: "consumer_byte_rate", + Value: 1000000, + Remove: false, + } + entry2 := AlterClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultClientIDComponent}, + Ops: []ClientQuotasOp{op2}, + } + req = &AlterClientQuotasRequest{ + Entries: []AlterClientQuotasEntry{entry1, entry2}, + ValidateOnly: false, + } + testRequest(t, "Add multiple Quotas Entries", req, alterClientQuotasRequestMultipleQuotasEntries) +} diff --git a/alter_client_quotas_response.go b/alter_client_quotas_response.go new file mode 100644 index 000000000..ccd27d5f5 --- /dev/null +++ b/alter_client_quotas_response.go @@ -0,0 +1,145 @@ +package sarama + +import ( + "time" +) + +// AlterClientQuotas Response (Version: 0) => throttle_time_ms [entries] +// throttle_time_ms => INT32 +// entries => error_code error_message [entity] +// error_code => INT16 +// error_message => NULLABLE_STRING +// entity => entity_type entity_name +// entity_type => STRING +// entity_name => NULLABLE_STRING + +type AlterClientQuotasResponse struct { + ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. + Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered. +} + +type AlterClientQuotasEntryResponse struct { + ErrorCode KError // The error code, or `0` if the quota alteration succeeded. + ErrorMsg *string // The error message, or `null` if the quota alteration succeeded. + Entity []QuotaEntityComponent // The quota entity altered. +} + +func (a *AlterClientQuotasResponse) encode(pe packetEncoder) error { + // ThrottleTime + pe.putInt32(int32(a.ThrottleTime / time.Millisecond)) + + // Entries + if err := pe.putArrayLength(len(a.Entries)); err != nil { + return err + } + for _, e := range a.Entries { + if err := e.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (a *AlterClientQuotasResponse) decode(pd packetDecoder, version int16) error { + // ThrottleTime + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + // Entries + entryCount, err := pd.getArrayLength() + if err != nil { + return err + } + if entryCount > 0 { + a.Entries = make([]AlterClientQuotasEntryResponse, entryCount) + for i := range a.Entries { + e := AlterClientQuotasEntryResponse{} + if err = e.decode(pd, version); err != nil { + return err + } + a.Entries[i] = e + } + } else { + a.Entries = []AlterClientQuotasEntryResponse{} + } + + return nil +} + +func (a *AlterClientQuotasEntryResponse) encode(pe packetEncoder) error { + // ErrorCode + pe.putInt16(int16(a.ErrorCode)) + + // ErrorMsg + if err := pe.putNullableString(a.ErrorMsg); err != nil { + return err + } + + // Entity + if err := pe.putArrayLength(len(a.Entity)); err != nil { + return err + } + for _, component := range a.Entity { + if err := component.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (a *AlterClientQuotasEntryResponse) decode(pd packetDecoder, version int16) error { + // ErrorCode + errCode, err := pd.getInt16() + if err != nil { + return err + } + a.ErrorCode = KError(errCode) + + // ErrorMsg + errMsg, err := pd.getNullableString() + if err != nil { + return err + } + a.ErrorMsg = errMsg + + // Entity + componentCount, err := pd.getArrayLength() + if err != nil { + return err + } + if componentCount > 0 { + a.Entity = make([]QuotaEntityComponent, componentCount) + for i := 0; i < componentCount; i++ { + component := QuotaEntityComponent{} + if err := component.decode(pd, version); err != nil { + return err + } + a.Entity[i] = component + } + } else { + a.Entity = []QuotaEntityComponent{} + } + + return nil +} + +func (a *AlterClientQuotasResponse) key() int16 { + return 49 +} + +func (a *AlterClientQuotasResponse) version() int16 { + return 0 +} + +func (a *AlterClientQuotasResponse) headerVersion() int16 { + return 0 +} + +func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion { + return V2_6_0_0 +} diff --git a/alter_client_quotas_response_test.go b/alter_client_quotas_response_test.go new file mode 100644 index 000000000..40efeb547 --- /dev/null +++ b/alter_client_quotas_response_test.go @@ -0,0 +1,94 @@ +package sarama + +import "testing" + +var ( + alterClientQuotasResponseError = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 0, 0, 1, // Entries len + 0, 42, // ErrorCode (ErrInvalidRequest) + 0, 42, 'U', 'n', 'h', 'a', 'n', 'd', 'l', 'e', 'd', ' ', 'c', 'l', 'i', 'e', 'n', 't', ' ', 'q', 'u', 'o', 't', 'a', ' ', 'e', 'n', 't', 'i', 't', 'y', ' ', 't', 'y', 'p', 'e', ':', ' ', 'f', 'a', 'u', 'l', 't', 'y', // ErrorMsg + 0, 0, 0, 1, // Entity len + 0, 6, 'f', 'a', 'u', 'l', 't', 'y', // entityType + 255, 255, // entityName + } + + alterClientQuotasResponseSingleEntry = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 0, 0, 1, // Entries len + 0, 0, // ErrorCode + 255, 255, // ErrorMsg + 0, 0, 0, 1, // Entity len + 0, 4, 'u', 's', 'e', 'r', // entityType + 255, 255, // entityName + } + + alterClientQuotasResponseMultipleEntries = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 0, 0, 2, // Entries len + 0, 0, // ErrorCode + 255, 255, // ErrorMsg + 0, 0, 0, 1, // Entity len + 0, 4, 'u', 's', 'e', 'r', // entityType + 255, 255, // entityName + 0, 0, // ErrorCode + 255, 255, // ErrorMsg + 0, 0, 0, 1, // Entity len + 0, 9, 'c', 'l', 'i', 'e', 'n', 't', '-', 'i', 'd', // entityType + 255, 255, // entityName + } +) + +func TestAlterClientQuotasResponse(t *testing.T) { + // default user + defaultUserComponent := QuotaEntityComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchDefault, + } + + // default client-id + defaultClientIDComponent := QuotaEntityComponent{ + EntityType: QuotaEntityClientID, + MatchType: QuotaMatchDefault, + } + + // Response with error + errMsg := "Unhandled client quota entity type: faulty" + faultEntityComponent := QuotaEntityComponent{ + EntityType: QuotaEntityType("faulty"), + MatchType: QuotaMatchDefault, + } + entry := AlterClientQuotasEntryResponse{ + ErrorCode: KError(42), + ErrorMsg: &errMsg, + Entity: []QuotaEntityComponent{faultEntityComponent}, + } + res := &AlterClientQuotasResponse{ + ThrottleTime: 0, + Entries: []AlterClientQuotasEntryResponse{entry}, + } + testResponse(t, "Response With Error", res, alterClientQuotasResponseError) + + // Response Altered single entry + entry = AlterClientQuotasEntryResponse{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + } + res = &AlterClientQuotasResponse{ + ThrottleTime: 0, + Entries: []AlterClientQuotasEntryResponse{entry}, + } + testResponse(t, "Altered single entry", res, alterClientQuotasResponseSingleEntry) + + // Response Altered multiple entries + entry1 := AlterClientQuotasEntryResponse{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + } + entry2 := AlterClientQuotasEntryResponse{ + Entity: []QuotaEntityComponent{defaultClientIDComponent}, + } + res = &AlterClientQuotasResponse{ + ThrottleTime: 0, + Entries: []AlterClientQuotasEntryResponse{entry1, entry2}, + } + testResponse(t, "Altered multiple entries", res, alterClientQuotasResponseMultipleEntries) +} diff --git a/broker.go b/broker.go index b78525f21..cddfa6463 100644 --- a/broker.go +++ b/broker.go @@ -735,6 +735,30 @@ func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest return res, nil } +// DescribeClientQuotas sends a request to get the broker's quotas +func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) { + response := new(DescribeClientQuotasResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +// AlterClientQuotas sends a request to alter the broker's quotas +func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) { + response := new(AlterClientQuotasResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + // readFull ensures the conn ReadDeadline has been setup before making a // call to io.ReadFull func (b *Broker) readFull(buf []byte) (n int, err error) { diff --git a/describe_client_quotas_request.go b/describe_client_quotas_request.go new file mode 100644 index 000000000..17a82051c --- /dev/null +++ b/describe_client_quotas_request.go @@ -0,0 +1,141 @@ +package sarama + +// DescribeClientQuotas Request (Version: 0) => [components] strict +// components => entity_type match_type match +// entity_type => STRING +// match_type => INT8 +// match => NULLABLE_STRING +// strict => BOOLEAN + +// A filter to be applied to matching client quotas. +// Components: the components to filter on +// Strict: whether the filter only includes specified components +type DescribeClientQuotasRequest struct { + Components []QuotaFilterComponent + Strict bool +} + +// Describe a component for applying a client quota filter. +// EntityType: the entity type the filter component applies to ("user", "client-id", "ip") +// MatchType: the match type of the filter component (any, exact, default) +// Match: the name that's matched exactly (used when MatchType is QuotaMatchExact) +type QuotaFilterComponent struct { + EntityType QuotaEntityType + MatchType QuotaMatchType + Match string +} + +func (d *DescribeClientQuotasRequest) encode(pe packetEncoder) error { + // Components + if err := pe.putArrayLength(len(d.Components)); err != nil { + return err + } + for _, c := range d.Components { + if err := c.encode(pe); err != nil { + return err + } + } + + // Strict + pe.putBool(d.Strict) + + return nil +} + +func (d *DescribeClientQuotasRequest) decode(pd packetDecoder, version int16) error { + // Components + componentCount, err := pd.getArrayLength() + if err != nil { + return err + } + if componentCount > 0 { + d.Components = make([]QuotaFilterComponent, componentCount) + for i := range d.Components { + c := QuotaFilterComponent{} + if err = c.decode(pd, version); err != nil { + return err + } + d.Components[i] = c + } + } else { + d.Components = []QuotaFilterComponent{} + } + + // Strict + strict, err := pd.getBool() + if err != nil { + return err + } + d.Strict = strict + + return nil +} + +func (d *QuotaFilterComponent) encode(pe packetEncoder) error { + // EntityType + if err := pe.putString(string(d.EntityType)); err != nil { + return err + } + + // MatchType + pe.putInt8(int8(d.MatchType)) + + // Match + if d.MatchType == QuotaMatchAny { + if err := pe.putNullableString(nil); err != nil { + return err + } + } else if d.MatchType == QuotaMatchDefault { + if err := pe.putString(""); err != nil { + return err + } + } else { + if err := pe.putString(d.Match); err != nil { + return err + } + } + + return nil +} + +func (d *QuotaFilterComponent) decode(pd packetDecoder, version int16) error { + // EntityType + entityType, err := pd.getString() + if err != nil { + return err + } + d.EntityType = QuotaEntityType(entityType) + + // MatchType + matchType, err := pd.getInt8() + if err != nil { + return err + } + d.MatchType = QuotaMatchType(matchType) + + // Match + match, err := pd.getNullableString() + if err != nil { + return err + } + if match != nil { + d.Match = *match + } + return nil +} + +func (d *DescribeClientQuotasRequest) key() int16 { + return 48 +} + +func (d *DescribeClientQuotasRequest) version() int16 { + return 0 +} + +func (d *DescribeClientQuotasRequest) headerVersion() int16 { + return 1 +} + +func (d *DescribeClientQuotasRequest) requiredVersion() KafkaVersion { + return V2_6_0_0 +} diff --git a/describe_client_quotas_request_test.go b/describe_client_quotas_request_test.go new file mode 100644 index 000000000..6b8c0c9c5 --- /dev/null +++ b/describe_client_quotas_request_test.go @@ -0,0 +1,84 @@ +package sarama + +import "testing" + +var ( + describeClientQuotasRequestAll = []byte{ + 0, 0, 0, 0, // components len + 0, // strict + } + + describeClientQuotasRequestDefaultUser = []byte{ + 0, 0, 0, 1, // components len + 0, 4, 'u', 's', 'e', 'r', // entity type + 1, // match type (default) + 0, 0, // match *string + 0, // strict + } + + describeClientQuotasRequestOnlySpecificUser = []byte{ + 0, 0, 0, 1, // components len + 0, 4, 'u', 's', 'e', 'r', // entity type + 0, // match type (exact) + 0, 6, 's', 'a', 'r', 'a', 'm', 'a', // match *string + 1, // strict + } + + describeClientQuotasRequestMultiComponents = []byte{ + 0, 0, 0, 2, // components len + 0, 4, 'u', 's', 'e', 'r', // entity type + 2, // match type (any) + 255, 255, // match *string + 0, 9, 'c', 'l', 'i', 'e', 'n', 't', '-', 'i', 'd', // entity type + 1, // match type (default) + 0, 0, // match *string + 0, // strict + } +) + +func TestDescribeClientQuotasRequest(t *testing.T) { + // Match All + req := &DescribeClientQuotasRequest{ + Components: []QuotaFilterComponent{}, + Strict: false, + } + testRequest(t, "Match All", req, describeClientQuotasRequestAll) + + // Match Default User + defaultUser := QuotaFilterComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchDefault, + } + req = &DescribeClientQuotasRequest{ + Components: []QuotaFilterComponent{defaultUser}, + Strict: false, + } + testRequest(t, "Match Default User", req, describeClientQuotasRequestDefaultUser) + + // Match Only Specific User + specificUser := QuotaFilterComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchExact, + Match: "sarama", + } + req = &DescribeClientQuotasRequest{ + Components: []QuotaFilterComponent{specificUser}, + Strict: true, + } + testRequest(t, "Match Only Specific User", req, describeClientQuotasRequestOnlySpecificUser) + + // Match default client-id of any user + anyUser := QuotaFilterComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchAny, + } + defaultClientId := QuotaFilterComponent{ + EntityType: QuotaEntityClientID, + MatchType: QuotaMatchDefault, + } + req = &DescribeClientQuotasRequest{ + Components: []QuotaFilterComponent{anyUser, defaultClientId}, + Strict: false, + } + testRequest(t, "Match default client-id of any user", req, describeClientQuotasRequestMultiComponents) +} diff --git a/describe_client_quotas_response.go b/describe_client_quotas_response.go new file mode 100644 index 000000000..555da0c48 --- /dev/null +++ b/describe_client_quotas_response.go @@ -0,0 +1,235 @@ +package sarama + +import ( + "time" +) + +// DescribeClientQuotas Response (Version: 0) => throttle_time_ms error_code error_message [entries] +// throttle_time_ms => INT32 +// error_code => INT16 +// error_message => NULLABLE_STRING +// entries => [entity] [values] +// entity => entity_type entity_name +// entity_type => STRING +// entity_name => NULLABLE_STRING +// values => key value +// key => STRING +// value => FLOAT64 + +type DescribeClientQuotasResponse struct { + ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota. + ErrorCode KError // The error code, or `0` if the quota description succeeded. + ErrorMsg *string // The error message, or `null` if the quota description succeeded. + Entries []DescribeClientQuotasEntry // A result entry. +} + +type DescribeClientQuotasEntry struct { + Entity []QuotaEntityComponent // The quota entity description. + Values map[string]float64 // The quota values for the entity. +} + +type QuotaEntityComponent struct { + EntityType QuotaEntityType + MatchType QuotaMatchType + Name string +} + +func (d *DescribeClientQuotasResponse) encode(pe packetEncoder) error { + // ThrottleTime + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + + // ErrorCode + pe.putInt16(int16(d.ErrorCode)) + + // ErrorMsg + if err := pe.putNullableString(d.ErrorMsg); err != nil { + return err + } + + // Entries + if err := pe.putArrayLength(len(d.Entries)); err != nil { + return err + } + for _, e := range d.Entries { + if err := e.encode(pe); err != nil { + return err + } + } + + return nil +} + +func (d *DescribeClientQuotasResponse) decode(pd packetDecoder, version int16) error { + // ThrottleTime + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + // ErrorCode + errCode, err := pd.getInt16() + if err != nil { + return err + } + d.ErrorCode = KError(errCode) + + // ErrorMsg + errMsg, err := pd.getNullableString() + if err != nil { + return err + } + d.ErrorMsg = errMsg + + // Entries + entryCount, err := pd.getArrayLength() + if err != nil { + return err + } + if entryCount > 0 { + d.Entries = make([]DescribeClientQuotasEntry, entryCount) + for i := range d.Entries { + e := DescribeClientQuotasEntry{} + if err = e.decode(pd, version); err != nil { + return err + } + d.Entries[i] = e + } + } else { + d.Entries = []DescribeClientQuotasEntry{} + } + + return nil +} + +func (d *DescribeClientQuotasEntry) encode(pe packetEncoder) error { + // Entity + if err := pe.putArrayLength(len(d.Entity)); err != nil { + return err + } + for _, e := range d.Entity { + if err := e.encode(pe); err != nil { + return err + } + } + + // Values + if err := pe.putArrayLength(len(d.Values)); err != nil { + return err + } + for key, value := range d.Values { + // key + if err := pe.putString(key); err != nil { + return err + } + // value + pe.putFloat64(value) + } + + return nil +} + +func (d *DescribeClientQuotasEntry) decode(pd packetDecoder, version int16) error { + // Entity + componentCount, err := pd.getArrayLength() + if err != nil { + return err + } + if componentCount > 0 { + d.Entity = make([]QuotaEntityComponent, componentCount) + for i := 0; i < componentCount; i++ { + component := QuotaEntityComponent{} + if err := component.decode(pd, version); err != nil { + return err + } + d.Entity[i] = component + } + } else { + d.Entity = []QuotaEntityComponent{} + } + + // Values + valueCount, err := pd.getArrayLength() + if err != nil { + return err + } + if valueCount > 0 { + d.Values = make(map[string]float64, valueCount) + for i := 0; i < valueCount; i++ { + // key + key, err := pd.getString() + if err != nil { + return err + } + // value + value, err := pd.getFloat64() + if err != nil { + return err + } + d.Values[key] = value + } + } else { + d.Values = map[string]float64{} + } + + return nil +} + +func (c *QuotaEntityComponent) encode(pe packetEncoder) error { + // entity_type + if err := pe.putString(string(c.EntityType)); err != nil { + return err + } + // entity_name + if c.MatchType == QuotaMatchDefault { + if err := pe.putNullableString(nil); err != nil { + return err + } + } else { + if err := pe.putString(c.Name); err != nil { + return err + } + } + + return nil +} + +func (c *QuotaEntityComponent) decode(pd packetDecoder, version int16) error { + // entity_type + entityType, err := pd.getString() + if err != nil { + return err + } + c.EntityType = QuotaEntityType(entityType) + + // entity_name + entityName, err := pd.getNullableString() + if err != nil { + return err + } + + if entityName == nil { + c.MatchType = QuotaMatchDefault + } else { + c.MatchType = QuotaMatchExact + c.Name = *entityName + } + + return nil +} + +func (d *DescribeClientQuotasResponse) key() int16 { + return 48 +} + +func (d *DescribeClientQuotasResponse) version() int16 { + return 0 +} + +func (d *DescribeClientQuotasResponse) headerVersion() int16 { + return 0 +} + +func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion { + return V2_6_0_0 +} diff --git a/describe_client_quotas_response_test.go b/describe_client_quotas_response_test.go new file mode 100644 index 000000000..6c681d11c --- /dev/null +++ b/describe_client_quotas_response_test.go @@ -0,0 +1,95 @@ +package sarama + +import "testing" + +var ( + describeClientQuotasResponseError = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 35, // ErrorCode + 0, 41, 'C', 'u', 's', 't', 'o', 'm', ' ', 'e', 'n', 't', 'i', 't', 'y', ' ', 't', 'y', 'p', 'e', ' ', '\'', 'f', 'a', 'u', 'l', 't', 'y', '\'', ' ', 'n', 'o', 't', ' ', 's', 'u', 'p', 'p', 'o', 'r', 't', 'e', 'd', + 0, 0, 0, 0, // Entries + } + + describeClientQuotasResponseSingleValue = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 0, // ErrorCode + 255, 255, // ErrorMsg (nil) + 0, 0, 0, 1, // Entries + 0, 0, 0, 1, // Entity + 0, 4, 'u', 's', 'e', 'r', // Entity type + 255, 255, // Entity name (nil) + 0, 0, 0, 1, // Values + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', + 65, 46, 132, 128, 0, 0, 0, 0, // 1000000 + } + + describeClientQuotasResponseComplexEntity = []byte{ + 0, 0, 0, 0, // ThrottleTime + 0, 0, // ErrorCode + 255, 255, // ErrorMsg (nil) + 0, 0, 0, 2, // Entries + 0, 0, 0, 1, // Entity + 0, 4, 'u', 's', 'e', 'r', // Entity type + 255, 255, // Entity name (nil) + 0, 0, 0, 1, // Values + 0, 18, 'p', 'r', 'o', 'd', 'u', 'c', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', + 65, 46, 132, 128, 0, 0, 0, 0, // 1000000 + 0, 0, 0, 1, // Entity + 0, 9, 'c', 'l', 'i', 'e', 'n', 't', '-', 'i', 'd', // Entity type + 0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Entity name + 0, 0, 0, 1, // Values + 0, 18, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', '_', 'b', 'y', 't', 'e', '_', 'r', 'a', 't', 'e', + 65, 46, 132, 128, 0, 0, 0, 0, // 1000000 + } +) + +func TestDescribeClientQuotasResponse(t *testing.T) { + // Response With Error + errMsg := "Custom entity type 'faulty' not supported" + res := &DescribeClientQuotasResponse{ + ThrottleTime: 0, + ErrorCode: ErrUnsupportedVersion, + ErrorMsg: &errMsg, + Entries: []DescribeClientQuotasEntry{}, + } + testResponse(t, "Response With Error", res, describeClientQuotasResponseError) + + // Single Quota entry + defaultUserComponent := QuotaEntityComponent{ + EntityType: QuotaEntityUser, + MatchType: QuotaMatchDefault, + } + entry := DescribeClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Values: map[string]float64{"producer_byte_rate": 1000000}, + } + res = &DescribeClientQuotasResponse{ + ThrottleTime: 0, + ErrorCode: ErrNoError, + ErrorMsg: nil, + Entries: []DescribeClientQuotasEntry{entry}, + } + testResponse(t, "Single Value", res, describeClientQuotasResponseSingleValue) + + // Complex Quota entry + saramaClientIDComponent := QuotaEntityComponent{ + EntityType: QuotaEntityClientID, + MatchType: QuotaMatchExact, + Name: "sarama", + } + userEntry := DescribeClientQuotasEntry{ + Entity: []QuotaEntityComponent{defaultUserComponent}, + Values: map[string]float64{"producer_byte_rate": 1000000}, + } + clientEntry := DescribeClientQuotasEntry{ + Entity: []QuotaEntityComponent{saramaClientIDComponent}, + Values: map[string]float64{"consumer_byte_rate": 1000000}, + } + res = &DescribeClientQuotasResponse{ + ThrottleTime: 0, + ErrorCode: ErrNoError, + ErrorMsg: nil, + Entries: []DescribeClientQuotasEntry{userEntry, clientEntry}, + } + testResponse(t, "Complex Quota", res, describeClientQuotasResponseComplexEntity) +} diff --git a/packet_decoder.go b/packet_decoder.go index 184bc26ae..08b433223 100644 --- a/packet_decoder.go +++ b/packet_decoder.go @@ -11,6 +11,7 @@ type packetDecoder interface { getInt64() (int64, error) getVarint() (int64, error) getUVarint() (uint64, error) + getFloat64() (float64, error) getArrayLength() (int, error) getCompactArrayLength() (int, error) getBool() (bool, error) diff --git a/packet_encoder.go b/packet_encoder.go index aea53ca83..5016e09a6 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -13,6 +13,7 @@ type packetEncoder interface { putInt64(in int64) putVarint(in int64) putUVarint(in uint64) + putFloat64(in float64) putCompactArrayLength(in int) putArrayLength(in int) error putBool(in bool) diff --git a/prep_encoder.go b/prep_encoder.go index 0d0137487..1602fcb3f 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -42,6 +42,10 @@ func (pe *prepEncoder) putUVarint(in uint64) { pe.length += binary.PutUvarint(buf[:], in) } +func (pe *prepEncoder) putFloat64(in float64) { + pe.length += 8 +} + func (pe *prepEncoder) putArrayLength(in int) error { if in > math.MaxInt32 { return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)} diff --git a/quota_types.go b/quota_types.go new file mode 100644 index 000000000..4f33af0bc --- /dev/null +++ b/quota_types.go @@ -0,0 +1,21 @@ +package sarama + +type ( + QuotaEntityType string + + QuotaMatchType int +) + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaEntity.java +const ( + QuotaEntityUser QuotaEntityType = "user" + QuotaEntityClientID QuotaEntityType = "client-id" + QuotaEntityIP QuotaEntityType = "ip" +) + +// ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasRequest.java +const ( + QuotaMatchExact QuotaMatchType = iota + QuotaMatchDefault + QuotaMatchAny +) diff --git a/real_decoder.go b/real_decoder.go index 2482c6377..724e799d7 100644 --- a/real_decoder.go +++ b/real_decoder.go @@ -93,6 +93,16 @@ func (rd *realDecoder) getUVarint() (uint64, error) { return tmp, nil } +func (rd *realDecoder) getFloat64() (float64, error) { + if rd.remaining() < 8 { + rd.off = len(rd.raw) + return -1, ErrInsufficientData + } + tmp := math.Float64frombits(binary.BigEndian.Uint64(rd.raw[rd.off:])) + rd.off += 8 + return tmp, nil +} + func (rd *realDecoder) getArrayLength() (int, error) { if rd.remaining() < 4 { rd.off = len(rd.raw) diff --git a/real_encoder.go b/real_encoder.go index c07204cbc..d6a0ddf12 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -3,6 +3,7 @@ package sarama import ( "encoding/binary" "errors" + "math" "github.com/rcrowley/go-metrics" ) @@ -44,6 +45,11 @@ func (re *realEncoder) putUVarint(in uint64) { re.off += binary.PutUvarint(re.raw[re.off:], in) } +func (re *realEncoder) putFloat64(in float64) { + binary.BigEndian.PutUint64(re.raw[re.off:], math.Float64bits(in)) + re.off += 8 +} + func (re *realEncoder) putArrayLength(in int) error { re.putInt32(int32(in)) return nil diff --git a/request.go b/request.go index 08c2b6741..f6c27c107 100644 --- a/request.go +++ b/request.go @@ -190,6 +190,10 @@ func allocateBody(key, version int16) protocolBody { return &ListPartitionReassignmentsRequest{} case 47: return &DeleteOffsetsRequest{} + case 48: + return &DescribeClientQuotasRequest{} + case 49: + return &AlterClientQuotasRequest{} case 50: return &DescribeUserScramCredentialsRequest{} case 51: