Skip to content

Commit

Permalink
Add support for random reading of vcf file.
Browse files Browse the repository at this point in the history
  • Loading branch information
niyarin committed Oct 16, 2019
1 parent e94da96 commit abb6a71
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 98 deletions.
6 changes: 4 additions & 2 deletions src/cljam/io/bam_index.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
(ns cljam.io.bam-index
"Parser for a BAM index file."
(:require [cljam.io.bam-index.core :as bai-core]))
(:require
[cljam.io.bam-index.core :as bai-core]
[cljam.io.util.bin :as util-bin]))

(defn bin-index
"Returns binning index for the given reference index."
Expand All @@ -15,7 +17,7 @@
(defn get-spans
"Returns regions of a BAM file that may contain an alignment for the given range."
[bai ref-idx beg end]
(bai-core/get-spans bai ref-idx beg end))
(util-bin/get-spans bai ref-idx beg end))

(defn bam-index
"Returns a cljam.bam-index.core.BAMIndex."
Expand Down
55 changes: 16 additions & 39 deletions src/cljam/io/bam_index/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@
"The core of BAM index features."
(:require [clojure.java.io :as cio]
[clojure.tools.logging :as logging]
[cljam.io.bam-index [common :refer :all]
[chunk :as chunk]
[reader :as reader]
[writer :as writer]]
[cljam.util :as util])
[cljam.io.bam-index [reader :as reader]
[writer :as writer]]
[cljam.util :as util]
[cljam.io.util.bin :as util-bin])
(:import [java.io DataOutputStream FileOutputStream]
cljam.io.bam_index.reader.BAIReader
cljam.io.bam_index.writer.BAIWriter))

(deftype BAMIndex [url bidx lidx])
(deftype BAMIndex [url bidx lidx]
util-bin/IBinaryIndex
(bidx-ref [this]
(.bidx this))
(lidx-ref [this]
(.lidx this)))

(defn bam-index [f]
(let [{:keys [bidx lidx]} (with-open [r ^BAIReader (reader/reader f)] (reader/read-all-index! r))]
Expand All @@ -29,39 +33,6 @@
(with-open [r ^BAIReader (reader/reader f)]
(reader/read-linear-index! r ref-idx)))

(defn- reg->bins*
"Returns candidate bins for the specified region as a vector."
[^long beg ^long end]
(let [max-pos 0x1FFFFFFF
beg (if (<= beg 0) 0 (bit-and (dec beg) max-pos))
end (if (<= end 0) max-pos (bit-and (dec end) max-pos))]
(if (<= beg end)
(loop [bins (transient [0])
xs [[1 26] [9 23] [73 20] [585 17] [4681 14]]]
(if-let [[^long ini shift] (first xs)]
(let [ini* (+ ini (bit-shift-right beg shift))
end* (+ ini (bit-shift-right end shift))]
(recur
(loop [b bins k ini*]
(if (<= k end*)
(recur (conj! b k) (inc k))
b))
(next xs)))
(persistent! bins))))))

(def ^:private reg->bins (memoize reg->bins*))

(defn get-spans
[^BAMIndex bai ^long ref-idx ^long beg ^long end]
(let [bins (reg->bins beg end)
bidx (get (.bidx bai) ref-idx)
lidx (get (.lidx bai) ref-idx)
chunks (into [] (comp (map bidx) cat) bins)
lin-beg (writer/pos->lidx-offset beg)
min-offset (get lidx lin-beg 0)]
(->> (chunk/optimize-chunks chunks min-offset)
(map vals))))

(defn get-unplaced-spans
[^BAMIndex bai]
(if-let [begin (some->>
Expand All @@ -74,8 +45,14 @@
[[begin Long/MAX_VALUE]]
[]))

(defn get-spans
[^BAMIndex bai ^long ref-idx ^long beg ^long end]
(util-bin/get-spans bai ref-idx beg end))


;; ## Writing


(defn ^BAIWriter writer
[f refs]
(BAIWriter. (DataOutputStream. (FileOutputStream. (cio/file f)))
Expand Down
88 changes: 54 additions & 34 deletions src/cljam/io/tabix.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,64 @@
"Alpha - subject to change.
Reader of a TABIX format file."
(:require [cljam.io.util.bgzf :as bgzf]
[cljam.io.util.lsb :as lsb])
[cljam.io.util.lsb :as lsb]
[cljam.io.util.bin :as util-bin])
(:import java.util.Arrays
[java.io DataInputStream IOException]))
[java.io DataInputStream IOException]
[cljam.io.bam_index.chunk Chunk]))

(deftype Tabix [n-ref preset sc bc ec meta skip seq bidx lidx]
util-bin/IBinaryIndex
(bidx-ref [this]
(.bidx this))
(lidx-ref [this]
(.lidx this)))

(def tabix-magic "TBI\1")

(defn- read-chunks!
[rdr]
(let [n (lsb/read-int rdr)]
(loop [i 0, chunks []]
(if (< i n)
(recur (inc i) (conj chunks (Chunk. (lsb/read-long rdr) (lsb/read-long rdr))))
chunks))))

(defn- read-seq
[buf len]
(loop [i 0, j 0, seq* []]
(if (< i len)
(if (zero? (nth buf i))
(let [ba-len (- i j)
ba (byte-array ba-len)]
(System/arraycopy buf j ba 0 ba-len)
(recur (inc i) (inc i) (conj seq* (String. ba))))
(recur (inc i) j seq*))
seq*)))
(if (zero? (nth buf i))
(let [ba-len (- i j)
ba (byte-array ba-len)]
(System/arraycopy buf j ba 0 ba-len)
(recur (inc i) (inc i) (conj seq* (String. ba))))
(recur (inc i) j seq*))
seq*)))

