Skip to content

Commit

Permalink
Dora openai server example (#676)
Browse files Browse the repository at this point in the history
You will find in this pull request a quick implementation of an OpenAI
server that makes it possible to send requests from an OpenAI endpoint client
and receive them within the dataflow.

Note that I have only implemented chat completion and this is still
experimental.

Get started with:

```bash
cd examples/openai-server
dora build dataflow.yml
dora start dataflow.yml

# In a separate terminal
python openai_api_client.py
```
  • Loading branch information
haixuanTao authored Oct 7, 2024
2 parents 4b7be45 + 9e9c4ba commit e1c9d67
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 0 deletions.
1 change: 1 addition & 0 deletions examples/openai-server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.pt
17 changes: 17 additions & 0 deletions examples/openai-server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Dora openai echo example

This is a quick example to showcase how to use the `dora-openai-server` to receive and send back data.

Dora Openai Server is still experimental and may change in the future.

Make sure to have dora, pip, and cargo installed.

```bash
dora up
dora build dataflow.yml
dora start dataflow.yml

# In a separate terminal
python openai_api_client.py
dora stop
```
16 changes: 16 additions & 0 deletions examples/openai-server/dataflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
nodes:
- id: dora-openai-server
build: pip install -e ../../node-hub/dora-openai-server
path: dora-openai-server
outputs:
- v1/chat/completions
inputs:
v1/chat/completions: dora-echo/echo

- id: dora-echo
build: pip install -e ../../node-hub/dora-echo
path: dora-echo
inputs:
echo: dora-openai-server/v1/chat/completions
outputs:
- echo
38 changes: 38 additions & 0 deletions examples/openai-server/openai_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy_api_key")


def test_list_models():
    """Query the server for its model list and print each model id."""
    try:
        listing = client.models.list()
        print("Available models:")
        for entry in listing.data:
            print(f"- {entry.id}")
    except Exception as err:
        print(f"Error listing models: {err}")


def test_chat_completion(user_input):
    """Send *user_input* as a chat completion request and print the reply."""
    conversation = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_input},
    ]
    try:
        reply = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=conversation,
        )
        print("Chat Completion Response:")
        print(reply.choices[0].message.content)
    except Exception as err:
        print(f"Error in chat completion: {err}")


if __name__ == "__main__":
    # Visual divider printed between the two endpoint exercises.
    divider = "\n" + "=" * 50 + "\n"

    print("Testing API endpoints...")
    test_list_models()
    print(divider)

    message = input("Enter a message for chat completion: ")
    test_chat_completion(message)
    print(divider)
5 changes: 5 additions & 0 deletions node-hub/dora-openai-server/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Dora OpenAI Server

This is an experimental node to expose an OpenAI server endpoint with dora.

Check example at [examples/openai-server](../../examples/openai-server/README.md)
11 changes: 11 additions & 0 deletions node-hub/dora-openai-server/dora_openai_server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import os

# Resolve the README that lives one directory above this package directory.
readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md")

# Expose the README content as the package docstring, falling back to a
# short message when the file is missing (e.g. in a stripped-down install).
try:
    with open(readme_path, "r", encoding="utf-8") as readme_file:
        __doc__ = readme_file.read()
except FileNotFoundError:
    __doc__ = "README file not found."
135 changes: 135 additions & 0 deletions node-hub/dora-openai-server/dora_openai_server/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
from dora import Node
import asyncio
import pyarrow as pa
import ast

# Seconds to wait for the dataflow to answer a chat-completion request.
DORA_RESPONSE_TIMEOUT = 10
# FastAPI application serving the OpenAI-compatible endpoints below.
app = FastAPI()


class ChatCompletionMessage(BaseModel):
    """One message in an OpenAI-style chat conversation."""

    # Speaker role, e.g. "system", "user", or "assistant".
    role: str
    # Text content of the message.
    content: str


class ChatCompletionRequest(BaseModel):
    """Request body for POST /v1/chat/completions (OpenAI-compatible subset)."""

    # Model name supplied by the client; the handler echoes it back in the
    # response rather than using it for routing.
    model: str
    # Conversation history; the handler extracts the first "user" message.
    messages: List[ChatCompletionMessage]
    # NOTE(review): temperature and max_tokens are accepted for API
    # compatibility but are not read by the handler in this file.
    temperature: Optional[float] = 1.0
    max_tokens: Optional[int] = 100


class ChatCompletionResponse(BaseModel):
    """Response body mirroring the OpenAI chat completion schema."""

    # Completion identifier (static placeholder in this server).
    id: str
    # Always "chat.completion" for this endpoint.
    object: str
    # Unix timestamp of creation (static placeholder in this server).
    created: int
    # Echo of the requested model name.
    model: str
    # One entry per generated choice; each holds a "message" dict.
    choices: List[dict]
    # Token accounting; here approximated from payload lengths.
    usage: dict


