Skip to content

Commit

Permalink
fix(logging): log messages are lost with heavy load (databendlabs#12385)
Browse files Browse the repository at this point in the history
`tracing_appender::non_blocking()` will drop log messages if the channel
is full. Replace it with a `BufWriter<_>`
  • Loading branch information
drmingdrmer authored and andylokandy committed Nov 27, 2023
1 parent d76ccaf commit f5b5cdc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 25 additions & 8 deletions src/common/tracing/src/minitrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::borrow::Cow;
use std::fmt;
use std::io;
use std::io::BufWriter;
use std::io::Write;
use std::time::SystemTime;

Expand All @@ -23,6 +25,8 @@ use log::LevelFilter;
use log::Log;
use minitrace::prelude::*;
use serde_json::Map;
use tracing_appender::non_blocking::NonBlocking;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::rolling::RollingFileAppender;
use tracing_appender::rolling::Rotation;

Expand Down Expand Up @@ -109,11 +113,8 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync
// File logger
if cfg.file.on {
let (normal_log_file, flush_guard) =
tracing_appender::non_blocking(RollingFileAppender::new(
Rotation::HOURLY,
&cfg.file.dir,
format!("databend-query-{name}"),
));
new_file_log_writer(&cfg.file.dir, format!("databend-query-{name}"));

guards.push(Box::new(flush_guard));

normal_logger = normal_logger.chain(
Expand Down Expand Up @@ -141,9 +142,8 @@ pub fn init_logging(name: &str, cfg: &Config) -> Vec<Box<dyn Drop + Send + Sync

// Query logger
if cfg.query.on {
let (query_log_file, flush_guard) = tracing_appender::non_blocking(
RollingFileAppender::new(Rotation::HOURLY, &cfg.query.dir, name),
);
let (query_log_file, flush_guard) = new_file_log_writer(&cfg.query.dir, name);

guards.push(Box::new(flush_guard));

query_logger = query_logger.chain(Box::new(query_log_file) as Box<dyn Write + Send>);
Expand Down Expand Up @@ -308,3 +308,20 @@ impl Log for MinitraceLogger {

fn flush(&self) {}
}

/// Create a `BufWriter<NonBlocking>` for a rolling file logger.
///
/// `BufWriter` collects log segments into a whole before sending to underlying writer.
/// `NonBlocking` sends log to another thread to execute the write IO to avoid blocking the thread
/// that calls `log`.
///
/// Note that `NonBlocking` will discard logs if there are too many `io::Write::write(NonBlocking)`,
/// especially when `fern` sends log segments one by one to the `Writer`.
/// Therefore a `BufWriter` is used to reduce the number of `io::Write::write(NonBlocking)`.
fn new_file_log_writer(dir: &str, name: impl ToString) -> (BufWriter<NonBlocking>, WorkerGuard) {
let rolling = RollingFileAppender::new(Rotation::HOURLY, dir, name.to_string());
let (non_blocking, flush_guard) = tracing_appender::non_blocking(rolling);
let buffered_non_blocking = io::BufWriter::with_capacity(64 * 1024 * 1024, non_blocking);

(buffered_non_blocking, flush_guard)
}

0 comments on commit f5b5cdc

Please sign in to comment.