From c3c55c4d71f3e3783948a11853032d2cebecc1ec Mon Sep 17 00:00:00 2001
From: Stijn Kas <78410144+StijnKas@users.noreply.github.com>
Date: Mon, 12 Dec 2022 15:28:31 +0100
Subject: [PATCH] Update import logic to use `polars` instead of `pyarrow`, added logging (#63)

With pola-rs/polars#5687 closed, I've updated all import logic for pdstools to use polars over pyarrow.
Since we make heavy use of ndjson, this issue was a blocker. All imports should now be much faster.
Additionally, I've added some basic logging to the file import logic.

* Fill SnapshotTime with nan if missing
* Changed all import logic to Polars from pyarrow
* Updated ValueFinder to use correct keyword for polars instead of pyarrow

Co-authored-by: Stijn Kas
---
 python/pdstools/adm/ADMDatamart.py         |   3 +-
 python/pdstools/utils/cdh_utils.py         | 148 +++++++++++++--------
 python/pdstools/valuefinder/ValueFinder.py |   4 +-
 python/tests/test_cdh_utils.py             |  69 +++++-----
 4 files changed, 127 insertions(+), 97 deletions(-)

diff --git a/python/pdstools/adm/ADMDatamart.py b/python/pdstools/adm/ADMDatamart.py
index aac20dd7..35f165f2 100644
--- a/python/pdstools/adm/ADMDatamart.py
+++ b/python/pdstools/adm/ADMDatamart.py
@@ -485,7 +485,8 @@ def _set_types(df: pd.DataFrame, verbose=True) -> pd.DataFrame:

         for col in {"Performance"} & set(df.columns):
             df[col] = df[col].astype(float)
-
+        if 'SnapshotTime' not in df.columns:
+            df['SnapshotTime'] = np.nan
         try:
             df["SnapshotTime"] = pd.to_datetime(df["SnapshotTime"])
         except Exception:
diff --git a/python/pdstools/utils/cdh_utils.py b/python/pdstools/utils/cdh_utils.py
index c5eb28fb..3caafd44 100644
--- a/python/pdstools/utils/cdh_utils.py
+++ b/python/pdstools/utils/cdh_utils.py
@@ -22,16 +22,19 @@
 def readDSExport(
-    filename: Union[pd.DataFrame, str],
+    filename: Union[pd.DataFrame, pl.DataFrame, str],
     path: str = ".",
     verbose: bool = True,
     **kwargs,
-) -> pd.DataFrame:
+) -> Union[pd.DataFrame, pl.DataFrame]:
     """Read a Pega dataset export file.
     Can accept either a Pandas DataFrame or one of the following formats:
     - .csv
     - .json
     - .zip (zipped json or CSV)
+    - .feather
+    - .ipc
+    - .parquet

     It automatically infers the default file names for both model data as well as predictor data.
     If you supply either 'modelData' or 'predictorData' as the 'file' argument, it will search for them.
@@ -39,8 +42,8 @@ def readDSExport(

     Parameters
     ----------
-    filename : [pd.DataFrame, str]
-        Either a Pandas DataFrame with the source data (for compatibility),
+    filename : [pd.DataFrame, pl.DataFrame, str]
+        Either a Pandas/Polars DataFrame with the source data (for compatibility),
         or a string, in which case it can either be:
         - The name of the file (if a custom name) or
         - Whether we want to look for 'modelData' or 'predictorData' in the path folder.
@@ -49,12 +52,17 @@ def readDSExport(
     path : str, default = '.'
         The location of the file
     verbose : bool, default = True
         Whether to print out which file will be imported

-    Keyword arguments:
-        Any arguments to plug into the read csv or json function, from either PyArrow or Pandas.
+    Keyword arguments
+    -----------------
+    return_pl: bool
+        Whether to return polars dataframe
+        if False, transforms to Pandas
+    Any:
+        Any arguments to plug into the read_* function from Polars.
     Returns
     -------
-    pd.DataFrame
+    pd.DataFrame | pl.DataFrame
         The read data from the given file

     Examples:
         >>> df = readDSExport(filename='modelData', path='./datamart')
     """

     # If a dataframe is supplied directly, we can just return it