node = Node() # provide the name to connect to the dataflow if dynamic node


@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """Forward the first user message into the dataflow and return the reply.

    The message is sent on the "v1/chat/completions" output, and the handler
    then blocks until the dataflow sends back an input with the same id, an
    error event arrives, or the wait times out.
    """
    data = next(
        (msg.content for msg in request.messages if msg.role == "user"),
        "No user message found.",
    )

    # Interpret the message as a Python literal (list, dict, number, ...)
    # when possible; otherwise keep it as a plain string.
    try:
        data = ast.literal_eval(data)
    except (ValueError, SyntaxError):
        print("Passing input as string")

    # Scalars and dicts are wrapped in a one-element array; lists (and any
    # other sequence produced by literal_eval) are converted directly.
    if isinstance(data, (str, int, float, dict)):
        data = pa.array([data])
    else:
        data = pa.array(data)
    node.send_output("v1/chat/completions", data)

    # Wait for the dataflow's response, skipping unrelated events.
    # Default covers the timeout case so response_str is never unbound.
    response_str = "No response received"
    while True:
        event = node.next(timeout=DORA_RESPONSE_TIMEOUT)
        if event is None:
            # Timed out with no event at all — keep the default message.
            break
        if event["type"] == "ERROR":
            response_str = "No response received. Err: " + event["value"][0].as_py()
            break
        if event["type"] == "INPUT" and event["id"] == "v1/chat/completions":
            response = event["value"]
            response_str = response[0].as_py() if response else "No response received"
            break

    return ChatCompletionResponse(
        id="chatcmpl-1234",
        object="chat.completion",
        created=1234567890,
        model=request.model,
        choices=[
            {
                "index": 0,
                "message": {"role": "assistant", "content": response_str},
                "finish_reason": "stop",
            }
        ],
        # Token counts are approximated from array/string lengths.
        usage={
            "prompt_tokens": len(data),
            "completion_tokens": len(response_str),
            "total_tokens": len(data) + len(response_str),
        },
    )


@app.get("/v1/models")
async def list_models():
    """Return a static, OpenAI-compatible listing with a single model."""
    model_entry = {
        "id": "gpt-3.5-turbo",
        "object": "model",
        "created": 1677610602,
        "owned_by": "openai",
    }
    return {"object": "list", "data": [model_entry]}


async def run_fastapi():
    """Serve the FastAPI app while polling the dora node for a STOP event.

    Runs uvicorn concurrently with a polling loop; when the dataflow sends
    STOP, the server is asked to exit and the task is awaited.
    """
    config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config)

    # Keep the serve task in its own variable; the original overwrote
    # `server` with the gather future, losing the handle needed for shutdown.
    serve_task = asyncio.gather(server.serve())

    while True:
        await asyncio.sleep(1)
        event = node.next(0.001)
        # node.next may return None on timeout — guard before indexing.
        if event is not None and event["type"] == "STOP":
            break

    # Signal uvicorn to shut down gracefully and wait for it to finish.
    server.should_exit = True
    await serve_task


def main():
    """Entry point: run the OpenAI-compatible server alongside the dora node."""
    asyncio.run(run_fastapi())


if __name__ == "__main__":
    main()
30 changes: 30 additions & 0 deletions node-hub/dora-openai-server/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[tool.poetry]
name = "dora-openai-server"
version = "0.3.6"
authors = [
"Haixuan Xavier Tao <[email protected]>",
"Enzo Le Van <[email protected]>",
]
description = "Dora OpenAI API Server"
license = "MIT License"
homepage = "https://github.com/dora-rs/dora.git"
documentation = "https://github.com/dora-rs/dora/blob/main/node-hub/dora-openai-server/README.md"
readme = "README.md"
packages = [{ include = "dora_openai_server" }]

[tool.poetry.dependencies]
dora-rs = "^0.3.6"
numpy = "< 2.0.0"
pyarrow = ">= 5.0.0"
python = "^3.7"
fastapi = "^0.115"
# NOTE(review): "asyncio" on PyPI is an old stdlib backport; asyncio ships
# with Python 3, so this dependency is likely unnecessary — confirm before removing.
asyncio = "^3.4"
uvicorn = "^0.31"
pydantic = "^2.9"

[tool.poetry.scripts]
dora-openai-server = "dora_openai_server.main:main"

[build-system]
requires = ["poetry-core>=1.8.0"]
build-backend = "poetry.core.masonry.api"
2 changes: 2 additions & 0 deletions node-hub/dora-openai-server/tests/test_dora_openai_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def test_import_main():
    # Placeholder test. Importing dora_openai_server.main would construct a
    # dora Node at module import time (which needs a running dataflow), so the
    # actual import check is intentionally skipped here.
    pass

0 comments on commit e1c9d67

Please sign in to comment.