Skip to content

Commit

Permalink
fix(test) switch zebrad to a non-blocking tracing logger (#5032)
Browse files Browse the repository at this point in the history
* adds non-blocking writer for tracing subscriber

* use non_blocking writer for the fmt::Layer with the tokio-console feature as well

* adds doc comment to _guard field

* adds acceptance test

* update filter_handle type to use NonBlocking

* adds more detail on lossy non-blocking writer and sets tracing.filter to "trace" in acceptance test

* drops ZebradApp before process::exit(1) in the event of a FrameworkError

* reduces buffered lines limit to 8000

* adds tracing.buffer_limit config and some comments

* update acceptance.rs

* fix acceptance test

* fixes ambiguous phrasing in comment

* updates zebrad/src/application.rs

* Find out what the join error is in the GitHub runner tests

* updates acceptance test to use recv_timeout instead of always waiting 10 seconds, removes unnecessary echo command, and reduces # of rpc requests to 500

* see if sleeping for a few seconds before exiting helps the macOS test pass

* Expand exit sleep docs

Co-authored-by: Arya <[email protected]>

Co-authored-by: teor <[email protected]>
  • Loading branch information
arya2 and teor2345 authored Sep 7, 2022
1 parent fb2a1e8 commit d9fae6e
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 4 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ impl FinalizedState {
// So we want to drop it before we exit.
std::mem::drop(new_state);

// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}
}
Expand Down Expand Up @@ -356,6 +361,11 @@ impl FinalizedState {
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
self.db.shutdown(true);

// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}

Expand Down Expand Up @@ -392,6 +402,13 @@ impl FinalizedState {
let _ = stdout().lock().flush();
let _ = stderr().lock().flush();

// Give the logger thread some time to flush any remaining lines to stdout,
// and yield so that tests pass on macOS
std::thread::sleep(std::time::Duration::from_secs(3));

// Exits before calling drop on the WorkerGuard for the logger thread,
// dropping any lines that haven't already been written to stdout.
// This is okay for now because this is test-only code
std::process::exit(0);
}
}
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ tinyvec = { version = "1.6.0", features = ["rustc_1_55"] }
thiserror = "1.0.34"

tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
tracing-appender = "0.2.2"
tracing-error = "0.2.0"
tracing-futures = "0.2.5"
tracing = "0.1.31"
Expand Down
15 changes: 13 additions & 2 deletions zebrad/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use self::entry_point::EntryPoint;
use std::{fmt::Write as _, io::Write as _, process};

use abscissa_core::{
application::{self, fatal_error, AppCell},
application::{self, AppCell},
config::{self, Configurable},
status_err,
terminal::{component::Terminal, stderr, stdout, ColorChoice},
Expand All @@ -18,6 +18,13 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR};

use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig};

/// Prints a fatal error message for `app_name`, then exits the process with status 1.
///
/// Takes the application name as an owned `String`, so no reference to the
/// application itself is needed at the call site.
///
/// Modelled on the helper shown at
/// <https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10>
fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! {
    /// Exit status reported to the parent process after a fatal error.
    const FATAL_EXIT_STATUS: i32 = 1;

    status_err!("{} fatal error: {}", app_name, err);
    process::exit(FATAL_EXIT_STATUS)
}

/// Application state
pub static APPLICATION: AppCell<ZebradApp> = AppCell::new();

Expand Down Expand Up @@ -462,7 +469,11 @@ impl Application for ZebradApp {
let _ = stderr().lock().flush();

if let Err(e) = self.state().components.shutdown(self, shutdown) {
fatal_error(self, &e)
let app_name = self.name().to_string();

// Swap out a fake app so we can trigger the destructor on the original
let _ = std::mem::take(self);
fatal_error(app_name, &e);
}

// Swap out a fake app so we can trigger the destructor on the original
Expand Down
29 changes: 27 additions & 2 deletions zebrad/src/components/tracing/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@
use abscissa_core::{Component, FrameworkError, Shutdown};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter,
fmt::{format, Formatter},
layer::SubscriberExt,
reload::Handle,
util::SubscriberInitExt,
EnvFilter,
};

use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};

use crate::{application::app_version, config::TracingSection};

#[cfg(feature = "flamegraph")]
Expand All @@ -16,14 +22,23 @@ pub struct Tracing {
/// The installed filter reloading handle, if enabled.
//
// TODO: when fmt::Subscriber supports per-layer filtering, remove the Option
filter_handle: Option<Handle<EnvFilter, Formatter>>,
filter_handle: Option<
Handle<
EnvFilter,
Formatter<format::DefaultFields, format::Format<format::Full>, NonBlocking>,
>,
>,

/// The originally configured filter.
initial_filter: String,

/// The installed flame graph collector, if enabled.
#[cfg(feature = "flamegraph")]
flamegrapher: Option<flame::Grapher>,

/// Drop guard for worker thread of non-blocking logger,
/// responsible for flushing any remaining logs when the program terminates
_guard: WorkerGuard,
}

