Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Fixes for Prompt studio Indexing and tool runs #1052

Merged
merged 31 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
efe74c8
Roll to latest SDK
gaya3-zipstack Dec 4, 2024
3ba42c2
Commit pdm.lock changes
gaya3-zipstack Dec 4, 2024
0796874
Merge main
gaya3-zipstack Dec 6, 2024
12cd28e
Remove pandoc and tessaract
gaya3-zipstack Dec 10, 2024
bc4deae
Revert change
gaya3-zipstack Dec 11, 2024
3def519
Merge remote-tracking branch 'origin' into feature/remote_storage
gaya3-zipstack Dec 11, 2024
26b2fcb
Roll tool versions
gaya3-zipstack Dec 11, 2024
ea0fc25
Roll tool versions
gaya3-zipstack Dec 11, 2024
bc9f145
Roll version
gaya3-zipstack Dec 11, 2024
587322f
Commit pdm.lock changes
gaya3-zipstack Dec 11, 2024
b7f17e0
Merge from main
gaya3-zipstack Dec 12, 2024
e5a4890
Remove reote storage tool registry
gaya3-zipstack Dec 12, 2024
435249e
Merge remote-tracking branch 'origin' into feature/remote_storage
gaya3-zipstack Dec 20, 2024
d5cb94d
Use ENvHelper for env standardisation
gaya3-zipstack Dec 26, 2024
97bccef
Commit pdm.lock changes
gaya3-zipstack Dec 27, 2024
21dc09b
Minor improvement
gaya3-zipstack Dec 31, 2024
f3806dd
Merge remote-tracking branch 'origin' into feature/remote_storage
gaya3-zipstack Dec 31, 2024
0bee7c3
Merge branch 'feature/remote_storage' of https://github.com/Zipstack/…
gaya3-zipstack Dec 31, 2024
ae8b076
Resolve conflicts
gaya3-zipstack Jan 2, 2025
a6c654b
Indexing and env fixes
gaya3-zipstack Jan 7, 2025
9b60731
Indexing and env fixes
gaya3-zipstack Jan 7, 2025
bdf42ec
Correcting sample env
gaya3-zipstack Jan 7, 2025
d916d3d
Merge branch 'main' into feature/remote_storage
gaya3-zipstack Jan 8, 2025
123b5c5
Roll SDK version
gaya3-zipstack Jan 8, 2025
012eca8
Merge from main
gaya3-zipstack Jan 8, 2025
0440ed5
SDK version roll
gaya3-zipstack Jan 8, 2025
9864e3d
Add sample env
gaya3-zipstack Jan 8, 2025
2123c30
Add sample env
gaya3-zipstack Jan 8, 2025
000598a
Merge remote-tracking branch 'origin' into feature/remote_storage
gaya3-zipstack Jan 8, 2025
6f85236
Lock file check in
gaya3-zipstack Jan 9, 2025
bf01dc3
Merge branch 'main' into feature/remote_storage
hari-kuriakose Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 63 additions & 25 deletions backend/prompt_studio/prompt_studio_core_v2/prompt_studio_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,18 +381,36 @@ def index_document(
process_text = None
if text_processor:
process_text = text_processor.process
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
else:
fs_instance = FileStorageHelper.initialize_file_storage(
type=FileStorageType.PERMANENT
)
doc_id = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
tool_id=tool_id,
file_path=file_path,
org_id=org_id,
document_id=document_id,
is_summary=is_summary,
reindex=True,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)

