Skip to content

Commit

Permalink
fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Dec 11, 2023
2 parents a212966 + 057bf4c commit b9a0a0e
Show file tree
Hide file tree
Showing 34 changed files with 3,668 additions and 480 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/flex.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ jobs:
run: |
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
- name: Test String edge property on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
run: |
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
SCHEMA_FILE=../tests/rt_mutable_graph/modern_graph_string_edge.yaml
BULK_LOAD_FILE=../interactive/examples/modern_graph/bulk_load.yaml
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/
GLOG_v=10 ./tests/rt_mutable_graph/string_edge_property_test ${SCHEMA_FILE} /tmp/csr-data-dir/
- name: Test build empty graph
run: |
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
GLOG_v=10 ./tests/rt_mutable_graph/test_empty_graph /tmp/csr-data-dir/
- name: Test ACID
run: |
rm -rf /tmp/csr-data-dir/
cd ${GITHUB_WORKSPACE}/flex/build/
GLOG_v=10 ./tests/rt_mutable_graph/test_acid 8 /tmp/csr-data-dir/
- name: Test Graph Loading on modern graph
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph/
Expand Down
9 changes: 8 additions & 1 deletion docs/interactive_engine/tinkerpop/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,14 @@ For example, in the LDBC schema, we define the property ID as the primary key fo
```scss
g.V().hasLabel('PERSON').has('id', propertyIdValue)
```
Where 'id' is the property ID, and 'propertyIdValue' is the value of the property key. By directly using the primary key index, query performance can be significantly improved, avoiding full table scans and property value filtering, thereby optimizing query performance.
Where 'id' is the property ID, and 'propertyIdValue' is the value of the property key.

Moreover, we support the `within` operator to query multiple values of the same property key, which can also be optimized by the primary key index. For example:
```scss
g.V().hasLabel('PERSON').has('id', within(propertyIdValue1, propertyIdValue2))
```

By directly using the primary key index, query performance can be significantly improved, avoiding full table scans and property value filtering, thereby optimizing query performance.

## How to use subgraph in GIE Gremlin ?

Expand Down
10 changes: 9 additions & 1 deletion flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@ project (
option(BUILD_HQPS "Whether to build HighQPS Engine" ON)
option(BUILD_TEST "Whether to build test" ON)
option(BUILD_DOC "Whether to build doc" ON)
option(USE_MMAPALLOC "Whether to use mmap allocator." OFF)
option(USE_MMAPALLOC "Whether to use mmap allocator" OFF)
option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON)

execute_process(COMMAND uname -r OUTPUT_VARIABLE LINUX_KERNEL_VERSION)
string(STRIP ${LINUX_KERNEL_VERSION} LINUX_KERNEL_VERSION)
message(${LINUX_KERNEL_VERSION})
if(LINUX_KERNEL_VERSION VERSION_GREATER_EQUAL "4.5")
message("Use copy file range")
add_definitions(-DUSE_COPY_FILE_RANGE)
endif()

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../)

set(DEFAULT_BUILD_TYPE "Release")
Expand Down
10 changes: 6 additions & 4 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ GraphDB& GraphDB::get() {
Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup) {
if (!std::filesystem::exists(data_dir)) {
return Result<bool>(StatusCode::NotExists, "Data directory does not exist");
std::filesystem::create_directories(data_dir);
}

std::string schema_file = schema_path(data_dir);
bool create_empty_graph = false;
if (!std::filesystem::exists(schema_file)) {
return Result<bool>(StatusCode::NotExists, "Schema file does not exist");
create_empty_graph = true;
graph_.mutable_schema() = schema;
}
work_dir_ = data_dir;
thread_num_ = thread_num;
Expand All @@ -73,7 +75,7 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
"Exception: " + std::string(e.what()), false);
}

