From 297a7cecdfbf717fb5812b170dc8358dd52beddb Mon Sep 17 00:00:00 2001 From: VJalili Date: Mon, 8 Jan 2024 10:59:34 -0500 Subject: [PATCH 01/21] Move carrot-based tests to the tests directory. --- .gitignore | 4 ++-- .../carrot}/ExpansionHunterDenovo/casecontrol/eval.wdl | 0 .../casecontrol/eval_input_defaults.json | 0 .../casecontrol/simulated_data/eval_input.json | 0 .../casecontrol/simulated_data/test_input.json | 0 .../casecontrol/test_input_defaults.json | 0 {carrot => tests/carrot}/README.md | 0 {carrot => tests/carrot}/carrot_helper.py | 2 +- 8 files changed, 3 insertions(+), 3 deletions(-) rename {carrot => tests/carrot}/ExpansionHunterDenovo/casecontrol/eval.wdl (100%) rename {carrot => tests/carrot}/ExpansionHunterDenovo/casecontrol/eval_input_defaults.json (100%) rename {carrot => tests/carrot}/ExpansionHunterDenovo/casecontrol/simulated_data/eval_input.json (100%) rename {carrot => tests/carrot}/ExpansionHunterDenovo/casecontrol/simulated_data/test_input.json (100%) rename {carrot => tests/carrot}/ExpansionHunterDenovo/casecontrol/test_input_defaults.json (100%) rename {carrot => tests/carrot}/README.md (100%) rename {carrot => tests/carrot}/carrot_helper.py (99%) diff --git a/.gitignore b/.gitignore index b38e78e18..71ec16e6a 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,5 @@ src/svtk/svtk/utils/utils.pyc src/svtk/svtk/vcfcluster.pyc /inputs/build/ /inputs/values/google_cloud.*.json -/carrot/.runs.json -/carrot/.configs.json +/tests/carrot/.runs.json +/tests/carrot/.configs.json diff --git a/carrot/ExpansionHunterDenovo/casecontrol/eval.wdl b/tests/carrot/ExpansionHunterDenovo/casecontrol/eval.wdl similarity index 100% rename from carrot/ExpansionHunterDenovo/casecontrol/eval.wdl rename to tests/carrot/ExpansionHunterDenovo/casecontrol/eval.wdl diff --git a/carrot/ExpansionHunterDenovo/casecontrol/eval_input_defaults.json b/tests/carrot/ExpansionHunterDenovo/casecontrol/eval_input_defaults.json similarity index 100% rename from carrot/ExpansionHunterDenovo/casecontrol/eval_input_defaults.json rename to tests/carrot/ExpansionHunterDenovo/casecontrol/eval_input_defaults.json diff --git a/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/eval_input.json b/tests/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/eval_input.json similarity index 100% rename from carrot/ExpansionHunterDenovo/casecontrol/simulated_data/eval_input.json rename to tests/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/eval_input.json diff --git a/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/test_input.json b/tests/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/test_input.json similarity index 100% rename from carrot/ExpansionHunterDenovo/casecontrol/simulated_data/test_input.json rename to tests/carrot/ExpansionHunterDenovo/casecontrol/simulated_data/test_input.json diff --git a/carrot/ExpansionHunterDenovo/casecontrol/test_input_defaults.json b/tests/carrot/ExpansionHunterDenovo/casecontrol/test_input_defaults.json similarity index 100% rename from carrot/ExpansionHunterDenovo/casecontrol/test_input_defaults.json rename to tests/carrot/ExpansionHunterDenovo/casecontrol/test_input_defaults.json diff --git a/carrot/README.md b/tests/carrot/README.md similarity index 100% rename from carrot/README.md rename to tests/carrot/README.md diff --git a/carrot/carrot_helper.py b/tests/carrot/carrot_helper.py similarity index 99% rename from carrot/carrot_helper.py rename to tests/carrot/carrot_helper.py index e9474b292..ff46a3128 100644 --- a/carrot/carrot_helper.py 
+++ b/tests/carrot/carrot_helper.py @@ -19,7 +19,7 @@ # The directories where the WDLs # and their tests are located. -WDLS_DIR_RELATIVE = "../wdl" +WDLS_DIR_RELATIVE = "../../wdl" WDLS_DIR = "wdl" WDLS_TEST_DIR = "wdl_test" From 896f69d2cd95f9a60de078e6c24d9a74ad0796c5 Mon Sep 17 00:00:00 2001 From: VJalili Date: Tue, 2 Apr 2024 12:58:50 -0400 Subject: [PATCH 02/21] Scripts to create downsampled files. --- tests/utilities/README.md | 20 +++ tests/utilities/downsamplers.py | 120 ++++++++++++++ tests/utilities/generate_test_data.py | 215 ++++++++++++++++++++++++++ tests/utilities/requirements.txt | 2 + 4 files changed, 357 insertions(+) create mode 100644 tests/utilities/README.md create mode 100644 tests/utilities/downsamplers.py create mode 100644 tests/utilities/generate_test_data.py create mode 100644 tests/utilities/requirements.txt diff --git a/tests/utilities/README.md b/tests/utilities/README.md new file mode 100644 index 000000000..8461833e7 --- /dev/null +++ b/tests/utilities/README.md @@ -0,0 +1,20 @@ + +# Installation + +1. Create a virtual environment: + + ```shell + virtualenv .venv + ``` + +2. Activate the virtual environment: + + ```shell + source .venv/bin/activate + ``` + +3. Install requirements: + + ```shell + pip install -r requirements.txt + ``` \ No newline at end of file diff --git a/tests/utilities/downsamplers.py b/tests/utilities/downsamplers.py new file mode 100644 index 000000000..60cb76942 --- /dev/null +++ b/tests/utilities/downsamplers.py @@ -0,0 +1,120 @@ +import pysam + +from typing import Callable, List +from dataclasses import dataclass +from pathlib import Path + +@dataclass +class Region: + chr: str + start: int + end: int + + +class BaseDownsampler: + def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): + # Convert the string to an ABS path and make sure it exists. + self.working_dir = Path(working_dir).resolve(strict=True) + self.callback = callback + + def get_output_filename(self, input_filename, output_prefix): + return str(self.working_dir.joinpath(f"{output_prefix}{Path(input_filename).name}")) + + @staticmethod + def get_supported_file_types() -> List[str]: + """ + The file types should include the '.' prefix to match with Path().suffix output + (e.g., it should return '.cram' instead of 'cram'). 
+ """ + raise NotImplementedError() + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + raise NotImplementedError() + + +class CramDownsampler(BaseDownsampler): + def __init__(self, working_dir, callback: Callable[[str, str], dict]): + super().__init__(working_dir, callback) + + @staticmethod + def get_supported_file_types() -> List[str]: + return [".cram"] + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + with \ + pysam.AlignmentFile(input_filename, "rc") as input_cram_file, \ + pysam.AlignmentFile(output_filename, "wc", + header=input_cram_file.header, + reference_names=input_cram_file.references) as output_cram_file: + for region in regions: + for read in input_cram_file.fetch(region=f"{region.chr}:{region.start}-{region.end}"): + output_cram_file.write(read) + index_filename = f"{output_filename}.crai" + pysam.index(output_filename, index_filename) + return self.callback(output_filename, index_filename) + + +class VcfDownsampler(BaseDownsampler): + def __init__(self, working_dir, callback: Callable[[str], dict]): + super().__init__(working_dir, callback) + + @staticmethod + def get_supported_file_types() -> List[str]: + return [".vcf"] + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + with \ + pysam.VariantFile(input_filename) as input_file, \ + pysam.VariantFile(output_filename, "w", header=input_file.header) as output_file: + for record in input_file: + for region in regions: + if record.contig == region.chr and region.start <= record.pos <= region.end: + output_file.write(record) + return self.callback(output_filename) + + +class IntervalListDownsampler(BaseDownsampler): + def __init__(self, working_dir, callback: Callable[[str], dict]): + super().__init__(working_dir, callback) + + @staticmethod + def get_supported_file_types() -> List[str]: + return [".interval_list"] + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + # Note that this algorithm is not efficient. 
+ with open(input_filename, "r") as input_file: + with open(output_filename, "w") as output_file: + for line in input_file: + if line.startswith("@"): + output_file.write(line) + else: + cols = line.rstrip().split() + chr, start, end = cols[0], int(cols[1]), int(cols[2]) + for region in regions: + if chr == region.chr and start <= region.end and end >= region.start: + output_file.write(line) + return self.callback(output_filename) + + +class PrimaryContigsDownsampler(BaseDownsampler): + def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str="\t"): + super().__init__(working_dir, callback) + self.delimiter = delimiter + + @staticmethod + def get_supported_file_types() -> List[str]: + return [] + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + include_chrs = set([r.chr for r in regions]) + with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: + for line in input_file: + cols = line.strip().split(self.delimiter) + if cols[0] in include_chrs: + output_file.write(self.delimiter.join(cols) + "\n") + return self.callback(output_filename) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py new file mode 100644 index 000000000..082e430e5 --- /dev/null +++ b/tests/utilities/generate_test_data.py @@ -0,0 +1,215 @@ +import argparse +import downsamplers +import json +import logging +import os + +from pathlib import Path +from dataclasses import dataclass +from google.cloud import storage +from typing import Callable, List, Type, Union + + +logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) + + +@dataclass +class Region: + chr: str + start: int + end: int + + +@dataclass +class Handler: + downsampler: Union[downsamplers.BaseDownsampler, Type[downsamplers.BaseDownsampler]] + callback: Callable[[str, ...], dict] + + +SUBJECT_WORKFLOW_INPUTS = { + "GatherSampleEvidence": { + "bam_or_cram_file": Handler( + downsamplers.CramDownsampler, + lambda cram, index: {"bam_or_cram_file": cram, "bam_or_cram_index": index} + ), + "preprocessed_intervals": Handler( + downsamplers.IntervalListDownsampler, + lambda x: {"preprocessed_intervals": x} + ), + "sd_locs_vcf": Handler( + downsamplers.VcfDownsampler, + lambda x: {"sd_locs_vcf": x} + ), + "primary_contigs_list": Handler( + downsamplers.PrimaryContigsDownsampler, + lambda x: {"primary_contigs_list": x} + ), + "primary_contigs_fai": Handler( + downsamplers.PrimaryContigsDownsampler, + lambda x: {"primary_contigs_fai": x} + ) + } +} + + +def parse_target_regions(input_filename): + regions = [] + with open(input_filename, "r") as f: + for line in f: + cols = line.strip().split("\t") + if len(cols) != 3: + raise ValueError( + f"Invalid line in {input_filename}. 
Expected a line with three columns, " + f"chr, start, and stop positions, found: {repr(line.strip())}") + regions.append(Region(str(cols[0]), int(cols[1]), int(cols[2]))) + return regions + + +def localize_file(input_filename, output_filename): + if os.path.isfile(output_filename): + return + if input_filename.startswith("gs://"): + logging.info(f"Localizing from GCP; blob: {input_filename} ...") + download_blob_from_gs(input_filename, output_filename) + logging.info(f"Finished localizing blob {input_filename}") + else: + raise NotImplementedError() + + +def initialize_downsamplers(working_dir: str): + for _, inputs in SUBJECT_WORKFLOW_INPUTS.items(): + for _, handler in inputs.items(): + handler.downsampler = handler.downsampler(working_dir, handler.callback) + + +def update_workflow_json(working_dir: str, input_filename: str, output_filename: str, output_filename_prefix: str, regions: List[Region], bucket_name: str=None, blob_name: str=None): + with open(input_filename, "r") as f: + workflow_inputs = json.load(f) + + for k, v in dict(workflow_inputs).items(): + # Example of the following split: + # k="a.b.c" --> workflow_name="a.b", input_var="c" + workflow_name, input_var = k.rsplit(".", maxsplit=1) + + try: + handler = SUBJECT_WORKFLOW_INPUTS[workflow_name][input_var] + except KeyError: + # This workflow input is not set to be downsampled. + continue + + logging.info(f"Processing input {k}.") + workflow_input_local_filename = Path(working_dir).joinpath(Path(v).name) + localize_file(v, workflow_input_local_filename) + #downsampler = downsampler_factory.get_downsampler(Path(workflow_input_local_filename).suffix) + updated_files = handler.downsampler.downsample(workflow_input_local_filename, output_filename_prefix, regions) + #updated_files = downsampler.downsample(workflow_input_local_filename, output_filename_prefix, regions, callback) + if bucket_name is not None and blob_name is not None: + for varname, filename in updated_files.items(): + logging.info(f"Uploading downsampled file {filename} to bucket {bucket_name}.") + blob = upload_to_gs_blob(filename, bucket_name, blob_name) + logging.info(f"Finished uploading {filename}.") + workflow_inputs[f"{workflow_name}.{varname}"] = blob + logging.info(f"Creating output JSON {output_filename}.") + with open(output_filename, "w") as f: + json.dump(workflow_inputs, f, indent=4) + logging.info(f"Finished creating output JSON {output_filename}.") + + +def upload_to_gs_blob(filename, bucket_name, blob_name): + path = Path(filename) + blob_name = blob_name + "/" + path.stem + "".join(path.suffix) + blob = storage.Client().bucket(bucket_name).blob(blob_name) + blob.upload_from_filename(filename) + return "gs://" + bucket_name + "/" + blob_name + + +def download_blob_from_gs(gs_link, local_filename): + bucket_name, blob_name = gs_link.split("/", maxsplit=3)[2:] + blob = storage.Client().bucket(bucket_name).blob(blob_name) + blob.download_to_filename(local_filename) + + +def main(): + parser = argparse.ArgumentParser( + description="This is a utility script to downsample the inputs of a workflow " + "in order to run the workflow faster or prepare data for unit testing." 
+ "In addition to other inputs, the script takes a JSON file containing the inputs to a workflow, " + "downsamples the inputs according to the defined rules (see `SUBJECT_WORKFLOW_INPUTS`), " + "pushes the downsampled files to a given cloud storage, and creates a new JSON " + "file with the updated downsampled inputs.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument( + "-w", "--working-dir", + default=os.getcwd(), + help="Sets the working directory where the downsampled files will be stored before pushed to a cloud storage." + ) + + parser.add_argument( + "input_workflow_json", + help="Sets a JSON filename containing the inputs to a workflow." + ) + + parser.add_argument( + "--output-workflow-json", + help="Sets a JSON filename containing the updated input arguments from the input JSON." + "The default value is a JSON file created in the working directory with the same " + "name as the input JSON with an added prefix." + ) + + parser.add_argument( + "--output-filename-prefix", + default="downsampled_", + help="Sets a prefix to be added to all the output files generated." + ) + + parser.add_argument( + "--bucket-name", + help="Sets the cloud bucket name where the downsampled files will be pushed. " + "The script skips uploading to cloud if a value for this argument is not provided." + ) + + parser.add_argument( + "--blob-name", + help="Sets the cloud blob name where the downsampled files will be pushed." + "The script skips uploading to cloud if a value for this argument is not provided." + ) + + this_script_folder = os.path.dirname(os.path.abspath(__file__)) + gatk_sv_path = os.path.dirname(os.path.dirname(this_script_folder)) + + parser.add_argument( + "--target-regions", + default=os.path.join(gatk_sv_path, "tests", "utilities", "default_downsampling_regions.bed"), + help="Sets a BED filename containing target regions for downsampling, " + "such that the downsampled files contains data from the input files overlapping these regions." + ) + + args = parser.parse_args() + + regions = parse_target_regions(args.target_regions) + logging.info(f"Found {len(regions)} target regions for downsampling.") + + output_workflow_json = args.output_workflow_json + if not output_workflow_json: + output_workflow_json = os.path.join( + args.working_dir, + f"{args.output_filename_prefix}{Path(args.input_workflow_json).name}" + ) + + initialize_downsamplers(args.working_dir) + + update_workflow_json( + working_dir=args.working_dir, + input_filename=args.input_workflow_json, + output_filename=output_workflow_json, + output_filename_prefix=args.output_filename_prefix, + regions=regions, + bucket_name=args.bucket_name, + blob_name=args.blob_name) + + logging.info("All process finished successfully.") + + +if __name__ == '__main__': + main() diff --git a/tests/utilities/requirements.txt b/tests/utilities/requirements.txt new file mode 100644 index 000000000..6e16d8d3b --- /dev/null +++ b/tests/utilities/requirements.txt @@ -0,0 +1,2 @@ +google-cloud-storage +pysam From 3eae196b17622f303c0bdb2c94d7003214441a9d Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 4 Apr 2024 12:31:57 -0400 Subject: [PATCH 03/21] Fix lint errors. 
--- tests/utilities/downsamplers.py | 41 +++++++++++++++++---------- tests/utilities/generate_test_data.py | 6 ++-- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/tests/utilities/downsamplers.py b/tests/utilities/downsamplers.py index 60cb76942..6cd2c24e3 100644 --- a/tests/utilities/downsamplers.py +++ b/tests/utilities/downsamplers.py @@ -4,6 +4,7 @@ from dataclasses import dataclass from pathlib import Path + @dataclass class Region: chr: str @@ -47,9 +48,9 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi pysam.AlignmentFile(output_filename, "wc", header=input_cram_file.header, reference_names=input_cram_file.references) as output_cram_file: - for region in regions: - for read in input_cram_file.fetch(region=f"{region.chr}:{region.start}-{region.end}"): - output_cram_file.write(read) + for region in regions: + for read in input_cram_file.fetch(region=f"{region.chr}:{region.start}-{region.end}"): + output_cram_file.write(read) index_filename = f"{output_filename}.crai" pysam.index(output_filename, index_filename) return self.callback(output_filename, index_filename) @@ -76,6 +77,17 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi class IntervalListDownsampler(BaseDownsampler): + # Implementation note: + # An alternative to the down sampling approach implemented here is to take + # a BED file containing target regions as input, and convert the BED to .interval_list + # as the following. + # + # > java -jar picard.jar BedToIntervalList I=regions.bed O=regions.interval_list SD=/Homo_sapiens_assembly38.dict + # + # There are two downsides to converting BED to .interval_list: + # 1. It needs the picard tool installed; + # 2. It needs an additional "SD" input, which would make the `downsample` method signature complicated. + def __init__(self, working_dir, callback: Callable[[str], dict]): super().__init__(working_dir, callback) @@ -86,22 +98,21 @@ def get_supported_file_types() -> List[str]: def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) # Note that this algorithm is not efficient. 
- with open(input_filename, "r") as input_file: - with open(output_filename, "w") as output_file: - for line in input_file: - if line.startswith("@"): - output_file.write(line) - else: - cols = line.rstrip().split() - chr, start, end = cols[0], int(cols[1]), int(cols[2]) - for region in regions: - if chr == region.chr and start <= region.end and end >= region.start: - output_file.write(line) + with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: + for line in input_file: + if line.startswith("@"): + output_file.write(line) + else: + cols = line.rstrip().split() + chr, start, end = cols[0], int(cols[1]), int(cols[2]) + for region in regions: + if chr == region.chr and max(start, region.start) < min(end, region.end): + output_file.write(line) return self.callback(output_filename) class PrimaryContigsDownsampler(BaseDownsampler): - def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str="\t"): + def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str = "\t"): super().__init__(working_dir, callback) self.delimiter = delimiter diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index 082e430e5..e5a93d2ec 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -82,7 +82,9 @@ def initialize_downsamplers(working_dir: str): handler.downsampler = handler.downsampler(working_dir, handler.callback) -def update_workflow_json(working_dir: str, input_filename: str, output_filename: str, output_filename_prefix: str, regions: List[Region], bucket_name: str=None, blob_name: str=None): +def update_workflow_json( + working_dir: str, input_filename: str, output_filename: str, output_filename_prefix: str, + regions: List[Region], bucket_name: str = None, blob_name: str = None): with open(input_filename, "r") as f: workflow_inputs = json.load(f) @@ -100,9 +102,7 @@ def update_workflow_json(working_dir: str, input_filename: str, output_filename: logging.info(f"Processing input {k}.") workflow_input_local_filename = Path(working_dir).joinpath(Path(v).name) localize_file(v, workflow_input_local_filename) - #downsampler = downsampler_factory.get_downsampler(Path(workflow_input_local_filename).suffix) updated_files = handler.downsampler.downsample(workflow_input_local_filename, output_filename_prefix, regions) - #updated_files = downsampler.downsample(workflow_input_local_filename, output_filename_prefix, regions, callback) if bucket_name is not None and blob_name is not None: for varname, filename in updated_files.items(): logging.info(f"Uploading downsampled file {filename} to bucket {bucket_name}.") From 6f6efc13b9a3fe3ce5047be912e23ac7db87096c Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 4 Apr 2024 12:33:53 -0400 Subject: [PATCH 04/21] Add `melt_metrics_intervals` # since otherwise coverage, needed for running MELT, # is calculated incorrectly as it will calculate # across the entire genome. 
--- tests/utilities/generate_test_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index e5a93d2ec..3d6d546f9 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -34,7 +34,7 @@ class Handler: ), "preprocessed_intervals": Handler( downsamplers.IntervalListDownsampler, - lambda x: {"preprocessed_intervals": x} + lambda x: {"preprocessed_intervals": x, "melt_metrics_intervals": x} ), "sd_locs_vcf": Handler( downsamplers.VcfDownsampler, From 4642826641c5f0dd1c3073842ca62946f04fd427 Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 5 Apr 2024 10:28:42 -0400 Subject: [PATCH 05/21] Add a downsampler for BED format. --- tests/utilities/downsamplers.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/utilities/downsamplers.py b/tests/utilities/downsamplers.py index 6cd2c24e3..761a69179 100644 --- a/tests/utilities/downsamplers.py +++ b/tests/utilities/downsamplers.py @@ -73,6 +73,7 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi for region in regions: if record.contig == region.chr and region.start <= record.pos <= region.end: output_file.write(record) + break return self.callback(output_filename) @@ -108,6 +109,29 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi for region in regions: if chr == region.chr and max(start, region.start) < min(end, region.end): output_file.write(line) + break + return self.callback(output_filename) + + +class BedDownsampler(BaseDownsampler): + def __init__(self, working_dir, callback: Callable[[str], dict]): + super().__init__(working_dir, callback) + + @staticmethod + def get_supported_file_types() -> List[str]: + return [".bed"] + + def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + # Note that this algorithm is not efficient. + with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: + for line in input_file: + cols = line.rstrip().split() + chr, start, end = cols[0], int(cols[1]), int(cols[2]) + for region in regions: + if chr == region.chr and max(start, region.start) < min(end, region.end): + output_file.write(line) + break return self.callback(output_filename) From 6138b442745c13c65f30fa345c7e1db8a1e3cf0d Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 5 Apr 2024 10:29:18 -0400 Subject: [PATCH 06/21] Downsample wham_include_list_bed_file. --- tests/utilities/generate_test_data.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index 3d6d546f9..b527043dc 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -47,6 +47,10 @@ class Handler: "primary_contigs_fai": Handler( downsamplers.PrimaryContigsDownsampler, lambda x: {"primary_contigs_fai": x} + ), + "wham_include_list_bed_file": Handler( + downsamplers.BedDownsampler, + lambda x: {"wham_include_list_bed_file": x} ) } } From 94d7541ba50c0df7d4dde8928274135f41977364 Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 5 Apr 2024 10:29:48 -0400 Subject: [PATCH 07/21] Add default downsampling regions file. 
--- tests/utilities/default_downsampling_regions.bed | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 tests/utilities/default_downsampling_regions.bed diff --git a/tests/utilities/default_downsampling_regions.bed b/tests/utilities/default_downsampling_regions.bed new file mode 100644 index 000000000..131ff12ec --- /dev/null +++ b/tests/utilities/default_downsampling_regions.bed @@ -0,0 +1,5 @@ +chr1 47000000 48000000 +chr4 156467305 156545919 +chr6 32000000 33000000 +chr16 21000000 23000000 +chrX 123409545 123667508 From 501681bd968743eeea847d071dc147bd1181555b Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 5 Apr 2024 15:41:39 -0400 Subject: [PATCH 08/21] Implement both downsampler and converter & implement a base type that both derives from. --- tests/utilities/downsamplers.py | 95 ++++++++++++++++++++++----- tests/utilities/generate_test_data.py | 71 +++++++++++++------- 2 files changed, 125 insertions(+), 41 deletions(-) diff --git a/tests/utilities/downsamplers.py b/tests/utilities/downsamplers.py index 761a69179..c13308730 100644 --- a/tests/utilities/downsamplers.py +++ b/tests/utilities/downsamplers.py @@ -1,4 +1,6 @@ +import os import pysam +import subprocess from typing import Callable, List from dataclasses import dataclass @@ -12,15 +14,60 @@ class Region: end: int -class BaseDownsampler: +class BaseTransformer: def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): # Convert the string to an ABS path and make sure it exists. self.working_dir = Path(working_dir).resolve(strict=True) self.callback = callback + @staticmethod + def get_supported_file_types() -> List[str]: + """ + The file types should include the '.' prefix to match with Path().suffix output + (e.g., it should return '.cram' instead of 'cram'). 
+ """ + raise NotImplementedError() + def get_output_filename(self, input_filename, output_prefix): return str(self.working_dir.joinpath(f"{output_prefix}{Path(input_filename).name}")) + +class BaseConverter(BaseTransformer): + def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): + super().__init__(working_dir, callback) + + @staticmethod + def get_supported_file_types() -> List[str]: + raise NotImplementedError() + + def convert(self, input_filename: str, output_prefix: str) -> dict: + raise NotImplementedError() + + +class BedToIntervalListConverter(BaseConverter): + def __init__(self, working_dir, callback: Callable[[str], dict], sequence_dict_filename: str, picard_path: str, **kwargs): + super().__init__(working_dir, callback) + self.sequence_dict_filename = sequence_dict_filename + self.picard_path = picard_path + + @staticmethod + def get_supported_file_types() -> List[str]: + return [".interval_list"] + + def convert(self, input_filename: str, output_prefix: str) -> dict: + output_filename = self.get_output_filename(input_filename, output_prefix) + subprocess.run( + ["java", "-jar", self.picard_path, "BedToIntervalList", + "-I", input_filename, "-O", output_filename, "-SD", self.sequence_dict_filename], + check=True) + return self.callback(output_filename) + + + +class BaseDownsampler(BaseTransformer): + def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): + super().__init__(working_dir, callback) + @staticmethod def get_supported_file_types() -> List[str]: """ @@ -34,30 +81,44 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi class CramDownsampler(BaseDownsampler): - def __init__(self, working_dir, callback: Callable[[str, str], dict]): + def __init__(self, working_dir, callback: Callable[[str, str], dict], reference_fasta: str, reference_index: str, **kwargs): super().__init__(working_dir, callback) + self.reference_fasta = reference_fasta + self.reference_index = reference_index @staticmethod def get_supported_file_types() -> List[str]: return [".cram"] def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: - output_filename = self.get_output_filename(input_filename, output_prefix) - with \ - pysam.AlignmentFile(input_filename, "rc") as input_cram_file, \ - pysam.AlignmentFile(output_filename, "wc", - header=input_cram_file.header, - reference_names=input_cram_file.references) as output_cram_file: + # Implementation notes: + # 1. This method calls `samtools` instead of using `pysam` since it needs to include + # distant pair-end reads in the downsampled regions (i.e., the `--fetch-pairs` flag). + # Such reads are needed for MELT to function properly. + # + # 2. The method writes target regions to a BED file. While taking the BED file containing + # the regions as input seems a better option, the current approach is implemented as + # taking a BED file as input instead of a list of regions would convolut the method signature. 
+ + regions_filename = os.path.join(self.working_dir, "_tmp_regions.bed") + with open(regions_filename, "w") as f: for region in regions: - for read in input_cram_file.fetch(region=f"{region.chr}:{region.start}-{region.end}"): - output_cram_file.write(read) - index_filename = f"{output_filename}.crai" - pysam.index(output_filename, index_filename) - return self.callback(output_filename, index_filename) + f.write("\t".join([str(region.chr), str(region.start), str(region.end)]) + "\n") + + output_filename = self.get_output_filename(input_filename, output_prefix) + subprocess.run( + ["samtools", "view", input_filename, "--reference", self.reference_fasta, + "--targets-file", regions_filename, "--output", output_filename, "--cram", "--fetch-pairs"] + ) + + subprocess.run(["samtools", "index", output_filename]) + + os.remove(regions_filename) + return self.callback(output_filename, output_filename + ".fai") class VcfDownsampler(BaseDownsampler): - def __init__(self, working_dir, callback: Callable[[str], dict]): + def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @staticmethod @@ -89,7 +150,7 @@ class IntervalListDownsampler(BaseDownsampler): # 1. It needs the picard tool installed; # 2. It needs an additional "SD" input, which would make the `downsample` method signature complicated. - def __init__(self, working_dir, callback: Callable[[str], dict]): + def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @staticmethod @@ -114,7 +175,7 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi class BedDownsampler(BaseDownsampler): - def __init__(self, working_dir, callback: Callable[[str], dict]): + def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @staticmethod @@ -136,7 +197,7 @@ def downsample(self, input_filename: str, output_prefix: str, regions: List[Regi class PrimaryContigsDownsampler(BaseDownsampler): - def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str = "\t"): + def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str = "\t", **kwargs): super().__init__(working_dir, callback) self.delimiter = delimiter diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index b527043dc..853d9949f 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -22,7 +22,7 @@ class Region: @dataclass class Handler: - downsampler: Union[downsamplers.BaseDownsampler, Type[downsamplers.BaseDownsampler]] + transformer: Union[downsamplers.BaseTransformer, Type[downsamplers.BaseTransformer]] callback: Callable[[str, ...], dict] @@ -32,26 +32,26 @@ class Handler: downsamplers.CramDownsampler, lambda cram, index: {"bam_or_cram_file": cram, "bam_or_cram_index": index} ), - "preprocessed_intervals": Handler( - downsamplers.IntervalListDownsampler, - lambda x: {"preprocessed_intervals": x, "melt_metrics_intervals": x} - ), - "sd_locs_vcf": Handler( - downsamplers.VcfDownsampler, - lambda x: {"sd_locs_vcf": x} - ), - "primary_contigs_list": Handler( - downsamplers.PrimaryContigsDownsampler, - lambda x: {"primary_contigs_list": x} - ), - "primary_contigs_fai": Handler( - downsamplers.PrimaryContigsDownsampler, - lambda x: {"primary_contigs_fai": x} - ), - "wham_include_list_bed_file": Handler( - downsamplers.BedDownsampler, - lambda x: {"wham_include_list_bed_file": x} - ) + 
# "preprocessed_intervals": Handler( + # downsamplers.BedToIntervalListConverter, + # lambda x: {"preprocessed_intervals": x, "melt_metrics_intervals": x} + # ), + # "sd_locs_vcf": Handler( + # downsamplers.VcfDownsampler, + # lambda x: {"sd_locs_vcf": x} + # ), + # "primary_contigs_list": Handler( + # downsamplers.PrimaryContigsDownsampler, + # lambda x: {"primary_contigs_list": x} + # ), + # "primary_contigs_fai": Handler( + # downsamplers.PrimaryContigsDownsampler, + # lambda x: {"primary_contigs_fai": x} + # ), + # "wham_include_list_bed_file": Handler( + # downsamplers.BedDownsampler, + # lambda x: {"wham_include_list_bed_file": x} + # ) } } @@ -83,7 +83,17 @@ def localize_file(input_filename, output_filename): def initialize_downsamplers(working_dir: str): for _, inputs in SUBJECT_WORKFLOW_INPUTS.items(): for _, handler in inputs.items(): - handler.downsampler = handler.downsampler(working_dir, handler.callback) + # if type(handler.transformer) == type(downsamplers.BedToIntervalListConverter): + # handler.transformer = handler.transformer(working_dir, handler.callback, ) + # exit() + handler.transformer = handler.transformer( + working_dir=working_dir, + callback=handler.callback, + reference_fasta="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta", + reference_index="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai", + sequence_dict_filename="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict", + picard_path="/Users/jvahid/code/picard.jar" + ) def update_workflow_json( @@ -106,7 +116,7 @@ def update_workflow_json( logging.info(f"Processing input {k}.") workflow_input_local_filename = Path(working_dir).joinpath(Path(v).name) localize_file(v, workflow_input_local_filename) - updated_files = handler.downsampler.downsample(workflow_input_local_filename, output_filename_prefix, regions) + updated_files = handler.transformer.downsample(workflow_input_local_filename, output_filename_prefix, regions) if bucket_name is not None and blob_name is not None: for varname, filename in updated_files.items(): logging.info(f"Uploading downsampled file {filename} to bucket {bucket_name}.") @@ -140,7 +150,8 @@ def main(): "In addition to other inputs, the script takes a JSON file containing the inputs to a workflow, " "downsamples the inputs according to the defined rules (see `SUBJECT_WORKFLOW_INPUTS`), " "pushes the downsampled files to a given cloud storage, and creates a new JSON " - "file with the updated downsampled inputs.", + "file with the updated downsampled inputs." + "This script needs samtools version 1.19.2 or newer installed and added to PATH.", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( @@ -161,6 +172,11 @@ def main(): "name as the input JSON with an added prefix." ) + parser.add_argument( + "picard_path", + help="Sets the absolute path, including the filename, to `picard.jar`." 
+ ) + parser.add_argument( "--output-filename-prefix", default="downsampled_", @@ -191,6 +207,13 @@ def main(): args = parser.parse_args() + # sd = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict" + # sd_out = "./tmp_homo.dict" + # localize_file(sd, sd_out) + # test = downsamplers.BedToIntervallistConverter(".", lambda x: {}, sd_out, "/Users/jvahid/code/picard.jar") + # test.convert("default_downsampling_regions.bed", "tmp_tmp_") + # + regions = parse_target_regions(args.target_regions) logging.info(f"Found {len(regions)} target regions for downsampling.") From 36b64dd6beaadffac9d057be0bd0476367f24650 Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 5 Apr 2024 15:44:23 -0400 Subject: [PATCH 09/21] refactor downsamplers to transformers. --- tests/utilities/generate_test_data.py | 60 ++++++++----------- .../{downsamplers.py => transformers.py} | 0 2 files changed, 25 insertions(+), 35 deletions(-) rename tests/utilities/{downsamplers.py => transformers.py} (100%) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index 853d9949f..0c6990772 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -1,5 +1,5 @@ import argparse -import downsamplers +import transformers import json import logging import os @@ -22,36 +22,36 @@ class Region: @dataclass class Handler: - transformer: Union[downsamplers.BaseTransformer, Type[downsamplers.BaseTransformer]] + transformer: Union[transformers.BaseTransformer, Type[transformers.BaseTransformer]] callback: Callable[[str, ...], dict] SUBJECT_WORKFLOW_INPUTS = { "GatherSampleEvidence": { "bam_or_cram_file": Handler( - downsamplers.CramDownsampler, + transformers.CramDownsampler, lambda cram, index: {"bam_or_cram_file": cram, "bam_or_cram_index": index} ), - # "preprocessed_intervals": Handler( - # downsamplers.BedToIntervalListConverter, - # lambda x: {"preprocessed_intervals": x, "melt_metrics_intervals": x} - # ), - # "sd_locs_vcf": Handler( - # downsamplers.VcfDownsampler, - # lambda x: {"sd_locs_vcf": x} - # ), - # "primary_contigs_list": Handler( - # downsamplers.PrimaryContigsDownsampler, - # lambda x: {"primary_contigs_list": x} - # ), - # "primary_contigs_fai": Handler( - # downsamplers.PrimaryContigsDownsampler, - # lambda x: {"primary_contigs_fai": x} - # ), - # "wham_include_list_bed_file": Handler( - # downsamplers.BedDownsampler, - # lambda x: {"wham_include_list_bed_file": x} - # ) + "preprocessed_intervals": Handler( + transformers.BedToIntervalListConverter, + lambda x: {"preprocessed_intervals": x, "melt_metrics_intervals": x} + ), + "sd_locs_vcf": Handler( + transformers.VcfDownsampler, + lambda x: {"sd_locs_vcf": x} + ), + "primary_contigs_list": Handler( + transformers.PrimaryContigsDownsampler, + lambda x: {"primary_contigs_list": x} + ), + "primary_contigs_fai": Handler( + transformers.PrimaryContigsDownsampler, + lambda x: {"primary_contigs_fai": x} + ), + "wham_include_list_bed_file": Handler( + transformers.BedDownsampler, + lambda x: {"wham_include_list_bed_file": x} + ) } } @@ -80,12 +80,9 @@ def localize_file(input_filename, output_filename): raise NotImplementedError() -def initialize_downsamplers(working_dir: str): +def initialize_transformers(working_dir: str): for _, inputs in SUBJECT_WORKFLOW_INPUTS.items(): for _, handler in inputs.items(): - # if type(handler.transformer) == type(downsamplers.BedToIntervalListConverter): - # handler.transformer = handler.transformer(working_dir, handler.callback, ) - # exit() 
handler.transformer = handler.transformer( working_dir=working_dir, callback=handler.callback, @@ -207,13 +204,6 @@ def main(): args = parser.parse_args() - # sd = "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict" - # sd_out = "./tmp_homo.dict" - # localize_file(sd, sd_out) - # test = downsamplers.BedToIntervallistConverter(".", lambda x: {}, sd_out, "/Users/jvahid/code/picard.jar") - # test.convert("default_downsampling_regions.bed", "tmp_tmp_") - # - regions = parse_target_regions(args.target_regions) logging.info(f"Found {len(regions)} target regions for downsampling.") @@ -224,7 +214,7 @@ def main(): f"{args.output_filename_prefix}{Path(args.input_workflow_json).name}" ) - initialize_downsamplers(args.working_dir) + initialize_transformers(args.working_dir) update_workflow_json( working_dir=args.working_dir, diff --git a/tests/utilities/downsamplers.py b/tests/utilities/transformers.py similarity index 100% rename from tests/utilities/downsamplers.py rename to tests/utilities/transformers.py From 0d3cefbd7c3364cc8d763e7ccdd49f4bfe9df41d Mon Sep 17 00:00:00 2001 From: VJalili Date: Wed, 10 Apr 2024 14:14:38 -0400 Subject: [PATCH 10/21] Reimplement downsampling cram files to correctly fetch pairs. --- tests/utilities/transformers.py | 61 ++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index c13308730..46d4e96e6 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -2,6 +2,7 @@ import pysam import subprocess +from collections import defaultdict from typing import Callable, List from dataclasses import dataclass from pathlib import Path @@ -91,30 +92,50 @@ def get_supported_file_types() -> List[str]: return [".cram"] def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: - # Implementation notes: - # 1. This method calls `samtools` instead of using `pysam` since it needs to include - # distant pair-end reads in the downsampled regions (i.e., the `--fetch-pairs` flag). - # Such reads are needed for MELT to function properly. - # - # 2. The method writes target regions to a BED file. While taking the BED file containing - # the regions as input seems a better option, the current approach is implemented as - # taking a BED file as input instead of a list of regions would convolut the method signature. 
- - regions_filename = os.path.join(self.working_dir, "_tmp_regions.bed") - with open(regions_filename, "w") as f: - for region in regions: - f.write("\t".join([str(region.chr), str(region.start), str(region.end)]) + "\n") + output_filename_unsorted = self.get_output_filename(input_filename, f"unsorted_{output_prefix}") + with pysam.AlignmentFile(input_filename, "rc") as cram: + header = cram.header + references = cram.references + with pysam.AlignmentFile(output_filename_unsorted, "wc",header=header,reference_names=references) as output_cram_file: + for region in regions: + for r1, r2 in self.read_pair_generator(input_filename, + region=f"{region.chr}:{region.start}-{region.end}"): + output_cram_file.write(r1) + output_cram_file.write(r2) output_filename = self.get_output_filename(input_filename, output_prefix) - subprocess.run( - ["samtools", "view", input_filename, "--reference", self.reference_fasta, - "--targets-file", regions_filename, "--output", output_filename, "--cram", "--fetch-pairs"] - ) + pysam.sort("-o", output_filename, output_filename_unsorted) + os.remove(output_filename_unsorted) + index_filename = f"{output_filename}.crai" + pysam.index(output_filename, index_filename) + return self.callback(output_filename, index_filename) + + @staticmethod + def read_pair_generator(filename, region=None): + """ + Generate read pairs in a BAM file or within a region string. + Reads are added to read_dict until a pair is found. - subprocess.run(["samtools", "index", output_filename]) + See the following Q&A for details: https://www.biostars.org/p/306041/#332022 + """ + read_dict = defaultdict(lambda: [None, None]) + with pysam.AlignmentFile(filename, "rc") as cram: + for read in cram.fetch(region=region): + if not read.is_proper_pair or read.is_secondary or read.is_supplementary: + continue + q_name = read.query_name + if q_name not in read_dict: + if read.is_read1: + read_dict[q_name][0] = read + else: + read_dict[q_name][1] = read + else: + if read.is_read1: + yield read, read_dict[q_name][1] + else: + yield read_dict[q_name][0], read + del read_dict[q_name] - os.remove(regions_filename) - return self.callback(output_filename, output_filename + ".fai") class VcfDownsampler(BaseDownsampler): From da08f68633c0752e3af4d7d85da4a89fbb7cb592 Mon Sep 17 00:00:00 2001 From: VJalili Date: Wed, 10 Apr 2024 14:43:54 -0400 Subject: [PATCH 11/21] Rework the interface so that all the methods implement transform method. 
--- tests/utilities/generate_test_data.py | 7 ++++++- tests/utilities/transformers.py | 21 +++++++++++---------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index 0c6990772..bac8eda0e 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -113,7 +113,12 @@ def update_workflow_json( logging.info(f"Processing input {k}.") workflow_input_local_filename = Path(working_dir).joinpath(Path(v).name) localize_file(v, workflow_input_local_filename) - updated_files = handler.transformer.downsample(workflow_input_local_filename, output_filename_prefix, regions) + updated_files = handler.transformer.transform( + input_filename=workflow_input_local_filename, + output_prefix=output_filename_prefix, + regions=regions + ) + if bucket_name is not None and blob_name is not None: for varname, filename in updated_files.items(): logging.info(f"Uploading downsampled file {filename} to bucket {bucket_name}.") diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index 46d4e96e6..1b9faa26e 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -32,6 +32,9 @@ def get_supported_file_types() -> List[str]: def get_output_filename(self, input_filename, output_prefix): return str(self.working_dir.joinpath(f"{output_prefix}{Path(input_filename).name}")) + def transform(self, input_filename: str, output_prefix: str, **kwargs): + raise NotImplementedError() + class BaseConverter(BaseTransformer): def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): @@ -41,7 +44,7 @@ def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): def get_supported_file_types() -> List[str]: raise NotImplementedError() - def convert(self, input_filename: str, output_prefix: str) -> dict: + def transform(self, input_filename: str, output_prefix: str, **kwargs) -> dict: raise NotImplementedError() @@ -55,7 +58,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], sequence_dict_f def get_supported_file_types() -> List[str]: return [".interval_list"] - def convert(self, input_filename: str, output_prefix: str) -> dict: + def transform(self, input_filename: str, output_prefix: str, **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) subprocess.run( ["java", "-jar", self.picard_path, "BedToIntervalList", @@ -64,7 +67,6 @@ def convert(self, input_filename: str, output_prefix: str) -> dict: return self.callback(output_filename) - class BaseDownsampler(BaseTransformer): def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): super().__init__(working_dir, callback) @@ -77,7 +79,7 @@ def get_supported_file_types() -> List[str]: """ raise NotImplementedError() - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: raise NotImplementedError() @@ -91,7 +93,7 @@ def __init__(self, working_dir, callback: Callable[[str, str], dict], reference_ def get_supported_file_types() -> List[str]: return [".cram"] - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename_unsorted = self.get_output_filename(input_filename, f"unsorted_{output_prefix}") with 
pysam.AlignmentFile(input_filename, "rc") as cram: header = cram.header @@ -137,7 +139,6 @@ def read_pair_generator(filename, region=None): del read_dict[q_name] - class VcfDownsampler(BaseDownsampler): def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @@ -146,7 +147,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): def get_supported_file_types() -> List[str]: return [".vcf"] - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) with \ pysam.VariantFile(input_filename) as input_file, \ @@ -178,7 +179,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): def get_supported_file_types() -> List[str]: return [".interval_list"] - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) # Note that this algorithm is not efficient. with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: @@ -203,7 +204,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): def get_supported_file_types() -> List[str]: return [".bed"] - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) # Note that this algorithm is not efficient. with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: @@ -226,7 +227,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str def get_supported_file_types() -> List[str]: return [] - def downsample(self, input_filename: str, output_prefix: str, regions: List[Region]) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) include_chrs = set([r.chr for r in regions]) with open(input_filename, "r") as input_file, open(output_filename, "w") as output_file: From 6384cd66724dc6d51ec2e2c278e1ff6c74021974 Mon Sep 17 00:00:00 2001 From: VJalili Date: Tue, 16 Apr 2024 12:56:44 -0400 Subject: [PATCH 12/21] Simplify the interface by removing BaseDownsampler & BaseConverter. 
--- tests/utilities/transformers.py | 55 +++++---------------------------- 1 file changed, 8 insertions(+), 47 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index 1b9faa26e..efcfcceec 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -32,23 +32,11 @@ def get_supported_file_types() -> List[str]: def get_output_filename(self, input_filename, output_prefix): return str(self.working_dir.joinpath(f"{output_prefix}{Path(input_filename).name}")) - def transform(self, input_filename: str, output_prefix: str, **kwargs): + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs): raise NotImplementedError() -class BaseConverter(BaseTransformer): - def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): - super().__init__(working_dir, callback) - - @staticmethod - def get_supported_file_types() -> List[str]: - raise NotImplementedError() - - def transform(self, input_filename: str, output_prefix: str, **kwargs) -> dict: - raise NotImplementedError() - - -class BedToIntervalListConverter(BaseConverter): +class BedToIntervalListConverter(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], sequence_dict_filename: str, picard_path: str, **kwargs): super().__init__(working_dir, callback) self.sequence_dict_filename = sequence_dict_filename @@ -58,7 +46,7 @@ def __init__(self, working_dir, callback: Callable[[str], dict], sequence_dict_f def get_supported_file_types() -> List[str]: return [".interval_list"] - def transform(self, input_filename: str, output_prefix: str, **kwargs) -> dict: + def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) subprocess.run( ["java", "-jar", self.picard_path, "BedToIntervalList", @@ -67,23 +55,7 @@ def transform(self, input_filename: str, output_prefix: str, **kwargs) -> dict: return self.callback(output_filename) -class BaseDownsampler(BaseTransformer): - def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): - super().__init__(working_dir, callback) - - @staticmethod - def get_supported_file_types() -> List[str]: - """ - The file types should include the '.' prefix to match with Path().suffix output - (e.g., it should return '.cram' instead of 'cram'). 
- """ - raise NotImplementedError() - - def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: - raise NotImplementedError() - - -class CramDownsampler(BaseDownsampler): +class CramDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str, str], dict], reference_fasta: str, reference_index: str, **kwargs): super().__init__(working_dir, callback) self.reference_fasta = reference_fasta @@ -139,7 +111,7 @@ def read_pair_generator(filename, region=None): del read_dict[q_name] -class VcfDownsampler(BaseDownsampler): +class VcfDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @@ -160,18 +132,7 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio return self.callback(output_filename) -class IntervalListDownsampler(BaseDownsampler): - # Implementation note: - # An alternative to the down sampling approach implemented here is to take - # a BED file containing target regions as input, and convert the BED to .interval_list - # as the following. - # - # > java -jar picard.jar BedToIntervalList I=regions.bed O=regions.interval_list SD=/Homo_sapiens_assembly38.dict - # - # There are two downsides to converting BED to .interval_list: - # 1. It needs the picard tool installed; - # 2. It needs an additional "SD" input, which would make the `downsample` method signature complicated. - +class IntervalListDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @@ -196,7 +157,7 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio return self.callback(output_filename) -class BedDownsampler(BaseDownsampler): +class BedDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): super().__init__(working_dir, callback) @@ -218,7 +179,7 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio return self.callback(output_filename) -class PrimaryContigsDownsampler(BaseDownsampler): +class PrimaryContigsDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], delimiter: str = "\t", **kwargs): super().__init__(working_dir, callback) self.delimiter = delimiter From 7a0dd8027d3f909b366c5599a020ae0a6774619e Mon Sep 17 00:00:00 2001 From: VJalili Date: Tue, 16 Apr 2024 12:58:51 -0400 Subject: [PATCH 13/21] Convert serialized regions to .intervallist instead of using the existing file given in the json. 
--- tests/utilities/transformers.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index efcfcceec..d802e529d 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -1,6 +1,7 @@ import os import pysam import subprocess +import uuid from collections import defaultdict from typing import Callable, List @@ -14,6 +15,16 @@ class Region: start: int end: int + @staticmethod + def to_file(working_dir, regions): + filename = str(uuid.uuid4()) + filename = os.path.join(working_dir, filename + ".bed") + with open(filename, "w") as regions_file: + for r in regions: + regions_file.write("\t".join([str(r.chr), str(r.start), str(r.end)]) + "\n") + return filename + + class BaseTransformer: def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): @@ -48,10 +59,14 @@ def get_supported_file_types() -> List[str]: def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: output_filename = self.get_output_filename(input_filename, output_prefix) + regions_filename = Region.to_file(self.working_dir, regions) + subprocess.run( ["java", "-jar", self.picard_path, "BedToIntervalList", - "-I", input_filename, "-O", output_filename, "-SD", self.sequence_dict_filename], + "-I", regions_filename, "-O", output_filename, "-SD", self.sequence_dict_filename], check=True) + + os.remove(regions_filename) return self.callback(output_filename) From 0ca6a91ff4e21a5a15fc502d3935159e47bc329f Mon Sep 17 00:00:00 2001 From: VJalili Date: Tue, 16 Apr 2024 13:56:13 -0400 Subject: [PATCH 14/21] Replace hardcoded config with cli args, & ensure reference dict file is localized. --- tests/utilities/generate_test_data.py | 44 ++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index bac8eda0e..8a8a311d1 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -71,6 +71,7 @@ def parse_target_regions(input_filename): def localize_file(input_filename, output_filename): if os.path.isfile(output_filename): + logging.info(f"File {input_filename} exists locally, skipping localization.") return if input_filename.startswith("gs://"): logging.info(f"Localizing from GCP; blob: {input_filename} ...") @@ -80,16 +81,21 @@ def localize_file(input_filename, output_filename): raise NotImplementedError() -def initialize_transformers(working_dir: str): +def initialize_transformers( + working_dir: str, + reference_fasta: str, + reference_index: str, + sequence_dict_filename: str, + picard_path: str): for _, inputs in SUBJECT_WORKFLOW_INPUTS.items(): for _, handler in inputs.items(): handler.transformer = handler.transformer( working_dir=working_dir, callback=handler.callback, - reference_fasta="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta", - reference_index="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai", - sequence_dict_filename="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict", - picard_path="/Users/jvahid/code/picard.jar" + reference_fasta=reference_fasta, + reference_index=reference_index, + sequence_dict_filename=sequence_dict_filename, + picard_path=picard_path ) @@ -176,7 +182,8 @@ def main(): parser.add_argument( "picard_path", - help="Sets the absolute path, including the filename, to `picard.jar`." 
+ help="Sets the absolute path to `picard.jar`." + "You may download picard.jar from `https://github.com/broadinstitute/picard/releases`." ) parser.add_argument( @@ -207,6 +214,24 @@ def main(): "such that the downsampled files contains data from the input files overlapping these regions." ) + parser.add_argument( + "--reference-fasta", + default="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta", + help="Set the path to reference fasta file. " + ) + + parser.add_argument( + "--reference-fasta-index", + default="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai", + help="Set the path to index file of the reference fasta file. " + ) + + parser.add_argument( + "--reference-dict", + default="gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict", + help="Set the path to the reference dictionary file." + ) + args = parser.parse_args() regions = parse_target_regions(args.target_regions) @@ -219,7 +244,12 @@ def main(): f"{args.output_filename_prefix}{Path(args.input_workflow_json).name}" ) - initialize_transformers(args.working_dir) + Path(args.working_dir).mkdir(parents=True, exist_ok=True) + + sequence_dict_local_filename = Path(args.working_dir).joinpath(Path(args.reference_dict).name) + localize_file(args.reference_dict, sequence_dict_local_filename) + initialize_transformers(args.working_dir, args.reference_fasta, args.reference_fasta_index, + sequence_dict_local_filename, args.picard_path) update_workflow_json( working_dir=args.working_dir, From 2ed9f83709d4bac1a6b570017eeac454c4d20a5a Mon Sep 17 00:00:00 2001 From: VJalili Date: Tue, 16 Apr 2024 13:59:45 -0400 Subject: [PATCH 15/21] fix lint --- tests/utilities/transformers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index d802e529d..ba60ede9e 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -25,7 +25,6 @@ def to_file(working_dir, regions): return filename - class BaseTransformer: def __init__(self, working_dir: str, callback: Callable[[str, ...], dict]): # Convert the string to an ABS path and make sure it exists. @@ -86,7 +85,9 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio header = cram.header references = cram.references - with pysam.AlignmentFile(output_filename_unsorted, "wc",header=header,reference_names=references) as output_cram_file: + with pysam.AlignmentFile(output_filename_unsorted, "wc", + header=header, + reference_names=references) as output_cram_file: for region in regions: for r1, r2 in self.read_pair_generator(input_filename, region=f"{region.chr}:{region.start}-{region.end}"): From 61149daa3530806f74e4c6dd033217fedd163b95 Mon Sep 17 00:00:00 2001 From: VJalili Date: Fri, 19 Apr 2024 13:23:31 -0400 Subject: [PATCH 16/21] Update downsampling cram files to perform a second pass on the cram to linearly search for distant pairs. 
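
The per-region first pass still only emits mates that both fall inside a target region; reads
whose mates were never seen stay in the generator's bookkeeping dict, which the caller recovers
from the generator's return value (surfaced as `StopIteration.value`) and then resolves with a
single linear scan over the whole CRAM. A toy illustration of that hand-off, independent of
pysam (the `Read` class and names below are stand-ins, not the helper's actual code):

    from dataclasses import dataclass

    @dataclass
    class Read:                 # toy stand-in for a pysam AlignedSegment
        name: str

    def pair_generator(reads):
        """Yield pairs whose mates were both seen; return the unmatched reads."""
        pending = {}
        for read in reads:
            mate = pending.pop(read.name, None)
            if mate is not None:
                yield mate, read
            else:
                pending[read.name] = read
        return pending          # a generator's return value becomes StopIteration.value

    reads_in_region = [Read("a"), Read("b"), Read("a")]  # "b"'s mate maps outside the region
    unmatched = {}
    gen = pair_generator(reads_in_region)
    try:
        while True:
            r1, r2 = next(gen)              # pairs fully contained in the region
    except StopIteration as stop:
        unmatched = unmatched | stop.value  # dict union, Python 3.9+
    print(sorted(unmatched))                # -> ['b'], left for the linear second pass
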
--- tests/utilities/transformers.py | 70 +++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index ba60ede9e..56614e97f 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -7,6 +7,7 @@ from typing import Callable, List from dataclasses import dataclass from pathlib import Path +from tqdm import tqdm @dataclass @@ -85,14 +86,27 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio header = cram.header references = cram.references - with pysam.AlignmentFile(output_filename_unsorted, "wc", - header=header, - reference_names=references) as output_cram_file: + reads_for_second_pass = {} + with pysam.AlignmentFile(output_filename_unsorted, "wc", header=header, reference_names=references) as output_cram_file: for region in regions: - for r1, r2 in self.read_pair_generator(input_filename, - region=f"{region.chr}:{region.start}-{region.end}"): - output_cram_file.write(r1) - output_cram_file.write(r2) + generator = self.read_pair_generator(input_filename, region=f"{region.chr}:{region.start}-{region.end}") + try: + while True: + r1, r2 = next(generator) + output_cram_file.write(r1) + output_cram_file.write(r2) + except StopIteration as e: + reads_for_second_pass = reads_for_second_pass | e.value + + for r1, r2 in self.get_distant_read_pairs(input_filename, reads_for_second_pass): + output_cram_file.write(r1) + output_cram_file.write(r2) + + + # for r1, r2 in self.read_pair_generator(input_filename, region=f"{region.chr}:{region.start}-{region.end}"): + # output_cram_file.write(r1) + # output_cram_file.write(r2) + output_filename = self.get_output_filename(input_filename, output_prefix) pysam.sort("-o", output_filename, output_filename_unsorted) os.remove(output_filename_unsorted) @@ -103,22 +117,25 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio @staticmethod def read_pair_generator(filename, region=None): """ - Generate read pairs in a BAM file or within a region string. - Reads are added to read_dict until a pair is found. + Generate read pairs in a CRAM file. + If the `region` string is provided, it only generates the reads overlapping this region. 
+ - See the following Q&A for details: https://www.biostars.org/p/306041/#332022 """ - read_dict = defaultdict(lambda: [None, None]) + read_dict = {} + secondary_dict = {} with pysam.AlignmentFile(filename, "rc") as cram: for read in cram.fetch(region=region): + q_name = read.query_name if not read.is_proper_pair or read.is_secondary or read.is_supplementary: + # Maybe it would be better to exclude such reads + if q_name not in secondary_dict: + secondary_dict[q_name] = [None, None] + secondary_dict[q_name][0 if read.is_read1 else 1] = read continue - q_name = read.query_name if q_name not in read_dict: - if read.is_read1: - read_dict[q_name][0] = read - else: - read_dict[q_name][1] = read + read_dict[q_name] = [None, None] + read_dict[q_name][0 if read.is_read1 else 1] = read else: if read.is_read1: yield read, read_dict[q_name][1] @@ -126,6 +143,27 @@ def read_pair_generator(filename, region=None): yield read_dict[q_name][0], read del read_dict[q_name] + return read_dict #| secondary_dict + + @staticmethod + def get_distant_read_pairs(filename, reads): + with pysam.AlignmentFile(filename, "rc") as cram: + for read in tqdm(cram.fetch(), desc="Iterating on reads", unit=" read", dynamic_ncols=True, + bar_format="{desc}: {n:,} [{elapsed}] {rate_fmt}"): + query_name = read.query_name + pair = reads.get(query_name) + if pair is not None: + if (read.is_read1 and pair[0] is not None) or (not read.is_read1 and pair[1] is not None): + # It is the same read as the one seen before. + continue + if read.is_read1: + yield read, pair[1] + else: + yield pair[0], read + del reads[query_name] + print(f"\n\n\n size at the end: {len(reads)}") + + class VcfDownsampler(BaseTransformer): def __init__(self, working_dir, callback: Callable[[str], dict], **kwargs): From 26e03330686bac1d696c0ce8f14b562b70c84c6f Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 25 Apr 2024 18:35:27 -0400 Subject: [PATCH 17/21] Add an option for excluding some keys. --- tests/utilities/generate_test_data.py | 29 +++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tests/utilities/generate_test_data.py b/tests/utilities/generate_test_data.py index 8a8a311d1..225745931 100644 --- a/tests/utilities/generate_test_data.py +++ b/tests/utilities/generate_test_data.py @@ -56,6 +56,15 @@ class Handler: } +WORKFLOW_INPUTS_TO_DROP = { + "GatherSampleEvidence": [ + "melt_docker", + "melt_metrics_intervals", + "melt_standard_vcf_header" + ] +} + + def parse_target_regions(input_filename): regions = [] with open(input_filename, "r") as f: @@ -105,7 +114,9 @@ def update_workflow_json( with open(input_filename, "r") as f: workflow_inputs = json.load(f) - for k, v in dict(workflow_inputs).items(): + updated_workflow_inputs = {} + + for k, v in workflow_inputs.items(): # Example of the following split: # k="a.b.c" --> workflow_name="a.b", input_var="c" workflow_name, input_var = k.rsplit(".", maxsplit=1) @@ -113,7 +124,11 @@ def update_workflow_json( try: handler = SUBJECT_WORKFLOW_INPUTS[workflow_name][input_var] except KeyError: - # This workflow input is not set to be downsampled. 
+ if workflow_name in WORKFLOW_INPUTS_TO_DROP and input_var in WORKFLOW_INPUTS_TO_DROP[workflow_name]: + logging.info(f"Dropping {k}.") + else: + updated_workflow_inputs[k] = v + logging.info(f"Leaving {k} unchanged.") continue logging.info(f"Processing input {k}.") @@ -125,15 +140,17 @@ def update_workflow_json( regions=regions ) - if bucket_name is not None and blob_name is not None: - for varname, filename in updated_files.items(): + for varname, filename in updated_files.items(): + input_key = f"{workflow_name}.{varname}" + updated_workflow_inputs[input_key] = filename + if bucket_name is not None and blob_name is not None: logging.info(f"Uploading downsampled file {filename} to bucket {bucket_name}.") blob = upload_to_gs_blob(filename, bucket_name, blob_name) logging.info(f"Finished uploading {filename}.") - workflow_inputs[f"{workflow_name}.{varname}"] = blob + updated_workflow_inputs[input_key] = blob logging.info(f"Creating output JSON {output_filename}.") with open(output_filename, "w") as f: - json.dump(workflow_inputs, f, indent=4) + json.dump(updated_workflow_inputs, f, indent=4) logging.info(f"Finished creating output JSON {output_filename}.") From 23ad622e220051eb487198054f3c741970dff548 Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 25 Apr 2024 18:36:27 -0400 Subject: [PATCH 18/21] Add an option to disable searching for discordant pair reads. --- tests/utilities/transformers.py | 40 +++++++++++++++------------------ 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index 56614e97f..951534a99 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -1,15 +1,18 @@ +import logging import os import pysam import subprocess import uuid -from collections import defaultdict from typing import Callable, List from dataclasses import dataclass from pathlib import Path from tqdm import tqdm +logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO) + + @dataclass class Region: chr: str @@ -81,6 +84,7 @@ def get_supported_file_types() -> List[str]: return [".cram"] def transform(self, input_filename: str, output_prefix: str, regions: List[Region], **kwargs) -> dict: + include_discordant_reads = kwargs.get("include_discordant_reads", False) output_filename_unsorted = self.get_output_filename(input_filename, f"unsorted_{output_prefix}") with pysam.AlignmentFile(input_filename, "rc") as cram: header = cram.header @@ -98,14 +102,10 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio except StopIteration as e: reads_for_second_pass = reads_for_second_pass | e.value - for r1, r2 in self.get_distant_read_pairs(input_filename, reads_for_second_pass): - output_cram_file.write(r1) - output_cram_file.write(r2) - - - # for r1, r2 in self.read_pair_generator(input_filename, region=f"{region.chr}:{region.start}-{region.end}"): - # output_cram_file.write(r1) - # output_cram_file.write(r2) + if include_discordant_reads: + for r1, r2 in self.get_discordant_reads(input_filename, reads_for_second_pass): + output_cram_file.write(r1) + output_cram_file.write(r2) output_filename = self.get_output_filename(input_filename, output_prefix) pysam.sort("-o", output_filename, output_filename_unsorted) @@ -115,23 +115,16 @@ def transform(self, input_filename: str, output_prefix: str, regions: List[Regio return self.callback(output_filename, index_filename) @staticmethod - def read_pair_generator(filename, region=None): + def read_pair_generator(filename: 
str, region: str = None): """ Generate read pairs in a CRAM file. If the `region` string is provided, it only generates the reads overlapping this region. - - """ read_dict = {} - secondary_dict = {} with pysam.AlignmentFile(filename, "rc") as cram: for read in cram.fetch(region=region): q_name = read.query_name if not read.is_proper_pair or read.is_secondary or read.is_supplementary: - # Maybe it would be better to exclude such reads - if q_name not in secondary_dict: - secondary_dict[q_name] = [None, None] - secondary_dict[q_name][0 if read.is_read1 else 1] = read continue if q_name not in read_dict: read_dict[q_name] = [None, None] @@ -143,15 +136,16 @@ def read_pair_generator(filename, region=None): yield read_dict[q_name][0], read del read_dict[q_name] - return read_dict #| secondary_dict + return read_dict @staticmethod - def get_distant_read_pairs(filename, reads): + def get_discordant_reads(filename: str, discordant_reads: dict): + logging.info(f"Linearly scanning {filename} for discordant pairs of {len(discordant_reads)} reads.") with pysam.AlignmentFile(filename, "rc") as cram: for read in tqdm(cram.fetch(), desc="Iterating on reads", unit=" read", dynamic_ncols=True, bar_format="{desc}: {n:,} [{elapsed}] {rate_fmt}"): query_name = read.query_name - pair = reads.get(query_name) + pair = discordant_reads.get(query_name) if pair is not None: if (read.is_read1 and pair[0] is not None) or (not read.is_read1 and pair[1] is not None): # It is the same read as the one seen before. @@ -160,8 +154,10 @@ def get_distant_read_pairs(filename, reads): yield read, pair[1] else: yield pair[0], read - del reads[query_name] - print(f"\n\n\n size at the end: {len(reads)}") + del discordant_reads[query_name] + if len(discordant_reads) > 0: + logging.warning(f"Did not find discordant pairs for {len(read)} reads.") + logging.info(f"Finished linear scanning.") From 068fe850cfe210c1de7cff2b96ce624946383751 Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 25 Apr 2024 18:36:52 -0400 Subject: [PATCH 19/21] Update default downsampling regions. --- tests/utilities/default_downsampling_regions.bed | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/utilities/default_downsampling_regions.bed b/tests/utilities/default_downsampling_regions.bed index 131ff12ec..2a9e1ae3c 100644 --- a/tests/utilities/default_downsampling_regions.bed +++ b/tests/utilities/default_downsampling_regions.bed @@ -1,4 +1,6 @@ +chr1 42000000 44000000 chr1 47000000 48000000 +chr1 49000000 51000000 chr4 156467305 156545919 chr6 32000000 33000000 chr16 21000000 23000000 From 26cae8f0886f6a5e34732a185f177bd9b1113bcc Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 25 Apr 2024 19:21:27 -0400 Subject: [PATCH 20/21] Remove some sites from the default downsampling regions. --- tests/utilities/default_downsampling_regions.bed | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/utilities/default_downsampling_regions.bed b/tests/utilities/default_downsampling_regions.bed index 2a9e1ae3c..0a62a2a4c 100644 --- a/tests/utilities/default_downsampling_regions.bed +++ b/tests/utilities/default_downsampling_regions.bed @@ -1,6 +1,4 @@ chr1 42000000 44000000 -chr1 47000000 48000000 -chr1 49000000 51000000 chr4 156467305 156545919 chr6 32000000 33000000 chr16 21000000 23000000 From d9bd795ad550a7d1e3b733a78693a934288c5445 Mon Sep 17 00:00:00 2001 From: VJalili Date: Thu, 25 Apr 2024 19:23:11 -0400 Subject: [PATCH 21/21] Fix lint errors. 
--- tests/utilities/transformers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/utilities/transformers.py b/tests/utilities/transformers.py index 951534a99..9b52c8b3a 100644 --- a/tests/utilities/transformers.py +++ b/tests/utilities/transformers.py @@ -157,8 +157,7 @@ def get_discordant_reads(filename: str, discordant_reads: dict): del discordant_reads[query_name] if len(discordant_reads) > 0: logging.warning(f"Did not find discordant pairs for {len(read)} reads.") - logging.info(f"Finished linear scanning.") - + logging.info("Finished linear scanning.") class VcfDownsampler(BaseTransformer):
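
Taken together, a caller opting into the discordant-pair rescue introduced in patches 16 and 18
would look roughly like the sketch below. This is a usage sketch only: the import assumes
`tests/utilities` as the working directory, the reference and CRAM paths are placeholders, and
the callback simply echoes the output filenames.

    from transformers import CramDownsampler, Region

    downsampler = CramDownsampler(
        working_dir="downsampled",  # must already exist; it is resolved with strict=True
        callback=lambda cram, index: {"cram": cram, "index": index},
        reference_fasta="Homo_sapiens_assembly38.fasta",      # placeholder reference paths
        reference_index="Homo_sapiens_assembly38.fasta.fai",
    )

    outputs = downsampler.transform(
        "sample.cram",                                        # placeholder input CRAM
        output_prefix="downsampled_",
        regions=[Region(chr="chr6", start=32000000, end=33000000)],
        include_discordant_reads=True,  # enables the linear second pass for distant mates
    )

The dictionary returned here is whatever the callback produces; in generate_test_data.py those
values are what update_workflow_json() writes back into the downsampled workflow-input JSON.
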