elapsed_time = time.time() - start_time
logger.info(
Expand Down Expand Up @@ -870,7 +888,7 @@ def _fetch_response(
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_hash = ToolUtils.get_hash_from_file(file_path=doc_path)
else:
file_hash = ToolUtils.get_hash_from_file(file_path=doc_path, fs=fs_instance)
file_hash = fs_instance.get_hash_from_file(path=doc_path)

payload = {
TSPKeys.TOOL_SETTINGS: tool_settings,
Expand Down Expand Up @@ -1123,17 +1141,34 @@ def _fetch_single_pass_response(
if not default_profile:
raise DefaultProfileError()

index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
)
else:
fs_instance = FileStorageHelper.initialize_file_storage(
type=FileStorageType.PERMANENT
)
index_result = PromptStudioHelper.dynamic_indexer(
profile_manager=default_profile,
file_path=file_path,
tool_id=tool_id,
org_id=org_id,
is_summary=tool.summarize_as_source,
document_id=document_id,
run_id=run_id,
user_id=user_id,
process_text=process_text,
fs=fs_instance,
)
if index_result.get("status") == IndexingStatus.PENDING_STATUS.value:
return {
"status": IndexingStatus.PENDING_STATUS.value,
Expand Down Expand Up @@ -1174,7 +1209,10 @@ def _fetch_single_pass_response(
if tool.summarize_as_source:
path = Path(file_path)
file_path = str(path.parent / TSPKeys.SUMMARIZE / (path.stem + ".txt"))
file_hash = ToolUtils.get_hash_from_file(file_path=file_path)
if not check_feature_flag_status(FeatureFlag.REMOTE_FILE_STORAGE):
file_hash = ToolUtils.get_hash_from_file(file_path=file_path)
else:
file_hash = fs_instance.get_hash_from_file(path=file_path)

payload = {
TSPKeys.TOOL_SETTINGS: tool_settings,
Expand Down
6 changes: 2 additions & 4 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,10 @@ API_EXECUTION_DIR_PREFIX="unstract/api"

# Storage Provider for Workflow Execution
# Valid options: MINIO, S3, etc..
WORKFLOW_EXECUTION_FS_PROVIDER="MINIO"
WORKFLOW_EXECUTION_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials": {"endpoint_url":"http://unstract-minio:9000","key":"XXX","secret":"XXX"}}'

# Storage Provider for API Execution
API_STORAGE_FS_PROVIDER="MINIO"
API_STORAGE_FS_CREDENTIAL='{"endpoint_url": "", "key": "", "secret": ""}'
API_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "XXX", "secret": "XXX"}}'

# Optional: Legacy storage path (if applicable)
LEGACY_STORAGE_PATH="/path/to/legacy/storage"
3 changes: 1 addition & 2 deletions tools/classifier/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ X2TEXT_PORT=3004
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials":{"endpoint_url":"http://localhost:9000","key":"XXX","secret":"XXX"}}'
2 changes: 1 addition & 1 deletion tools/classifier/src/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _extract_from_adapter(self, file: str, adapter_id: str) -> Optional[str]:

try:
extraction_result: TextExtractionResult = x2text.process(
input_file_path=file
input_file_path=file, fs=self.tool.workflow_filestorage
)
extracted_text: str = extraction_result.extracted_text
return extracted_text
Expand Down
3 changes: 1 addition & 2 deletions tools/structure/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,4 @@ X2TEXT_PORT=3004
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials":{"endpoint_url":"http://localhost:9000","key":"","secret":""}}'
16 changes: 13 additions & 3 deletions tools/structure/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ def run(
reindex=reindex,
usage_kwargs=usage_kwargs,
process_text=process_text,
**(
{"fs": self.workflow_filestorage}
if self.workflow_filestorage is not None
else {}
),
)
index_metrics[output[SettingsKeys.NAME]] = {
SettingsKeys.INDEXING: index.get_metrics()
Expand Down Expand Up @@ -379,9 +384,14 @@ def _summarize_and_index(
f.write(summarized_context)

self.stream_log("Indexing summarized context")
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
if self.workflow_filestorage:
summarize_file_hash: str = self.workflow_filestorage.get_hash_from_file(
file_path=summarize_file_path
)
else:
summarize_file_hash: str = ToolUtils.get_hash_from_file(
file_path=summarize_file_path
)
index.index(
tool_id=tool_id,
embedding_instance_id=embedding_instance_id,
Expand Down
3 changes: 1 addition & 2 deletions tools/text_extractor/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ X2TEXT_PORT=
# (e.g., bucket/execution/org_id/workflow_id/execution_id)
EXECUTION_DATA_DIR=<execution_dir_path_with_bucket>
# Storage provider for Workflow Execution (e.g., minio, S3)
WORKFLOW_EXECUTION_FS_PROVIDER="minio"
WORKFLOW_EXECUTION_FS_CREDENTIAL={"endpoint_url":"http://localhost:9000","key":"","secret":""}
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS='{"provider":"minio","credentials":{"endpoint_url":"http://localhost:9000","key":"XXX","secret":"XXX"}}'
11 changes: 8 additions & 3 deletions tools/text_extractor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ def run(
tool=self, adapter_instance_id=text_extraction_adapter_id
)
self.stream_log("Text extraction adapter has been created successfully.")
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file
)
if self.workflow_filestorage:
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file, fs=self.workflow_filestorage
)
else:
extraction_result: TextExtractionResult = text_extraction_adapter.process(
input_file_path=input_file
)
extracted_text = self.convert_to_actual_string(extraction_result.extracted_text)

self.stream_log("Text has been extracted successfully.")
Expand Down
24 changes: 6 additions & 18 deletions unstract/filesystem/src/unstract/filesystem/file_storage_config.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import json
import logging
import os

from unstract.sdk.file_storage import FileStorageProvider, SharedTemporaryFileStorage
from unstract.sdk.file_storage import FileStorageProvider, StorageType

from .exceptions import ProviderNotFound
from .file_storage_types import FileStorageType # Import the shared enum
Expand All @@ -29,22 +28,11 @@ def get_provider(var_name: str, default: str = "minio") -> FileStorageProvider:

# Mappings for storage types, credentials, and providers
STORAGE_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: SharedTemporaryFileStorage,
FileStorageType.API_EXECUTION: SharedTemporaryFileStorage,
FileStorageType.WORKFLOW_EXECUTION: StorageType.SHARED_TEMPORARY,
FileStorageType.API_EXECUTION: StorageType.SHARED_TEMPORARY,
}

FILE_STORAGE_CREDENTIALS_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: json.loads(
os.environ.get("WORKFLOW_EXECUTION_FS_CREDENTIAL", "{}")
),
FileStorageType.API_EXECUTION: json.loads(
os.environ.get("API_STORAGE_FS_CREDENTIAL", "{}")
),
}

FILE_STORAGE_PROVIDER_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: get_provider(
"WORKFLOW_EXECUTION_FS_PROVIDER", "minio"
),
FileStorageType.API_EXECUTION: get_provider("API_STORAGE_FS_PROVIDER", "minio"),
FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING = {
FileStorageType.WORKFLOW_EXECUTION: "WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS",
FileStorageType.API_EXECUTION: "API_FILE_STORAGE_CREDENTIALS",
}
42 changes: 8 additions & 34 deletions unstract/filesystem/src/unstract/filesystem/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
import logging
import os
from typing import Any

from unstract.sdk.file_storage import (
FileStorage,
FileStorageProvider,
PermanentFileStorage,
)
from unstract.sdk.file_storage import EnvHelper, FileStorage

from unstract.filesystem.file_storage_types import FileStorageType

from .file_storage_config import (
FILE_STORAGE_CREDENTIALS_MAPPING,
FILE_STORAGE_PROVIDER_MAPPING,
FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING,
STORAGE_MAPPING,
)

Expand All @@ -23,31 +16,12 @@ class FileSystem:
def __init__(self, file_storage_type: FileStorageType):
self.file_storage_type = file_storage_type

def _get_credentials(self) -> dict[str, Any]:
"""Retrieve storage credentials based on the storage type."""
return FILE_STORAGE_CREDENTIALS_MAPPING.get(self.file_storage_type, {})

def _get_provider(self) -> FileStorageProvider:
"""Retrieve provider based on the storage type."""
provider_name = FILE_STORAGE_PROVIDER_MAPPING.get(self.file_storage_type)
if not provider_name:
raise ValueError(f"No provider found for {self.file_storage_type}.")
return provider_name

def get_file_storage(self) -> FileStorage:
"""Initialize and return the appropriate file storage instance."""
# Get the storage class based on the FileStorageType
storage_class = STORAGE_MAPPING.get(self.file_storage_type)

if not storage_class:
raise ValueError(f"Unsupported FileStorageType: {self.file_storage_type}")

# Base configuration for all storage classes
credentials = self._get_credentials()

# Add specific parameters based on the storage class type
if storage_class == PermanentFileStorage:
credentials["legacy_storage_path"] = os.getenv("LEGACY_STORAGE_PATH", None)

# Return an instance of the storage class with the appropriate configuration
return storage_class(provider=self._get_provider(), **credentials)
storage_type = STORAGE_MAPPING.get(self.file_storage_type)
env_name = FILE_STORAGE_CREDENTIALS_TO_ENV_NAME_MAPPING.get(
self.file_storage_type
)
file_storage = EnvHelper.get_storage(storage_type, env_name)
return file_storage
5 changes: 3 additions & 2 deletions worker/src/unstract/worker/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class Env:
LOG_LEVEL = "LOG_LEVEL"
REMOVE_CONTAINER_ON_EXIT = "REMOVE_CONTAINER_ON_EXIT"
WORKFLOW_EXECUTION_DIR_PREFIX = "WORKFLOW_EXECUTION_DIR_PREFIX"
WORKFLOW_EXECUTION_FS_PROVIDER = "WORKFLOW_EXECUTION_FS_PROVIDER"
WORKFLOW_EXECUTION_FS_CREDENTIAL = "WORKFLOW_EXECUTION_FS_CREDENTIAL"
WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS = (
"WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS"
)
EXECUTION_DATA_DIR = "EXECUTION_DATA_DIR"
FLIPT_SERVICE_AVAILABLE = "FLIPT_SERVICE_AVAILABLE"

Expand Down
7 changes: 2 additions & 5 deletions worker/src/unstract/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,8 @@ def run_container(
workflow_id,
execution_id,
)
envs[Env.WORKFLOW_EXECUTION_FS_PROVIDER] = os.getenv(
Env.WORKFLOW_EXECUTION_FS_PROVIDER, ""
)
envs[Env.WORKFLOW_EXECUTION_FS_CREDENTIAL] = os.getenv(
Env.WORKFLOW_EXECUTION_FS_CREDENTIAL, "{}"
envs[Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS] = os.getenv(
Env.WORKFLOW_EXECUTION_FILE_STORAGE_CREDENTIALS, "{}"
)
else:
envs[Env.TOOL_DATA_DIR] = tool_data_dir
Expand Down
Loading