From 342ec5b32353d4263fff9f18e5a4ab4983730f93 Mon Sep 17 00:00:00 2001 From: Michael Droettboom Date: Mon, 6 Jul 2020 14:54:20 -0400 Subject: [PATCH 1/4] Bug 1644359: Support error reporting in Python bindings This works by initializing a full Glean in the subprocess that *doesn't* do any startup work such as setting metrics or submitting pings. This makes it safe to initialize Glean in the subprocess, and thus errors can be reported there. --- glean-core/ffi/glean.h | 7 +- glean-core/ffi/src/lib.rs | 103 +++--------------- glean-core/ios/Glean/GleanFfi.h | 7 +- .../python/glean/net/ping_upload_worker.py | 21 ++-- glean-core/python/tests/test_network.py | 48 ++++++++ glean-core/src/lib.rs | 23 ++-- glean-core/src/upload/mod.rs | 42 +------ 7 files changed, 101 insertions(+), 150 deletions(-) diff --git a/glean-core/ffi/glean.h b/glean-core/ffi/glean.h index 88c557829d..87bf31c1fe 100644 --- a/glean-core/ffi/glean.h +++ b/glean-core/ffi/glean.h @@ -427,7 +427,12 @@ void glean_get_upload_task(FfiPingUploadTask *result, uint8_t log_ping); */ uint8_t glean_initialize(const FfiConfiguration *cfg); -uint8_t glean_initialize_standalone_uploader(FfiStr data_dir, FfiStr language_binding_name); +/** + * # Safety + * + * A valid and non-null configuration object is required for this function. 
+ */ +uint8_t glean_initialize_for_subprocess(const FfiConfiguration *cfg); uint8_t glean_is_dirty_flag_set(void); diff --git a/glean-core/ffi/src/lib.rs b/glean-core/ffi/src/lib.rs index 0c185f9cf9..d0371d6203 100644 --- a/glean-core/ffi/src/lib.rs +++ b/glean-core/ffi/src/lib.rs @@ -12,7 +12,6 @@ use ffi_support::{define_string_destructor, ConcurrentHandleMap, FfiStr, IntoFfi pub use glean_core::metrics::MemoryUnit; pub use glean_core::metrics::TimeUnit; pub use glean_core::upload::ffi_upload_result::*; -use glean_core::upload::PingUploadManager; use glean_core::Glean; pub use glean_core::Lifetime; @@ -128,48 +127,6 @@ where with_glean_mut(|glean| Ok(callback(glean))) } -/// Execute the callback with a reference to the PingUploadManager singleton, -/// returning a `Result`. -/// -/// The callback returns a `Result` while: -/// -/// - Catching panics, and logging them. -/// - Converting `T` to a C-compatible type using [`IntoFfi`]. -/// - Logging `E` and returning a default value. -pub(crate) fn with_standalone_uploader(callback: F) -> R::Value -where - F: UnwindSafe + FnOnce(&PingUploadManager) -> Result, - R: IntoFfi, -{ - let mut error = ffi_support::ExternError::success(); - let res = ffi_support::abort_on_panic::call_with_result(&mut error, || { - match glean_core::upload::global_upload_manager() { - Some(upload_manager) => { - let upload_manager = upload_manager.lock().unwrap(); - callback(&upload_manager) - } - None => Err(glean_core::Error::not_initialized()), - } - }); - handlemap_ext::log_if_error(error); - res -} - -/// Execute the callback with a reference to the PingUploadManager singleton, -/// returning a value. -/// -/// The callback returns a value while: -/// -/// - Catching panics, and logging them. -/// - Converting the returned value to a C-compatible type using [`IntoFfi`]. 
-pub(crate) fn with_standalone_uploader_value(callback: F) -> R::Value -where - F: UnwindSafe + FnOnce(&PingUploadManager) -> R, - R: IntoFfi, -{ - with_standalone_uploader(|ping_uploader| Ok(callback(ping_uploader))) -} - /// Initialize the logging system based on the target platform. This ensures /// that logging is shown when executing the Glean SDK unit tests. #[no_mangle] @@ -402,19 +359,6 @@ pub extern "C" fn glean_is_first_run() -> u8 { // * `result`: the object the output task will be written to. #[no_mangle] pub extern "C" fn glean_get_upload_task(result: *mut FfiPingUploadTask, log_ping: u8) { - // If an upload manager instance is available, use that (it should only happen - // in processes which do not initialize Glean). - if glean_core::upload::global_upload_manager().is_some() { - with_standalone_uploader_value(|ping_uploader| { - let ffi_task = FfiPingUploadTask::from(ping_uploader.get_upload_task(log_ping != 0)); - unsafe { - std::ptr::write(result, ffi_task); - } - }); - return; - } - - // Otherwise with_glean_value(|glean| { let ffi_task = FfiPingUploadTask::from(glean.get_upload_task(log_ping != 0)); unsafe { @@ -451,22 +395,6 @@ pub unsafe extern "C" fn glean_process_ping_upload_response( // but as it controls the memory, we put something valid in place, just in case. let task = std::ptr::replace(task, FfiPingUploadTask::Done); - // If an upload manager instance is available, use that (it should only happen - // in processes which do not initialize Glean). - if glean_core::upload::global_upload_manager().is_some() { - with_standalone_uploader(|ping_uploader| { - if let FfiPingUploadTask::Upload { document_id, .. 
} = task { - assert!(!document_id.is_null()); - let document_id_str = CStr::from_ptr(document_id) - .to_str() - .map_err(|_| glean_core::Error::utf8_error())?; - ping_uploader.process_ping_upload_response(document_id_str, status.into()); - }; - Ok(()) - }); - return; - } - with_glean(|glean| { if let FfiPingUploadTask::Upload { document_id, .. } = task { assert!(!document_id.is_null()); @@ -479,25 +407,22 @@ pub unsafe extern "C" fn glean_process_ping_upload_response( }); } +/// # Safety +/// +/// A valid and non-null configuration object is required for this function. #[no_mangle] -pub extern "C" fn glean_initialize_standalone_uploader( - data_dir: FfiStr, - language_binding_name: FfiStr, -) -> u8 { +pub unsafe extern "C" fn glean_initialize_for_subprocess(cfg: *const FfiConfiguration) -> u8 { + assert!(!cfg.is_null()); + handlemap_ext::handle_result(|| { - // Init the upload manager to perform a synchronous ping directory scan. - // Since this method is meant to be called from a process used exclusively - // for uploading, this is fine. - let mut upload_manager = PingUploadManager::new( - data_dir.to_string_fallible()?, - &language_binding_name.to_string_fallible()?, - true, - ); - upload_manager.set_rate_limiter( - /* seconds per interval */ 60, /* max tasks per interval */ 10, - ); - glean_core::upload::setup_upload_manager(upload_manager)?; - log::info!("Glean initialized in upload-only mode"); + // We can create a reference to the FfiConfiguration struct: + // 1. We did a null check + // 2. We're not holding on to it beyond this function + // and we copy out all data when needed. 
+ let glean_cfg = glean_core::Configuration::try_from(&*cfg)?; + let glean = Glean::new_for_subprocess(&glean_cfg)?; + glean_core::setup_glean(glean)?; + log::info!("Glean initialized for subprocess"); Ok(true) }) } diff --git a/glean-core/ios/Glean/GleanFfi.h b/glean-core/ios/Glean/GleanFfi.h index 88c557829d..87bf31c1fe 100644 --- a/glean-core/ios/Glean/GleanFfi.h +++ b/glean-core/ios/Glean/GleanFfi.h @@ -427,7 +427,12 @@ void glean_get_upload_task(FfiPingUploadTask *result, uint8_t log_ping); */ uint8_t glean_initialize(const FfiConfiguration *cfg); -uint8_t glean_initialize_standalone_uploader(FfiStr data_dir, FfiStr language_binding_name); +/** + * # Safety + * + * A valid and non-null configuration object is required for this function. + */ +uint8_t glean_initialize_for_subprocess(const FfiConfiguration *cfg); uint8_t glean_is_dirty_flag_set(void); diff --git a/glean-core/python/glean/net/ping_upload_worker.py b/glean-core/python/glean/net/ping_upload_worker.py index 02580f1c8e..c3d8e556dd 100644 --- a/glean-core/python/glean/net/ping_upload_worker.py +++ b/glean-core/python/glean/net/ping_upload_worker.py @@ -7,6 +7,7 @@ import logging from pathlib import Path import re +import sys import time from typing import List, Tuple @@ -50,7 +51,7 @@ def _process(cls): from .. import Glean return ProcessDispatcher.dispatch( - _process, (Glean._data_dir, Glean._configuration) + _process, (Glean._data_dir, Glean._application_id, Glean._configuration) ) @classmethod @@ -101,21 +102,21 @@ def _parse_ping_headers( return headers -def _process(data_dir: Path, configuration) -> bool: +def _process(data_dir: Path, application_id: str, configuration) -> bool: # Import here to avoid cyclical import from ..glean import Glean if not Glean.is_initialized(): - # Always load the Glean shared object / dll even if we're in a (ping upload worker) - # subprocess. - # To make startup time better in subprocesses, consumers can initialize just the - # ping upload manager. 
- data_dir = ffi_support.new("char[]", _ffi.ffi_encode_string(str(data_dir))) - language_binding_name = ffi_support.new( - "char[]", _ffi.ffi_encode_string(_ffi.LANGUAGE_BINDING_NAME) + # We don't want to send pings or otherwise update the database during + # initialization in a subprocess, so we use + # `glean_initialize_for_subprocess` rather than `glean_initialize` here. + cfg = _ffi.make_config( + data_dir, application_id, True, configuration.max_events, ) - _ffi.lib.glean_initialize_standalone_uploader(data_dir, language_binding_name) + if _ffi.lib.glean_initialize_for_subprocess(cfg) == 0: + log.error("Couldn't initialize Glean in subprocess") + sys.exit(1) wait_attempts = 0 diff --git a/glean-core/python/tests/test_network.py b/glean-core/python/tests/test_network.py index 2cd2ff8d1f..083f2dcca7 100644 --- a/glean-core/python/tests/test_network.py +++ b/glean-core/python/tests/test_network.py @@ -7,11 +7,30 @@ from glean import Glean +from glean import metrics +from glean._process_dispatcher import ProcessDispatcher from glean.net import PingUploadWorker from glean.net.http_client import HttpClientUploader from glean.net import ping_uploader +def get_upload_failure_metric(): + return metrics.LabeledCounterMetricType( + disabled=False, + send_in_pings=["metrics"], + name="ping_upload_failure", + category="glean.upload", + labels=[ + "status_code_4xx", + "status_code_5xx", + "status_code_unknown", + "unrecoverable", + "recoverable", + ], + lifetime=metrics.Lifetime.PING, + ) + + def test_400_error(safe_httpserver): safe_httpserver.serve_content(b"", code=400) @@ -24,6 +43,20 @@ def test_400_error(safe_httpserver): assert 1 == len(safe_httpserver.requests) +def test_400_error_submit(safe_httpserver): + safe_httpserver.serve_content(b"", code=400) + + Glean._configuration._server_endpoint = safe_httpserver.url + Glean._submit_ping_by_name("baseline") + ProcessDispatcher._wait_for_last_process() + + assert 1 == len(safe_httpserver.requests) + + metric = 
get_upload_failure_metric() + assert 1 == metric["status_code_4xx"].test_get_value() + assert not metric["status_code_5xx"].test_has_value() + + def test_500_error(safe_httpserver): safe_httpserver.serve_content(b"", code=500) @@ -36,6 +69,21 @@ def test_500_error(safe_httpserver): assert 1 == len(safe_httpserver.requests) +def test_500_error_submit(safe_httpserver): + safe_httpserver.serve_content(b"", code=500) + + Glean._configuration._server_endpoint = safe_httpserver.url + Glean._submit_ping_by_name("baseline") + ProcessDispatcher._wait_for_last_process() + + # This kind of recoverable error will be tried 10 times + assert 10 == len(safe_httpserver.requests) + + metric = get_upload_failure_metric() + assert not metric["status_code_4xx"].test_has_value() + assert 10 == metric["status_code_5xx"].test_get_value() + + def test_unknown_scheme(): response = HttpClientUploader.upload( url="ftp://example.com/", data=b"{}", headers=[] diff --git a/glean-core/src/lib.rs b/glean-core/src/lib.rs index ec51336acd..10a41338c3 100644 --- a/glean-core/src/lib.rs +++ b/glean-core/src/lib.rs @@ -180,11 +180,10 @@ pub struct Glean { } impl Glean { - /// Create and initialize a new Glean object. - /// - /// This will create the necessary directories and files in `data_path`. - /// This will also initialize the core metrics. - pub fn new(cfg: Configuration) -> Result { + /// Create and initialize a new Glean object for use in a subprocess. + /// Importantly, this will not send any pings at startup, since that + /// sort of management should only happen in the main process. 
+ pub fn new_for_subprocess(cfg: &Configuration) -> Result { log::info!("Creating new Glean v{}", GLEAN_VERSION); let application_id = sanitize_application_id(&cfg.application_id); @@ -204,21 +203,29 @@ impl Glean { /* seconds per interval */ 60, /* max tasks per interval */ 10, ); - let mut glean = Self { + Ok(Self { upload_enabled: cfg.upload_enabled, data_store, event_data_store, core_metrics: CoreMetrics::new(), internal_pings: InternalPings::new(), upload_manager, - data_path: PathBuf::from(cfg.data_path), + data_path: PathBuf::from(cfg.data_path.to_string()), application_id, ping_registry: HashMap::new(), start_time: local_now_with_offset(), max_events: cfg.max_events.unwrap_or(DEFAULT_MAX_EVENTS), is_first_run: false, debug_view_tag: None, - }; + }) + } + + /// Create and initialize a new Glean object. + /// + /// This will create the necessary directories and files in `data_path`. + /// This will also initialize the core metrics. + pub fn new(cfg: Configuration) -> Result { + let mut glean = Self::new_for_subprocess(&cfg)?; // The upload enabled flag may have changed since the last run, for // example by the changing of a config file. diff --git a/glean-core/src/upload/mod.rs b/glean-core/src/upload/mod.rs index 5bea989eb9..17202f6015 100644 --- a/glean-core/src/upload/mod.rs +++ b/glean-core/src/upload/mod.rs @@ -12,13 +12,10 @@ use std::collections::VecDeque; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; +use std::sync::{Arc, RwLock, RwLockWriteGuard}; use std::thread; use std::time::{Duration, Instant}; -use once_cell::sync::OnceCell; - -use crate::error::Result; use directory::PingDirectoryManager; pub use request::{HeaderMap, PingRequest}; pub use result::{ffi_upload_result, UploadResult}; @@ -27,43 +24,6 @@ mod directory; mod request; mod result; -/// A global Glean upload manager instance. 
-/// -/// This is only used by processes who exclusively need to manage -/// ping upload and do not want to perform a full Glean initialization. -static UPLOAD_MANAGER: OnceCell> = OnceCell::new(); - -/// Get a reference to the global Upload Manager object. -pub fn global_upload_manager() -> Option<&'static Mutex> { - UPLOAD_MANAGER.get() -} - -/// Set or replace the global Glean object. -pub fn setup_upload_manager(upload_manager: PingUploadManager) -> Result<()> { - // The `OnceCell` type wrapping our PingUploadManager is thread-safe and can only be set once. - // Therefore even if our check for it being empty succeeds, setting it could fail if a - // concurrent thread is quicker in setting it. - // However this will not cause a bigger problem, as the second `set` operation will just fail. - // We can log it and move on. - // - // For all wrappers this is not a problem, as the uploader object is intialized exactly once on - // calling the FFI `glean_standalone_uploader`. - if UPLOAD_MANAGER.get().is_none() { - if UPLOAD_MANAGER.set(Mutex::new(upload_manager)).is_err() { - log::error!( - "Global Upload Manager object is initialized already. This probably happened concurrently." - ) - } - } else { - // We allow overriding the global upload manager object to support test mode. - // In test mode the upload manager object is fully destroyed and recreated. - // This all happens behind a mutex and is therefore also thread-safe.. - let mut lock = UPLOAD_MANAGER.get().unwrap().lock().unwrap(); - *lock = upload_manager; - } - Ok(()) -} - #[derive(Debug)] struct RateLimiter { /// The instant the current interval has started. 
From 80dc61588d96ccc08cf092f4a8f193fe859a11a4 Mon Sep 17 00:00:00 2001 From: Michael Droettboom Date: Tue, 7 Jul 2020 08:41:27 -0400 Subject: [PATCH 2/4] Change to borrow --- glean-core/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/glean-core/src/lib.rs b/glean-core/src/lib.rs index 10a41338c3..e8fda89c6e 100644 --- a/glean-core/src/lib.rs +++ b/glean-core/src/lib.rs @@ -210,7 +210,7 @@ impl Glean { core_metrics: CoreMetrics::new(), internal_pings: InternalPings::new(), upload_manager, - data_path: PathBuf::from(cfg.data_path.to_string()), + data_path: PathBuf::from(&cfg.data_path), application_id, ping_registry: HashMap::new(), start_time: local_now_with_offset(), From 2798a9bcf765b29c632f16fcd092383425099b94 Mon Sep 17 00:00:00 2001 From: Michael Droettboom Date: Tue, 7 Jul 2020 08:45:05 -0400 Subject: [PATCH 3/4] Add CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 100e68b2de..c7d571c4fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * BUGFIX: raise an error if Glean is initialized with an empty string as the `application_id`. * Python * BUGFIX: correctly set the `app_build` metric to the newly provided `application_build_id` initialization option. + * The Python bindings now report networking errors in the `glean.upload.ping_upload_failure` metric (like all the other bindings). 
[Full changelog](https://github.com/mozilla/glean/compare/v31.2.3...main) From 14d9928bde82ea2662153731866875981329890a Mon Sep 17 00:00:00 2001 From: Michael Droettboom Date: Tue, 7 Jul 2020 12:28:24 -0400 Subject: [PATCH 4/4] Improve testing --- glean-core/python/tests/conftest.py | 18 ++++++++++ glean-core/python/tests/test_network.py | 44 +++++++++++++++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/glean-core/python/tests/conftest.py b/glean-core/python/tests/conftest.py index bc45401dd8..b9cc440357 100644 --- a/glean-core/python/tests/conftest.py +++ b/glean-core/python/tests/conftest.py @@ -42,6 +42,24 @@ def safe_httpserver(httpserver): return httpserver +@pytest.fixture +def slow_httpserver(httpserver): + """ + An httpserver that takes 0.5 seconds to respond. + """ + wait_for_server(httpserver) + + orig_call = httpserver.__call__ + + def __call__(self, *args, **kwargs): + time.sleep(0.5) + return orig_call(*args, **kwargs) + + httpserver.__call__ = __call__ + + return httpserver + + @pytest.fixture def safe_httpsserver(httpsserver): wait_for_server(httpsserver) diff --git a/glean-core/python/tests/test_network.py b/glean-core/python/tests/test_network.py index 083f2dcca7..349b831acf 100644 --- a/glean-core/python/tests/test_network.py +++ b/glean-core/python/tests/test_network.py @@ -43,9 +43,11 @@ def test_400_error(safe_httpserver): assert 1 == len(safe_httpserver.requests) -def test_400_error_submit(safe_httpserver): +def test_400_error_submit(safe_httpserver, monkeypatch): safe_httpserver.serve_content(b"", code=400) + # Force the ping upload worker into a separate process + monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process) Glean._configuration._server_endpoint = safe_httpserver.url Glean._submit_ping_by_name("baseline") ProcessDispatcher._wait_for_last_process() @@ -69,9 +71,11 @@ def test_500_error(safe_httpserver): assert 1 == len(safe_httpserver.requests) -def 
test_500_error_submit(safe_httpserver): +def test_500_error_submit(safe_httpserver, monkeypatch): safe_httpserver.serve_content(b"", code=500) + # Force the ping upload worker into a separate process + monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process) Glean._configuration._server_endpoint = safe_httpserver.url Glean._submit_ping_by_name("baseline") ProcessDispatcher._wait_for_last_process() @@ -84,6 +88,42 @@ def test_500_error_submit(safe_httpserver): assert 10 == metric["status_code_5xx"].test_get_value() +def test_500_error_submit_concurrent_writing(slow_httpserver, monkeypatch): + # This tests concurrently writing to the database from both the main process + # and the ping uploading subprocess. + slow_httpserver.serve_content(b"", code=500) + + counter = metrics.CounterMetricType( + disabled=False, + category="test", + name="counter", + send_in_pings=["metrics"], + lifetime=metrics.Lifetime.PING, + ) + + # Force the ping upload worker into a separate process + monkeypatch.setattr(PingUploadWorker, "process", PingUploadWorker._process) + Glean._configuration._server_endpoint = slow_httpserver.url + Glean._submit_ping_by_name("baseline") + + # While the uploader is running, increment the counter as fast as we can + times = 0 + last_process = ProcessDispatcher._last_process + while last_process.poll() is None: + counter.add() + times += 1 + + # This kind of recoverable error will be tried 10 times + assert 10 == len(slow_httpserver.requests) + + metric = get_upload_failure_metric() + assert not metric["status_code_4xx"].test_has_value() + assert 10 == metric["status_code_5xx"].test_get_value() + + assert times > 0 + assert times == counter.test_get_value() + + def test_unknown_scheme(): response = HttpClientUploader.upload( url="ftp://example.com/", data=b"{}", headers=[]