From 9fd21165f4c6ab1fdb4f2d00d9cb83caa1ba4295 Mon Sep 17 00:00:00 2001 From: Kiran V Garimella Date: Mon, 14 Oct 2024 20:11:41 -0400 Subject: [PATCH] Kvg multiple windows (#33) * Find superbubbles in graph * When a superbubble has more than two paths, trim off the low quality ones --- src/hidive/src/correct.rs | 24 +- src/hidive/src/main.rs | 16 +- src/skydive/Cargo.toml | 1 + src/skydive/src/ldbg.rs | 456 ++++++++++++++++++++++++++------------ src/skydive/src/utils.rs | 13 +- 5 files changed, 342 insertions(+), 168 deletions(-) diff --git a/src/hidive/src/correct.rs b/src/hidive/src/correct.rs index 51dbca68..36434998 100644 --- a/src/hidive/src/correct.rs +++ b/src/hidive/src/correct.rs @@ -12,19 +12,19 @@ pub fn start( gfa_output: Option, kmer_size: usize, model_path: &PathBuf, - long_read_fasta_paths: &Vec, - short_read_fasta_paths: &Vec, + long_read_fasta_path: &PathBuf, + short_read_fasta_path: &PathBuf, ) { - let long_read_seq_urls = skydive::parse::parse_file_names(long_read_fasta_paths); - let short_read_seq_urls = skydive::parse::parse_file_names(short_read_fasta_paths); + let long_read_seq_urls = skydive::parse::parse_file_names(&[long_read_fasta_path.clone()]); + let short_read_seq_urls = skydive::parse::parse_file_names(&[short_read_fasta_path.clone()]); // Read all long reads. skydive::elog!("Processing long-read samples {:?}...", long_read_seq_urls.iter().map(|url| url.as_str()).collect::>()); - let all_lr_seqs = skydive::utils::read_fasta(long_read_fasta_paths); + let all_lr_seqs = skydive::utils::read_fasta(&vec![long_read_fasta_path.clone()]); // Read all short reads. skydive::elog!("Processing short-read samples {:?}...", short_read_seq_urls.iter().map(|url| url.as_str()).collect::>()); - let all_sr_seqs = skydive::utils::read_fasta(short_read_fasta_paths); + let all_sr_seqs = skydive::utils::read_fasta(&vec![short_read_fasta_path.clone()]); let l1 = LdBG::from_sequences("lr".to_string(), kmer_size, &all_lr_seqs); let s1 = LdBG::from_sequences("sr".to_string(), kmer_size, &all_sr_seqs); @@ -32,12 +32,9 @@ pub fn start( let m = MLdBG::from_ldbgs(vec![l1, s1]) .score_kmers(model_path) .collapse() - .clean_color_specific_paths(1, 0.2) - .clean_tangles(1, 100, 0.2) - .clean_branches(0.01) - .clean_tips(3*kmer_size, 0.01) - .clean_contigs(100) - .build_links(&all_lr_seqs, false); + .clean(0.2, 0.01) + .build_links(&all_lr_seqs, true) + ; skydive::elog!("Built MLdBG with {} k-mers.", m.kmers.len()); @@ -52,8 +49,7 @@ pub fn start( let progress_bar = skydive::utils::default_bounded_progress_bar("Correcting reads", all_lr_seqs.len() as u64); - let corrected_seqs = - all_lr_seqs + let corrected_seqs = all_lr_seqs .par_iter() .progress_with(progress_bar) .map(|seq| m.correct_seq(seq)) diff --git a/src/hidive/src/main.rs b/src/hidive/src/main.rs index 78aee02f..17d85e8d 100644 --- a/src/hidive/src/main.rs +++ b/src/hidive/src/main.rs @@ -288,13 +288,13 @@ enum Commands { #[clap(short, long, required = true, value_parser)] model_path: PathBuf, - /// FASTA files with short-read sequences (may contain one or more samples). - #[clap(short, long, required = false, value_parser)] - short_read_fasta_paths: Vec, - /// FASTA files with long-read sequences (may contain one or more samples). #[clap(required = true, value_parser)] - long_read_fasta_paths: Vec, + long_read_fasta_path: PathBuf, + + /// FASTA files with short-read sequences (may contain one or more samples). + #[clap(required = true, value_parser)] + short_read_fasta_path: PathBuf, }, /// Co-assemble target locus from long- and short-read data using a linked de Bruijn graph. @@ -421,10 +421,10 @@ fn main() { gfa_output, kmer_size, model_path, - long_read_fasta_paths, - short_read_fasta_paths, + long_read_fasta_path, + short_read_fasta_path, } => { - correct::start(&output, gfa_output, kmer_size, &model_path, &long_read_fasta_paths, &short_read_fasta_paths); + correct::start(&output, gfa_output, kmer_size, &model_path, &long_read_fasta_path, &short_read_fasta_path); } Commands::Coassemble { output, diff --git a/src/skydive/Cargo.toml b/src/skydive/Cargo.toml index f6d33a02..f541bfbb 100644 --- a/src/skydive/Cargo.toml +++ b/src/skydive/Cargo.toml @@ -35,6 +35,7 @@ openssl = { version = "0.10", features = ["vendored"] } path-absolutize = "3.1.1" parquet = "52.1.0" petgraph = "0.6.5" +poasta = { git = "https://github.com/broadinstitute/poasta.git" } #polars = { version = "*", features = ["parquet", "lazy", "csv-file", "strings", "temporal", "dtype-duration", "dtype-categorical", "concat_str", "list", "list_eval", "rank", "lazy_regex"]} pyo3 = { version = "0.20.0", features = ["abi3-py37", "extension-module"] } pyo3-asyncio = { version = "0.20.0", features = ["attributes", "async-std-runtime", "tokio-runtime"] } diff --git a/src/skydive/src/ldbg.rs b/src/skydive/src/ldbg.rs index 7a11c5e3..b3015fe6 100644 --- a/src/skydive/src/ldbg.rs +++ b/src/skydive/src/ldbg.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use petgraph::Direction::Incoming; use std::collections::{HashMap, HashSet, VecDeque}; use std::fs::File; @@ -13,6 +14,7 @@ use std::path::PathBuf; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::EdgeRef; use petgraph::dot::{Dot, Config}; +use petgraph::visit::{Dfs, DfsEvent}; use indicatif::{ProgressIterator, ParallelProgressIterator}; use rayon::iter::IntoParallelRefIterator; @@ -32,6 +34,13 @@ type KmerScores = HashMap, f32>; type Links = HashMap, HashMap>; type Sources = HashMap, Vec>; +#[derive(Clone, Copy, PartialEq, Eq)] +enum NodeState { + NotVisited, + InProgress, + Done, +} + /// Represents a linked de Bruijn graph with a k-mer size specified at construction time. #[derive(Debug, Clone)] pub struct LdBG { @@ -856,60 +865,6 @@ impl LdBG { combined_seq } - fn try_combinatorially_combining_sequences(&self, corrected_segments: &Vec>) -> Vec> { - let mut segments = corrected_segments.clone(); - let mut combined = true; - - while combined { - combined = false; - let mut new_segments = Vec::new(); - - while let Some(segment_i) = segments.pop() { - let mut combined_i = false; - - for j in 0..segments.len() { - let segment_j = &segments[j]; - - let overlap_end = self.compute_overlap(&segment_i, segment_j, 9, self.kmer_size); - let overlap_start = self.compute_overlap(segment_j, &segment_i, 9, self.kmer_size); - - if overlap_end > 0 || overlap_start > 0 { - let combined_seq = if overlap_end >= overlap_start { - [&segment_i[..segment_i.len() - overlap_end], segment_j].concat() - } else { - [segment_j, &segment_i[overlap_start..]].concat() - }; - - new_segments.push(combined_seq); - segments.remove(j); - combined = true; - combined_i = true; - - break; - } - } - - if !combined_i { - new_segments.push(segment_i); - } - } - - segments = new_segments; - } - - segments - } - - fn compute_overlap(&self, seq1: &[u8], seq2: &[u8], min_len: usize, max_len: usize) -> usize { - for overlap_len in (min_len..=max_len).rev() { - if seq1[seq1.len() - overlap_len..] == seq2[..overlap_len] { - return overlap_len; - } - } - - 0 - } - fn remove_short_branches(&self, graph: &mut petgraph::Graph, min_size: usize) { // Find nodes with more than one outgoing edge and prune short branches let mut nodes_to_remove = HashSet::new(); @@ -1590,6 +1545,18 @@ impl LdBG { Some(contig[contig.len() - self.kmer_size as usize..].to_vec()) } + pub fn clean(self, lenient_threshold: f32, strict_threshold: f32) -> Self { + let kmer_size = self.kmer_size; + + self + .clean_color_specific_paths(1, lenient_threshold) + .clean_tangles(1, 100, lenient_threshold) + .clean_branches(strict_threshold) + .clean_tips(3*kmer_size, strict_threshold) + .clean_bubbles(2.0*lenient_threshold) + .clean_contigs(100) + } + pub fn clean_color_specific_paths(mut self, color: usize, min_score: f32) -> Self { let bad_cn_kmers = self.kmers .keys() @@ -1829,6 +1796,82 @@ impl LdBG { self } + pub fn clean_bubbles(mut self, min_score: f32) -> Self { + let mut to_remove = HashSet::new(); + let mut bad_bubbles: usize = 0; + + let g = self.traverse_all_kmers(); + let bubbles = find_all_superbubbles(&g); + + for (i, ((in_node, out_node), interior)) in bubbles.iter().enumerate() { + let paths_fwd = petgraph::algo::all_simple_paths::, _>(&g, *in_node, *out_node, 0, Some(interior.len())); + let paths_rev = petgraph::algo::all_simple_paths::, _>(&g, *out_node, *in_node, 0, Some(interior.len())); + + let mut paths = paths_fwd + .chain(paths_rev) + .map(|path| { + let score = path.iter() + .map(|node| { + let kmer = g.node_weight(*node).unwrap().as_bytes(); + let cn_kmer = crate::utils::canonicalize_kmer(kmer); + *self.scores.get(&cn_kmer).unwrap_or(&1.0) + }) + .sum::() / path.len() as f32; + + (path, score) + }) + .collect::, f32)>>(); + + paths.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); + + if paths.len() > 2 { + let mut to_keep = HashSet::new(); + let mut cleaned_bubble = false; + + for (j, (path, total_score)) in paths.iter().enumerate() { + // let mut contig = String::new(); + + for node in path { + let kmer = g.node_weight(*node).unwrap().as_bytes(); + let cn_kmer = crate::utils::canonicalize_kmer(kmer); + + if j < 2 { + to_keep.insert(cn_kmer.clone()); + } else if !to_keep.contains(&cn_kmer) && *total_score < min_score { + to_remove.insert(cn_kmer.clone()); + cleaned_bubble = true; + } + + // if contig.is_empty() { + // contig = String::from_utf8_lossy(kmer).to_string(); + // } else { + // contig.push_str(&String::from_utf8_lossy(&kmer[self.kmer_size - 1..])); + // } + } + + // crate::elog!(" -- Bubble {}: {} (score: {}) (remove: {})", i, contig, total_score, num_remove); + } + + // crate::elog!(""); + + if cleaned_bubble { + bad_bubbles += paths.len() - 2; + } + } + } + + for cn_kmer in &to_remove { + self.kmers.remove(cn_kmer); + self.scores.remove(cn_kmer); + } + + crate::elog!(" -- Removed {} bad bubbles ({} kmers)", bad_bubbles, to_remove.len()); + + self.infer_edges(); + + self + } + pub fn clean_contigs(mut self, min_contig_length: usize) -> Self { let mut to_remove = HashSet::new(); let mut bad_contigs: usize = 0; @@ -1873,94 +1916,6 @@ impl LdBG { self } - pub fn identify_superbubbles(&self) -> Vec> { - let graph = self.traverse_all_kmers(); - let mut superbubbles = Vec::new(); - - // Perform topological sort - let topo_order = match petgraph::algo::toposort(&graph, None) { - Ok(order) => order, - Err(_) => return superbubbles, // Graph has cycles, return empty vector - }; - - let mut visited = HashSet::new(); - let mut seen = HashSet::new(); - - for &s in &topo_order { - if visited.contains(&s) { - continue; - } - - let mut stack = vec![s]; - seen.clear(); - seen.insert(s); - - while let Some(v) = stack.pop() { - visited.insert(v); - - if graph.neighbors(v).count() == 0 { - break; - } - - let mut all_children_seen = true; - for u in graph.neighbors(v) { - if u == s { - break; - } - - if !seen.contains(&u) { - seen.insert(u); - stack.push(u); - all_children_seen = false; - } - } - - if stack.len() == 1 { - let t = stack[0]; - if !graph.contains_edge(s, t) { - if let Some(superbubble) = self.validate_superbubble(&graph, s, t) { - superbubbles.push(superbubble); - } - } - break; - } - - if all_children_seen { - break; - } - } - } - - superbubbles - } - - fn validate_superbubble(&self, graph: &DiGraph, s: NodeIndex, t: NodeIndex) -> Option> { - let mut superbubble = vec![t]; - let mut current = t; - - while current != s { - let parents: Vec<_> = graph.neighbors_directed(current, petgraph::Incoming).collect(); - if parents.len() != 1 { - return None; // Not a valid superbubble - } - current = parents[0]; - superbubble.push(current); - } - - superbubble.reverse(); - Some(superbubble) - } - - fn is_entrance(&self, graph: &DiGraph, v: NodeIndex) -> bool { - graph.neighbors_directed(v, petgraph::Incoming).count() <= 1 && - graph.neighbors_directed(v, petgraph::Outgoing).count() > 1 - } - - fn is_exit(&self, graph: &DiGraph, v: NodeIndex) -> bool { - graph.neighbors_directed(v, petgraph::Incoming).count() > 1 && - graph.neighbors_directed(v, petgraph::Outgoing).count() <= 1 - } - fn traverse_forward(&self, graph: &mut petgraph::Graph, visited: &mut HashMap, start_node: NodeIndex) { let mut fwd_stack = vec![start_node]; while let Some(node) = fwd_stack.pop() { @@ -2506,11 +2461,103 @@ fn navigate_forward(node: NodeIndex, steps: u8, graph: &petgraph::Graph, s: NodeIndex, direction: petgraph::Direction) -> Option<(NodeIndex, NodeIndex, Vec)> { + let mut seen = HashSet::new(); + let mut visited = HashSet::new(); + let mut nodes_inside = Vec::new(); + + // seen.insert((s.index(), direction)); + seen.insert(s.index()); + + let mut stack = vec![(s, direction)]; + while !stack.is_empty() { + + let (v, v_direction) = stack.pop().unwrap(); + visited.insert(v.index()); + + nodes_inside.push(v); + + // seen.remove(&(v.index(), v_direction)); + seen.remove(&v.index()); + + let children = match v_direction { + petgraph::Direction::Incoming => graph.neighbors_directed(v, petgraph::Direction::Incoming).collect::>(), + petgraph::Direction::Outgoing => graph.neighbors_directed(v, petgraph::Direction::Outgoing).collect::>(), + }; + + if children.is_empty() { + break; + } + + for u in children { + let (u_child_direction, u_parents) = if v_direction == petgraph::Direction::Outgoing { + (petgraph::Direction::Outgoing, graph.neighbors_directed(u, petgraph::Direction::Incoming).collect::>()) + } else { + (petgraph::Direction::Incoming, graph.neighbors_directed(u, petgraph::Direction::Outgoing).collect::>()) + }; + + if u.index() == s.index() { + stack.clear(); + break; + } + + seen.insert(u.index()); + + if u_parents.iter().all(|&n| visited.contains(&n.index())) { + stack.push((u, u_child_direction)); + } + } + + if stack.len() == 1 && seen.len() == 1 { + let (t, _) = stack.pop().unwrap(); + nodes_inside.push(t); + + if nodes_inside.len() == 2 { + break; + } + + nodes_inside.retain(|&x| x != s && x != t); + return Some((s, t, nodes_inside)); + } + } + + None +} + +pub fn find_all_superbubbles(graph: &petgraph::Graph) -> HashMap<(NodeIndex, NodeIndex), Vec> { + let mut bubbles = HashMap::new(); + + let mut visited: HashSet = HashSet::new(); + for n in graph.node_indices() { + if !visited.contains(&n) { + for d in [petgraph::Direction::Outgoing, petgraph::Direction::Incoming] { + let bubble = find_superbubble(&graph, n, d); + + if let Some(bubble) = bubble { + visited.extend(bubble.2.iter().cloned()); + + let key_fwd = (bubble.0, bubble.1); + let key_rev = (bubble.1, bubble.0); + + if !bubbles.contains_key(&key_fwd) && !bubbles.contains_key(&key_rev) { + bubbles.insert((bubble.0, bubble.1), bubble.2); + } + } + } + } + } + + bubbles +} + #[cfg(test)] mod tests { use super::*; use crate::edges::Edges; + use petgraph::data::DataMap; use rand::{Rng, SeedableRng}; use std::collections::BTreeMap; @@ -3183,4 +3230,129 @@ mod tests { } } } + + // From "BubbleGun: enumerating bubbles and superbubbles in genome graphs", Dabbaghie et al. 2022 + // https://academic.oup.com/bioinformatics/article/38/17/4217/6633304?login=false + // https://github.com/fawaz-dabbaghieh/bubble_gun/blob/master/example/paper_example2.gfa + fn create_bubblegun_graph() -> petgraph::Graph { + // Create a new directed graph + let mut graph = petgraph::Graph::::new(); + + // Create a HashMap to store node indices + let mut node_indices = std::collections::HashMap::new(); + + // Add nodes to the graph + let gfa_nodes = vec![ + ("1", "CCCAACAAGTG"), + ("7", "AACAAGTGTACTCATTG"), + ("25", "ACTCATTGACG"), + ("31", "CATTGACGTAAATTTGGTGCGGGCCTCAAGGTGTCCA"), + ("89", "GGTGTCCATTGGGGTC"), + ("105", "TTGGGGTCAGCACAAAT"), + ("123", "GCACAAATTGCCA"), + ("163", "CATTGACGAGGCACCC"), + ("179", "AGGCACCCGTGCATTAG"), + ("197", "TGCATTAGGCAGGGTGT"), + ("215", "CAGGGTGTCCA"), + ("237", "TTGGGGTCTGCACAAAT"), + ("271", "AACAAGTGAACTCATTG"), + ("311", "AGGCACCCCTGCATTAG"), + ("427", "CATTGACGCAACCGGCATTGAATACACAGGGTGT"), + ]; + + for (id, seq) in gfa_nodes { + let node_index = graph.add_node(seq.to_string()); + node_indices.insert(id.to_string(), node_index); + } + + // Add edges to the graph + let gfa_edges = vec![ + ("1", "7"), ("1", "271"), + ("7", "25"), + ("25", "31"), ("25", "427"), ("25", "163"), + ("31", "89"), + ("89", "237"), ("89", "105"), + ("105", "123"), + ("163", "179"), ("163", "311"), + ("179", "197"), + ("197", "215"), + ("215", "89"), + ("237", "123"), + ("271", "25"), + ("311", "197"), + ("427", "215"), + ]; + + for (from, to) in gfa_edges { + if let (Some(&from_index), Some(&to_index)) = (node_indices.get(from), node_indices.get(to)) { + graph.add_edge(from_index, to_index, 1.0); + } + } + graph + } + + #[test] + fn test_find_all_superbubbles() { + let graph = create_bubblegun_graph(); + + let expected_bubbles = HashMap::from([ + ((b"CCCAACAAGTG".to_vec(), b"ACTCATTGACG".to_vec()), 2usize), + // ((b"ACTCATTGACG".to_vec(), b"CCCAACAAGTG".to_vec()), 2usize), + + ((b"ACTCATTGACG".to_vec(), b"GGTGTCCATTGGGGTC".to_vec()), 7usize), + // ((b"GGTGTCCATTGGGGTC".to_vec(), b"ACTCATTGACG".to_vec()), 7usize), + + ((b"GGTGTCCATTGGGGTC".to_vec(), b"GCACAAATTGCCA".to_vec()), 2usize), + // ((b"GCACAAATTGCCA".to_vec(), b"GGTGTCCATTGGGGTC".to_vec()), 2usize), + ]); + + let bubbles = find_all_superbubbles(&graph); + assert_eq!(bubbles.len(), expected_bubbles.len()); + + for ((in_node, out_node), interior) in bubbles { + let in_seq = graph.node_weight(in_node).unwrap().as_bytes().to_vec(); + let out_seq = graph.node_weight(out_node).unwrap().as_bytes().to_vec(); + + let key_fwd = (in_seq.clone(), out_seq.clone()); + let key_rev = (out_seq.clone(), in_seq.clone()); + + assert!(expected_bubbles.contains_key(&key_fwd) || expected_bubbles.contains_key(&key_rev)); + + if expected_bubbles.contains_key(&key_fwd) { + assert_eq!(expected_bubbles.get(&key_fwd).unwrap(), &interior.len()); + } else { + assert_eq!(expected_bubbles.get(&key_rev).unwrap(), &interior.len()); + } + } + } + + #[test] + fn test_find_all_superbubbles_2() { + let graph = crate::utils::read_gfa("/Users/kiran/repositories/hidive/test.gfa").unwrap(); + + let bubbles = find_all_superbubbles(&graph); + + let start = b"GAGGGAACAGCGACTTC".to_vec(); + let end = b"TGGACACAGGCACCTGG".to_vec(); + for ((in_node, out_node), interior) in bubbles { + let in_seq = graph.node_weight(in_node).unwrap().as_bytes().to_vec(); + let out_seq = graph.node_weight(out_node).unwrap().as_bytes().to_vec(); + + if in_seq == start && out_seq == end { + println!("Found bubble: {}", interior.len()); + + for path in petgraph::algo::all_simple_paths::, _>(&graph, in_node, out_node, 0, Some(interior.len() + 10)) { + println!("out path: {}", path.len()); + } + + for path in petgraph::algo::all_simple_paths::, _>(&graph, out_node, in_node, 0, Some(interior.len() + 10)) { + println!("in path: {}", path.len()); + + for node in path { + println!(" - {}", graph.node_weight(node).unwrap()); + } + } + } + } + } } diff --git a/src/skydive/src/utils.rs b/src/skydive/src/utils.rs index f7e7a493..ae103e14 100644 --- a/src/skydive/src/utils.rs +++ b/src/skydive/src/utils.rs @@ -189,13 +189,18 @@ pub fn read_gfa>(path: P) -> std::io::Result node_map.insert(id.to_string(), node_index); }, "L" => { + if fields.len() < 6 { + continue; // Skip malformed lines + } let from_id = fields[1]; + // let from_orient = fields[2]; let to_id = fields[3]; - let weight = fields[5] - .split(':') - .last() + // let to_orient = fields[4]; + + let weight = fields.get(5) + .and_then(|s| s.split(':').last()) .and_then(|s| s.parse::().ok()) - .unwrap_or(1.0) / 100.0; + .unwrap_or(1.0); if let (Some(&from), Some(&to)) = (node_map.get(from_id), node_map.get(to_id)) { graph.add_edge(from, to, weight);