Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend throttling metric scope #2533

Merged
merged 5 commits into from
Jul 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *CreateAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// AclCreationResponse is an acl creation response type
type AclCreationResponse struct {
Err KError
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *DeleteAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// FilterResponse is a filter response type
type FilterResponse struct {
Err KError
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
}

func (r *DescribeAclsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,7 @@ func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AddOffsetsToTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AddPartitionsToTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

// PartitionError is a partition error type
type PartitionError struct {
Partition int32
Expand Down
4 changes: 4 additions & 0 deletions alter_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,7 @@ func (a *AlterClientQuotasResponse) headerVersion() int16 {
func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}

func (r *AlterClientQuotasResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ func (a *AlterConfigsResponse) headerVersion() int16 {
func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *AlterConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions alter_partition_reassignments_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type alterPartitionReassignmentsErrorBlock struct {
errorCode KError
errorMessage *string
Expand Down Expand Up @@ -155,3 +157,7 @@ func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}

func (r *AlterPartitionReassignmentsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}
4 changes: 4 additions & 0 deletions alter_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,7 @@ func (r *AlterUserScramCredentialsResponse) headerVersion() int16 {
func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}

func (r *AlterUserScramCredentialsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

// ApiVersionsResponseKey contains the APIs supported by the broker.
type ApiVersionsResponseKey struct {
// Version defines the protocol version to use for encode and decode
Expand Down Expand Up @@ -154,3 +156,7 @@ func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
}
}

func (r *ApiVersionsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}
41 changes: 28 additions & 13 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
}

// Well-formed response
b.updateThrottleMetric(res.ThrottleTime)
b.updateThrottleMetric(res)
cb(res, nil)
},
}
Expand All @@ -479,7 +479,6 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
} else {
response = new(ProduceResponse)
err = b.sendAndReceive(request, response)
b.updateThrottleMetric(response.ThrottleTime)
}

if err != nil {
Expand Down Expand Up @@ -944,7 +943,7 @@ func (b *Broker) write(buf []byte) (n int, err error) {
return b.conn.Write(buf)
}

// b.lock must be haled by caller
// b.lock must be held by caller
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
var promise *responsePromise
if promiseResponse {
Expand Down Expand Up @@ -1042,7 +1041,14 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return nil
}

return handleResponsePromise(req, res, promise, b.metricRegistry)
err = handleResponsePromise(req, res, promise, b.metricRegistry)
if err != nil {
return err
}
if res != nil {
b.updateThrottleMetric(res)
}
return nil
}

func handleResponsePromise(req protocolBody, res protocolBody, promise *responsePromise, metricRegistry metrics.Registry) error {
Expand Down Expand Up @@ -1635,15 +1641,24 @@ func (b *Broker) updateProtocolMetrics(rb protocolBody) {
}
}

func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
if throttleTime != time.Duration(0) {
DebugLogger.Printf(
"producer/broker/%d ProduceResponse throttled %v\n",
b.ID(), throttleTime)
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
}
type throttleSupport interface {
throttleTime() time.Duration
}

func (b *Broker) updateThrottleMetric(resp protocolBody) {
throttledResponse, ok := resp.(throttleSupport)
if !ok {
return
}
throttleTime := throttledResponse.throttleTime()
if throttleTime == time.Duration(0) {
return
}
DebugLogger.Printf(
"broker/%d %T throttled %v\n", b.ID(), resp, throttleTime)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information

[Sensitive data returned by an access to ApiKey](1) flows to a logging call. [Sensitive data returned by an access to ApiKeys](2) flows to a logging call.
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
}
}

Expand Down
4 changes: 4 additions & 0 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

func (r *CreatePartitionsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type TopicPartitionError struct {
Err KError
ErrMsg *string
Expand Down
4 changes: 4 additions & 0 deletions create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
}
}

func (r *CreateTopicsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type TopicError struct {
Err KError
ErrMsg *string
Expand Down
4 changes: 4 additions & 0 deletions delete_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ func (r *DeleteGroupsResponse) headerVersion() int16 {
func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
return V1_1_0_0
}

func (r *DeleteGroupsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions delete_offsets_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,7 @@ func (r *DeleteOffsetsResponse) headerVersion() int16 {
func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}

func (r *DeleteOffsetsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions delete_records_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *DeleteRecordsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type DeleteRecordsResponseTopic struct {
Partitions map[int32]*DeleteRecordsResponsePartition
}
Expand Down
4 changes: 4 additions & 0 deletions delete_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
return V0_10_1_0
}
}

func (r *DeleteTopicsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions describe_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,7 @@ func (d *DescribeClientQuotasResponse) headerVersion() int16 {
func (d *DescribeClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}

func (r *DescribeClientQuotasResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions describe_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
}
}

func (r *DescribeConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(r.ErrorCode)

Expand Down
6 changes: 6 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type DescribeGroupsResponse struct {
// Version defines the protocol version to use for encode and decode
Version int16
Expand Down Expand Up @@ -77,6 +79,10 @@ func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *DescribeGroupsResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTimeMs) * time.Millisecond
}

// GroupDescription contains each described group.
type GroupDescription struct {
// Version defines the protocol version to use for encode and decode
Expand Down
4 changes: 4 additions & 0 deletions describe_log_dirs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}

func (r *DescribeLogDirsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

type DescribeLogDirsResponseDirMetadata struct {
ErrorCode KError

Expand Down
4 changes: 4 additions & 0 deletions describe_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ func (r *DescribeUserScramCredentialsResponse) headerVersion() int16 {
func (r *DescribeUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}

func (r *DescribeUserScramCredentialsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions end_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (r *EndTxnResponse) headerVersion() int16 {
func (e *EndTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

func (r *EndTxnResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (r *FetchResponse) requiredVersion() KafkaVersion {
}
}

func (r *FetchResponse) throttleTime() time.Duration {
return r.ThrottleTime
}

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
if r.Blocks == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,7 @@ func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
}
}

func (r *FindCoordinatorResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions heartbeat_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type HeartbeatResponse struct {
Version int16
ThrottleTime int32
Expand Down Expand Up @@ -50,3 +52,7 @@ func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
}
return V0_9_0_0
}

func (r *HeartbeatResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
4 changes: 4 additions & 0 deletions incremental_alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ func (a *IncrementalAlterConfigsResponse) headerVersion() int16 {
func (a *IncrementalAlterConfigsResponse) requiredVersion() KafkaVersion {
return V2_3_0_0
}

func (r *IncrementalAlterConfigsResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
4 changes: 4 additions & 0 deletions init_producer_id_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
}

func (r *InitProducerIDResponse) throttleTime() time.Duration {
return r.ThrottleTime
}
6 changes: 6 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type JoinGroupResponse struct {
Version int16
ThrottleTime int32
Expand Down Expand Up @@ -157,3 +159,7 @@ func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
}

func (r *JoinGroupResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
6 changes: 6 additions & 0 deletions leave_group_response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sarama

import "time"

type MemberResponse struct {
MemberId string
GroupInstanceId *string
Expand Down Expand Up @@ -90,3 +92,7 @@ func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
}
return V0_9_0_0
}

func (r *LeaveGroupResponse) throttleTime() time.Duration {
return time.Duration(r.ThrottleTime) * time.Millisecond
}
Loading