Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pdf parsing, pass as str instead of posix path #4448

Merged
merged 18 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ dependencies = [
"langchain-elasticsearch>=0.2.0",
"opensearch-py>=2.7.1",
"langchain-ollama>=0.2.0",
"pymupdf~=1.24.13",
"sqlalchemy[aiosqlite,postgresql_psycopg2binary,postgresql_psycopgbinary]>=2.0.36",
"atlassian-python-api>=3.41.16",
]
Expand Down
71 changes: 34 additions & 37 deletions src/backend/base/langflow/components/data/file.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from tempfile import NamedTemporaryFile
from zipfile import ZipFile, is_zipfile

import fitz

from langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.custom import Component
from langflow.io import BoolInput, FileInput, Output
from langflow.io import BoolInput, FileInput, IntInput, Output
from langflow.schema import Data


Expand Down Expand Up @@ -49,6 +46,13 @@ class FileComponent(Component):
advanced=True,
info="If true, parallel processing will be enabled for zip files.",
),
IntInput(
name="concurrency_multithreading",
display_name="Multithreading Concurrency",
advanced=True,
info="The maximum number of workers to use, if concurrency is enabled",
value=4,
),
]

outputs = [Output(display_name="Data", name="data", method="load_file")]
Expand All @@ -74,16 +78,19 @@ def load_file(self) -> Data:
# Check if the file is a zip archive
if is_zipfile(resolved_path):
self.log(f"Processing zip file: {resolved_path.name}.")

return self._process_zip_file(
resolved_path,
silent_errors=self.silent_errors,
parallel=self.use_multithreading,
)

self.log(f"Processing single file: {resolved_path.name}.")

return self._process_single_file(resolved_path, silent_errors=self.silent_errors)
except FileNotFoundError:
self.log(f"File not found: {resolved_path.name}.")

raise

def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:
Expand Down Expand Up @@ -126,7 +133,7 @@ def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, para
raise ValueError(msg)

# Define a function to process each file
def process_file(file_name):
def process_file(file_name, silent_errors=silent_errors):
with NamedTemporaryFile(delete=False) as temp_file:
temp_path = Path(temp_file.name).with_name(file_name)
with zip_file.open(file_name) as file_content:
Expand All @@ -138,19 +145,24 @@ def process_file(file_name):

# Process files in parallel if specified
if parallel:
self.log("Initializing parallel Thread Pool Executor.")
with ThreadPoolExecutor() as executor:
futures = {executor.submit(process_file, file): file for file in valid_files}
for future in as_completed(futures):
try:
data.append(future.result())
except Exception as e:
self.log(f"Error processing file {futures[future]}: {e}")
if not silent_errors:
raise
self.log(
f"Initializing parallel Thread Pool Executor with max workers: "
f"{self.concurrency_multithreading}."
)

# Process files in parallel
initial_data = parallel_load_data(
valid_files,
silent_errors=silent_errors,
load_function=process_file,
max_concurrency=self.concurrency_multithreading,
)

# Filter out empty data
data = list(filter(None, initial_data))
else:
# Sequential processing
data.extend([process_file(file_name) for file_name in valid_files])
data = [process_file(file_name) for file_name in valid_files]

self.log(f"Successfully processed zip file: {zip_path.name}.")

Expand All @@ -169,20 +181,8 @@ def _process_single_file(self, file_path: Path, *, silent_errors: bool = False)
Raises:
ValueError: For unsupported file formats.
"""

# Define a function to extract text from a PDF file
def pdf_to_text(filepath):
    """Extract the plain text of every page in a PDF file.

    Args:
        filepath: Path (str or Path) to the PDF file to read.

    Returns:
        str: The concatenated text of all pages, each page's text
        followed by a newline.

    Raises:
        Whatever ``fitz.open`` raises for a missing or corrupt file
        (propagated unchanged).
    """
    # Open the PDF with PyMuPDF; the context manager guarantees the
    # document handle is closed even if text extraction raises.
    with fitz.open(filepath) as pdf:
        # str.join builds the result in one pass — avoids the quadratic
        # cost of repeated `text += ...` concatenation on large PDFs.
        return "".join(page.get_text() + "\n" for page in pdf)

# Check if the file type is supported
if not any(file_path.suffix == ext for ext in ["." + f for f in [*TEXT_FILE_TYPES, "pdf"]]):
if not any(file_path.suffix == ext for ext in ["." + f for f in TEXT_FILE_TYPES]):
self.log(f"Unsupported file type: {file_path.suffix}")

# Return empty data if silent_errors is True
Expand All @@ -193,13 +193,10 @@ def pdf_to_text(filepath):
raise ValueError(msg)

try:
# Parse the file based on the file type
if file_path.suffix == ".pdf":
data = Data(data={"file_path": file_path, "text": pdf_to_text(file_path)})
else:
data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]
if not data:
data = Data()
# Parse the text file as appropriate
data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]
if not data:
data = Data()

self.log(f"Successfully processed file: {file_path.name}.")
except Exception as e:
Expand Down
Loading
Loading