diff --git a/consumer_group.go b/consumer_group.go index 1d1ce0fe2..bb6a2c2b9 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -33,7 +33,7 @@ type ConsumerGroup interface { // to allow the user to perform any final tasks before a rebalance. // 6. Finally, marked offsets are committed one last time before claims are released. // - // Please note, that once a relance is triggered, sessions must be completed within + // Please note, that once a rebalance is triggered, sessions must be completed within // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset @@ -267,7 +267,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top } } - return newConsumerGroupSession(c, ctx, claims, join.MemberId, join.GenerationId, handler) + return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler) } func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) { @@ -456,7 +456,7 @@ type consumerGroupSession struct { hbDying, hbDead chan none } -func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { +func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { // init offset manager offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client) if err != nil { @@ -595,7 +595,7 @@ func (s *consumerGroupSession) consume(topic string, partition int32) { s.parent.handleError(err, topic, partition) } - // ensure consumer is clased & drained + // ensure consumer is closed & drained claim.AsyncClose() for _, err := range claim.waitClosed() { s.parent.handleError(err, topic, partition) @@ -691,7 +691,7 @@ type ConsumerGroupHandler interface { // Setup is run at the beginning of a new session, before ConsumeClaim. Setup(ConsumerGroupSession) error - // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exites + // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited // but before the offsets are committed for the very last time. Cleanup(ConsumerGroupSession) error diff --git a/consumer_group_members.go b/consumer_group_members.go index 9d92d350a..2d02cc386 100644 --- a/consumer_group_members.go +++ b/consumer_group_members.go @@ -1,5 +1,6 @@ package sarama +//ConsumerGroupMemberMetadata holds the metadata for consumer group type ConsumerGroupMemberMetadata struct { Version int16 Topics []string @@ -36,6 +37,7 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) { return nil } +//ConsumerGroupMemberAssignment holds the member assignment for a consume group type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 diff --git a/consumer_metadata_request.go b/consumer_metadata_request.go index 4de45e7bf..a8dcaefe8 100644 --- a/consumer_metadata_request.go +++ b/consumer_metadata_request.go @@ -1,5 +1,6 @@ package sarama +//ConsumerMetadataRequest is used for metadata requests type ConsumerMetadataRequest struct { ConsumerGroup string } diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 442cbde7a..4d86e9303 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -5,6 +5,7 @@ import ( "strconv" ) +//ConsumerMetadataResponse holds the reponse for a consumer gorup meta data request type ConsumerMetadataResponse struct { Err KError Coordinator *Broker