diff --git a/crates/set/CHANGELOG.md b/crates/set/CHANGELOG.md index b17f4c8..ed893b2 100644 --- a/crates/set/CHANGELOG.md +++ b/crates/set/CHANGELOG.md @@ -1,5 +1,22 @@ # `moc-set` Change Log +## 0.8.2 + +Released 2024-01-29 + +### Added + +* naive parallelism in 'mocset query': + we expect poor performances on HDD with cold cache but better ones with + SSDs with cold cache (parallel reading). + Performances does not seems to improve a lot so far on a single MVNe SSD. + +### Bug correction + +* No more 'panic' info showing-up on stderr when piping output in commands + endding the process before full write, such as 'head'. + + ## 0.8.1 Released 2023-12-20 diff --git a/crates/set/Cargo.toml b/crates/set/Cargo.toml index 471f5a9..d8d28d8 100644 --- a/crates/set/Cargo.toml +++ b/crates/set/Cargo.toml @@ -26,6 +26,7 @@ cdshealpix = "0.6.7" memmap = { package = "memmap2", version = "0.5" } clap = { version = "4.4", features = ["derive"] } byteorder = "1" +rayon = "1.8.1" [package.metadata.deb] maintainer = "F.-X. Pineau " diff --git a/crates/set/README.md b/crates/set/README.md index 9610e39..c433ced 100644 --- a/crates/set/README.md +++ b/crates/set/README.md @@ -110,7 +110,10 @@ Install from using `cargo`: ```bash cargo install --path crates/set ``` - +or +``` +RUSTFLAGS='-C target-cpu=native' cargo install --path crates/set +``` ## Usage diff --git a/crates/set/src/query.rs b/crates/set/src/query.rs index 0f3122e..7bd4302 100644 --- a/crates/set/src/query.rs +++ b/crates/set/src/query.rs @@ -2,12 +2,14 @@ use std::{ error::Error, fs::File, io::{BufRead, BufReader, Write}, + ops::Range, path::Path, path::PathBuf, str::FromStr, }; use clap::Parser; +use rayon::prelude::*; use cdshealpix::{best_starting_depth, has_best_starting_depth, nested}; @@ -31,7 +33,7 @@ use moclib::{ ranges::{BorrowedRanges, Ranges, SNORanges}, }; -use crate::{MocSetFileReader, StatusFlag}; +use crate::{FlagDepthId, MocSetFileReader, StatusFlag}; const HALF_PI: f64 = 0.5 * std::f64::consts::PI; const TWICE_PI: f64 = 2.0 * std::f64::consts::PI; @@ -48,6 +50,9 @@ pub struct Query { #[clap(short = 'c', long = "print-coverage")] /// Print in output the sky fraction (in '[0.0, 1.0]') covered by each selected MOC print_coverage: bool, + #[clap(short = 'p', long = "parallel")] + /// Switch on multi-threading, with 'parallel' threads + parallel: Option, #[clap(subcommand)] /// Sky region that overlap (or is included in) the select MOCs region: Region, @@ -142,6 +147,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains_val(&idx32), move |ranges| ranges.contains_val(&idx64), + self.parallel, ) } else { exec_gen( @@ -149,6 +155,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains_val(&idx32), move |ranges| ranges.contains_val(&idx64), + self.parallel, ) } } @@ -185,6 +192,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains(&moc32_ref), move |ranges| ranges.contains(&moc64_ref), + self.parallel, ) } else { exec_gen( @@ -192,6 +200,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains(&moc32_ref), move |ranges| ranges.contains(&moc64_ref), + self.parallel, ) } } else if self.print_coverage { @@ -200,6 +209,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.intersects(&moc32_ref), move |ranges| ranges.intersects(&moc64_ref), + self.parallel, ) } else { exec_gen( @@ -207,6 +217,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.intersects(&moc32_ref), move |ranges| ranges.intersects(&moc64_ref), + self.parallel, ) } } @@ -248,6 +259,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains(&moc32_ref), move |ranges| ranges.contains(&moc64_ref), + self.parallel, ) } else { exec_gen( @@ -255,6 +267,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.contains(&moc32_ref), move |ranges| ranges.contains(&moc64_ref), + self.parallel, ) } } else if self.print_coverage { @@ -263,6 +276,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.intersects(&moc32_ref), move |ranges| ranges.intersects(&moc64_ref), + self.parallel, ) } else { exec_gen( @@ -270,6 +284,7 @@ impl Query { self.include_deprecated, move |ranges| ranges.intersects(&moc32_ref), move |ranges| ranges.intersects(&moc64_ref), + self.parallel, ) } } @@ -364,34 +379,73 @@ pub fn load_moc( } } -fn exec_gen(file: PathBuf, include_deprecated: bool, f: F, d: D) -> Result<(), Box> +fn exec_gen( + file: PathBuf, + include_deprecated: bool, + f: F, + d: D, + parallel: Option, +) -> Result<(), Box> where - F: Fn(&BorrowedRanges<'_, u32>) -> bool, - D: Fn(&BorrowedRanges<'_, u64>) -> bool, + F: Sync + Fn(&BorrowedRanges<'_, u32>) -> bool, + D: Sync + Fn(&BorrowedRanges<'_, u64>) -> bool, { let moc_set_reader = MocSetFileReader::new(file)?; let meta_it = moc_set_reader.meta().into_iter(); let bytes_it = moc_set_reader.index().into_iter(); - let stdout = std::io::stdout(); - let mut out = stdout.lock(); - let _ = writeln!(&mut out, "id"); - for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) { - let id = flg_depth_id.identifier(); - let status = flg_depth_id.status(); - let depth = flg_depth_id.depth(); - if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) { - if depth <= Hpx::::MAX_DEPTH { - let ranges = moc_set_reader.ranges::(byte_range); - if f(&ranges) { - let _ = writeln!(&mut out, "{}", id); - } - } else { - let ranges = moc_set_reader.ranges::(byte_range); - if d(&ranges) { - let _ = writeln!(&mut out, "{}", id); + match parallel { + None => { + let stdout = std::io::stdout(); + let mut out = stdout.lock(); + let _ = writeln!(&mut out, "id"); + for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) { + let id = flg_depth_id.identifier(); + let status = flg_depth_id.status(); + let depth = flg_depth_id.depth(); + if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) { + if depth <= Hpx::::MAX_DEPTH { + let ranges = moc_set_reader.ranges::(byte_range); + if f(&ranges) { + let _ = writeln!(&mut out, "{}", id); + } + } else { + let ranges = moc_set_reader.ranges::(byte_range); + if d(&ranges) { + let _ = writeln!(&mut out, "{}", id); + } + } } } } + Some(n_threads) => { + rayon::ThreadPoolBuilder::new() + .num_threads(n_threads as usize) + .build_global() + .unwrap(); + let elements: Vec<(FlagDepthId, Range)> = meta_it.zip(bytes_it).collect(); + let _ = writeln!(&mut std::io::stdout(), "id"); + elements + .into_par_iter() + .for_each(|(flg_depth_id, byte_range)| { + let id = flg_depth_id.identifier(); + let status = flg_depth_id.status(); + let depth = flg_depth_id.depth(); + if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) + { + if depth <= Hpx::::MAX_DEPTH { + let ranges = moc_set_reader.ranges::(byte_range); + if f(&ranges) { + let _ = writeln!(&mut std::io::stdout(), "{}", id); + } + } else { + let ranges = moc_set_reader.ranges::(byte_range); + if d(&ranges) { + let _ = writeln!(&mut std::io::stdout(), "{}", id); + } + } + } + }); + } } Ok(()) } @@ -401,46 +455,92 @@ fn exec_gen_with_coverage( include_deprecated: bool, f: F, d: D, + parallel: Option, ) -> Result<(), Box> where - F: Fn(&BorrowedRanges<'_, u32>) -> bool, - D: Fn(&BorrowedRanges<'_, u64>) -> bool, + F: Fn(&BorrowedRanges<'_, u32>) -> bool + Sync, + D: Fn(&BorrowedRanges<'_, u64>) -> bool + Sync, { let moc_set_reader = MocSetFileReader::new(file)?; let meta_it = moc_set_reader.meta().into_iter(); let bytes_it = moc_set_reader.index().into_iter(); - let stdout = std::io::stdout(); - let mut out = stdout.lock(); - let _ = writeln!(&mut out, "id,moc_coverage"); - for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) { - let id = flg_depth_id.identifier(); - let status = flg_depth_id.status(); - let depth = flg_depth_id.depth(); - if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) { - if depth <= Hpx::::MAX_DEPTH { - let ranges = moc_set_reader.ranges::(byte_range); - if f(&ranges) { - let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx>::from(ranges); - let _ = writeln!( - &mut out, - "{},{:.6e}", - id, - borrowed_moc_ranges.coverage_percentage() - ); - } - } else { - let ranges = moc_set_reader.ranges::(byte_range); - if d(&ranges) { - let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx>::from(ranges); - let _ = writeln!( - &mut out, - "{},{:.6e}", - id, - borrowed_moc_ranges.coverage_percentage() - ); + match parallel { + None => { + let stdout = std::io::stdout(); + let mut out = stdout.lock(); + let _ = writeln!(&mut out, "id,moc_coverage"); + for (flg_depth_id, byte_range) in meta_it.zip(bytes_it) { + let id = flg_depth_id.identifier(); + let status = flg_depth_id.status(); + let depth = flg_depth_id.depth(); + if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) { + if depth <= Hpx::::MAX_DEPTH { + let ranges = moc_set_reader.ranges::(byte_range); + if f(&ranges) { + let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx>::from(ranges); + let _ = writeln!( + &mut out, + "{},{:.6e}", + id, + borrowed_moc_ranges.coverage_percentage() + ); + } + } else { + let ranges = moc_set_reader.ranges::(byte_range); + if d(&ranges) { + let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx>::from(ranges); + let _ = writeln!( + &mut out, + "{},{:.6e}", + id, + borrowed_moc_ranges.coverage_percentage() + ); + } + } } } } + Some(n_threads) => { + rayon::ThreadPoolBuilder::new() + .num_threads(n_threads as usize) + .build_global() + .unwrap(); + let elements: Vec<(FlagDepthId, Range)> = meta_it.zip(bytes_it).collect(); + let _ = writeln!(&mut std::io::stdout(), "id,moc_coverage"); + elements + .into_par_iter() + .for_each(|(flg_depth_id, byte_range)| { + let id = flg_depth_id.identifier(); + let status = flg_depth_id.status(); + let depth = flg_depth_id.depth(); + if status == StatusFlag::Valid || (include_deprecated && status == StatusFlag::Deprecated) + { + if depth <= Hpx::::MAX_DEPTH { + let ranges = moc_set_reader.ranges::(byte_range); + if f(&ranges) { + let borrowed_moc_ranges = BorrowedMocRanges::<'_, u32, Hpx>::from(ranges); + let _ = writeln!( + &mut std::io::stdout(), + "{},{:.6e}", + id, + borrowed_moc_ranges.coverage_percentage() + ); + } + } else { + let ranges = moc_set_reader.ranges::(byte_range); + if d(&ranges) { + let borrowed_moc_ranges = BorrowedMocRanges::<'_, u64, Hpx>::from(ranges); + let _ = writeln!( + &mut std::io::stdout(), + "{},{:.6e}", + id, + borrowed_moc_ranges.coverage_percentage() + ); + } + } + } + }); + } } Ok(()) }