Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: bentoserver client #3321

Merged
merged 11 commits into from
Dec 9, 2022
2 changes: 2 additions & 0 deletions src/bentoml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .bentos import list # pylint: disable=W0622
from .bentos import pull
from .bentos import push
from .bentos import serve
from .bentos import delete
from .bentos import export_bento
from .bentos import import_bento
Expand Down Expand Up @@ -151,6 +152,7 @@
"load",
"push",
"pull",
"serve",
"Bento",
# Framework specific modules
"catboost",
Expand Down
31 changes: 31 additions & 0 deletions src/bentoml/_internal/server/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Server class for getting the Bento client and managing server process
"""

from __future__ import annotations

import subprocess


class Server:
    """Handle to a BentoServer running in a child process.

    Stores the ``subprocess.Popen`` object together with the host and port
    the server was asked to listen on, and offers helpers to obtain a client
    for the server and to stop it.
    """

    # Bind-all addresses: valid for listening, but not a reliable address to
    # connect to on every platform, so clients go through localhost instead.
    _WILDCARD_HOSTS = frozenset({"", "0.0.0.0", "::"})

    def __init__(self, process: subprocess.Popen[bytes], host: str, port: int) -> None:
        """Store the server process and the host/port it was started with."""
        self._process = process
        self._host = host
        self._port = port

    def _connect_host(self) -> str:
        """Return the host a local client should connect to."""
        if self._host in self._WILDCARD_HOSTS:
            return "localhost"
        return self._host

    def _client_url(self) -> str:
        """Return the ``http://`` base URL a local client should use."""
        host = self._connect_host()
        # IPv6 literals must be bracketed inside a URL authority.
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        return f"http://{host}:{self._port}"

    def get_client(self, timeout: int = 10):
        """Wait for the server to report ready, then return a client for it.

        The readiness probe and the client URL use the same host, so a server
        bound to a specific interface is both polled and reached at that
        interface rather than at a hard-coded ``localhost``.

        Args:
            timeout: Seconds to wait for the server to become ready.

        Returns:
            The object produced by ``Client.from_url`` for this server.
        """
        # Imported lazily, as in the original, to keep bentoml.client out of
        # this module's import time.
        from bentoml.client import Client

        Client.wait_until_server_is_ready(self._connect_host(), self._port, timeout)
        return Client.from_url(self._client_url())

    def stop(self) -> None:
        """Kill the server process."""
        self.process.kill()

    @property
    def process(self) -> subprocess.Popen[bytes]:
        """The ``subprocess.Popen`` object of the server process."""
        return self._process

    @property
    def address(self) -> str:
        """The ``host:port`` pair the server was started with."""
        return f"{self._host}:{self._port}"
81 changes: 81 additions & 0 deletions src/bentoml/bentos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

from __future__ import annotations

import sys
import typing as t
import logging
import subprocess
from typing import TYPE_CHECKING

from simple_di import inject
Expand All @@ -16,6 +18,7 @@
from ._internal.tag import Tag
from ._internal.bento import Bento
from ._internal.utils import resolve_user_filepath
from ._internal.server.server import Server
from ._internal.bento.build_config import BentoBuildConfig
from ._internal.configuration.containers import BentoMLContainer

Expand Down Expand Up @@ -419,3 +422,81 @@ def containerize(bento_tag: Tag | str, **kwargs: t.Any) -> bool:
except Exception as e: # pylint: disable=broad-except
logger.error("Failed to containerize %s: %s", bento_tag, e)
return False


