Skip to content

Commit

Permalink
Remediate a number go-routine leaks.
Browse files Browse the repository at this point in the history
- NewClusterAdmin would leak a Client if Controller could not be found
- unclosed MockBroker and Client leaks in unit tests
  • Loading branch information
k-wall committed Mar 30, 2022
1 parent f07b7b8 commit 376a8b8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 3 deletions.
6 changes: 5 additions & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
if err != nil {
return nil, err
}
return NewClusterAdminFromClient(client)
admin, err := NewClusterAdminFromClient(client)
if err != nil {
client.Close()
}
return admin, err
}

// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
Expand Down
3 changes: 3 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,7 @@ func TestDeleteConsumerGroup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer admin.Close()

err = admin.DeleteConsumerGroup(group)
if err != nil {
Expand Down Expand Up @@ -1537,6 +1538,7 @@ func TestDeleteOffset(t *testing.T) {
if err != nil {
t.Fatalf("DeleteConsumerGroupOffset failed with error %v", err)
}
defer admin.Close()

// Test Error
handlerMap["DeleteOffsetsRequest"] = NewMockDeleteOffsetRequest(t).SetDeletedOffset(ErrNotCoordinatorForConsumer, topic, partition, ErrNoError)
Expand Down Expand Up @@ -1577,6 +1579,7 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

ca := clusterAdmin{client: client, conf: config}

Expand Down
13 changes: 11 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,9 @@ func TestClientRefreshBehaviour(t *testing.T) {

func TestClientRefreshBrokers(t *testing.T) {
initialSeed := NewMockBroker(t, 0)
defer initialSeed.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand Down Expand Up @@ -549,7 +551,9 @@ func TestClientRefreshBrokers(t *testing.T) {

func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand All @@ -560,6 +564,7 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

if len(client.Brokers()) != 2 {
t.Error("Meta broker is not 2")
Expand All @@ -579,7 +584,9 @@ func TestClientRefreshMetadataBrokerOffline(t *testing.T) {

func TestClientGetBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
leader := NewMockBroker(t, 5)
defer leader.Close()

metadataResponse1 := new(MetadataResponse)
metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
Expand All @@ -590,6 +597,7 @@ func TestClientGetBroker(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer client.Close()

broker, err := client.Broker(leader.BrokerID())
if err != nil {
Expand Down Expand Up @@ -669,6 +677,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
t.Error("incorrect number of dead seeds")
}

seed2.Close()
safeClose(t, c)
}

Expand Down Expand Up @@ -938,6 +947,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {

func TestClientAutorefreshShutdownRace(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

metadataResponse := new(MetadataResponse)
seedBroker.Returns(metadataResponse)
Expand All @@ -964,6 +974,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {

// Then return some metadata to the still-running background thread
leader := NewMockBroker(t, 2)
defer leader.Close()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand All @@ -973,8 +984,6 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
t.Fatalf("goroutine client.Close():%s", err)
}

seedBroker.Close()

// give the update time to happen so we get a panic if it's still running (which it shouldn't)
time.Sleep(10 * time.Millisecond)
}
Expand Down
1 change: 1 addition & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ func TestConsumeMessagesTrackLeader(t *testing.T) {

safeClose(t, pConsumer)
safeClose(t, consumer)
safeClose(t, client)
leader1.Close()
leader2.Close()
}
Expand Down
1 change: 1 addition & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {

// Response to refresh coordinator request
newCoordinator := NewMockBroker(t, 3)
defer newCoordinator.Close()
broker.Returns(&ConsumerMetadataResponse{
CoordinatorID: newCoordinator.BrokerID(),
CoordinatorHost: "127.0.0.1",
Expand Down

0 comments on commit 376a8b8

Please sign in to comment.