From 12b11ee6f2f1a9390dccfeb3633be2f88b568a50 Mon Sep 17 00:00:00 2001 From: J-Loudet Date: Mon, 8 Jul 2024 15:08:09 +0200 Subject: [PATCH] fix(storage-manager): validate presence of timestamp (#1229) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces checks before accessing the `timestamp` associated with a Sample — instead of calling `unwrap()`. In theory, a Sample should never arrive to a Storage without a Timestamp. In practice, we cannot guarantee this invariant with certainty (future modifications of the code base?). With these checks, the Storage will simply discard the Sample instead of panicking the entire storage manager. * plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: add checks when accessing the timestamp and remove `unwrap`. Signed-off-by: Julien Loudet --- .../src/replica/storage.rs | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 17be005f08..d12b51042c 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -271,6 +271,17 @@ impl StorageService { // the trimming during PUT and GET should be handled by the plugin async fn process_sample(&self, sample: Sample) { tracing::trace!("[STORAGE] Processing sample: {:?}", sample); + + // A Sample, in theory, will not arrive to a Storage without a Timestamp. This check (which, again, should + // never enter the `None` branch) ensures that the Storage Manager does not panic even if it ever happens. + let sample_timestamp = match sample.timestamp() { + Some(timestamp) => timestamp, + None => { + tracing::error!("Discarding Sample that has no Timestamp: {:?}", sample); + return; + } + }; + // if wildcard, update wildcard_updates if sample.key_expr().is_wild() { self.register_wildcard_update(sample.clone()).await; @@ -288,12 +299,10 @@ impl StorageService { ); for k in matching_keys { - if !self - .is_deleted(&k.clone(), sample.timestamp().unwrap()) - .await + if !self.is_deleted(&k.clone(), sample_timestamp).await && (self.capability.history.eq(&History::All) || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample.timestamp().unwrap()).await)) + && self.is_latest(&k, sample_timestamp).await)) { tracing::trace!( "Sample `{:?}` identified as needed processing for key {}", @@ -302,9 +311,8 @@ impl StorageService { ); // there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage. // get the relevant wild card entry and use that value and timestamp to update the storage - let sample_to_store: Sample = if let Some(update) = self - .ovderriding_wild_update(&k, sample.timestamp().unwrap()) - .await + let sample_to_store: Sample = if let Some(update) = + self.ovderriding_wild_update(&k, sample_timestamp).await { match update.kind { SampleKind::Put => { @@ -323,6 +331,16 @@ impl StorageService { .into() }; + // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have + // a Timestamp and, in theory, this check is unneeded. + let sample_to_store_timestamp = match sample_to_store.timestamp() { + Some(timestamp) => *timestamp, + None => { + tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store); + continue; + } + }; + let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { @@ -340,16 +358,15 @@ impl StorageService { sample_to_store.payload().clone(), sample_to_store.encoding().clone(), ), - *sample_to_store.timestamp().unwrap(), + sample_to_store_timestamp, ) .await } SampleKind::Delete => { // register a tombstone - self.mark_tombstone(&k, *sample_to_store.timestamp().unwrap()) - .await; + self.mark_tombstone(&k, sample_to_store_timestamp).await; storage - .delete(stripped_key, *sample_to_store.timestamp().unwrap()) + .delete(stripped_key, sample_to_store_timestamp) .await } }; @@ -363,7 +380,7 @@ impl StorageService { .as_ref() .unwrap() .log_propagation - .send((k.clone(), *sample_to_store.timestamp().unwrap())); + .send((k.clone(), sample_to_store_timestamp)); match sending { Ok(_) => (), Err(e) => {