Skip to content

Commit

Permalink
MINOR: improve StreamThread periodic processing log (apache#18430)
Browse files Browse the repository at this point in the history
The current log is really helpful, this PR adds a bit more information to that log to help debug some issues. In particular, it is interesting to be able to debug situations that have long intervals between polls. It also includes a reference to how long it has been since it last logged so you don't have to find the previous time it was logged to compute quick per-second ratios.

Reviewers: Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
agavra authored and ableegoldman committed Jan 9, 2025
1 parent b1112a4 commit 5b35304
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
private long lastLogSummaryMs = -1L;
private long totalRecordsProcessedSinceLastSummary = 0L;
private long totalPunctuatorsSinceLastSummary = 0L;
private long totalPolledSinceLastSummary = 0L;
private long totalCommittedSinceLastSummary = 0L;

private long now;
Expand Down Expand Up @@ -955,6 +956,7 @@ void runOnceWithoutProcessingThreads() {
final long pollLatency;
taskManager.resumePollingForPartitionsWithAvailableSpace();
pollLatency = pollPhase();
totalPolledSinceLastSummary += 1;

// Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests().
// The task manager internal states could be uninitialized if the state transition happens during #onPartitionsAssigned().
Expand Down Expand Up @@ -1070,12 +1072,14 @@ void runOnceWithoutProcessingThreads() {
pollRatioSensor.record((double) pollLatency / runOnceLatency, now);
commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, now);

if (logSummaryIntervalMs > 0 && now - lastLogSummaryMs > logSummaryIntervalMs) {
log.info("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalCommittedSinceLastSummary);
final long timeSinceLastLog = now - lastLogSummaryMs;
if (logSummaryIntervalMs > 0 && timeSinceLastLog > logSummaryIntervalMs) {
log.info("Processed {} total records, ran {} punctuators, polled {} times and committed {} total tasks since the last update {}ms ago",
totalRecordsProcessedSinceLastSummary, totalPunctuatorsSinceLastSummary, totalPolledSinceLastSummary, totalCommittedSinceLastSummary, timeSinceLastLog);

totalRecordsProcessedSinceLastSummary = 0L;
totalPunctuatorsSinceLastSummary = 0L;
totalPolledSinceLastSummary = 0L;
totalCommittedSinceLastSummary = 0L;
lastLogSummaryMs = now;
}
Expand Down

0 comments on commit 5b35304

Please sign in to comment.