ARROW-13086: [Python] Expose Parquet ArrowReaderProperties::coerce_int96_timestamp_unit_

Closes apache#10575 from isichei/ARROW-13086

Lead-authored-by: Karik Isichei <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: Krisztián Szűcs <[email protected]>
Signed-off-by: Krisztián Szűcs <[email protected]>
3 people authored and pull[bot] committed Jan 14, 2022
1 parent a37836e commit 8a9e875
Showing 9 changed files with 231 additions and 41 deletions.
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
@@ -153,6 +153,8 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
auto column_index = metadata.schema()->ColumnIndex(name);
properties.set_read_dictionary(column_index, true);
}
properties.set_coerce_int96_timestamp_unit(
format.reader_options.coerce_int96_timestamp_unit);
return properties;
}

@@ -289,7 +291,9 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
checked_cast<const ParquetFileFormat&>(other).reader_options;

// FIXME implement comparison for decryption options
return reader_options.dict_columns == other_reader_options.dict_columns;
return (reader_options.dict_columns == other_reader_options.dict_columns &&
reader_options.coerce_int96_timestamp_unit ==
other_reader_options.coerce_int96_timestamp_unit);
}

ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) {
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -86,6 +86,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
///
/// @{
std::unordered_set<std::string> dict_columns;
arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
/// @}
} reader_options;

69 changes: 61 additions & 8 deletions python/pyarrow/_dataset.pyx
@@ -1397,17 +1397,56 @@ cdef class ParquetReadOptions(_Weakrefable):
dictionary_columns : list of string, default None
Names of columns which should be dictionary encoded as
they are read.
coerce_int96_timestamp_unit : str, default None
Cast timestamps that are stored in INT96 format to a particular
resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
and therefore INT96 timestamps will be inferred as timestamps
in nanoseconds.
"""

cdef public:
set dictionary_columns
TimeUnit _coerce_int96_timestamp_unit

# Also see _PARQUET_READ_OPTIONS
def __init__(self, dictionary_columns=None):
def __init__(self, dictionary_columns=None,
coerce_int96_timestamp_unit=None):
self.dictionary_columns = set(dictionary_columns or set())
self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit

@property
def coerce_int96_timestamp_unit(self):
unit = self._coerce_int96_timestamp_unit
if unit == TimeUnit_SECOND:
return "s"
elif unit == TimeUnit_MILLI:
return "ms"
elif unit == TimeUnit_MICRO:
return "us"
elif unit == TimeUnit_NANO:
return "ns"
else:
return None

@coerce_int96_timestamp_unit.setter
def coerce_int96_timestamp_unit(self, unit):
if unit is None or unit == "ns":
self._coerce_int96_timestamp_unit = TimeUnit_NANO
elif unit == "us":
self._coerce_int96_timestamp_unit = TimeUnit_MICRO
elif unit == "ms":
self._coerce_int96_timestamp_unit = TimeUnit_MILLI
elif unit == "s":
self._coerce_int96_timestamp_unit = TimeUnit_SECOND
else:
raise ValueError(
f"Invalid value for coerce_int96_timestamp_unit: {unit}"
)

def equals(self, ParquetReadOptions other):
return self.dictionary_columns == other.dictionary_columns
return (self.dictionary_columns == other.dictionary_columns and
self.coerce_int96_timestamp_unit ==
other.coerce_int96_timestamp_unit)

def __eq__(self, other):
try:
@@ -1416,8 +1455,11 @@ cdef class ParquetReadOptions(_Weakrefable):
return False

def __repr__(self):
return (f"<ParquetReadOptions"
f" dictionary_columns={self.dictionary_columns}>")
return (
f"<ParquetReadOptions"
f" dictionary_columns={self.dictionary_columns}"
f" coerce_int96_timestamp_unit={self.coerce_int96_timestamp_unit}>"
)


cdef class ParquetFileWriteOptions(FileWriteOptions):
@@ -1500,7 +1542,9 @@ cdef class ParquetFileWriteOptions(FileWriteOptions):
self._set_arrow_properties()


cdef set _PARQUET_READ_OPTIONS = {'dictionary_columns'}
cdef set _PARQUET_READ_OPTIONS = {
'dictionary_columns', 'coerce_int96_timestamp_unit'
}


cdef class ParquetFileFormat(FileFormat):
@@ -1565,6 +1609,8 @@ cdef class ParquetFileFormat(FileFormat):
if read_options.dictionary_columns is not None:
for column in read_options.dictionary_columns:
options.dict_columns.insert(tobytes(column))
options.coerce_int96_timestamp_unit = \
read_options._coerce_int96_timestamp_unit

self.init(<shared_ptr[CFileFormat]> wrapped)
self.default_fragment_scan_options = default_fragment_scan_options
@@ -1577,10 +1623,15 @@ cdef class ParquetFileFormat(FileFormat):
def read_options(self):
cdef CParquetFileFormatReaderOptions* options
options = &self.parquet_format.reader_options
return ParquetReadOptions(
parquet_read_options = ParquetReadOptions(
dictionary_columns={frombytes(col)
for col in options.dict_columns},
)
# The public getter/setter works with strings, so set the
# private property, which stores the C TimeUnit value, directly
parquet_read_options._coerce_int96_timestamp_unit = \
options.coerce_int96_timestamp_unit
return parquet_read_options

def make_write_options(self, **kwargs):
opts = FileFormat.make_write_options(self)
@@ -1725,12 +1776,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
self.buffer_size == other.buffer_size and
self.pre_buffer == other.pre_buffer and
self.enable_parallel_column_conversion ==
other.enable_parallel_column_conversion)
other.enable_parallel_column_conversion
)

def __reduce__(self):
return ParquetFragmentScanOptions, (
self.use_buffered_stream, self.buffer_size, self.pre_buffer,
self.enable_parallel_column_conversion)
self.enable_parallel_column_conversion
)


cdef class IpcFileWriteOptions(FileWriteOptions):
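For the datasets API, the option travels from ParquetReadOptions into the C++ reader properties shown above. A minimal usage sketch, assuming the new keyword is forwarded through _PARQUET_READ_OPTIONS as in this diff and that 'data.parquet' is a placeholder file containing INT96 timestamp columns:

import pyarrow.dataset as ds

# Coerce INT96 timestamps to millisecond resolution instead of the
# default nanoseconds.
fmt = ds.ParquetFileFormat(coerce_int96_timestamp_unit="ms")
dataset = ds.dataset("data.parquet", format=fmt)
table = dataset.to_table()
print(table.schema)  # INT96 columns should read back as timestamp[ms]

The same value round-trips through the format's read_options property, which is what the updated equality checks compare.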
2 changes: 2 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -347,6 +347,8 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
int64_t batch_size()
void set_pre_buffer(c_bool pre_buffer)
c_bool pre_buffer() const
void set_coerce_int96_timestamp_unit(TimeUnit unit)
TimeUnit coerce_int96_timestamp_unit() const

ArrowReaderProperties default_arrow_reader_properties()

18 changes: 17 additions & 1 deletion python/pyarrow/_parquet.pyx
@@ -928,7 +928,8 @@ cdef class ParquetReader(_Weakrefable):

def open(self, object source not None, bint use_memory_map=True,
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0, bint pre_buffer=False):
int buffer_size=0, bint pre_buffer=False,
coerce_int96_timestamp_unit=None):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
@@ -951,6 +952,21 @@

arrow_props.set_pre_buffer(pre_buffer)

if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
elif coerce_int96_timestamp_unit == "ns":
arrow_props.set_coerce_int96_timestamp_unit(TimeUnit_NANO)
elif coerce_int96_timestamp_unit == "us":
arrow_props.set_coerce_int96_timestamp_unit(TimeUnit_MICRO)
elif coerce_int96_timestamp_unit == "ms":
arrow_props.set_coerce_int96_timestamp_unit(TimeUnit_MILLI)
elif coerce_int96_timestamp_unit == "s":
arrow_props.set_coerce_int96_timestamp_unit(TimeUnit_SECOND)
else:
raise ValueError(f"Invalid value for coerce_int96_timestamp_unit: "
f"{coerce_int96_timestamp_unit}")

self.source = source

get_reader(source, use_memory_map, &rd_handle)
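At the lowest level, ParquetReader.open maps the unit string onto the C++ TimeUnit enum and rejects anything other than 's', 'ms', 'us', 'ns' or None. A minimal sketch of that behaviour, assuming a placeholder file 'int96.parquet' (ParquetReader is the internal class that pyarrow.parquet.ParquetFile wraps):

from pyarrow._parquet import ParquetReader

reader = ParquetReader()
# Accepted values: None (library default), "s", "ms", "us", "ns".
reader.open("int96.parquet", coerce_int96_timestamp_unit="s")

try:
    ParquetReader().open("int96.parquet", coerce_int96_timestamp_unit="minutes")
except ValueError as exc:
    # "Invalid value for coerce_int96_timestamp_unit: minutes"
    print(exc)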
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -303,6 +303,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CParquetFileFormatReaderOptions \
"arrow::dataset::ParquetFileFormat::ReaderOptions":
unordered_set[c_string] dict_columns
TimeUnit coerce_int96_timestamp_unit

cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
CFileFormat):
92 changes: 61 additions & 31 deletions python/pyarrow/parquet.py
@@ -214,15 +214,23 @@ class ParquetFile:
Coalesce and issue file reads in parallel to improve performance on
high-latency filesystems (e.g. S3). If True, Arrow will use a
background I/O thread pool.
coerce_int96_timestamp_unit : str, default None
Cast timestamps that are stored in INT96 format to a particular
resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
and therefore INT96 timestamps will be inferred as timestamps
in nanoseconds.
"""

def __init__(self, source, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False):
pre_buffer=False, coerce_int96_timestamp_unit=None):
self.reader = ParquetReader()
self.reader.open(source, use_memory_map=memory_map,
buffer_size=buffer_size, pre_buffer=pre_buffer,
read_dictionary=read_dictionary, metadata=metadata)
self.reader.open(
source, use_memory_map=memory_map,
buffer_size=buffer_size, pre_buffer=pre_buffer,
read_dictionary=read_dictionary, metadata=metadata,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()

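With the keyword plumbed through to ParquetReader.open, a single file can be read at a coarser INT96 resolution. A minimal sketch, where 'spark_int96.parquet' is a placeholder for a file whose timestamp columns were written as INT96:

import pyarrow.parquet as pq

pf = pq.ParquetFile("spark_int96.parquet", coerce_int96_timestamp_unit="ms")
table = pf.read()
# INT96 columns should come back as timestamp[ms] rather than the
# default timestamp[ns].
print(table.schema)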
@@ -1254,13 +1262,18 @@ class ParquetDataset:
use_legacy_dataset=False. If using a filesystem layer that itself
performs readahead (e.g. fsspec's S3FS), disable readahead for best
results.
coerce_int96_timestamp_unit : str, default None
Cast timestamps that are stored in INT96 format to a particular resolution
(e.g. 'ms'). Setting to None is equivalent to 'ns' and therefore INT96
timestamps will be inferred as timestamps in nanoseconds.
""".format(_read_docstring_common, _DNF_filter_doc)

def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=None, pre_buffer=True):
use_legacy_dataset=None, pre_buffer=True,
coerce_int96_timestamp_unit=None):
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
@@ -1270,26 +1283,30 @@ def __new__(cls, path_or_paths=None, filesystem=None, schema=None,
use_legacy_dataset = True

if not use_legacy_dataset:
return _ParquetDatasetV2(path_or_paths, filesystem=filesystem,
filters=filters,
partitioning=partitioning,
read_dictionary=read_dictionary,
memory_map=memory_map,
buffer_size=buffer_size,
pre_buffer=pre_buffer,
# unsupported keywords
schema=schema, metadata=metadata,
split_row_groups=split_row_groups,
validate_schema=validate_schema,
metadata_nthreads=metadata_nthreads)
return _ParquetDatasetV2(
path_or_paths, filesystem=filesystem,
filters=filters,
partitioning=partitioning,
read_dictionary=read_dictionary,
memory_map=memory_map,
buffer_size=buffer_size,
pre_buffer=pre_buffer,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
# unsupported keywords
schema=schema, metadata=metadata,
split_row_groups=split_row_groups,
validate_schema=validate_schema,
metadata_nthreads=metadata_nthreads
)
self = object.__new__(cls)
return self

def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True,
filters=None, metadata_nthreads=1, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=True, pre_buffer=True):
use_legacy_dataset=True, pre_buffer=True,
coerce_int96_timestamp_unit=None):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -1582,7 +1599,7 @@ class _ParquetDatasetV2:
def __init__(self, path_or_paths, filesystem=None, filters=None,
partitioning="hive", read_dictionary=None, buffer_size=None,
memory_map=False, ignore_prefixes=None, pre_buffer=True,
**kwargs):
coerce_int96_timestamp_unit=None, **kwargs):
import pyarrow.dataset as ds

# Raise error for not supported keywords
@@ -1596,7 +1613,10 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
"Dataset API".format(keyword))

# map format arguments
read_options = {"pre_buffer": pre_buffer}
read_options = {
"pre_buffer": pre_buffer,
"coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
}
if buffer_size:
read_options.update(use_buffered_stream=True,
buffer_size=buffer_size)
@@ -1825,7 +1845,8 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
use_pandas_metadata=False, memory_map=False,
read_dictionary=None, filesystem=None, filters=None,
buffer_size=0, partitioning="hive", use_legacy_dataset=False,
ignore_prefixes=None, pre_buffer=True):
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -1845,6 +1866,7 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
filters=filters,
ignore_prefixes=ignore_prefixes,
pre_buffer=pre_buffer,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -1866,7 +1888,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
dataset = ParquetFile(
source, metadata=metadata, read_dictionary=read_dictionary,
memory_map=memory_map, buffer_size=buffer_size,
pre_buffer=pre_buffer)
pre_buffer=pre_buffer,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
)

return dataset.read(columns=columns, use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)
@@ -1877,16 +1901,22 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
"use_legacy_dataset=False")

if _is_path_like(source):
pf = ParquetDataset(source, metadata=metadata, memory_map=memory_map,
read_dictionary=read_dictionary,
buffer_size=buffer_size,
filesystem=filesystem, filters=filters,
partitioning=partitioning)
pf = ParquetDataset(
source, metadata=metadata, memory_map=memory_map,
read_dictionary=read_dictionary,
buffer_size=buffer_size,
filesystem=filesystem, filters=filters,
partitioning=partitioning,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
)
else:
pf = ParquetFile(source, metadata=metadata,
read_dictionary=read_dictionary,
memory_map=memory_map,
buffer_size=buffer_size)
pf = ParquetFile(
source, metadata=metadata,
read_dictionary=read_dictionary,
memory_map=memory_map,
buffer_size=buffer_size,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
)
return pf.read(columns=columns, use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)

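read_table forwards the new keyword on both code paths: to _ParquetDatasetV2 when the datasets API is available, and to ParquetFile or ParquetDataset on the legacy path. A usage sketch with a placeholder file name:

import pyarrow.parquet as pq

# Default (dataset-based) path.
table = pq.read_table("events.parquet", coerce_int96_timestamp_unit="us")

# The legacy path accepts the same keyword, per the diff above.
table_legacy = pq.read_table(
    "events.parquet",
    use_legacy_dataset=True,
    coerce_int96_timestamp_unit="us",
)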