Skip to content

Commit

Permalink
refactor(impl): 1.2 loader (#4388)
Browse files Browse the repository at this point in the history
* refactor(impl): refactor 1.2 loader

* ci: auto fixes from pre-commit.ci

For more information, see https://pre-commit.ci

* fix

* doc

* debug info

* fix e2e test

* ci: auto fixes from pre-commit.ci

For more information, see https://pre-commit.ci

* revert build version

* pydantic>=2

* lock

* catch import error

* fix

* ci: auto fixes from pre-commit.ci

For more information, see https://pre-commit.ci

* fix: update lock hash

Signed-off-by: Frost Ming <[email protected]>

* fix import

Signed-off-by: Frost Ming <[email protected]>

---------

Signed-off-by: Frost Ming <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Frost Ming <[email protected]>
  • Loading branch information
3 people authored Jan 8, 2024
1 parent f19fb13 commit 7813cfa
Show file tree
Hide file tree
Showing 9 changed files with 971 additions and 683 deletions.
1,332 changes: 693 additions & 639 deletions pdm.ionext.lock

Large diffs are not rendered by default.

182 changes: 182 additions & 0 deletions src/_bentoml_impl/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
from __future__ import annotations

import importlib
import pathlib
import sys
import typing as t

import yaml

if t.TYPE_CHECKING:
from _bentoml_sdk import Service

BENTO_YAML_FILENAME = "bento.yaml"


def normalize_identifier(
service_identifier: str,
working_dir: str | None = None,
) -> tuple[str, pathlib.Path]:
"""
Normalize a service identifier to a package:Service format, and return the bento
path.
valid identifiers:
- package:Service # bentoml serve projects or normalized
- package:Service.dependency # bentocloud dependencies services
- ~/bentoml/bentos/my_bento/version # bentocloud entry service
- ~/bentoml/bentos/my_bento/version/bento.yaml
- bento1/bentofile.yaml # bentoml serve from multi-target projects
- my_service:a7ab819 # bentoml serve built bentos
- package.py:Service
- package.py:Service.dependency
- .
"""
if working_dir is not None:
path = pathlib.Path(working_dir).joinpath(service_identifier)
else:
path = pathlib.Path(service_identifier)
if path.exists():
if path.is_file() and path.name == BENTO_YAML_FILENAME:
# this is a bento.yaml file
yaml_path = path
bento_path = path.parent
elif path.is_dir() and path.joinpath(BENTO_YAML_FILENAME).is_file():
# this is a bento directory
yaml_path = path.joinpath(BENTO_YAML_FILENAME)
bento_path = path
elif path.is_file() and path.name == "bentofile.yaml":
# this is a bentofile.yaml file
yaml_path = path
bento_path = (
pathlib.Path(working_dir) if working_dir is not None else path.parent
)
elif path.is_dir() and path.joinpath("bentofile.yaml").is_file():
# this is a bento project directory
yaml_path = path.joinpath("bentofile.yaml")
bento_path = (
pathlib.Path(working_dir) if working_dir is not None else path.parent
)
else:
raise ValueError(f"found a path but not a bento: {service_identifier}")

with open(yaml_path, "r") as f:
bento_yaml = yaml.safe_load(f)
assert "service" in bento_yaml, "service field is required in bento.yaml"
return normalize_package(bento_yaml["service"]), bento_path

elif ":" in service_identifier:
# a python import str or a built bento in store

# TODO(jiang): bento store configs are sdk configs, should be moved to sdk in the future
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml.exceptions import NotFound

bento_store = BentoMLContainer.bento_store.get()

try:
bento = bento_store.get(service_identifier)
except NotFound:
# a python import str
return normalize_package(service_identifier), pathlib.Path(
working_dir if working_dir is not None else "."
)
else:
# a built bento in bento store

yaml_path = pathlib.Path(bento.path).joinpath(BENTO_YAML_FILENAME)
with open(yaml_path, "r") as f:
bento_yaml = yaml.safe_load(f)
assert "service" in bento_yaml, "service field is required in bento.yaml"
return normalize_package(bento_yaml["service"]), yaml_path.parent
else:
raise ValueError(f"invalid service: {service_identifier}")


def import_service(
service_identifier: str,
bento_path: pathlib.Path | None = None,
) -> Service[t.Any]:
"""
import a service from a service identifier, which should be normalized by
`normalize_identifier` function.
"""
from _bentoml_sdk import Service

if bento_path is None:
bento_path = pathlib.Path(".")

# patch python path if needed
if bento_path.joinpath(BENTO_YAML_FILENAME).exists():
# a built bento
extra_python_path = str(bento_path.joinpath("src").absolute())
sys.path.insert(0, extra_python_path)
elif bento_path != pathlib.Path("."):
# a project
extra_python_path = str(bento_path.absolute())
sys.path.insert(0, extra_python_path)
else:
# a project under current directory
extra_python_path = None

# patch model store if needed
if (
bento_path.joinpath(BENTO_YAML_FILENAME).exists()
and bento_path.joinpath("models").exists()
):
from bentoml._internal.configuration.containers import BentoMLContainer
from bentoml._internal.models import ModelStore

original_model_store = BentoMLContainer.model_store.get()

BentoMLContainer.model_store.set(
ModelStore((bento_path.joinpath("models").absolute()))
)
else:
original_model_store = None

try:
module_name, _, attrs_str = service_identifier.partition(":")

assert (
module_name and attrs_str
), f'Invalid import target "{service_identifier}", must format as "<module>:<attribute>"'

instance = importlib.import_module(module_name)

for attr_str in attrs_str.split("."):
if isinstance(instance, Service):
instance = instance.dependencies[attr_str].on
else:
instance = getattr(instance, attr_str)

assert isinstance(
instance, Service
), f'import target "{module_name}:{attrs_str}" is not a bentoml.Service instance'

return instance

except (ImportError, AttributeError, KeyError, AssertionError) as e:
sys_path = sys.path.copy()
if extra_python_path is not None:
sys.path.remove(extra_python_path)

if original_model_store is not None:
from bentoml._internal.configuration.containers import BentoMLContainer

BentoMLContainer.model_store.set(original_model_store)
from bentoml.exceptions import ImportServiceError

raise ImportServiceError(
f'Failed to import service "{service_identifier}": {e}, sys.path: {sys_path}, cwd: {pathlib.Path.cwd()}'
) from None


def normalize_package(service_identifier: str) -> str:
"""
service.py:Service -> service:Service
"""
package, _, service_path = service_identifier.partition(":")
if package.endswith(".py"):
package = package[:-3]
return ":".join([package, service_path])
30 changes: 20 additions & 10 deletions src/_bentoml_impl/server/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import os
import pathlib
import platform
import shutil
import socket
Expand Down Expand Up @@ -102,6 +103,7 @@ def create_service_watchers(
dependency_map: dict[str, str],
scheduler: ResourceAllocator,
import_string: str,
working_dir: str | None = None,
) -> tuple[list[Watcher], list[CircusSocket], str]:
from bentoml.serve import create_watcher

Expand All @@ -120,6 +122,7 @@ def create_service_watchers(
dependency_map,
scheduler,
f"{import_string}.{name}",
working_dir,
)
watchers.extend(new_watchers)
sockets.extend(new_sockets)
Expand All @@ -133,8 +136,6 @@ def create_service_watchers(
import_string,
"--fd",
f"$(circus.sockets.{svc.name})",
"--working-dir",
svc.working_dir,
"--worker-id",
"$(CIRCUS.WID)",
]
Expand All @@ -147,6 +148,7 @@ def create_service_watchers(
name=f"dependency_{svc.name}",
args=args,
numprocesses=num_workers,
working_dir=working_dir,
)
)

