Ploidy for Foxtrot VDS [VS-1418] #9082
@@ -0,0 +1,97 @@ (new file)
import os
import json
import gzip

from collections import namedtuple, defaultdict, abc

import hail as hl

from avro.datafile import DataFileReader
from avro.io import DatumReader
from hail.utils.java import info


def import_ploidy(*avros) -> dict[str, hl.Struct]:
    """
    Parameters
    ----------
    avros :
        Path(s) of ploidy data
    """
    PloidyRecord = namedtuple("PloidyRecord", "location sample_name ploidy")

    # the implementation of GCS for Hadoop doesn't allow seeking to the end of a file
    # so I'm monkey patching DataFileReader
    def patched_determine_file_length(self) -> int:
        remember_pos = self.reader.tell()
        self.reader.seek(-1, 2)
        file_length = self.reader.tell() + 1
        self.reader.seek(remember_pos)
        return file_length

    original_determine_file_length = DataFileReader.determine_file_length
    DataFileReader.determine_file_length = patched_determine_file_length

    fs = hl.current_backend().fs
    ploidy_table = defaultdict(dict)
    for file in avros:
        with fs.open(file, "rb") as data:
            for record in DataFileReader(data, DatumReader()):
                location, sample_name, ploidy = PloidyRecord(**record)
                if sample_name in ploidy_table[location]:
                    raise ValueError(
                        f"duplicate key `{sample_name}` for location {location}"
                    )
                ploidy_table[location][sample_name] = ploidy

    # undo our monkey patch
    DataFileReader.determine_file_length = original_determine_file_length

    # hg38 = hl.get_reference("GRCh38")
    # xy_contigs = set(hg38.x_contigs + hg38.y_contigs)
    # ploidy_table = {
    #     contig: ploidy_table[key]
    #     for contig, key in zip(hg38.contigs, sorted(ploidy_table))
    #     if contig in xy_contigs
    # }

    x_table = ploidy_table["chrX"]
    y_table = ploidy_table["chrY"]
    assert set(x_table) == set(y_table)

    return {
        sample_name: hl.Struct(
            x_ploidy=x_table[sample_name], y_ploidy=y_table[sample_name]
        )
        for sample_name in x_table
    }


def update_reference_data_ploidy(rd, ploidy) -> hl.MatrixTable:
    """
    Parameters
    ----------
    rd : MatrixTable
        vds reference data
    ploidy : dict[str, hl.Struct]
        table of ploidy information. Keys are sample names; values are structs
        with `x_ploidy` and `y_ploidy` fields giving the ploidy to use for the
        reference genotype in non-PAR regions.
    """
    rd = rd.annotate_cols(ploidy_data=hl.literal(ploidy)[rd.s])
    rd = rd.annotate_rows(
        autosome_or_par=rd.locus.in_autosome_or_par(), is_y=rd.locus.contig == 'chrY'
    )
    rd = rd.annotate_entries(
        GT=hl.if_else(
            rd.autosome_or_par,
            hl.call(0, 0),
            hl.rbind(
                hl.if_else(rd.is_y, rd.ploidy_data.y_ploidy, rd.ploidy_data.x_ploidy),
                lambda ploidy: hl.switch(ploidy)
                .when(1, hl.call(0))
                .when(2, hl.call(0, 0))
                .or_error(
                    "expected 1 or 2 for ploidy information, found: " + hl.str(ploidy)
                ),
            ),
        )
    )

    return rd.drop("ploidy_data", "autosome_or_par", "is_y")
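For orientation, here is roughly how the two helpers above compose during a VDS import. This is a minimal sketch, not code from the PR; the bucket path and the pre-existing `vds` variable are hypothetical:

    import hail as hl

    # hypothetical Avro shards written by the ExtractFromPloidyTable task below
    ploidy_avros = ["gs://example-bucket/avro/ploidy_data/ploidy_data_000000000000.avro"]

    # sample name -> hl.Struct(x_ploidy=..., y_ploidy=...)
    ploidy = import_ploidy(*ploidy_avros)

    # rewrite reference GTs on chrX/chrY outside the PARs to match each sample's ploidy
    vds = hl.vds.VariantDataset(
        reference_data=update_reference_data_ploidy(vds.reference_data, ploidy),
        variant_data=vds.variant_data,
    )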
@@ -46,6 +46,7 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
         --num-worker-local-ssds 1
         --subnet=projects/{gcs_project}/regions/{region}/subnetworks/subnetwork
         --properties=dataproc:dataproc.monitoring.stackdriver.enable=true,dataproc:dataproc.logging.stackdriver.enable=true,core:fs.gs.outputstream.sync.min.interval=5
+        --packages=python-snappy
         {cluster_name}
     """)

@@ -64,7 +65,9 @@ def run_in_cluster(cluster_name, account, worker_machine_type, master_machine_ty
     )

     # prepare custom arguments
-    secondary_script_path_arg = f'--py-files {" ".join(secondary_script_path_list)}' if secondary_script_path_list else ''
+    # the following says `--py-files` is supposed to be a comma separated list
+    # https://fig.io/manual/gcloud/dataproc/jobs/submit/pyspark
+    secondary_script_path_arg = f'--py-files {",".join(secondary_script_path_list)}' if secondary_script_path_list else ''
     with open(script_arguments_json_path, 'r') as input_file:
         items = ijson.items(input_file, '', use_float=True)
         arguments = items.__next__();

Review thread on the `--py-files` change:

    "How did this work before? Did we never pass multiple py-files?"
    "oooooh it probably didn't work before--we probably only ever gave it a single secondary script at a time"
    "yes exactly, I was the lucky first person to supply more than one"
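As a quick illustration of why the separator matters (the script paths here are made up): with a space join, gcloud treats everything after the first path as stray positional arguments, while a comma join delivers a single list to `--py-files`:

    secondary_script_path_list = [
        "gs://example-bucket/scripts/import_gvs.py",     # hypothetical helper scripts
        "gs://example-bucket/scripts/import_ploidy.py",
    ]

    # space-joined (old): the second path falls outside --py-files entirely
    # --py-files gs://.../import_gvs.py gs://.../import_ploidy.py
    old_arg = f'--py-files {" ".join(secondary_script_path_list)}'

    # comma-joined (new): one comma separated list, as the gcloud docs describe
    # --py-files gs://.../import_gvs.py,gs://.../import_ploidy.py
    new_arg = f'--py-files {",".join(secondary_script_path_list)}'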
@@ -16,6 +16,7 @@ workflow GvsExtractAvroFilesForHail {
         String? basic_docker
         String? cloud_sdk_docker
         String? variants_docker
+        String ploidy_table_name
     }

     if (!defined(git_hash) || !defined(basic_docker) || !defined(cloud_sdk_docker) || !defined(variants_docker)) {

@@ -68,6 +69,16 @@ workflow GvsExtractAvroFilesForHail {
             variants_docker = effective_variants_docker,
     }

+    call ExtractFromPloidyTable {
+        input:
+            project_id = project_id,
+            dataset_name = dataset_name,
+            ploidy_table_name = ploidy_table_name,
+            avro_sibling = OutputPath.out,
+            call_set_identifier = call_set_identifier,
+            variants_docker = effective_variants_docker,
+    }

     call Utils.CountSuperpartitions {
         input:
             project_id = project_id,

@@ -239,6 +250,59 @@ task ExtractFromFilterTables {
     }


+task ExtractFromPloidyTable {
+    meta {
+        description: "Extracts from the sample chromosome ploidy table"
+        # Not dealing with caching for now as that would introduce a lot of complexity.
+        volatile: true
+    }
+
+    input {
+        String project_id
+        String dataset_name
+        String ploidy_table_name
+        String avro_sibling
+        String call_set_identifier
+        String variants_docker
+    }
+
+    parameter_meta {
+        avro_sibling: "Cloud path to a file that will be the sibling to the 'avro' 'directory' under which output Avro files will be written."
+    }
+
+    command <<<
+        # Prepend date, time and pwd to xtrace log entries.
+        PS4='\D{+%F %T} \w $ '
+        set -o errexit -o nounset -o pipefail -o xtrace
+
+        avro_prefix="$(dirname ~{avro_sibling})/avro"
+        echo $avro_prefix > "avro_prefix.out"
+
+        # Note the query below extracts ploidy data for chrX and chrY only as those are
+        # the only chromosomes the VDS ploidy logic looks at.
+        python3 /app/run_avro_query.py --sql "
+            EXPORT DATA OPTIONS(
+                uri='${avro_prefix}/ploidy_data/ploidy_data_*.avro', format='AVRO', compression='SNAPPY') AS
+            SELECT (
+                CASE (p.chromosome / 1000000000000)
+                    WHEN 23 THEN 'chrX'
+                    WHEN 24 THEN 'chrY'
+                END) AS location, s.sample_name, p.ploidy
+            FROM \`~{project_id}.~{dataset_name}.~{ploidy_table_name}\` p
+            JOIN \`~{project_id}.~{dataset_name}.sample_info\` s ON p.sample_id = s.sample_id
+            WHERE (p.chromosome / 1000000000000 = 23 or p.chromosome / 1000000000000 = 24)
+        " --call_set_identifier ~{call_set_identifier} --dataset_name ~{dataset_name} --table_name ~{ploidy_table_name} --project_id=~{project_id}
+    >>>
+
+    output {
+        Boolean done = true
+        String output_prefix = read_string("avro_prefix.out")
+    }
+
+    runtime {
+        docker: variants_docker
+        disks: "local-disk 500 HDD"
+    }
+}

 task ExtractFromSuperpartitionedTables {
     meta {
         description: "Extracts from the superpartitioned tables: vet_<table index>, ref_ranges_<table index>"

Review thread on the ploidy export query:

    "didn't we make a change where we only got the avro files for one BQ partition/one vet_x table/group of 4k samples at a time? We did that to make Hail faster when we passed in the vet and ref data. Do we not need to do this with ploidy data because it is added to the VDS at the end?"
    "That and ploidy data is tiny: two rows per sample."
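A note on the arithmetic in the query above: GVS encodes a locus as a single integer whose digits above 10^12 hold a 1-based contig index, so integer-dividing by 1000000000000 recovers the contig (23 = chrX, 24 = chrY) and the remainder is the position. A rough Python sketch of that decode, with a made-up example value:

    GVS_LOCATION_BASE = 1_000_000_000_000  # 10**12, the divisor used in the CASE above

    def decode_gvs_location(location: int) -> tuple[str, int]:
        """Split an encoded GVS location into (contig, position)."""
        contig_index = location // GVS_LOCATION_BASE   # 23 -> chrX, 24 -> chrY
        position = location % GVS_LOCATION_BASE
        contig = {23: "chrX", 24: "chrY"}.get(contig_index, f"contig_{contig_index}")
        return contig, position

    # hypothetical encoded value: chrX at position 2,781,479
    assert decode_gvs_location(23_000_002_781_479) == ("chrX", 2_781_479)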
@@ -1,6 +1,6 @@
 version 1.0

-import "GvsUtils.wdl" as Utils
+import "../GvsUtils.wdl" as Utils

 workflow GvsTieoutVcfMaxAltAlleles {
     input {

Review thread on the import path fix:

    "Oops"
Review thread on the commented-out contig remapping in import_ploidy:

    "These are the lines from the original PR that were giving me trouble, in particular the `zip` when I was supplying Avro data with more than just the X and Y contigs."
    "Do we understand why Chris did this? Should we check in with him about its removal?"
    "+1 to George's comment"
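For context on why that `zip` was fragile: it pairs hg38's contig list positionally with the sorted keys of whatever the Avro data happens to contain, so the pairing only lines up when the Avro covers exactly the contigs the code expects, in matching sorted order. A small illustration of one failure mode; the contig list is abbreviated and the encoded keys are made up for the example:

    # abbreviated stand-in for hg38.contigs, which is really the full GRCh38 list
    hg38_contigs = ["chr1", "chr2", "chrX", "chrY"]
    xy_contigs = {"chrX", "chrY"}

    # hypothetical encoded location keys, X and Y only
    ploidy_table = {
        "23000000000000": {"sample_1": 2},  # chrX rows
        "24000000000000": {"sample_1": 1},  # chrY rows
    }

    remapped = {
        contig: ploidy_table[key]
        for contig, key in zip(hg38_contigs, sorted(ploidy_table))
        if contig in xy_contigs
    }
    # zip pairs positionally: the two keys land on chr1 and chr2, so nothing
    # survives the xy_contigs filter
    print(remapped)  # {}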