Skip to content

Commit

Permalink
Merge pull request #1646 from mimaison/admin-describelogdirs
Browse files Browse the repository at this point in the history
Add DescribeLogDirs to admin client
  • Loading branch information
dnwe authored May 5, 2020
2 parents c6eb1d4 + d89fa9d commit 1e4e077
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 0 deletions.
48 changes: 48 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type ClusterAdmin interface {
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)

// Get information about all log directories on the given set of brokers
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -878,3 +881,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {

return nil
}

func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
wg.Add(1)
broker, err := ca.findBroker(b)
if err != nil {
Logger.Printf("Unable to find broker with ID = %v\n", b)
continue
}
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
logDirs[b.ID()] = response.LogDirs
logDirsMaps <- logDirs
}(broker, ca.conf)
}

wg.Wait()
close(logDirsMaps)
close(errors)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
allLogDirs[id] = logDirs
}
}

// Intentionally return only the first error for simplicity
err = <-errors
return
}
48 changes: 48 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1309,3 +1309,51 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker2.BrokerID(), b.ID())
}
}

func TestDescribeLogDirs(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
})

config := NewConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
if err != nil {
t.Fatal(err)
}

if len(logDirsPerBroker) != 1 {
t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
}
logDirs := logDirsPerBroker[seedBroker.BrokerID()]
if len(logDirs) != 1 {
t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
}
logDirsBroker := logDirs[0]
if logDirsBroker.ErrorCode != ErrNoError {
t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
}
if logDirsBroker.Path != "/tmp/logs" {
t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
}
if len(logDirsBroker.Topics) != 2 {
t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
}
err = admin.Close()
if err != nil {
t.Fatal(err)
}
}
6 changes: 6 additions & 0 deletions describe_log_dirs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
return err
}

if err := pe.putArrayLength(len(r.Topics)); err != nil {
return err
}
for _, topic := range r.Topics {
if err := topic.encode(pe); err != nil {
return err
Expand Down Expand Up @@ -137,6 +140,9 @@ func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
return err
}

if err := pe.putArrayLength(len(r.Partitions)); err != nil {
return err
}
for _, partition := range r.Partitions {
if err := partition.encode(pe); err != nil {
return err
Expand Down
42 changes: 42 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,45 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead
}
return resp
}

type MockDescribeLogDirsResponse struct {
t TestReporter
logDirs []DescribeLogDirsResponseDirMetadata
}

func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
return &MockDescribeLogDirsResponse{t: t}
}

func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
topics := []DescribeLogDirsResponseTopic{}
for topic := range topicPartitions {
partitions := []DescribeLogDirsResponsePartition{}
for i := 0; i < topicPartitions[topic]; i++ {
partitions = append(partitions, DescribeLogDirsResponsePartition{
PartitionID: int32(i),
IsTemporary: false,
OffsetLag: int64(0),
Size: int64(1234),
})
}
topics = append(topics, DescribeLogDirsResponseTopic{
Topic: topic,
Partitions: partitions,
})
}
logDir := DescribeLogDirsResponseDirMetadata{
ErrorCode: ErrNoError,
Path: logDirPath,
Topics: topics,
}
m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
return m
}

func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &DescribeLogDirsResponse{
LogDirs: m.logDirs,
}
return resp
}

0 comments on commit 1e4e077

Please sign in to comment.