Skip to content

Commit

Permalink
Critical dependency logging (sigp#4988)
Browse files Browse the repository at this point in the history
* add metrics layer

* add metrics

* simplify getting the target

* make clippy happy

* fix typos

* unify deps under workspace

* make import statement shorter, fix typos

* enable warn by default, mark flag as deprecated

* do not exit on error when initializing logging fails

* revert exit on error

* adjust bootnode logging

* add logging layer

* non blocking file writer

* non blocking file writer

* add tracing visitor

* use target as is by default

* make libp2p events register correctly

* adjust repilcated cli help

* refactor tracing layer

* linting

* filesize

* log gossipsub, dont filter by log level

* turn on debug logs by default, remove deprecation warning

* truncate file, add timestamp, add unit test

* suppress output (#5)

* use tracing appender

* cleanup

* Add a task to remove old log files and upgrade to warn level

* Add the time feature for tokio

* Udeps and fmt

---------

Co-authored-by: Diva M <[email protected]>
Co-authored-by: Divma <[email protected]>
Co-authored-by: Age Manning <[email protected]>
  • Loading branch information
4 people authored Jan 16, 2024
1 parent a68b701 commit f22e5b0
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 35 deletions.
48 changes: 45 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-util = { version = "0.6", features = ["codec", "compat", "time"] }
tracing-appender = "0.2"
tracing-core = "0.1"
tracing-log = "0.2"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tree_hash = "0.5"
tree_hash_derive = "0.5"
url = "2"
Expand Down
1 change: 1 addition & 0 deletions common/directory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub const DEFAULT_NETWORK_DIR: &str = "network";
pub const DEFAULT_VALIDATOR_DIR: &str = "validators";
pub const DEFAULT_SECRET_DIR: &str = "secrets";
pub const DEFAULT_WALLET_DIR: &str = "wallets";
pub const DEFAULT_TRACING_DIR: &str = "tracing";

/// Base directory name for unnamed testnets passed through the --testnet-dir flag
pub const CUSTOM_TESTNET_DIR: &str = "custom";
Expand Down
4 changes: 3 additions & 1 deletion common/logging/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ slog-async = { workspace = true }
slog-term = { workspace = true }
sloggers = { workspace = true }
take_mut = "0.2.2"
tokio = { workspace = true }
tokio = { workspace = true, features = [ "time" ] }
tracing = "0.1"
tracing-core = { workspace = true }
tracing-log = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-appender = { workspace = true }
48 changes: 48 additions & 0 deletions common/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@ use lighthouse_metrics::{
use slog::Logger;
use slog_term::Decorator;
use std::io::{Result, Write};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing_appender::non_blocking::NonBlocking;
use tracing_logging_layer::LoggingLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

pub const MAX_MESSAGE_WIDTH: usize = 40;

pub mod async_record;
mod sse_logging_components;
mod tracing_logging_layer;
mod tracing_metrics_layer;

pub use sse_logging_components::SSELoggingComponents;
pub use tracing_logging_layer::cleanup_logging_task;
pub use tracing_metrics_layer::MetricsLayer;

/// The minimum interval between log messages indicating that a queue is full.
Expand Down Expand Up @@ -217,6 +223,48 @@ impl TimeLatch {
}
}

pub fn create_tracing_layer(base_tracing_log_path: PathBuf, turn_on_terminal_logs: bool) {
let filter_layer = match tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("warn"))
{
Ok(filter) => filter,
Err(e) => {
eprintln!("Failed to initialize dependency logging {e}");
return;
}
};

let libp2p_writer =
tracing_appender::rolling::daily(base_tracing_log_path.clone(), "libp2p.log");
let discv5_writer =
tracing_appender::rolling::daily(base_tracing_log_path.clone(), "discv5.log");

let (libp2p_non_blocking_writer, libp2p_guard) = NonBlocking::new(libp2p_writer);
let (discv5_non_blocking_writer, discv5_guard) = NonBlocking::new(discv5_writer);

let custom_layer = LoggingLayer {
libp2p_non_blocking_writer,
libp2p_guard,
discv5_non_blocking_writer,
discv5_guard,
};

if let Err(e) = tracing_subscriber::fmt()
.with_env_filter(filter_layer)
.with_writer(move || {
tracing_subscriber::fmt::writer::OptionalWriter::<std::io::Stdout>::from(
turn_on_terminal_logs.then(std::io::stdout),
)
})
.finish()
.with(MetricsLayer)
.with(custom_layer)
.try_init()
{
eprintln!("Failed to initialize dependency logging {e}");
}
}

/// Return a logger suitable for test usage.
///
/// By default no logs will be printed, but they can be enabled via
Expand Down
115 changes: 115 additions & 0 deletions common/logging/src/tracing_logging_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use chrono::{naive::Days, prelude::*};
use slog::{debug, warn};
use std::io::Write;
use tracing::Subscriber;
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

pub struct LoggingLayer {
pub libp2p_non_blocking_writer: NonBlocking,
pub libp2p_guard: WorkerGuard,
pub discv5_non_blocking_writer: NonBlocking,
pub discv5_guard: WorkerGuard,
}

impl<S> Layer<S> for LoggingLayer
where
S: Subscriber,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<S>) {
let meta = event.metadata();
let log_level = meta.level();
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S").to_string();

let target = match meta.target().split_once("::") {
Some((crate_name, _)) => crate_name,
None => "unknown",
};

let mut writer = match target {
"libp2p_gossipsub" => self.libp2p_non_blocking_writer.clone(),
"discv5" => self.discv5_non_blocking_writer.clone(),
_ => return,
};

let mut visitor = LogMessageExtractor {
message: String::default(),
};

event.record(&mut visitor);
let message = format!("{} {} {}\n", timestamp, log_level, visitor.message);

if let Err(e) = writer.write_all(message.as_bytes()) {
eprintln!("Failed to write log: {}", e);
}
}
}

