Skip to content

Commit

Permalink
Implement merge-read-extract.
Browse files Browse the repository at this point in the history
Reading from parallel filesystems, e.g. GPFS, requires reading few but
large chunks. Reading multiple times from the same block/page comes with
a hefty performance penalty.

The commit implements the functionality for merging nearby reads by
adding or modifying:

  * `sortAndMerge` to allow merging ranges across gaps of a certain
    size.

  * `bulkRead` to read block-by-block and extract the requested slices
    in memory.

  * `_readSelection` to always combine reads.

  * `?fferent_edges` to optimize reading of edge IDs.

It requires a compile-time constant `SONATA_PAGESIZE` to specify the
block/pagesize to be targeted.
  • Loading branch information
1uc authored and joni-herttuainen committed Nov 3, 2023
1 parent 6da4886 commit d2e189e
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 35 deletions.
53 changes: 44 additions & 9 deletions src/edge_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <unordered_map>
#include <vector>

#include "read_bulk.hpp"

namespace bbp {
namespace sonata {
Expand Down Expand Up @@ -53,7 +54,6 @@ const HighFive::Group targetIndex(const HighFive::Group& h5Root) {
return h5Root.getGroup(TARGET_INDEX_GROUP);
}


Selection resolve(const HighFive::Group& indexGroup, const NodeID nodeID) {
if (nodeID >= indexGroup.getDataSet(NODE_ID_TO_RANGES_DSET).getSpace().getDimensions()[0]) {
// Returning empty set for out-of-range node IDs, to be aligned with SYN2 reader
Expand Down Expand Up @@ -90,19 +90,54 @@ Selection resolve(const HighFive::Group& indexGroup, const NodeID nodeID) {
return Selection(std::move(ranges));
}


Selection resolve(const HighFive::Group& indexGroup, const std::vector<NodeID>& nodeIDs) {
constexpr size_t min_gap_size = SONATA_PAGESIZE / (2 * sizeof(uint64_t));
constexpr size_t max_aggregated_block_size = 128 * min_gap_size;

if (nodeIDs.size() == 1) {
return resolve(indexGroup, nodeIDs[0]);
}
// TODO optimize: bulk read for primary index
// TODO optimize: range merging
std::set<EdgeID> merged;
for (NodeID nodeID : nodeIDs) {
const auto ids = resolve(indexGroup, nodeID).flatten();
merged.insert(ids.begin(), ids.end());

auto readBlock = [&indexGroup](auto& buffer, const auto& range, const std::string& dset_name) {
size_t i_begin = std::get<0>(range);
size_t i_end = std::get<1>(range);

indexGroup.getDataSet(dset_name).select({i_begin, 0}, {i_end - i_begin, 2}).read(buffer);
};

auto primaryRange = bulk_read::bulkRead<std::array<uint64_t, 2>>(
[&readBlock](auto& buffer, const auto& range) {
readBlock(buffer, range, NODE_ID_TO_RANGES_DSET);
},
Selection::fromValues(nodeIDs),
min_gap_size,
max_aggregated_block_size);

// Sort and eliminate empty ranges.
primaryRange = bulk_read::sortAndMerge(primaryRange);
if (primaryRange.empty()) {
return Selection({});
}
return Selection::fromValues(merged.begin(), merged.end());

auto secondaryRange = bulk_read::bulkRead<std::array<uint64_t, 2>>(
[&readBlock](auto& buffer, const auto& range) {
readBlock(buffer, range, RANGE_TO_EDGE_ID_DSET);
},
primaryRange,
min_gap_size,
max_aggregated_block_size);

// Sort and eliminate empty ranges.
secondaryRange = bulk_read::sortAndMerge(secondaryRange);

// Copy `secondaryRange`, because the types don't match.
Selection::Ranges edgeIds;
edgeIds.reserve(secondaryRange.size());
for (const auto& range : secondaryRange) {
edgeIds.emplace_back(range[0], range[1]);
}

return Selection(std::move(edgeIds));
}


Expand Down
26 changes: 2 additions & 24 deletions src/population.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,7 @@ void _checkRanges(const Ranges& ranges) {
}

Ranges _sortAndMerge(const Ranges& ranges) {
if (ranges.empty()) {
return ranges;
}
Ranges ret;
Ranges sorted(ranges);
std::sort(sorted.begin(), sorted.end());

auto it = sorted.cbegin();
ret.push_back(*(it++));

for (; it != sorted.cend(); ++it) {
auto& last = ret[ret.size() - 1].second;
if (last < it->first) {
ret.push_back(*it);
} else {
last = std::max(last, it->second);
}
}
return ret;
return bulk_read::sortAndMerge(ranges);
}

Selection intersection_(const Ranges& lhs, const Ranges& rhs) {
Expand Down Expand Up @@ -122,11 +104,7 @@ Selection::Values Selection::flatten() const {


size_t Selection::flatSize() const {
size_t result = 0;
for (const auto& range : ranges_) {
result += (range.second - range.first);
}
return result;
return bulk_read::detail::flatSize(ranges_);
}


Expand Down
2 changes: 2 additions & 0 deletions src/population.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <highfive/H5File.hpp>

#include "read_bulk.hpp"

namespace bbp {
namespace sonata {

Expand Down
213 changes: 213 additions & 0 deletions src/read_bulk.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#pragma once

#include <bbp/sonata/population.h>
#include <cstdint>

// Target block/page size in bytes used to derive merge thresholds.
// Defaults to 4 MiB; guarded so the build system can override it,
// e.g. with `-DSONATA_PAGESIZE=...`.
#ifndef SONATA_PAGESIZE
#define SONATA_PAGESIZE (4 * (1 << 20))
#endif

namespace bbp {
namespace sonata {
namespace bulk_read {

namespace detail {

/** Check whether `ranges` is sorted and non-overlapping.
 *
 * Returns `true` if every range starts at or after the end of its
 * predecessor; touching ranges are accepted. An empty or single-element
 * vector is canonical.
 */
template <class Range>
bool isCanonical(const std::vector<Range>& ranges) {
    // A violation is a neighbouring pair where the earlier range ends
    // past the start of the later one.
    const auto violation = std::adjacent_find(ranges.begin(),
                                              ranges.end(),
                                              [](const Range& lhs, const Range& rhs) {
                                                  return std::get<1>(lhs) > std::get<0>(rhs);
                                              });
    return violation == ranges.end();
}

/** Check whether the ranges of `selection` are sorted and non-overlapping.
 *
 * The selection is taken by reference: the check is read-only, so there
 * is no need to copy it.
 */
inline bool isCanonical(const Selection& selection) {
    return isCanonical(selection.ranges());
}

/** Total number of elements covered by `ranges`.
 *
 * This is the sum of `end - begin` over all ranges; elements covered by
 * several overlapping ranges are counted once per range.
 */
template <class Range>
size_t flatSize(const std::vector<Range>& ranges) {
    size_t total = 0;
    for (const Range& r : ranges) {
        const auto length = std::get<1>(r) - std::get<0>(r);
        total += length;
    }

    return total;
}

/** Remove every element of `v` for which `pred` returns true.
 *
 * The relative order of the remaining elements is preserved.
 */
template <class T, class Pred>
void erase_if(std::vector<T>& v, Pred pred) {
    // Erase-remove idiom: compact the kept elements, then drop the tail.
    v.erase(std::remove_if(v.begin(), v.end(), pred), v.end());
}
} // namespace detail

/** Sort the ranges and merge those separated by small gaps.
 *
 * Empty ranges (`begin >= end`) are dropped, the remaining ranges are
 * sorted, and a range is merged into its predecessor when the gap between
 * them is smaller than `min_gap_size`. With a gap size of
 *
 *   - `0`, only overlapping ranges are merged; ranges that merely touch
 *     stay separate;
 *   - `1`, ranges that touch are merged as well;
 *   - the page size, anything separated by less than one page is merged.
 *
 * The gap comparison is overflow-safe, so a very large `min_gap_size`
 * (e.g. `size_t(-1)`) merges everything instead of wrapping around.
 *
 * Reading only a few elements from each page could lead to reading very
 * large ranges into memory. To avoid this, pick a maximum aggregated block
 * size: once the current block has reached `max_aggregated_block_size`
 * elements, the next range starts a new block instead of being merged.
 *
 * Note that the result is sorted and may have a larger `flatSize` than
 * `ranges`. It is also non-overlapping, except when the size cap stops a
 * merge of ranges that overlap in the input; in that case the overlapping
 * range is kept as a separate block.
 *
 * @param ranges                     ranges to sort and merge
 * @param min_gap_size               smallest gap that keeps two ranges apart
 * @param max_aggregated_block_size  size at which a block stops growing
 * @return the sorted and merged ranges
 */
template <class Range>
std::vector<Range> sortAndMerge(const std::vector<Range>& ranges,
                                size_t min_gap_size = 1,
                                size_t max_aggregated_block_size = size_t(-1)) {
    // Keep only non-empty ranges; filtering before sorting gives the same
    // result as sorting first and avoids sorting ranges that get dropped.
    std::vector<Range> sorted;
    sorted.reserve(ranges.size());
    for (const auto& range : ranges) {
        if (std::get<0>(range) < std::get<1>(range)) {
            sorted.push_back(range);
        }
    }

    std::vector<Range> ret;
    if (sorted.empty()) {
        return ret;
    }

    std::sort(sorted.begin(), sorted.end());

    auto it = sorted.cbegin();
    ret.push_back(*(it++));

    for (; it != sorted.cend(); ++it) {
        // Re-fetched every iteration: `push_back` below may reallocate.
        auto& current_range = ret.back();
        auto& last = std::get<1>(current_range);
        const auto next_first = std::get<0>(*it);

        const size_t current_range_size = last - std::get<0>(current_range);

        // Equivalent to `last + min_gap_size <= next_first`, but written as
        // a difference so that it cannot overflow.
        const bool gap_is_large = next_first >= last && next_first - last >= min_gap_size;

        if (gap_is_large || current_range_size >= max_aggregated_block_size) {
            // Start a new range.
            ret.push_back(*it);
        } else {
            // Extend the current range.
            last = std::max(last, std::get<1>(*it));
        }
    }

    return ret;
}

/** Sort and merge the ranges of a `Selection`.
 *
 * Forwards to the range-vector overload and wraps the result in a new
 * `Selection`. Note that `min_gap_size` defaults to `0` here, and that the
 * maximum aggregated block size is left at the default of that overload.
 */
inline Selection sortAndMerge(const Selection& selection, size_t min_gap_size = 0) {
    const auto& ranges = selection.ranges();
    return Selection(sortAndMerge(ranges, min_gap_size));
}


/** Copy one slice of values out of a block that was read from disk.
 *
 * This is one step of the merge-read-extract algorithm. It operates on one
 * block of data, i.e. one merged range.
 *
 * Both ranges are expressed as indices of the array on disk; they are not
 * indices into `out` or `buffer`. `buffer` holds the values of
 * `range_large`, so its first element corresponds to the disk index
 * `range_large[0]`.
 *
 * The values of `range_small` are copied from `buffer` to `out`, where
 * `out` points to the first element to be written. No bounds checking is
 * performed: `range_small` must lie within `range_large`.
 */
template <class T, class Range>
void extractBlock(T* const out,
                  T const* const buffer,
                  const Range& range_large,
                  const Range& range_small) {
    const size_t small_begin = std::get<0>(range_small);
    const size_t small_end = std::get<1>(range_small);
    const size_t large_begin = std::get<0>(range_large);

    // Translate the disk index into a position inside the buffer.
    T const* const first = buffer + (small_begin - large_begin);

    std::copy_n(first, small_end - small_begin, out);
}

/** Read whole blocks and extract the requested values in memory.
 *
 * This implements the read and extract parts of the merge-read-extract
 * algorithm.
 *
 * All ranges denote indices of the array on disk. The blocks in `ranges`
 * are read one at a time by calling
 *
 *     readBlock(buffer, range);
 *
 * where the function object `readBlock` must fill `buffer` (an
 * `std::vector<T>`) with the values for `range`. The values selected by
 * `subranges` are then copied out of the buffer, in the order of
 * `subranges`.
 *
 * Both `ranges` and `subranges` must be canonical (sorted and
 * non-overlapping). Additionally, every range in `subranges` must be fully
 * contained in exactly one range in `ranges`. Neither is checked here.
 *
 * @return the extracted values, `flatSize(subranges)` elements in total
 */
template <class T, class F, class Range>
std::vector<T> bulkRead(F readBlock,
                        const std::vector<Range>& ranges,
                        const std::vector<Range>& subranges) {
    std::vector<T> result(detail::flatSize(subranges));
    std::vector<T> block;

    T* cursor = result.data();
    auto sub = subranges.begin();
    const auto sub_end = subranges.end();

    for (const auto& range : ranges) {
        readBlock(block, range);

        // Consume every pending subrange that ends within this block; the
        // first one reaching past it is left for a later block.
        while (sub != sub_end && std::get<1>(*sub) <= std::get<1>(range)) {
            extractBlock(cursor, block.data(), range, *sub);
            cursor += std::get<1>(*sub) - std::get<0>(*sub);
            ++sub;
        }
    }

    return result;
}

/** Read `ranges` using merge-read-extract.
 *
 * The blocks to read are computed with `sortAndMerge(ranges, min_gap_size,
 * max_aggregated_block_size)`; `ranges` itself is then used as the list of
 * subranges to extract. It must therefore satisfy the requirements that the
 * block-wise `bulkRead` places on its subranges, i.e. be canonical (sorted
 * and non-overlapping).
 *
 * @sa `sortAndMerge` and `bulkRead`.
 */
template <class T, class F, class Range>
std::vector<T> bulkRead(F readBlock,
                        const std::vector<Range>& ranges,
                        size_t min_gap_size,
                        size_t max_aggregated_block_size) {
    const auto blocks = sortAndMerge(ranges, min_gap_size, max_aggregated_block_size);
    return bulkRead<T>(readBlock, blocks, ranges);
}

/** Read the ranges of `selection` using merge-read-extract.
 *
 * Convenience overload that forwards `selection.ranges()` to the
 * range-vector overload.
 *
 * @sa `sortAndMerge` and `bulkRead`.
 */
template <class T, class F>
std::vector<T> bulkRead(F readBlock,
                        const Selection& selection,
                        size_t min_gap_size,
                        size_t max_aggregated_block_size) {
    const auto& ranges = selection.ranges();
    return bulkRead<T>(readBlock, ranges, min_gap_size, max_aggregated_block_size);
}

} // namespace bulk_read
} // namespace sonata
} // namespace bbp
25 changes: 23 additions & 2 deletions tests/test_edges.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ std::ostream& operator<<(std::ostream& oss, const Selection& selection) {


TEST_CASE("EdgePopulation", "[edges]") {
// E-mapping-good Dataset {6}
// Data:
// 2, 1, 2, 0, 2, 2
//
// @library/E-mapping-good Dataset {3}
// Data:
// "A", "B", "C"

const EdgePopulation population("./data/edges1.h5", "", "edges-AB");

CHECK(population.source() == "nodes-A");
Expand Down Expand Up @@ -60,8 +68,21 @@ TEST_CASE("EdgePopulation", "[edges]") {
// duplicate node IDs are ignored; order of node IDs is not relevant
CHECK(population.connectingEdges({2, 1, 2}, {2, 1, 2}) == Selection({{0, 4}}));

CHECK(population.getAttribute<size_t>("E-mapping-good", Selection({{0, 1}, {2, 3}})) ==
std::vector<size_t>{2, 2});
// Canonical.
CHECK(population.getAttribute<size_t>("E-mapping-good", Selection({{0, 1}, {2, 4}})) ==
std::vector<size_t>{2, 2, 0});

// Non-overlapping but not sorted.
CHECK(population.getAttribute<size_t>("E-mapping-good", Selection({{2, 4}, {0, 1}})) ==
std::vector<size_t>{2, 0, 2});

// Sorted, but overlapping.
CHECK(population.getAttribute<size_t>("E-mapping-good", Selection({{0, 2}, {1, 4}})) ==
std::vector<size_t>{2, 1, 1, 2, 0});

// Overlapping and not sorted.
CHECK(population.getAttribute<size_t>("E-mapping-good", Selection({{1, 4}, {0, 2}})) ==
std::vector<size_t>{1, 2, 0, 2, 1});

CHECK(population.getAttribute<std::string>("E-mapping-good", Selection({{0, 1}})) ==
std::vector<std::string>{"C"});
Expand Down

0 comments on commit d2e189e

Please sign in to comment.