Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safekeeper peer recovery preparatory patches #5118

Merged
merged 6 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ fn lsn_invalid() -> Lsn {
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term.
pub term: Option<u64>,
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
Expand Down Expand Up @@ -58,4 +60,6 @@ pub struct SkTimelineInfo {
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,
#[serde(default)]
pub http_connstr: Option<String>,
}
1 change: 1 addition & 0 deletions libs/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ url.workspace = true
uuid.workspace = true

pq_proto.workspace = true
postgres_connection.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

Expand Down
2 changes: 2 additions & 0 deletions libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub mod serde_regex;

pub mod pageserver_feedback;

pub mod postgres_client;

pub mod tracing_span_assert;

pub mod rate_limit;
Expand Down
37 changes: 37 additions & 0 deletions libs/utils/src/postgres_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//! Postgres client connection code common to other crates (safekeeper and
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
//! postgres_connection crate.

use anyhow::Context;
use postgres_connection::{parse_host_port, PgConnectionConfig};

use crate::id::TenantTimelineId;

/// Create client config for fetching WAL from safekeeper on particular timeline.
/// listen_pg_addr_str is in form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));

if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}

Ok(connstr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ use storage_broker::Streaming;
use tokio::select;
use tracing::*;

use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
Expand Down Expand Up @@ -876,33 +877,6 @@ impl ReconnectReason {
}
}

fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));

if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}

Ok(connstr)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -918,6 +892,7 @@ mod tests {
timeline: SafekeeperTimelineInfo {
safekeeper_id: 0,
tenant_timeline_id: None,
term: 0,
last_log_term: 0,
flush_lsn: 0,
commit_lsn,
Expand All @@ -926,6 +901,7 @@ mod tests {
peer_horizon_lsn: 0,
local_start_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
http_connstr: safekeeper_connstr.to_owned(),
availability_zone: None,
},
latest_update,
Expand Down
37 changes: 20 additions & 17 deletions safekeeper/src/bin/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {

let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();

// Start wal backup launcher before loading timelines as we'll notify it
// through the channel about timelines which need offloading, not draining
// the channel would cause deadlock.
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));
let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;

let conf_ = conf.clone();
// Run everything in current thread rt, if asked.
if conf.current_thread_runtime {
info!("running in current thread runtime");
}
let current_thread_rt = conf
.current_thread_runtime
.then(|| Handle::try_current().expect("no runtime in main"));

let wal_service_handle = current_thread_rt
.as_ref()
Expand Down Expand Up @@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));

let conf_ = conf.clone();
let wal_backup_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
.spawn(wal_backup::wal_backup_launcher_task_main(
conf_,
wal_backup_launcher_rx,
))
.map(|res| ("WAL backup launcher".to_owned(), res));
tasks_handles.push(Box::pin(wal_backup_handle));

set_build_info_metric(GIT_VERSION);

// TODO: update tokio-stream, convert to real async Stream with
Expand Down
5 changes: 2 additions & 3 deletions safekeeper/src/control_file_upgrade.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory,
TermSwitchEntry,
AcceptorState, PersistedPeers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermLsn,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
Expand Down Expand Up @@ -145,7 +144,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
let ac = AcceptorState {
term: oldstate.acceptor_state.term,
term_history: TermHistory(vec![TermSwitchEntry {
term_history: TermHistory(vec![TermLsn {
term: oldstate.acceptor_state.epoch,
lsn: Lsn(0),
}]),
Expand Down
6 changes: 6 additions & 0 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::receive_wal::WalReceiverState;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{debug_dump, pull_timeline};

use crate::timelines_global_map::TimelineDeleteForceResult;
Expand Down Expand Up @@ -101,6 +102,7 @@ pub struct TimelineStatus {
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,
}
Expand Down Expand Up @@ -140,6 +142,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
term_history,
};

let conf = get_conf(&request);
// Note: we report in memory values which can be lost.
let status = TimelineStatus {
tenant_id: ttid.tenant_id,
Expand All @@ -153,6 +156,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
peers: tli.get_peers(conf).await,
walsenders: tli.get_walsenders().get_all(),
walreceivers: tli.get_walreceivers().get_all(),
};
Expand Down Expand Up @@ -282,12 +286,14 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
tenant_id: ttid.tenant_id.as_ref().to_owned(),
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
term: sk_info.term.unwrap_or(0),
last_log_term: sk_info.last_log_term.unwrap_or(0),
flush_lsn: sk_info.flush_lsn.0,
commit_lsn: sk_info.commit_lsn.0,
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
availability_zone: None,
Expand Down
4 changes: 2 additions & 2 deletions safekeeper/src/json_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermLsn};
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
Expand Down Expand Up @@ -119,7 +119,7 @@ async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> any
let history = tli.get_state().await.1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
history_entries.push(TermLsn { term, lsn });
let history = TermHistory(history_entries);

let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
Expand Down
1 change: 1 addition & 0 deletions safekeeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod json_ctrl;
pub mod metrics;
pub mod pull_timeline;
pub mod receive_wal;
pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
Expand Down
4 changes: 3 additions & 1 deletion safekeeper/src/pull_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;

let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
let tli = GlobalTimelines::load_timeline(ttid)
.await
.context("Failed to load timeline after copy")?;

info!(
"Loaded timeline {}, flush_lsn={}",
Expand Down
40 changes: 40 additions & 0 deletions safekeeper/src/recovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//! This module implements pulling WAL from peer safekeepers if compute can't
//! provide it, i.e. safekeeper lags too much.

use std::sync::Arc;

use tokio::{select, time::sleep, time::Duration};
use tracing::{info, instrument};

use crate::{timeline::Timeline, SafeKeeperConf};

/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
info!("started");
let mut cancellation_rx = match tli.get_cancellation_rx() {
Ok(rx) => rx,
Err(_) => {
info!("timeline canceled during task start");
return;
}
};

select! {
_ = recovery_main_loop(tli) => { unreachable!() }
_ = cancellation_rx.changed() => {
info!("stopped");
}
}
}

const CHECK_INTERVAL_MS: u64 = 2000;

/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(_tli: Arc<Timeline>) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
arssher marked this conversation as resolved.
Show resolved Hide resolved
loop {
sleep(check_duration).await;
arssher marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading