Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test) switch zebrad to a non-blocking tracing logger #5032

Merged
merged 17 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ impl FinalizedState {
// So we want to drop it before we exit.
std::mem::drop(new_state);

// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}
}
Expand Down Expand Up @@ -307,6 +312,11 @@ impl FinalizedState {
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
self.db.shutdown(true);

// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}

Expand Down Expand Up @@ -343,6 +353,9 @@ impl FinalizedState {
let _ = stdout().lock().flush();
let _ = stderr().lock().flush();

// Exits before calling drop on the WorkerGuard for the logger thread,
// dropping any lines that haven't already been written to stdout.
// This is okay for now because this is test-only code
std::process::exit(0);
}
}
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ tinyvec = { version = "1.6.0", features = ["rustc_1_55"] }
thiserror = "1.0.33"

tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
tracing-appender = "0.2.2"
tracing-error = "0.2.0"
tracing-futures = "0.2.5"
tracing = "0.1.31"
Expand Down
15 changes: 13 additions & 2 deletions zebrad/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use self::entry_point::EntryPoint;
use std::{fmt::Write as _, io::Write as _, process};

use abscissa_core::{
application::{self, fatal_error, AppCell},
application::{self, AppCell},
config::{self, Configurable},
status_err,
terminal::{component::Terminal, stderr, stdout, ColorChoice},
Expand All @@ -18,6 +18,13 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR};

use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig};

/// Print a fatal error message and exit the process with status code 1.
///
/// `app_name` is the application name placed at the start of the message,
/// and `err` is the error that is being reported. This function never returns.
///
/// See <https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10>
/// for the `abscissa_core` function of the same name.
fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! {
status_err!("{} fatal error: {}", app_name, err);
process::exit(1)
}

/// Application state
pub static APPLICATION: AppCell<ZebradApp> = AppCell::new();

Expand Down Expand Up @@ -462,7 +469,11 @@ impl Application for ZebradApp {
let _ = stderr().lock().flush();

if let Err(e) = self.state().components.shutdown(self, shutdown) {
fatal_error(self, &e)
let app_name = self.name().to_string();

// Swap out a fake app so we can trigger the destructor on the original
let _ = std::mem::take(self);
fatal_error(app_name, &e);
}

// Swap out a fake app so we can trigger the destructor on the original
Expand Down
29 changes: 27 additions & 2 deletions zebrad/src/components/tracing/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
use abscissa_core::{Component, FrameworkError, Shutdown};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter,
fmt::{format, Formatter},
layer::SubscriberExt,
reload::Handle,
util::SubscriberInitExt,
EnvFilter,
};

use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};

use crate::{application::app_version, config::TracingSection};

