Option to write output tables as parquet files #763

Merged · 6 commits · Feb 6, 2024 · Changes from 5 commits
15 changes: 15 additions & 0 deletions activitysim/core/configuration/top.py
@@ -4,6 +4,7 @@
from typing import Any, Literal

from activitysim.core.configuration.base import PydanticBase, Union
from pydantic import validator


class InputTable(PydanticBase):
@@ -119,6 +120,11 @@ class OutputTables(PydanticBase):
h5_store: bool = False
"""Write tables into a single HDF5 store instead of individual CSVs."""

file_type: str = "csv"
"""
Specifies the file type for output tables. Options are limited to 'csv',
'h5' or 'parquet'. Only applied if h5_store is set to False."""

action: str
"""Whether to 'include' or 'skip' the enumerated tables in `tables`."""

@@ -144,6 +150,15 @@ class OutputTables(PydanticBase):
applied to any output tables.
"""

@validator("file_type")
def method_is_valid(cls, method: str) -> str:
jpn-- marked this conversation as resolved.
Show resolved Hide resolved
"""Validates file_type setting."""

allowed_set = {"csv", "h5", "parquet"}
if method not in allowed_set:
raise ValueError(f"must be in {allowed_set}, got '{method}'")
return method
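A usage sketch, not part of the diff: because the check runs at model construction, an unsupported file_type fails fast. This assumes OutputTables can be instantiated directly and that action is its only other required field.

import pydantic
from activitysim.core.configuration.top import OutputTables

OutputTables(action="include", file_type="parquet")  # passes validation
try:
    OutputTables(action="include", file_type="feather")
except pydantic.ValidationError as err:
    print(err)  # must be in {'csv', 'h5', 'parquet'}, got 'feather'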


class MultiprocessStepSlice(PydanticBase, extra="forbid"):
"""
31 changes: 26 additions & 5 deletions activitysim/core/steps/output.py
@@ -9,6 +9,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

from activitysim.core import configuration, workflow
from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME
@@ -226,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None:
@workflow.step
def write_tables(state: workflow.State) -> None:
"""
Write pipeline tables as csv, h5, or parquet files (in output directory) as
specified by the output_tables list in the settings file.

'h5_store' defaults to False, which means the output will be written out to csv.
'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'.
When 'h5_store' is set to True, 'file_type' is ignored and the outputs are written to h5.
Member comment: The old setting (h5_store) should now be deprecated. I opened issue #791 to address that separately from this PR.
'output_tables' can specify either a list of output tables to include or to skip.
If no output_tables list is specified, then all checkpointed tables will be written.
@@ -261,6 +267,16 @@ def write_tables(state: workflow.State) -> None:
tables:
- households

To write tables to parquet files, use the file_type setting:

::

output_tables:
file_type: parquet
action: include
tables:
- households
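
Writing everything to a single HDF5 store can likewise be requested through
file_type (an illustrative sketch mirroring the examples above; setting
h5_store to True routes to the same code path):

::

    output_tables:
        file_type: h5
        action: include
        tables:
            - households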

Parameters
----------
output_dir: str
@@ -277,6 +293,7 @@ def write_tables(state: workflow.State) -> None:
tables = output_tables_settings.tables
prefix = output_tables_settings.prefix
h5_store = output_tables_settings.h5_store
file_type = output_tables_settings.file_type
sort = output_tables_settings.sort

registered_tables = state.registered_tables()
@@ -383,14 +400,18 @@ def map_func(x):
):
dt = dt.drop([f"_original_{lookup_col}"])

if h5_store or file_type == "h5":
    file_path = state.get_output_file_path("%soutput_tables.h5" % prefix)
    # append each table under its own key in a single HDF5 store
    dt.to_pandas().to_hdf(
        str(file_path), key=table_name, mode="a", format="fixed"
    )
else:
    file_name = f"{prefix}{table_name}.{file_type}"
    file_path = state.get_output_file_path(file_name)

    # include the index if it has a name or is a MultiIndex
    if file_type == "csv":
        csv.write_csv(dt, file_path)
    else:
        parquet.write_table(dt, file_path)
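As a closing usage sketch (not from the PR): the parquet outputs are ordinary files that pandas can read back. The file name below assumes the default 'final_' table prefix and the standard output directory, both assumptions.

import pandas as pd

# path and 'final_' prefix are assumptions; adjust to your output settings
households = pd.read_parquet("output/final_households.parquet")
print(households.head())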