diff --git a/src/cljam/io/bam_index/reader.clj b/src/cljam/io/bam_index/reader.clj index a13046ce..d273e349 100644 --- a/src/cljam/io/bam_index/reader.clj +++ b/src/cljam/io/bam_index/reader.clj @@ -2,11 +2,11 @@ (:require [clojure.java.io :as cio] [cljam.io.util.lsb :as lsb] [cljam.io.bam-index.common :refer [bai-magic]] - [cljam.io.bam-index.chunk :as chunk] + [cljam.io.util.chunk :as chunk] [cljam.util :as util]) (:import java.util.Arrays [java.io FileInputStream Closeable IOException] - [cljam.io.bam_index.chunk Chunk])) + [cljam.io.util.chunk Chunk])) (deftype BAIReader [url reader] Closeable @@ -49,10 +49,10 @@ (defn- read-chunks! [rdr] (let [n (lsb/read-int rdr)] - (loop [i 0, chunks []] - (if (< i n) - (recur (inc i) (conj chunks (Chunk. (lsb/read-long rdr) (lsb/read-long rdr)))) - chunks)))) + (loop [i 0, chunks []] + (if (< i n) + (recur (inc i) (conj chunks (Chunk. (lsb/read-long rdr) (lsb/read-long rdr)))) + chunks)))) (defn- read-bin-index**! [rdr] diff --git a/src/cljam/io/bam_index/writer.clj b/src/cljam/io/bam_index/writer.clj index fe8bcff7..250d76ed 100644 --- a/src/cljam/io/bam_index/writer.clj +++ b/src/cljam/io/bam_index/writer.clj @@ -5,12 +5,12 @@ [cljam.io.util.bgzf :as bgzf] [cljam.io.util.lsb :as lsb] [cljam.io.bam-index.common :refer :all] - [cljam.io.bam-index.chunk :as chunk] + [cljam.io.util.chunk :as chunk] [cljam.io.bam.decoder :as bam-decoder]) (:import [java.io DataOutputStream Closeable] [java.nio ByteBuffer ByteOrder] [cljam.io.bam.decoder BAMPointerBlock] - [cljam.io.bam_index.chunk Chunk])) + [cljam.io.util.chunk Chunk])) (declare make-index-from-blocks) diff --git a/src/cljam/io/tabix.clj b/src/cljam/io/tabix.clj index 9ef30172..dfd09bc9 100644 --- a/src/cljam/io/tabix.clj +++ b/src/cljam/io/tabix.clj @@ -3,57 +3,49 @@ Reader of a TABIX format file." (:require [cljam.io.util.bgzf :as bgzf] [cljam.io.util.lsb :as lsb] - [cljam.io.util.bin :as util-bin]) + [cljam.io.util.bin :as util-bin] + [clojure.string :as cstr]) (:import java.util.Arrays [java.io DataInputStream IOException] - [cljam.io.bam_index.chunk Chunk])) + [cljam.io.util.chunk Chunk])) (deftype Tabix [n-ref preset sc bc ec meta skip seq bidx lidx] util-bin/IBinaryIndex (bidx-ref [this] (.bidx this)) (lidx-ref [this] - (.lidx this))) + (.lidx this)) + (get-ref-index [this chr] + (.indexOf + ^clojure.lang.PersistentVector + (.seq this) + chr))) (def tabix-magic "TBI\1") (defn- read-chunks! [rdr] - (let [n (lsb/read-int rdr)] - (loop [i 0, chunks []] - (if (< i n) - (recur (inc i) (conj chunks (Chunk. (lsb/read-long rdr) (lsb/read-long rdr)))) - chunks)))) + (->> #(Chunk. (lsb/read-long rdr) (lsb/read-long rdr)) + (repeatedly (lsb/read-int rdr)) + vec)) (defn- read-seq - [buf len] - (loop [i 0, j 0, seq* []] - (if (< i len) - (if (zero? (nth buf i)) - (let [ba-len (- i j) - ba (byte-array ba-len)] - (System/arraycopy buf j ba 0 ba-len) - (recur (inc i) (inc i) (conj seq* (String. ba)))) - (recur (inc i) j seq*)) - seq*))) + [^bytes buf] + (cstr/split (String. buf) #"\00")) -(defn- read-bin-index**! +(defn- read-bin-index [rdr] - (let [n (lsb/read-int rdr)] - (loop [i 0, bidx []] - (if (< i n) - (let [bin (lsb/read-int rdr) - chunks (read-chunks! rdr)] - (recur (inc i) (conj bidx {:bin bin, :chunks chunks}))) - bidx)))) + (->> #(hash-map + :bin (lsb/read-int rdr) + :chunks (read-chunks! rdr)) + (repeatedly (lsb/read-int rdr)) + vec)) -(defn- read-linear-index**! +(defn- read-linear-index [rdr] - (let [n (lsb/read-int rdr)] - (loop [i 0, lidx []] - (if (< i n) - (recur (inc i) (conj lidx (lsb/read-long rdr))) - lidx)))) + (->> #(lsb/read-long rdr) + (repeatedly (lsb/read-int rdr)) + vec)) (defn- read-index* [^DataInputStream rdr] @@ -68,14 +60,14 @@ skip (lsb/read-int rdr) len (lsb/read-int rdr) buf (lsb/read-bytes rdr len) - seq (read-seq buf len) + seq (read-seq buf) refs (range n-ref) - all-idx (map (fn [_] [(read-bin-index**! rdr) (read-linear-index**! rdr)]) refs) + all-idx (map (fn [_] [(read-bin-index rdr) (read-linear-index rdr)]) refs) bidx-seq (map first all-idx) bidx (zipmap refs (map (fn [bins] - (zipmap (map :bin bins) (map :chunks bins))) + (into {} (map (juxt :bin :chunks)) bins)) bidx-seq)) lidx (zipmap refs (map second all-idx))] (->Tabix n-ref preset sc bc ec meta diff --git a/src/cljam/io/util/bin.clj b/src/cljam/io/util/bin.clj index b74d6771..ab63ff8f 100644 --- a/src/cljam/io/util/bin.clj +++ b/src/cljam/io/util/bin.clj @@ -1,11 +1,6 @@ (ns cljam.io.util.bin - (:refer-clojure :exclude [compare]) - (:require [cljam.io.bam-index [chunk :as bam-index-chunk] - [writer :as bam-index-writer]]) - (:import [java.io File] - [java.net MalformedURLException URI URL] - [cljam.io.bam_index.chunk Chunk] - [bgzf4j BGZFInputStream BGZFOutputStream])) + (:require [cljam.io.util.chunk :as util-chunk] + [cljam.io.bam-index.writer :as bam-index-writer])) (defn- reg->bins* "Returns candidate bins for the specified region as a vector." @@ -31,15 +26,16 @@ (defprotocol IBinaryIndex (bidx-ref [this]) - (lidx-ref [this])) + (lidx-ref [this]) + (get-ref-index [this chr])) (defn get-spans - [^cljam.io.util.bin.IBinaryIndex index-data ^long ref-idx ^long beg ^long end] + [index-data ^long ref-idx ^long beg ^long end] (let [bins (reg->bins beg end) bidx (get (bidx-ref index-data) ref-idx) lidx (get (lidx-ref index-data) ref-idx) - chunks (into [] (comp (map bidx) cat) bins) + chunks (into [] (mapcat bidx) bins) lin-beg (bam-index-writer/pos->lidx-offset beg) min-offset (get lidx lin-beg 0)] - (->> (bam-index-chunk/optimize-chunks chunks min-offset) + (->> (util-chunk/optimize-chunks chunks min-offset) (map vals)))) diff --git a/src/cljam/io/bam_index/chunk.clj b/src/cljam/io/util/chunk.clj similarity index 82% rename from src/cljam/io/bam_index/chunk.clj rename to src/cljam/io/util/chunk.clj index 65bbb5b4..7c523037 100644 --- a/src/cljam/io/bam_index/chunk.clj +++ b/src/cljam/io/util/chunk.clj @@ -1,4 +1,4 @@ -(ns cljam.io.bam-index.chunk +(ns cljam.io.util.chunk (:refer-clojure :exclude [compare]) (:require [cljam.io.util.bgzf :as bgzf])) @@ -52,10 +52,10 @@ ret (transient [])] (if f (cond - (<= (.end f) min-offset) (recur r last-chunk ret) - (nil? last-chunk) (recur r f (conj! ret f)) - (and (not (overlap? last-chunk f)) - (not (adjacent? last-chunk f))) (recur r f (conj! ret f)) - (> (.end f) (.end last-chunk)) (let [l (assoc last-chunk :end (.end f))] (recur r l (conj! (pop! ret) l))) - :else (recur r last-chunk ret)) + (<= (.end f) min-offset) (recur r last-chunk ret) + (nil? last-chunk) (recur r f (conj! ret f)) + (and (not (overlap? last-chunk f)) + (not (adjacent? last-chunk f))) (recur r f (conj! ret f)) + (> (.end f) (.end last-chunk)) (let [l (assoc last-chunk :end (.end f))] (recur r l (conj! (pop! ret) l))) + :else (recur r last-chunk ret)) (persistent! ret))))) diff --git a/src/cljam/io/vcf.clj b/src/cljam/io/vcf.clj index 1c09abb2..0ed95070 100644 --- a/src/cljam/io/vcf.clj +++ b/src/cljam/io/vcf.clj @@ -10,7 +10,8 @@ [cljam.io.vcf.writer :as vcf-writer] [cljam.io.bcf.reader :as bcf-reader] [cljam.io.bcf.writer :as bcf-writer] - [cljam.io.util.bgzf :as bgzf]) + [cljam.io.util.bgzf :as bgzf] + [cljam.io.tabix :as tabix]) (:import java.io.Closeable cljam.io.vcf.reader.VCFReader cljam.io.vcf.writer.VCFWriter @@ -30,9 +31,11 @@ header (with-open [r (cio/reader (util/compressor-input-stream f))] (vcf-reader/load-header r))] (VCFReader. (util/as-url f) meta-info header + (if (bgzf/bgzip? f) (bgzf/bgzf-input-stream f) - (cio/reader (util/compressor-input-stream f)))))) + (cio/reader (util/compressor-input-stream f))) + (delay (tabix/read-index (str f ".tbi")))))) (defn ^BCFReader bcf-reader "Returns an open cljam.io.bcf.reader.BCFReader of f. Should be used inside diff --git a/src/cljam/io/vcf/reader.clj b/src/cljam/io/vcf/reader.clj index 75c19692..b3afbcb6 100644 --- a/src/cljam/io/vcf/reader.clj +++ b/src/cljam/io/vcf/reader.clj @@ -7,8 +7,8 @@ [camel-snake-kebab.core :refer [->kebab-case-keyword]] [proton.core :refer [as-long]] [cljam.io.util.bin :as util-bin] - [cljam.io.vcf.util :as vcf-util] - [cljam.io.tabix :as tabix]) + [cljam.io.tabix :as tabix] + [cljam.io.vcf.util :as vcf-util]) (:import [java.io Closeable] [cljam.io.tabix Tabix] [clojure.lang LazilyPersistentVector] @@ -19,7 +19,7 @@ (declare read-variants read-variants-randomly) -(deftype VCFReader [url meta-info header reader] +(deftype VCFReader [url meta-info header reader index-delay] Closeable (close [this] (.close ^Closeable (.reader this))) @@ -183,18 +183,17 @@ :vcf identity)] (map parse-fn (read-data-lines (.reader rdr) (.header rdr) kws))))) +(defn- make-lazy-variants [f s] + (if (not-empty s) + (concat + (f (first s)) + (make-lazy-variants f (rest s))))) + (defn read-variants-randomly [^VCFReader rdr {:keys [chr start end depth] :or {depth :deep start 1 end 4294967296}}] (let [kws (mapv keyword (drop 8 (.header rdr))) - tabix-path (str (.getPath ^java.net.URL (.url rdr)) ".tbi") - tabix-data (tabix/read-index tabix-path) - ref-idx (.indexOf - ^clojure.lang.PersistentVector (.seq ^Tabix tabix-data) - (if (and (string? chr) - (> (count chr) 3) - (= (subs chr 0 3) "chr")) - (subs chr 3) - (throw (ex-info "Invalid chr." {:chr chr})))) + tabix-data @(.index-delay rdr) + ref-idx (util-bin/get-ref-index tabix-data chr) spans (if (= ref-idx -1) '() @@ -203,25 +202,21 @@ parse-fn (case depth :deep (vcf-util/variant-parser (.meta-info rdr) (.header rdr)) :vcf identity)] - (reduce - (fn [res span] - (.seek input-stream (first span)) - (loop [res res] - - (if (< (.getFilePointer input-stream) (second span)) - (let [variant - (parse-fn - (parse-data-line - (.readLine input-stream) - kws)) - variant-pos (:pos variant)] - (if (or (< variant-pos (first span)) - (>= variant-pos (second span))) - res - (recur - (cons - variant - res)))) - res))) - '() + + (make-lazy-variants + (fn [[chunk-beg ^long chunk-end]] + (.seek input-stream chunk-beg) + + (->> #(when (< (.getFilePointer input-stream) chunk-end) + (-> input-stream + .readLine + (parse-data-line kws) + parse-fn)) + repeatedly + (take-while identity) + (filter + (fn [{chr' :chr :keys [pos ref info]}] + (and (= chr' chr) + (<= pos end) + (<= start (get info :END (dec (+ pos (count ref)))))))))) spans))) diff --git a/test-resources/vcf/test-v4_3-complex.vcf.gz b/test-resources/vcf/test-v4_3-complex.vcf.gz new file mode 100644 index 00000000..9c3520d1 Binary files /dev/null and b/test-resources/vcf/test-v4_3-complex.vcf.gz differ diff --git a/test-resources/vcf/test-v4_3-complex.vcf.gz.tbi b/test-resources/vcf/test-v4_3-complex.vcf.gz.tbi new file mode 100644 index 00000000..71632967 Binary files /dev/null and b/test-resources/vcf/test-v4_3-complex.vcf.gz.tbi differ diff --git a/test/cljam/io/tabix_test.clj b/test/cljam/io/tabix_test.clj index 214fc20c..6805d81a 100644 --- a/test/cljam/io/tabix_test.clj +++ b/test/cljam/io/tabix_test.clj @@ -5,13 +5,13 @@ [cljam.io.tabix :as tbi]) (:import [cljam.io.tabix Tabix] - [cljam.io.bam_index.chunk Chunk])) + [cljam.io.util.chunk Chunk])) (deftest about-read-index-with-error (is (thrown? java.io.IOException (tbi/read-index small-bam-file)))) (deftest about-read-index-returns-tabix-object - (is (instance? cljam.io.tabix.Tabix (tbi/read-index test-tabix-file)))) + (is (instance? Tabix (tbi/read-index test-tabix-file)))) (deftest about-read-index-check-the-returning-object (let [tabix-data ^Tabix (tbi/read-index test-tabix-file)] @@ -23,18 +23,8 @@ (is (number? (.meta tabix-data))) (is (number? (.skip tabix-data))) (is (vector? (.seq tabix-data))) - (is (instance? - Chunk - (get - ^clojure.lang.IPersistentVector - (get - ^clojure.lang.IPersistentMap - (get - ^clojure.lang.IPersistentMap - (.bidx tabix-data) 0) 4687) 0))) - (is (vector? - ^clojure.lang.IPersistentVector - (get (.lidx tabix-data) 0))))) + (is (instance? Chunk (get (get (get (.bidx tabix-data) 0) 4687) 0))) + (is (vector? (get (.lidx tabix-data) 0))))) (deftest-remote large-file (with-before-after {:before (prepare-cavia!)} @@ -42,8 +32,7 @@ (deftest source-type-test (with-open [server (http-server)] - (are [x] ((partial instance? cljam.io.tabix.Tabix) - (tbi/read-index x)) + (are [x] (instance? Tabix (tbi/read-index x)) test-tabix-file (cio/file test-tabix-file) (cio/as-url (cio/file test-tabix-file)) diff --git a/test/cljam/io/vcf_test.clj b/test/cljam/io/vcf_test.clj index 976b2931..ecbf636f 100644 --- a/test/cljam/io/vcf_test.clj +++ b/test/cljam/io/vcf_test.clj @@ -127,20 +127,42 @@ (vcf/read-variants-randomly (vcf/vcf-reader test-large-vcf-file) - {:chr "chr1" + {:chr "1" :start 20 :end 1000000}))) (is (not-throw? (vcf/read-variants-randomly (vcf/vcf-reader test-large-vcf-file) - {:chr "chr1" + {:chr "1" :start 2000}))) (is (not-throw? (vcf/read-variants-randomly (vcf/vcf-reader test-large-vcf-file) - {:chr "chr1"}))))) + {:chr "1"}))))) + +(deftest read-randomly-variants-complex-test + (doseq [index [{:chr "1"} + {:chr "2" :start 30000} + {:chr "2" :start 40000} + {:chr "2" :end 40000} + {:chr "2" :start 30000 :end 40000}]] + (with-open [vcf-file (vcf/vcf-reader test-vcf-complex-file) + vcf-gz-file (vcf/vcf-reader test-vcf-complex-gz-file)] + (is + (= + (vcf/read-variants-randomly + vcf-gz-file + index) + (let [{:keys [chr start end] :or {start 1 end 4294967296}} index] + (filter + (fn [variant] + (and (= chr (:chr variant)) + (<= start (:pos variant)) + (> end (:pos variant)))) + (vcf/read-variants + vcf-file)))))))) (deftest writer-test (testing "vcf" diff --git a/test/cljam/test_common.clj b/test/cljam/test_common.clj index b686487c..88bee24d 100644 --- a/test/cljam/test_common.clj +++ b/test/cljam/test_common.clj @@ -197,6 +197,8 @@ (def test-vcf-v4_3-file "test-resources/vcf/test-v4_3.vcf") (def test-vcf-no-samples-file "test-resources/vcf/test-no-samples.vcf") (def test-vcf-complex-file "test-resources/vcf/test-v4_3-complex.vcf") +(def test-vcf-complex-gz-file "test-resources/vcf/test-v4_3-complex.vcf.gz") +(def test-vcf-complex-tbi-file "test-resources/vcf/test-v4_3-complex.vcf.gz.tbi") (def test-large-vcf-file (cavia/resource mycavia "large.vcf.gz")) (def test-large-vcf-tbi-file (cavia/resource mycavia "large.vcf.gz.tbi"))