From 2f61e36a4176ef6b14d0cc4188cdb40f663632db Mon Sep 17 00:00:00 2001 From: stefancoe Date: Mon, 11 Dec 2023 19:58:30 -0800 Subject: [PATCH 1/5] Adding parquet file support for output tables. --- activitysim/core/configuration/top.py | 14 ++++++++++++++ activitysim/core/steps/output.py | 14 ++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 3fd38baee..5970c29b7 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -119,6 +119,11 @@ class OutputTables(PydanticBase): h5_store: bool = False """Write tables into a single HDF5 store instead of individual CSVs.""" + file_type: str = 'csv' + """ + Specifies the file type for output tables. Options are limited to 'csv', + 'h5' or 'parquet'. Only applied if h5_store is set to False.""" + action: str """Whether to 'include' or 'skip' the enumerated tables in `tables`.""" @@ -143,6 +148,15 @@ class OutputTables(PydanticBase): If omitted, the all tables are written out and no decoding will be applied to any output tables. """ + + @validator("file_type") + def method_is_valid(cls, method: str) -> str: + """Validates file_type setting.""" + + allowed_set = {'csv', 'h5', 'parquet'} + if method not in allowed_set: + raise ValueError(f"must be in {allowed_set}, got '{method}'") + return method class MultiprocessStepSlice(PydanticBase, extra="forbid"): diff --git a/activitysim/core/steps/output.py b/activitysim/core/steps/output.py index 325fd2bbb..d8649b7a5 100644 --- a/activitysim/core/steps/output.py +++ b/activitysim/core/steps/output.py @@ -9,6 +9,7 @@ import pandas as pd import pyarrow as pa import pyarrow.csv as csv +import pyarrow.parquet as parquet from activitysim.core import configuration, workflow from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME @@ -277,6 +278,7 @@ def write_tables(state: workflow.State) -> None: tables = output_tables_settings.tables prefix = output_tables_settings.prefix h5_store = output_tables_settings.h5_store + file_type = output_tables_settings.file_type sort = output_tables_settings.sort registered_tables = state.registered_tables() @@ -383,14 +385,18 @@ def map_func(x): ): dt = dt.drop([f"_original_{lookup_col}"]) - if h5_store: + if h5_store or file_type == 'h5': file_path = state.get_output_file_path("%soutput_tables.h5" % prefix) dt.to_pandas().to_hdf( str(file_path), key=table_name, mode="a", format="fixed" ) - else: - file_name = f"{prefix}{table_name}.csv" + + else: + file_name = f"{prefix}{table_name}.{file_type}" file_path = state.get_output_file_path(file_name) # include the index if it has a name or is a MultiIndex - csv.write_csv(dt, file_path) + if file_type =='csv': + csv.write_csv(dt, file_path) + else: + parquet.write_table(dt, file_path) From 4b33bca07f2925b7b7bca7cb2524a7ec5132805b Mon Sep 17 00:00:00 2001 From: stefancoe Date: Mon, 11 Dec 2023 20:07:40 -0800 Subject: [PATCH 2/5] Adding missing import- pydantic validator --- activitysim/core/configuration/top.py | 1 + 1 file changed, 1 insertion(+) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 5970c29b7..3bd1dcd0f 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -4,6 +4,7 @@ from typing import Any, Literal from activitysim.core.configuration.base import PydanticBase, Union +from pydantic import validator class InputTable(PydanticBase): From c636cfbc5b248d850fa834e59d3b79e2756a228a Mon Sep 17 00:00:00 2001 From: stefancoe Date: Tue, 12 Dec 2023 08:04:26 -0800 Subject: [PATCH 3/5] Updated write_tables doc string. --- activitysim/core/steps/output.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/activitysim/core/steps/output.py b/activitysim/core/steps/output.py index d8649b7a5..b736676b3 100644 --- a/activitysim/core/steps/output.py +++ b/activitysim/core/steps/output.py @@ -227,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None: @workflow.step def write_tables(state: workflow.State) -> None: """ - Write pipeline tables as csv files (in output directory) as specified by output_tables list - in settings file. + Write pipeline tables as csv or parquet files (in output directory) as specified + by output_tables list in settings file. Output to parquet or a single h5 file is + also supported. + + 'h5_store' defaults to False, which means the output will be written out to csv. + 'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'. + When 'h5_store' is set to True, 'file_type' is ingored and the outputs are written to h5. 'output_tables' can specify either a list of output tables to include or to skip if no output_tables list is specified, then all checkpointed tables will be written @@ -262,6 +267,16 @@ def write_tables(state: workflow.State) -> None: tables: - households + To write tables to parquet files, use the file_type setting: + + :: + + output_tables: + file_type: parquet + action: include + tables: + - households + Parameters ---------- output_dir: str From 45f74ea8c59a400f0bd489dedc29c94eeba6b074 Mon Sep 17 00:00:00 2001 From: stefancoe Date: Tue, 12 Dec 2023 11:03:18 -0800 Subject: [PATCH 4/5] Code formatted with black. --- activitysim/core/configuration/top.py | 6 +++--- activitysim/core/steps/output.py | 20 ++++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 3bd1dcd0f..2ddcd0dfb 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -120,7 +120,7 @@ class OutputTables(PydanticBase): h5_store: bool = False """Write tables into a single HDF5 store instead of individual CSVs.""" - file_type: str = 'csv' + file_type: str = "csv" """ Specifies the file type for output tables. Options are limited to 'csv', 'h5' or 'parquet'. Only applied if h5_store is set to False.""" @@ -149,12 +149,12 @@ class OutputTables(PydanticBase): If omitted, the all tables are written out and no decoding will be applied to any output tables. """ - + @validator("file_type") def method_is_valid(cls, method: str) -> str: """Validates file_type setting.""" - allowed_set = {'csv', 'h5', 'parquet'} + allowed_set = {"csv", "h5", "parquet"} if method not in allowed_set: raise ValueError(f"must be in {allowed_set}, got '{method}'") return method diff --git a/activitysim/core/steps/output.py b/activitysim/core/steps/output.py index b736676b3..8c9855544 100644 --- a/activitysim/core/steps/output.py +++ b/activitysim/core/steps/output.py @@ -227,13 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None: @workflow.step def write_tables(state: workflow.State) -> None: """ - Write pipeline tables as csv or parquet files (in output directory) as specified - by output_tables list in settings file. Output to parquet or a single h5 file is - also supported. + Write pipeline tables as csv or parquet files (in output directory) as specified + by output_tables list in settings file. Output to parquet or a single h5 file is + also supported. - 'h5_store' defaults to False, which means the output will be written out to csv. - 'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'. - When 'h5_store' is set to True, 'file_type' is ingored and the outputs are written to h5. + 'h5_store' defaults to False, which means the output will be written out to csv. + 'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'. + When 'h5_store' is set to True, 'file_type' is ingored and the outputs are written to h5. 'output_tables' can specify either a list of output tables to include or to skip if no output_tables list is specified, then all checkpointed tables will be written @@ -400,18 +400,18 @@ def map_func(x): ): dt = dt.drop([f"_original_{lookup_col}"]) - if h5_store or file_type == 'h5': + if h5_store or file_type == "h5": file_path = state.get_output_file_path("%soutput_tables.h5" % prefix) dt.to_pandas().to_hdf( str(file_path), key=table_name, mode="a", format="fixed" ) - - else: + + else: file_name = f"{prefix}{table_name}.{file_type}" file_path = state.get_output_file_path(file_name) # include the index if it has a name or is a MultiIndex - if file_type =='csv': + if file_type == "csv": csv.write_csv(dt, file_path) else: parquet.write_table(dt, file_path) From d18ceef0a5f60a97fa1155d397228b088cac736e Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Tue, 6 Feb 2024 12:49:58 -0600 Subject: [PATCH 5/5] simplify code --- activitysim/core/configuration/top.py | 16 ++++------------ activitysim/core/steps/output.py | 4 +++- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/activitysim/core/configuration/top.py b/activitysim/core/configuration/top.py index 2ddcd0dfb..afb7595c0 100644 --- a/activitysim/core/configuration/top.py +++ b/activitysim/core/configuration/top.py @@ -3,9 +3,10 @@ from pathlib import Path from typing import Any, Literal -from activitysim.core.configuration.base import PydanticBase, Union from pydantic import validator +from activitysim.core.configuration.base import PydanticBase, Union + class InputTable(PydanticBase): """ @@ -120,9 +121,9 @@ class OutputTables(PydanticBase): h5_store: bool = False """Write tables into a single HDF5 store instead of individual CSVs.""" - file_type: str = "csv" + file_type: Literal["csv", "parquet", "h5"] = "csv" """ - Specifies the file type for output tables. Options are limited to 'csv', + Specifies the file type for output tables. Options are limited to 'csv', 'h5' or 'parquet'. Only applied if h5_store is set to False.""" action: str @@ -150,15 +151,6 @@ class OutputTables(PydanticBase): applied to any output tables. """ - @validator("file_type") - def method_is_valid(cls, method: str) -> str: - """Validates file_type setting.""" - - allowed_set = {"csv", "h5", "parquet"} - if method not in allowed_set: - raise ValueError(f"must be in {allowed_set}, got '{method}'") - return method - class MultiprocessStepSlice(PydanticBase, extra="forbid"): """ diff --git a/activitysim/core/steps/output.py b/activitysim/core/steps/output.py index 8c9855544..9608806ef 100644 --- a/activitysim/core/steps/output.py +++ b/activitysim/core/steps/output.py @@ -413,5 +413,7 @@ def map_func(x): # include the index if it has a name or is a MultiIndex if file_type == "csv": csv.write_csv(dt, file_path) - else: + elif file_type == "parquet": parquet.write_table(dt, file_path) + else: + raise ValueError(f"unknown file_type {file_type}")