diff --git a/inputs/values/dockers.json b/inputs/values/dockers.json
index 8448074b6..8d20e0d3e 100644
--- a/inputs/values/dockers.json
+++ b/inputs/values/dockers.json
@@ -33,4 +33,4 @@
   "sv_utils_docker": "us.gcr.io/broad-dsde-methods/gatk-sv/sv-utils:2023-02-01-v0.26.8-beta-9b25c72d",
   "gq_recalibrator_docker": "us.gcr.io/broad-dsde-methods/tbrookin/gatk:0a7e1d86f",
   "str": "us.gcr.io/broad-dsde-methods/vjalili/str:5994670"
-}
\ No newline at end of file
+}
diff --git a/src/sv-pipeline/scripts/downstream_analysis_and_filtering/sum_svcounts_perSample.R b/src/sv-pipeline/scripts/downstream_analysis_and_filtering/sum_svcounts_perSample.R
index dfe02f91a..8989a6b2d 100755
--- a/src/sv-pipeline/scripts/downstream_analysis_and_filtering/sum_svcounts_perSample.R
+++ b/src/sv-pipeline/scripts/downstream_analysis_and_filtering/sum_svcounts_perSample.R
@@ -4,26 +4,32 @@
 # in final_outlier_sample_filter.wdl
 
-###Set master parameters & read arguments
-options(stringsAsFactors=F,scipen=1000)
+### Set parameters & read arguments
+options(stringsAsFactors=F, scipen=1000)
 args <- commandArgs(trailingOnly=TRUE)
 INFILE <- args[1]
 OUTFILE <- args[2]
 
-###Read input data & reformat
-dat <- read.table(INFILE,header=F)
-colnames(dat) <- c("sample","svtype","count","chrom")
+### Read input data & reformat
+dat <- read.table(INFILE, header=F, check.names=F, sep="\t", comment.char="")
+drop.rows <- which(dat[, 1] %in% c("sample", "#sample"))
+if(length(drop.rows) > 0){
+  dat <- dat[-drop.rows, ]
+}
+colnames(dat) <- c("sample", "svtype", "count", "chrom")[1:ncol(dat)]
+dat$count <- as.numeric(dat$count)
 samples <- as.character(unique(dat$sample))
 svtypes <- as.character(unique(dat$svtype))
 
-###Get sum of counts per sample per svtype
-summed.res <- do.call("rbind", lapply(samples,function(sample){
-  return(do.call("rbind", lapply(svtypes,function(svtype){
-    return(data.frame("sample"=sample,
+### Get sum of counts per sample per svtype
+summed.res <- do.call("rbind", lapply(samples, function(sample){
+  do.call("rbind", lapply(svtypes, function(svtype){
+    data.frame("sample"=sample,
                "svtype"=svtype,
-               "count"=sum(dat[which(dat$sample==sample & dat$svtype==svtype),]$count,na.rm=T)))
-  })))
+               "count"=sum(dat[which(dat$sample==sample & dat$svtype==svtype), ]$count,
+                           na.rm=T))
+  }))
 }))
 
-###Write summed results to outfile
-write.table(summed.res,OUTFILE,col.names=T,row.names=F,sep="\t",quote=F)
+### Write summed results to outfile
+write.table(summed.res, OUTFILE, col.names=T, row.names=F, sep="\t", quote=F)
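Note on the R changes above: the reworked input handling now tolerates a (possibly commented) header row, forces tab-delimited parsing, and coerces counts to numeric before summing. A minimal, self-contained sketch of the same read-and-sum logic on toy data — the temp file is a hypothetical stand-in for real pipeline inputs, and the closing `aggregate()` call is a compact stand-in for the script's nested `lapply`/`rbind` loop:

```R
# Toy counts table with a commented header row, which the revised script is
# now designed to survive (hypothetical data; not part of the pipeline).
infile <- tempfile(fileext = ".tsv")
writeLines(c("#sample\tsvtype\tcount\tchrom",
             "s1\tDEL\t5\tchr1",
             "s1\tDEL\t3\tchr2",
             "s1\tDUP\t2\tchr1"),
           infile)

# Same reading logic as the patched script: comment.char="" keeps the header
# line as a data row, which is then dropped explicitly by value.
dat <- read.table(infile, header=F, check.names=F, sep="\t", comment.char="")
drop.rows <- which(dat[, 1] %in% c("sample", "#sample"))
if(length(drop.rows) > 0){
  dat <- dat[-drop.rows, ]
}
colnames(dat) <- c("sample", "svtype", "count", "chrom")[1:ncol(dat)]
dat$count <- as.numeric(dat$count)  # column reads as character due to header row

# aggregate() standing in for the script's lapply/rbind loop
aggregate(count ~ sample + svtype, data=dat, FUN=sum)
#   sample svtype count
# 1     s1    DEL     8
# 2     s1    DUP     2
```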
diff --git a/wdl/FilterOutlierSamples.wdl b/wdl/FilterOutlierSamples.wdl
index df423292e..3c54fce5d 100644
--- a/wdl/FilterOutlierSamples.wdl
+++ b/wdl/FilterOutlierSamples.wdl
@@ -4,54 +4,81 @@ import "Structs.wdl"
 import "Utils.wdl" as util
 import "IdentifyOutlierSamples.wdl" as identify_outliers
 
-# Filter outlier samples by IQR or cutoff table for a single VCF. Recommended to run PlotSVCountsPerSample first to choose cutoff
+# Filter outlier samples by IQR or cutoff table for one or more VCFs
+# Recommended to run PlotSVCountsPerSample first to choose cutoff
 workflow FilterOutlierSamples {
   input {
     String name  # batch or cohort
-    File vcf
+    Array[File] vcfs
     File? sv_counts  # SV counts file from PlotSVCountsPerSample - if not provided, will create
     Int N_IQR_cutoff
     File? outlier_cutoff_table
    String? vcf_identifier  # required (enter algorithm here) if providing outlier_cutoff_table, otherwise used in some file prefixes
+    String? bcftools_preprocessing_options
+    Boolean plot_counts = false
+    Array[String]? sample_subset_prefixes  # if provided, will identify outliers separately within each subset
+    Array[String]? sample_subset_lists  # if provided, will identify outliers separately within each subset
+    Int samples_per_shard = 5000
     String sv_pipeline_docker
     String sv_base_mini_docker
     String linux_docker
+    RuntimeAttr? runtime_override_preprocess_vcf
     RuntimeAttr? runtime_attr_identify_outliers
     RuntimeAttr? runtime_attr_subset_vcf
     RuntimeAttr? runtime_attr_cat_outliers
+    RuntimeAttr? runtime_attr_subset_counts
     RuntimeAttr? runtime_attr_filter_samples
     RuntimeAttr? runtime_attr_ids_from_vcf
     RuntimeAttr? runtime_attr_count_svs
+    RuntimeAttr? runtime_attr_combine_counts
+    RuntimeAttr? runtime_attr_plot_svcounts
+  }
+
+  if (defined(sample_subset_prefixes) && defined(sample_subset_lists)) {
+    Array[Pair[String, File]]? sample_subsets = zip(select_first([sample_subset_prefixes]),
+                                                    select_first([sample_subset_lists]))
   }
 
   call identify_outliers.IdentifyOutlierSamples {
     input:
-      vcf = vcf,
+      vcfs = vcfs,
       name = name,
       sv_counts = sv_counts,
       N_IQR_cutoff = N_IQR_cutoff,
       outlier_cutoff_table = outlier_cutoff_table,
       vcf_identifier = vcf_identifier,
+      bcftools_preprocessing_options = bcftools_preprocessing_options,
+      plot_counts = plot_counts,
+      sample_subsets = sample_subsets,
+      samples_per_shard = samples_per_shard,
       sv_pipeline_docker = sv_pipeline_docker,
+      sv_base_mini_docker = sv_base_mini_docker,
       linux_docker = linux_docker,
+      runtime_attr_ids_from_vcf = runtime_attr_ids_from_vcf,
+      runtime_override_preprocess_vcf = runtime_override_preprocess_vcf,
       runtime_attr_identify_outliers = runtime_attr_identify_outliers,
       runtime_attr_cat_outliers = runtime_attr_cat_outliers,
-      runtime_attr_count_svs = runtime_attr_count_svs
+      runtime_attr_subset_counts = runtime_attr_subset_counts,
+      runtime_attr_count_svs = runtime_attr_count_svs,
+      runtime_attr_combine_counts = runtime_attr_combine_counts,
+      runtime_attr_plot_svcounts = runtime_attr_plot_svcounts
   }
 
-  call util.SubsetVcfBySamplesList {
-    input:
-      vcf = vcf,
-      list_of_samples = IdentifyOutlierSamples.outlier_samples_file,
-      outfile_name = "${name}.outliers_removed.vcf.gz",
-      remove_samples = true,
-      sv_base_mini_docker = sv_base_mini_docker,
-      runtime_attr_override = runtime_attr_subset_vcf
+  scatter ( vcf in vcfs ) {
+    call util.SubsetVcfBySamplesList {
+      input:
+        vcf = vcf,
+        list_of_samples = IdentifyOutlierSamples.outlier_samples_file,
+        outfile_name = basename(vcf, ".vcf.gz") + ".${name}.outliers_removed.vcf.gz",
+        remove_samples = true,
+        sv_base_mini_docker = sv_base_mini_docker,
+        runtime_attr_override = runtime_attr_subset_vcf
+    }
   }
 
   call util.GetSampleIdsFromVcf {
     input:
-      vcf = vcf,
+      vcf = vcfs[0],
       sv_base_mini_docker = sv_base_mini_docker,
       runtime_attr_override = runtime_attr_ids_from_vcf
   }
@@ -67,7 +94,8 @@ workflow FilterOutlierSamples {
   }
 
   output {
-    File outlier_filtered_vcf = SubsetVcfBySamplesList.vcf_subset
+    Array[File] outlier_filtered_vcfs = SubsetVcfBySamplesList.vcf_subset
+    Array[File] outlier_filtered_vcf_idxs = SubsetVcfBySamplesList.vcf_subset_index
     Array[String] filtered_samples_list = FilterSampleList.filtered_samples_list
     File filtered_samples_file = FilterSampleList.filtered_samples_file
     Array[String] outlier_samples_excluded = IdentifyOutlierSamples.outlier_samples_list
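For orientation, a hypothetical Cromwell inputs sketch exercising the new multi-VCF and subset inputs (every value below is a placeholder; real docker strings come from inputs/values/dockers.json):

```json
{
  "FilterOutlierSamples.name": "my_batch",
  "FilterOutlierSamples.vcfs": ["gs://bucket/my_batch.shard0.vcf.gz",
                                "gs://bucket/my_batch.shard1.vcf.gz"],
  "FilterOutlierSamples.N_IQR_cutoff": 6,
  "FilterOutlierSamples.sample_subset_prefixes": ["PCRPLUS", "PCRMINUS"],
  "FilterOutlierSamples.sample_subset_lists": ["gs://bucket/pcrplus.samples.list",
                                               "gs://bucket/pcrminus.samples.list"],
  "FilterOutlierSamples.sv_pipeline_docker": "us.gcr.io/.../sv-pipeline:<tag>",
  "FilterOutlierSamples.sv_base_mini_docker": "us.gcr.io/.../sv-base-mini:<tag>",
  "FilterOutlierSamples.linux_docker": "us.gcr.io/.../linux:<tag>"
}
```

Note that the two subset arrays must be index-matched, since the workflow zips them into `Array[Pair[String, File]]` pairs; omitting both falls back to a single "ALL" subset in IdentifyOutlierSamples.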
"PlotSVCountsPerSample.wdl" as plot_svcounts +import "Utils.wdl" as util +import "TasksMakeCohortVcf.wdl" as cohort_utils +import "CollectQcVcfWide.wdl" as qc_utils +import "FilterOutlierSamplesPostMinGQ.wdl" as legacy + +# Identifies a list of SV count outlier samples from one or more input VCFs workflow IdentifyOutlierSamples { input { String name # batch or cohort - File vcf + Array[File] vcfs File? sv_counts # SV counts file from PlotSVCountsPerSample - if not provided, will create Int N_IQR_cutoff File? outlier_cutoff_table String? vcf_identifier # required (enter algorithm here) if providing outlier_cutoff_table, optional otherwise to add to file prefixes (ie. as a VCF identifier) + String? bcftools_preprocessing_options + Boolean plot_counts = false + Array[Pair[String, File]]? sample_subsets # if provided, will identify outliers separately within each subset. Expected format is array of pairs, where pair.left is the subset name and pair.right is a text file with all relevant sample IDs + Int samples_per_shard = 5000 String sv_pipeline_docker + String sv_base_mini_docker String linux_docker + RuntimeAttr? runtime_attr_ids_from_vcf + RuntimeAttr? runtime_override_preprocess_vcf RuntimeAttr? runtime_attr_identify_outliers RuntimeAttr? runtime_attr_cat_outliers + RuntimeAttr? runtime_attr_subset_counts RuntimeAttr? runtime_attr_count_svs + RuntimeAttr? runtime_attr_combine_counts + RuntimeAttr? runtime_attr_plot_svcounts } String prefix = if (defined(vcf_identifier)) then "~{name}_~{vcf_identifier}" else name + call util.GetSampleIdsFromVcf as GetSamplesList { + input: + vcf = vcfs[0], + sv_base_mini_docker = sv_base_mini_docker, + runtime_attr_override = runtime_attr_ids_from_vcf + } + Array[Pair[String, File]] default_subsets = [("ALL", GetSamplesList.out_file)] + Array[Pair[String, File]] subsets_to_eval = select_first([sample_subsets, default_subsets]) + + # Collect SV counts for each VCF in parallel unless sv_counts is provided if (!defined(sv_counts)) { - call plot_svcounts.CountSVsPerSamplePerType { + + # Process each VCF in parallel + scatter ( vcf in vcfs ) { + # Preprocess VCF with bcftools, if optioned + if (defined(bcftools_preprocessing_options)) { + call qc_utils.PreprocessVcf { + input: + vcf = vcf, + prefix = basename(vcf, ".vcf.gz") + ".preprocessed", + bcftools_preprocessing_options = select_first([bcftools_preprocessing_options]), + sv_base_mini_docker = sv_pipeline_docker, + runtime_attr_override = runtime_override_preprocess_vcf + } + } + File prepped_vcf = select_first([PreprocessVcf.outvcf, vcf]) + + call plot_svcounts.CountSVsPerSamplePerType as CountPerVcf { + input: + vcf = prepped_vcf, + prefix = prefix, + sv_pipeline_docker = sv_pipeline_docker, + runtime_attr_override = runtime_attr_count_svs + } + } + + # Combine counts across all VCFs (scattered over sample chunks) + call cohort_utils.SplitUncompressed as ShardSamples { input: - vcf = vcf, - prefix = prefix, - sv_pipeline_docker = sv_pipeline_docker, - runtime_attr_override = runtime_attr_count_svs + whole_file = GetSamplesList.out_file, + lines_per_shard = samples_per_shard, + shard_prefix = prefix, + shuffle_file = true, + random_seed = 2023, + sv_pipeline_docker = sv_pipeline_docker + } + scatter ( sample_shard in ShardSamples.shards ) { + call SubsetCounts as SubsetPreCombine { + input: + svcounts = CountPerVcf.sv_counts, + samples_list = sample_shard, + outfile = "${prefix}.shard.counts.tsv", + linux_docker = linux_docker, + runtime_attr_override = runtime_attr_subset_counts + } + call 
+      call legacy.CombineCounts as CombineShard {
+        input:
+          svcounts = [SubsetPreCombine.counts_subset],
+          prefix = prefix,
+          sv_pipeline_docker = sv_pipeline_docker,
+          runtime_attr_override = runtime_attr_combine_counts
+      }
+    }
+    call CatCounts as Combine {
+      input:
+        svcounts = CombineShard.summed_svcounts,
+        outfile = "${prefix}.merged.counts.tsv",
+        linux_docker = linux_docker
+    }
   }
 
+  File final_counts = select_first([sv_counts, Combine.merged_counts])
+
+  # If a precomputed outlier table is provided, directly apply those cutoffs
   if (defined(outlier_cutoff_table)) {
     call IdentifyOutliersByCutoffTable {
       input:
-        vcf = vcf,
-        svcounts = select_first([sv_counts, CountSVsPerSamplePerType.sv_counts]),
+        svcounts = final_counts,
         outlier_cutoff_table = select_first([outlier_cutoff_table]),
         outfile = "${prefix}_outliers.txt",
         algorithm = select_first([vcf_identifier]),
@@ -42,20 +120,53 @@
       runtime_attr_override = runtime_attr_identify_outliers
     }
   }
-  call IdentifyOutliersByIQR {
-    input:
-      vcf = vcf,
-      svcounts = select_first([sv_counts, CountSVsPerSamplePerType.sv_counts]),
-      N_IQR_cutoff = N_IQR_cutoff,
-      outfile = "${prefix}_IQR_outliers.txt",
-      sv_pipeline_docker = sv_pipeline_docker,
-      runtime_attr_override = runtime_attr_identify_outliers
+  # Otherwise, dynamically determine outliers based on N_IQR
+  if (!defined(outlier_cutoff_table)) {
+
+    # Determine outliers for each sample subset, if optioned (e.g., PCR+ vs. PCR-)
+    scatter ( subset_info in subsets_to_eval ) {
+      call SubsetCounts {
+        input:
+          svcounts = [final_counts],
+          samples_list = subset_info.right,
+          outfile = "${prefix}.${subset_info.left}.counts.tsv",
+          linux_docker = linux_docker,
+          runtime_attr_override = runtime_attr_subset_counts
+      }
+      call IdentifyOutliersByIQR {
+        input:
+          svcounts = SubsetCounts.counts_subset,
+          N_IQR_cutoff = N_IQR_cutoff,
+          outfile = "${prefix}.${subset_info.left}.IQR_outliers.txt",
+          sv_pipeline_docker = sv_pipeline_docker,
+          runtime_attr_override = runtime_attr_identify_outliers
+      }
+      if (plot_counts) {
+        call plot_svcounts.PlotSVCountsWithCutoff as PlotCountsPerSubset {
+          input:
+            svcounts = SubsetCounts.counts_subset,
+            n_iqr_cutoff = N_IQR_cutoff,
+            prefix = "${prefix}.${subset_info.left}",
+            sv_pipeline_docker = sv_pipeline_docker,
+            runtime_attr_override = runtime_attr_plot_svcounts
+        }
+      }
+    }
+
+    # Merge outlier lists across all subsets
+    call CatOutliers as CatSubsets {
+      input:
+        outliers = select_all(IdentifyOutliersByIQR.outliers_list),
+        batch = prefix,
+        linux_docker = linux_docker,
+        runtime_attr_override = runtime_attr_cat_outliers
+    }
   }
 
   # Merge list of outliers
   call CatOutliers {
     input:
-      outliers = select_all([IdentifyOutliersByIQR.outliers_list,IdentifyOutliersByCutoffTable.outliers_list]),
+      outliers = select_all([CatSubsets.outliers_file, IdentifyOutliersByCutoffTable.outliers_list]),
      batch = prefix,
      linux_docker = linux_docker,
      runtime_attr_override = runtime_attr_cat_outliers
@@ -64,13 +175,13 @@
   output {
     File outlier_samples_file = CatOutliers.outliers_file
     Array[String] outlier_samples_list = CatOutliers.outliers_list
-    File sv_counts_file = select_first([sv_counts, CountSVsPerSamplePerType.sv_counts])
+    File sv_counts_file = final_counts
+    Array[File?]? outlier_plots = PlotCountsPerSubset.svcount_distrib_plots
   }
 }
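The N_IQR cutoff logic itself lives in an sv-pipeline script invoked by IdentifyOutliersByIQR and is not shown in this diff. For intuition only, a hedged R sketch of one common per-svtype formulation — the production script's exact rule and tie handling may differ:

```R
# Assumed IQR-fence rule, per SV type: flag samples whose summed count lies
# more than N_IQR_cutoff interquartile ranges outside the quartiles.
flag_iqr_outliers <- function(counts, N_IQR_cutoff){
  # counts: data.frame with columns sample, svtype, count (one row per pair)
  do.call("rbind", lapply(split(counts, counts$svtype), function(d){
    q <- quantile(d$count, probs=c(0.25, 0.75), na.rm=TRUE)
    fence.lo <- q[1] - N_IQR_cutoff * (q[2] - q[1])
    fence.hi <- q[2] + N_IQR_cutoff * (q[2] - q[1])
    d[which(d$count < fence.lo | d$count > fence.hi), ]
  }))
}
```

Under the workflow's new subset feature, a rule like this is applied within each subset's counts file separately — the point of splitting, e.g., PCR+ from PCR- samples before computing fences.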
 
 task IdentifyOutliersByIQR {
   input {
-    File vcf
     File svcounts
     Int N_IQR_cutoff
     String outfile
@@ -116,7 +227,6 @@ task IdentifyOutliersByIQR {
 
 task IdentifyOutliersByCutoffTable {
   input {
-    File vcf
     File svcounts
     File outlier_cutoff_table
     String outfile
@@ -162,6 +272,96 @@ task IdentifyOutliersByCutoffTable {
 }
 
+# Restrict a file of SV counts per sample to a subset of samples
+task SubsetCounts {
+  input {
+    Array[File] svcounts
+    File samples_list
+    String outfile
+    String linux_docker
+    RuntimeAttr? runtime_attr_override
+  }
+
+  RuntimeAttr default_attr = object {
+    cpu_cores: 1,
+    mem_gb: 3.75,
+    disk_gb: 10,
+    boot_disk_gb: 10,
+    preemptible_tries: 3,
+    max_retries: 1
+  }
+  RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
+
+  output {
+    File counts_subset = "${outfile}"
+  }
+
+  command <<<
+
+    set -euo pipefail
+    head -n1 ~{svcounts[0]} > ~{outfile}
+    for file in ~{sep=" " svcounts}; do
+      fgrep -wf ~{samples_list} "$file" >> ~{outfile}
+    done
+
+  >>>
+
+  runtime {
+    cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
+    memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
+    disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
+    bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
+    docker: linux_docker
+    preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
+    maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
+  }
+
+}
+
+# Naive concatenation of multiple counts files while accounting for header
+task CatCounts {
+  input {
+    Array[File] svcounts
+    String outfile
+    String linux_docker
+    RuntimeAttr? runtime_attr_override
+  }
+
+  RuntimeAttr default_attr = object {
+    cpu_cores: 1,
+    mem_gb: 3.75,
+    disk_gb: 10,
+    boot_disk_gb: 10,
+    preemptible_tries: 3,
+    max_retries: 1
+  }
+  RuntimeAttr runtime_attr = select_first([runtime_attr_override, default_attr])
+
+  output {
+    File merged_counts = "${outfile}"
+  }
+
+  command <<<
+
+    set -euo pipefail
+    head -n1 ~{svcounts[0]} > ~{outfile}
+    for file in ~{sep=" " svcounts}; do
+      cat "$file" | sed '1d' >> ~{outfile}
+    done
+
+  >>>
+
+  runtime {
+    cpu: select_first([runtime_attr.cpu_cores, default_attr.cpu_cores])
+    memory: select_first([runtime_attr.mem_gb, default_attr.mem_gb]) + " GiB"
+    disks: "local-disk " + select_first([runtime_attr.disk_gb, default_attr.disk_gb]) + " HDD"
+    bootDiskSizeGb: select_first([runtime_attr.boot_disk_gb, default_attr.boot_disk_gb])
+    docker: linux_docker
+    preemptible: select_first([runtime_attr.preemptible_tries, default_attr.preemptible_tries])
+    maxRetries: select_first([runtime_attr.max_retries, default_attr.max_retries])
+  }
+
+}
 
 # Merge outlier sample lists across algorithms
 task CatOutliers {
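One reviewer-level caveat on the new SubsetCounts task: `fgrep -wf` matches sample IDs as whole words anywhere in a line, so an ID that collides with a value in another column could over-select rows. A stricter, column-exact equivalent of the subsetting — sketched in R with hypothetical file names, assuming the `sample`-headed counts format written by the summing script above:

```R
# Keep only rows whose "sample" column appears in the subset list.
keep <- readLines("samples.list")
dat <- read.table("counts.tsv", header=TRUE, sep="\t", comment.char="")
write.table(dat[dat$sample %in% keep, ], "counts.subset.tsv",
            col.names=TRUE, row.names=FALSE, sep="\t", quote=FALSE)
```

The fixed-string `fgrep` version is almost certainly fine for typical sample IDs; this is only worth revisiting if IDs can appear as whole words in other fields.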
diff --git a/wdl/PlotSVCountsPerSample.wdl b/wdl/PlotSVCountsPerSample.wdl
index 65602920f..180ae0a7c 100644
--- a/wdl/PlotSVCountsPerSample.wdl
+++ b/wdl/PlotSVCountsPerSample.wdl
@@ -1,14 +1,17 @@
 version 1.0
 
 import "Structs.wdl"
+import "CollectQcVcfWide.wdl" as qc_utils
 
-# Workflow to count SVs per sample per type and
+# Workflow to count SVs per sample per type
 workflow PlotSVCountsPerSample {
   input {
     String prefix
     Array[File?] vcfs
     Int N_IQR_cutoff
+    String? bcftools_preprocessing_options
     String sv_pipeline_docker
+    RuntimeAttr? runtime_override_preprocess_vcf
     RuntimeAttr? runtime_attr_count_svs
     RuntimeAttr? runtime_attr_plot_svcounts
     RuntimeAttr? runtime_attr_cat_outliers_preview
@@ -16,10 +19,23 @@ workflow PlotSVCountsPerSample {
 
   scatter (vcf in vcfs) {
     if (defined(vcf)) {
-      String vcf_name = basename(select_first([vcf]), ".vcf.gz")
+      # Preprocess VCF with bcftools, if optioned
+      if (defined(bcftools_preprocessing_options)) {
+        call qc_utils.PreprocessVcf {
+          input:
+            vcf=select_first([vcf]),
+            prefix=basename(select_first([vcf]), ".vcf.gz") + ".preprocessed",
+            bcftools_preprocessing_options=select_first([bcftools_preprocessing_options]),
+            sv_base_mini_docker=sv_pipeline_docker,
+            runtime_attr_override=runtime_override_preprocess_vcf
+        }
+      }
+      File prepped_vcf = select_first([PreprocessVcf.outvcf, vcf])
+
+      String vcf_name = basename(select_first([prepped_vcf]), ".vcf.gz")
       call CountSVsPerSamplePerType {
         input:
-          vcf = select_first([vcf]),
+          vcf = select_first([prepped_vcf]),
           prefix = vcf_name,
           sv_pipeline_docker = sv_pipeline_docker,
           runtime_attr_override = runtime_attr_count_svs
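The diff section ends mid-hunk here. One usage note on the new `bcftools_preprocessing_options` hook threaded through all three workflows: the string appears to be handed to `qc_utils.PreprocessVcf` (defined in CollectQcVcfWide.wdl, not shown in this diff) as `bcftools view` arguments, so records can be restricted, e.g., to PASS calls before counting and outlier detection. A hypothetical inputs sketch, with the filter expression as an example only, not a recommended default:

```json
{
  "PlotSVCountsPerSample.prefix": "my_batch",
  "PlotSVCountsPerSample.vcfs": ["gs://bucket/my_batch.cleaned.vcf.gz"],
  "PlotSVCountsPerSample.N_IQR_cutoff": 6,
  "PlotSVCountsPerSample.bcftools_preprocessing_options": "-f PASS,MULTIALLELIC",
  "PlotSVCountsPerSample.sv_pipeline_docker": "us.gcr.io/.../sv-pipeline:<tag>"
}
```

`-f PASS,MULTIALLELIC` is standard `bcftools view` syntax for keeping records whose FILTER matches one of the listed values; substitute whatever preprocessing the cohort actually needs.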