@inject
def serve(
    bento: str,
    production: bool = False,
    port: int = Provide[BentoMLContainer.http.port],
    host: str = Provide[BentoMLContainer.http.host],
    server_type: str = "http",
    api_workers: int | None = Provide[BentoMLContainer.api_server_workers],
    backlog: int = Provide[BentoMLContainer.api_server_config.backlog],
    reload: bool = False,
    working_dir: str | None = None,
    ssl_certfile: str | None = None,
    ssl_keyfile: str | None = None,
    ssl_ca_certs: str | None = None,
    # HTTP-specific args
    ssl_keyfile_password: str | None = None,
    ssl_version: int | None = None,
    ssl_cert_reqs: int | None = None,
    ssl_ciphers: str | None = None,
    # GRPC-specific args
    enable_reflection: bool = Provide[BentoMLContainer.grpc.reflection.enabled],
    enable_channelz: bool = Provide[BentoMLContainer.grpc.channelz.enabled],
    max_concurrent_streams: int
    | None = Provide[BentoMLContainer.grpc.max_concurrent_streams],
) -> Server:
    """Launch a BentoServer in a child process and return a handle to it.

    The server is started by running the ``bentoml`` command-line module with
    the current Python interpreter. The call returns as soon as the process
    has been spawned; it does not wait for the server to accept requests.

    Args:
        bento: Bento identifier passed through to the CLI.
        production: Add ``--production``.
        port: Port to listen on.
        host: Host/interface to bind to.
        server_type: ``"http"`` or ``"grpc"`` (case-insensitive).
        api_workers: Value for ``--api-workers``; omitted when ``None``.
        backlog: Value for ``--backlog``.
        reload: Add ``--reload``.
        working_dir: Value for ``--working-dir``; omitted when ``None``.
        ssl_certfile, ssl_keyfile, ssl_ca_certs: SSL options forwarded for
            both server types when not ``None``.
        ssl_keyfile_password, ssl_version, ssl_cert_reqs, ssl_ciphers:
            SSL options forwarded only for the HTTP server.
        enable_reflection, enable_channelz, max_concurrent_streams:
            Options forwarded only for the gRPC server.

    Returns:
        A ``Server`` wrapping the spawned process together with ``host`` and
        ``port``.

    Raises:
        ValueError: If ``server_type`` is neither ``"http"`` nor ``"grpc"``.
    """
    kind = server_type.lower()
    if kind not in ("http", "grpc"):
        raise ValueError('Server type must either be "http" or "grpc"')

    # The gRPC-only options below are defined on the ``serve-grpc``
    # sub-command, not on ``serve``.
    command = "serve-grpc" if kind == "grpc" else "serve"

    # The interpreter is argv[0]. Passing it via ``executable=`` instead would
    # turn "-m" into argv[0] and make Python treat "bentoml" as a script path.
    args = [
        sys.executable,
        "-m",
        "bentoml",
        command,
        bento,
        "--port",
        str(port),
        "--host",
        host,
        "--backlog",
        str(backlog),
    ]

    # Boolean switches are bare flags on the CLI and take no value.
    switches = [("--production", production), ("--reload", reload)]
    # Valued options are forwarded only when explicitly set.
    valued = [
        ("--api-workers", api_workers),
        ("--working-dir", working_dir),
        ("--ssl-certfile", ssl_certfile),
        ("--ssl-keyfile", ssl_keyfile),
        ("--ssl-ca-certs", ssl_ca_certs),
    ]
    if kind == "http":
        valued += [
            ("--ssl-keyfile-password", ssl_keyfile_password),
            ("--ssl-version", ssl_version),
            ("--ssl-cert-reqs", ssl_cert_reqs),
            ("--ssl-ciphers", ssl_ciphers),
        ]
    else:
        switches += [
            ("--enable-reflection", enable_reflection),
            ("--enable-channelz", enable_channelz),
        ]
        valued.append(("--max-concurrent-streams", max_concurrent_streams))

    args.extend(flag for flag, enabled in switches if enabled)
    for flag, value in valued:
        if value is not None:
            args.extend([flag, str(value)])

    process = subprocess.Popen(args)

    return Server(process, host, port)
17 changes: 17 additions & 0 deletions src/bentoml/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,23 @@ async def _call(
) -> t.Any:
aarnphm marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError

@staticmethod
def wait_until_server_is_ready(host: str, port: int, timeout: int) -> None:
    """Poll ``GET /readyz`` on ``host:port`` until it answers with HTTP 200.

    A refused connection or a non-200 status is retried about once per
    second. The function returns as soon as a 200 response is seen.

    Args:
        host: Host to connect to.
        port: Port to connect to.
        timeout: Seconds to keep polling before giving up.

    Raises:
        TimeoutError: If no 200 response was received within ``timeout``
            seconds.
    """
    import time

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        conn = HTTPConnection(host, port)
        try:
            conn.request("GET", "/readyz")
            if conn.getresponse().status == 200:
                # Ready: return immediately, without the trailing sleep or
                # the deadline check that could misreport a success.
                return
        except ConnectionRefusedError:
            print("Connection refused. Trying again...")
        finally:
            # Each poll opens a fresh connection; close it so sockets do not
            # pile up while waiting.
            conn.close()
        if time.monotonic() > deadline:
            raise TimeoutError("The server took too long to get ready")
        time.sleep(1)

@staticmethod
def from_url(server_url: str) -> Client:
server_url = server_url if "://" in server_url else "http://" + server_url
Expand Down