Skip to content

Commit

Permalink
Enable fetching aligned and unaligned reads from CRAM files
Browse files Browse the repository at this point in the history
  • Loading branch information
kvg committed Jun 19, 2024
1 parent 5b95efe commit eeca860
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 39 deletions.
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ Prerequisites

Hidive is designed to access local files or data in Google Cloud Storage (GCS). Within certain cloud-computing environments (i.e. Terra, All of Us Researcher Workbench), access to GCS is already configured. For accessing files in GCS on your local machine, you will also need to install the `Google Cloud CLI <https://cloud.google.com/sdk/docs/install-sdk>`_. Then, configure your `Application Default Credentials (ADC) <https://cloud.google.com/docs/authentication/provide-credentials-adc#local-dev>`_.

If accessing `requester pays buckets <https://cloud.google.com/storage/docs/requester-pays>`_ buckets, set the following environment variable before running hidive commands:


.. code-block:: bash
export GCS_REQUESTER_PAYS_PROJECT=<Google Project ID>
Installation
------------
Expand Down
4 changes: 3 additions & 1 deletion src/hidive/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ pub fn start(output: &PathBuf, loci_list: &Vec<String>, seq_paths: &Vec<PathBuf>
let r = skydive::stage::stage_data(&output, &loci, &seq_urls, &cache_path);

match r {
Ok(_) => {}
Ok(n) => {
skydive::elog!("Fetched {} records.", n)
}
Err(_) => {
panic!("Failed to write output FASTA.")
}
Expand Down
106 changes: 68 additions & 38 deletions src/skydive/src/stage.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Import the Result type from the anyhow crate for error handling.
use anyhow::Result;
use parquet::data_type::AsBytes;
use rust_htslib::bam::record::Aux;

// Import various standard library collections.
Expand All @@ -20,17 +21,15 @@ use rayon::prelude::*;
use rayon::iter::{ParallelIterator, IntoParallelRefIterator};

// Import types from rust_htslib for working with BAM files.
use rust_htslib::bam::{ self, IndexedReader, Read };
use rust_htslib::bam::{ self, FetchDefinition, IndexedReader, Read };
use rust_htslib::faidx::Reader;
use bio::io::fasta;

// Import functions for authorizing access to Google Cloud Storage.
use crate::env::{ gcs_authorize_data_access, local_guess_curl_ca_bundle };

// Function to open a BAM file from a URL and cache its contents locally.
fn open_bam(seqs_url: &Url, cache_path: &PathBuf) -> Result<IndexedReader> {
env::set_current_dir(cache_path).unwrap();

// Function to open a BAM/CRAM file from a URL and cache its contents locally.
fn open_bam(seqs_url: &Url) -> Result<IndexedReader> {
if env::var("GCS_OAUTH_TOKEN").is_err() {
gcs_authorize_data_access();
}
Expand Down Expand Up @@ -64,9 +63,7 @@ fn open_bam(seqs_url: &Url, cache_path: &PathBuf) -> Result<IndexedReader> {
}

// Function to open a FASTA file from a URL and cache its contents locally.
fn open_fasta(seqs_url: &Url, cache_path: &PathBuf) -> Result<Reader> {
env::set_current_dir(cache_path).unwrap();

fn open_fasta(seqs_url: &Url) -> Result<Reader> {
if env::var("GCS_OAUTH_TOKEN").is_err() {
gcs_authorize_data_access();
}
Expand Down Expand Up @@ -114,9 +111,8 @@ fn get_rg_to_sm_mapping(bam: &IndexedReader) -> HashMap<String, String> {
rg_sm_map
}

fn get_sm_name_from_rg(alignment: &bam::pileup::Alignment, rg_sm_map: &HashMap<String, String>) -> Result<String> {
let r = alignment.record();
let rg = r.aux(b"RG")?;
fn get_sm_name_from_rg(read: &bam::Record, rg_sm_map: &HashMap<String, String>) -> Result<String> {
let rg = read.aux(b"RG")?;

if let Aux::String(v) = rg {
if let Some(sm) = rg_sm_map.get(v) {
Expand All @@ -130,7 +126,7 @@ fn get_sm_name_from_rg(alignment: &bam::pileup::Alignment, rg_sm_map: &HashMap<S
}

// Function to extract seqs from a BAM file within a specified genomic region.
fn extract_bam_reads(basename: &String, bam: &mut IndexedReader, chr: &String, start: &u64, stop: &u64) -> Result<Vec<fasta::Record>> {
fn extract_aligned_bam_reads(basename: &String, bam: &mut IndexedReader, chr: &String, start: &u64, stop: &u64) -> Result<Vec<fasta::Record>> {
let rg_sm_map = get_rg_to_sm_mapping(&bam);

let mut bmap = HashMap::new();
Expand All @@ -142,7 +138,7 @@ fn extract_bam_reads(basename: &String, bam: &mut IndexedReader, chr: &String, s
if *start <= (pileup.pos() as u64) && (pileup.pos() as u64) < *stop {
for alignment in pileup.alignments() {
let qname = String::from_utf8_lossy(alignment.record().qname()).into_owned();
let sm = get_sm_name_from_rg(&alignment, &rg_sm_map)?;
let sm = get_sm_name_from_rg(&alignment.record(), &rg_sm_map)?;

let seq_name = format!("{}|{}", qname, sm);

Expand Down Expand Up @@ -180,6 +176,36 @@ fn extract_bam_reads(basename: &String, bam: &mut IndexedReader, chr: &String, s
Ok(records)
}

// Function to extract unaligned seqs from a BAM file
fn extract_unaligned_bam_reads(basename: &String, bam: &mut IndexedReader) -> Result<Vec<fasta::Record>> {
let rg_sm_map = get_rg_to_sm_mapping(&bam);

let _ = bam.fetch(FetchDefinition::Unmapped);
let records = bam
.records()
.map(|r| {
let read = r.unwrap();
let qname = String::from_utf8_lossy(read.qname()).into_owned();
let sm = get_sm_name_from_rg(&read, &rg_sm_map).unwrap();

let seq_name = format!("{}|{}", qname, sm);

let vseq = read.seq().as_bytes();
let bseq = vseq.as_bytes();

let seq = fasta::Record::with_attrs(
seq_name.as_str(),
Some(""),
bseq
);

seq
})
.collect();

Ok(records)
}

// Function to extract seqs from a FASTA file within a specified genomic region.
fn extract_fasta_seqs(basename: &String, fasta: &mut Reader, chr: &String, start: &u64, stop: &u64) -> Result<Vec<fasta::Record>> {
let id = format!("{}:{}-{}|{}", chr, start, stop, basename);
Expand All @@ -190,30 +216,35 @@ fn extract_fasta_seqs(basename: &String, fasta: &mut Reader, chr: &String, start
Ok(records)
}

// Function to stage data from a single BAM file.
// Function to stage data from a single file.
fn stage_data_from_one_file(
seqs_url: &Url,
loci: &HashSet<(String, u64, u64)>,
cache_path: &PathBuf,
loci: &HashSet<(String, u64, u64)>
) -> Result<Vec<fasta::Record>> {
let mut all_seqs = Vec::new();

let basename = seqs_url.path_segments().map(|c| c.collect::<Vec<_>>()).unwrap().last().unwrap().to_string();

let seqs_str = seqs_url.as_str();
if seqs_str.ends_with(".bam") {
// Handle BAM file processing
if seqs_str.ends_with(".bam") || seqs_str.ends_with(".cram") {
// Handle BAM/CRAM file processing
let basename = basename
.trim_end_matches(".bam")
.trim_end_matches(".cram")
.to_string();
let mut bam = open_bam(seqs_url, cache_path)?;
let mut bam = open_bam(seqs_url)?;

for (chr, start, stop) in loci.iter() {
// Extract seqs for the current locus.
let seqs = extract_bam_reads(&basename, &mut bam, chr, start, stop).unwrap();
let aligned_seqs = extract_aligned_bam_reads(&basename, &mut bam, chr, start, stop).unwrap();

// Extend the all_seqs vector with the seqs from the current locus.
all_seqs.extend(seqs);
all_seqs.extend(aligned_seqs);

if seqs_str.ends_with(".cram") {
let unaligned_seqs = extract_unaligned_bam_reads(&basename, &mut bam).unwrap();
all_seqs.extend(unaligned_seqs);
}
}
} else if seqs_str.ends_with(".fa") || seqs_str.ends_with(".fasta") || seqs_str.ends_with(".fa.gz") || seqs_str.ends_with(".fasta.gz") {
// Handle FASTA file processing
Expand All @@ -223,7 +254,7 @@ fn stage_data_from_one_file(
.trim_end_matches(".fa.gz")
.trim_end_matches(".fa")
.to_string();
let mut fasta = open_fasta(seqs_url, cache_path)?;
let mut fasta = open_fasta(seqs_url)?;

for (chr, start, stop) in loci.iter() {
// Extract seqs for the current locus.
Expand All @@ -232,10 +263,6 @@ fn stage_data_from_one_file(
// Extend the all_seqs vector with the seqs from the current locus.
all_seqs.extend(seqs);
}
} else if seqs_str.ends_with(".cram") {
// Handle CRAM file processing

!unimplemented!();
} else {
// Handle unknown file extension
return Err(anyhow::anyhow!("Unsupported file type: {}", seqs_url));
Expand All @@ -248,7 +275,6 @@ fn stage_data_from_one_file(
fn stage_data_from_all_files(
seq_urls: &HashSet<Url>,
loci: &HashSet<(String, u64, u64)>,
cache_path: &PathBuf,
) -> Result<Vec<fasta::Record>> {

// Use a parallel iterator to process multiple BAM files concurrently.
Expand All @@ -257,7 +283,7 @@ fn stage_data_from_all_files(
.map(|seqs_url| {
// Define an operation to stage data from one file.
let op = || {
let seqs = stage_data_from_one_file(seqs_url, loci, cache_path)?;
let seqs = stage_data_from_one_file(seqs_url, loci)?;
Ok(seqs)
};

Expand All @@ -279,12 +305,9 @@ fn stage_data_from_all_files(
}

// Function to retrieve the header of a BAM file.
fn get_bam_header(
seqs_url: &Url,
cache_path: &PathBuf,
) -> Result<bam::HeaderView> {
fn get_bam_header(seqs_url: &Url) -> Result<bam::HeaderView> {
// Open the BAM file.
let bam = open_bam(seqs_url, cache_path)?;
let bam = open_bam(seqs_url)?;

// Return the header of the BAM file.
Ok(bam.header().to_owned())
Expand All @@ -304,15 +327,20 @@ pub fn stage_data(
loci: &HashSet<(String, u64, u64)>,
seq_urls: &HashSet<Url>,
cache_path: &PathBuf
) -> Result<()> {
) -> Result<usize> {
let current_dir = env::current_dir()?;
env::set_current_dir(cache_path).unwrap();

// Stage data from all BAM files.
let all_data = match stage_data_from_all_files(seq_urls, loci, cache_path) {
let all_data = match stage_data_from_all_files(seq_urls, loci) {
Ok(all_data) => all_data,
Err(e) => {
panic!("Error: {}", e);
}
};

env::set_current_dir(current_dir).unwrap();

// Write to a FASTA file.
let mut buf_writer = BufWriter::new(File::create(output_path)?);
let mut fasta_writer = fasta::Writer::new(&mut buf_writer);
Expand All @@ -321,7 +349,9 @@ pub fn stage_data(
fasta_writer.write_record(record)?;
}

Ok(())
let _ = fasta_writer.flush();

Ok(all_data.len())
}

#[cfg(test)]
Expand All @@ -340,7 +370,7 @@ mod tests {
).unwrap();
let cache_path = std::env::temp_dir();

let bam = open_bam(&seqs_url, &cache_path);
let bam = open_bam(&seqs_url);

assert!(bam.is_ok(), "Failed to open bam file");
}
Expand All @@ -353,7 +383,7 @@ mod tests {
let loci = HashSet::from([("chr15".to_string(), 23960193, 23963918)]);
let cache_path = std::env::temp_dir();

let result = stage_data_from_one_file(&seqs_url, &loci, &cache_path);
let result = stage_data_from_one_file(&seqs_url, &loci);

assert!(result.is_ok(), "Failed to stage data from one file");
}
Expand Down

0 comments on commit eeca860

Please sign in to comment.