feat(core-distributor)!: add cuid cache (#2381)
* feat(core-distributor): add cuid cache

* fix test

* rename cpu_cache

* fix test

* feat(chain-listener): add ws ping to config

* fix tests

* fix tests

---------

Co-authored-by: Alexey Proshutinskiy <[email protected]>
Co-authored-by: Aleksey Proshutinskiy <[email protected]>
3 people authored Sep 23, 2024
1 parent 6ec487f commit 7c2ff1a
Showing 9 changed files with 154 additions and 38 deletions.
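The heart of the change is a small CUID-to-core cache inside the core distributor: when a compute unit is re-acquired, it is pinned to the physical core it held before, as long as that core is still free, and stale entries are dropped once the unit disappears on-chain. The sketch below illustrates the idea only; `Cuid`, `CoreId`, and `CuidCache` are simplified stand-ins, not the types used in the diffs that follow.

```rust
use std::collections::HashMap;

// Simplified stand-in types; the real code uses CUID and PhysicalCoreId.
type Cuid = u64;
type CoreId = u32;

struct CuidCache {
    last_core: HashMap<Cuid, CoreId>,
}

impl CuidCache {
    /// Return the unit's previous core if it is still available.
    fn preferred_core(&self, cuid: Cuid, available: &[CoreId]) -> Option<CoreId> {
        self.last_core
            .get(&cuid)
            .copied()
            .filter(|core| available.contains(core))
    }

    /// Drop entries for units that no longer exist on-chain,
    /// mirroring the `cleanup_cache` method added in this commit.
    fn cleanup(&mut self, allowed: &[Cuid]) {
        self.last_core.retain(|cuid, _| allowed.contains(cuid));
    }
}

fn main() {
    let mut cache = CuidCache {
        last_core: HashMap::from([(7, 2), (9, 5)]),
    };
    assert_eq!(cache.preferred_core(7, &[0, 1, 2]), Some(2)); // core 2 still free
    assert_eq!(cache.preferred_core(9, &[0, 1, 2]), None); // core 5 is taken
    cache.cleanup(&[7]);
    assert!(!cache.last_core.contains_key(&9)); // stale entry dropped
}
```

The real distributor keys the cache as `Map<CUID, PhysicalCoreId>` and guards it behind the distributor's state lock.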
24 changes: 11 additions & 13 deletions crates/chain-listener/src/listener.rs
```diff
@@ -71,8 +71,6 @@ use crate::proof_tracker::ProofTracker;
 use crate::types::{CUGroups, PhysicalCoreGroups};
 
 const PROOF_POLL_LIMIT: usize = 50;
-// TODO: move to config
-const WS_PING_PERIOD_SEC: u64 = 10;
 
 #[derive(Clone)]
 struct OnChainWorker {
@@ -457,19 +455,17 @@ impl ChainListener {
         }
     }
 
-    pub async fn create_ws_client(ws_endpoint: &str) -> Result<WsClient, client::Error> {
+    pub async fn create_ws_client(config: &ChainListenerConfig) -> Result<WsClient, client::Error> {
         let ws_client = retry(ExponentialBackoff::default(), || async {
             let client = WsClientBuilder::default()
-                .enable_ws_ping(
-                    PingConfig::new().ping_interval(Duration::from_secs(WS_PING_PERIOD_SEC)),
-                )
-                .build(ws_endpoint)
+                .enable_ws_ping(PingConfig::new().ping_interval(config.ws_ping_period))
+                .build(config.ws_endpoint.clone())
                 .await
                 .map_err(|err| {
                     tracing::warn!(
                         target: "chain-listener",
                         "Error connecting to websocket endpoint {}, error: {}; Retrying...",
-                        ws_endpoint,
+                        config.ws_endpoint,
                         err
                     );
                     err
@@ -482,7 +478,7 @@ impl ChainListener {
         tracing::info!(
             target: "chain-listener",
             "Successfully connected to websocket endpoint: {}",
-            ws_endpoint
+            config.ws_endpoint
         );
 
         Ok(ws_client)
@@ -506,8 +502,7 @@ impl ChainListener {
 
     async fn refresh_subscriptions(&mut self) -> Result<(), client::Error> {
         if !self.ws_client.is_connected() {
-            self.ws_client =
-                ChainListener::create_ws_client(&self.listener_config.ws_endpoint).await?;
+            self.ws_client = ChainListener::create_ws_client(&self.listener_config).await?;
         }
 
         // loop because subscriptions can fail and require reconnection, we can't proceed without them
@@ -532,8 +527,7 @@ impl ChainListener {
                 client::Error::RestartNeeded(_) => {
                     tracing::warn!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Restart client...");
                     self.ws_client =
-                        ChainListener::create_ws_client(&self.listener_config.ws_endpoint)
-                            .await?;
+                        ChainListener::create_ws_client(&self.listener_config).await?;
                 }
                 _ => {
                     tracing::error!(target: "chain-listener", "Failed to refresh subscriptions: {err}; Retrying...");
@@ -551,6 +545,10 @@ impl ChainListener {
 
         let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();
 
+        let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect();
+        self.core_distributor
+            .cleanup_cache(current_units.as_slice());
+
         self.cc_compute_units = units
             .into_iter()
             .map(|unit| (CUID::new(unit.id.0), unit))
```
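With the ping period moved out of the removed `WS_PING_PERIOD_SEC` constant, `create_ws_client` now takes the whole `ChainListenerConfig`. A minimal sketch of the data involved, with a hypothetical `WsSettings` struct standing in for the real config (the builder calls in the comment are quoted from the diff above):

```rust
use std::time::Duration;

// Hypothetical stand-in for ChainListenerConfig; only the two fields
// touched by this diff are shown.
struct WsSettings {
    ws_endpoint: String,
    ws_ping_period: Duration,
}

fn main() {
    let settings = WsSettings {
        ws_endpoint: "wss://example.invalid/ws".to_string(), // placeholder endpoint
        ws_ping_period: Duration::from_secs(10),             // matches the old constant
    };
    // The listener then builds its client roughly as:
    //     WsClientBuilder::default()
    //         .enable_ws_ping(PingConfig::new().ping_interval(settings.ws_ping_period))
    //         .build(settings.ws_endpoint.clone())
    println!("ping {} every {:?}", settings.ws_endpoint, settings.ws_ping_period);
}
```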
64 changes: 64 additions & 0 deletions crates/core-distributor/src/distributor.rs
```diff
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use ccp_shared::types::{LogicalCoreId, PhysicalCoreId, CUID};
 use cpu_utils::CPUTopology;
 use fxhash::FxBuildHasher;
+use fxhash::FxHashSet;
 use parking_lot::RwLock;
 use range_set_blaze::RangeSetBlaze;
 
@@ -44,6 +45,8 @@ pub trait CoreDistributor: Send + Sync {
     fn release_worker_cores(&self, unit_ids: &[CUID]);
 
     fn get_system_cpu_assignment(&self) -> SystemAssignment;
+
+    fn cleanup_cache(&self, allowed_cuids: &[CUID]);
 }
 
 /// `PersistentCoreDistributor` is a CPU core distributor responsible for allocating and releasing CPU cores
@@ -67,6 +70,7 @@ impl From<PersistentCoreDistributorState> for CoreDistributorState {
             available_cores: value.available_cores.into_iter().collect(),
             unit_id_mapping: value.unit_id_mapping.into_iter().collect(),
             work_type_mapping: value.work_type_mapping.into_iter().collect(),
+            cuid_cache: value.cuid_cache.into_iter().collect(),
         }
     }
 }
@@ -203,12 +207,16 @@ impl PersistentCoreDistributor {
         let type_mapping =
             Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());
 
+        let cpu_cache =
+            Map::with_capacity_and_hasher(available_core_count, FxBuildHasher::default());
+
         let inner_state = CoreDistributorState {
             cores_mapping,
             system_cores,
             available_cores,
             unit_id_mapping,
             work_type_mapping: type_mapping,
+            cuid_cache: cpu_cache,
         };
 
         let result = Self::make_instance_with_task(file_name, inner_state, acquire_strategy);
@@ -288,6 +296,13 @@ impl CoreDistributor for PersistentCoreDistributor {
         }
         SystemAssignment::new(lock.system_cores.clone(), logical_core_ids)
     }
+
+    fn cleanup_cache(&self, allowed_cuids: &[CUID]) {
+        let mut lock = self.state.write();
+        let allowed_unit_ids: FxHashSet<CUID> = allowed_cuids.iter().cloned().collect();
+        lock.cuid_cache
+            .retain(|cuid, _| allowed_unit_ids.contains(cuid))
+    }
 }
 
 pub(crate) struct CoreDistributorState {
@@ -302,6 +317,8 @@ pub(crate) struct CoreDistributorState {
     pub unit_id_mapping: BiMap<PhysicalCoreId, CUID>,
     // mapping between unit id and workload type
    pub work_type_mapping: Map<CUID, WorkType>,
+    // cache
+    pub cuid_cache: Map<CUID, PhysicalCoreId>,
 }
 
 #[cfg(test)]
@@ -431,6 +448,52 @@ mod tests {
         assert_eq!(assignment_1, assignment_3);
     }
 
+    #[test]
+    fn test_acquire_same_cuid_strict() {
+        let cpu_topology = mocked_topology();
+        let temp_dir = tempfile::tempdir().expect("Failed to create temp dir");
+
+        let (distributor, _task) = PersistentCoreDistributor::from_path(
+            temp_dir.path().join("test.toml"),
+            2,
+            CoreRange::from_str("0-7").unwrap(),
+            AcquireStrategy::Strict,
+            &cpu_topology,
+        )
+        .unwrap();
+        let unit_id_1 =
+            <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
+                .unwrap();
+        let unit_id_2 =
+            <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3a0")
+                .unwrap();
+        let unit_id_3 =
+            <CUID>::from_hex("1cce3d08f784b11d636f2fb55adf291d43c2e9cbe7ae7eeb2d0301a96be0a3d0")
+                .unwrap();
+        let all_unit_ids = vec![unit_id_1, unit_id_2, unit_id_3];
+
+        let unit_ids = vec![unit_id_1, unit_id_2];
+        let assignment_1 = distributor
+            .acquire_worker_cores(AcquireRequest {
+                unit_ids: unit_ids.clone(),
+                work_type: WorkType::CapacityCommitment,
+            })
+            .unwrap();
+        distributor.release_worker_cores(all_unit_ids.as_slice());
+        let unit_ids = vec![unit_id_2, unit_id_3];
+        let assignment_2 = distributor
+            .acquire_worker_cores(AcquireRequest {
+                unit_ids: unit_ids.clone(),
+                work_type: WorkType::CapacityCommitment,
+            })
+            .unwrap();
+
+        assert_eq!(
+            assignment_1.cuid_cores.get(&unit_id_2),
+            assignment_2.cuid_cores.get(&unit_id_2)
+        )
+    }
+
     #[test]
     fn test_acquire_and_release_strict() {
         let cpu_topology = mocked_topology();
@@ -535,6 +598,7 @@ mod tests {
             available_cores: vec![PhysicalCoreId::new(2)],
             unit_id_mapping: vec![(PhysicalCoreId::new(3), init_id_1)],
             work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+            cuid_cache: Default::default(),
         };
         let (distributor, _task) = PersistentCoreDistributor::make_instance_with_task(
             temp_dir.into_path(),
```
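The new `cleanup_cache` keeps only the entries whose CUID is still reported among the on-chain compute units (the listener calls it on every unit refresh, as shown in listener.rs above). A tiny runnable sketch of the same retain-based filtering, with string keys standing in for CUIDs:

```rust
use std::collections::HashMap;

fn main() {
    // Stand-in for cuid_cache: Map<CUID, PhysicalCoreId>.
    let mut cuid_cache: HashMap<&str, u32> =
        HashMap::from([("unit-a", 2), ("unit-b", 5)]);

    // Only unit-a is still present in the on-chain compute units.
    let allowed = ["unit-a"];
    cuid_cache.retain(|cuid, _| allowed.contains(cuid));

    assert_eq!(cuid_cache.len(), 1);
    assert!(cuid_cache.contains_key("unit-a")); // unit-b's cached core is gone
}
```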
2 changes: 2 additions & 0 deletions crates/core-distributor/src/dummy.rs
```diff
@@ -62,4 +62,6 @@ impl CoreDistributor for DummyCoreDistibutor {
     fn get_system_cpu_assignment(&self) -> SystemAssignment {
         SystemAssignment::new(vec![PhysicalCoreId::new(0)], vec![LogicalCoreId::new(0)])
     }
+
+    fn cleanup_cache(&self, _allowed_cuids: &[CUID]) {}
 }
```
13 changes: 9 additions & 4 deletions crates/core-distributor/src/persistence.rs
```diff
@@ -95,6 +95,8 @@ pub struct PersistentCoreDistributorState {
     pub unit_id_mapping: Vec<(PhysicalCoreId, CUID)>,
     #[serde_as(as = "Vec<(Hex, _)>")]
     pub work_type_mapping: Vec<(CUID, WorkType)>,
+    #[serde_as(as = "Vec<(Hex, _)>")]
+    pub cuid_cache: Vec<(CUID, PhysicalCoreId)>,
 }
 
 impl PersistentCoreDistributorState {
@@ -124,6 +126,7 @@ impl From<&CoreDistributorState> for PersistentCoreDistributorState {
                 .iter()
                 .map(|(k, v)| ((*k), *v))
                 .collect(),
+            cuid_cache: value.cuid_cache.iter().map(|(k, v)| ((*k), *v)).collect(),
         }
     }
 }
@@ -137,7 +140,7 @@ mod tests {
 
     #[test]
     fn test_serde() {
-        let init_id_1 =
+        let unit_id_1 =
             <CUID>::from_hex("54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea")
                 .unwrap();
         let persistent_state = PersistentCoreDistributorState {
@@ -153,15 +156,17 @@
             ],
             system_cores: vec![PhysicalCoreId::new(1)],
             available_cores: vec![PhysicalCoreId::new(2), PhysicalCoreId::new(3)],
-            unit_id_mapping: vec![(PhysicalCoreId::new(4), init_id_1)],
-            work_type_mapping: vec![(init_id_1, WorkType::Deal)],
+            unit_id_mapping: vec![(PhysicalCoreId::new(4), unit_id_1)],
+            work_type_mapping: vec![(unit_id_1, WorkType::Deal)],
+            cuid_cache: vec![(unit_id_1, PhysicalCoreId::new(1))],
         };
         let actual = toml::to_string(&persistent_state).unwrap();
         let expected = "cores_mapping = [[1, 1], [1, 2], [2, 3], [2, 4], [3, 5], [3, 6], [4, 7], [4, 8]]\n\
             system_cores = [1]\n\
             available_cores = [2, 3]\n\
             unit_id_mapping = [[4, \"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\"]]\n\
-            work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n";
+            work_type_mapping = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", \"Deal\"]]\n\
+            cuid_cache = [[\"54ae1b506c260367a054f80800a545f23e32c6bc4a8908c9a794cb8dad23e5ea\", 1]]\n";
         assert_eq!(expected, actual)
     }
 }
```
75 changes: 57 additions & 18 deletions crates/core-distributor/src/strategy.rs
```diff
@@ -109,39 +109,53 @@ impl AcquireStrategyOperations for StrictAcquireStrategy {
             });
         }
 
+        let mut new_core_allocation = Vec::with_capacity(core_allocation.len());
         for (unit_id, physical_core_id) in core_allocation {
+            match state.cuid_cache.get(&unit_id) {
+                None => new_core_allocation.push((unit_id, physical_core_id)),
+                Some(cached_physical_core_id) => {
+                    let position = state
+                        .available_cores
+                        .iter()
+                        .position(|core_id| core_id == cached_physical_core_id);
+                    match position {
+                        None => new_core_allocation.push((unit_id, physical_core_id)),
+                        Some(index) => {
+                            // SAFETY: this should never happen because we already found position in the previous step
+                            let physical_core_id = state
+                                .available_cores
+                                .remove(index)
+                                .expect("Unexpected state. Should not be empty never");
+
+                            state.unit_id_mapping.insert(physical_core_id, unit_id);
+                            state.work_type_mapping.insert(unit_id, *worker_unit_type);
+                            Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
+                        }
+                    }
+                }
+            }
+        }
+
+        for (unit_id, physical_core_id) in new_core_allocation {
             let physical_core_id = match physical_core_id {
                 None => {
                     // SAFETY: this should never happen because we already checked the availability of cores
-                    let core_id = state
+                    let physical_core_id = state
                         .available_cores
                         .pop_back()
                         .expect("Unexpected state. Should not be empty never");
-                    state.unit_id_mapping.insert(core_id, unit_id);
+                    state.unit_id_mapping.insert(physical_core_id, unit_id);
                     state.work_type_mapping.insert(unit_id, *worker_unit_type);
-                    core_id
+                    state.cuid_cache.insert(unit_id, physical_core_id);
+                    physical_core_id
                 }
                 Some(core_id) => {
                     state.work_type_mapping.insert(unit_id, *worker_unit_type);
                     core_id
                 }
             };
 
-            // SAFETY: The physical core always has corresponding logical ids,
-            // unit_id_mapping can't have a wrong physical_core_id
-            let logical_core_ids = state
-                .cores_mapping
-                .get_vec(&physical_core_id)
-                .cloned()
-                .expect("Unexpected state. Should not be empty never");
-
-            cuid_cores.insert(
-                unit_id,
-                Cores {
-                    physical_core_id,
-                    logical_core_ids,
-                },
-            );
+            Self::add_cuid_cores(state, unit_id, physical_core_id, &mut cuid_cores);
         }
 
         Ok(Assignment::new(cuid_cores))
@@ -157,6 +171,31 @@
     }
 }
 
+impl StrictAcquireStrategy {
+    fn add_cuid_cores(
+        state: &CoreDistributorState,
+        unit_id: CUID,
+        physical_core_id: PhysicalCoreId,
+        cuid_cores: &mut Map<CUID, Cores>,
+    ) {
+        // SAFETY: The physical core always has corresponding logical ids,
+        // unit_id_mapping can't have a wrong physical_core_id
+        let logical_core_ids = state
+            .cores_mapping
+            .get_vec(&physical_core_id)
+            .cloned()
+            .expect("Unexpected state. Should not be empty never");
+
+        cuid_cores.insert(
+            unit_id,
+            Cores {
+                physical_core_id,
+                logical_core_ids,
+            },
+        );
+    }
+}
+
 pub(crate) struct RoundRobinAcquireStrategy;
 
 impl AcquireStrategyOperations for RoundRobinAcquireStrategy {
```
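In the strict strategy, acquisition now happens in two passes: first each requested unit tries its cached core (taken only if it is still in `available_cores`), then the remaining units fall back to the old pop-from-available path, which also records the fresh assignment in the cache. A simplified, self-contained sketch of that two-pass order (stand-in types, no unit-id or work-type bookkeeping):

```rust
use std::collections::{HashMap, VecDeque};

type Cuid = u64;
type CoreId = u32;

fn acquire(
    units: &[Cuid],
    available: &mut VecDeque<CoreId>,
    cache: &mut HashMap<Cuid, CoreId>,
) -> HashMap<Cuid, CoreId> {
    let mut assigned = HashMap::new();
    let mut fallback = Vec::new();

    // Pass 1: honor cached cores that are still free.
    for &unit in units {
        let cached = cache
            .get(&unit)
            .and_then(|core| available.iter().position(|a| a == core));
        match cached {
            Some(index) => {
                let core = available.remove(index).expect("position was just found");
                assigned.insert(unit, core);
            }
            None => fallback.push(unit),
        }
    }

    // Pass 2: everything else gets any free core, remembered for next time.
    for unit in fallback {
        if let Some(core) = available.pop_back() {
            cache.insert(unit, core);
            assigned.insert(unit, core);
        }
    }
    assigned
}

fn main() {
    let mut available: VecDeque<CoreId> = VecDeque::from([0, 1, 2, 3]);
    let mut cache: HashMap<Cuid, CoreId> = HashMap::from([(7, 2)]);
    let got = acquire(&[7, 8], &mut available, &mut cache);
    assert_eq!(got[&7], 2); // cached core honored while still free
    assert_eq!(got[&8], 3); // newcomer pops from the back, now cached
}
```

The real implementation additionally updates `unit_id_mapping` and `work_type_mapping` in both passes; the sketch keeps only the cache-versus-fallback ordering.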
5 changes: 3 additions & 2 deletions crates/nox-tests/tests/chain_listener/tests.rs
```diff
@@ -85,6 +85,7 @@ impl ChainListenerTestEntities {
             max_batch_count: 0,
             max_proof_batch_size: 0,
             epoch_end_window: Default::default(),
+            ws_ping_period: Duration::from_secs(5),
         });
 
         cfg.cc_events_dir = Some(cc_events_dir.clone());
@@ -393,8 +394,8 @@ async fn test_deal_insufficient_funds_flow() {
     assert_allocation(
         result_2.cu_allocation,
         vec![
-            (PhysicalCoreId::new(125), cu_id_2),
-            (PhysicalCoreId::new(126), cu_id_3),
+            (PhysicalCoreId::new(125), cu_id_3),
+            (PhysicalCoreId::new(126), cu_id_2),
         ],
     );
 }
```
4 changes: 4 additions & 0 deletions crates/server-config/src/defaults.rs
```diff
@@ -291,3 +291,7 @@ pub fn default_max_proof_batch_size() -> usize {
 pub fn default_epoch_end_window() -> Duration {
     Duration::from_secs(300)
 }
+
+pub fn default_ws_ping_period() -> Duration {
+    Duration::from_secs(10)
+}
```
3 changes: 3 additions & 0 deletions crates/server-config/src/node_config.rs
```diff
@@ -570,6 +570,9 @@ pub struct ChainListenerConfig {
     #[serde(default = "default_epoch_end_window")]
     #[serde(with = "humantime_serde")]
     pub epoch_end_window: Duration,
+    #[serde(default = "default_ws_ping_period")]
+    #[serde(with = "humantime_serde")]
+    pub ws_ping_period: Duration, // TODO: must be >0
 }
 
 /// Name of the effector module
```
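Since `ws_ping_period` is declared `with = "humantime_serde"`, the TOML value is a human-readable duration string, and `default_ws_ping_period` applies when the key is omitted. A self-contained sketch of how such a field parses, assuming the `serde`, `toml`, and `humantime_serde` crates, with a hypothetical `Listener` struct rather than the real `ChainListenerConfig`:

```rust
use std::time::Duration;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Listener {
    #[serde(default = "default_ws_ping_period", with = "humantime_serde")]
    ws_ping_period: Duration,
}

fn default_ws_ping_period() -> Duration {
    Duration::from_secs(10)
}

fn main() {
    // Explicit value in config:
    let cfg: Listener = toml::from_str(r#"ws_ping_period = "30s""#).unwrap();
    assert_eq!(cfg.ws_ping_period, Duration::from_secs(30));

    // Omitted value falls back to the default (10s):
    let cfg: Listener = toml::from_str("").unwrap();
    assert_eq!(cfg.ws_ping_period, Duration::from_secs(10));
}
```

The inline `// TODO: must be >0` in the diff notes that a zero period still needs to be rejected by validation.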