Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(flex): Refine the schema definition of rt_mutable_graph #3079

Merged
merged 24 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,27 @@ jobs:
cmake .. && sudo make -j$(nproc)
export FLEX_DATA_DIR=../../../../storages/rt_mutable_graph/modern_graph/
./run_grin_test
- name: Test Graph Loading on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/storages/rt_mutable_graph/modern_graph/
run: |
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=../storages/rt_mutable_graph/modern_graph/modern_graph.yaml
BULK_LOAD_FILE=../storages/rt_mutable_graph/modern_graph/bulk_load.yaml
GLOG_v=10 ./tests/rt_mutable_graph/test_graph_loading ${SCHEMA_FILE} ${BULK_LOAD_FILE} /tmp/csr-data-dir/
- name: Test Graph Loading on LDBC SNB sf0.1
env:
GS_TEST_DIR: ${{ github.workspace }}/gstest/
FLEX_DATA_DIR: ${{ github.workspace }}/gstest/flex/ldbc-sf01-long-date/
run: |
# remove modern graph indices
rm -rf /tmp/csr-data-dir/
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=${FLEX_DATA_DIR}/audit_graph_schema.yaml
BULK_LOAD_FILE=${FLEX_DATA_DIR}/audit_bulk_load.yaml
GLOG_v=10 ./tests/rt_mutable_graph/test_graph_loading ${SCHEMA_FILE} ${BULK_LOAD_FILE} /tmp/csr-data-dir/ 2
7 changes: 4 additions & 3 deletions flex/bin/rt_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ int main(int argc, char** argv) {
double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
db.Init(std::get<0>(ret), std::get<1>(ret), std::get<2>(ret),
std::get<3>(ret), data_path, shard_num);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);
db.Init(schema, loading_config, data_path, shard_num);

t0 += grape::GetCurrentTime();

Expand Down
8 changes: 5 additions & 3 deletions flex/bin/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "flex/engines/hqps_db/database/mutable_csr_interface.h"
#include "flex/engines/http_server/codegen_proxy.h"
#include "flex/engines/http_server/stored_procedure.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"

#include <yaml-cpp/yaml.h>
#include <boost/program_options.hpp>
Expand Down Expand Up @@ -205,9 +206,10 @@ int main(int argc, char** argv) {
double t0 = -grape::GetCurrentTime();
auto& db = gs::GraphDB::get();

auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
db.Init(std::get<0>(ret), std::get<1>(ret), std::get<2>(ret),
std::get<3>(ret), data_path, shard_num);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);
db.Init(schema, loading_config, data_path, shard_num);

t0 += grape::GetCurrentTime();

Expand Down
21 changes: 9 additions & 12 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,33 @@ GraphDB& GraphDB::get() {
return db;
}