-    if isinstance(filename, pd.DataFrame):
+    if isinstance(filename, pd.DataFrame) | isinstance(filename, pl.DataFrame):
+        logging.debug("Dataframe returned directly")
         return filename

     # If the data is a BytesIO object, such as an uploaded file
     # in certain webapps, then we can simply return the object
     # as is, while extracting the extension as well.
     if isinstance(filename, BytesIO):
+        logging.debug("Filename is of type BytesIO, importing that directly")
         name, extension = os.path.splitext(filename.name)
         return import_file(filename, extension, **kwargs)
@@ -81,8 +91,10 @@ def readDSExport(
     # extract the extension of the file, then look for
     # the file in the user's directory.
     if os.path.isfile(os.path.join(path, filename)):
+        logging.debug("File found in directory")
         file = os.path.join(path, filename)
     else:
+        logging.debug("File not found in directory, scanning for latest file")
         file = get_latest_file(path, filename)

     # If we can't find the file locally, we can try
@@ -90,12 +102,14 @@ def readDSExport(
     # the file in a BytesIO object, and read the file
     # fully to disk for pyarrow to read it.
     if file == "Target not found" or file == None:
+        logging.debug("Could not find file in directory, checking if URL")
         import requests

         try:
             response = requests.get(f"{path}/{filename}")
             logging.info(f"Response: {response}")
             if response.status_code == 200:
+                logging.debug("File found online, importing and parsing to BytesIO")
                 file = f"{path}/{filename}"
                 file = BytesIO(urllib.request.urlopen(file).read())
                 name, extension = os.path.splitext(filename)
@@ -111,44 +125,72 @@ def readDSExport(
     name, extension = os.path.splitext(file)

     # Now we should either have a full path to a file, or a
-    # BytesIO wrapper around the file. Pyarrow can read those both.
+    # BytesIO wrapper around the file. Polars can read those both.
     return import_file(file, extension, **kwargs)


-def import_file(file, extension, **kwargs):
-    import pyarrow
+def import_file(
+    file: str, extension: str, **kwargs
+) -> Union[pl.DataFrame, pd.DataFrame]:
+    """Imports a file using Polars
+
+    By default, exports to Pandas
+
+    Parameters
+    ----------
+    file: str
+        The path to the file, passed directly to the read functions
+    extension: str
+        The extension of the file, used to determine which function to use
+
+    Keyword arguments
+    -----------------
+    return_pl: bool, default = False
+        Whether to return Polars or Pandas
+        If return_pl is True, then keeps the Polars dataframe
+
+    Returns
+    -------
+    Union[pd.DataFrame, pl.DataFrame]
+        A Pandas or Polars dataframe, depending on `return_pl`
+    """
     if extension == ".zip":
-        file = readZippedFile(file)
-    elif extension == ".csv":
-        from pyarrow import csv
+        logging.debug("Zip file found, extracting data.json to BytesIO.")
+        file, extension = readZippedFile(file)

-        file = csv.read_csv(
+    if extension == ".csv":
+        file = pl.read_csv(
             file,
-            parse_options=pyarrow.csv.ParseOptions(delimiter=kwargs.get("sep", ",")),
+            sep=kwargs.get("sep", ","),
         )
+
     elif extension == ".json":
-        from pyarrow import json
+        try:
+            file = pl.read_ndjson(file)
+        except:
+            file = pl.read_json(file)

-        file = json.read_json(file, **kwargs)
     elif extension == ".parquet":
-        from pyarrow import parquet
+        file = pl.read_parquet(file)
+
+    elif extension == ".feather" or extension.casefold() == ".ipc":
+        file = pl.read_ipc(file)

-        file = parquet.read_table(file)
     else:
         raise ValueError(f"Could not import file: {file}, with extension {extension}")

-    if not kwargs.pop("return_pa", False) and isinstance(file, pyarrow.Table):
-        return file.to_pandas()
-    else:
+    if kwargs.pop("return_pl", False):
         return file
+    else:
+        return file.to_pandas()


-def readZippedFile(file: str, verbose: bool = False, **kwargs) -> pd.DataFrame:
-    """Read a zipped file.
+def readZippedFile(file: str, verbose: bool = False, **kwargs) -> BytesIO:
+    """Read a zipped NDJSON file.

     Reads a dataset export file as exported and downloaded from Pega. The export
-    file is formatted as a zipped multi-line JSON file or CSV file
-    and the data is read into a pandas dataframe.
+    file is formatted as a zipped multi-line JSON file. It reads the file,
+    and then returns the file as a BytesIO object.

     Parameters
     ----------
@@ -159,35 +201,17 @@ def readZippedFile(file: str, verbose: bool = False, **kwargs) -> pd.DataFrame:

     Returns
     -------
-    pd.DataFrame
-        A pandas dataframe with the contents.
+    (BytesIO, str)
+        The contents of the 'data.json' file as a BytesIO object, together
+        with its file extension.
""" - with zipfile.ZipFile(file, mode="r") as z: + logging.debug("Opened zip file.") files = z.namelist() - if verbose: - print(files) # pragma: no cover + logging.debug(f"Files found: {files}") if "data.json" in files: + logging.debug("data.json found.") with z.open("data.json") as zippedfile: - try: - from pyarrow import json - - return json.read_json(zippedfile) - except ImportError: # pragma: no cover - try: - dataset = pd.read_json(zippedfile, lines=True) - return dataset - except ValueError: - dataset = pd.read_json(zippedfile) - return dataset - if "csv.json" in files: # pragma: no cover - with z.open("data.csv") as zippedfile: - try: - from pyarrow import csv - - return csv.read_json(zippedfile).to_pandas() - except ImportError: - return pd.read_csv(zippedfile) + return (BytesIO(zippedfile.read()), ".json") else: # pragma: no cover raise FileNotFoundError("Cannot find a 'data' file in the zip folder.") @@ -217,7 +241,7 @@ def get_latest_file(path: str, target: str, verbose: bool = False) -> str: if target not in {"modelData", "predictorData", "ValueFinder"}: return f"Target not found" - supported = [".json", ".csv", ".zip", ".parquet"] + supported = [".json", ".csv", ".zip", ".parquet", ".feather", ".ipc"] files_dir = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))] files_dir = [f for f in files_dir if os.path.splitext(f)[-1].lower() in supported] @@ -279,14 +303,18 @@ def getMatches(files_dir, target): def cache_to_file( - df: pd.DataFrame, path: os.PathLike, name: str, cache_type="parquet" + df: pd.DataFrame, + path: os.PathLike, + name: str, + cache_type: str = "ipc", + compression: str = "uncompressed", ) -> os.PathLike: """Very simple convenience function to cache data. Caches in parquet format for very fast reading. Parameters ---------- - df: pd.DataFrame + df: pd.DataFrame | pl.DataFrame The dataframe to cache path: os.PathLike The location to cache the data @@ -294,8 +322,9 @@ def cache_to_file( The name to give to the file cache_type: str The type of file to export. - Currently, only Parquet is supported, - will support Arrow/Feather/IPC soon. 
+        Default is IPC, also supports parquet
+    compression: str
+        The compression to apply, default is uncompressed
@@ -303,9 +332,14 @@ def cache_to_file(

     Returns
     -------
         The filepath to the cached file
     """
     outpath = f"{path}/{name}"
+    if isinstance(df, pd.DataFrame):
+        df = pl.DataFrame(df)
+    if cache_type == "ipc":
+        outpath = f"{outpath}.arrow"
+        df.write_ipc(outpath, compression=compression)
     if cache_type == "parquet":
         outpath = f"{outpath}.parquet"
-        df.to_parquet(outpath)
+        df.write_parquet(outpath, compression=compression)

     return outpath
diff --git a/python/pdstools/valuefinder/ValueFinder.py b/python/pdstools/valuefinder/ValueFinder.py
index 08edccdc..c1944edd 100644
--- a/python/pdstools/valuefinder/ValueFinder.py
+++ b/python/pdstools/valuefinder/ValueFinder.py
@@ -73,10 +73,8 @@ def __init__(
         start = time.time()
         filename = kwargs.pop("filename", "ValueFinder")
         self.df = cdh_utils.readDSExport(
-            filename, path, return_pa=True, verbose=verbose
+            filename, path, return_pl=True, verbose=verbose
         )
-        if kwargs.get("subset", True):
-            self.df = self.df.select(keep_cols)

         if verbose:
             print(f"Data import took {round(time.time() - start,2)} seconds")
diff --git a/python/tests/test_cdh_utils.py b/python/tests/test_cdh_utils.py
index baac7304..9ce0c7c9 100644
--- a/python/tests/test_cdh_utils.py
+++ b/python/tests/test_cdh_utils.py
@@ -8,7 +8,7 @@
 import urllib.request
 import datetime
 from pytz import timezone
-import pyarrow
+import polars as pl

 sys.path.append("python")

@@ -43,57 +43,55 @@ def test_only_imports_zips():
     )


-def test_import_produces_dataframe():
+def test_import_produces_polars():
     assert isinstance(
-        cdh_utils.readZippedFile(
+        cdh_utils.readDSExport(
+            "Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip",
+            "data",
+            return_pl=True,
+        ),
+        pl.DataFrame,
+    )
+
+
+def test_import_produces_bytes():
+    ret = cdh_utils.readZippedFile(
         os.path.join(
             "data",
             "Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip",
         )
-        ),
-        pyarrow.Table
     )
+    assert isinstance(ret[0], BytesIO)
+    assert ret[1] == '.json'


 def test_read_modelData():
-    assert cdh_utils.readZippedFile(
-        os.path.join(
-            "data",
-            "Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip",
-        )
-    ).shape == (20, 23)
+    assert cdh_utils.readDSExport(
+        "Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip",
+        "data",
+    ).shape == (20, 23)


 def test_read_predictorData():
-    assert cdh_utils.readZippedFile(
-        os.path.join(
-            "data",
-            "Data-Decision-ADM-PredictorBinningSnapshot_pyADMPredictorSnapshots_20210101T010000_GMT.zip",
-        )
-    ).shape == (1755, 35)
+    assert cdh_utils.readDSExport(
+        "Data-Decision-ADM-PredictorBinningSnapshot_pyADMPredictorSnapshots_20210101T010000_GMT.zip",
+        "data",
+    ).shape == (1755, 34)


-def test_read_zip_from_url():
-    assert pyarrow_checks(
-        cdh_utils.readZippedFile(
-            BytesIO(
-                urllib.request.urlopen(
-                    "https://raw.githubusercontent.com/pegasystems/cdh-datascientist-tools/master/data/Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip"
-                ).read()
-            )
-        )
+def test_polars_zip_from_url():
+    assert isinstance(
+        cdh_utils.readDSExport(
+            'Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip',
+            "https://raw.githubusercontent.com/pegasystems/cdh-datascientist-tools/master/data",
+            return_pl=True,
+        ),
+        pl.DataFrame,
     )


 # Tests for ReadDSExport function
 @pytest.fixture
 def test_data():
-    return cdh_utils.readZippedFile(
-        os.path.join(
-            "data",
"Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip", - ) - ).to_pandas() + return cdh_utils.readDSExport( + "Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip", + "data") def pandas_checks(df): @@ -101,12 +99,11 @@ def pandas_checks(df): if isinstance(df, pd.DataFrame) and len(df) > 0: return True -def pyarrow_checks(df): +def polars_checks(df): "Very simple convienence function to check if it is a dataframe with rows." - if isinstance(df, pyarrow.Table) and len(df) > 0: + if isinstance(df, pl.DataFrame) and len(df) > 0: return True - def test_dataframe_returns_itself(test_data): input = test_data assert cdh_utils.readDSExport(input).equals(input)