Skip to content

Commit

Permalink
Replace nested scatter in TinyResolve with sharding subworkflow (#240)
Browse files Browse the repository at this point in the history
  • Loading branch information
epiercehoffman authored Oct 14, 2021
1 parent b0baf08 commit 6e99475
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
21 changes: 21 additions & 0 deletions wdl/GetShardInputs.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version 1.0

workflow GetShardInputs {
input {
Array[String] all_items
Int shard_number
Int items_per_shard
Int num_items
}

scatter (j in range(items_per_shard)) {
Int idx = shard_number * items_per_shard + j
if (idx < num_items) {
String shard_items_ = all_items[idx]
}
}

output {
Array[String] shard_items = select_all(shard_items_)
}
}
55 changes: 38 additions & 17 deletions wdl/TinyResolve.wdl
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
version 1.0

import "Structs.wdl"
# Does perlim translocation resolve from raw manta calls
import "GetShardInputs.wdl"

# Does prelim translocation resolve from raw manta calls
workflow TinyResolve {
input {
Array[String] samples # Sample ID
Expand All @@ -24,29 +26,47 @@ workflow TinyResolve {
Int num_shards = ceil(num_samples_float / samples_per_shard)

scatter (i in range(num_shards)) {
scatter (j in range(samples_per_shard)) {
Int idx_ = i * samples_per_shard + j
if (idx_ < num_samples) {
String shard_samples_ = samples[idx_]
File shard_vcfs_ = manta_vcfs[idx_]
File shard_discfiles_ = discfile[idx_]
File shard_discfiles_idx_ = discfile_idx[idx_]
}
call GetShardInputs.GetShardInputs as GetShardSamples {
input:
items_per_shard = samples_per_shard,
shard_number = i,
num_items = num_samples,
all_items = samples
}

call GetShardInputs.GetShardInputs as GetShardDiscfiles {
input:
items_per_shard = samples_per_shard,
shard_number = i,
num_items = num_samples,
all_items = discfile
}

call GetShardInputs.GetShardInputs as GetShardDiscfileIndexes {
input:
items_per_shard = samples_per_shard,
shard_number = i,
num_items = num_samples,
all_items = discfile_idx
}

call GetShardInputs.GetShardInputs as GetShardVcfs {
input:
items_per_shard = samples_per_shard,
shard_number = i,
num_items = num_samples,
all_items = manta_vcfs
}
Array[String] shard_samples = select_all(shard_samples_)
Array[File] shard_vcfs = select_all(shard_vcfs_)
Array[File] shard_discfiles = select_all(shard_discfiles_)
Array[File] shard_discfiles_idx = select_all(shard_discfiles_idx_)

call ResolveManta {
input:
raw_vcfs=shard_vcfs,
samples=shard_samples,
raw_vcfs=GetShardVcfs.shard_items,
samples=GetShardSamples.shard_items,
sv_pipeline_docker = sv_pipeline_docker,
cytoband=cytoband,
cytoband_idx=cytoband_idx,
discfile=shard_discfiles,
discfile_idx=shard_discfiles_idx,
discfile=GetShardDiscfiles.shard_items,
discfile_idx=GetShardDiscfileIndexes.shard_items,
mei_bed=mei_bed,
runtime_attr_override=runtime_attr
}
Expand All @@ -57,6 +77,7 @@ workflow TinyResolve {
}
}


task ResolveManta {
input {
Array[File] raw_vcfs
Expand Down

0 comments on commit 6e99475

Please sign in to comment.