if (!graph_.schema().Equals(schema)) {
if ((!create_empty_graph) && (!graph_.schema().Equals(schema))) {
return Result<bool>(StatusCode::InternalError,
"Schema of work directory is not compatible with the "
"graph schema",
Expand All @@ -97,7 +99,7 @@ Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,

openWalAndCreateContexts(data_dir);

if (warmup) {
if ((!create_empty_graph) && warmup) {
graph_.Warmup(thread_num_);
}
return Result<bool>(true);
Expand Down
4 changes: 3 additions & 1 deletion flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {

LOG(INFO) << "[Query-" << (int) type << "][Thread-" << thread_id_
<< "] retry - " << i << " / " << MAX_RETRY;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (i + 1 < MAX_RETRY) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

decoder.reset(str_data, str_len);
result_buffer.clear();
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void put_argment(gs::Encoder& encoder, const query::Argument& argment);

class GraphDBSession {
public:
static constexpr int32_t MAX_RETRY = 4;
static constexpr int32_t MAX_RETRY = 3;
GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger,
const std::string& work_dir, int thread_id)
: db_(db),
Expand Down
19 changes: 10 additions & 9 deletions flex/engines/graph_db/database/read_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,28 @@ namespace gs {

class MutablePropertyFragment;
class VersionManager;

template <typename EDATA_T>
class AdjListView {
class nbr_iterator {
using nbr_t = MutableNbr<EDATA_T>;
using const_nbr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_t;
using const_nbr_ptr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_ptr_t;

public:
nbr_iterator(const nbr_t* ptr, const nbr_t* end, timestamp_t timestamp)
nbr_iterator(const_nbr_ptr_t ptr, const_nbr_ptr_t end,
timestamp_t timestamp)
: ptr_(ptr), end_(end), timestamp_(timestamp) {
while (ptr_ != end_ && ptr_->timestamp > timestamp_) {
while (ptr_ != end_ && ptr_->get_timestamp() > timestamp_) {
++ptr_;
}
}

const nbr_t& operator*() const { return *ptr_; }
const_nbr_t& operator*() const { return *ptr_; }

const nbr_t* operator->() const { return ptr_; }
const_nbr_ptr_t operator->() const { return ptr_; }

nbr_iterator& operator++() {
++ptr_;
while (ptr_ != end_ && ptr_->timestamp > timestamp_) {
while (ptr_ != end_ && ptr_->get_timestamp() > timestamp_) {
++ptr_;
}
return *this;
Expand All @@ -62,8 +63,8 @@ class AdjListView {
}

private:
const nbr_t* ptr_;
const nbr_t* end_;
const_nbr_ptr_t ptr_;
const_nbr_ptr_t end_;
timestamp_t timestamp_;
};

Expand Down
59 changes: 9 additions & 50 deletions flex/engines/graph_db/database/update_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static size_t get_offset(
if (base->get_neighbor() == target) {
return offset;
}
offset++;
base->next();
}
return std::numeric_limits<size_t>::max();
Expand All @@ -170,7 +171,7 @@ bool UpdateTransaction::AddEdge(label_t src_label, const Any& src,
label_t edge_label, const Any& value) {
vid_t src_lid, dst_lid;
static constexpr size_t sentinel = std::numeric_limits<size_t>::max();
size_t offset_out, offset_in;
size_t offset_out = sentinel, offset_in = sentinel;
if (graph_.get_lid(src_label, src, src_lid) &&
graph_.get_lid(dst_label, dst, dst_lid)) {
const auto& oe =
Expand Down Expand Up @@ -708,35 +709,8 @@ void UpdateTransaction::batch_commit(UpdateBatch& batch) {
bool dst_flag = graph_.get_lid(dst_label, dst, dst_lid);

if (src_flag && dst_flag) {
auto oe = graph_.get_outgoing_edges_mut(src_label, src_lid, dst_label,
edge_label);
while (oe != nullptr && oe->is_valid()) {
if (oe->get_neighbor() == dst_lid) {
oe->set_data(prop, timestamp_);
src_flag = false;
break;
}
oe->next();
}
auto ie = graph_.get_incoming_edges_mut(dst_label, dst_lid, src_label,
edge_label);
while (ie != nullptr && ie->is_valid()) {
if (ie->get_neighbor() == src_lid) {
dst_flag = false;
ie->set_data(prop, timestamp_);
break;
}
ie->next();
}
if ((!src_flag) || (!dst_flag)) {
} else {
grape::InArchive arc;
arc << prop;

grape::OutArchive out_arc(std::move(arc));
graph_.IngestEdge(src_label, src_lid, dst_label, dst_lid, edge_label,
timestamp_, out_arc, alloc_);
}
graph_.UpdateEdge(src_label, src_lid, dst_label, dst_lid, edge_label,
timestamp_, prop, alloc_);
}
}
auto& arc = batch.GetArc();
Expand Down Expand Up @@ -822,9 +796,6 @@ void UpdateTransaction::applyEdgesUpdates() {
}
}

MutableCsrBase* csr =
graph_.get_oe_csr(src_label, dst_label, edge_label);

for (auto& pair : added_edges_[oe_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
Expand All @@ -839,7 +810,11 @@ void UpdateTransaction::applyEdgesUpdates() {
continue;
auto u = add_list[idx];
auto value = edge_data.at(u).first;
csr->put_generic_edge(v, u, value, timestamp_, alloc_);
grape::InArchive iarc;
serialize_field(iarc, value);
grape::OutArchive oarc(std::move(iarc));
graph_.IngestEdge(src_label, v, dst_label, u, edge_label,
timestamp_, oarc, alloc_);
}
}
}
Expand Down Expand Up @@ -875,22 +850,6 @@ void UpdateTransaction::applyEdgesUpdates() {
}
}
}

MutableCsrBase* csr =
graph_.get_ie_csr(dst_label, src_label, edge_label);

for (auto& pair : added_edges_[ie_csr_index]) {
vid_t v = pair.first;
auto& add_list = pair.second;
if (add_list.empty()) {
continue;
}
auto& edge_data = updated_edge_data_[ie_csr_index].at(v);
for (auto u : add_list) {
auto value = edge_data.at(u).first;
csr->put_generic_edge(v, u, value, timestamp_, alloc_);
}
}
}
}
}
Expand Down
Loading

0 comments on commit b9a0a0e

Please sign in to comment.