[GIE/Runtime] Redesign PartitionerInfo, ClusterInfo, and Router trait to better support parallel processing in Runtime #2744

Merged
merged 22 commits into alibaba:main from refine_partitioner
Jun 8, 2023

Conversation

BingqingLyu
Collaborator

@BingqingLyu BingqingLyu commented May 25, 2023

What do these changes do?

Redesign PartitionInfo, ClusterInfo, and Router trait to better support parallel processing in Runtime, where:

  • PartitionInfo is used to query the partition information when the data has been partitioned.
  • ClusterInfo is used to query the cluster information when the system is running on a cluster.
  • Router is used to route the data to the destination worker so that it can be properly processed, with PartitionInfo and ClusterInfo as input.

Related issue number

Fixes #2753

@BingqingLyu BingqingLyu marked this pull request as draft May 25, 2023 09:02
@BingqingLyu BingqingLyu changed the title [WIP] Update Partitioner trait to support parallel scan [GIE/Runtime] Update Partitioner trait to support parallel scan May 26, 2023
@BingqingLyu BingqingLyu marked this pull request as ready for review May 26, 2023 02:11
@BingqingLyu BingqingLyu requested a review from longbinlai May 26, 2023 02:11
WholePartitions(Vec<u64>),
// PartialPartition indicates a **partial partition** to query, specified as `(i, n, partition_id)`,
// meaning: query the `i`-th part out of `n` parts of the partition with the given `partition_id`.
PartialPartition(u32, u32, u64),
Collaborator

I suggest defining these three integer fields in a struct. Otherwise it is too unclear.
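
A minimal sketch of what this suggestion could look like, assuming hypothetical field names (`part_index`, `num_parts`) and a hypothetical enum name (`QueryPartitions`); none of these names are taken from the PR, and whether the part index is 0-based is also an assumption:

```rust
/// Hypothetical struct replacing the bare `(u32, u32, u64)` tuple.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartialPartition {
    /// Which part to query (assumed 0-based here).
    pub part_index: u32,
    /// Total number of parts the partition is split into.
    pub num_parts: u32,
    /// Id of the partition being queried.
    pub partition_id: u64,
}

impl PartialPartition {
    /// Returns `None` when there are no parts or the index is out of range.
    pub fn new(part_index: u32, num_parts: u32, partition_id: u64) -> Option<Self> {
        if num_parts == 0 || part_index >= num_parts {
            None
        } else {
            Some(PartialPartition { part_index, num_parts, partition_id })
        }
    }
}

/// Hypothetical enum name; only the two variant names appear in the diff.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueryPartitions {
    /// Query these partitions in full.
    WholePartitions(Vec<u64>),
    /// Query one part of a single partition.
    PartialPartition(PartialPartition),
}
```

With named fields, a call site reads `PartialPartition::new(1, 4, 7)` and the struct documents which integer is which, instead of relying on positional order.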

@codecov-commenter

codecov-commenter commented May 30, 2023

Codecov Report

Merging #2744 (7a9977d) into main (653823b) will not change coverage.
The diff coverage is n/a.

Additional details and impacted files

@@           Coverage Diff           @@
##             main    #2744   +/-   ##
=======================================
  Coverage   42.44%   42.44%           
=======================================
  Files          99       99           
  Lines       10654    10654           
=======================================
  Hits         4522     4522           
  Misses       6132     6132           


pub trait Router: Send + Sync + 'static {
/// Given the element id and job_workers (number of workers per server),
/// return the worker id that is going to do the query.
fn route(&self, id: &ID, job_workers: usize) -> GraphProxyResult<u64>;
Collaborator

@longbinlai longbinlai May 31, 2023

I have the following suggestions for this api:

  • Design a structure, such as ClusterInfo, that describes how the whole cluster is managed, including how many servers are in the cluster and how many workers run on each server. We actually need the server count: currently we obtain it from a pegasus static function, which couples the implementation to this api. The ClusterInfo can be initialized when the server starts and then passed into this function as a reference.
  • Abstract the id item as RouteData, where RouteData implements the function get_route_id(). Then we can probably implement RouteData directly for Vertex and Edge (or even ID). ID is a little ambiguous, given that we actually want to route any data, not just an ID.
  • Make u64 a WorkerId type.
  • I think the Router should also be aware of the graph partition information, no?
  • Comment this structure as:
A `Router` is used to route the data to the destination worker so that it can be properly processed, especially
when the underlying data has been partitioned across the cluster. Given the partition information as well as how our cluster is managed (by `ClusterInfo`) and co-located with the graph data, we can implement the corresponding `route` function to guide the system to transfer the data to a proper destination worker.

For example, suppose our cluster contains 10 servers, each further forking 10 workers for processing queries. In addition, the graph is partitioned across these 10 servers by the following strategy: a vertex with a given ID is placed on the server with id i (0 to 9) if ID % 10 == i, and the vertex's adjacent edges are placed with the vertex. Then the router can decide which worker should process the vertex with ID 25534 as follows:
- It first computes `25534 % 10 == 4`, which means the vertex must be routed to the 4-th server.
- Any worker on the 4-th server can process the vertex, so it randomly picks a worker, say the 5-th worker, which has ID 4 * 10 + 5 = 45.
- The 45-th worker is then returned for routing this vertex.
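
The arithmetic in this example can be sketched as follows. This is only an illustration under stated assumptions: the `Cluster` struct and the `route` function are hypothetical names, and the local worker is passed in by the caller (rather than picked at random) so that the result is deterministic:

```rust
/// Hypothetical cluster description: `servers` machines,
/// each forking `workers_per_server` workers.
pub struct Cluster {
    pub servers: u64,
    pub workers_per_server: u64,
}

/// Routes a vertex under the `id % servers` placement from the example.
/// `local_worker` is the index of the chosen worker within the owning server.
/// Returns `None` for an empty cluster or an out-of-range local worker.
pub fn route(cluster: &Cluster, vertex_id: u64, local_worker: u64) -> Option<u64> {
    if cluster.servers == 0 || local_worker >= cluster.workers_per_server {
        return None;
    }
    // Step 1: the server that holds the vertex.
    let server = vertex_id % cluster.servers;
    // Step 2: global worker id = server index * workers per server + local index.
    Some(server * cluster.workers_per_server + local_worker)
}
```

For the 10 x 10 cluster above, `route(&cluster, 25534, 5)` yields worker 45, since 25534 % 10 = 4 and 4 * 10 + 5 = 45.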

@BingqingLyu BingqingLyu marked this pull request as draft May 31, 2023 11:11
/// A `PartitionInfo` is used to query the partition information when the data has been partitioned.
pub trait PartitionInfo: Send + Sync + 'static {
/// Given the data, return the id of the partition that holds the data.
fn get_partition_id(&self, data: &ID) -> GraphProxyResult<PartitionId>;
Collaborator

Using ID as the type of the data is not ideal. We may want to define a trait PartitionedData, even if it does not need any functions.

pub trait PartitionedData {  }

impl PartitionedData for Vertex {  }
impl PartitionedData for Edge {  }

pub trait PartitionInfo: Send + Sync + 'static {
    fn get_partition_id<D: PartitionedData>(&self, data: &D) -> GraphProxyResult<PartitionId>;
}
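
A self-contained sketch of the marker-trait idea, under several assumptions: `Vertex`, `Edge`, `PartitionId`, and the `Result<_, String>` return type are simplified stand-ins for the project's real types, and `SinglePartition` is a hypothetical implementation invented for illustration:

```rust
/// Stand-in for the project's partition id type (assumption).
pub type PartitionId = u32;

/// Simplified stand-ins for the real graph element types (assumption).
pub struct Vertex {
    pub id: u64,
}
pub struct Edge {
    pub src: u64,
    pub dst: u64,
}

/// Marker trait: no methods, it only restricts which types may be partitioned.
pub trait PartitionedData {}

impl PartitionedData for Vertex {}
impl PartitionedData for Edge {}

pub trait PartitionInfo: Send + Sync + 'static {
    /// `Result<_, String>` replaces `GraphProxyResult` to keep the sketch standalone.
    fn get_partition_id<D: PartitionedData>(&self, data: &D) -> Result<PartitionId, String>;
}

/// Hypothetical trivial implementation: all data lives in one partition.
pub struct SinglePartition {
    pub partition: PartitionId,
}

impl PartitionInfo for SinglePartition {
    fn get_partition_id<D: PartitionedData>(&self, _data: &D) -> Result<PartitionId, String> {
        Ok(self.partition)
    }
}
```

Passing a type that does not implement `PartitionedData` (a bare `u64`, say) is rejected at compile time. One trade-off to keep in mind: a generic method like this prevents using the trait as `dyn PartitionInfo`.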

/// - The 45-th worker is then returned for routing this vertex.
pub trait Router: Send + Sync + 'static {
/// Given the data, return the id of the worker that is going to do the query.
fn route(&self, data: &ID, job_workers: usize) -> GraphProxyResult<WorkerId>;
Collaborator

Similarly, using ID as the type of the data is not well designed. Consider:

pub trait RouteData {
    /// Comment this
    fn get_route_key(&self) -> ID; 
}

impl RouteData for Vertex { ... }
impl RouteData for Edge { ... }
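
One way the elided implementations might be filled in, as a sketch only. The assumptions here: `ID` is `i64` (suggested by the `data: &i64` signature in the diff, but not confirmed), `Vertex` and `Edge` are simplified stand-ins, and an edge is routed by its source vertex because the example strategy places adjacent edges with the vertex:

```rust
/// Assumed to be `i64`; the real alias may differ.
pub type ID = i64;

pub trait RouteData {
    /// The key used to decide where this piece of data is routed.
    fn get_route_key(&self) -> ID;
}

/// Simplified stand-ins for the real graph element types (assumption).
pub struct Vertex {
    pub id: ID,
}
pub struct Edge {
    pub src_id: ID,
    pub dst_id: ID,
}

impl RouteData for Vertex {
    fn get_route_key(&self) -> ID {
        self.id
    }
}

impl RouteData for Edge {
    /// Assumption: an edge is co-located with its source vertex.
    fn get_route_key(&self) -> ID {
        self.src_id
    }
}

/// A bare id can route itself, so existing call sites keep working.
impl RouteData for ID {
    fn get_route_key(&self) -> ID {
        *self
    }
}
```

A `route` function can then take `&impl RouteData` and call `get_route_key()` without caring whether it was handed a vertex, an edge, or a raw id.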

/// - It first computes `25534 % 10 == 4`, which means the vertex must be routed to the 4-th server.
/// - Any worker on the 4-th server can process the vertex, so it randomly picks a worker, say the 5-th worker, which has ID 4 * 10 + 5 = 45.
/// - The 45-th worker is then returned for routing this vertex.
pub trait Router: Send + Sync + 'static {
Collaborator

pub trait Router {
    // Make PartitionInfo and ClusterInfo associated types of Router, so implementations know that a Router needs both pieces of information
    type P: PartitionInfo; 
    type C: ClusterInfo; 
}
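
A sketch of how the associated types could be wired into a concrete router. Everything beyond the `type P` / `type C` declarations is an assumption made for illustration: the trait methods `partition_of` and `workers_per_server`, the structs `ModuloPartition` and `StaticCluster`, the `Option` return type, and the premise that partition `i` lives on server `i`:

```rust
pub type WorkerId = u64;

/// Hypothetical minimal interface; the real trait has a different signature.
pub trait PartitionInfo {
    fn partition_of(&self, key: u64) -> Option<u64>;
}

/// Hypothetical minimal interface for cluster layout.
pub trait ClusterInfo {
    fn workers_per_server(&self) -> u64;
}

pub trait Router {
    type P: PartitionInfo;
    type C: ClusterInfo;
    fn route(&self, key: u64) -> Option<WorkerId>;
}

/// Places key `k` in partition `k % num_partitions`.
pub struct ModuloPartition {
    pub num_partitions: u64,
}
impl PartitionInfo for ModuloPartition {
    fn partition_of(&self, key: u64) -> Option<u64> {
        // `checked_rem` returns None when `num_partitions` is zero.
        key.checked_rem(self.num_partitions)
    }
}

pub struct StaticCluster {
    pub workers_per_server: u64,
}
impl ClusterInfo for StaticCluster {
    fn workers_per_server(&self) -> u64 {
        self.workers_per_server
    }
}

pub struct DefaultRouter<PI, CI> {
    pub partition_info: PI,
    pub cluster_info: CI,
}

impl<PI: PartitionInfo, CI: ClusterInfo> Router for DefaultRouter<PI, CI> {
    type P = PI;
    type C = CI;
    /// Assumes partition `i` lives on server `i`, and returns that server's first worker.
    fn route(&self, key: u64) -> Option<WorkerId> {
        let server = self.partition_info.partition_of(key)?;
        server.checked_mul(self.cluster_info.workers_per_server())
    }
}
```

With 10 partitions and 10 workers per server, key 25534 maps to server 4 and therefore to worker 40, the first worker on that server.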

fn route(&self, data: &ID, job_workers: usize) -> GraphProxyResult<WorkerId>;
}

pub struct DistributedDataRouter<P: PartitionInfo, C: ClusterInfo> {
Collaborator

I would call this DefaultRouter

}

impl<P: PartitionInfo, C: ClusterInfo> Router for DistributedDataRouter<P, C> {
fn route(&self, data: &i64, job_workers: usize) -> GraphProxyResult<u64> {
Collaborator

Move the above comment:

/// For example, suppose our cluster contains 10 servers, each further forking 10 workers for processing queries.
/// In addition, the graph is partitioned across these 10 servers by the following strategy:
/// a vertex with a given ID is placed on the server with id i (0 to 9) if ID % 10 == i, and the vertex's adjacent edges are placed with the vertex.
/// Then the router can decide which worker should process the vertex with ID 25534 as follows:
/// - It first computes `25534 % 10 == 4`, which means the vertex must be routed to the 4-th server.
/// - Any worker on the 4-th server can process the vertex, so it randomly picks a worker, say the 5-th worker, which has ID 4 * 10 + 5 = 45.
/// - The 45-th worker is then returned for routing this vertex.

to here.

@BingqingLyu BingqingLyu force-pushed the refine_partitioner branch from b6fab15 to 0669b24 Compare June 5, 2023 08:13
@BingqingLyu BingqingLyu force-pushed the refine_partitioner branch 2 times, most recently from 3012302 to 44b2821 Compare June 6, 2023 09:09
@BingqingLyu BingqingLyu force-pushed the refine_partitioner branch from 44b2821 to 590521a Compare June 6, 2023 10:17
@BingqingLyu BingqingLyu marked this pull request as ready for review June 7, 2023 03:04
@BingqingLyu BingqingLyu changed the title [GIE/Runtime] Update Partitioner trait to support parallel scan [GIE/Runtime] Redesign PartitionerInfo, ClusterInfo, and Router trait to support parallel query in Runtime Jun 7, 2023
@BingqingLyu BingqingLyu changed the title [GIE/Runtime] Redesign PartitionerInfo, ClusterInfo, and Router trait to support parallel query in Runtime [GIE/Runtime] Redesign PartitionerInfo, ClusterInfo, and Router trait to better support parallel processing in Runtime Jun 7, 2023
Collaborator

@longbinlai longbinlai left a comment

LGTM

@BingqingLyu BingqingLyu merged commit 9a265dc into alibaba:main Jun 8, 2023
@BingqingLyu BingqingLyu deleted the refine_partitioner branch June 8, 2023 05:16
Successfully merging this pull request may close these issues.

[GIE] Refine Partitioner trait to better support parallel scan
3 participants