Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Change IPC FileReader to own the underlying reader (#518)
Browse files Browse the repository at this point in the history
  • Loading branch information
blakesmith authored Oct 10, 2021
1 parent e77d4e1 commit 6462c83
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 17 deletions.
8 changes: 4 additions & 4 deletions examples/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
Ok(writer.into_inner())
}

fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
let mut reader = Cursor::new(reader);
let metadata = read::read_file_metadata(&mut reader)?;
let mut reader = read::FileReader::new(&mut reader, metadata, None);
fn read_ipc(buf: &[u8]) -> Result<RecordBatch> {
let mut cursor = Cursor::new(buf);
let metadata = read::read_file_metadata(&mut cursor)?;
let mut reader = read::FileReader::new(cursor, metadata, None);
reader.next().unwrap()
}
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
let metadata = read_file_metadata(&mut file)?;

// Simplest way: use the reader, an iterator over batches.
let reader = FileReader::new(&mut file, metadata, None);
let reader = FileReader::new(file, metadata, None);

reader.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() -> Result<()> {
let filename = &args[1];
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(&mut f, metadata, None);
let mut reader = read::FileReader::new(f, metadata, None);
let schema = reader.schema();

let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?;
Expand Down
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata, None);
let reader = read::FileReader::new(arrow_file, metadata, None);

let mut fields: Vec<ArrowJsonField> = vec![];
for f in reader.schema().fields() {
Expand Down Expand Up @@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
// open Arrow file
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata, None);
let reader = read::FileReader::new(arrow_file, metadata, None);
let arrow_schema = reader.schema().as_ref().to_owned();

// compare schemas
Expand Down
17 changes: 11 additions & 6 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ impl FileMetadata {
}

/// Arrow File reader
pub struct FileReader<'a, R: Read + Seek> {
reader: &'a mut R,
pub struct FileReader<R: Read + Seek> {
reader: R,
metadata: FileMetadata,
current_block: usize,
projection: Option<(Vec<usize>, Arc<Schema>)>,
Expand Down Expand Up @@ -231,11 +231,11 @@ pub fn read_batch<R: Read + Seek>(
}
}

impl<'a, R: Read + Seek> FileReader<'a, R> {
impl<R: Read + Seek> FileReader<R> {
/// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
/// # Panic
/// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
pub fn new(reader: &'a mut R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
if let Some(projection) = projection.as_ref() {
let _ = projection.iter().fold(0, |mut acc, v| {
assert!(
Expand Down Expand Up @@ -270,9 +270,14 @@ impl<'a, R: Read + Seek> FileReader<'a, R> {
.map(|x| &x.1)
.unwrap_or(&self.metadata.schema)
}

/// Consumes this FileReader, returning the underlying reader
pub fn into_inner(self) -> R {
self.reader
}
}

impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> {
impl<R: Read + Seek> Iterator for FileReader<R> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -295,7 +300,7 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> {
}
}

impl<'a, R: Read + Seek> RecordBatchReader for FileReader<'a, R> {
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
fn schema(&self) -> &Schema {
self.schema().as_ref()
}
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
let (schema, batches) = read_gzip_json(version, file_name)?;

let metadata = read_file_metadata(&mut file)?;
let reader = FileReader::new(&mut file, metadata, None);
let reader = FileReader::new(file, metadata, None);

assert_eq!(&schema, reader.schema().as_ref());

Expand Down
4 changes: 2 additions & 2 deletions tests/it/io/ipc/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema().clone();

let reader = FileReader::new(&mut reader, metadata, None);
let reader = FileReader::new(reader, metadata, None);

// read expected JSON output
let (expected_schema, expected_batches) = (batch.schema().clone(), vec![batch]);
Expand Down Expand Up @@ -55,7 +55,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema().clone();

let reader = FileReader::new(&mut reader, metadata, None);
let reader = FileReader::new(reader, metadata, None);

// read expected JSON output
let (expected_schema, expected_batches) = read_gzip_json(version, file_name)?;
Expand Down

0 comments on commit 6462c83

Please sign in to comment.