Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remediate a number go-routine leaks (mainly test issues) #2198

Merged
merged 1 commit into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
if err != nil {
return nil, err
}
return NewClusterAdminFromClient(client)
admin, err := NewClusterAdminFromClient(client)
if err != nil {
client.Close()
}
return admin, err
}

// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
Expand Down
3 changes: 3 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,7 @@ func TestDeleteConsumerGroup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer admin.Close()

err = admin.DeleteConsumerGroup(group)
if err != nil {
Expand Down Expand Up @@ -1537,6 +1538,7 @@ func TestDeleteOffset(t *testing.T) {
if err != nil {
t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err)
}
defer admin.Close()

// Test Error
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError)
Expand Down Expand Up @@ -1577,6 +1579,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

ca := clusterAdmin{client: client, conf: config}

Expand Down
13 changes: 11 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,9 @@ func TestClientRefreshBehaviour(t *testing.T) {

func TestClientRefreshBrokers(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
defer initialSeed.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -549,7 +551,9 @@ func TestClientRefreshBrokers(t *testing.T) {

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand All @@ -560,6 +564,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
Expand All @@ -579,7 +584,9 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand All @@ -590,6 +597,7 @@ func TestClientGetBroker(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

broker, err := client.Broker(leader.BrokerID())
if err != nil {
Expand Down Expand Up @@ -669,6 +677,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
t.Error("incorrect number of dead seeds")
}

seed2.Close()
safeClose(t, c)
}

Expand Down Expand Up @@ -938,6 +947,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {

func TestClientAutorefreshShutdownRace(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

metadataResponse := new(MetadataResponse)
seedBroker.Returns(metadataResponse)
Expand All @@ -964,6 +974,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {

// Then return some metadata to the still-running background thread
leader := NewMockBroker(t, 2)
defer leader.Close()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand All @@ -973,8 +984,6 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
t.Fatalf("goroutine client.Close():%s", err)
}

seedBroker.Close()

// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}
Expand Down
1 change: 1 addition & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ func TestConsumeMessagesTrackLeader(t *testing.T) {

safeClose(t, pConsumer)
safeClose(t, consumer)
safeClose(t, client)
leader1.Close()
leader2.Close()
}
Expand Down
1 change: 1 addition & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {

// Response to refresh coordinator request
newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()
broker.Returns(&ConsumerMetadataResponse{
CoordinatorID: newCoordinator.BrokerID(),
CoordinatorHost: "127.0.0.1",
Expand Down