From 0b8469bcd15c94970a23dc4369000c842d5ddf78 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Tue, 22 Oct 2024 23:45:27 +0200 Subject: [PATCH 1/4] Create dify_pipeline.py --- .../pipelines/integrations/dify_pipeline.py | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 examples/pipelines/integrations/dify_pipeline.py diff --git a/examples/pipelines/integrations/dify_pipeline.py b/examples/pipelines/integrations/dify_pipeline.py new file mode 100644 index 00000000..8e0f19a8 --- /dev/null +++ b/examples/pipelines/integrations/dify_pipeline.py @@ -0,0 +1,84 @@ +from typing import List, Union, Generator, Iterator, Optional +from pprint import pprint +import requests, json, warnings + +# Unhash to disable SSL verification warnings if needed +# warnings.filterwarnings('ignore', message='Unverified HTTPS request') + +class Pipeline: + def __init__(self): + self.name = "Dify Agent Pipeline" + self.api_url = "http://dify.hostname/v1/workflows/run" # Chane hostname + self.api_key = "app-dify-key" # Replace with actual API key + self.api_request_stream = True # Dify support stream + self.verify_ssl = True + self.debug = False + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. + print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]: + print(f"pipe: {__name__}") + + if self.debug: + print(f"pipe: {__name__} - received message from user: {user_message}") + + # Set reponse mode Dify API parameter + if self.api_request_stream is True: + response_mode = "streaming" + else: + response_mode = "blocking" + + # This function triggers the workflow using the specified API. + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + data = { + "inputs": {"prompt": user_message}, + "response_mode": response_mode, + "user": body["user"]["email"] + } + + response = requests.post(self.api_url, headers=headers, json=data, stream=self.api_request_stream, verify=self.verify_ssl) + if response.status_code == 200: + # Process and yield each chunk from the response + for line in response.iter_lines(): + if line: + try: + # Remove 'data: ' prefix and parse JSON + json_data = json.loads(line.decode('utf-8').replace('data: ', '')) + # Extract and yield only the 'text' field from the nested 'data' object + if 'data' in json_data and 'text' in json_data['data']: + yield json_data['data']['text'] + except json.JSONDecodeError: + print(f"Failed to parse JSON: {line}") + else: + yield f"Workflow request failed with status code: {response.status_code}" From a82c47c6d4bcf2c2cfefca471cf07c4f70169ea7 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Tue, 22 Oct 2024 23:48:19 +0200 Subject: [PATCH 2/4] Update dify_pipeline.py Uncomment to disable SSL verification warnings if needed. --- examples/pipelines/integrations/dify_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/pipelines/integrations/dify_pipeline.py b/examples/pipelines/integrations/dify_pipeline.py index 8e0f19a8..e22d3f51 100644 --- a/examples/pipelines/integrations/dify_pipeline.py +++ b/examples/pipelines/integrations/dify_pipeline.py @@ -2,7 +2,7 @@ from pprint import pprint import requests, json, warnings -# Unhash to disable SSL verification warnings if needed +# Uncomment to disable SSL verification warnings if needed. # warnings.filterwarnings('ignore', message='Unverified HTTPS request') class Pipeline: From f4fc2f1f07ad238f169c309fa977c6392589b043 Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Tue, 22 Oct 2024 23:57:44 +0200 Subject: [PATCH 3/4] Create n8n_pipeline.py --- .../pipelines/integrations/n8n_pipeline.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 examples/pipelines/integrations/n8n_pipeline.py diff --git a/examples/pipelines/integrations/n8n_pipeline.py b/examples/pipelines/integrations/n8n_pipeline.py new file mode 100644 index 00000000..51e0e4d2 --- /dev/null +++ b/examples/pipelines/integrations/n8n_pipeline.py @@ -0,0 +1,79 @@ +from typing import List, Union, Generator, Iterator, Optional +from pprint import pprint +import requests, json, warnings + +# Uncomment to disable SSL verification warnings if needed. +# warnings.filterwarnings('ignore', message='Unverified HTTPS request') + +class Pipeline: + def __init__(self): + self.name = "N8N Agent Pipeline" + self.api_url = "https://n8n.host/webhook/myflow" # Set correct hostname + self.api_key = "" # Insert your actual API key here + self.verify_ssl = True + self.debug = False + # Please note that N8N do not support stream reponses + + async def on_startup(self): + # This function is called when the server is started. + print(f"on_startup: {__name__}") + pass + + async def on_shutdown(self): + # This function is called when the server is shutdown. + print(f"on_shutdown: {__name__}") + pass + + async def inlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called before the OpenAI API request is made. You can modify the form data before it is sent to the OpenAI API. + print(f"inlet: {__name__}") + if self.debug: + print(f"inlet: {__name__} - body:") + pprint(body) + print(f"inlet: {__name__} - user:") + pprint(user) + return body + + async def outlet(self, body: dict, user: Optional[dict] = None) -> dict: + # This function is called after the OpenAI API response is completed. You can modify the messages after they are received from the OpenAI API. + print(f"outlet: {__name__}") + if self.debug: + print(f"outlet: {__name__} - body:") + pprint(body) + print(f"outlet: {__name__} - user:") + pprint(user) + return body + + def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> Union[str, Generator, Iterator]: + # This is where you can add your custom pipelines like RAG. + print(f"pipe: {__name__}") + + if self.debug: + print(f"pipe: {__name__} - received message from user: {user_message}") + + # This function triggers the workflow using the specified API. + headers = { + 'Authorization': f'Bearer {self.api_key}', + 'Content-Type': 'application/json' + } + data = { + "inputs": {"prompt": user_message}, + "user": body["user"]["email"] + } + + response = requests.post(self.api_url, headers=headers, json=data, verify=self.verify_ssl) + if response.status_code == 200: + # Process and yield each chunk from the response + try: + for line in response.iter_lines(): + if line: + # Decode each line assuming UTF-8 encoding and directly parse it as JSON + json_data = json.loads(line.decode('utf-8')) + # Check if 'output' exists in json_data and yield it + if 'output' in json_data: + yield json_data['output'] + except json.JSONDecodeError as e: + print(f"Failed to parse JSON from line. Error: {str(e)}") + yield "Error in JSON parsing." + else: + yield f"Workflow request failed with status code: {response.status_code}" From ca35b9df0bd1b2a971a3dccdcfa741fae4b74edc Mon Sep 17 00:00:00 2001 From: Artur Zdolinski Date: Wed, 23 Oct 2024 00:02:24 +0200 Subject: [PATCH 4/4] Update dify_pipeline.py --- examples/pipelines/integrations/dify_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/pipelines/integrations/dify_pipeline.py b/examples/pipelines/integrations/dify_pipeline.py index e22d3f51..86e412ac 100644 --- a/examples/pipelines/integrations/dify_pipeline.py +++ b/examples/pipelines/integrations/dify_pipeline.py @@ -8,8 +8,8 @@ class Pipeline: def __init__(self): self.name = "Dify Agent Pipeline" - self.api_url = "http://dify.hostname/v1/workflows/run" # Chane hostname - self.api_key = "app-dify-key" # Replace with actual API key + self.api_url = "http://dify.hostname/v1/workflows/run" # Set correct hostname + self.api_key = "app-dify-key" # Insert your actual API key here.v self.api_request_stream = True # Dify support stream self.verify_ssl = True self.debug = False