(defn- read-chunk
[^DataInputStream rdr]
{:beg (lsb/read-long rdr)
:end (lsb/read-long rdr)})
(defn- read-bin-index**!
[rdr]
(let [n (lsb/read-int rdr)]
(loop [i 0, bidx []]
(if (< i n)
(let [bin (lsb/read-int rdr)
chunks (read-chunks! rdr)]
(recur (inc i) (conj bidx {:bin bin, :chunks chunks})))
bidx))))

(defn- read-bin
[^DataInputStream rdr]
(let [bin (lsb/read-int rdr)
n-chunk (lsb/read-int rdr)]
{:bin bin
:chunks (doall (map (fn [_] (read-chunk rdr)) (range n-chunk)))}))
(defn- read-linear-index**!
[rdr]
(let [n (lsb/read-int rdr)]
(loop [i 0, lidx []]
(if (< i n)
(recur (inc i) (conj lidx (lsb/read-long rdr)))
lidx))))

(defn- read-index*
[^DataInputStream rdr]
(when-not (Arrays/equals ^bytes (lsb/read-bytes rdr 4) (.getBytes ^String tabix-magic))
(throw (IOException. "Invalid TABIX file")))
(let [n-seq (lsb/read-int rdr)
(let [n-ref (lsb/read-int rdr)
preset (lsb/read-int rdr)
sc (lsb/read-int rdr)
bc (lsb/read-int rdr)
Expand All @@ -46,20 +69,17 @@
len (lsb/read-int rdr)
buf (lsb/read-bytes rdr len)
seq (read-seq buf len)
index (loop [i 0
bin-index []
linear-index []]
(if (< i n-seq)
(let [n-bin (lsb/read-int rdr)
new-bin-index (doall (map (fn [_] (read-bin rdr)) (range n-bin)))
n-linear-index (lsb/read-int rdr)
new-linear-index (doall (map (fn [_] (lsb/read-long rdr)) (range n-linear-index)))]
(recur (inc i)
(conj bin-index new-bin-index)
(conj linear-index new-linear-index)))
[bin-index linear-index]))]
{:n-seq n-seq, :preset preset, :sc sc, :bc bc, :ec ec, :meta meta,
:skip skip, :seq seq ,:bin-index (first index), :linear-index (last index)}))
refs (range n-ref)
all-idx (map (fn [_] [(read-bin-index**! rdr) (read-linear-index**! rdr)]) refs)
bidx-seq (map first all-idx)
bidx (zipmap
refs
(map (fn [bins]
(zipmap (map :bin bins) (map :chunks bins)))
bidx-seq))
lidx (zipmap refs (map second all-idx))]
(->Tabix n-ref preset sc bc ec meta
skip seq bidx lidx)))