void GraphDB::Init(
const Schema& schema,
const std::vector<std::pair<std::string, std::string>>& vertex_files,
const std::vector<std::tuple<std::string, std::string, std::string,
std::string>>& edge_files,
const std::vector<std::string>& plugins, const std::string& data_dir,
int thread_num) {
void GraphDB::Init(const Schema& schema, const LoadingConfig& load_config,
const std::string& data_dir, int thread_num) {
std::filesystem::path data_dir_path(data_dir);
if (!std::filesystem::exists(data_dir_path)) {
std::filesystem::create_directory(data_dir_path);
}

std::filesystem::path serial_path = data_dir_path / "init_snapshot.bin";
if (!std::filesystem::exists(serial_path)) {
if (!vertex_files.empty() || !edge_files.empty()) {
if (!load_config.GetVertexLoadingMeta().empty() ||
!load_config.GetEdgeLoadingMeta().empty()) {
LOG(INFO) << "Initializing graph db through bulk loading";
{
MutablePropertyFragment graph;
graph.Init(schema, vertex_files, edge_files, thread_num);
graph.Init(schema, load_config, thread_num);
graph.Serialize(data_dir_path.string());
}
graph_.Deserialize(data_dir_path.string());
} else {
LOG(INFO) << "Initializing empty graph db";
graph_.Init(schema, vertex_files, edge_files, thread_num);
graph_.Init(schema, load_config, thread_num);
graph_.Serialize(data_dir_path.string());
}
} else {
LOG(INFO) << "Initializing graph db from data files of work directory";
if (!vertex_files.empty() || !edge_files.empty()) {
if (!load_config.GetVertexLoadingMeta().empty() ||
!load_config.GetEdgeLoadingMeta().empty()) {
LOG(WARNING) << "Bulk loading is ignored because data files of work "
"directory exist";
}
Expand Down Expand Up @@ -109,7 +106,7 @@ void GraphDB::Init(
contexts_[i].logger.open(wal_dir.string(), i);
}

initApps(plugins);
initApps(schema.GetPluginsList());
}

ReadTransaction GraphDB::GetReadTransaction() {
Expand Down
10 changes: 3 additions & 7 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "flex/engines/graph_db/database/single_vertex_insert_transaction.h"
#include "flex/engines/graph_db/database/update_transaction.h"
#include "flex/engines/graph_db/database/version_manager.h"
#include "flex/storages/rt_mutable_graph/loading_config.h"
#include "flex/storages/rt_mutable_graph/mutable_property_fragment.h"

namespace gs {
Expand All @@ -45,13 +46,8 @@ class GraphDB {

static GraphDB& get();

void Init(
const Schema& schema,
const std::vector<std::pair<std::string, std::string>>& vertex_files,
const std::vector<std::tuple<std::string, std::string, std::string,
std::string>>& edge_files,
const std::vector<std::string>& plugins, const std::string& data_dir,
int thread_num = 1);
void Init(const Schema& schema, const LoadingConfig& config,
const std::string& data_dir, int thread_num = 1);

/** @brief Create a transaction to read vertices and edges.
*
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/grin/src/predefine.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <vector>

#include "grin/predefine.h"
#include "storages/rt_mutable_graph/loading_config.h"
#include "storages/rt_mutable_graph/mutable_property_fragment.h"

typedef gs::oid_t GRIN_OID_T;
Expand Down
10 changes: 4 additions & 6 deletions flex/engines/graph_db/grin/src/topology/structure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,12 @@ GRIN_GRAPH grin_get_graph_from_storage(const char* uri) {
!(std::filesystem::exists(bulk_load_config_path))) {
return GRIN_NULL_GRAPH;
}
auto ret = gs::Schema::LoadFromYaml(graph_schema_path, bulk_load_config_path);
const auto& schema = std::get<0>(ret);
auto& vertex_files = std::get<1>(ret);

auto& edge_files = std::get<2>(ret);
auto schema = gs::Schema::LoadFromYaml(graph_schema_path);
auto loading_config =
gs::LoadingConfig::ParseFromYaml(schema, bulk_load_config_path);

GRIN_GRAPH_T* g = new GRIN_GRAPH_T();
g->g.Init(schema, vertex_files, edge_files);
g->g.Init(schema, loading_config);
init_cache(g);
return g;
}
Expand Down
120 changes: 72 additions & 48 deletions flex/storages/rt_mutable_graph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,64 +25,88 @@ The configuration file ([modern graph example](./modern_graph/modern_graph.yaml)
Here is an example of a configuration file:

```yaml
graph:
graph_store: mutable_csr
vertex:
- label_name: person
name: modern
store_type: mutable_csr
stored_procedures:
directory: plugins
enable_lists:
- libxxx.so
schema:
vertex_types:
- type_name: person
x_csr_params:
max_vertex_num: 100
properties:
- name: _ID
type: int64
- name: name
type: String
- name: age
type: int32
max_vertex_num: 100
- label_name: software
- property_id: 0
property_name: id
property_type:
primitive_type: DT_SIGNED_INT64
- property_id: 1
property_name: name
property_type:
primitive_type: DT_STRING
- property_id: 2
property_name: age
property_type:
primitive_type: DT_SIGNED_INT32
primary_keys:
- id
- type_name: software
x_csr_params:
max_vertex_num: 100
properties:
- name: _ID
type: int64
- name: name
type: String
- name: lang
type: String
max_vertex_num: 100
edge:
- src_label_name: person
dst_label_name: software
edge_label_name: created
- property_id: 0
property_name: id
property_type:
primitive_type: DT_SIGNED_INT64
x_csr_params:
- property_id: 1
property_name: name
property_type:
primitive_type: DT_STRING
- property_id: 2
property_name: lang
property_type:
primitive_type: DT_STRING
primary_keys:
- id
edge_types:
- type_name: knows
x_csr_params:
incoming_edge_strategy: None
outgoing_edge_strategy: Multiple
vertex_type_pair_relations:
source_vertex: person
destination_vertex: person
relation: MANY_TO_MANY
properties:
- name: _SRC
type: int64
- name: _DST
type: int64
- name: weight
type: double
incoming_edge_strategy: None
outgoing_edge_strategy: Single
- src_label_name: person
dst_label_name: person
edge_label_name: knows
- property_id: 0
property_name: weight
property_type:
primitive_type: DT_DOUBLE
- type_name: created
x_csr_params:
incoming_edge_strategy: None
outgoing_edge_strategy: Single
vertex_type_pair_relations:
source_vertex: person
destination_vertex: software
relation: ONE_TO_MANY
properties:
- name: _SRC
type: int64
- name: _DST
type: int64
- name: weight
type: double
incoming_edge_strategy: None
outgoing_edge_strategy: Multiple

stored_procedures:
- libxxx.so
- property_id: 0
property_name: weight
property_type:
primitive_type: DT_DOUBLE
```
Notes:
- `_ID`, `_SRC`, `_DST` are reserved words, they are the external id of vertices, only int64 type is supported.
- `max_vertex_num` limit the number of vertices of this type:
- Currently we only support one primary key, and the type has to be `DT_SIGNED_INT64`.
- All implementation related configuration are put under x_csr_params.
- `max_vertex_num` limit the number of vertices of this type:
- The limit number is used to `mmap` memory, so it only takes virtual memory before vertices are actually inserted.
- If `max_vertex_num` is not set, a default large number (e.g.: 2^48) will be used.
- `incoming/outgoing_edge_strategy` specifies the storing strategy of the incoming or outgoing edges of this type, there are 3 kinds of strategies
- `incoming/outgoing_edge_strategy` specifies the storing strategy of the incoming or outgoing edges of this type, there are 3 kinds of strategies
- None: no edge will be stored
- Single: only one edge will be stored
- Multiple(default): multiple edges will be stored
Expand Down
Loading