Ploidy for Foxtrot VDS [VS-1418] (#9082)
mcovarr authored Jan 28, 2025
1 parent a3be817 commit c8feb1b
Showing 11 changed files with 190 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .dockstore.yml
@@ -155,6 +155,7 @@ workflows:
branches:
- master
- ah_var_store
- vs_1418_ploidy_for_foxtrot_vds
tags:
- /.*/
- name: GvsValidateVDS
@@ -316,6 +317,7 @@ workflows:
branches:
- master
- ah_var_store
- vs_1418_ploidy_for_foxtrot_vds
tags:
- /.*/
- name: GvsIngestTieout
@@ -343,6 +345,7 @@ workflows:
branches:
- master
- ah_var_store
- vs_1418_ploidy_for_foxtrot_vds
tags:
- /.*/
- name: GvsCallsetStatistics
1 change: 1 addition & 0 deletions scripts/variantstore/scripts/hail_gvs_import.py
@@ -38,6 +38,7 @@ def create_vds(argsfn, vds_path, references_path, temp_path, intermediate_resume
sample_mapping=argsfn('sample_mapping'),
site_filtering_data=argsfn('site_filtering_data'),
vets_filtering_data=argsfn('vets_filtering_data'),
ploidy_data=argsfn('ploidy_data'),
reference_genome=rg38,
final_path=vds_path,
tmp_dir=f'{temp_path}/hail_tmp_create_vds',
10 changes: 10 additions & 0 deletions scripts/variantstore/scripts/import_gvs.py
@@ -5,13 +5,15 @@
from hail.vds.combiner.combine import merge_alleles, calculate_new_intervals
from hail.genetics.reference_genome import reference_genome_type
from hail.typecheck import typecheck, sequenceof, numeric
import import_gvs_ploidy


@typecheck(refs=sequenceof(sequenceof(str)),
vets=sequenceof(sequenceof(str)),
sample_mapping=sequenceof(str),
site_filtering_data=sequenceof(str),
vets_filtering_data=sequenceof(str),
ploidy_data=sequenceof(str),
final_path=str,
tmp_dir=str,
truth_sensitivity_snp_threshold=float,
@@ -27,6 +29,7 @@ def import_gvs(refs: 'List[List[str]]',
sample_mapping: 'List[str]',
site_filtering_data: 'List[str]',
vets_filtering_data: 'List[str]',
ploidy_data: 'List[str]',
final_path: 'str',
tmp_dir: 'str',
truth_sensitivity_snp_threshold: 'float' = 0.997,
@@ -71,6 +74,8 @@ def import_gvs(refs: 'List[List[str]]',
scope of the resulting variant table, where keys of the dictionary are an alternate
allele string, where the dictionary values are the full records from the input VETS
filtering table, minus the `ref` and `alt` fields.
- `ploidy_data` -- Used to determine and assign the ploidy of reference data on chrX and chrY.
Some VCFs record ploidy for these chromosomes as haploid, others as diploid.
Execution notes
---------------
@@ -106,6 +111,8 @@ def import_gvs(refs: 'List[List[str]]',
Paths to site filtering files.
vets_filtering_data : List[str]
Paths to VETS filtering files.
ploidy_data : List[str]
Path(s) to ploidy data file(s).
final_path : :class:`str`
Desired path to final VariantDataset on disk.
tmp_dir : :class:`str`
@@ -190,6 +197,8 @@ def convert_array_with_id_keys_to_dense_array(arr, ids, drop=[]):
vets_filter = vets_filter.key_by('locus')
vets_filter.write(vets_filter_path, overwrite=True)

ploidy = import_gvs_ploidy.import_ploidy(*ploidy_data)

n_samples = 0

with hl._with_flags(use_new_shuffle='1'):
@@ -250,6 +259,7 @@ def convert_array_with_id_keys_to_dense_array(arr, ids, drop=[]):
ref_ht = ref_ht.annotate_globals(col_data=sample_names_lit.map(lambda s: hl.struct(s=s)),
ref_block_max_length=ref_block_max_length)
ref_mt = ref_ht._unlocalize_entries('entries', 'col_data', col_key=['s'])
ref_mt = import_gvs_ploidy.update_reference_data_ploidy(ref_mt, ploidy)

var_ht = hl.import_avro(var_group)
var_ht = var_ht.transmute(locus=translate_locus(var_ht.location),
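For orientation, a minimal sketch (not part of this commit) of how the widened import_gvs signature might be called directly; every path below is hypothetical, and in practice hail_gvs_import.py supplies these arguments via argsfn as shown above.

# Hypothetical invocation sketch; all bucket paths are illustrative only.
import import_gvs

import_gvs.import_gvs(
    refs=[['gs://example-bucket/avro/ref_ranges/ref_ranges_001_*.avro']],
    vets=[['gs://example-bucket/avro/vets/vet_001_*.avro']],
    sample_mapping=['gs://example-bucket/avro/sample_mapping/*.avro'],
    site_filtering_data=['gs://example-bucket/avro/site_filtering_data/*.avro'],
    vets_filtering_data=['gs://example-bucket/avro/vets_filtering_data/*.avro'],
    ploidy_data=['gs://example-bucket/avro/ploidy_data/ploidy_data_*.avro'],  # new argument in this change
    final_path='gs://example-bucket/vds/example.vds',
    tmp_dir='gs://example-bucket/hail_tmp',
)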
97 changes: 97 additions & 0 deletions scripts/variantstore/scripts/import_gvs_ploidy.py
@@ -0,0 +1,97 @@
import os
import json
import gzip

from collections import namedtuple, defaultdict, abc

import hail as hl

from avro.datafile import DataFileReader
from avro.io import DatumReader
from hail.utils.java import info

def import_ploidy(*avros) -> dict[str, hl.Struct]:
"""
Parameters
----------
avros :
Path(s) of ploidy data
"""
PloidyRecord = namedtuple("PloidyRecord", "location sample_name ploidy")

# the implementation of GCS for Hadoop doesn't allow seeking to the end of a file
# so I'm monkey patching DataFileReader
def patched_determine_file_length(self) -> int:
remember_pos = self.reader.tell()
self.reader.seek(-1, 2)
file_length = self.reader.tell() + 1
self.reader.seek(remember_pos)
return file_length

original_determine_file_length = DataFileReader.determine_file_length
DataFileReader.determine_file_length = patched_determine_file_length

fs = hl.current_backend().fs
ploidy_table = defaultdict(dict)
for file in avros:
with fs.open(file, "rb") as data:
for record in DataFileReader(data, DatumReader()):
location, sample_name, ploidy = PloidyRecord(**record)
if sample_name in ploidy_table[location]:
raise ValueError(
f"duplicate key `{sample_name}` for location {location}"
)
ploidy_table[location][sample_name] = ploidy

# undo our monkey patch
DataFileReader.determine_file_length = original_determine_file_length

# hg38 = hl.get_reference("GRCh38")
# xy_contigs = set(hg38.x_contigs + hg38.y_contigs)
# ploidy_table = {
# contig: ploidy_table[key]
# for contig, key in zip(hg38.contigs, sorted(ploidy_table))
# if contig in xy_contigs
# }

x_table = ploidy_table["chrX"]
y_table = ploidy_table["chrY"]
assert set(x_table) == set(y_table)

return {
sample_name: hl.Struct(
x_ploidy=x_table[sample_name], y_ploidy=y_table[sample_name]
)
for sample_name in x_table
}


def update_reference_data_ploidy(rd, ploidy) -> hl.MatrixTable:
"""
Parameters
----------
rd : MatrixTable
vds reference data
ploidy : dict[str, hl.Struct]
ploidy information keyed by sample name. Each value is a struct whose `x_ploidy` and `y_ploidy`
fields give the ploidy to use for the reference genotype in nonpar regions of chrX and chrY.
"""
rd = rd.annotate_cols(ploidy_data=hl.literal(ploidy)[rd.s])
rd = rd.annotate_rows(autosome_or_par=rd.locus.in_autosome_or_par(), is_y=rd.locus.contig == 'chrY')
rd = rd.annotate_entries(
GT=hl.if_else(
rd.autosome_or_par,
hl.call(0, 0),
hl.rbind(
hl.if_else(rd.is_y, rd.ploidy_data.y_ploidy, rd.ploidy_data.x_ploidy),
lambda ploidy: hl.switch(ploidy)
.when(1, hl.call(0))
.when(2, hl.call(0, 0))
.or_error(
"expected 1 or 2 for ploidy information, found: " + hl.str(ploidy)
),
),
)
)

return rd.drop("ploidy_data", "autosome_or_par", "is_y")
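A short hedged sketch (not part of this commit) of how these two helpers compose; the Avro path and sample names are hypothetical:

# Not part of this commit; illustrates the intended usage of the helpers above.
import hail as hl
import import_gvs_ploidy

# Shards written by the ExtractFromPloidyTable task added below (path is hypothetical).
ploidy = import_gvs_ploidy.import_ploidy(
    'gs://example-bucket/avro/ploidy_data/ploidy_data_000000000000.avro')
# The result is keyed by sample name, e.g.
#   {'SAMPLE_1': hl.Struct(x_ploidy=1, y_ploidy=1), ...}

# Given the VDS reference-data MatrixTable `rd` built in import_gvs.py, nonpar chrX/chrY
# reference genotypes become hl.call(0) or hl.call(0, 0) according to each sample's ploidy:
# rd = import_gvs_ploidy.update_reference_data_ploidy(rd, ploidy)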
5 changes: 4 additions & 1 deletion scripts/variantstore/scripts/run_in_hail_cluster.py
@@ -46,6 +46,7 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
--num-worker-local-ssds 1
--subnet=projects/{gcs_project}/regions/{region}/subnetworks/subnetwork
--properties=dataproc:dataproc.monitoring.stackdriver.enable=true,dataproc:dataproc.logging.stackdriver.enable=true,core:fs.gs.outputstream.sync.min.interval=5
--packages=python-snappy
{cluster_name}
""")
@@ -64,7 +65,9 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
)

# prepare custom arguments
secondary_script_path_arg = f'--py-files {" ".join(secondary_script_path_list)}' if secondary_script_path_list else ''
# per the gcloud documentation below, `--py-files` expects a comma-separated list
# https://fig.io/manual/gcloud/dataproc/jobs/submit/pyspark
secondary_script_path_arg = f'--py-files {",".join(secondary_script_path_list)}' if secondary_script_path_list else ''
with open(script_arguments_json_path, 'r') as input_file:
items = ijson.items(input_file, '', use_float=True)
arguments = items.__next__();
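A quick illustration of the join change (script paths below are hypothetical): with more than one secondary script, gcloud expects a single comma-separated --py-files value rather than space-separated paths.

# Not part of this commit; illustrates the ",".join fix above.
secondary_script_path_list = ['/tmp/import_gvs.py', '/tmp/import_gvs_ploidy.py']  # hypothetical paths
secondary_script_path_arg = f'--py-files {",".join(secondary_script_path_list)}'
print(secondary_script_path_arg)  # --py-files /tmp/import_gvs.py,/tmp/import_gvs_ploidy.py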
4 changes: 4 additions & 0 deletions scripts/variantstore/wdl/GvsCreateVDS.wdl
@@ -116,6 +116,7 @@ workflow GvsCreateVDS {
hail_temp_path = hail_temp_path,
run_in_hail_cluster_script = GetHailScripts.run_in_hail_cluster_script,
gvs_import_script = GetHailScripts.gvs_import_script,
gvs_import_ploidy_script = GetHailScripts.gvs_import_ploidy_script,
hail_gvs_import_script = GetHailScripts.hail_gvs_import_script,
intermediate_resume_point = intermediate_resume_point,
workspace_project = effective_google_project,
@@ -159,6 +160,7 @@ task CreateVds {
Boolean leave_cluster_running_at_end
File hail_gvs_import_script
File gvs_import_script
File gvs_import_ploidy_script
File run_in_hail_cluster_script
String? hail_version
File? hail_wheel
@@ -237,6 +239,7 @@ task CreateVds {
python3 ~{run_in_hail_cluster_script} \
--script-path ~{hail_gvs_import_script} \
--secondary-script-path-list ~{gvs_import_script} \
--secondary-script-path-list ~{gvs_import_ploidy_script} \
--script-arguments-json-path script-arguments.json \
--account ${account_name} \
--autoscaling-policy gvs-autoscaling-policy \
@@ -287,6 +290,7 @@ task GetHailScripts {
File run_in_hail_cluster_script = "app/run_in_hail_cluster.py"
File hail_gvs_import_script = "app/hail_gvs_import.py"
File gvs_import_script = "app/import_gvs.py"
File gvs_import_ploidy_script = "app/import_gvs_ploidy.py"
}
runtime {
docker: variants_docker
64 changes: 64 additions & 0 deletions scripts/variantstore/wdl/GvsExtractAvroFilesForHail.wdl
@@ -16,6 +16,7 @@ workflow GvsExtractAvroFilesForHail {
String? basic_docker
String? cloud_sdk_docker
String? variants_docker
String ploidy_table_name
}

if (!defined(git_hash) || !defined(basic_docker) || !defined(cloud_sdk_docker) || !defined(variants_docker)) {
@@ -68,6 +69,16 @@ workflow GvsExtractAvroFilesForHail {
variants_docker = effective_variants_docker,
}

call ExtractFromPloidyTable {
input:
project_id = project_id,
dataset_name = dataset_name,
ploidy_table_name = ploidy_table_name,
avro_sibling = OutputPath.out,
call_set_identifier = call_set_identifier,
variants_docker = effective_variants_docker,
}

call Utils.CountSuperpartitions {
input:
project_id = project_id,
@@ -239,6 +250,59 @@ task ExtractFromFilterTables {
}


task ExtractFromPloidyTable {
meta {
description: "Extracts from the sample chromosome ploidy table"
# Not dealing with caching for now as that would introduce a lot of complexity.
volatile: true
}
input {
String project_id
String dataset_name
String ploidy_table_name
String avro_sibling
String call_set_identifier
String variants_docker
}

parameter_meta {
avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
}
command <<<
# Prepend date, time and pwd to xtrace log entries.
PS4='\D{+%F %T} \w $ '
set -o errexit -o nounset -o pipefail -o xtrace

avro_prefix="$(dirname ~{avro_sibling})/avro"
echo $avro_prefix > "avro_prefix.out"

# Note the query below extracts ploidy data for chrX and chrY only as those are the only chromosomes the VDS
# ploidy logic looks at.

python3 /app/run_avro_query.py --sql "
EXPORT DATA OPTIONS(
uri='${avro_prefix}/ploidy_data/ploidy_data_*.avro', format='AVRO', compression='SNAPPY') AS
SELECT (
CASE (p.chromosome / 1000000000000)
WHEN 23 THEN 'chrX'
WHEN 24 THEN 'chrY'
END) AS location, s.sample_name, p.ploidy
FROM \`~{project_id}.~{dataset_name}.~{ploidy_table_name}\` p
JOIN \`~{project_id}.~{dataset_name}.sample_info\` s ON p.sample_id = s.sample_id
WHERE (p.chromosome / 1000000000000 = 23 or p.chromosome / 1000000000000 = 24)
" --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name ~{ploidy_table_name} --project_id=~{project_id}
>>>
output {
Boolean done = true
String output_prefix = read_string("avro_prefix.out")
}

runtime {
docker: variants_docker
disks: "local-disk 500 HDD"
}
}

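The CASE above assumes GVS's packed encoding, in which the contig index occupies the digits above 10^12 (23 for chrX, 24 for chrY); that reading is inferred from the query itself, not stated elsewhere in the diff. A hedged decode sketch:

# Hedged sketch of the assumed encoding: chromosome value = contig_index * 10**12.
def decode_contig(chromosome_value: int) -> str:
    contig_index = chromosome_value // 10**12
    # Only chrX and chrY matter here, per the comment in the query above.
    return {23: 'chrX', 24: 'chrY'}.get(contig_index, f'contig_{contig_index}')

assert decode_contig(23_000_000_000_000) == 'chrX'
assert decode_contig(24_000_000_000_000) == 'chrY'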
task ExtractFromSuperpartitionedTables {
meta {
description: "Extracts from the superpartitioned tables: vet_<table index>, ref_ranges_<table index>"
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsUtils.wdl
@@ -84,7 +84,7 @@ task GetToolVersions {
# GVS generally uses the smallest `alpine` version of the Google Cloud SDK as it suffices for most tasks, but
there are a handful of tasks that require the larger GNU libc-based `slim`.
String cloud_sdk_slim_docker = "gcr.io/google.com/cloudsdktool/cloud-sdk:435.0.0-slim"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2024-11-25-alpine-913039adf8f4"
String variants_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/variants:2025-01-27-alpine-46895d996b6b"
String variants_nirvana_docker = "us.gcr.io/broad-dsde-methods/variantstore:nirvana_2022_10_19"
String gatk_docker = "us-central1-docker.pkg.dev/broad-dsde-methods/gvs/gatk:2024-11-28-gatkbase-b71132a18899"
String real_time_genomics_docker = "docker.io/realtimegenomics/rtg-tools:latest"
4 changes: 3 additions & 1 deletion scripts/variantstore/wdl/test/GvsQuickstartHailIntegration.wdl
@@ -36,6 +36,7 @@ workflow GvsQuickstartHailIntegration {
String? submission_id

Int? maximum_alternate_alleles
String ploidy_table_name
}

String project_id = "gvs-internal"
@@ -100,6 +101,7 @@ workflow GvsQuickstartHailIntegration {
project_id = project_id,
dataset_name = GvsQuickstartVcfIntegration.dataset_name,
filter_set_name = GvsQuickstartVcfIntegration.filter_set_name,
ploidy_table_name = ploidy_table_name,
scatter_width = 10,
call_set_identifier = git_branch_or_tag,
basic_docker = effective_basic_docker,
@@ -177,7 +179,7 @@ task TieOutVds {

# Copy the versions of the Hail import and tieout scripts for this branch from GitHub.
script_url_prefix="https://raw.githubusercontent.com/broadinstitute/gatk/~{git_branch_or_tag}/scripts/variantstore/scripts"
for script in hail_gvs_import.py hail_join_vds_vcfs.py gvs_vds_tie_out.py import_gvs.py
for script in hail_gvs_import.py hail_join_vds_vcfs.py gvs_vds_tie_out.py import_gvs.py import_gvs_ploidy.py
do
curl --silent --location --remote-name "${script_url_prefix}/${script}"
done
2 changes: 2 additions & 0 deletions scripts/variantstore/wdl/test/GvsQuickstartIntegration.wdl
@@ -29,6 +29,7 @@ workflow GvsQuickstartIntegration {
String? hail_version
Boolean chr20_X_Y_only = true
Int? maximum_alternate_alleles
String ploidy_table_name = "sample_chromosome_ploidy"
}

File full_wgs_interval_list = "gs://gcp-public-data--broad-references/hg38/v0/wgs_calling_regions.hg38.noCentromeres.noTelomeres.interval_list"
@@ -103,6 +104,7 @@ workflow GvsQuickstartIntegration {
submission_id = GetToolVersions.submission_id,
hail_version = effective_hail_version,
maximum_alternate_alleles = maximum_alternate_alleles,
ploidy_table_name = ploidy_table_name,
}

if (GvsQuickstartHailVETSIntegration.used_tighter_gcp_quotas) {
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/test/GvsTieoutVcfMaxAltAlleles.wdl
@@ -1,6 +1,6 @@
version 1.0

import "GvsUtils.wdl" as Utils
import "../GvsUtils.wdl" as Utils

workflow GvsTieoutVcfMaxAltAlleles {
input {
