Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added number of rows read in CSV inference #765

Merged
merged 1 commit into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/csv_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn A

// Infers the fields using the default inferer. The inferer is just a function that maps bytes
// to a `DataType`.
let fields = read::infer_schema(&mut reader, None, true, &read::infer)?;
let (fields, _) = read::infer_schema(&mut reader, None, true, &read::infer)?;

// allocate space to read from CSV to. The size of this vec denotes how many rows are read.
let mut rows = vec![read::ByteRecord::default(); 100];
Expand Down
2 changes: 1 addition & 1 deletion examples/csv_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<()> {

let mut reader = AsyncReaderBuilder::new().create_reader(file);

let fields = infer_schema(&mut reader, None, true, &infer).await?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;
Expand Down
3 changes: 2 additions & 1 deletion examples/csv_read_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
let (tx, rx) = unbounded();

let mut reader = read::ReaderBuilder::new().from_path(path)?;
let fields = read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let (fields, _) =
read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
let fields = Arc::new(fields);

let start = SystemTime::now();
Expand Down
5 changes: 3 additions & 2 deletions src/io/csv/read/infer_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use super::super::utils::merge_schema;
use super::{ByteRecord, Reader};

/// Infers the [`Field`]s of a CSV file by reading through the first n records up to `max_rows`.
/// Also returns the number of rows used to infer.
/// Seeks back to the begining of the file _after_ the header
pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(
reader: &mut Reader<R>,
max_rows: Option<usize>,
has_header: bool,
infer: &F,
) -> Result<Vec<Field>> {
) -> Result<(Vec<Field>, usize)> {
// get or create header names
// when has_header is false, creates default column names with column_ prefix
let headers: Vec<String> = if has_header {
Expand Down Expand Up @@ -57,5 +58,5 @@ pub fn infer_schema<R: Read + Seek, F: Fn(&[u8]) -> DataType>(
// return the reader seek back to the start
reader.seek(position)?;

Ok(fields)
Ok((fields, records_count))
}
4 changes: 2 additions & 2 deletions src/io/csv/read_async/infer_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub async fn infer_schema<R, F>(
max_rows: Option<usize>,
has_header: bool,
infer: &F,
) -> Result<Vec<Field>>
) -> Result<(Vec<Field>, usize)>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
F: Fn(&[u8]) -> DataType,
Expand Down Expand Up @@ -65,5 +65,5 @@ where
// return the reader seek back to the start
reader.seek(position).await?;

Ok(fields)
Ok((fields, records_count))
}
6 changes: 3 additions & 3 deletions tests/it/io/csv/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn read() -> Result<()> {
"Aberdeen, Aberdeen City, UK",57.149651,-2.099075"#;
let mut reader = ReaderBuilder::new().from_reader(Cursor::new(data));

let fields = infer_schema(&mut reader, None, true, &infer)?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer)?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows)?;
Expand Down Expand Up @@ -58,7 +58,7 @@ fn infer_basics() -> Result<()> {
let file = Cursor::new("1,2,3\na,b,c\na,,c");
let mut reader = ReaderBuilder::new().from_reader(file);

let fields = infer_schema(&mut reader, Some(10), false, &infer)?;
let (fields, _) = infer_schema(&mut reader, Some(10), false, &infer)?;

assert_eq!(
fields,
Expand All @@ -76,7 +76,7 @@ fn infer_ints() -> Result<()> {
let file = Cursor::new("1,2,3\n1,a,5\n2,,4");
let mut reader = ReaderBuilder::new().from_reader(file);

let fields = infer_schema(&mut reader, Some(10), false, &infer)?;
let (fields, _) = infer_schema(&mut reader, Some(10), false, &infer)?;

assert_eq!(
fields,
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/csv/read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn read() -> Result<()> {
"Aberdeen, Aberdeen City, UK",57.149651,-2.099075"#;
let mut reader = AsyncReaderBuilder::new().create_reader(Cursor::new(data.as_bytes()));

let fields = infer_schema(&mut reader, None, true, &infer).await?;
let (fields, _) = infer_schema(&mut reader, None, true, &infer).await?;

let mut rows = vec![ByteRecord::default(); 100];
let rows_read = read_rows(&mut reader, 0, &mut rows).await?;
Expand Down