Skip to content

Commit

Permalink
Add naive multithreading to 'mocset query'
Browse files Browse the repository at this point in the history
  • Loading branch information
fxpineau committed Jan 29, 2024
1 parent e252b0a commit e61dc50
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 53 deletions.
17 changes: 17 additions & 0 deletions crates/set/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
# `moc-set` Change Log

## 0.8.2

Released 2024-01-29

### Added

* naive parallelism in 'mocset query':
we expect poor performances on HDD with cold cache but better ones with
SSDs with cold cache (parallel reading).
Performances does not seems to improve a lot so far on a single MVNe SSD.

### Bug correction

* No more 'panic' info showing-up on stderr when piping output in commands
endding the process before full write, such as 'head'.


## 0.8.1

Released 2023-12-20
Expand Down
1 change: 1 addition & 0 deletions crates/set/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cdshealpix = "0.6.7"
memmap = { package = "memmap2", version = "0.5" }
clap = { version = "4.4", features = ["derive"] }
byteorder = "1"
rayon = "1.8.1"

[package.metadata.deb]
maintainer = "F.-X. Pineau <[email protected]>"
Expand Down
5 changes: 4 additions & 1 deletion crates/set/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ Install from using `cargo`:
```bash
cargo install --path crates/set
```

or
```
RUSTFLAGS='-C target-cpu=native' cargo install --path crates/set
```

## Usage

Expand Down
204 changes: 152 additions & 52 deletions crates/set/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use std::{
error::Error,
fs::File,
io::{BufRead, BufReader, Write},
ops::Range,
path::Path,
path::PathBuf,
str::FromStr,
};

use clap::Parser;
use rayon::prelude::*;

use cdshealpix::{best_starting_depth, has_best_starting_depth, nested};

Expand All @@ -31,7 +33,7 @@ use moclib::{
ranges::{BorrowedRanges, Ranges, SNORanges},
};

use crate::{MocSetFileReader, StatusFlag};
use crate::{FlagDepthId, MocSetFileReader, StatusFlag};

const HALF_PI: f64 = 0.5 * std::f64::consts::PI;
const TWICE_PI: f64 = 2.0 * std::f64::consts::PI;
Expand All @@ -48,6 +50,9 @@ pub struct Query {
#[clap(short = 'c', long = "print-coverage")]
/// Print in output the sky fraction (in '[0.0, 1.0]') covered by each selected MOC
print_coverage: bool,
#[clap(short = 'p', long = "parallel")]
/// Switch on multi-threading, with 'parallel' threads
parallel: Option<u16>,
#[clap(subcommand)]
/// Sky region that overlap (or is included in) the select MOCs
region: Region,
Expand Down Expand Up @@ -142,13 +147,15 @@ impl Query {
self.include_deprecated,
move |ranges| ranges.contains_val(&idx32),
move |ranges| ranges.contains_val(&idx64),
self.parallel,
)
} else {
exec_gen(
self.file,
self.include_deprecated,
move |ranges| ranges.contains_val(&idx32),
move |ranges| ranges.contains_val(&idx64),
self.parallel,
)
}
}
Expand Down Expand Up @@ -185,13 +192,15 @@ impl Query {
self.include_deprecated,
move |ranges| ranges.contains(&moc32_ref),
move |ranges| ranges.contains(&moc64_ref),
self.parallel,
)
} else {
exec_gen(
self.file,
self.include_deprecated,
move |ranges| ranges.contains(&moc32_ref),
move |ranges| ranges.contains(&moc64_ref),
self.parallel,
)
}
} else if self.print_coverage {
Expand All @@ -200,13 +209,15 @@ impl Query {
self.include_deprecated,
move |ranges| ranges.intersects(&moc32_ref),
move |ranges| ranges.intersects(&moc64_ref),
self.parallel,
)
} else {
exec_gen(
self.file,
self.include_deprecated,
move |ranges| ranges.intersects(&moc32_ref),
move |ranges| ranges.intersects(&moc64_ref),
self.parallel,
)
}
}
Expand Down Expand Up @@ -248,13 +259,15 @@ impl Query {
self.include_deprecated,
move |ranges| ranges.contains(&moc32_ref),
move |ranges| ranges.contains(&moc64_ref),
self.parallel,
)
} else {
exec_gen(
self.file,
self.include_deprecated,
move |ranges| ranges.contains(&moc32_ref),
move |ranges| ranges.contains(&moc64_ref),
self.parallel,
)
}
} else if self.print_coverage {
Expand All @@ -263,13 +276,15 @@ impl Query {
self.include_deprecated,
move |ranges| ranges.intersects(&moc32_ref),
move |ranges| ranges.intersects(&moc64_ref),
self.parallel,
)
} else {
exec_gen(
self.file,
self.include_deprecated,
move |ranges| ranges.intersects(&moc32_ref),
move |ranges| ranges.intersects(&moc64_ref),
self.parallel,
)
}
}
Expand Down Expand Up @@ -364,34 +379,73 @@ pub fn load_moc<R: BufRead>(
}
}

