Remove unused schema inference logic and options
Jay Chia committed Aug 8, 2023
1 parent 5f1a9bd commit 9f13bd7
Showing 9 changed files with 22 additions and 161 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

9 changes: 1 addition & 8 deletions daft/logical/schema.py
@@ -4,12 +4,9 @@
 from typing import TYPE_CHECKING, Iterator
 
 from daft.daft import PyField as _PyField
-from daft.daft import (
-    PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions,
-)
 from daft.daft import PySchema as _PySchema
 from daft.daft import read_parquet_schema as _read_parquet_schema
-from daft.datatype import DataType, TimeUnit
+from daft.datatype import DataType
 
 if sys.version_info < (3, 8):
     pass
@@ -132,14 +129,10 @@ def from_parquet(
         cls,
         path: str,
         io_config: IOConfig | None = None,
-        schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(),
     ) -> Schema:
         return Schema._from_pyschema(
             _read_parquet_schema(
                 uri=path,
                 io_config=io_config,
-                schema_inference_options=_PyParquetSchemaInferenceOptions(
-                    int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit,
-                ),
             )
         )
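
With the inference options removed, Schema.from_parquet needs only a path and an optional io_config. A minimal usage sketch (the bucket path is hypothetical; INT96 timestamps are now always inferred with the built-in defaults, with no time-unit knob on this call):

from daft.logical.schema import Schema

# Schema inference now takes only the file path and optional IO configuration.
schema = Schema.from_parquet("s3://my-bucket/data.parquet")  # hypothetical path
print(schema)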
13 changes: 0 additions & 13 deletions daft/runners/partitioning.py
@@ -9,7 +9,6 @@
 
 import pyarrow as pa
 
-from daft.datatype import TimeUnit
 from daft.logical.schema import Schema
 from daft.table import Table
 
@@ -37,18 +36,6 @@ class TableReadOptions:
     column_names: list[str] | None = None
 
 
-@dataclass(frozen=True)
-class TableParseParquetOptions:
-    """Options for parsing Parquet
-
-    Args:
-        schema_infer_int96_timestamps_time_unit: Precision to use when reading int96 timestamps, defaults to "ns" which
-            means that Parquet INT96 timestamps outside the range of years 1678-2262 will overflow.
-    """
-
-    schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns()
-
-
 @dataclass(frozen=True)
 class TableParseCSVOptions:
     """Options for parsing CSVs
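
The deleted docstring's 1678-2262 range follows from nanosecond timestamps being stored as signed 64-bit integers, which cover roughly ±292 years around the Unix epoch. A quick sanity check (not part of the diff):

# Signed 64-bit nanoseconds span about ±292 years around the 1970 epoch,
# which is where the 1678-2262 bound in the removed docstring comes from.
max_ns = 2**63 - 1
years = max_ns / (365.25 * 24 * 60 * 60 * 10**9)
print(round(1970 - years), round(1970 + years))  # -> 1678 2262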
9 changes: 1 addition & 8 deletions daft/table/table.py
@@ -7,13 +7,10 @@
 from loguru import logger
 
 from daft.arrow_utils import ensure_table
-from daft.daft import (
-    PyParquetSchemaInferenceOptions as _PyParquetSchemaInferenceOptions,
-)
 from daft.daft import PyTable as _PyTable
 from daft.daft import _read_parquet
 from daft.daft import read_parquet_statistics as _read_parquet_statistics
-from daft.datatype import DataType, TimeUnit
+from daft.datatype import DataType
 from daft.expressions import Expression, ExpressionsProjection
 from daft.logical.schema import Schema
 from daft.series import Series
@@ -362,7 +359,6 @@ def read_parquet(
         num_rows: int | None = None,
         io_config: IOConfig | None = None,
         schema: Schema | None = None,
-        schema_infer_int96_timestamps_time_unit: TimeUnit = TimeUnit.ns(),
     ) -> Table:
         return Table._from_pytable(
             _read_parquet(
@@ -372,9 +368,6 @@
                 num_rows=num_rows,
                 io_config=io_config,
                 schema=schema._schema if schema is not None else None,
-                schema_inference_options=_PyParquetSchemaInferenceOptions(
-                    int96_timestamps_time_unit=schema_infer_int96_timestamps_time_unit._timeunit,
-                ),
             )
         )
 
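
The corresponding Table.read_parquet classmethod likewise drops the time-unit parameter; everything else is untouched. A usage sketch (the file path and column names are hypothetical, and the path/columns parameters are assumed to sit above the hunk shown here):

from daft.table import Table

# Projection and row limits still work; only the INT96 time-unit argument is gone.
tbl = Table.read_parquet(
    "data.parquet",           # hypothetical file
    columns=["id", "value"],  # hypothetical column names
    num_rows=100,
)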
17 changes: 3 additions & 14 deletions daft/table/table_io.py
@@ -17,11 +17,7 @@
 from daft.expressions import ExpressionsProjection
 from daft.filesystem import _resolve_paths_and_filesystem
 from daft.logical.schema import Schema
-from daft.runners.partitioning import (
-    TableParseCSVOptions,
-    TableParseParquetOptions,
-    TableReadOptions,
-)
+from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions
 from daft.table import Table
 
 if TYPE_CHECKING:
@@ -101,7 +97,6 @@ def read_parquet(
     schema: Schema,
     fs: fsspec.AbstractFileSystem | None = None,
     read_options: TableReadOptions = TableReadOptions(),
-    parquet_options: TableParseParquetOptions = TableParseParquetOptions(),
    io_config: IOConfig | None = None,
    use_native_downloader: bool = False,
 ) -> Table:
@@ -124,7 +119,6 @@
             num_rows=read_options.num_rows,
             io_config=io_config,
             schema=schema,
-            schema_infer_int96_timestamps_time_unit=parquet_options.schema_infer_int96_timestamps_time_unit,
         )
         return tbl
 
@@ -139,15 +133,11 @@
 
     # If no rows required, we manually construct an empty table with the right schema
     if read_options.num_rows == 0:
-        pqf = papq.ParquetFile(
-            f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit)
-        )
+        pqf = papq.ParquetFile(f)
         arrow_schema = pqf.metadata.schema.to_arrow_schema()
         table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema)
     elif read_options.num_rows is not None:
-        pqf = papq.ParquetFile(
-            f, coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit)
-        )
+        pqf = papq.ParquetFile(f)
         # Only read the required row groups.
         rows_needed = read_options.num_rows
         for i in range(pqf.metadata.num_row_groups):
@@ -163,7 +153,6 @@
         table = papq.read_table(
             f,
             columns=read_options.column_names,
-            coerce_int96_timestamp_unit=str(parquet_options.schema_infer_int96_timestamps_time_unit),
         )
 
     return _cast_table_to_schema(Table.from_arrow(table), read_options=read_options, schema=schema)
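
For reference, the num_rows branch above stops reading once enough row groups are buffered. A self-contained sketch of that pattern in plain pyarrow (an illustration, not Daft's own code):

import pyarrow as pa
import pyarrow.parquet as papq

def read_first_n_rows(path: str, n: int) -> pa.Table:
    """Read only the row groups needed to cover the first n rows, then trim."""
    pqf = papq.ParquetFile(path)
    rows_needed, chunks = n, []
    for i in range(pqf.metadata.num_row_groups):
        if rows_needed <= 0:
            break
        rg = pqf.read_row_group(i)  # reads one row group as a pa.Table
        chunks.append(rg)
        rows_needed -= rg.num_rows
    if not chunks:
        return pqf.schema_arrow.empty_table()
    return pa.concat_tables(chunks).slice(0, n)  # trim overshoot from the last group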
13 changes: 1 addition & 12 deletions src/daft-parquet/src/file.rs
@@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc};
 
 use arrow2::io::parquet::read::infer_schema;
 use common_error::DaftResult;
-use daft_core::{datatypes::TimeUnit, utils::arrow::cast_array_for_daft_if_needed, Series};
+use daft_core::{utils::arrow::cast_array_for_daft_if_needed, Series};
 use daft_io::IOClient;
 use daft_table::Table;
 use futures::{future::try_join_all, StreamExt};
@@ -28,7 +28,6 @@ pub(crate) struct ParquetReaderBuilder {
     row_start_offset: usize,
     num_rows: usize,
     user_provided_arrow_schema: Option<arrow2::datatypes::Schema>,
-    schema_infer_int96_timestamps_time_unit: TimeUnit,
 }
 use parquet2::read::decompress;
 
@@ -103,7 +102,6 @@ impl ParquetReaderBuilder {
             row_start_offset: 0,
             num_rows,
             user_provided_arrow_schema: None,
-            schema_infer_int96_timestamps_time_unit: TimeUnit::Nanoseconds,
         })
     }
 
@@ -150,15 +148,6 @@ impl ParquetReaderBuilder {
         Ok(self)
     }
 
-    pub fn set_schema_infer_int96_timestamps_time_unit(
-        mut self,
-        schema_infer_int96_timestamps_time_unit: &TimeUnit,
-    ) -> Self {
-        self.schema_infer_int96_timestamps_time_unit =
-            schema_infer_int96_timestamps_time_unit.to_owned();
-        self
-    }
-
     pub fn set_user_provided_arrow_schema(
         mut self,
         schema: Option<arrow2::datatypes::Schema>,
59 changes: 10 additions & 49 deletions src/daft-parquet/src/python.rs
@@ -3,38 +3,12 @@ use pyo3::prelude::*;
 pub mod pylib {
     use std::sync::Arc;
 
-    use daft_core::datatypes::TimeUnit;
-    use daft_core::python::datatype::PyTimeUnit;
     use daft_core::python::{schema::PySchema, PySeries};
     use daft_io::{get_io_client, python::IOConfig};
     use daft_table::python::PyTable;
-    use pyo3::{pyclass, pyfunction, pymethods, PyResult, Python};
+    use pyo3::{pyfunction, PyResult, Python};
 
-    use crate::read::{ParquetSchemaInferenceOptions, ParquetSchemaOptions};
-
-    const DEFAULT_SCHEMA_OPTIONS: ParquetSchemaOptions =
-        ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions {
-            int96_timestamps_time_unit: TimeUnit::Nanoseconds,
-        });
-
-    /// Python wrapper for ParquetSchemaInferenceOptions
-    ///
-    /// Represents the options for inferring Schemas from Parquet files
-    #[pyclass]
-    #[derive(Clone)]
-    pub struct PyParquetSchemaInferenceOptions {
-        int96_timestamps_time_unit: PyTimeUnit,
-    }
-
-    #[pymethods]
-    impl PyParquetSchemaInferenceOptions {
-        #[new]
-        fn new(int96_timestamps_time_unit: PyTimeUnit) -> Self {
-            PyParquetSchemaInferenceOptions {
-                int96_timestamps_time_unit,
-            }
-        }
-    }
+    use crate::read::ParquetSchemaOptions;
 
     #[allow(clippy::too_many_arguments)]
     #[pyfunction]
@@ -46,18 +20,12 @@
         num_rows: Option<usize>,
        io_config: Option<IOConfig>,
        schema: Option<PySchema>,
-        schema_inference_options: Option<PyParquetSchemaInferenceOptions>,
     ) -> PyResult<PyTable> {
         py.allow_threads(|| {
             let io_client = get_io_client(io_config.unwrap_or_default().config.into())?;
-            let schema_options = match (schema, schema_inference_options) {
-                (None, None) => DEFAULT_SCHEMA_OPTIONS,
-                (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema),
-                (_, Some(opts)) => {
-                    ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions {
-                        int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit,
-                    })
-                }
+            let schema_options = match schema {
+                None => ParquetSchemaOptions::InferenceOptions,
+                Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema),
             };
             Ok(crate::read::read_parquet(
                 uri,
@@ -77,23 +45,17 @@
         uri: &str,
         io_config: Option<IOConfig>,
         schema: Option<PySchema>,
-        schema_inference_options: Option<PyParquetSchemaInferenceOptions>,
     ) -> PyResult<PySchema> {
         py.allow_threads(|| {
-            let schema_options = match (schema, schema_inference_options) {
-                (None, None) => DEFAULT_SCHEMA_OPTIONS,
-                (Some(schema), _) => ParquetSchemaOptions::UserProvidedSchema(schema.schema),
-                (_, Some(opts)) => {
-                    ParquetSchemaOptions::InferenceOptions(ParquetSchemaInferenceOptions {
-                        int96_timestamps_time_unit: opts.int96_timestamps_time_unit.timeunit,
-                    })
-                }
+            let schema_options = match schema {
+                None => ParquetSchemaOptions::InferenceOptions,
+                Some(schema) => ParquetSchemaOptions::UserProvidedSchema(schema.schema),
             };
             match schema_options {
                 ParquetSchemaOptions::UserProvidedSchema(s) => Ok(PySchema { schema: s }),
-                ParquetSchemaOptions::InferenceOptions(opts) => {
+                ParquetSchemaOptions::InferenceOptions => {
                     let io_client = get_io_client(io_config.unwrap_or_default().config.into())?;
-                    Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client, &opts)?).into())
+                    Ok(Arc::new(crate::read::read_parquet_schema(uri, io_client)?).into())
                 }
             }
         })
@@ -115,6 +77,5 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
     parent.add_wrapped(wrap_pyfunction!(pylib::_read_parquet))?;
     parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?;
     parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?;
-    parent.add_class::<pylib::PyParquetSchemaInferenceOptions>()?;
     Ok(())
 }
22 changes: 4 additions & 18 deletions src/daft-parquet/src/read.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
 
 use common_error::DaftResult;
 
-use daft_core::datatypes::TimeUnit;
 use daft_core::{
     datatypes::{Int32Array, UInt64Array, Utf8Array},
     schema::Schema,
@@ -15,13 +14,9 @@ use snafu::ResultExt;
 
 use crate::{file::ParquetReaderBuilder, JoinSnafu};
 
-pub struct ParquetSchemaInferenceOptions {
-    pub int96_timestamps_time_unit: TimeUnit,
-}
-
 pub enum ParquetSchemaOptions {
     UserProvidedSchema(Arc<Schema>),
-    InferenceOptions(ParquetSchemaInferenceOptions),
+    InferenceOptions,
 }
 
 pub fn read_parquet(
@@ -49,9 +44,8 @@
         ParquetSchemaOptions::UserProvidedSchema(s) => {
             builder.set_user_provided_arrow_schema(Some(s.to_arrow()?))
         }
-        ParquetSchemaOptions::InferenceOptions(opts) => {
-            builder.set_schema_infer_int96_timestamps_time_unit(&opts.int96_timestamps_time_unit)
-        }
+        // TODO: add builder options to customize schema inference
+        ParquetSchemaOptions::InferenceOptions => builder,
     };
 
     let metadata_num_rows = builder.metadata().num_rows;
@@ -102,20 +96,12 @@
     Ok(table)
 }
 
-pub fn read_parquet_schema(
-    uri: &str,
-    io_client: Arc<IOClient>,
-    schema_inference_options: &ParquetSchemaInferenceOptions,
-) -> DaftResult<Schema> {
+pub fn read_parquet_schema(uri: &str, io_client: Arc<IOClient>) -> DaftResult<Schema> {
     let runtime_handle = get_runtime(true)?;
     let _rt_guard = runtime_handle.enter();
     let builder = runtime_handle
         .block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?;
 
-    let builder = builder.set_schema_infer_int96_timestamps_time_unit(
-        &schema_inference_options.int96_timestamps_time_unit,
-    );
-
     Schema::try_from(builder.build()?.arrow_schema())
 }
 
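
The two remaining enum arms map directly onto whether a Python caller passes a schema. A sketch of both paths through the public API (assumed surface; the path is hypothetical):

from daft.logical.schema import Schema
from daft.table import Table

# No schema argument -> ParquetSchemaOptions::InferenceOptions on the Rust side.
inferred = Schema.from_parquet("data.parquet")  # hypothetical path

# Passing a schema -> ParquetSchemaOptions::UserProvidedSchema; inference is skipped.
tbl = Table.read_parquet("data.parquet", schema=inferred)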