Update import logic to use polars instead of pyarrow, added logging (#63)

With pola-rs/polars#5687 closed, I've updated all import logic for pdstools to use polars over pyarrow. Since we make heavy use of ndjson, this issue was a blocker. All imports should now be much faster.

Additionally, I've added some basic logging to the file import logic.
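
For a sense of the new import path, a minimal sketch (the file and entry names follow the usual Pega export layout, but are hypothetical here): polars reads the zipped multi-line JSON natively from an in-memory buffer, with no pyarrow round-trip.

import zipfile
from io import BytesIO

import polars as pl

# Extract the inner data.json into memory, then parse it as ndjson.
with zipfile.ZipFile("Data-Decision-ADM-ModelSnapshot.zip") as z:
    with z.open("data.json") as f:
        df = pl.read_ndjson(BytesIO(f.read()))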


* Fill SnapshotTime with nan if missing

* Changed all import logic to Polars from pyarrow

* Updated ValueFinder to use correct keyword for polars instead of pyarrow

Co-authored-by: Stijn Kas <[email protected]>
StijnKas authored Dec 12, 2022
1 parent f9e0e92 commit c3c55c4
Showing 4 changed files with 127 additions and 97 deletions.
3 changes: 2 additions & 1 deletion python/pdstools/adm/ADMDatamart.py
@@ -485,7 +485,8 @@ def _set_types(df: pd.DataFrame, verbose=True) -> pd.DataFrame:

for col in {"Performance"} & set(df.columns):
df[col] = df[col].astype(float)

if 'SnapshotTime' not in df.columns:
df['SnapshotTime'] = np.nan
try:
df["SnapshotTime"] = pd.to_datetime(df["SnapshotTime"])
except Exception:
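A minimal sketch of the fallback this hunk adds (toy frame, illustrative only): when an export has no SnapshotTime column, the column is created as NaN so the subsequent to_datetime call yields NaT instead of raising a KeyError.

import numpy as np
import pandas as pd

df = pd.DataFrame({"Performance": [0.55, 0.71]})
if "SnapshotTime" not in df.columns:
    df["SnapshotTime"] = np.nan  # missing column becomes all-NaN
df["SnapshotTime"] = pd.to_datetime(df["SnapshotTime"])  # parses to NaT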
148 changes: 91 additions & 57 deletions python/pdstools/utils/cdh_utils.py
@@ -22,25 +22,28 @@


 def readDSExport(
-    filename: Union[pd.DataFrame, str],
+    filename: Union[pd.DataFrame, pl.DataFrame, str],
     path: str = ".",
     verbose: bool = True,
     **kwargs,
-) -> pd.DataFrame:
+) -> Union[pd.DataFrame, pl.DataFrame]:
     """Read a Pega dataset export file.
     Can accept either a Pandas DataFrame or one of the following formats:
     - .csv
     - .json
     - .zip (zipped json or CSV)
+    - .feather
+    - .ipc
     - .parquet
     It automatically infers the default file names for both model data as well as predictor data.
     If you supply either 'modelData' or 'predictorData' as the 'file' argument, it will search for them.
     If you supply the full name of the file in the 'path' directory, it will import that instead.
     Parameters
     ----------
-    filename : [pd.DataFrame, str]
-        Either a Pandas DataFrame with the source data (for compatibility),
+    filename : [pd.DataFrame, pl.DataFrame, str]
+        Either a Pandas/Polars DataFrame with the source data (for compatibility),
         or a string, in which case it can either be:
         - The name of the file (if a custom name) or
         - Whether we want to look for 'modelData' or 'predictorData' in the path folder.
@@ -49,12 +52,17 @@ def readDSExport(
     verbose : bool, default = True
         Whether to print out which file will be imported
-    Keyword arguments:
-        Any arguments to plug into the read csv or json function, from either PyArrow or Pandas.
+    Keyword arguments
+    -----------------
+    return_pl: bool
+        Whether to return a Polars DataFrame;
+        if False, transforms to Pandas
+    Any:
+        Any arguments to plug into the read_* function from Polars.
     Returns
     -------
-    pd.DataFrame
+    pd.DataFrame | pl.DataFrame
         The read data from the given file
     Examples:
@@ -67,35 +75,41 @@
     """

     # If a dataframe is supplied directly, we can just return it
-    if isinstance(filename, pd.DataFrame):
+    if isinstance(filename, pd.DataFrame) | isinstance(filename, pl.DataFrame):
+        logging.debug("Dataframe returned directly")
         return filename

     # If the data is a BytesIO object, such as an uploaded file
     # in certain webapps, then we can simply return the object
     # as is, while extracting the extension as well.
     if isinstance(filename, BytesIO):
+        logging.debug("Filename is of type BytesIO, importing that directly")
         name, extension = os.path.splitext(filename.name)
         return import_file(filename, extension, **kwargs)

     # If the filename is simply a string, then we first
     # extract the extension of the file, then look for
     # the file in the user's directory.
     if os.path.isfile(os.path.join(path, filename)):
+        logging.debug("File found in directory")
         file = os.path.join(path, filename)
     else:
+        logging.debug("File not found in directory, scanning for latest file")
         file = get_latest_file(path, filename)

     # If we can't find the file locally, we can try
     # if the file's a URL. If it is, we need to wrap
     # the file in a BytesIO object, and read the file
     # fully to disk for pyarrow to read it.
     if file == "Target not found" or file == None:
+        logging.debug("Could not find file in directory, checking if URL")
         import requests

         try:
             response = requests.get(f"{path}/{filename}")
+            logging.info(f"Response: {response}")
             if response.status_code == 200:
+                logging.debug("File found online, importing and parsing to BytesIO")
                 file = f"{path}/{filename}"
                 file = BytesIO(urllib.request.urlopen(file).read())
                 name, extension = os.path.splitext(filename)
@@ -111,44 +125,72 @@
     name, extension = os.path.splitext(file)

     # Now we should either have a full path to a file, or a
-    # BytesIO wrapper around the file. Pyarrow can read those both.
+    # BytesIO wrapper around the file. Polars can read those both.
     return import_file(file, extension, **kwargs)


-def import_file(file, extension, **kwargs):
-    import pyarrow
+def import_file(
+    file: str, extension: str, **kwargs
+) -> Union[pl.DataFrame, pd.DataFrame]:
+    """Imports a file using Polars.
+    By default, exports to Pandas.
+    Parameters
+    ----------
+    file: str
+        The path to the file, passed directly to the read functions
+    extension: str
+        The extension of the file, used to determine which function to use
+    Keyword arguments
+    -----------------
+    return_pl: bool, default = False
+        Whether to return a Polars or a Pandas dataframe;
+        if return_pl is True, keeps the Polars dataframe
+    Returns
+    -------
+    Union[pd.DataFrame, pl.DataFrame]
+        A Pandas or Polars dataframe, depending on `return_pl`
+    """

     if extension == ".zip":
-        file = readZippedFile(file)
-    elif extension == ".csv":
-        from pyarrow import csv
-
-        file = csv.read_csv(
-            file,
-            parse_options=pyarrow.csv.ParseOptions(delimiter=kwargs.get("sep", ",")),
-        )
+        logging.debug("Zip file found, extracting data.json to BytesIO.")
+        file, extension = readZippedFile(file)

+    if extension == ".csv":
+        file = pl.read_csv(
+            file,
+            sep=kwargs.get("sep", ","),
+        )

     elif extension == ".json":
-        from pyarrow import json
-
-        file = json.read_json(file, **kwargs)
+        try:
+            file = pl.read_ndjson(file)
+        except Exception:
+            file = pl.read_json(file)

     elif extension == ".parquet":
-        from pyarrow import parquet
-
-        file = parquet.read_table(file)
+        file = pl.read_parquet(file)

+    elif extension == ".feather" or extension.casefold() == ".ipc":
+        file = pl.read_ipc(file)

     else:
         raise ValueError(f"Could not import file: {file}, with extension {extension}")

-    if not kwargs.pop("return_pa", False) and isinstance(file, pyarrow.Table):
-        return file.to_pandas()
-    else:
+    if kwargs.pop("return_pl", False):
         return file
+    else:
+        return file.to_pandas()
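
A hedged usage sketch of the new dispatch (file name hypothetical): the reader is chosen from the extension, and the result is only converted to pandas when return_pl is left at its default of False.

pandas_df = import_file("models.parquet", ".parquet")
polars_df = import_file("models.parquet", ".parquet", return_pl=True)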


-def readZippedFile(file: str, verbose: bool = False, **kwargs) -> pd.DataFrame:
-    """Read a zipped file.
+def readZippedFile(file: str, verbose: bool = False, **kwargs) -> BytesIO:
+    """Read a zipped NDJSON file.
     Reads a dataset export file as exported and downloaded from Pega. The export
-    file is formatted as a zipped multi-line JSON file or CSV file
-    and the data is read into a pandas dataframe.
+    file is formatted as a zipped multi-line JSON file. It reads the file,
+    and then returns the file as a BytesIO object.
     Parameters
     ----------
@@ -159,35 +201,17 @@ def readZippedFile(file: str, verbose: bool = False, **kwargs) -> pd.DataFrame:
     Returns
     -------
-    pd.DataFrame
-        A pandas dataframe with the contents.
+    BytesIO
+        The contents of the inner data.json file, wrapped in a BytesIO object.
     """

     with zipfile.ZipFile(file, mode="r") as z:
+        logging.debug("Opened zip file.")
         files = z.namelist()
         if verbose:
             print(files)  # pragma: no cover
+        logging.debug(f"Files found: {files}")
         if "data.json" in files:
+            logging.debug("data.json found.")
             with z.open("data.json") as zippedfile:
-                try:
-                    from pyarrow import json
-
-                    return json.read_json(zippedfile)
-                except ImportError:  # pragma: no cover
-                    try:
-                        dataset = pd.read_json(zippedfile, lines=True)
-                        return dataset
-                    except ValueError:
-                        dataset = pd.read_json(zippedfile)
-                        return dataset
-        if "csv.json" in files:  # pragma: no cover
-            with z.open("data.csv") as zippedfile:
-                try:
-                    from pyarrow import csv
-
-                    return csv.read_json(zippedfile).to_pandas()
-                except ImportError:
-                    return pd.read_csv(zippedfile)
+                return (BytesIO(zippedfile.read()), ".json")
         else:  # pragma: no cover
             raise FileNotFoundError("Cannot find a 'data' file in the zip folder.")
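
Taken together, a sketch of a typical call (directory hypothetical, import path per this repo's layout): readDSExport resolves 'modelData' to the latest matching export in the folder, readZippedFile unwraps data.json into a BytesIO, and import_file parses it with polars.

from pdstools.utils import cdh_utils

model_df = cdh_utils.readDSExport("modelData", path="my_exports", return_pl=True)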

@@ -217,7 +241,7 @@ def get_latest_file(path: str, target: str, verbose: bool = False) -> str:
     if target not in {"modelData", "predictorData", "ValueFinder"}:
         return f"Target not found"

-    supported = [".json", ".csv", ".zip", ".parquet"]
+    supported = [".json", ".csv", ".zip", ".parquet", ".feather", ".ipc"]

     files_dir = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
     files_dir = [f for f in files_dir if os.path.splitext(f)[-1].lower() in supported]
@@ -279,33 +303,43 @@ def getMatches(files_dir, target):


 def cache_to_file(
-    df: pd.DataFrame, path: os.PathLike, name: str, cache_type="parquet"
+    df: pd.DataFrame,
+    path: os.PathLike,
+    name: str,
+    cache_type: str = "ipc",
+    compression: str = "uncompressed",
 ) -> os.PathLike:
     """Very simple convenience function to cache data.
     Caches in parquet format for very fast reading.
     Parameters
     ----------
-    df: pd.DataFrame
+    df: pd.DataFrame | pl.DataFrame
         The dataframe to cache
     path: os.PathLike
         The location to cache the data
     name: str
         The name to give to the file
     cache_type: str
         The type of file to export.
-        Currently, only Parquet is supported,
-        will support Arrow/Feather/IPC soon.
+        Default is IPC, also supports parquet
+    compression: str
+        The compression to apply, default is uncompressed
     Returns
     -------
     os.PathLike:
         The filepath to the cached file
     """
     outpath = f"{path}/{name}"
+    if isinstance(df, pd.DataFrame):
+        df = pl.DataFrame(df)
+    if cache_type == "ipc":
+        outpath = f"{outpath}.arrow"
+        df.write_ipc(outpath, compression=compression)
     if cache_type == "parquet":
         outpath = f"{outpath}.parquet"
-        df.to_parquet(outpath)
+        df.write_parquet(outpath, compression=compression)
     return outpath
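
A brief usage sketch (dataframe and paths hypothetical): caching now defaults to Arrow/IPC, with parquet still available via cache_type.

cache_to_file(df, ".", name="models")                        # -> ./models.arrow
cache_to_file(df, ".", name="models", cache_type="parquet")  # -> ./models.parquet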


4 changes: 1 addition & 3 deletions python/pdstools/valuefinder/ValueFinder.py
@@ -73,10 +73,8 @@ def __init__(
         start = time.time()
         filename = kwargs.pop("filename", "ValueFinder")
         self.df = cdh_utils.readDSExport(
-            filename, path, return_pa=True, verbose=verbose
+            filename, path, return_pl=True, verbose=verbose
         )
-        if kwargs.get("subset", True):
-            self.df = self.df.select(keep_cols)
         if verbose:
             print(f"Data import took {round(time.time() - start,2)} seconds")
