Skip to content

Commit

Permalink
Feat/remote storage workflow (#837)
Browse files Browse the repository at this point in the history
* implement remote-storage from sdk in backend under featureflag

* added feature flags in imports

* minor changes

* handled fetching result from filestorage

* minor changes and adding updating docs and sample envs

* addressed pr comments

* minor changes in test

* updated tox ini
  • Loading branch information
muhammad-ali-e authored Nov 22, 2024
1 parent c685aba commit f2a55f2
Show file tree
Hide file tree
Showing 19 changed files with 519 additions and 41 deletions.
1 change: 1 addition & 0 deletions backend/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ class FeatureFlag:
"""Temporary feature flags."""

APP_DEPLOYMENT = "app_deployment"
REMOTE_FILE_STORAGE = "remote_file_storage"
19 changes: 19 additions & 0 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,22 @@ TOOL_REGISTRY_CONFIG_PATH="/data/tool_registry_config"

# Flipt Service
FLIPT_SERVICE_AVAILABLE=False


# File System Configuration for Workflow and API Execution

# Directory Prefixes for storing execution files
WORKFLOW_EXECUTION_DIR_PREFIX="unstract/execution"
API_EXECUTION_DIR_PREFIX="unstract/api"

# Storage Provider for Workflow Execution
# Valid options: MINIO, S3, etc.
WORKFLOW_EXECUTION_FS_PROVIDER="MINIO"
WORKFLOW_EXECUTION_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'

# Storage Provider for API Execution
API_STORAGE_FS_PROVIDER="MINIO"
API_STORAGE_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'

# Optional: Legacy storage path (if applicable)
LEGACY_STORAGE_PATH="/path/to/legacy/storage"
6 changes: 0 additions & 6 deletions backend/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ class Account:
ORGANIZATION_ID = "organization_id"


class FeatureFlag:
"""Temporary feature flags."""

pass


class Common:
METADATA = "metadata"
MODULE = "module"
Expand Down
39 changes: 28 additions & 11 deletions backend/workflow_manager/endpoint_v2/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
from utils.constants import Common
from utils.user_context import UserContext

from backend.constants import FeatureFlag
from unstract.connectors.filesystems import connectors
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem


class BaseConnector(ExecutionFileHandler):
Expand All @@ -22,13 +27,14 @@ def __init__(
This class serves as a base for connectors and provides common
utilities.
"""
if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR):
raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR")
super().__init__(workflow_id, execution_id, organization_id)
# Directory path for storing execution-related files for API
self.api_storage_dir: str = self.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
if not (settings.API_STORAGE_DIR and settings.WORKFLOW_DATA_DIR):
raise ValueError("Missed env API_STORAGE_DIR or WORKFLOW_DATA_DIR")
# Directory path for storing execution-related files for API
self.api_storage_dir: str = self.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)

def get_fsspec(
self, settings: dict[str, Any], connector_id: str
@classmethod
def get_json_schema(cls, file_path: str) -> dict[str, Any]:
    """Load and parse a JSON schema file.

    Depending on the REMOTE_FILE_STORAGE feature flag, the file is read
    either through the remote workflow-execution file storage or from the
    local filesystem.

    Args:
        file_path (str): Path of the JSON schema file.

    Returns:
        dict[str, Any]: The parsed schema, or an empty dict when the file
        cannot be read (OSError).

    Raises:
        json.JSONDecodeError: If there is an issue decoding the JSON file.
    """
    try:
        if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
            file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
            file_storage = file_system.get_file_storage()
            file_contents = file_storage.read(path=file_path, encoding="utf-8")
            # BUG FIX: file_storage.read() returns the decoded file
            # *contents* (a str), not a file object, so json.loads must be
            # used here — json.load(file_contents) would raise
            # AttributeError on a string.
            schema: dict[str, Any] = json.loads(file_contents)
        else:
            with open(file_path, encoding="utf-8") as file:
                schema = json.load(file)
    except OSError:
        # Unreadable or missing file: fall back to an empty schema.
        schema = {}
    return schema
Expand All @@ -100,7 +112,12 @@ def get_api_storage_dir_path(cls, workflow_id: str, execution_id: str) -> str:
str: The directory path for the execution.
"""
organization_id = UserContext.get_organization_identifier()
api_storage_dir: str = cls.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
api_storage_dir: str = cls.get_api_execution_dir(
workflow_id, execution_id, organization_id
)
else:
api_storage_dir: str = cls.create_execution_dir_path(
workflow_id, execution_id, organization_id, settings.API_STORAGE_DIR
)
return api_storage_dir
63 changes: 59 additions & 4 deletions backend/workflow_manager/endpoint_v2/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from connector_v2.models import ConnectorInstance
from fsspec.implementations.local import LocalFileSystem
from unstract.sdk.constants import ToolExecKey
from unstract.sdk.tool.mime_types import EXT_MIME_MAP
from unstract.workflow_execution.constants import ToolOutputType
from utils.user_context import UserContext
from workflow_manager.endpoint_v2.base_connector import BaseConnector
Expand All @@ -36,8 +37,13 @@
from workflow_manager.workflow_v2.models.file_history import FileHistory
from workflow_manager.workflow_v2.models.workflow import Workflow

from backend.constants import FeatureFlag
from backend.exceptions import UnstractFSException
from unstract.connectors.exceptions import ConnectorError
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -405,6 +411,8 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
Returns:
Union[dict[str, Any], str]: Result data.
"""
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
return self.get_result_with_file_storage(file_history=file_history)
if file_history and file_history.result:
return self.parse_string(file_history.result)
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
Expand Down Expand Up @@ -434,6 +442,43 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
logger.error(f"Error while getting result {err}")
return result

def get_result_with_file_storage(
    self, file_history: Optional[FileHistory] = None
) -> Optional[Any]:
    """Get result data from the output file.

    Remote-storage variant of ``get_result``: reads the tool output through
    the workflow-execution FileStorage instead of the local filesystem.

    Args:
        file_history: Previously recorded execution result; when it carries
            a non-empty ``result``, that stored value is parsed and returned
            without touching storage.

    Returns:
        Union[dict[str, Any], str]: Result data. An empty string when the
        output file is missing or its JSON is invalid.

    Raises:
        ToolOutputTypeMismatch: If the file's MIME type contradicts the
            declared output type (not caught by the except clause below).
        InvalidToolOutputType: If the metadata declares an unknown type.
    """
    if file_history and file_history.result:
        return self.parse_string(file_history.result)
    # NOTE(review): this reads INFILE, mirroring get_result() — confirm
    # INFILE is the intended artifact rather than a dedicated output file.
    output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
    metadata: dict[str, Any] = self.get_workflow_metadata()
    output_type = self.get_output_type(metadata)
    result: Union[dict[str, Any], str] = ""
    file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
    file_storage = file_system.get_file_storage()
    try:
        # TODO: SDK handles validation; consider removing here.
        file_type = file_storage.mime_type(output_file)
        if output_type == ToolOutputType.JSON:
            # Declared JSON must actually be JSON on disk.
            if file_type != EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
                logger.error(f"Output type json mismatched {file_type}")
                raise ToolOutputTypeMismatch()
            file_content = file_storage.read(output_file, mode="r")
            result = json.loads(file_content)
        elif output_type == ToolOutputType.TXT:
            # Declared text must NOT look like JSON.
            if file_type == EXT_MIME_MAP[ToolOutputType.JSON.lower()]:
                logger.error(f"Output type txt mismatched {file_type}")
                raise ToolOutputTypeMismatch()
            file_content = file_storage.read(output_file, mode="r")
            # Unescape literal escape sequences stored in the text output.
            result = file_content.encode("utf-8").decode("unicode-escape")
        else:
            raise InvalidToolOutputType()
    except (FileNotFoundError, json.JSONDecodeError) as err:
        # Best-effort: log and return the default empty result.
        logger.error(f"Error while getting result {err}")
    return result

def get_metadata(
self, file_history: Optional[FileHistory] = None
) -> Optional[dict[str, Any]]:
Expand All @@ -454,8 +499,13 @@ def delete_execution_directory(self) -> None:
Returns:
None
"""
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(self.execution_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(self.execution_dir, recursive=True)
self.delete_api_storage_dir(self.workflow_id, self.execution_id)

@classmethod
Expand All @@ -468,8 +518,13 @@ def delete_api_storage_dir(cls, workflow_id: str, execution_id: str) -> None:
api_storage_dir = cls.get_api_storage_dir_path(
workflow_id=workflow_id, execution_id=execution_id
)
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.rm(api_storage_dir, recursive=True)
else:
fs: LocalFileSystem = fsspec.filesystem("file")
fs.rm(api_storage_dir, recursive=True)

@classmethod
def create_endpoint_for_workflow(
Expand Down
106 changes: 97 additions & 9 deletions backend/workflow_manager/endpoint_v2/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
from workflow_manager.workflow_v2.file_history_helper import FileHistoryHelper
from workflow_manager.workflow_v2.models.workflow import Workflow

from backend.constants import FeatureFlag
from unstract.flags.feature_flag import check_feature_flag_status

if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
from unstract.filesystem import FileStorageType, FileSystem

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -475,11 +481,17 @@ def add_input_from_connector_to_volume(self, input_file_path: str) -> str:
)
self.publish_input_file_content(input_file_path, input_log)

with fsspec.open(source_file, "wb") as local_file:
local_file.write(file_content)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
file_storage = file_system.get_file_storage()
file_storage.write(path=source_file_path, mode="wb", data=file_content)
file_storage.write(path=infile_path, mode="wb", data=file_content)
else:
with fsspec.open(source_file, "wb") as local_file:
local_file.write(file_content)

# Copy file to infile directory
self.copy_file_to_infile_dir(source_file_path, infile_path)
# Copy file to infile directory
self.copy_file_to_infile_dir(source_file_path, infile_path)

logger.info(f"{input_file_path} is added to execution directory")
return hash_value_of_file_content
Expand All @@ -488,8 +500,76 @@ def add_input_from_api_storage_to_volume(self, input_file_path: str) -> None:
"""Add input file to execution directory from api storage."""
infile_path = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
source_path = os.path.join(self.execution_dir, WorkflowFileType.SOURCE)
shutil.copyfile(input_file_path, infile_path)
shutil.copyfile(input_file_path, source_path)
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
api_file_system = FileSystem(FileStorageType.API_EXECUTION)
api_file_storage = api_file_system.get_file_storage()
workflow_file_system = FileSystem(FileStorageType.WORKFLOW_EXECUTION)
workflow_file_storage = workflow_file_system.get_file_storage()
self._copy_file_to_destination(
source_storage=api_file_storage,
destination_storage=workflow_file_storage,
source_path=input_file_path,
destination_paths=[infile_path, source_path],
)
else:
shutil.copyfile(input_file_path, infile_path)
shutil.copyfile(input_file_path, source_path)

# TODO: replace it with method from SDK Utils
def _copy_file_to_destination(
self,
source_storage: Any,
destination_storage: Any,
source_path: str,
destination_paths: list[str],
chunk_size: int = 4096,
) -> None:
"""
Copy a file from a source storage to one or more paths in a
destination storage.
This function reads the source file in chunks and writes each chunk to
the specified destination paths. The function will continue until the
entire source file is copied.
Args:
source_storage (FileStorage): The storage object from which
the file is read.
destination_storage (FileStorage): The storage object to which
the file is written.
source_path (str): The path of the file in the source storage.
destination_paths (list[str]): A list of paths where the file will be
copied in the destination storage.
chunk_size (int, optional): The number of bytes to read per chunk.
Default is 4096 bytes.
"""
seek_position = 0 # Start from the beginning
end_of_file = False

# Loop to read and write in chunks until the end of the file
while not end_of_file:
# Read a chunk from the source file
chunk = source_storage.read(
path=source_path,
mode="rb",
seek_position=seek_position,
length=chunk_size,
)
# Check if the end of the file has been reached
if not chunk:
end_of_file = True
else:
# Write the chunk to each destination path
for destination_file in destination_paths:
destination_storage.write(
path=destination_file,
mode="ab",
seek_position=seek_position,
data=chunk,
)

# Update the seek position
seek_position += len(chunk)

def add_file_to_volume(self, input_file_path: str, file_hash: FileHash) -> str:
"""Add input file to execution directory.
Expand Down Expand Up @@ -584,12 +664,20 @@ def add_input_file_to_api_storage(
for file in file_objs:
file_name = file.name
destination_path = os.path.join(api_storage_dir, file_name)
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, "wb") as f:
if check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_system = FileSystem(FileStorageType.API_EXECUTION)
file_storage = file_system.get_file_storage()
buffer = bytearray()
for chunk in file.chunks():
buffer.extend(chunk)
f.write(buffer)
file_storage.write(path=destination_path, mode="wb", data=buffer)
else:
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, "wb") as f:
buffer = bytearray()
for chunk in file.chunks():
buffer.extend(chunk)
f.write(buffer)
file_hash = cls.hash_str(buffer)
connection_type = WorkflowEndpoint.ConnectionType.API

Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ env_list = py{39,310,311}, worker
changedir = worker
setenv =
PDM_IGNORE_SAVED_PYTHON="1"
PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
deps = pdm
allowlist_externals=
sh
Expand Down
Loading

0 comments on commit f2a55f2

Please sign in to comment.