Skip to content

Commit

Permalink
Fix storages replication (eclipse-zenoh#644)
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch authored Jan 11, 2024
1 parent ef888b9 commit 2ad877c
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::collections::{HashMap, HashSet};
use std::str;
use zenoh::key_expr::{KeyExpr, OwnedKeyExpr};
use zenoh::prelude::r#async::*;
use zenoh::query::QueryConsolidation;
use zenoh::time::Timestamp;
use zenoh::Session;

Expand Down Expand Up @@ -88,7 +87,10 @@ impl Aligner {
let checksum = other.checksum;
let timestamp = other.timestamp;
let (missing_content, no_content_err) = self.get_missing_content(&other, from).await;
log::trace!("[ALIGNER] Missing content is {:?}", missing_content);
log::debug!(
"[ALIGNER] Missing {} entries; query corresponding samples",
missing_content.len()
);

// If missing content is not identified, it showcases some problem
// The problem will be addressed in the future rounds, hence will not count as processed
Expand All @@ -98,11 +100,12 @@ impl Aligner {
.await;

// Missing data might be empty since some samples in digest might be outdated
log::trace!("[ALIGNER] Missing data is {:?}", missing_data);
log::debug!("[ALIGNER] Received {} queried samples", missing_data.len());
log::trace!("[ALIGNER] Received queried samples: {missing_data:?}");

for (key, (ts, value)) in missing_data {
let sample = Sample::new(key, value).with_timestamp(ts);
log::debug!("[ALIGNER] Adding sample {:?} to storage", sample);
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
self.tx_sample.send_async(sample).await.unwrap_or_else(|e| {
log::error!("[ALIGNER] Error adding sample to storage: {}", e)
});
Expand Down Expand Up @@ -140,6 +143,7 @@ impl Aligner {
}

async fn get_missing_content(&self, other: &Digest, from: &str) -> (Vec<LogEntry>, bool) {
log::debug!("[ALIGNER] Get missing content from {from} ...");
// get my digest
let this = &self.snapshotter.get_digest().await;

Expand All @@ -152,6 +156,9 @@ impl Aligner {

let ((cold_data, no_cold_err), (warm_data, no_warm_err), (hot_data, no_hot_err)) =
futures::join!(cold_alignment, warm_alignment, hot_alignment);
log::debug!("[ALIGNER] Missing content from {from} in Cold era: {cold_data:?}");
log::debug!("[ALIGNER] Missing content from {from} in Warm era: {warm_data:?}");
log::debug!("[ALIGNER] Missing content from {from} in Hot era: {hot_data:?}");
(
[cold_data, warm_data, hot_data].concat(),
no_cold_err && no_warm_err && no_hot_err,
Expand Down Expand Up @@ -313,7 +320,7 @@ impl Aligner {
match self
.session
.get(&selector)
.consolidation(QueryConsolidation::AUTO)
.consolidation(zenoh::query::ConsolidationMode::None)
.accept_replies(zenoh::query::ReplyKeyExpr::Any)
.res()
.await
Expand Down Expand Up @@ -345,6 +352,7 @@ impl Aligner {
no_err = false;
}
};
log::trace!("[ALIGNER] On Query '{selector}' received: {return_val:?} (no_err:{no_err})");
(return_val, no_err)
}
}

0 comments on commit 2ad877c

Please sign in to comment.