impl Tracing {
Expand All @@ -32,6 +47,13 @@ impl Tracing {
let filter = config.filter.unwrap_or_else(|| "".to_string());
let flame_root = &config.flamegraph;

// Builds a lossy NonBlocking logger with a default line limit of 128_000 or an explicit buffer_limit.
// The write method queues lines down a bounded channel with this capacity to a worker thread that writes to stdout.
// Increments error_counter and drops lines when the buffer is full.
let (non_blocking, _guard) = NonBlockingBuilder::default()
.buffered_lines_limit(config.buffer_limit.max(100))
.finish(std::io::stdout());

// Only use color if tracing output is being sent to a terminal or if it was explicitly
// forced to.
let use_color =
Expand All @@ -47,6 +69,7 @@ impl Tracing {

let logger = FmtSubscriber::builder()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_env_filter(&filter);

// Enable reloading if that feature is selected.
Expand Down Expand Up @@ -82,6 +105,7 @@ impl Tracing {
// Using `FmtSubscriber` as the base subscriber, all the logs get filtered.
let logger = fmt::Layer::new()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_filter(EnvFilter::from(&filter));

let subscriber = subscriber.with(logger);
Expand Down Expand Up @@ -185,6 +209,7 @@ impl Tracing {
initial_filter: filter,
#[cfg(feature = "flamegraph")]
flamegrapher,
_guard,
})
}

Expand Down
7 changes: 7 additions & 0 deletions zebrad/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ pub struct TracingSection {
/// verification of every 1000th block.
pub filter: Option<String>,

/// The buffer_limit size sets the number of log lines that can be queued by the tracing subscriber
/// to be written to stdout before logs are dropped.
///
/// Defaults to 128,000 with a minimum of 100.
pub buffer_limit: usize,

/// The address used for an ad-hoc RPC endpoint allowing dynamic control of the tracing filter.
///
/// Install Zebra using `cargo install --features=filter-reload` to enable this config.
Expand Down Expand Up @@ -140,6 +146,7 @@ impl Default for TracingSection {
use_color: true,
force_use_color: false,
filter: None,
buffer_limit: 128_000,
endpoint_addr: None,
flamegraph: None,
use_journald: false,
Expand Down
71 changes: 71 additions & 0 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,77 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> {
Ok(())
}

/// Checks that `zebrad` keeps answering RPC requests while it is producing a lot of log output.
///
/// The child is configured with a "trace" filter and a tracing `buffer_limit` of 100, and is
/// then sent 500 RPC requests. Every request must succeed.
///
/// The test body runs in a spawned task so the test thread can enforce a timeout on it.
#[test]
fn non_blocking_logger() -> Result<()> {
    use futures::FutureExt;
    use std::{sync::mpsc, time::Duration};

    let rt = tokio::runtime::Runtime::new().unwrap();
    let (done_tx, done_rx) = mpsc::channel();

    let test_task_handle: tokio::task::JoinHandle<Result<()>> = rt.spawn(async move {
        let _init_guard = zebra_test::init();

        // Write a configuration that has RPC listen_addr set
        // [Note on port conflict](#Note on port conflict)
        let mut config = random_known_rpc_port_config(false)?;
        config.tracing.filter = Some("trace".to_string());
        config.tracing.buffer_limit = 100;
        let zebra_rpc_address = config.rpc.listen_addr.unwrap();

        let dir = testdir()?.with_config(&mut config)?;
        let mut child = dir.spawn_child(args!["start"])?;
        // Wait until port is open.
        child.expect_stdout_line_matches(
            format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(),
        )?;

        // Create an http client
        let client = reqwest::Client::new();

        // The request URL is the same for every request, so build it once.
        let rpc_url = format!("http://{}", &zebra_rpc_address);

        // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe,
        // fill the channel that tracing logs are queued onto, and drop logs rather than block execution.
        for _ in 0..500 {
            let res = client
                .post(rpc_url.as_str())
                .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#)
                .header("Content-Type", "application/json")
                .send()
                .await?;

            // Test that zebrad rpc endpoint is still responding to requests
            assert!(res.status().is_success());
        }

        child.kill(false)?;

        let output = child.wait_with_output()?;
        let output = output.assert_failure()?;

        // [Note on port conflict](#Note on port conflict)
        output
            .assert_was_killed()
            .wrap_err("Possible port conflict. Are there other acceptance tests running?")?;

        // Only reached on success: tells the test thread that the task finished in time.
        done_tx.send(())?;

        Ok(())
    });

    // Wait until the spawned task signals success, or return an error after 45 seconds.
    //
    // The task only sends on success. If it returns an error or panics first, `done_tx` is
    // dropped and `recv_timeout` returns `Disconnected` straight away. That is a failure, not
    // a hang, so only a real timeout is reported as a hang here. In the `Disconnected` case
    // we fall through and report the task's actual result (or join error) below.
    if let Err(mpsc::RecvTimeoutError::Timeout) = done_rx.recv_timeout(Duration::from_secs(45)) {
        return Err(eyre!("unexpected test task hang"));
    }

    rt.shutdown_timeout(Duration::from_secs(3));

    // Check the task's result without blocking.
    match test_task_handle.now_or_never() {
        Some(Ok(result)) => result,
        Some(Err(error)) => Err(eyre!("join error: {:?}", error)),
        None => Err(eyre!("unexpected test task hang")),
    }
}

/// Make sure `lightwalletd` works with Zebra, when both their states are empty.
///
/// This test only runs when the `ZEBRA_TEST_LIGHTWALLETD` env var is set.
Expand Down

0 comments on commit d9fae6e

Please sign in to comment.