From b4b202596a9351741ee23b98692b4e1c623bfa7f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 06:09:50 +0200 Subject: [PATCH 1/6] Simple example of a dora openai server --- examples/openai-server-example/.gitignore | 1 + examples/openai-server-example/README.md | 17 +++ examples/openai-server-example/api_client.py | 39 ++++++ examples/openai-server-example/dataflow.yml | 16 +++ node-hub/dora-openai-server/README.md | 5 + .../dora_openai_server/__init__.py | 11 ++ .../dora_openai_server/main.py | 129 ++++++++++++++++++ node-hub/dora-openai-server/pyproject.toml | 26 ++++ .../tests/test_dora_openai_server.py | 5 + 9 files changed, 249 insertions(+) create mode 100644 examples/openai-server-example/.gitignore create mode 100644 examples/openai-server-example/README.md create mode 100644 examples/openai-server-example/api_client.py create mode 100644 examples/openai-server-example/dataflow.yml create mode 100644 node-hub/dora-openai-server/README.md create mode 100644 node-hub/dora-openai-server/dora_openai_server/__init__.py create mode 100644 node-hub/dora-openai-server/dora_openai_server/main.py create mode 100644 node-hub/dora-openai-server/pyproject.toml create mode 100644 node-hub/dora-openai-server/tests/test_dora_openai_server.py diff --git a/examples/openai-server-example/.gitignore b/examples/openai-server-example/.gitignore new file mode 100644 index 00000000..eede66d8 --- /dev/null +++ b/examples/openai-server-example/.gitignore @@ -0,0 +1 @@ +*.pt \ No newline at end of file diff --git a/examples/openai-server-example/README.md b/examples/openai-server-example/README.md new file mode 100644 index 00000000..ebc974ec --- /dev/null +++ b/examples/openai-server-example/README.md @@ -0,0 +1,17 @@ +# Dora openai echo example + +This is a quick example to showcase how use the `dora-openai-server` to receive and send back data. + +Dora Openai Server is still experimental and may change in the future. 
Make sure to have dora, pip and cargo installed.
a/node-hub/dora-openai-server/README.md b/node-hub/dora-openai-server/README.md new file mode 100644 index 00000000..aa4e3aac --- /dev/null +++ b/node-hub/dora-openai-server/README.md @@ -0,0 +1,5 @@ +# Dora OpenAI Server + +This is an experimental to expose an openai server endpoint with dora. + +Check example at [examples/api-echo](examples/api-echo) diff --git a/node-hub/dora-openai-server/dora_openai_server/__init__.py b/node-hub/dora-openai-server/dora_openai_server/__init__.py new file mode 100644 index 00000000..ac3cbef9 --- /dev/null +++ b/node-hub/dora-openai-server/dora_openai_server/__init__.py @@ -0,0 +1,11 @@ +import os + +# Define the path to the README file relative to the package directory +readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md") + +# Read the content of the README file +try: + with open(readme_path, "r", encoding="utf-8") as f: + __doc__ = f.read() +except FileNotFoundError: + __doc__ = "README file not found." diff --git a/node-hub/dora-openai-server/dora_openai_server/main.py b/node-hub/dora-openai-server/dora_openai_server/main.py new file mode 100644 index 00000000..2332c51e --- /dev/null +++ b/node-hub/dora-openai-server/dora_openai_server/main.py @@ -0,0 +1,129 @@ +from fastapi import FastAPI +from pydantic import BaseModel +from typing import List, Optional +import uvicorn +from dora import Node +import asyncio +import pyarrow as pa +import ast +import os + +DORA_RESPONSE_TIMEOUT = 10 +app = FastAPI() + + +class ChatCompletionMessage(BaseModel): + role: str + content: str + + +class ChatCompletionRequest(BaseModel): + model: str + messages: List[ChatCompletionMessage] + temperature: Optional[float] = 1.0 + max_tokens: Optional[int] = 100 + + +class ChatCompletionResponse(BaseModel): + id: str + object: str + created: int + model: str + choices: List[dict] + usage: dict + + +if not os.getenv("PYTEST_CURRENT_TEST"): + node = Node() # provide the name to connect to the dataflow if dynamic node + + 
+@app.post("/v1/chat/completions") +async def create_chat_completion(request: ChatCompletionRequest): + data = next( + (msg.content for msg in request.messages if msg.role == "user"), + "No user message found.", + ) + + # Convert user_message to Arrow array + # user_message_array = pa.array([user_message]) + # Publish user message to dora-echo + # node.send_output("user_query", user_message_array) + + try: + data = ast.literal_eval(data) + except ValueError: + print("Passing input as string") + except SyntaxError: + print("Passing input as string") + if isinstance(data, list): + data = pa.array(data) # initialize pyarrow array + elif isinstance(data, str): + data = pa.array([data]) + elif isinstance(data, int): + data = pa.array([data]) + elif isinstance(data, float): + data = pa.array([data]) + elif isinstance(data, dict): + data = pa.array([data]) + else: + data = pa.array(data) # initialize pyarrow array + node.send_output("v1/chat/completions", data) + + # Wait for response from dora-echo + event = node.next(timeout=DORA_RESPONSE_TIMEOUT) + if event["type"] == "ERROR": + print("Timedout") + response_str = "No response received" + else: + + response = event["value"] + response_str = response[0].as_py() if response else "No response received" + + return ChatCompletionResponse( + id="chatcmpl-1234", + object="chat.completion", + created=1234567890, + model=request.model, + choices=[ + { + "index": 0, + "message": {"role": "assistant", "content": response_str}, + "finish_reason": "stop", + } + ], + usage={ + "prompt_tokens": len(data), + "completion_tokens": len(response_str), + "total_tokens": len(data) + len(response_str), + }, + ) + + +@app.get("/v1/models") +async def list_models(): + return { + "object": "list", + "data": [ + { + "id": "gpt-3.5-turbo", + "object": "model", + "created": 1677610602, + "owned_by": "openai", + } + ], + } + + +async def run_fastapi(): + config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info") + server = 
uvicorn.Server(config) + + await asyncio.gather(server.serve()) + + +def main(): + asyncio.run(run_fastapi()) + + +if __name__ == "__main__": + asyncio.run(run_fastapi()) diff --git a/node-hub/dora-openai-server/pyproject.toml b/node-hub/dora-openai-server/pyproject.toml new file mode 100644 index 00000000..c0a1d678 --- /dev/null +++ b/node-hub/dora-openai-server/pyproject.toml @@ -0,0 +1,26 @@ +[tool.poetry] +name = "dora-openai-server" +version = "0.3.6" +authors = [ + "Haixuan Xavier Tao ", + "Enzo Le Van ", +] +description = "Dora OpenAI API Server" +license = "MIT License" +homepage = "https://github.com/dora-rs/dora.git" +documentation = "https://github.com/dora-rs/dora/blob/main/node-hub/dora-openai-server/README.md" +readme = "README.md" +packages = [{ include = "dora_openai_server" }] + +[tool.poetry.dependencies] +dora-rs = "^0.3.6" +numpy = "< 2.0.0" +pyarrow = ">= 5.0.0" +python = "^3.7" + +[tool.poetry.scripts] +dora-openai-server = "dora_openai_server.main:main" + +[build-system] +requires = ["poetry-core>=1.8.0"] +build-backend = "poetry.core.masonry.api" diff --git a/node-hub/dora-openai-server/tests/test_dora_openai_server.py b/node-hub/dora-openai-server/tests/test_dora_openai_server.py new file mode 100644 index 00000000..904f148c --- /dev/null +++ b/node-hub/dora-openai-server/tests/test_dora_openai_server.py @@ -0,0 +1,5 @@ +import pytest + + +def test_import_main(): + pass From 05056f7fcee2ceee3e7694cdd9ee01516ab75d2f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 06:21:06 +0200 Subject: [PATCH 2/6] Fix small typo and add dependency for dora openai server --- examples/{openai-server-example => openai-server}/.gitignore | 0 examples/{openai-server-example => openai-server}/README.md | 2 +- .../{openai-server-example => openai-server}/dataflow.yml | 0 .../api_client.py => openai-server/openai_api_client.py} | 1 - node-hub/dora-openai-server/dora_openai_server/main.py | 3 +-- node-hub/dora-openai-server/pyproject.toml | 4 ++++ 6 
files changed, 6 insertions(+), 4 deletions(-) rename examples/{openai-server-example => openai-server}/.gitignore (100%) rename examples/{openai-server-example => openai-server}/README.md (92%) rename examples/{openai-server-example => openai-server}/dataflow.yml (100%) rename examples/{openai-server-example/api_client.py => openai-server/openai_api_client.py} (99%) diff --git a/examples/openai-server-example/.gitignore b/examples/openai-server/.gitignore similarity index 100% rename from examples/openai-server-example/.gitignore rename to examples/openai-server/.gitignore diff --git a/examples/openai-server-example/README.md b/examples/openai-server/README.md similarity index 92% rename from examples/openai-server-example/README.md rename to examples/openai-server/README.md index ebc974ec..34c0aad9 100644 --- a/examples/openai-server-example/README.md +++ b/examples/openai-server/README.md @@ -12,6 +12,6 @@ dora build dataflow.yml dora start dataflow.yml # In a separate terminal -python api_client.py +python openai_api_client.py dora stop ``` diff --git a/examples/openai-server-example/dataflow.yml b/examples/openai-server/dataflow.yml similarity index 100% rename from examples/openai-server-example/dataflow.yml rename to examples/openai-server/dataflow.yml diff --git a/examples/openai-server-example/api_client.py b/examples/openai-server/openai_api_client.py similarity index 99% rename from examples/openai-server-example/api_client.py rename to examples/openai-server/openai_api_client.py index 77497b59..7b8a51dd 100644 --- a/examples/openai-server-example/api_client.py +++ b/examples/openai-server/openai_api_client.py @@ -1,5 +1,4 @@ from openai import OpenAI -import os client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key") diff --git a/node-hub/dora-openai-server/dora_openai_server/main.py b/node-hub/dora-openai-server/dora_openai_server/main.py index 2332c51e..34bd24b7 100644 --- a/node-hub/dora-openai-server/dora_openai_server/main.py 
+++ b/node-hub/dora-openai-server/dora_openai_server/main.py @@ -33,8 +33,7 @@ class ChatCompletionResponse(BaseModel): usage: dict -if not os.getenv("PYTEST_CURRENT_TEST"): - node = Node() # provide the name to connect to the dataflow if dynamic node +node = Node() # provide the name to connect to the dataflow if dynamic node @app.post("/v1/chat/completions") diff --git a/node-hub/dora-openai-server/pyproject.toml b/node-hub/dora-openai-server/pyproject.toml index c0a1d678..ba9bc605 100644 --- a/node-hub/dora-openai-server/pyproject.toml +++ b/node-hub/dora-openai-server/pyproject.toml @@ -17,6 +17,10 @@ dora-rs = "^0.3.6" numpy = "< 2.0.0" pyarrow = ">= 5.0.0" python = "^3.7" +fastapi = "^0.115" +asyncio = "^3.4" +uvicorn = "^0.31" +pydantic = "^2.9" [tool.poetry.scripts] dora-openai-server = "dora_openai_server.main:main" From 6f3bdb179c208220b03ac67bac8d01b2d130e708 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 06:25:19 +0200 Subject: [PATCH 3/6] Fix README.md in openai server example --- node-hub/dora-openai-server/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-hub/dora-openai-server/README.md b/node-hub/dora-openai-server/README.md index aa4e3aac..b044f32b 100644 --- a/node-hub/dora-openai-server/README.md +++ b/node-hub/dora-openai-server/README.md @@ -2,4 +2,4 @@ This is an experimental to expose an openai server endpoint with dora. 
-Check example at [examples/api-echo](examples/api-echo) +Check example at [examples/openai-server](../../examples/openai-server/README.md) From c8cc2f8e1d58ad63e9705da58f8852fa5f1f8e85 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 08:22:56 +0200 Subject: [PATCH 4/6] Fix ungraceful stop for dora-openai-server --- node-hub/dora-openai-server/dora_openai_server/main.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/node-hub/dora-openai-server/dora_openai_server/main.py b/node-hub/dora-openai-server/dora_openai_server/main.py index 34bd24b7..fa2d90f2 100644 --- a/node-hub/dora-openai-server/dora_openai_server/main.py +++ b/node-hub/dora-openai-server/dora_openai_server/main.py @@ -6,7 +6,6 @@ import asyncio import pyarrow as pa import ast -import os DORA_RESPONSE_TIMEOUT = 10 app = FastAPI() @@ -74,7 +73,6 @@ async def create_chat_completion(request: ChatCompletionRequest): print("Timedout") response_str = "No response received" else: - response = event["value"] response_str = response[0].as_py() if response else "No response received" @@ -117,7 +115,12 @@ async def run_fastapi(): config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info") server = uvicorn.Server(config) - await asyncio.gather(server.serve()) + server = asyncio.gather(server.serve()) + while True: + await asyncio.sleep(1) + event = node.next(0.001) + if event["type"] == "STOP": + break def main(): From 7dbcff8863867ab92880a3a3266adf7c6a55b6b0 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 08:29:36 +0200 Subject: [PATCH 5/6] Fix linting --- node-hub/dora-openai-server/tests/test_dora_openai_server.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/node-hub/dora-openai-server/tests/test_dora_openai_server.py b/node-hub/dora-openai-server/tests/test_dora_openai_server.py index 904f148c..58bc516c 100644 --- a/node-hub/dora-openai-server/tests/test_dora_openai_server.py +++ 
b/node-hub/dora-openai-server/tests/test_dora_openai_server.py @@ -1,5 +1,2 @@ -import pytest - - def test_import_main(): pass From 9e9c4bab0b97a21eb560918fa91254dda6043c22 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 7 Oct 2024 09:18:45 +0200 Subject: [PATCH 6/6] Make openai server resilient to unusual input and pin response message topic --- examples/openai-server/dataflow.yml | 2 +- .../dora_openai_server/main.py | 18 +++++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/openai-server/dataflow.yml b/examples/openai-server/dataflow.yml index 6b4efdce..4941df1a 100644 --- a/examples/openai-server/dataflow.yml +++ b/examples/openai-server/dataflow.yml @@ -5,7 +5,7 @@ nodes: outputs: - v1/chat/completions inputs: - echo: dora-echo/echo + v1/chat/completions: dora-echo/echo - id: dora-echo build: pip install -e ../../node-hub/dora-echo diff --git a/node-hub/dora-openai-server/dora_openai_server/main.py b/node-hub/dora-openai-server/dora_openai_server/main.py index fa2d90f2..115d37ed 100644 --- a/node-hub/dora-openai-server/dora_openai_server/main.py +++ b/node-hub/dora-openai-server/dora_openai_server/main.py @@ -68,13 +68,17 @@ async def create_chat_completion(request: ChatCompletionRequest): node.send_output("v1/chat/completions", data) # Wait for response from dora-echo - event = node.next(timeout=DORA_RESPONSE_TIMEOUT) - if event["type"] == "ERROR": - print("Timedout") - response_str = "No response received" - else: - response = event["value"] - response_str = response[0].as_py() if response else "No response received" + while True: + event = node.next(timeout=DORA_RESPONSE_TIMEOUT) + if event["type"] == "ERROR": + response_str = "No response received. 
Err: " + event["value"][0].as_py() + break + elif event["type"] == "INPUT" and event["id"] == "v1/chat/completions": + response = event["value"] + response_str = response[0].as_py() if response else "No response received" + break + else: + pass return ChatCompletionResponse( id="chatcmpl-1234",