Skip to content

Commit

Permalink
[GIE/Runtime] Redesign PartitionInfo, ClusterInfo, and Router
Browse files Browse the repository at this point in the history
… trait to better support parallel processing in Runtime (#2744)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

Redesign the `PartitionInfo`, `ClusterInfo`, and `Router` traits to better
support parallel processing in Runtime, where:
* `PartitionInfo` is used to query the partition information when the
data has been partitioned.
* `ClusterInfo` is used to query the cluster information when the system
is running on a cluster.
* `Router` is used to route the data to the destination worker so that
it can be properly processed, with `PartitionInfo` and `ClusterInfo` as
input.


## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #2753
  • Loading branch information
BingqingLyu authored Jun 8, 2023
1 parent 653823b commit 9a265dc
Show file tree
Hide file tree
Showing 40 changed files with 555 additions and 704 deletions.
6 changes: 3 additions & 3 deletions interactive_engine/executor/assembly/groot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ groot-store = { path = "../../store/groot" }
gaia_pegasus = { path = "../../engine/pegasus/pegasus", package = "pegasus" }
pegasus_network = { path = "../../engine/pegasus/network" }
pegasus_server = { path = "../../engine/pegasus/server" }
runtime_integration = { path = "../../ir/integrated" , features = ["with_global_query"]}
log = "0.4"
runtime = {path = "../../ir/runtime"}
graph_proxy = {path = "../../ir/graph_proxy", features = ["with_global_query"]}
log4rs = "1.2"
tokio = { version = "1.24", features = ["macros", "sync"] }

[features]
column_filter_push_down = ["runtime_integration/column_filter_push_down"]
column_filter_push_down = []

[profile.dev]

# TODO(siyuan): re-enable debug assertions by addressing the reports for misaligned pointer dereferences https://github.com/rust-lang/rust/pull/98112/
debug-assertions = false
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//!
use std::ffi::CStr;
use std::net::{SocketAddr, ToSocketAddrs};
use std::os::raw::{c_char, c_void};
use std::sync::Arc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ use std::time::Duration;

use gaia_pegasus::Configuration as GaiaConfig;
use global_query::GlobalGraph;
use graph_proxy::{apis::PegasusClusterInfo, create_gs_store, GrootMultiPartition};
use groot_store::api::PartitionId;
use groot_store::db::api::{GraphConfig, GraphResult};
use groot_store::db::graph::store::GraphStore;
use pegasus_network::config::{NetworkConfig, ServerAddr};
use pegasus_network::SimpleServerDetector;
use pegasus_server::rpc::{start_all, RPCServerConfig, ServiceStartListener};
use runtime_integration::{InitializeJobAssembly, QueryGrootGraph};
use runtime::initialize_job_assembly;
use tokio::runtime::Runtime;

use crate::global_query::GraphPartitionManager;

pub struct GaiaServer {
config: Arc<GraphConfig>,
graph: Arc<GlobalGraph>,
Expand Down Expand Up @@ -69,8 +72,20 @@ impl GaiaServer {
let gaia_rpc_config = make_gaia_rpc_config(self.config.clone());
info!("Server config {:?}\nRPC config {:?}", gaia_config, gaia_rpc_config);
let (server_port, rpc_port) = self.rpc_runtime.block_on(async {
let query = QueryGrootGraph::new(self.graph.clone(), self.graph.clone());
let job_compiler = query.initialize_job_assembly();
let column_filter_push_down = false;
#[cfg(feature = "column_filter_push_down")]
let column_filter_push_down = true;
let cluster_info = Arc::new(PegasusClusterInfo::default());
let gs_store = create_gs_store(
self.graph.clone(),
self.graph.clone(),
self.graph.get_process_partition_list(),
cluster_info.clone(),
true,
column_filter_push_down,
);
let partition_info = GrootMultiPartition::new(self.graph.clone());
let job_compiler = initialize_job_assembly(gs_store, Arc::new(partition_info), cluster_info);
let service_listener = GaiaServiceListener::default();
let service_listener_clone = service_listener.clone();
self.rpc_runtime.spawn(async move {
Expand Down
4 changes: 2 additions & 2 deletions interactive_engine/executor/assembly/v6d/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ structopt = "0.3"
lru_time_cache = "0.11"
dotproperties = "0.1.0"
tokio = { version = "1.24", features = ["macros", "sync"] }

global_query = { path = "../../store/global_query" , features = ["with_v6d"] }
pegasus = { path = "../../engine/pegasus/pegasus", package = "pegasus" }
pegasus_network = { path = "../../engine/pegasus/network" }
pegasus_server = { path = "../../engine/pegasus/server" }
runtime_integration = { path = "../../ir/integrated", features = ["with_global_query", "with_v6d"] }
runtime = {path = "../../ir/runtime", features = ["with_v6d"]}
graph_proxy = {path = "../../ir/graph_proxy", features = ["with_global_query", "with_v6d"]}
21 changes: 13 additions & 8 deletions interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use std::sync::Arc;

use gaia_runtime::error::{StartServerError, StartServerResult};
use global_query::{FFIGraphStore, GraphPartitionManager};
use graph_proxy::{apis::PegasusClusterInfo, create_gs_store, VineyardMultiPartition};
use log::info;
use pegasus::api::{Fold, Sink};
use pegasus::api::Sink;
use pegasus::{wait_servers_ready, Configuration, JobConf, ServerConf};
use pegasus_network::config::NetworkConfig;
use pegasus_network::config::ServerAddr;
use pegasus_server::rpc::{start_rpc_server, RPCServerConfig, ServiceStartListener};
use runtime_integration::{InitializeJobAssembly, QueryVineyard};
use runtime::initialize_job_assembly;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand Down Expand Up @@ -87,7 +88,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let ffi_store = FFIGraphStore::new(vineyard_graph_id, worker_thread_num);

// Init partitions information
let partition_manager = ffi_store.get_partition_manager();
let partition_manager = Arc::new(ffi_store.get_partition_manager());
let process_partition_list = partition_manager.get_process_partition_list();
info!("process_partition_list: {:?}", process_partition_list);

Expand All @@ -103,13 +104,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let partition_server_index_map = compute_partition_server_mapping(process_partition_lists)?;
info!("server_index: {:?}, partition_server_index_map: {:?}", server_index, partition_server_index_map);

let query_vineyard = QueryVineyard::new(
Arc::new(ffi_store).clone(),
Arc::new(partition_manager),
partition_server_index_map,
let cluster_info = Arc::new(PegasusClusterInfo::default());
let gs_store = create_gs_store(
Arc::new(ffi_store),
partition_manager.clone(),
computed_process_partition_list,
cluster_info.clone(),
false,
false,
);
let job_assembly = query_vineyard.initialize_job_assembly();
let partition_info = VineyardMultiPartition::new(partition_manager, partition_server_index_map.clone());
let job_assembly = initialize_job_assembly(gs_store, Arc::new(partition_info), cluster_info);
start_rpc_server(server_id, rpc_config, job_assembly, GaiaServiceListener).await?;
Ok(())
}
Expand Down
3 changes: 0 additions & 3 deletions interactive_engine/executor/assembly/v6d/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
#![allow(bare_trait_objects)]

pub mod error;

#[macro_use]
extern crate log;
extern crate dotproperties;
extern crate global_query;
extern crate log4rs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.
mod partitioner;
mod read_graph;

pub use partitioner::CsrPartition;
pub use read_graph::create_csr_store;

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use pegasus_common::impl_as_any;

use crate::apis::graph::PKV;
use crate::apis::{
from_fn, register_graph, Details, Direction, DynDetails, Edge, PropertyValue, QueryParams, ReadGraph,
from_fn, ClusterInfo, Details, Direction, DynDetails, Edge, PropertyValue, QueryParams, ReadGraph,
Statement, Vertex, ID,
};
use crate::errors::{GraphProxyError, GraphProxyResult};
Expand All @@ -43,20 +43,20 @@ lazy_static! {
pub static ref CSR_PATH: String = configure_with_default!(String, "CSR_PATH", "".to_string());
pub static ref PARTITION_ID: usize = configure_with_default!(usize, "PARTITION_ID", 0);
pub static ref CSR: CsrDB<usize, usize> = _init_csr();
static ref GRAPH_PROXY: Arc<CSRStore> = initialize();
}

const CSR_STORE_PK: KeyId = 0;
pub const LABEL_SHIFT_BITS: usize =
8 * (std::mem::size_of::<DefaultId>() - std::mem::size_of::<StoreLabelId>());

pub struct CSRStore {
store: &'static CsrDB<usize, usize>,
pub fn create_csr_store(cluster_info: Arc<dyn ClusterInfo>) -> Arc<CSRStore> {
lazy_static::initialize(&CSR);
Arc::new(CSRStore { store: &CSR, cluster_info })
}

fn initialize() -> Arc<CSRStore> {
lazy_static::initialize(&CSR);
Arc::new(CSRStore { store: &CSR })
pub struct CSRStore {
store: &'static CsrDB<usize, usize>,
cluster_info: Arc<dyn ClusterInfo>,
}

fn _init_csr() -> CsrDB<usize, usize> {
Expand All @@ -70,16 +70,12 @@ impl ReadGraph for CSRStore {
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();

let worker_id = pegasus::get_current_worker_checked()
.map(|worker| worker.index)
.unwrap_or(0);
let workers_num = pegasus::get_current_worker_checked()
.map(|worker| worker.local_peers)
.unwrap_or(1);
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;

let result = self
.store
.get_partitioned_vertices(label_ids.as_ref(), worker_id, workers_num)
.get_partitioned_vertices(label_ids.as_ref(), worker_index, workers_num)
.map(move |v| to_runtime_vertex(v, props.clone()));
Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
}
Expand All @@ -96,17 +92,13 @@ impl ReadGraph for CSRStore {
let label_ids = encode_storage_label(&params.labels);
let props = params.columns.clone();

let worker_id = pegasus::get_current_worker_checked()
.map(|worker| worker.index)
.unwrap_or(0);
let workers_num = pegasus::get_current_worker_checked()
.map(|worker| worker.local_peers)
.unwrap_or(1);
let worker_index = self.cluster_info.get_worker_index()?;
let workers_num = self.cluster_info.get_local_worker_num()?;
let partition_id = self.store.partition as u8;

let result = self
.store
.get_partitioned_edges(label_ids.as_ref(), worker_id, workers_num)
.get_partitioned_edges(label_ids.as_ref(), worker_index, workers_num)
.map(move |e| to_runtime_edge(e, None, props.clone(), partition_id));
Ok(filter_sample_limit!(result, params.filter, params.sample_ratio, params.limit))
}
Expand Down Expand Up @@ -182,12 +174,6 @@ impl ReadGraph for CSRStore {
}
}

#[allow(dead_code)]
pub fn create_csr_store() {
lazy_static::initialize(&GRAPH_PROXY);
register_graph(GRAPH_PROXY.clone());
}

#[inline]
fn to_runtime_vertex(
v: LocalVertex<'static, DefaultId, DefaultId>, prop_keys: Option<Vec<NameOrId>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,20 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.
use crate::apis::{Partitioner, ID};
use crate::apis::partitioner::{PartitionId, PartitionInfo, PartitionedData, ServerId};
use crate::GraphProxyResult;

/// A simple partition utility in which each server contains a single graph partition
pub struct SimplePartition {
pub num_servers: usize,
}

impl Partitioner for SimplePartition {
fn get_partition(&self, id: &ID, workers: usize) -> GraphProxyResult<u64> {
let id_usize = *id as usize;
let magic_num = id_usize / self.num_servers;
// The partitioning logic is as follows:
// 1. `R = id - magic_num * num_servers = id % num_servers` routes a given id
// to the machine R that holds its data.
// 2. `R * workers` shifts the worker's id in the machine R.
// 3. `magic_num % workers` then picks up one of the workers in the machine R
// to do the computation.
Ok(((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64)
impl PartitionInfo for SimplePartition {
fn get_partition_id<D: PartitionedData>(&self, data: &D) -> GraphProxyResult<PartitionId> {
Ok((data.get_partition_key_id() as usize % self.num_servers) as u32)
}

fn get_worker_partitions(
&self, _job_workers: usize, _worker_id: u32,
) -> GraphProxyResult<Option<Vec<u64>>> {
// In a graph where each server contains a single graph partition,
// there's no need to assign a specific partition id for the query (as all workers will scan part of the current partition).
// In the source scan, workers will scan the vertices in parallel
Ok(None)
fn get_server_id(&self, partition_id: PartitionId) -> GraphProxyResult<ServerId> {
Ok(partition_id as ServerId)
}
}
Loading

0 comments on commit 9a265dc

Please sign in to comment.