fn exec_gen<F, D>(file: PathBuf, include_deprecated: bool, f: F, d: D) -> Result<(), Box<dyn Error>>
fn exec_gen<F, D>(
file: PathBuf,
include_deprecated: bool,
f: F,
d: D,
parallel: Option<u16>,
) -> Result<(), Box<dyn Error>>
where
F: Fn(&BorrowedRanges<'_, u32>) -> bool,
D: Fn(&BorrowedRanges<'_, u64>) -> bool,
F: Sync + Fn(&BorrowedRanges<'_, u32>) -> bool,
D: Sync + Fn(&BorrowedRanges<'_, u64>) -> bool,
{
let moc_set_reader = MocSetFileReader::new(file)?;
let meta_it = moc_set_reader.meta().into_iter();
let bytes_it = moc_set_reader.index().into_iter();
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = writeln!(&mut out, "id");
for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) {
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let _ = writeln!(&mut out, "{}", id);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let _ = writeln!(&mut out, "{}", id);
match parallel {
None => {
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = writeln!(&mut out, "id");
for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) {
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let _ = writeln!(&mut out, "{}", id);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let _ = writeln!(&mut out, "{}", id);
}
}
}
}
}
Some(n_threads) => {
rayon::ThreadPoolBuilder::new()
.num_threads(n_threads as usize)
.build_global()
.unwrap();
let elements: Vec<(FlagDepthId, Range<usize>)> = meta_it.zip(bytes_it).collect();
let _ = writeln!(&mut std::io::stdout(), "id");
elements
.into_par_iter()
.for_each(|(flg_depth_id, byte_range)| {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated)
{
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let _ = writeln!(&mut std::io::stdout(), "{}", id);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let _ = writeln!(&mut std::io::stdout(), "{}", id);
}
}
}
});
}
}
Ok(())
}
Expand All @@ -401,46 +455,92 @@ fn exec_gen_with_coverage<F, D>(
include_deprecated: bool,
f: F,
d: D,
parallel: Option<u16>,
) -> Result<(), Box<dyn Error>>
where
F: Fn(&BorrowedRanges<'_, u32>) -> bool,
D: Fn(&BorrowedRanges<'_, u64>) -> bool,
F: Fn(&BorrowedRanges<'_, u32>) -> bool + Sync,
D: Fn(&BorrowedRanges<'_, u64>) -> bool + Sync,
{
let moc_set_reader = MocSetFileReader::new(file)?;
let meta_it = moc_set_reader.meta().into_iter();
let bytes_it = moc_set_reader.index().into_iter();
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = writeln!(&mut out, "id,moc_coverage");
for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) {
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx<u32>>::from(ranges);
let _ = writeln!(
&mut out,
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx<u64>>::from(ranges);
let _ = writeln!(
&mut out,
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
match parallel {
None => {
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = writeln!(&mut out, "id,moc_coverage");
for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) {
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx<u32>>::from(ranges);
let _ = writeln!(
&mut out,
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx<u64>>::from(ranges);
let _ = writeln!(
&mut out,
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
}
}
}
}
}
Some(n_threads) => {
rayon::ThreadPoolBuilder::new()
.num_threads(n_threads as usize)
.build_global()
.unwrap();
let elements: Vec<(FlagDepthId, Range<usize>)> = meta_it.zip(bytes_it).collect();
let _ = writeln!(&mut std::io::stdout(), "id,moc_coverage");
elements
.into_par_iter()
.for_each(|(flg_depth_id, byte_range)| {
let id = flg_depth_id.identifier();
let status = flg_depth_id.status();
let depth = flg_depth_id.depth();
if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated)
{
if depth <= Hpx::<u32>::MAX_DEPTH {
let ranges = moc_set_reader.ranges::<u32>(byte_range);
if f(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx<u32>>::from(ranges);
let _ = writeln!(
&mut std::io::stdout(),
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
}
} else {
let ranges = moc_set_reader.ranges::<u64>(byte_range);
if d(&ranges) {
let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx<u64>>::from(ranges);
let _ = writeln!(
&mut std::io::stdout(),
"{},{:.6e}",
id,
borrowed_moc_ranges.coverage_percentage()
);
}
}
}
});
}
}
Ok(())
}
Expand Down

0 comments on commit e61dc50

Please sign in to comment.