-
Notifications
You must be signed in to change notification settings - Fork 800
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add DeleteGroups function to Client #1095
Merged
Merged
Changes from 4 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/deletegroups" | ||
) | ||
|
||
// DeleteGroupsRequest represents a request sent to a kafka broker to delete | ||
// consumer groups. | ||
type DeleteGroupsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// Identifiers of groups to delete. | ||
GroupIDs []string | ||
} | ||
|
||
// DeleteGroupsResponse represents a response from a kafka broker to a consumer group | ||
// deletion request. | ||
type DeleteGroupsResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// Mapping of group ids to errors that occurred while attempting to delete those groups. | ||
// | ||
// The errors contain the kafka error code. Programs may use the standard | ||
// errors.Is function to test the error against kafka error codes. | ||
Errors map[string]error | ||
} | ||
|
||
// DeleteGroups sends a delete groups request and returns the response. The request is sent to the group coordinator of the first group | ||
// of the request. All deleted groups must be managed by the same group coordinator. | ||
func (c *Client) DeleteGroups( | ||
ctx context.Context, | ||
req *DeleteGroupsRequest, | ||
) (*DeleteGroupsResponse, error) { | ||
m, err := c.roundTrip(ctx, req.Addr, &deletegroups.Request{ | ||
GroupIDs: req.GroupIDs, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).DeleteGroups: %w", err) | ||
} | ||
|
||
r := m.(*deletegroups.Response) | ||
|
||
ret := &DeleteGroupsResponse{ | ||
Throttle: makeDuration(r.ThrottleTimeMs), | ||
Errors: make(map[string]error, len(r.Responses)), | ||
} | ||
|
||
for _, t := range r.Responses { | ||
if t.ErrorCode == 0 { | ||
ret.Errors[t.GroupID] = nil | ||
} else { | ||
ret.Errors[t.GroupID] = Error(t.ErrorCode) | ||
} | ||
} | ||
|
||
return ret, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestClientDeleteGroups(t *testing.T) { | ||
if !ktesting.KafkaIsAtLeast("1.1.0") { | ||
t.Skip("Skipping test because kafka version is not high enough.") | ||
} | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
topic := makeTopic() | ||
createTopic(t, topic, 1) | ||
|
||
groupID := makeGroupID() | ||
|
||
group, err := NewConsumerGroup(ConsumerGroupConfig{ | ||
ID: groupID, | ||
Topics: []string{topic}, | ||
Brokers: []string{"localhost:9092"}, | ||
HeartbeatInterval: 2 * time.Second, | ||
RebalanceTimeout: 2 * time.Second, | ||
RetentionTime: time.Hour, | ||
Logger: &testKafkaLogger{T: t}, | ||
}) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer group.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
defer cancel() | ||
|
||
gen, err := group.Next(ctx) | ||
if gen == nil { | ||
t.Fatalf("expected generation 1 not to be nil") | ||
} | ||
if err != nil { | ||
t.Fatalf("expected no error, but got %+v", err) | ||
} | ||
|
||
// delete not empty group | ||
res, err := client.DeleteGroups(ctx, &DeleteGroupsRequest{ | ||
GroupIDs: []string{groupID}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if !errors.Is(res.Errors[groupID], NonEmptyGroup) { | ||
t.Fatalf("expected NonEmptyGroup error, but got %+v", res.Errors[groupID]) | ||
} | ||
|
||
err = group.Close() | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// delete empty group | ||
res, err = client.DeleteGroups(ctx, &DeleteGroupsRequest{ | ||
GroupIDs: []string{groupID}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if err = res.Errors[groupID]; err != nil { | ||
t.Error(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package deletegroups | ||
|
||
import "github.com/segmentio/kafka-go/protocol" | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
type Request struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v2,max=v2,tag"` | ||
|
||
GroupIDs []string `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
func (r *Request) Group() string { | ||
// use first group to determine group coordinator | ||
if len(r.GroupIDs) > 0 { | ||
return r.GroupIDs[0] | ||
} | ||
return "" | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { return protocol.DeleteGroups } | ||
|
||
var ( | ||
_ protocol.GroupMessage = (*Request)(nil) | ||
) | ||
|
||
type Response struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v2,max=v2,tag"` | ||
|
||
ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` | ||
Responses []ResponseGroup `kafka:"min=v0,max=v2"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { return protocol.DeleteGroups } | ||
|
||
type ResponseGroup struct { | ||
GroupID string `kafka:"min=v0,max=v2"` | ||
ErrorCode int16 `kafka:"min=v0,max=v2"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package deletegroups_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/segmentio/kafka-go/protocol/deletegroups" | ||
"github.com/segmentio/kafka-go/protocol/prototest" | ||
) | ||
|
||
func TestDeleteGroupsRequest(t *testing.T) { | ||
for _, version := range []int16{0, 1, 2} { | ||
prototest.TestRequest(t, version, &deletegroups.Request{ | ||
GroupIDs: []string{"group1", "group2"}, | ||
}) | ||
} | ||
} | ||
|
||
func TestDeleteGroupsResponse(t *testing.T) { | ||
for _, version := range []int16{0, 1, 2} { | ||
prototest.TestResponse(t, version, &deletegroups.Response{ | ||
Responses: []deletegroups.ResponseGroup{ | ||
{ | ||
GroupID: "group1", | ||
ErrorCode: 1, | ||
}, | ||
{ | ||
GroupID: "group2", | ||
ErrorCode: 1, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ErrorCode 1 is a failure, could you write a test case that succeeds as well |
||
}, | ||
}, | ||
}) | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use the
makeError
function instead ofError
(see other examples for usage)also don't need the if/else statement as
makeError
takes care of that tooThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the review, I've made the changes.