(defn read-index
[f]
Expand Down
45 changes: 45 additions & 0 deletions src/cljam/io/util/bin.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
(ns cljam.io.util.bin
(:refer-clojure :exclude [compare])
(:require [cljam.io.bam-index [chunk :as bam-index-chunk]
[writer :as bam-index-writer]])
(:import [java.io File]
[java.net MalformedURLException URI URL]
[cljam.io.bam_index.chunk Chunk]
[bgzf4j BGZFInputStream BGZFOutputStream]))

(defn- reg->bins*
"Returns candidate bins for the specified region as a vector."
[^long beg ^long end]
(let [max-pos 0x1FFFFFFF
beg (if (<= beg 0) 0 (bit-and (dec beg) max-pos))
end (if (<= end 0) max-pos (bit-and (dec end) max-pos))]
(if (<= beg end)
(loop [bins (transient [0])
xs [[1 26] [9 23] [73 20] [585 17] [4681 14]]]
(if-let [[^long ini shift] (first xs)]
(let [ini* (+ ini (bit-shift-right beg shift))
end* (+ ini (bit-shift-right end shift))]
(recur
(loop [b bins k ini*]
(if (<= k end*)
(recur (conj! b k) (inc k))
b))
(next xs)))
(persistent! bins))))))

(def ^:private reg->bins (memoize reg->bins*))

(defprotocol IBinaryIndex
(bidx-ref [this])
(lidx-ref [this]))

(defn get-spans
[^cljam.io.util.bin.IBinaryIndex index-data ^long ref-idx ^long beg ^long end]
(let [bins (reg->bins beg end)
bidx (get (bidx-ref index-data) ref-idx)
lidx (get (lidx-ref index-data) ref-idx)
chunks (into [] (comp (map bidx) cat) bins)
lin-beg (bam-index-writer/pos->lidx-offset beg)
min-offset (get lidx lin-beg 0)]
(->> (bam-index-chunk/optimize-chunks chunks min-offset)
(map vals))))
17 changes: 14 additions & 3 deletions src/cljam/io/vcf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
[cljam.io.vcf.reader :as vcf-reader]
[cljam.io.vcf.writer :as vcf-writer]
[cljam.io.bcf.reader :as bcf-reader]
[cljam.io.bcf.writer :as bcf-writer])
[cljam.io.bcf.writer :as bcf-writer]
[cljam.io.util.bgzf :as bgzf])
(:import java.io.Closeable
cljam.io.vcf.reader.VCFReader
cljam.io.vcf.writer.VCFWriter
cljam.io.bcf.reader.BCFReader
cljam.io.bcf.writer.BCFWriter))
cljam.io.bcf.writer.BCFWriter
bgzf4j.BGZFInputStream))

;; Reading
;; -------
Expand All @@ -28,7 +30,9 @@
header (with-open [r (cio/reader (util/compressor-input-stream f))]
(vcf-reader/load-header r))]
(VCFReader. (util/as-url f) meta-info header
(cio/reader (util/compressor-input-stream f)))))
(if (bgzf/bgzip? f)
(bgzf/bgzf-input-stream f)
(cio/reader (util/compressor-input-stream f))))))

(defn ^BCFReader bcf-reader
"Returns an open cljam.io.bcf.reader.BCFReader of f. Should be used inside
Expand Down Expand Up @@ -70,6 +74,13 @@
([rdr] (protocols/read-variants rdr))
([rdr option] (protocols/read-variants rdr option)))

(defn read-variants-randomly
"Reads variants of the VCF file randomly, returning them as a lazy sequence."
([rdr option]
(vcf-reader/read-variants-randomly
rdr
option)))

;; Writing
;; -------

Expand Down
53 changes: 50 additions & 3 deletions src/cljam/io/vcf/reader.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
[cljam.io.protocols :as protocols]
[camel-snake-kebab.core :refer [->kebab-case-keyword]]
[proton.core :refer [as-long]]
[cljam.io.vcf.util :as vcf-util])
[cljam.io.util.bin :as util-bin]
[cljam.io.vcf.util :as vcf-util]
[cljam.io.tabix :as tabix])
(:import [java.io Closeable]
[clojure.lang LazilyPersistentVector]))
[cljam.io.tabix Tabix]
[clojure.lang LazilyPersistentVector]
bgzf4j.BGZFInputStream))

;; VCFReader
;; ---------

(declare read-variants)
(declare read-variants read-variants-randomly)

(deftype VCFReader [url meta-info header reader]
Closeable
Expand Down Expand Up @@ -178,3 +182,46 @@
:deep (vcf-util/variant-parser (.meta-info rdr) (.header rdr))
:vcf identity)]
(map parse-fn (read-data-lines (.reader rdr) (.header rdr) kws)))))

(defn read-variants-randomly
[^VCFReader rdr {:keys [chr start end depth] :or {depth :deep start 1 end 4294967296}}]
(let [kws (mapv keyword (drop 8 (.header rdr)))
tabix-path (str (.getPath ^java.net.URL (.url rdr)) ".tbi")
tabix-data (tabix/read-index tabix-path)
ref-idx (.indexOf
^clojure.lang.PersistentVector (.seq ^Tabix tabix-data)
(if (and (string? chr)
(> (count chr) 3)
(= (subs chr 0 3) "chr"))
(subs chr 3)
(throw (ex-info "Invalid chr." {:chr chr}))))
spans
(if (= ref-idx -1)
'()
(util-bin/get-spans tabix-data ref-idx start end))
input-stream ^bgzf4j.BGZFInputStream (.reader rdr)
parse-fn (case depth
:deep (vcf-util/variant-parser (.meta-info rdr) (.header rdr))
:vcf identity)]
(reduce
(fn [res span]
(.seek input-stream (first span))
(loop [res res]

(if (< (.getFilePointer input-stream) (second span))
(let [variant
(parse-fn
(parse-data-line
(.readLine input-stream)
kws))
variant-pos (:pos variant)]
(if (or (< variant-pos (first span))
(>= variant-pos (second span)))
res
(recur
(cons
variant
res))))
res)))
'()
spans)))
Loading

0 comments on commit abb6a71

Please sign in to comment.