Skip to content

Commit

Permalink
Addressing reviewer comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft authored and heuermh committed Jul 26, 2017
1 parent fa07848 commit c8a2202
Showing 1 changed file with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,22 @@ protected FastqRecordReader(final Configuration conf,
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodec(file);

// if our codec is splittable, we can (tentatively) say that
// we too are splittable.
//
// if we get a bgzfenhancedcodec, the codec might not actually
// be splittable. however, if we get a non-splittable gz file,
// several things happen:
//
// 1. the input format will detect this, and will not split the
// file
// 2. the bgzfenhancedcodec will check the underlying data type
// (BGZF vs GZIP) at input stream creation time, and will
// apply the appropriate codec.
//
// if we get an unsplittable codec, really all that we do differently
// is skip the positioning check, since we know that we're at the
// start of the file and can get to reading immediately
// if our codec is splittable, we can (tentatively) say that
// we too are splittable.
//
// if we get a bgzfenhancedcodec, the codec might not actually
// be splittable. however, if we get a non-splittable gz file,
// several things happen:
//
// 1. the input format will detect this, and will not split the
// file
// 2. the bgzfenhancedcodec will check the underlying data type
// (BGZF vs GZIP) at input stream creation time, and will
// apply the appropriate codec.
//
// if we get an unsplittable codec, really all that we do differently
// is skip the positioning check, since we know that we're at the
// start of the file and can get to reading immediately
isSplittable = (codec instanceof SplittableCompressionCodec);

if (codec == null) {
Expand Down Expand Up @@ -219,15 +219,19 @@ protected final int positionAtFirstRecord(final FSDataInputStream stream,

LineReader reader;
if (codec == null) {
// Advance to the start of the first record that ends with /1
// We use a temporary LineReader to read lines until we find the
// position of the right one. We then seek the file to that position.
stream.seek(start);
// Advance to the start of the first record that ends with /1
// We use a temporary LineReader to read lines until we find the
// position of the right one. We then seek the file to that position.
stream.seek(start);
reader = new LineReader(stream);
} else {
// see above note about
// SplittableCompressionCodec.createInputStream needing the stream
// to be at offset 0
// Unlike the codec == null case, we don't seek before creating the
// reader; SplittableCompressionCodec.createInputStream places the
// stream at the start of the first compression block after our
// split start.
//
// as noted above, we need to be at pos 0 in the stream before
// calling this
reader = new LineReader(((SplittableCompressionCodec)codec).createInputStream(stream,
null,
start,
Expand All @@ -242,43 +246,43 @@ protected final int positionAtFirstRecord(final FSDataInputStream stream,
if (bytesRead > 0 && !checkBuffer(bufferLength, buffer)) {
start += bytesRead;
} else {
// line starts with @. Read two more lines and verify that the second starts
// with a +:
//
// @<readname>
// <sequence>
// +[readname]
//
// if the second line we read starts with a @, we know that
// we've read:
//
// <qualities> <-- @ is a valid ASCII phred encoding
// @<readname>
//
// and thus, the second read is the delimiter and we can break
// with a +:
//
// @<readname>
// <sequence>
// +[readname]
//
// if the second line we read starts with a @, we know that
// we've read:
//
// <qualities> <-- @ is a valid ASCII phred encoding
// @<readname>
//
// and thus, the second read is the delimiter and we can break
long trackForwardPosition = start + bytesRead;

bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
if (buffer.getBytes()[0] == '@') {
start = trackForwardPosition;
break;
} else {
trackForwardPosition += bytesRead;
}
if (buffer.getBytes()[0] == '@') {
start = trackForwardPosition;
break;
} else {
trackForwardPosition += bytesRead;
}

bytesRead = reader.readLine(buffer, (int) Math.min(MAX_LINE_LENGTH, end - start));
trackForwardPosition += bytesRead;
trackForwardPosition += bytesRead;
if (bytesRead > 0 && buffer.getLength() > 0 && buffer.getBytes()[0] == '+') {
break; // all good!
} else {
start = trackForwardPosition;
start = trackForwardPosition;
}
}
} while (bytesRead > 0);

pos = start;
start = originalStart;
start = originalStart;
stream.seek(start);
return (int)(pos - originalStart);
}
Expand Down

0 comments on commit c8a2202

Please sign in to comment.