Expand All @@ -156,7 +158,7 @@ def create_service_watchers(
@inject
def serve_http(
bento_identifier: str | AnyService,
working_dir: str,
working_dir: str | None = None,
host: str = Provide[BentoMLContainer.http.host],
port: int = Provide[BentoMLContainer.http.port],
backlog: int = Provide[BentoMLContainer.api_server_config.backlog],
Expand All @@ -177,7 +179,6 @@ def serve_http(
from circus.sockets import CircusSocket

from bentoml._internal.log import SERVER_LOGGING_CONFIG
from bentoml._internal.service.loader import load
from bentoml._internal.utils import reserve_free_port
from bentoml._internal.utils.analytics.usage_stats import track_serve
from bentoml._internal.utils.circus import create_standalone_arbiter
Expand All @@ -186,14 +187,24 @@ def serve_http(
from bentoml.serve import ensure_prometheus_dir
from bentoml.serve import make_reload_plugin

from ..loader import import_service
from ..loader import normalize_identifier
from .allocator import ResourceAllocator

prometheus_dir = ensure_prometheus_dir()
working_dir = os.path.realpath(os.path.expanduser(working_dir))
if isinstance(bento_identifier, Service):
svc = bento_identifier
assert (
working_dir is None
), "working_dir should not be set when passing a service in process"
# use cwd
bento_path = pathlib.Path(".")
else:
svc = t.cast(AnyService, load(bento_identifier, working_dir))
bento_identifier, bento_path = normalize_identifier(
bento_identifier, working_dir
)

svc = import_service(bento_identifier, bento_path)

watchers: list[Watcher] = []
sockets: list[CircusSocket] = []
Expand All @@ -220,6 +231,7 @@ def serve_http(
dependency_map,
allocator,
f"{svc.import_string}.{name}",
str(bento_path.absolute()),
)
watchers.extend(new_watchers)
sockets.extend(new_sockets)
Expand Down Expand Up @@ -269,8 +281,6 @@ def serve_http(
else bento_identifier.import_string,
"--fd",
f"$(circus.sockets.{API_SERVER_NAME})",
"--working-dir",
working_dir,
"--backlog",
str(backlog),
"--worker-id",
Expand All @@ -291,7 +301,7 @@ def serve_http(
create_watcher(
name="api_server",
args=server_args,
working_dir=working_dir,
working_dir=str(bento_path.absolute()),
numprocesses=num_workers,
close_child_stdin=not development_mode,
)
Expand All @@ -317,7 +327,7 @@ def serve_http(
arbiter_kwargs: dict[str, t.Any] = {"watchers": watchers, "sockets": sockets}

if reload:
reload_plugin = make_reload_plugin(working_dir, bentoml_home)
reload_plugin = make_reload_plugin(str(bento_path.absolute()), bentoml_home)
arbiter_kwargs["plugins"] = [reload_plugin]

if development_mode:
Expand Down
14 changes: 3 additions & 11 deletions src/_bentoml_impl/worker/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

import click

if t.TYPE_CHECKING:
from _bentoml_sdk import Service


@click.command()
@click.argument("bento_identifier", type=click.STRING, required=False, default=".")
Expand All @@ -27,11 +24,6 @@
@click.option(
"--backlog", type=click.INT, default=2048, help="Backlog size for the socket"
)
@click.option(
"--working-dir",
type=click.Path(exists=True),
help="Working directory for the API server",
)
@click.option(
"--prometheus-dir",
type=click.Path(exists=True),
Expand Down Expand Up @@ -109,7 +101,6 @@ def main(
runner_map: str | None,
backlog: int,
worker_env: str | None,
working_dir: str | None,
worker_id: int | None,
prometheus_dir: str | None,
ssl_certfile: str | None,
Expand Down Expand Up @@ -141,18 +132,19 @@ def main(
)
os.environ.update(env_list[worker_key])

from _bentoml_impl.loader import import_service
from bentoml._internal.container import BentoMLContainer
from bentoml._internal.context import component_context
from bentoml._internal.log import configure_server_logging
from bentoml._internal.service import load

from ..server.app import ServiceAppFactory

if runner_map:
BentoMLContainer.remote_runner_mapping.set(
t.cast(t.Dict[str, str], json.loads(runner_map))
)
service = t.cast("Service[t.Any]", load(bento_identifier, working_dir=working_dir))

service = import_service(bento_identifier)
service.inject_config()

component_context.component_type = "api_server"
Expand Down
2 changes: 0 additions & 2 deletions src/_bentoml_sdk/service/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,6 @@ def serve_http(

configure_logging()

if working_dir is None:
working_dir = self.working_dir
serve_http(
self,
working_dir=working_dir,
Expand Down
6 changes: 6 additions & 0 deletions src/bentoml/_internal/bento/bento.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ def from_fs(cls, item_fs: FS) -> Bento:

return res

@classmethod
def from_path(cls, item_path: str) -> Bento:
item_path = os.path.expanduser(item_path)
item_fs = fs.open_fs(encode_path_for_uri(item_path))
return cls.from_fs(item_fs)

def export(
self,
path: str,
Expand Down
13 changes: 13 additions & 0 deletions src/bentoml/_internal/service/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,19 @@ def load(
# Load from local BentoStore
return load_bento(bento_identifier)

try:
from _bentoml_impl.loader import import_service as import_1_2_service
from _bentoml_impl.loader import normalize_identifier

_bento_identifier, _working_dir = normalize_identifier(
bento_identifier, working_dir
)
_svc = import_1_2_service(_bento_identifier, _working_dir)

return _svc
except (ValueError, ImportServiceError, ImportError):
logger.debug("Failed to load bento with v1.2 loader, fallback to old loader")

if os.path.isdir(os.path.expanduser(bento_identifier)):
bento_path = os.path.abspath(os.path.expanduser(bento_identifier))

Expand Down
Loading

0 comments on commit 7813cfa

Please sign in to comment.