Skip to content

Commit

Permalink
Merge pull request #304 from azdolinski/main
Browse files Browse the repository at this point in the history
Add examples/pipelines/integrations for DifyAI and N8N
  • Loading branch information
tjbck authored Oct 22, 2024
2 parents 1136b83 + ca35b9d commit 1c02f70
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 0 deletions.
84 changes: 84 additions & 0 deletions examples/pipelines/integrations/dify_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings

# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')

class Pipeline:
def __init__(self):
self.name = "Dify Agent Pipeline"
self.api_url = "http://dify.hostname/v1/workflows/run" # Set correct hostname
self.api_key = "app-dify-key" # Insert your actual API key here.v
self.api_request_stream = True # Dify support stream
self.verify_ssl = True
self.debug = False

async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass

async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass

async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
print(f"inlet: {__name__}")
if self.debug:
print(f"inlet: {__name__} - body:")
pprint(body)
print(f"inlet: {__name__} - user:")
pprint(user)
return body

async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API.
print(f"outlet: {__name__}")
if self.debug:
print(f"outlet: {__name__} - body:")
pprint(body)
print(f"outlet: {__name__} - user:")
pprint(user)
return body

def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
print(f"pipe: {__name__}")

if self.debug:
print(f"pipe: {__name__} - received message from user: {user_message}")

# Set reponse mode Dify API parameter
if self.api_request_stream is True:
response_mode = "streaming"
else:
response_mode = "blocking"

# This function triggers the workflow using the specified API.
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
"inputs": {"prompt": user_message},
"response_mode": response_mode,
"user": body["user"]["email"]
}

response = requests.post(self.api_url, headers=headers, json=data, stream=self.api_request_stream, verify=self.verify_ssl)
if response.status_code == 200:
# Process and yield each chunk from the response
for line in response.iter_lines():
if line:
try:
# Remove 'data: ' prefix and parse JSON
json_data = json.loads(line.decode('utf-8').replace('data: ', ''))
# Extract and yield only the 'text' field from the nested 'data' object
if 'data' in json_data and 'text' in json_data['data']:
yield json_data['data']['text']
except json.JSONDecodeError:
print(f"Failed to parse JSON: {line}")
else:
yield f"Workflow request failed with status code: {response.status_code}"
79 changes: 79 additions & 0 deletions examples/pipelines/integrations/n8n_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import List, Union, Generator, Iterator, Optional
from pprint import pprint
import requests, json, warnings

# Uncomment to disable SSL verification warnings if needed.
# warnings.filterwarnings('ignore', message='Unverified HTTPS request')

class Pipeline:
def __init__(self):
self.name = "N8N Agent Pipeline"
self.api_url = "https://n8n.host/webhook/myflow" # Set correct hostname
self.api_key = "" # Insert your actual API key here
self.verify_ssl = True
self.debug = False
# Please note that N8N do not support stream reponses

async def on_startup(self):
# This function is called when the server is started.
print(f"on_startup: {__name__}")
pass

async def on_shutdown(self):
# This function is called when the server is shutdown.
print(f"on_shutdown: {__name__}")
pass

async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API.
print(f"inlet: {__name__}")
if self.debug:
print(f"inlet: {__name__} - body:")
pprint(body)
print(f"inlet: {__name__} - user:")
pprint(user)
return body

async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
# This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API.
print(f"outlet: {__name__}")
if self.debug:
print(f"outlet: {__name__} - body:")
pprint(body)
print(f"outlet: {__name__} - user:")
pprint(user)
return body

def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]:
# This is where you can add your custom pipelines like RAG.
print(f"pipe: {__name__}")

if self.debug:
print(f"pipe: {__name__} - received message from user: {user_message}")

# This function triggers the workflow using the specified API.
headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
data = {
"inputs": {"prompt": user_message},
"user": body["user"]["email"]
}

response = requests.post(self.api_url, headers=headers, json=data, verify=self.verify_ssl)
if response.status_code == 200:
# Process and yield each chunk from the response
try:
for line in response.iter_lines():
if line:
# Decode each line assuming UTF-8 encoding and directly parse it as JSON
json_data = json.loads(line.decode('utf-8'))
# Check if 'output' exists in json_data and yield it
if 'output' in json_data:
yield json_data['output']
except json.JSONDecodeError as e:
print(f"Failed to parse JSON from line. Error: {str(e)}")
yield "Error in JSON parsing."
else:
yield f"Workflow request failed with status code: {response.status_code}"

0 comments on commit 1c02f70

Please sign in to comment.