-
Notifications
You must be signed in to change notification settings - Fork 386
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose metrics via Control #338
Conversation
Please review is this general idea could make it in @patriknw @kciesielski @13h3r :) |
Yes, I think it could be implemented this way |
Thanks, LGTM. As for producer, there's |
thanks, will finish this PR Then soon
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor api adjustments, otherwise good
/** Public protocol which can be used to interact with the Actor */ | ||
object Protocol { | ||
// requests | ||
final case class RequestMetrics() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
strange with case class without params, case object?
then it need accessor for Java API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
|
||
// responses | ||
/** Contains a snapshot of metrics obtained from the underlying kafka consumer */ | ||
final case class ConsumerMetrics(metrics: Map[MetricName, Metric]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java API: getMetrics: java.util.Map
What about this PR? Seems there is no way to get consumer metrics now. |
Hah, got reminded of this one via discuss.akka.io Will fix it |
2a1eb86
to
a36a736
Compare
Addressed feedback, checked Java API, added docs (also for producer where it's trivial however added it so it's all explained symmetrically). Please review @kciesielski @patriknw |
override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { | ||
onRevoke(partitions.asScala) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this have to do with metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rebase gone weird :\ Thanks
/** Java API */ | ||
def getMetrics: util.Map[MetricName, Metric] = metrics.asJava | ||
} | ||
// ------------ end of public protocol ------------ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works when the KafkaConsumer actor is created and passed in, but what about the other cases where the actor is created by the library? Wouldn't it be better to expose the metrics via the Control
materialized value?
Could be a Future and using ask internally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good point, will add that
Yeah you're right that a |
Perhaps |
Yeah, tried doing that. It's hard to make it It is easier to make it a |
Done, now the only API is to get metrics from the control value |
Please give it a look @patriknw when you have some time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice. LGTM with a few import cleanups which we should automate :)
import org.apache.kafka.clients.consumer.ConsumerRecord | ||
import org.apache.kafka.common.TopicPartition | ||
import akka.util.Timeout | ||
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer is unused
@@ -9,6 +9,8 @@ import java.util.concurrent.TimeUnit | |||
|
|||
import akka.NotUsed | |||
import akka.actor.{ActorRef, ExtendedActorSystem, Terminated} | |||
import akka.dispatch.ExecutionContexts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean up imports, some of the new unused
@@ -7,20 +7,23 @@ package sample.scaladsl | |||
import akka.pattern.ask | |||
import akka.kafka.ConsumerMessage.{CommittableMessage, CommittableOffsetBatch} | |||
import akka.kafka._ | |||
import akka.actor.{Props, ActorRef, Actor, ActorSystem, ActorLogging, PoisonPill} | |||
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these new imports are unused
@@ -187,6 +192,11 @@ private[kafka] class KafkaConsumerActor[K, V](settings: ConsumerSettings[K, V]) | |||
stopInProgress = true | |||
context.become(stopping) | |||
} | |||
|
|||
case RequestMetrics => | |||
val unmodifiableYetMutableMetrics: java.util.Map[MetricName, _ <: Metric] = consumer.metrics() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This map goes through a lot of conversions for the Java case.
I guess there's no point trying to avoid this. Looking at the Kafka code they give us the raw map wrapped in an unmodifiable so we shouldn't pass that around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah it is converted a lot indeed... I figured to leave it along until or rather IF reported as a problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Thanks! Exciting one :) |
We'd like to be able to expose metrics to users when they use the Sharing the Consumer Actor pattern; I thought this style would likely be good enough - I assume the same could be done in the Publisher?