Skip to content

Commit

Permalink
Kvg fix docker deps (#44)
Browse files Browse the repository at this point in the history
* Fix Docker dependencies

* Perform phasing in parallel chunks
  • Loading branch information
kvg authored Dec 1, 2024
1 parent dfcde8f commit 25004d3
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 104 deletions.
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ FROM python:3.9-slim

# Install some libraries in final image
RUN apt-get update && \
apt-get install -y --no-install-recommends fontconfig libcurl4-gnutls-dev && \
apt-get install -y --no-install-recommends fontconfig libcurl4-gnutls-dev \
libbz2-dev zlib1g-dev libncurses5-dev libncursesw5-dev liblzma-dev && \
rm -rf /var/lib/apt/lists/*

# Copy necessary files from builder stage
Expand Down
101 changes: 5 additions & 96 deletions src/hidive/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,10 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::{fs::File, path::PathBuf, io::Write};

use itertools::Itertools;
use linked_hash_map::LinkedHashMap;
use minimap2::Aligner;
use needletail::Sequence;
use petgraph::graph::NodeIndex;
use rayon::prelude::*;
use indicatif::ParallelProgressIterator;

use rust_htslib::bam::{FetchDefinition, Read};
use skydive::ldbg::LdBG;
use skydive::mldbg::MLdBG;

use skydive::wmec::*;
use spoa::AlignmentType;

pub fn start(
output: &PathBuf,
Expand Down Expand Up @@ -42,19 +33,16 @@ pub fn start(
let mut bam = skydive::stage::open_bam(&bam_url).unwrap();

skydive::elog!("Processing locus {} ({}:{}-{})...", name, chr, start, stop);
// skydive::elog!("Reference sequence: {}", seq);

let mut read_ids = HashMap::new();
let mut matrix = Vec::new();
let mut metadata: BTreeMap<u32, String> = BTreeMap::new();
let mut mloci = Vec::new();

let _ = bam.fetch(FetchDefinition::RegionString(chr.as_bytes(), start as i64, stop as i64));
for (cursor, p) in bam.pileup().enumerate() {
for p in bam.pileup() {
let pileup = p.unwrap();

// skydive::elog!("Processing position {}:{}-{} {}...", chr, start, stop, pileup.pos() + 1);

if pileup.pos() + 1 < start as u32 || pileup.pos() + 1 > stop as u32 {
continue;
}
Expand Down Expand Up @@ -116,20 +104,15 @@ pub fn start(
}

if is_variant {
// skydive::elog!("Variant found at {}:{} {} ({:?})", chr, pileup.pos() + 1, ref_base, alleles);
matrix.push(allele_map);
// metadata.push((pileup.pos(), ref_base));
// metadata.insert(cursor, ref_base);
metadata.insert(pileup.pos() + 1, String::from_utf8(vec![ref_base]).unwrap());
mloci.push(pileup.pos());
}
}

let (h1, h2) = phase_variants(&matrix);
skydive::elog!(" - phasing {} variants...", mloci.len());

skydive::elog!("Haplotype 1: {:?} ({})", h1, h1.len());
skydive::elog!("Haplotype 2: {:?} ({})", h2, h2.len());
skydive::elog!("Metadata: {:?}", metadata);
let (h1, h2) = phase_variants(&matrix);

let mut hap_alleles = HashMap::new();
for ((pos, a1), a2) in mloci.iter().zip(h1.iter()).zip(h2.iter()) {
Expand Down Expand Up @@ -237,14 +220,11 @@ fn phase_variants(matrix: &Vec<BTreeMap<usize, (String, u8)>>) -> (Vec<Option<St

let wmec_matrix = WMECData::new(reads, confidences);

let (p1, p2) = skydive::wmec::phase(&wmec_matrix);
let (p1, p2) = skydive::wmec::phase_all(&wmec_matrix, 30, 20);

let mut h1 = Vec::new();
let mut h2 = Vec::new();
for i in 0..p1.len() {
// h1.push(all_alleles[i].get(&p1[i]).unwrap().clone());
// h2.push(all_alleles[i].get(&p2[i]).unwrap().clone());

let a1 = all_alleles[i].get(&p1[i]).cloned();
let a2 = all_alleles[i].get(&p2[i]).cloned();

Expand All @@ -253,75 +233,4 @@ fn phase_variants(matrix: &Vec<BTreeMap<usize, (String, u8)>>) -> (Vec<Option<St
}

(h1, h2)
}

fn create_read_allele_matrix(lr_msas: &Vec<String>) -> Vec<BTreeMap<usize, String>> {
let mut matrix = Vec::new();

let mut index1 = 0;
while index1 < lr_msas[0].len() {
let combined_base_counts = allele_counts(lr_msas, index1, index1+1);

if combined_base_counts.len() > 1 {
let mut index2 = index1;
let mut allele_base_counts = allele_counts(lr_msas, index2, index2+1);

while index2 < lr_msas[0].len() && allele_base_counts.len() > 1 {
index2 += 1;
allele_base_counts = allele_counts(lr_msas, index2, index2+1);
}

let allele_counts = allele_counts(lr_msas, index1, index2)
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.take(2)
.collect::<BTreeMap<_, _>>();

// println!("{} {} {:?}", index1, index2, allele_counts);

if allele_counts.len() == 2 {
let mut column = BTreeMap::new();

let alleles = allele_indices(lr_msas, index1, index2);

let mut allele_index = 0;
for (allele, _) in &allele_counts {
alleles.iter().enumerate().for_each(|(i, a)| {
if *a == *allele {
// column.insert(i, allele.clone());
column.insert(i, allele_index.to_string());
}
});

allele_index += 1;
}

matrix.push(column);
}

index1 = index2;
} else {
index1 += 1;
}
}

matrix
}

fn allele_indices(lr_msas: &Vec<String>, index1: usize, index2: usize) -> Vec<String> {
let alleles = lr_msas.iter()
.map(|msa| msa[index1..index2].to_string().replace(" ", ""))
.collect::<Vec<String>>();
alleles
}

fn allele_counts(lr_msas: &Vec<String>, index1: usize, index2: usize) -> BTreeMap<String, i32> {
let combined_allele_counts = lr_msas.iter()
.map(|msa| msa[index1..index2].to_string().replace(" ", ""))
.filter(|allele| !allele.is_empty())
.fold(BTreeMap::new(), |mut counts, base| {
*counts.entry(base).or_insert(0) += 1;
counts
});
combined_allele_counts
}
}
2 changes: 1 addition & 1 deletion src/hidive/src/correct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ pub fn start(
let mut qual = vec![0; joined_seq.len()];
for i in 0..prob.len() {
prob[i] = prob[i].powf(1.0/count[i] as f32);
qual[i] = (-10.0*(1.0 - (prob[i] - 0.0001).max(0.0)).log10()) as u8 + 33;
qual[i] = ((-10.0*(1.0 - (prob[i] - 0.0001).max(0.0)).log10()) as u8).clamp(1, 40);
}

let _ = writeln!(file, "@{}\n{}\n+\n{}", id, String::from_utf8(joined_seq).unwrap(), String::from_utf8_lossy(&qual));
Expand Down
87 changes: 87 additions & 0 deletions src/skydive/src/wmec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::io::Write;

use rayon::prelude::*;

#[derive(Debug)]
pub struct WMECData {
pub reads: Vec<Vec<Option<u8>>>, // Reads matrix where None represents missing data
Expand Down Expand Up @@ -231,6 +233,91 @@ pub fn phase(data: &WMECData) -> (Vec<u8>, Vec<u8>) {
backtrack_haplotypes(data, &dp, &backtrack)
}

#[must_use]
pub fn phase_all(data: &WMECData, window: usize, stride: usize) -> (Vec<u8>, Vec<u8>) {
// First, collect all window ranges
let mut windows: Vec<_> = (0..data.num_snps)
.step_by(stride)
.map(|start| (start, (start + window).min(data.num_snps)))
.collect();

// Filter out last window if it completely overlaps with previous window
if let Some(&(_, prev_end)) = windows.get(windows.len() - 2) {
if let Some(&(_, last_end)) = windows.last() {
if last_end == prev_end {
windows.pop();
}
}
}

// crate::elog!("Windows: {:?}", windows);

// Process windows in parallel
let haplotype_pairs: Vec<_> = windows.par_iter()
.map(|&(start, end)| {
let (window_reads, window_confidences): (Vec<Vec<Option<u8>>>, Vec<Vec<Option<u32>>>) = data.reads.iter().zip(data.confidences.iter())
.filter_map(|(read, confidence)| {
let window_read = read[start..end].to_vec();
if window_read.iter().any(|x| x.is_some()) {
Some((window_read, confidence[start..end].to_vec()))
} else {
None
}
})
.unzip();

let window_data = WMECData::new(window_reads, window_confidences);

// crate::elog!("Processing window {} to {} ({})", start, end, window_data.num_snps);

let (mut dp, mut backtrack) = initialize_dp(&window_data);

for snp in 1..window_data.num_snps {
update_dp(&window_data, &mut dp, &mut backtrack, snp);
}

backtrack_haplotypes(&window_data, &dp, &backtrack)
})
.collect();

let mut haplotype1 = Vec::new();
let mut haplotype2 = Vec::new();

let overlap = window - stride;

for (hap1, hap2) in haplotype_pairs {
if haplotype1.len() == 0 {
haplotype1 = hap1;
haplotype2 = hap2;
} else {
// Compare overlap regions to determine orientation
let h1_overlap = &haplotype1[haplotype1.len()-overlap..];
let h2_overlap = &haplotype2[haplotype2.len()-overlap..];
let new_overlap = &hap1[..overlap];

// Count matches between overlapping regions
let h1_matches = h1_overlap.iter().zip(new_overlap.iter())
.filter(|(a,b)| a == b)
.count();
let h2_matches = h2_overlap.iter().zip(new_overlap.iter())
.filter(|(a,b)| a == b)
.count();

// Append new haplotypes in correct orientation based on best overlap match
if h1_matches >= h2_matches {
haplotype1.extend_from_slice(&hap1[overlap..]);
haplotype2.extend_from_slice(&hap2[overlap..]);
} else {
haplotype1.extend_from_slice(&hap2[overlap..]);
haplotype2.extend_from_slice(&hap1[overlap..]);
}

}
}

(haplotype1, haplotype2)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
62 changes: 56 additions & 6 deletions wdl/HidiveCorrect.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,32 @@ workflow Hidive {
short_read_fasta = Rescue.fasta,
}

call Align {
call Align as AlignReads {
input:
reference = reference,
sequences = Correct.fasta,
}

call Call {
input:
locus = locus,
reference = reference,
aligned_reads_bam = AlignReads.aligned_bam,
aligned_reads_bai = AlignReads.aligned_bai,
}

call Align as AlignHaplotypes {
input:
reference = reference,
sequences = Call.fasta,
}

output {
File corrected_fa = Correct.fasta
File aligned_bam = Align.aligned_bam
File aligned_bai = Align.aligned_bai
File aligned_reads_bam = AlignReads.aligned_bam
File aligned_reads_bai = AlignReads.aligned_bai
File aligned_haplotypes_bam = AlignHaplotypes.aligned_bam
File aligned_haplotypes_bai = AlignHaplotypes.aligned_bai
}
}

Expand Down Expand Up @@ -174,18 +190,52 @@ task Align {
command <<<
set -euxo pipefail

minimap2 -t ~{num_cpus} -ayYL -x ~{preset} ~{reference} ~{sequences} | samtools sort --write-index -O BAM -o ~{prefix}.bam
grep '^@' -A1 ~{sequences} | grep -v -- "^--$" | sed 's/^@/>/' | \
minimap2 -t ~{num_cpus} -ayYL -x ~{preset} ~{reference} - | \
samtools sort --write-index -O BAM -o ~{prefix}.bam
>>>

output {
File aligned_bam = "~{prefix}.bam"
File aligned_bai = "~{prefix}.bam.bai"
File aligned_bai = "~{prefix}.bam.csi"
}

runtime {
docker: "us.gcr.io/broad-dsp-lrma/lr-hidive:kvg_phase"
docker: "us.gcr.io/broad-dsp-lrma/lr-hidive:kvg_fix_docker_deps"
memory: "32 GB"
cpu: num_cpus
disks: "local-disk 100 SSD"
}
}

task Call {
input {
File reference
File aligned_reads_bam
File aligned_reads_bai

String locus
String prefix = "out"

Int num_cpus = 4
}

Int disk_size_gb = 4

command <<<
set -euxo pipefail

hidive call -l "~{locus}" -r ~{reference} ~{aligned_reads_bam} > ~{prefix}.fa
>>>

output {
File fasta = "~{prefix}.fa"
}

runtime {
docker: "us.gcr.io/broad-dsp-lrma/lr-hidive:kvg_phase"
memory: "4 GB"
cpu: num_cpus
disks: "local-disk ~{disk_size_gb} SSD"
}
}

0 comments on commit 25004d3

Please sign in to comment.