diff --git a/Cargo.lock b/Cargo.lock
index 4a9bbaec5432..e0e7e525c48b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4900,6 +4900,7 @@ dependencies = [
  "nix",
  "once_cell",
  "pin-project-lite",
+ "postgres_connection",
  "pq_proto",
  "rand",
  "regex",
diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 0c1310eef90b..eaea266f321c 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
 #[serde_as]
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct SkTimelineInfo {
+    /// Term.
+    pub term: Option<u64>,
     /// Term of the last entry.
     pub last_log_term: Option<u64>,
     /// LSN of the last record.
@@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
     /// A connection string to use for WAL receiving.
     #[serde(default)]
     pub safekeeper_connstr: Option<String>,
+    #[serde(default)]
+    pub http_connstr: Option<String>,
 }
diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml
index f01131540edc..8a1d2e984e24 100644
--- a/libs/utils/Cargo.toml
+++ b/libs/utils/Cargo.toml
@@ -37,6 +37,7 @@ url.workspace = true
 uuid.workspace = true
 
 pq_proto.workspace = true
+postgres_connection.workspace = true
 metrics.workspace = true
 
 workspace_hack.workspace = true
diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs
index 638dba427bd3..6cf829a67c2a 100644
--- a/libs/utils/src/lib.rs
+++ b/libs/utils/src/lib.rs
@@ -58,6 +58,8 @@ pub mod serde_regex;
 
 pub mod pageserver_feedback;
 
+pub mod postgres_client;
+
 pub mod tracing_span_assert;
 
 pub mod rate_limit;
diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs
new file mode 100644
index 000000000000..dba74f5b0b1e
--- /dev/null
+++ b/libs/utils/src/postgres_client.rs
@@ -0,0 +1,37 @@
+//! Postgres client connection code common to other crates (safekeeper and
+//! pageserver) which depends on tenant/timeline ids and thus does not fit into
+//! the postgres_connection crate.
+
+use anyhow::Context;
+use postgres_connection::{parse_host_port, PgConnectionConfig};
+
+use crate::id::TenantTimelineId;
+
+/// Create client config for fetching WAL from a safekeeper on a particular
+/// timeline. listen_pg_addr_str is in the form host:[port].
+pub fn wal_stream_connection_config( + TenantTimelineId { + tenant_id, + timeline_id, + }: TenantTimelineId, + listen_pg_addr_str: &str, + auth_token: Option<&str>, + availability_zone: Option<&str>, +) -> anyhow::Result { + let (host, port) = + parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + let port = port.unwrap_or(5432); + let mut connstr = PgConnectionConfig::new_host_port(host, port) + .extend_options([ + "-c".to_owned(), + format!("timeline_id={}", timeline_id), + format!("tenant_id={}", tenant_id), + ]) + .set_password(auth_token.map(|s| s.to_owned())); + + if let Some(availability_zone) = availability_zone { + connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); + } + + Ok(connstr) +} diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index d52b29f1756e..dfc824f59006 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -31,10 +31,11 @@ use storage_broker::Streaming; use tokio::select; use tracing::*; -use postgres_connection::{parse_host_port, PgConnectionConfig}; +use postgres_connection::PgConnectionConfig; use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; +use utils::postgres_client::wal_stream_connection_config; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -876,33 +877,6 @@ impl ReconnectReason { } } -fn wal_stream_connection_config( - TenantTimelineId { - tenant_id, - timeline_id, - }: TenantTimelineId, - listen_pg_addr_str: &str, - auth_token: Option<&str>, - availability_zone: Option<&str>, -) -> anyhow::Result { - let (host, port) = - parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; - let port = port.unwrap_or(5432); - let mut connstr = PgConnectionConfig::new_host_port(host, port) - .extend_options([ - "-c".to_owned(), - format!("timeline_id={}", timeline_id), - format!("tenant_id={}", tenant_id), - ]) - .set_password(auth_token.map(|s| s.to_owned())); - - if let Some(availability_zone) = availability_zone { - connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); - } - - Ok(connstr) -} - #[cfg(test)] mod tests { use super::*; @@ -918,6 +892,7 @@ mod tests { timeline: SafekeeperTimelineInfo { safekeeper_id: 0, tenant_timeline_id: None, + term: 0, last_log_term: 0, flush_lsn: 0, commit_lsn, @@ -926,6 +901,7 @@ mod tests { peer_horizon_lsn: 0, local_start_lsn: 0, safekeeper_connstr: safekeeper_connstr.to_owned(), + http_connstr: safekeeper_connstr.to_owned(), availability_zone: None, }, latest_update, diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 8d2201b0eb60..848b1d76441c 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); - // Load all timelines from disk to memory. - GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; - // Keep handles to main tasks to die if any of them disappears. 
let mut tasks_handles: FuturesUnordered> = FuturesUnordered::new(); + // Start wal backup launcher before loading timelines as we'll notify it + // through the channel about timelines which need offloading, not draining + // the channel would cause deadlock. + let current_thread_rt = conf + .current_thread_runtime + .then(|| Handle::try_current().expect("no runtime in main")); + let conf_ = conf.clone(); + let wal_backup_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle()) + .spawn(wal_backup::wal_backup_launcher_task_main( + conf_, + wal_backup_launcher_rx, + )) + .map(|res| ("WAL backup launcher".to_owned(), res)); + tasks_handles.push(Box::pin(wal_backup_handle)); + + // Load all timelines from disk to memory. + GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?; + let conf_ = conf.clone(); // Run everything in current thread rt, if asked. if conf.current_thread_runtime { info!("running in current thread runtime"); } - let current_thread_rt = conf - .current_thread_runtime - .then(|| Handle::try_current().expect("no runtime in main")); let wal_service_handle = current_thread_rt .as_ref() @@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .map(|res| ("WAL remover".to_owned(), res)); tasks_handles.push(Box::pin(wal_remover_handle)); - let conf_ = conf.clone(); - let wal_backup_handle = current_thread_rt - .as_ref() - .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle()) - .spawn(wal_backup::wal_backup_launcher_task_main( - conf_, - wal_backup_launcher_rx, - )) - .map(|res| ("WAL backup launcher".to_owned(), res)); - tasks_handles.push(Box::pin(wal_backup_handle)); - set_build_info_metric(GIT_VERSION); // TODO: update tokio-stream, convert to real async Stream with diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 95cb96fae9c8..a7f17df797f7 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -1,7 +1,6 @@ //! 
Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, - TermSwitchEntry, + AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn, }; use anyhow::{bail, Result}; use pq_proto::SystemId; @@ -145,7 +144,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?; let ac = AcceptorState { term: oldstate.acceptor_state.term, - term_history: TermHistory(vec![TermSwitchEntry { + term_history: TermHistory(vec![TermLsn { term: oldstate.acceptor_state.epoch, lsn: Lsn(0), }]), diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index b5a8a14558d7..6104b54f4447 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -19,6 +19,7 @@ use crate::receive_wal::WalReceiverState; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; use crate::send_wal::WalSenderState; +use crate::timeline::PeerInfo; use crate::{debug_dump, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; @@ -101,6 +102,7 @@ pub struct TimelineStatus { pub peer_horizon_lsn: Lsn, #[serde_as(as = "DisplayFromStr")] pub remote_consistent_lsn: Lsn, + pub peers: Vec, pub walsenders: Vec, pub walreceivers: Vec, } @@ -140,6 +142,7 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result) -> Result, term: Term, lsn: Lsn) -> any let history = tli.get_state().await.1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; - history_entries.push(TermSwitchEntry { term, lsn }); + history_entries.push(TermLsn { term, lsn }); let history = TermHistory(history_entries); let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index f8adb86250b9..d785d0e53a2b 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -19,6 +19,7 @@ pub mod json_ctrl; pub mod metrics; pub mod pull_timeline; pub mod receive_wal; +pub mod recovery; pub mod remove_wal; pub mod safekeeper; pub mod send_wal; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 8d8ef6192c87..a2ed4c0cf4c8 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?; tokio::fs::rename(tli_dir_path, &timeline_path).await?; - let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?; + let tli = GlobalTimelines::load_timeline(ttid) + .await + .context("Failed to load timeline after copy")?; info!( "Loaded timeline {}, flush_lsn={}", diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs new file mode 100644 index 000000000000..90ba3d2e164e --- /dev/null +++ b/safekeeper/src/recovery.rs @@ -0,0 +1,40 @@ +//! This module implements pulling WAL from peer safekeepers if compute can't +//! provide it, i.e. safekeeper lags too much. + +use std::sync::Arc; + +use tokio::{select, time::sleep, time::Duration}; +use tracing::{info, instrument}; + +use crate::{timeline::Timeline, SafeKeeperConf}; + +/// Entrypoint for per timeline task which always runs, checking whether +/// recovery for this safekeeper is needed and starting it if so. 
+#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))] +pub async fn recovery_main(tli: Arc, _conf: SafeKeeperConf) { + info!("started"); + let mut cancellation_rx = match tli.get_cancellation_rx() { + Ok(rx) => rx, + Err(_) => { + info!("timeline canceled during task start"); + return; + } + }; + + select! { + _ = recovery_main_loop(tli) => { unreachable!() } + _ = cancellation_rx.changed() => { + info!("stopped"); + } + } +} + +const CHECK_INTERVAL_MS: u64 = 2000; + +/// Check regularly whether we need to start recovery. +async fn recovery_main_loop(_tli: Arc) { + let check_duration = Duration::from_millis(CHECK_INTERVAL_MS); + loop { + sleep(check_duration).await; + } +} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index d0b14a1282ef..b9fcd2c0b26c 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -34,22 +34,33 @@ pub const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. pub type Term = u64; -const INVALID_TERM: Term = 0; +pub const INVALID_TERM: Term = 0; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct TermSwitchEntry { +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct TermLsn { pub term: Term, pub lsn: Lsn, } + +// Creation from tuple provides less typing (e.g. for unit tests). +impl From<(Term, Lsn)> for TermLsn { + fn from(pair: (Term, Lsn)) -> TermLsn { + TermLsn { + term: pair.0, + lsn: pair.1, + } + } +} + #[derive(Clone, Serialize, Deserialize)] -pub struct TermHistory(pub Vec); +pub struct TermHistory(pub Vec); impl TermHistory { pub fn empty() -> TermHistory { TermHistory(Vec::new()) } - // Parse TermHistory as n_entries followed by TermSwitchEntry pairs + // Parse TermHistory as n_entries followed by TermLsn pairs pub fn from_bytes(bytes: &mut Bytes) -> Result { if bytes.remaining() < 4 { bail!("TermHistory misses len"); @@ -60,7 +71,7 @@ impl TermHistory { if bytes.remaining() < 16 { bail!("TermHistory is incomplete"); } - res.push(TermSwitchEntry { + res.push(TermLsn { term: bytes.get_u64_le(), lsn: bytes.get_u64_le().into(), }) @@ -557,12 +568,17 @@ where .up_to(self.flush_lsn()) } + /// Get current term. + pub fn get_term(&self) -> Term { + self.state.acceptor_state.term + } + pub fn get_epoch(&self) -> Term { self.state.acceptor_state.get_epoch(self.flush_lsn()) } /// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet. - fn flush_lsn(&self) -> Lsn { + pub fn flush_lsn(&self) -> Lsn { max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn) } @@ -1138,7 +1154,7 @@ mod tests { let pem = ProposerElected { term: 1, start_streaming_at: Lsn(1), - term_history: TermHistory(vec![TermSwitchEntry { + term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(3), }]), diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 808177d2c439..b684083446e8 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -2,12 +2,12 @@ //! with the "START_REPLICATION" message, and registry of walsenders. 
use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::Term; +use crate::safekeeper::{Term, TermLsn}; use crate::timeline::Timeline; use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; use crate::GlobalTimelines; -use anyhow::Context as AnyhowContext; +use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; use parking_lot::Mutex; use postgres_backend::PostgresBackend; @@ -390,26 +390,25 @@ impl SafekeeperPostgresHandler { self.appname.clone(), )); - let commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx(); - - // Walproposer gets special handling: safekeeper must give proposer all - // local WAL till the end, whether committed or not (walproposer will - // hang otherwise). That's because walproposer runs the consensus and - // synchronizes safekeepers on the most advanced one. + // Walsender can operate in one of two modes which we select by + // application_name: give only committed WAL (used by pageserver) or all + // existing WAL (up to flush_lsn, used by walproposer or peer recovery). + // The second case is always driven by a consensus leader which term + // must generally be also supplied. However we're sloppy to do this in + // walproposer recovery which will be removed soon. So TODO is to make + // it not Option'al then. // - // There is a small risk of this WAL getting concurrently garbaged if - // another compute rises which collects majority and starts fixing log - // on this safekeeper itself. That's ok as (old) proposer will never be - // able to commit such WAL. - let stop_pos: Option = if self.is_walproposer_recovery() { - let wal_end = tli.get_flush_lsn().await; - Some(wal_end) + // Fetching WAL without term in recovery creates a small risk of this + // WAL getting concurrently garbaged if another compute rises which + // collects majority and starts fixing log on this safekeeper itself. + // That's ok as (old) proposer will never be able to commit such WAL. + let end_watch = if self.is_walproposer_recovery() { + EndWatch::Flush(tli.get_term_flush_lsn_watch_rx()) } else { - None + EndWatch::Commit(tli.get_commit_lsn_watch_rx()) }; - - // take the latest commit_lsn if don't have stop_pos - let end_pos = stop_pos.unwrap_or(*commit_lsn_watch_rx.borrow()); + // we don't check term here; it will be checked on first waiting/WAL reading anyway. + let end_pos = end_watch.get(); if end_pos < start_pos { warn!( @@ -419,8 +418,10 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?} till {:?}, available WAL ends at {}", - start_pos, stop_pos, end_pos + "starting streaming from {:?}, available WAL ends at {}, recovery={}", + start_pos, + end_pos, + matches!(end_watch, EndWatch::Flush(_)) ); // switch to copy @@ -445,9 +446,8 @@ impl SafekeeperPostgresHandler { appname, start_pos, end_pos, - stop_pos, term, - commit_lsn_watch_rx, + end_watch, ws_guard: ws_guard.clone(), wal_reader, send_buf: [0; MAX_SEND_SIZE], @@ -466,6 +466,32 @@ impl SafekeeperPostgresHandler { } } +/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the +/// given term (recovery by walproposer or peer safekeeper). +enum EndWatch { + Commit(Receiver), + Flush(Receiver), +} + +impl EndWatch { + /// Get current end of WAL. + fn get(&self) -> Lsn { + match self { + EndWatch::Commit(r) => *r.borrow(), + EndWatch::Flush(r) => r.borrow().lsn, + } + } + + /// Wait for the update. 
+ async fn changed(&mut self) -> anyhow::Result<()> { + match self { + EndWatch::Commit(r) => r.changed().await?, + EndWatch::Flush(r) => r.changed().await?, + } + Ok(()) + } +} + /// A half driving sending WAL. struct WalSender<'a, IO> { pgb: &'a mut PostgresBackend, @@ -480,14 +506,12 @@ struct WalSender<'a, IO> { // We send this LSN to the receiver as wal_end, so that it knows how much // WAL this safekeeper has. This LSN should be as fresh as possible. end_pos: Lsn, - // If present, terminate after reaching this position; used by walproposer - // in recovery. - stop_pos: Option, /// When streaming uncommitted part, the term the client acts as the leader /// in. Streaming is stopped if local term changes to a different (higher) /// value. term: Option, - commit_lsn_watch_rx: Receiver, + /// Watch channel receiver to learn end of available WAL (and wait for its advancement). + end_watch: EndWatch, ws_guard: Arc, wal_reader: WalReader, // buffer for readling WAL into to send it @@ -497,29 +521,20 @@ struct WalSender<'a, IO> { impl WalSender<'_, IO> { /// Send WAL until /// - an error occurs - /// - if we are streaming to walproposer, we've streamed until stop_pos - /// (recovery finished) - /// - receiver is caughtup and there is no computes + /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn) /// /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ? /// convenience. async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> { loop { - // If we are streaming to walproposer, check it is time to stop. - if let Some(stop_pos) = self.stop_pos { - if self.start_pos >= stop_pos { - // recovery finished - return Err(CopyStreamHandlerEnd::ServerInitiated(format!( - "ending streaming to walproposer at {}, recovery finished", - self.start_pos - ))); - } - } else { - // Wait for the next portion if it is not there yet, or just - // update our end of WAL available for sending value, we - // communicate it to the receiver. - self.wait_wal().await?; - } + // Wait for the next portion if it is not there yet, or just + // update our end of WAL available for sending value, we + // communicate it to the receiver. + self.wait_wal().await?; + assert!( + self.end_pos > self.start_pos, + "nothing to send after waiting for WAL" + ); // try to send as much as available, capped by MAX_SEND_SIZE let mut send_size = self @@ -567,7 +582,7 @@ impl WalSender<'_, IO> { /// exit in the meanwhile async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> { loop { - self.end_pos = *self.commit_lsn_watch_rx.borrow(); + self.end_pos = self.end_watch.get(); if self.end_pos > self.start_pos { // We have something to send. trace!("got end_pos {:?}, streaming", self.end_pos); @@ -575,27 +590,31 @@ impl WalSender<'_, IO> { } // Wait for WAL to appear, now self.end_pos == self.start_pos. - if let Some(lsn) = wait_for_lsn(&mut self.commit_lsn_watch_rx, self.start_pos).await? { + if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? { self.end_pos = lsn; trace!("got end_pos {:?}, streaming", self.end_pos); return Ok(()); } - // Timed out waiting for WAL, check for termination and send KA - if let Some(remote_consistent_lsn) = self - .ws_guard - .walsenders - .get_ws_remote_consistent_lsn(self.ws_guard.id) - { - if self.tli.should_walsender_stop(remote_consistent_lsn).await { - // Terminate if there is nothing more to send. 
- // Note that "ending streaming" part of the string is used by - // pageserver to identify WalReceiverError::SuccessfulCompletion, - // do not change this string without updating pageserver. - return Err(CopyStreamHandlerEnd::ServerInitiated(format!( + // Timed out waiting for WAL, check for termination and send KA. + // Check for termination only if we are streaming up to commit_lsn + // (to pageserver). + if let EndWatch::Commit(_) = self.end_watch { + if let Some(remote_consistent_lsn) = self + .ws_guard + .walsenders + .get_ws_remote_consistent_lsn(self.ws_guard.id) + { + if self.tli.should_walsender_stop(remote_consistent_lsn).await { + // Terminate if there is nothing more to send. + // Note that "ending streaming" part of the string is used by + // pageserver to identify WalReceiverError::SuccessfulCompletion, + // do not change this string without updating pageserver. + return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", self.appname, self.start_pos, ))); + } } } @@ -663,22 +682,32 @@ impl ReplyReader { const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); -/// Wait until we have commit_lsn > lsn or timeout expires. Returns -/// - Ok(Some(commit_lsn)) if needed lsn is successfully observed; +/// Wait until we have available WAL > start_pos or timeout expires. Returns +/// - Ok(Some(end_pos)) if needed lsn is successfully observed; /// - Ok(None) if timeout expired; -/// - Err in case of error (if watch channel is in trouble, shouldn't happen). -async fn wait_for_lsn(rx: &mut Receiver, lsn: Lsn) -> anyhow::Result> { +/// - Err in case of error -- only if 1) term changed while fetching in recovery +/// mode 2) watch channel closed, which must never happen. +async fn wait_for_lsn( + rx: &mut EndWatch, + client_term: Option, + start_pos: Lsn, +) -> anyhow::Result> { let res = timeout(POLL_STATE_TIMEOUT, async move { - let mut commit_lsn; loop { - rx.changed().await?; - commit_lsn = *rx.borrow(); - if commit_lsn > lsn { - break; + let end_pos = rx.get(); + if end_pos > start_pos { + return Ok(end_pos); } + if let EndWatch::Flush(rx) = rx { + let curr_term = rx.borrow().term; + if let Some(client_term) = client_term { + if curr_term != client_term { + bail!("term changed: requested {}, now {}", client_term, curr_term); + } + } + } + rx.changed().await?; } - - Ok(commit_lsn) }) .await; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index f0edd5396670..3e066de34fec 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -3,8 +3,11 @@ use anyhow::{anyhow, bail, Result}; use postgres_ffi::XLogSegNo; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; use tokio::fs; +use serde_with::DisplayFromStr; use std::cmp::max; use std::path::PathBuf; use std::sync::Arc; @@ -24,9 +27,10 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::receive_wal::WalReceivers; +use crate::recovery::recovery_main; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, ServerInfo, Term, + SafekeeperMemState, ServerInfo, Term, TermLsn, INVALID_TERM, }; use crate::send_wal::WalSenders; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; @@ -37,18 +41,25 @@ use crate::SafeKeeperConf; use crate::{debug_dump, wal_storage}; /// Things safekeeper should know about timeline state on peers. 
-#[derive(Debug, Clone)] +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerInfo { pub sk_id: NodeId, /// Term of the last entry. _last_log_term: Term, /// LSN of the last record. + #[serde_as(as = "DisplayFromStr")] _flush_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] pub commit_lsn: Lsn, /// Since which LSN safekeeper has WAL. TODO: remove this once we fill new /// sk since backup_lsn. + #[serde_as(as = "DisplayFromStr")] pub local_start_lsn: Lsn, - /// When info was received. + /// When info was received. Serde annotations are not very useful but make + /// the code compile -- we don't rely on this field externally. + #[serde(skip)] + #[serde(default = "Instant::now")] ts: Instant, } @@ -237,8 +248,9 @@ impl SharedState { tenant_id: ttid.tenant_id.as_ref().to_owned(), timeline_id: ttid.timeline_id.as_ref().to_owned(), }), + term: self.sk.state.acceptor_state.term, last_log_term: self.sk.get_epoch(), - flush_lsn: self.sk.wal_store.flush_lsn().0, + flush_lsn: self.sk.flush_lsn().0, // note: this value is not flushed to control file yet and can be lost commit_lsn: self.sk.inmem.commit_lsn.0, remote_consistent_lsn: remote_consistent_lsn.0, @@ -247,6 +259,7 @@ impl SharedState { .advertise_pg_addr .to_owned() .unwrap_or(conf.listen_pg_addr.clone()), + http_connstr: conf.listen_http_addr.to_owned(), backup_lsn: self.sk.inmem.backup_lsn.0, local_start_lsn: self.sk.state.local_start_lsn.0, availability_zone: conf.availability_zone.clone(), @@ -296,6 +309,13 @@ pub struct Timeline { commit_lsn_watch_tx: watch::Sender, commit_lsn_watch_rx: watch::Receiver, + /// Broadcasts (current term, flush_lsn) updates, walsender is interested in + /// them when sending in recovery mode (to walproposer or peers). Note: this + /// is just a notification, WAL reading should always done with lock held as + /// term can change otherwise. + term_flush_lsn_watch_tx: watch::Sender, + term_flush_lsn_watch_rx: watch::Receiver, + /// Safekeeper and other state, that should remain consistent and /// synchronized with the disk. This is tokio mutex as we write WAL to disk /// while holding it, ensuring that consensus checks are in order. @@ -317,16 +337,20 @@ pub struct Timeline { impl Timeline { /// Load existing timeline from disk. pub fn load_timeline( - conf: SafeKeeperConf, + conf: &SafeKeeperConf, ttid: TenantTimelineId, wal_backup_launcher_tx: Sender, ) -> Result { let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered(); - let shared_state = SharedState::restore(&conf, &ttid)?; + let shared_state = SharedState::restore(conf, &ttid)?; let rcl = shared_state.sk.state.remote_consistent_lsn; let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(shared_state.sk.state.commit_lsn); + let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = watch::channel(TermLsn::from(( + shared_state.sk.get_term(), + shared_state.sk.flush_lsn(), + ))); let (cancellation_tx, cancellation_rx) = watch::channel(false); Ok(Timeline { @@ -334,6 +358,8 @@ impl Timeline { wal_backup_launcher_tx, commit_lsn_watch_tx, commit_lsn_watch_rx, + term_flush_lsn_watch_tx, + term_flush_lsn_watch_rx, mutex: Mutex::new(shared_state), walsenders: WalSenders::new(rcl), walreceivers: WalReceivers::new(), @@ -345,7 +371,7 @@ impl Timeline { /// Create a new timeline, which is not yet persisted to disk. 
pub fn create_empty( - conf: SafeKeeperConf, + conf: &SafeKeeperConf, ttid: TenantTimelineId, wal_backup_launcher_tx: Sender, server_info: ServerInfo, @@ -353,6 +379,8 @@ impl Timeline { local_start_lsn: Lsn, ) -> Result { let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID); + let (term_flush_lsn_watch_tx, term_flush_lsn_watch_rx) = + watch::channel(TermLsn::from((INVALID_TERM, Lsn::INVALID))); let (cancellation_tx, cancellation_rx) = watch::channel(false); let state = SafeKeeperState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn); @@ -361,7 +389,9 @@ impl Timeline { wal_backup_launcher_tx, commit_lsn_watch_tx, commit_lsn_watch_rx, - mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?), + term_flush_lsn_watch_tx, + term_flush_lsn_watch_rx, + mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?), walsenders: WalSenders::new(Lsn(0)), walreceivers: WalReceivers::new(), cancellation_rx, @@ -370,12 +400,16 @@ impl Timeline { }) } - /// Initialize fresh timeline on disk and start background tasks. If bootstrap + /// Initialize fresh timeline on disk and start background tasks. If init /// fails, timeline is cancelled and cannot be used anymore. /// - /// Bootstrap is transactional, so if it fails, created files will be deleted, + /// Init is transactional, so if it fails, created files will be deleted, /// and state on disk should remain unchanged. - pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> { + pub async fn init_new( + self: &Arc, + shared_state: &mut MutexGuard<'_, SharedState>, + conf: &SafeKeeperConf, + ) -> Result<()> { match fs::metadata(&self.timeline_dir).await { Ok(_) => { // Timeline directory exists on disk, we should leave state unchanged @@ -391,7 +425,7 @@ impl Timeline { // Create timeline directory. fs::create_dir_all(&self.timeline_dir).await?; - // Write timeline to disk and TODO: start background tasks. + // Write timeline to disk and start background tasks. if let Err(e) = shared_state.sk.persist().await { // Bootstrap failed, cancel timeline and remove timeline directory. self.cancel(shared_state); @@ -405,12 +439,16 @@ impl Timeline { return Err(e); } - - // TODO: add more initialization steps here - self.update_status(shared_state); + self.bootstrap(conf); Ok(()) } + /// Bootstrap new or existing timeline starting background stasks. + pub fn bootstrap(self: &Arc, conf: &SafeKeeperConf) { + // Start recovery task which always runs on the timeline. + tokio::spawn(recovery_main(self.clone(), conf.clone())); + } + /// Delete timeline from disk completely, by removing timeline directory. Background /// timeline activities will stop eventually. pub async fn delete_from_disk( @@ -444,6 +482,16 @@ impl Timeline { *self.cancellation_rx.borrow() } + /// Returns watch channel which gets value when timeline is cancelled. It is + /// guaranteed to have not cancelled value observed (errors otherwise). + pub fn get_cancellation_rx(&self) -> Result> { + let rx = self.cancellation_rx.clone(); + if *rx.borrow() { + bail!(TimelineError::Cancelled(self.ttid)); + } + Ok(rx) + } + /// Take a writing mutual exclusive lock on timeline shared_state. pub async fn write_shared_state(&self) -> MutexGuard { self.mutex.lock().await @@ -520,6 +568,11 @@ impl Timeline { self.commit_lsn_watch_rx.clone() } + /// Returns term_flush_lsn watch channel. 
+ pub fn get_term_flush_lsn_watch_rx(&self) -> watch::Receiver { + self.term_flush_lsn_watch_rx.clone() + } + /// Pass arrived message to the safekeeper. pub async fn process_msg( &self, @@ -531,6 +584,7 @@ impl Timeline { let mut rmsg: Option; let commit_lsn: Lsn; + let term_flush_lsn: TermLsn; { let mut shared_state = self.write_shared_state().await; rmsg = shared_state.sk.process_msg(msg).await?; @@ -544,8 +598,11 @@ impl Timeline { } commit_lsn = shared_state.sk.inmem.commit_lsn; + term_flush_lsn = + TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn())); } self.commit_lsn_watch_tx.send(commit_lsn)?; + self.term_flush_lsn_watch_tx.send(term_flush_lsn)?; Ok(rmsg) } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index eda5b9044ecd..2f591655a946 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -11,7 +11,7 @@ use serde::Serialize; use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::{Arc, Mutex}; use tokio::sync::mpsc::Sender; use tracing::*; use utils::id::{TenantId, TenantTimelineId, TimelineId}; @@ -71,19 +71,23 @@ pub struct GlobalTimelines; impl GlobalTimelines { /// Inject dependencies needed for the timeline constructors and load all timelines to memory. - pub fn init( + pub async fn init( conf: SafeKeeperConf, wal_backup_launcher_tx: Sender, ) -> Result<()> { - let mut state = TIMELINES_STATE.lock().unwrap(); - assert!(state.wal_backup_launcher_tx.is_none()); - state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); - state.conf = Some(conf); - - // Iterate through all directories and load tenants for all directories - // named as a valid tenant_id. + // clippy isn't smart enough to understand that drop(state) releases the + // lock, so use explicit block + let tenants_dir = { + let mut state = TIMELINES_STATE.lock().unwrap(); + assert!(state.wal_backup_launcher_tx.is_none()); + state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); + state.conf = Some(conf); + + // Iterate through all directories and load tenants for all directories + // named as a valid tenant_id. + state.get_conf().workdir.clone() + }; let mut tenant_count = 0; - let tenants_dir = state.get_conf().workdir.clone(); for tenants_dir_entry in std::fs::read_dir(&tenants_dir) .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))? { @@ -93,7 +97,7 @@ impl GlobalTimelines { TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or("")) { tenant_count += 1; - GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?; + GlobalTimelines::load_tenant_timelines(tenant_id).await?; } } Err(e) => error!( @@ -108,7 +112,7 @@ impl GlobalTimelines { info!( "found {} tenants directories, successfully loaded {} timelines", tenant_count, - state.timelines.len() + TIMELINES_STATE.lock().unwrap().timelines.len() ); Ok(()) } @@ -116,17 +120,21 @@ impl GlobalTimelines { /// Loads all timelines for the given tenant to memory. Returns fs::read_dir /// errors if any. /// - /// Note: This function (and all reading/loading below) is sync because - /// timelines are loaded while holding GlobalTimelinesState lock. Which is - /// fine as this is called only from single threaded main runtime on boot, - /// but clippy complains anyway, and suppressing that isn't trivial as async - /// is the keyword, ha. 
That only other user is pull_timeline.rs for which - /// being blocked is not that bad, and we can do spawn_blocking. - fn load_tenant_timelines( - state: &mut MutexGuard<'_, GlobalTimelinesState>, - tenant_id: TenantId, - ) -> Result<()> { - let timelines_dir = state.get_conf().tenant_dir(&tenant_id); + /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is + /// sync and there is no important reason to make it async (it is always + /// held for a short while) we just lock and unlock it for each timeline -- + /// this function is called during init when nothing else is running, so + /// this is fine. + async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> { + let (conf, wal_backup_launcher_tx) = { + let state = TIMELINES_STATE.lock().unwrap(); + ( + state.get_conf().clone(), + state.wal_backup_launcher_tx.as_ref().unwrap().clone(), + ) + }; + + let timelines_dir = conf.tenant_dir(&tenant_id); for timelines_dir_entry in std::fs::read_dir(&timelines_dir) .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))? { @@ -136,13 +144,16 @@ impl GlobalTimelines { TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) { let ttid = TenantTimelineId::new(tenant_id, timeline_id); - match Timeline::load_timeline( - state.get_conf().clone(), - ttid, - state.wal_backup_launcher_tx.as_ref().unwrap().clone(), - ) { + match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) { Ok(timeline) => { - state.timelines.insert(ttid, Arc::new(timeline)); + let tli = Arc::new(timeline); + TIMELINES_STATE + .lock() + .unwrap() + .timelines + .insert(ttid, tli.clone()); + tli.bootstrap(&conf); + tli.update_status_notify().await.unwrap(); } // If we can't load a timeline, it's most likely because of a corrupted // directory. We will log an error and won't allow to delete/recreate @@ -168,18 +179,22 @@ impl GlobalTimelines { } /// Load timeline from disk to the memory. - pub fn load_timeline(ttid: TenantTimelineId) -> Result> { + pub async fn load_timeline(ttid: TenantTimelineId) -> Result> { let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies(); - match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) { + match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) { Ok(timeline) => { let tli = Arc::new(timeline); + // TODO: prevent concurrent timeline creation/loading TIMELINES_STATE .lock() .unwrap() .timelines .insert(ttid, tli.clone()); + + tli.bootstrap(&conf); + Ok(tli) } // If we can't load a timeline, it's bad. Caller will figure it out. @@ -217,7 +232,7 @@ impl GlobalTimelines { info!("creating new timeline {}", ttid); let timeline = Arc::new(Timeline::create_empty( - conf, + &conf, ttid, wal_backup_launcher_tx, server_info, @@ -240,23 +255,24 @@ impl GlobalTimelines { // Write the new timeline to the disk and start background workers. // Bootstrap is transactional, so if it fails, the timeline will be deleted, // and the state on disk should remain unchanged. - if let Err(e) = timeline.bootstrap(&mut shared_state).await { - // Note: the most likely reason for bootstrap failure is that the timeline + if let Err(e) = timeline.init_new(&mut shared_state, &conf).await { + // Note: the most likely reason for init failure is that the timeline // directory already exists on disk. This happens when timeline is corrupted // and wasn't loaded from disk on startup because of that. We want to preserve // the timeline directory in this case, for further inspection. 
// TODO: this is an unusual error, perhaps we should send it to sentry // TODO: compute will try to create timeline every second, we should add backoff - error!("failed to bootstrap timeline {}: {}", ttid, e); + error!("failed to init new timeline {}: {}", ttid, e); - // Timeline failed to bootstrap, it cannot be used. Remove it from the map. + // Timeline failed to init, it cannot be used. Remove it from the map. TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid); return Err(e); } // We are done with bootstrap, release the lock, return the timeline. // {} block forces release before .await } + timeline.update_status_notify().await?; timeline.wal_backup_launcher_tx.send(timeline.ttid).await?; Ok(timeline) } diff --git a/storage_broker/benches/rps.rs b/storage_broker/benches/rps.rs index 6563fec8b6a3..a0c8e1f749b8 100644 --- a/storage_broker/benches/rps.rs +++ b/storage_broker/benches/rps.rs @@ -125,6 +125,7 @@ async fn publish(client: Option, n_keys: u64) { tenant_id: vec![0xFF; 16], timeline_id: tli_from_u64(counter % n_keys), }), + term: 0, last_log_term: 0, flush_lsn: counter, commit_lsn: 2, @@ -132,6 +133,7 @@ async fn publish(client: Option, n_keys: u64) { remote_consistent_lsn: 4, peer_horizon_lsn: 5, safekeeper_connstr: "zenith-1-sk-1.local:7676".to_owned(), + http_connstr: "zenith-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, }; diff --git a/storage_broker/proto/broker.proto b/storage_broker/proto/broker.proto index 4b2de1a8e511..aa9d62a29fc3 100644 --- a/storage_broker/proto/broker.proto +++ b/storage_broker/proto/broker.proto @@ -22,6 +22,8 @@ message SubscribeSafekeeperInfoRequest { message SafekeeperTimelineInfo { uint64 safekeeper_id = 1; TenantTimelineId tenant_timeline_id = 2; + // Safekeeper term + uint64 term = 12; // Term of the last entry. uint64 last_log_term = 3; // LSN of the last record. @@ -36,6 +38,8 @@ message SafekeeperTimelineInfo { uint64 local_start_lsn = 9; // A connection string to use for WAL receiving. string safekeeper_connstr = 10; + // HTTP endpoint connection string + string http_connstr = 13; // Availability zone of a safekeeper. optional string availability_zone = 11; } diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index 597d9860d8d8..23d1bee35402 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -519,6 +519,7 @@ mod tests { tenant_id: vec![0x00; 16], timeline_id, }), + term: 0, last_log_term: 0, flush_lsn: 1, commit_lsn: 2, @@ -526,6 +527,7 @@ mod tests { remote_consistent_lsn: 4, peer_horizon_lsn: 5, safekeeper_connstr: "neon-1-sk-1.local:7676".to_owned(), + http_connstr: "neon-1-sk-1.local:7677".to_owned(), local_start_lsn: 0, availability_zone: None, }
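
The walsender changes above replace the old walproposer-only stop_pos logic with EndWatch, a wrapper over tokio watch channels: wait_for_lsn now polls whichever channel applies (commit_lsn, or term/flush_lsn during recovery) and times out so the sender can emit a keepalive and re-check termination. Below is a minimal standalone sketch of that wait-with-timeout pattern, assuming only tokio (full feature set) and anyhow; the plain u64 positions, wait_past, and POLL_TIMEOUT are illustrative stand-ins for the safekeeper types, not code from this patch.

use std::time::Duration;

use tokio::sync::watch;
use tokio::time::timeout;

const POLL_TIMEOUT: Duration = Duration::from_secs(1);

/// Wait until the published position advances past `start_pos`.
/// Ok(Some(end_pos)): it advanced; Ok(None): poll timed out (caller would send
/// a keepalive and retry); Err: the sender side of the channel is gone.
async fn wait_past(rx: &mut watch::Receiver<u64>, start_pos: u64) -> anyhow::Result<Option<u64>> {
    let res = timeout(POLL_TIMEOUT, async {
        loop {
            let end_pos = *rx.borrow();
            if end_pos > start_pos {
                return anyhow::Ok(end_pos);
            }
            // Sleeps until the sender publishes a new value.
            rx.changed().await?;
        }
    })
    .await;
    match res {
        Ok(Ok(end_pos)) => Ok(Some(end_pos)),
        Ok(Err(e)) => Err(e),
        Err(_elapsed) => Ok(None),
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (tx, mut rx) = watch::channel(0u64);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(42);
    });
    assert_eq!(wait_past(&mut rx, 0).await?, Some(42));
    Ok(())
}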
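
wal_stream_connection_config moves from the pageserver's connection_manager into utils::postgres_client so that safekeepers can reuse it for peer recovery. A hypothetical call site, assuming the utils and postgres_connection workspace crates shown in this diff; the tenant/timeline ids, host string, and availability zone below are invented for illustration.

use std::str::FromStr;

use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::postgres_client::wal_stream_connection_config;

fn main() -> anyhow::Result<()> {
    let ttid = TenantTimelineId::new(
        TenantId::from_str("9e4b3e6c41a1d9f7348b2a8d3f2e5c01")?,
        TimelineId::from_str("6d1f7a4b9c2e8d305f6a1b4c7d9e2f38")?,
    );
    // Port defaults to 5432 when the address omits it; no auth token here.
    let _cfg =
        wal_stream_connection_config(ttid, "safekeeper-0.local:5454", None, Some("us-east-2a"))?;
    // _cfg is a postgres_connection::PgConnectionConfig whose options carry
    // "-c", "timeline_id=...", "tenant_id=...", "availability_zone=...".
    Ok(())
}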