From 5b353044a0fbbaf6a755f31f01262e69487ad6e9 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 9 Jan 2025 11:01:43 -0800 Subject: [PATCH] MINOR: improve StreamThread periodic processing log (#18430) The current log is really helpful; this PR adds a bit more information to that log to help debug some issues. In particular, it is useful to be able to debug situations that have long intervals between polls. The log now also reports how long it has been since the last summary was logged, so you don't have to find the previous log line to compute quick per-second ratios. Reviewers: Anna Sophie Blee-Goldman --- .../streams/processor/internals/StreamThread.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 05c832811adc4..1243c85af0b6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -307,6 +307,7 @@ public boolean isStartingRunningOrPartitionAssigned() { private long lastLogSummaryMs = -1L; private long totalRecordsProcessedSinceLastSummary = 0L; private long totalPunctuatorsSinceLastSummary = 0L; + private long totalPolledSinceLastSummary = 0L; private long totalCommittedSinceLastSummary = 0L; private long now; @@ -955,6 +956,7 @@ void runOnceWithoutProcessingThreads() { final long pollLatency; taskManager.resumePollingForPartitionsWithAvailableSpace(); pollLatency = pollPhase(); + totalPolledSinceLastSummary += 1; // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). // The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned(). 
@@ -1070,12 +1072,14 @@ void runOnceWithoutProcessingThreads() { pollRatioSensor.record((double) pollLatency / runOnceLatency, now); commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now); - if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) { - log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update", - totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary); + final long timeSinceLastLog = now - lastLogSummaryMs; + if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) { + log.info("Processed {} total records, ran {} punctuators, polled {} times and committed {} total tasks since the last update {}ms ago", + totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalPolledSinceLastSummary, totalCommittedSinceLastSummary, timeSinceLastLog); totalRecordsProcessedSinceLastSummary = 0L; totalPunctuatorsSinceLastSummary = 0L; + totalPolledSinceLastSummary = 0L; totalCommittedSinceLastSummary = 0L; lastLogSummaryMs = now; }