struct LogMessageExtractor {
message: String,
}

impl tracing_core::field::Visit for LogMessageExtractor {
fn record_debug(&mut self, _: &tracing_core::Field, value: &dyn std::fmt::Debug) {
self.message = format!("{} {:?}", self.message, value);
}
}

/// Creates a long lived async task that routinely deletes old tracing log files
pub async fn cleanup_logging_task(path: std::path::PathBuf, log: slog::Logger) {
loop {
// Delay for 1 day and then prune old logs
tokio::time::sleep(std::time::Duration::from_secs(60 * 60 * 24)).await;

let Some(yesterday_date) = chrono::prelude::Local::now()
.naive_local()
.checked_sub_days(Days::new(1))
else {
warn!(log, "Could not calculate the current date");
return;
};

// Search for old log files
let dir = path.as_path();

if dir.is_dir() {
let Ok(files) = std::fs::read_dir(dir) else {
warn!(log, "Could not read log directory contents"; "path" => ?dir);
break;
};

for file in files {
let Ok(dir_entry) = file else {
warn!(log, "Could not read file");
continue;
};

let Ok(file_name) = dir_entry.file_name().into_string() else {
warn!(log, "Could not read file"; "file" => ?dir_entry);
continue;
};

if file_name.starts_with("libp2p.log") | file_name.starts_with("discv5.log") {
let log_file_date = file_name.split('.').collect::<Vec<_>>();
if log_file_date.len() == 3 {
let Ok(log_file_date_type) =
NaiveDate::parse_from_str(log_file_date[2], "%Y-%m-%d")
else {
warn!(log, "Could not parse log file date"; "file" => file_name);
continue;
};

if log_file_date_type < yesterday_date.into() {
// Delete the file, its too old
debug!(log, "Removing old log file"; "file" => &file_name);
if let Err(e) = std::fs::remove_file(dir_entry.path()) {
warn!(log, "Failed to remove log file"; "file" => file_name, "error" => %e);
}
}
}
}
}
}
}
}
54 changes: 24 additions & 30 deletions lighthouse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use slog::{crit, info};
use std::path::PathBuf;
use std::process::exit;
use task_executor::ShutdownReason;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use types::{EthSpec, EthSpecId};
use validator_client::ProductionValidatorClient;

Expand Down Expand Up @@ -365,34 +364,6 @@ fn main() {
}
}

// read the `RUST_LOG` statement
let filter_layer = match tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("debug"))
{
Ok(filter) => filter,
Err(e) => {
eprintln!("Failed to initialize dependency logging {e}");
exit(1)
}
};

let turn_on_terminal_logs = matches.is_present("env_log");

if let Err(e) = tracing_subscriber::fmt()
.with_env_filter(filter_layer)
.with_writer(move || {
tracing_subscriber::fmt::writer::OptionalWriter::<std::io::Stdout>::from(
turn_on_terminal_logs.then(std::io::stdout),
)
})
.finish()
.with(logging::MetricsLayer)
.try_init()
{
eprintln!("Failed to initialize dependency logging {e}");
exit(1)
}

let result = get_eth2_network_config(&matches).and_then(|eth2_network_config| {
let eth_spec_id = eth2_network_config.eth_spec_id()?;

Expand Down Expand Up @@ -534,7 +505,7 @@ fn run<E: EthSpec>(
};

let logger_config = LoggerConfig {
path: log_path,
path: log_path.clone(),
debug_level: String::from(debug_level),
logfile_debug_level: String::from(logfile_debug_level),
log_format: log_format.map(String::from),
Expand All @@ -557,6 +528,29 @@ fn run<E: EthSpec>(

let log = environment.core_context().log().clone();

let mut tracing_log_path: Option<PathBuf> = clap_utils::parse_optional(matches, "logfile")?;

if tracing_log_path.is_none() {
tracing_log_path = Some(
parse_path_or_default(matches, "datadir")?
.join(DEFAULT_BEACON_NODE_DIR)
.join("logs"),
)
}

let path = tracing_log_path.clone().unwrap();

let turn_on_terminal_logs = matches.is_present("env_log");

// Run a task to clean up old tracing logs.
let log_cleaner_context = environment.service_context("log_cleaner".to_string());
log_cleaner_context.executor.spawn(
logging::cleanup_logging_task(path.clone(), log.clone()),
"log_cleaner",
);

logging::create_tracing_layer(path, turn_on_terminal_logs);

// Allow Prometheus to export the time at which the process was started.
metrics::expose_process_start_time(&log);

Expand Down

0 comments on commit f22e5b0

Please sign in to comment.