diff --git a/Cargo.lock b/Cargo.lock index b41daf32..9fff0685 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1602,7 +1602,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hidive" -version = "0.1.15" +version = "0.1.16" dependencies = [ "anyhow", "bio", @@ -3391,7 +3391,7 @@ dependencies = [ [[package]] name = "skydive" -version = "0.1.15" +version = "0.1.16" dependencies = [ "anyhow", "backoff", diff --git a/go.sh b/go.sh index 1ea473c5..def8c326 100644 --- a/go.sh +++ b/go.sh @@ -4,28 +4,12 @@ set -euxo pipefail cargo build --release -#./target/release/hidive fetch \ -# -l chr6:29,940,532-29,947,870 \ -# -o HG002.HLA-A.fasta \ -# gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG002/alignments/HG002.bam -# -#./target/release/hidive fetch \ -# -l chr6:29,940,532-29,947,870 \ -# -o HG00438.HLA-A.fasta \ -# gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG00438/alignments/HG00438.bam -# -#./target/release/hidive fetch \ -# -l chr6:29,940,532-29,947,870 \ -# -o HG02559.HLA-A.fasta \ -# gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG02559/alignments/HG02559.bam - ./target/release/hidive fetch \ -l chr6:29,940,532-29,947,870 \ -o HLA-A.fasta \ gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG002/alignments/HG002.bam \ gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG00438/alignments/HG00438.bam \ - gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG02559/alignments/HG02559.bam - -#cat HG002.HLA-A.fasta HG00438.HLA-A.fasta HG02559.HLA-A.fasta > HLA-A.fasta + gs://fc-bb12940d-f7ba-4515-8a98-42de84f63c34/HPRC_grch38/PBCCSWholeGenome/HG02559/alignments/HG02559.bam \ + gs://broad-dsde-methods-long-reads/resources/references/grch38_noalt/GCA_000001405.15_GRCh38_no_alt_analysis_set.fa -./target/release/hidive build -l chr6:29,940,532-29,947,870 -o HLA-A.gfa HLA-A.fasta GCA_000001405.15_GRCh38_no_alt_analysis_set.fa +#./target/release/hidive build -l chr6:29,940,532-29,947,870 -o HLA-A.gfa HLA-A.fasta GCA_000001405.15_GRCh38_no_alt_analysis_set.fa diff --git a/src/skydive/src/stage.rs b/src/skydive/src/stage.rs index d1325216..9929ea07 100644 --- a/src/skydive/src/stage.rs +++ b/src/skydive/src/stage.rs @@ -21,6 +21,7 @@ use rayon::iter::{ParallelIterator, IntoParallelRefIterator}; // Import types from rust_htslib for working with BAM files. use rust_htslib::bam::{ self, IndexedReader, Read }; +use rust_htslib::faidx::{ self, Reader }; use bio::io::fasta; // Import functions for authorizing access to Google Cloud Storage. @@ -62,6 +63,42 @@ fn open_bam(seqs_url: &Url, cache_path: &PathBuf) -> Result { Ok(bam) } +// Function to open a FASTA file from a URL and cache its contents locally. +fn open_fasta(seqs_url: &Url, cache_path: &PathBuf) -> Result { + env::set_current_dir(cache_path).unwrap(); + + if env::var("GCS_OAUTH_TOKEN").is_err() { + gcs_authorize_data_access(); + } + + // Try to open the FASTA file from the URL, with retries for authorization. + let fasta = match Reader::from_url(seqs_url) { + Ok(fasta) => fasta, + Err(_) => { + eprintln!("[{}] Read '{}', attempt 2 (reauthorizing to GCS)", chrono::Local::now().format("%Y-%m-%d %H:%M:%S"), seqs_url); + + // If opening fails, try authorizing access to Google Cloud Storage. + gcs_authorize_data_access(); + + // Try opening the BAM file again. + match Reader::from_url(seqs_url) { + Ok(bam) => bam, + Err(_) => { + eprintln!("[{}] Read '{}', attempt 3 (overriding cURL CA bundle)", chrono::Local::now().format("%Y-%m-%d %H:%M:%S"), seqs_url); + + // If it still fails, guess the cURL CA bundle path. + local_guess_curl_ca_bundle(); + + // Try one last time to open the FASTA file. + Reader::from_url(seqs_url)? + } + } + } + }; + + Ok(fasta) +} + // Function to get a mapping between read group and sample name from a BAM header. fn get_rg_to_sm_mapping(bam: &IndexedReader) -> HashMap { let header = bam::Header::from_template(bam.header()); @@ -143,6 +180,17 @@ fn extract_bam_reads(bam: &mut IndexedReader, chr: &String, start: &u64, stop: & Ok(records) } +// Function to extract seqs from a FASTA file within a specified genomic region. +fn extract_fasta_seqs(fasta: &mut Reader, chr: &String, start: &u64, stop: &u64) -> Result> { + let id = format!("{}:{}-{}", chr, start, stop); + let seq = fasta.fetch_seq_string(chr, *start as usize, *stop as usize).unwrap(); + + let records = vec![fasta::Record::with_attrs(id.as_str(), None, seq.as_bytes())]; + + Ok(records) +} + + // Function to stage data from a single BAM file. fn stage_data_from_one_file( seqs_url: &Url, @@ -151,33 +199,37 @@ fn stage_data_from_one_file( ) -> Result> { let mut all_seqs = Vec::new(); - let extension = seqs_url.path().split('.').last().unwrap_or_default(); - match extension { - "bam" => { - // Handle BAM file processing - let mut bam = open_bam(seqs_url, cache_path)?; + let seqs_str = seqs_url.as_str(); + if seqs_str.ends_with(".bam") { + // Handle BAM file processing + let mut bam = open_bam(seqs_url, cache_path)?; - for (chr, start, stop) in loci.iter() { - // Extract seqs for the current locus. - let seqs = extract_bam_reads(&mut bam, chr, start, stop).unwrap(); + for (chr, start, stop) in loci.iter() { + // Extract seqs for the current locus. + let seqs = extract_bam_reads(&mut bam, chr, start, stop).unwrap(); - // Extend the all_seqs vector with the seqs from the current locus. - all_seqs.extend(seqs); - } - }, - "cram" => { - // Handle CRAM file processing - }, - "fa" | "fasta" => { - // Handle FASTA file processing - }, - "fa.gz" | "fasta.gz" => { - // Handle compressed FASTA file processing - }, - _ => { - // Handle unknown file extension - return Err(anyhow::anyhow!("Unsupported file extension: {}", extension)); - }, + // Extend the all_seqs vector with the seqs from the current locus. + all_seqs.extend(seqs); + } + } else if seqs_str.ends_with(".fa") || seqs_str.ends_with(".fasta") || seqs_str.ends_with(".fa.gz") || seqs_str.ends_with(".fasta.gz") { + // Handle FASTA file processing + + let mut fasta = open_fasta(seqs_url, cache_path)?; + + for (chr, start, stop) in loci.iter() { + // Extract seqs for the current locus. + let seqs = extract_fasta_seqs(&mut fasta, chr, start, stop).unwrap(); + + // Extend the all_seqs vector with the seqs from the current locus. + all_seqs.extend(seqs); + } + } else if seqs_str.ends_with(".cram") { + // Handle CRAM file processing + + !unimplemented!(); + } else { + // Handle unknown file extension + return Err(anyhow::anyhow!("Unsupported file type: {}", seqs_url)); } Ok(all_seqs)