From 5cff5adbfc40cf179f79438fc68865b27147586a Mon Sep 17 00:00:00 2001
From: Mark Walker
Date: Mon, 26 Jul 2021 18:56:33 -0400
Subject: [PATCH] Allow missing vcf samples or gvcfs in Module00c (#207)

---
 input_values/dockers.json                  |   2 +-
 .../scripts/Filegenerate/generate_baf.py   |  27 +++--
 wdl/BAFFromGVCFs.wdl                       | 113 +++++++-----------
 wdl/BAFFromShardedVCF.wdl                  |  67 +++++------
 wdl/Module00c.wdl                          |  13 +-
 5 files changed, 105 insertions(+), 117 deletions(-)

diff --git a/input_values/dockers.json b/input_values/dockers.json
index 5cd40118e..6ec418ab2 100644
--- a/input_values/dockers.json
+++ b/input_values/dockers.json
@@ -13,7 +13,7 @@
   "sv_base_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-base:lint-24b9cda",
   "sv_base_mini_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-base-mini:lint-24b9cda",
   "sv_pipeline_base_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-pipeline-base:lint-24b9cda",
-  "sv_pipeline_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-pipeline:lint-24b9cda",
+  "sv_pipeline_docker" : "us.gcr.io/broad-dsde-methods/markw/sv-pipeline:mw-missing-baf-f14df3b",
   "sv_pipeline_qc_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-pipeline-qc:lint-24b9cda",
   "sv_pipeline_rdtest_docker" : "us.gcr.io/broad-dsde-methods/vjalili/sv-pipeline-rdtest:lint-24b9cda",
   "wham_docker" : "us.gcr.io/broad-dsde-methods/wham:8645aa",
diff --git a/src/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py b/src/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py
index 7402f5a81..fcad2aa1c 100644
--- a/src/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py
+++ b/src/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py
@@ -3,7 +3,6 @@
 import argparse
 import numpy as np
 import pysam
-import sys
 
 
 def filter_record(record, unfiltered=False):
@@ -17,7 +16,7 @@
 
     Parameters
     ----------
-    records : iterator of pysam.VariantRecords
+    record : pysam.VariantRecord
 
     Returns
     ------
@@ -41,7 +40,7 @@
     return False
 
 
-def calc_BAF(record, samples=None):
+def calc_BAF(record, samples):
     """
 
     Parameters
@@ -71,10 +70,7 @@ def _calc_BAF(sample):
         else:
             return np.nan
 
-    if samples is None:
-        samples = record.samples.keys()
-
-    bafs = np.array([_calc_BAF(sample) for sample in samples], dtype=np.float)
+    bafs = np.array([_calc_BAF(sample) for sample in samples], dtype=float)
 
     return bafs, samples
 
@@ -116,14 +112,25 @@ def main():
     parser = argparse.ArgumentParser(
         description=__doc__,
         formatter_class=argparse.RawDescriptionHelpFormatter)
-    parser.add_argument('--samples-list', default=None)
+    parser.add_argument('vcf')
+    parser.add_argument('--samples-list')
     parser.add_argument('--unfiltered', action='store_true')
+    parser.add_argument('--ignore-missing-vcf-samples', action='store_true')
     args = parser.parse_args()
-    vcf = pysam.VariantFile(sys.stdin)
+    vcf = pysam.VariantFile(args.vcf)
+    vcf_samples = vcf.header.samples
     if args.samples_list is not None:
         samples_list = read_samples_list(args.samples_list)
+        samples_list_intersection = set(samples_list).intersection(set(vcf_samples))
+        if args.ignore_missing_vcf_samples:
+            samples_list = [s for s in samples_list if s in samples_list_intersection]  # Preserves order
+        elif len(samples_list) > len(samples_list_intersection):
+            missing_samples = set(samples_list) - samples_list_intersection
+            raise ValueError("VCF is missing samples in the samples list. Use --ignore-missing-vcf-samples to bypass "
+                             "this error. Samples: {}".format(", ".join(missing_samples)))
     else:
-        samples_list = None
+        samples_list = vcf_samples
 
+    # Iterate over every record in the VCF
     for record in vcf:
         if not filter_record(record, args.unfiltered):
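The sample-reconciliation behavior added to generate_baf.py above reduces to an order-preserving intersection between the requested samples list and the VCF header. A standalone sketch of that logic, with hypothetical sample names and a shortened error message:

    def reconcile_samples(samples_list, vcf_samples, ignore_missing):
        # Samples present in both the requested list and the VCF header
        intersection = set(samples_list).intersection(set(vcf_samples))
        if ignore_missing:
            # Drop missing samples but preserve the order of samples_list
            return [s for s in samples_list if s in intersection]
        if len(samples_list) > len(intersection):
            missing = set(samples_list) - intersection
            raise ValueError("VCF is missing samples: " + ", ".join(sorted(missing)))
        return samples_list

    # "B" is absent from the VCF header, so it is dropped rather than fatal:
    print(reconcile_samples(["A", "B", "C"], ["A", "C", "D"], True))  # ['A', 'C']
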
diff --git a/wdl/BAFFromGVCFs.wdl b/wdl/BAFFromGVCFs.wdl
index f646f4dc5..f265bb67b 100644
--- a/wdl/BAFFromGVCFs.wdl
+++ b/wdl/BAFFromGVCFs.wdl
@@ -6,8 +6,9 @@ import "Structs.wdl"
 
 workflow BAFFromGVCFs {
   input {
-    Array[File] gvcfs
+    Array[File?] gvcfs
     Array[String] samples
+    Boolean ignore_missing_gvcfs
     File unpadded_intervals_file
     File dbsnp_vcf
     File? dbsnp_vcf_index
@@ -26,7 +27,6 @@ workflow BAFFromGVCFs {
   }
 
   Int num_of_original_intervals = length(read_lines(unpadded_intervals_file))
-  Int num_gvcfs = length(gvcfs)
 
   # Make a 2.5:1 interval number to samples in callset ratio interval list
   Int possible_merge_count = floor(num_of_original_intervals / num_gvcfs / 2.5)
@@ -34,9 +34,17 @@ workflow BAFFromGVCFs {
 
   File dbsnp_vcf_index_ = if defined(dbsnp_vcf_index) then select_first([dbsnp_vcf_index]) else dbsnp_vcf + ".idx"
 
-  scatter (gvcf in gvcfs) {
-    File gvcf_indexes = gvcf + ".tbi"
+  scatter (i in range(length(samples))) {
+    if (defined(gvcfs[i])) {
+      String defined_samples_optional_ = samples[i]
+      File defined_gvcfs_optional_ = select_first([gvcfs[i]])
+      File gvcf_indexes_optional_ = select_first([gvcfs[i]]) + ".tbi"
+    }
   }
+  Array[String] defined_samples_ = select_all(defined_samples_optional_)
+  Array[File] defined_gvcfs_ = select_all(defined_gvcfs_optional_)
+  Array[File] gvcf_indexes_ = select_all(gvcf_indexes_optional_)
+  Int num_gvcfs = length(defined_gvcfs_)
 
   call DynamicallyCombineIntervals {
     input:
@@ -47,14 +55,14 @@ workflow BAFFromGVCFs {
 
   Array[String] unpadded_intervals = read_lines(DynamicallyCombineIntervals.output_intervals)
 
-  Int disk_size_gb = 10 + ceil((size(gvcfs, "GB") + size(gvcf_indexes, "GB")) * 1.5)
+  Int disk_size_gb = 10 + ceil((size(defined_gvcfs_, "GB") + size(gvcf_indexes_, "GB")) * 1.5)
 
   scatter (idx in range(length(unpadded_intervals))) {
     call ImportGVCFs {
       input:
-        sample_names = samples,
-        input_gvcfs = gvcfs,
-        input_gvcfs_indices = gvcf_indexes,
+        sample_names = defined_samples_,
+        input_gvcfs = defined_gvcfs_,
+        input_gvcfs_indices = gvcf_indexes_,
         interval = unpadded_intervals[idx],
         workspace_dir_name = "genomicsdb",
         disk_size = disk_size_gb,
@@ -82,7 +90,8 @@ workflow BAFFromGVCFs {
     call GenerateBAF {
       input:
         vcf = GenotypeGVCFs.output_vcf,
-        vcf_index = GenotypeGVCFs.output_vcf_index,
+        samples = samples,
+        ignore_missing_vcf_samples = ignore_missing_gvcfs,
         batch = batch,
         shard = "~{idx}",
         sv_pipeline_docker = sv_pipeline_docker,
@@ -92,8 +101,8 @@ workflow BAFFromGVCFs {
 
   call MergeEvidenceFiles {
     input:
-      files = GenerateBAF.BAF,
-      indexes = GenerateBAF.BAF_idx,
+      files = GenerateBAF.out,
+      indexes = GenerateBAF.out_index,
       batch = batch,
       evidence = "BAF",
       inclusion_bed = inclusion_bed,
@@ -312,84 +321,50 @@ task ImportGVCFs {
 
 task GenerateBAF {
   input {
     File vcf
-    File vcf_index
+    File? vcf_header
+    Array[String] samples
+    Boolean ignore_missing_vcf_samples
     String batch
     String shard
    String sv_pipeline_docker
     RuntimeAttr? runtime_attr_override
   }
 
-  RuntimeAttr default_attr = object {
-    cpu_cores: 1,
-    mem_gb: 3.75,
-    disk_gb: 10,
-    boot_disk_gb: 10,
-    preemptible_tries: 3,
-    max_retries: 1
-  }
-  RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
-
-  output {
-    File BAF = "~{batch}.BAF.shard-~{shard}.txt.gz"
-    File BAF_idx = "~{batch}.BAF.shard-~{shard}.txt.gz.tbi"
-  }
-  command <<<
-
-    set -euo pipefail
-    bcftools view -M2 -v snps ~{vcf} \
-      | python /opt/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py --unfiltered \
-      | bgzip -c \
-      > ~{batch}.BAF.shard-~{shard}.txt.gz
-    tabix -f -s1 -b 2 -e 2 ~{batch}.BAF.shard-~{shard}.txt.gz
-
-  >>>
-  runtime {
-    cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
-    memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
-    disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
-    bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
-    docker: sv_pipeline_docker
-    preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
-    maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
-  }
-
-}
-
-task GatherBAF {
-  input {
-    String batch
-    Array[File] BAF
-    String sv_base_mini_docker
-    RuntimeAttr? runtime_attr_override
-  }
+  String ignore_missing_vcf_samples_flag = if (ignore_missing_vcf_samples) then "--ignore-missing-vcf-samples" else ""
 
   RuntimeAttr default_attr = object {
-    cpu_cores: 1,
-    mem_gb: 3.75,
-    disk_gb: 100,
-    boot_disk_gb: 10,
-    preemptible_tries: 3,
-    max_retries: 1
-  }
+    cpu_cores: 1,
+    mem_gb: 3.75,
+    disk_gb: 10,
+    boot_disk_gb: 10,
+    preemptible_tries: 3,
+    max_retries: 1
+  }
   RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
 
   output {
-    File out = "~{batch}.BAF.txt.gz"
-    File out_index = "~{batch}.BAF.txt.gz.tbi"
+    File out = "~{batch}.~{shard}.txt.gz"
+    File out_index = "~{batch}.~{shard}.txt.gz.tbi"
   }
   command <<<
 
     set -euo pipefail
-    cat ~{sep=" " BAF} | bgzip -c > ~{batch}.BAF.txt.gz
-    tabix -f -s1 -b 2 -e 2 ~{batch}.BAF.txt.gz
-
+    python /opt/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py \
+      --unfiltered \
+      --samples-list ~{write_lines(samples)} \
+      ~{ignore_missing_vcf_samples_flag} \
+      ~{if defined(vcf_header) then "<(cat ~{vcf_header} ~{vcf})" else vcf} \
+      | bgzip \
+      > ~{batch}.~{shard}.txt.gz
+
+    tabix -s1 -b2 -e2 ~{batch}.~{shard}.txt.gz
  >>>
   runtime {
     cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
     memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
     disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
     bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
-    docker: sv_base_mini_docker
+    docker: sv_pipeline_docker
     preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
     maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
   }
 
@@ -436,7 +411,7 @@ task MergeEvidenceFiles {
 
     mkdir tmp
     sort -m -k1,1V -k2,2n -T tmp data/*.txt | bgzip -c > ~{batch}.~{evidence}.txt.gz
-    tabix -f -s1 -b2 -e2 ~{batch}.~{evidence}.txt.gz
+    tabix -s1 -b2 -e2 ~{batch}.~{evidence}.txt.gz
 
   >>>
   runtime {
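Note on the scatter above: WDL's select_all() compacts each optional array independently, so binding the sample name, gVCF, and index inside the same if (defined(gvcfs[i])) block is what keeps the three resulting arrays aligned. A Python analog of that dataflow, with hypothetical paths:

    samples = ["S1", "S2", "S3"]
    gvcfs = ["gs://bkt/S1.g.vcf.gz", None, "gs://bkt/S3.g.vcf.gz"]

    # Mirror of the if(defined(...)) block: filter all arrays in one pass
    defined = [(s, g) for s, g in zip(samples, gvcfs) if g is not None]
    defined_samples = [s for s, _ in defined]
    defined_gvcfs = [g for _, g in defined]
    gvcf_indexes = [g + ".tbi" for g in defined_gvcfs]

    print(defined_samples)  # ['S1', 'S3'] -- pairing and order preserved
    print(gvcf_indexes)     # ['gs://bkt/S1.g.vcf.gz.tbi', 'gs://bkt/S3.g.vcf.gz.tbi']
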
diff --git a/wdl/BAFFromShardedVCF.wdl b/wdl/BAFFromShardedVCF.wdl
index e2fd01b03..bac33bc2a 100644
--- a/wdl/BAFFromShardedVCF.wdl
+++ b/wdl/BAFFromShardedVCF.wdl
@@ -10,6 +10,7 @@ workflow BAFFromShardedVCF {
     Array[File] vcfs
     File? vcf_header  # If provided, added to the beginning of each VCF
     Array[String] samples  # Can be a subset of samples in the VCF
+    Boolean ignore_missing_vcf_samples
     String batch
     String sv_base_mini_docker
     String sv_pipeline_docker
@@ -19,11 +20,12 @@ workflow BAFFromShardedVCF {
   }
 
   scatter (idx in range(length(vcfs))) {
-    call GenerateBAF {
+    call baf.GenerateBAF {
       input:
         vcf = vcfs[idx],
         vcf_header = vcf_header,
         samples = samples,
+        ignore_missing_vcf_samples = ignore_missing_vcf_samples,
         batch = batch,
         shard = "~{idx}",
         sv_pipeline_docker = sv_pipeline_docker,
@@ -31,10 +33,10 @@ workflow BAFFromShardedVCF {
     }
   }
 
-  call baf.GatherBAF {
+  call GatherBAF {
     input:
       batch = batch,
-      BAF = GenerateBAF.BAF,
+      BAF = GenerateBAF.out,
      sv_base_mini_docker = sv_base_mini_docker,
       runtime_attr_override = runtime_attr_gather
   }
@@ -55,21 +57,20 @@ workflow BAFFromShardedVCF {
   }
 }
 
-task GenerateBAF {
+task ScatterBAFBySample {
   input {
-    File vcf
-    File? vcf_header
-    Array[String] samples
-    String batch
-    String shard
-    String sv_pipeline_docker
+    File BAF
+    String sample
+    String sv_base_mini_docker
     RuntimeAttr? runtime_attr_override
   }
 
+  Int vm_disk_size = ceil(size(BAF, "GB") + 10)
+
   RuntimeAttr default_attr = object {
     cpu_cores: 1,
-    mem_gb: 3.75,
-    disk_gb: 10,
+    mem_gb: 1,
+    disk_gb: vm_disk_size,
     boot_disk_gb: 10,
     preemptible_tries: 3,
     max_retries: 1
   }
   RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
 
   output {
-    File BAF = "BAF.~{batch}.shard-~{shard}.txt"
+    File out = "BAF.~{sample}.txt.gz"
+    File out_index = "BAF.~{sample}.txt.gz.tbi"
   }
   command <<<
 
     set -euo pipefail
-    bcftools view -M2 -v snps ~{if defined(vcf_header) then "<(cat ~{vcf_header} ~{vcf})" else vcf} \
-      | python /opt/sv-pipeline/02_evidence_assessment/02d_baftest/scripts/Filegenerate/generate_baf.py \
-        --unfiltered --samples-list ~{write_lines(samples)} \
-      > BAF.~{batch}.shard-~{shard}.txt
+    zcat ~{BAF} | awk -F "\t" -v OFS="\t" '{if ($4=="~{sample}") print}' | bgzip -c > BAF.~{sample}.txt.gz
+    tabix -s 1 -b 2 -e 2 BAF.~{sample}.txt.gz
 
  >>>
   runtime {
     cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
     memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
     disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
     bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
-    docker: sv_pipeline_docker
+    docker: sv_base_mini_docker
     preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
     maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
   }
-
 }
 
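ScatterBAFBySample keys its awk filter on column 4 of the BAF evidence file, which holds the sample ID. A Python sketch of the same per-sample split, with hypothetical file names (Python's gzip writes plain gzip, whereas the task emits BGZF via bgzip so tabix can index the result):

    import gzip

    sample = "S1"
    with gzip.open("batch.BAF.txt.gz", "rt") as fin, \
            gzip.open("BAF.S1.txt.gz", "wt") as fout:
        for line in fin:
            # Column 4 (1-based) of the BAF evidence file is the sample ID
            if line.rstrip("\n").split("\t")[3] == sample:
                fout.write(line)
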
-task ScatterBAFBySample {
+task GatherBAF {
   input {
-    File BAF
-    String sample
+    String batch
+    Array[File] BAF
     String sv_base_mini_docker
     RuntimeAttr? runtime_attr_override
   }
 
-  Int vm_disk_size = ceil(size(BAF, "GB") + 10)
-
   RuntimeAttr default_attr = object {
-    cpu_cores: 1,
-    mem_gb: 1,
-    disk_gb: vm_disk_size,
-    boot_disk_gb: 10,
-    preemptible_tries: 3,
-    max_retries: 1
-  }
+    cpu_cores: 1,
+    mem_gb: 3.75,
+    disk_gb: 100,
+    boot_disk_gb: 10,
+    preemptible_tries: 3,
+    max_retries: 1
+  }
   RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
 
   output {
-    File out = "BAF.~{sample}.txt.gz"
-    File out_index = "BAF.~{sample}.txt.gz.tbi"
+    File out = "~{batch}.BAF.txt.gz"
+    File out_index = "~{batch}.BAF.txt.gz.tbi"
   }
   command <<<
 
     set -euo pipefail
-    zcat ~{BAF} | awk -F "\t" -v OFS="\t" '{if ($4=="~{sample}") print}' | bgzip -c > BAF.~{sample}.txt.gz
-    tabix -s 1 -b 2 -e 2 BAF.~{sample}.txt.gz
+    cat ~{sep=" " BAF} > ~{batch}.BAF.txt.gz
+    tabix -s1 -b2 -e2 ~{batch}.BAF.txt.gz
 
   >>>
   runtime {
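The rewritten GatherBAF replaces cat | bgzip -c with a plain byte-level cat. That works because each shard is now already bgzip-compressed, and BGZF streams remain valid when concatenated (each shard's EOF marker is just an empty block that readers skip); tabix additionally assumes the shards arrive in genome order. A quick readability check in Python, with hypothetical file names:

    import gzip
    import shutil

    shards = ["batch.0.txt.gz", "batch.1.txt.gz"]
    with open("batch.BAF.txt.gz", "wb") as out:
        for shard in shards:  # byte-level cat: no decompression or re-compression
            with open(shard, "rb") as f:
                shutil.copyfileobj(f, out)

    # Python's gzip reader handles multi-member streams, as BGZF readers do
    with gzip.open("batch.BAF.txt.gz", "rt") as f:
        print(sum(1 for _ in f), "records across all shards")
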
diff --git a/wdl/Module00c.wdl b/wdl/Module00c.wdl
index 4f33ff733..90a374bca 100644
--- a/wdl/Module00c.wdl
+++ b/wdl/Module00c.wdl
@@ -54,10 +54,14 @@ workflow Module00c {
 
     # BAF generation
     # Required for cohorts if BAF_files not provided
+    # Note: pipeline output is not sensitive to a small fraction of samples (~1%) missing BAF
+    # Set to true only if some samples are missing from the VCF or some gVCFs are not available
+    Boolean? ignore_missing_baf_samples
 
-    # BAF Option #1, GVCFs
-    Array[File]? gvcfs
+    # BAF Option #1, gVCFs
+    # Missing gVCFs should be given as null (without quotes) in the input JSON
+    Array[File?]? gvcfs
     File? unpadded_intervals_file
     File? dbsnp_vcf
     File? dbsnp_vcf_index
@@ -246,11 +250,14 @@ workflow Module00c {
     }
   }
 
+  Boolean ignore_missing_baf_samples_ = if defined(ignore_missing_baf_samples) then select_first([ignore_missing_baf_samples]) else false
+
   if (!defined(BAF_files) && defined(gvcfs)) {
     call baf.BAFFromGVCFs {
       input:
         gvcfs = select_first([gvcfs]),
         samples = samples,
+        ignore_missing_gvcfs = ignore_missing_baf_samples_,
         unpadded_intervals_file = select_first([unpadded_intervals_file]),
         dbsnp_vcf = select_first([dbsnp_vcf]),
         dbsnp_vcf_index = dbsnp_vcf_index,
@@ -269,6 +276,7 @@ workflow Module00c {
     }
   }
 
+
   if (!defined(BAF_files) && !defined(gvcfs) && (defined(snp_vcfs) || defined(snp_vcfs_shard_list))) {
     Array[File] snp_vcfs_ = if (defined(snp_vcfs)) then select_first([snp_vcfs]) else read_lines(select_first([snp_vcfs_shard_list]))
     call sbaf.BAFFromShardedVCF {
@@ -276,6 +284,7 @@ workflow Module00c {
         vcfs = snp_vcfs_,
         vcf_header = snp_vcf_header,
         samples = select_first([vcf_samples, samples]),
+        ignore_missing_vcf_samples = ignore_missing_baf_samples_,
         batch = batch,
         sv_base_mini_docker = sv_base_mini_docker,
         sv_pipeline_docker = sv_pipeline_docker,
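As the gvcfs comment above notes, a missing gVCF is encoded as a JSON null in the workflow inputs. A sketch of the relevant entries, generated from Python (bucket paths and the sample order are hypothetical):

    import json

    inputs = {
        "Module00c.gvcfs": [
            "gs://bkt/S1.g.vcf.gz",
            None,  # serialized as null: no gVCF available for the second sample
            "gs://bkt/S3.g.vcf.gz",
        ],
        "Module00c.ignore_missing_baf_samples": True,
    }
    print(json.dumps(inputs, indent=2))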