Add new metric for kafka topic partition size. #298

Open
wants to merge 3 commits into base: master
28 changes: 27 additions & 1 deletion kafka_exporter.go
@@ -44,6 +44,7 @@ var (
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
topicOldestOffset *prometheus.Desc
topicPartitionSize *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
@@ -294,6 +295,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- topicCurrentOffset
ch <- topicOldestOffset
ch <- topicPartitions
ch <- topicPartitionSize
ch <- topicPartitionLeader
ch <- topicPartitionReplicas
ch <- topicPartitionInSyncReplicas
@@ -399,6 +401,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
)

e.mu.Lock()
offset[topic] = make(map[int32]int64, len(partitions))
e.mu.Unlock()
@@ -412,6 +415,23 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
)
}

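// The on-disk size of each partition log is read from the broker via Kafka's
// DescribeLogDirs API (sarama's DescribeLogDirs request/response types). A
// minimal approach: one request is issued per partition here; batching all
// partitions per broker would reduce round trips and could be a follow-up.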
if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
glog.Errorf("Error opening Kafka broker: %v", err)
}
describeLogDirs, err := broker.DescribeLogDirs(&sarama.DescribeLogDirsRequest{1, []sarama.DescribeLogDirsRequestTopic{{topic, []int32{partition}}}})
if err != nil {
glog.Errorf("Cannot describe log dirs of topic %s partition %d: %v", topic, partition, err)
} else {
for _, logDir := range describeLogDirs.LogDirs {
for _, topic4size := range logDir.Topics {
for _, partition4size := range topic4size.Partitions {
ch <- prometheus.MustNewConstMetric(
topicPartitionSize, prometheus.GaugeValue, float64(partition4size.Size), topic, strconv.FormatInt(int64(partition), 10),
)
}
}
}
}

currentOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
glog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, err)
@@ -570,7 +590,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
glog.Errorf("Cannot get GetMemberAssignment of group member %v : %v", member, err)
return
}
for topic, partions := range assignment.Topics {
for topic, partions := range assignment.Topics {
for _, partition := range partions {
offsetFetchRequest.AddPartition(topic, partition)
}
@@ -811,6 +831,12 @@ func setup(
[]string{"topic", "partition"}, labels,
)

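// With the exporter's "kafka" namespace this is exposed as, for example
// (hypothetical series): kafka_topic_partition_size{topic="events",partition="0"} 1.048576e+06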
topicPartitionSize = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_size"),
"Size for this Topic Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",