
Commit

use pep574 pickling for pandas container
larme committed Apr 6, 2023
1 parent 91ae226 commit 50a2825
Showing 2 changed files with 48 additions and 35 deletions.
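This change drops the Arrow Plasma / simple_di injection path from the pandas container and serializes DataFrames with the PEP 574 helpers (pep574_dumps / pep574_loads) instead, as the generic container later in the same file already does. The snippet below is a rough, stdlib-plus-numpy sketch of the mechanism those helpers build on, pickle protocol 5 with out-of-band buffers; it is illustrative and not BentoML code.

import pickle

import numpy as np

# Protocol 5 (PEP 574) hands large contiguous buffers to a callback instead of
# copying them into the pickle stream.
arr = np.arange(1_000_000, dtype=np.float64)
buffers: list[pickle.PickleBuffer] = []
bs = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)

# The buffers can travel separately (shared memory, a socket, a payload body)
# and are handed back to the unpickler on the other side.
restored = pickle.loads(bs, buffers=buffers)
assert (restored == arr).all()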
61 changes: 26 additions & 35 deletions src/bentoml/_internal/runner/container.py
@@ -6,14 +6,10 @@
import typing as t
import itertools

from simple_di import inject
from simple_di import Provide

from ..types import LazyType
from ..utils import LazyLoader
from ..utils.pickle import pep574_dumps
from ..utils.pickle import pep574_loads
from ..configuration.containers import BentoMLContainer

SingleType = t.TypeVar("SingleType")
BatchType = t.TypeVar("BatchType")
@@ -338,7 +334,6 @@ def batch_to_payloads(
return payloads

@classmethod
@inject
def from_batch_payloads(
cls,
payloads: t.Sequence[Payload],
@@ -385,12 +380,10 @@ def batch_to_batches(
]

@classmethod
@inject
def to_payload(
cls,
batch: ext.PdDataFrame | ext.PdSeries,
batch_dim: int,
plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db],
) -> Payload:
import pandas as pd

@@ -401,59 +394,60 @@ def batch_to_payloads(
if isinstance(batch, pd.Series):
batch = pd.DataFrame([batch])

if plasma_db:
return cls.create_payload(
plasma_db.put(batch).binary(),
batch.size,
{"plasma": True},
)
meta: dict[str, bool | int | float | str | list[int]] = {"format": "pickle5"}

bs: bytes
concat_buffer_bs: bytes
indices: list[int]
bs, concat_buffer_bs, indices = pep574_dumps(batch)

if indices:
meta["with_buffer"] = True
data = concat_buffer_bs
meta["pickle_bytes_str"] = base64.b64encode(bs).decode("ascii")
meta["indices"] = indices
else:
meta["with_buffer"] = False
data = bs

return cls.create_payload(
pickle.dumps(batch),
data,
batch.size,
{"plasma": False},
meta=meta,
)

@classmethod
@inject
def from_payload(
cls,
payload: Payload,
plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db],
) -> ext.PdDataFrame:
if payload.meta.get("plasma"):
import pyarrow.plasma as plasma

assert plasma_db
return plasma_db.get(plasma.ObjectID(payload.data))

return pickle.loads(payload.data)
if payload.meta["with_buffer"]:
bs_str = t.cast(str, payload.meta["pickle_bytes_str"])
bs = base64.b64decode(bs_str)
indices = t.cast(t.List[int], payload.meta["indices"])
return pep574_loads(bs, payload.data, indices)
else:
return pep574_loads(payload.data, b"", [])

@classmethod
@inject
def batch_to_payloads(
cls,
batch: ext.PdDataFrame,
indices: t.Sequence[int],
batch_dim: int = 0,
plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db],
) -> list[Payload]:
batches = cls.batch_to_batches(batch, indices, batch_dim)

payloads = [
cls.to_payload(subbatch, batch_dim, plasma_db) for subbatch in batches
]
payloads = [cls.to_payload(subbatch, batch_dim) for subbatch in batches]
return payloads

@classmethod
@inject
def from_batch_payloads( # pylint: disable=arguments-differ
cls,
payloads: t.Sequence[Payload],
batch_dim: int = 0,
plasma_db: ext.PlasmaClient | None = Provide[BentoMLContainer.plasma_db],
) -> tuple[ext.PdDataFrame, list[int]]:
batches = [cls.from_payload(payload, plasma_db) for payload in payloads]
batches = [cls.from_payload(payload) for payload in payloads]
return cls.batches_to_batch(batches, batch_dim)


@@ -514,7 +508,6 @@ def to_payload(cls, batch: t.Any, batch_dim: int) -> Payload:
)

@classmethod
@inject
def from_payload(cls, payload: Payload) -> t.Any:
if payload.meta["with_buffer"]:
bs_str = t.cast(str, payload.meta["pickle_bytes_str"])
@@ -525,7 +518,6 @@ def from_payload(cls, payload: Payload) -> t.Any:
return pep574_loads(payload.data, b"", [])

@classmethod
@inject
def batch_to_payloads(
cls,
batch: list[t.Any],
@@ -538,7 +530,6 @@ def from_payload(cls, payload: Payload) -> t.Any:
return payloads

@classmethod
@inject
def from_batch_payloads(
cls,
payloads: t.Sequence[Payload],
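For context, below is a self-contained sketch of the payload scheme the diff above settles on: the pickle-5 stream travels base64-encoded in the payload metadata, while the out-of-band buffers are concatenated into the payload body with their boundaries recorded under "indices". The helper names (dumps_payload, loads_payload) and the exact offset layout are illustrative assumptions; the real logic lives in bentoml._internal.utils.pickle and in the container code above.

import base64
import pickle
import typing as t

import pandas as pd


def dumps_payload(df: pd.DataFrame) -> tuple[bytes, dict[str, t.Any]]:
    # Pickle with protocol 5 and collect the zero-copy buffers out-of-band.
    buffers: list[pickle.PickleBuffer] = []
    bs = pickle.dumps(df, protocol=5, buffer_callback=buffers.append)
    raws = [b.raw() for b in buffers]
    if not raws:
        return bs, {"with_buffer": False}
    # Concatenate the buffers into one body and record where each one ends.
    indices: list[int] = [0]
    for r in raws:
        indices.append(indices[-1] + r.nbytes)
    body = b"".join(bytes(r) for r in raws)
    meta = {
        "with_buffer": True,
        "pickle_bytes_str": base64.b64encode(bs).decode("ascii"),
        "indices": indices,
    }
    return body, meta


def loads_payload(body: bytes, meta: dict[str, t.Any]) -> pd.DataFrame:
    if not meta["with_buffer"]:
        return pickle.loads(body)
    bs = base64.b64decode(t.cast(str, meta["pickle_bytes_str"]))
    idx = t.cast(t.List[int], meta["indices"])
    # Slice the concatenated body back into the buffers protocol 5 expects.
    views = [body[idx[i] : idx[i + 1]] for i in range(len(idx) - 1)]
    return pickle.loads(bs, buffers=views)


# Round-trip check.
df = pd.DataFrame({"a": range(10)})
body, meta = dumps_payload(df)
assert loads_payload(body, meta).equals(df)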
22 changes: 22 additions & 0 deletions tests/unit/_internal/utils/test_pickle.py
@@ -8,6 +8,7 @@

def test_pep574_restore() -> None:
import numpy as np
import pandas as pd

from bentoml._internal.utils.pickle import pep574_dumps
from bentoml._internal.utils.pickle import pep574_loads
@@ -35,3 +36,24 @@ def test_pep574_restore() -> None:
)
for key, arr in dic.items():
assert np.isclose(arr, restored[key]).all()

df1: ext.PdDataFrame = pd.DataFrame(arr1)
df2: ext.PdDataFrame = pd.DataFrame(arr2)
df3: ext.PdDataFrame = pd.DataFrame(arr3)

df_lst = [df1, df2, df3]

bs, concat_buffer_bs, indices = pep574_dumps(df_lst)
restored = t.cast(
t.List["ext.PdDataFrame"], pep574_loads(bs, concat_buffer_bs, indices)
)
for idx, df in enumerate(df_lst):
assert np.isclose(df.to_numpy(), restored[idx].to_numpy()).all()

df_dic: dict[str, ext.PdDataFrame] = dict(a=df1, b=df2, c=df3)
bs, concat_buffer_bs, indices = pep574_dumps(df_dic)
restored = t.cast(
t.Dict[str, "ext.PdDataFrame"], pep574_loads(bs, concat_buffer_bs, indices)
)
for key, df in df_dic.items():
assert np.isclose(df.to_numpy(), restored[key].to_numpy()).all()