#[cfg(feature = "flamegraph")]
Expand All @@ -16,14 +22,23 @@ pub struct Tracing {
/// The installed filter reloading handle, if enabled.
//
// TODO: when fmt::Subscriber supports per-layer filtering, remove the Option
filter_handle: Option<Handle<EnvFilter, Formatter>>,
filter_handle: Option<
Handle<
EnvFilter,
Formatter<format::DefaultFields, format::Format<format::Full>, NonBlocking>,
>,
>,

/// The originally configured filter.
initial_filter: String,

/// The installed flame graph collector, if enabled.
#[cfg(feature = "flamegraph")]
flamegrapher: Option<flame::Grapher>,

/// Drop guard for worker thread of non-blocking logger,
/// responsible for flushing any remaining logs when the program terminates
_guard: WorkerGuard,
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Tracing {
Expand All @@ -32,6 +47,13 @@ impl Tracing {
let filter = config.filter.unwrap_or_else(|| "".to_string());
let flame_root = &config.flamegraph;

// Builds a lossy NonBlocking logger with a default line limit of 128_000 or an explicit buffer_limit.
// The write method queues lines down a bounded channel with this capacity to a worker thread that writes to stdout.
// Increments error_counter and drops lines when the buffer is full.
let (non_blocking, _guard) = NonBlockingBuilder::default()
.buffered_lines_limit(config.buffer_limit.max(100))
.finish(std::io::stdout());

// Only use color if tracing output is being sent to a terminal or if it was explicitly
// forced to.
let use_color =
Expand All @@ -47,6 +69,7 @@ impl Tracing {

let logger = FmtSubscriber::builder()
.with_ansi(use_color)
.with_writer(non_blocking)
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
.with_env_filter(&filter);

// Enable reloading if that feature is selected.
Expand Down Expand Up @@ -82,6 +105,7 @@ impl Tracing {
// Using `FmtSubscriber` as the base subscriber, all the logs get filtered.
let logger = fmt::Layer::new()
.with_ansi(use_color)
.with_writer(non_blocking)
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
.with_filter(EnvFilter::from(&filter));

let subscriber = subscriber.with(logger);
Expand Down Expand Up @@ -185,6 +209,7 @@ impl Tracing {
initial_filter: filter,
#[cfg(feature = "flamegraph")]
flamegrapher,
_guard,
})
}

Expand Down
7 changes: 7 additions & 0 deletions zebrad/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub struct TracingSection {
/// verification of every 1000th block.
pub filter: Option<String>,

/// The buffer_limit size sets the number of log lines that can be queued by the tracing subscriber
/// to be written to stdout before logs are dropped.
///
/// Defaults to 128,000 with a minimum of 100.
pub buffer_limit: usize,

/// The address used for an ad-hoc RPC endpoint allowing dynamic control of the tracing filter.
///
/// Install Zebra using `cargo install --features=filter-reload` to enable this config.
Expand Down Expand Up @@ -140,6 +146,7 @@ impl Default for TracingSection {
use_color: true,
force_use_color: false,
filter: None,
buffer_limit: 128_000,
endpoint_addr: None,
flamegraph: None,
use_journald: false,
Expand Down
71 changes: 71 additions & 0 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,77 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> {
Ok(())
}

/// Test that zebrad's RPC endpoint keeps responding when its log output is
/// not being read.
///
/// Launches zebrad with a `trace` filter and the minimum tracing
/// `buffer_limit`, then sends 500 RPC requests without reading the child's
/// stdout. The logger is expected to drop lines rather than block execution,
/// so every request must succeed.
///
/// The test body runs in a spawned task so that a hang can be detected with a
/// timeout instead of blocking the test run forever.
#[test]
fn non_blocking_logger() -> Result<()> {
    use futures::FutureExt;
    use std::{sync::mpsc, time::Duration};

    let rt = tokio::runtime::Runtime::new().unwrap();
    let (done_tx, done_rx) = mpsc::channel();

    let test_task_handle: tokio::task::JoinHandle<Result<()>> = rt.spawn(async move {
        let _init_guard = zebra_test::init();

        // Write a configuration that has RPC listen_addr set
        // [Note on port conflict](#Note on port conflict)
        let mut config = random_known_rpc_port_config(false)?;
        config.tracing.filter = Some("trace".to_string());
        config.tracing.buffer_limit = 100;
        let zebra_rpc_address = config.rpc.listen_addr.unwrap();

        let dir = testdir()?.with_config(&mut config)?;
        let mut child = dir.spawn_child(args!["start"])?;
        // Wait until port is open.
        child.expect_stdout_line_matches(
            format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(),
        )?;

        // Create an http client
        let client = reqwest::Client::new();

        // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough
        // to fill the unix pipe, fill the channel that tracing logs are queued onto, and drop logs
        // rather than block execution.
        for _ in 0..500 {
            let res = client
                .post(format!("http://{}", &zebra_rpc_address))
                .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#)
                .header("Content-Type", "application/json")
                .send()
                .await?;

            // Test that zebrad rpc endpoint is still responding to requests
            assert!(res.status().is_success());
        }

        child.kill(false)?;

        let output = child.wait_with_output()?;
        let output = output.assert_failure()?;

        // [Note on port conflict](#Note on port conflict)
        output
            .assert_was_killed()
            .wrap_err("Possible port conflict. Are there other acceptance tests running?")?;

        done_tx.send(())?;

        Ok(())
    });

    // Wait until the spawned task finishes, or return an error after 45 seconds.
    //
    // Only a timeout means the task is hung. A `Disconnected` error means the task
    // dropped `done_tx` without sending, because it returned early with an error
    // or panicked. In that case, fall through so the task's own error or panic is
    // reported below, rather than a misleading "hang" message.
    let wait_result = done_rx.recv_timeout(Duration::from_secs(45));
    if matches!(wait_result, Err(mpsc::RecvTimeoutError::Timeout)) {
        return Err(eyre!("unexpected test task hang"));
    }

    rt.shutdown_timeout(Duration::from_secs(3));

    // The task should have finished by now, so its result is available without waiting.
    match test_task_handle.now_or_never() {
        Some(Ok(result)) => result,
        Some(Err(error)) => Err(eyre!("join error: {:?}", error)),
        None => Err(eyre!("unexpected test task hang")),
    }
}

/// Make sure `lightwalletd` works with Zebra, when both their states are empty.
///
/// This test only runs when the `ZEBRA_TEST_LIGHTWALLETD` env var is set.
Expand Down