bugfix/implicit single connection #224

Merged

23 changes: 23 additions & 0 deletions core_api/src/publisher_handler.py
@@ -0,0 +1,23 @@
from faststream.redis import RedisBroker

from redbox.models import File


class FilePublisher:
    """This class is a bit of a hack to overcome a shortcoming (bug?) in faststream
    whereby the broker is not automatically connected in sub-applications.

    TODO: fix this properly, or raise an issue against faststream
    """

    def __init__(self, broker: RedisBroker, queue_name: str):
        self.connected = False
        self.broker = broker
        self.queue_name = queue_name

    async def publish(self, file: File):
        if not self.connected:
            # we only do this once
            await self.broker.connect()
            self.connected = True
        await self.broker.publish(file, self.queue_name)
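
For context, a minimal sketch of how this wrapper might be used from the API process — the broker URL and queue name below are illustrative assumptions, not values from this PR:

# Hypothetical usage sketch: build the publisher once at module load;
# the first publish() call is what actually opens the broker connection.
from faststream.redis import RedisBroker

from core_api.src.publisher_handler import FilePublisher
from redbox.models import File

broker = RedisBroker("redis://localhost:6379")  # assumed Redis URL
file_publisher = FilePublisher(broker, "ingest")  # assumed queue name

async def ingest(file: File) -> None:
    # connect() happens lazily inside publish(), sidestepping the
    # sub-application auto-connect issue described in the docstring
    await file_publisher.publish(file)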
27 changes: 9 additions & 18 deletions core_api/src/routes/file.py
@@ -3,8 +3,8 @@

from fastapi import FastAPI, HTTPException
from faststream.redis.fastapi import RedisRouter
-from pydantic import AnyHttpUrl

+from core_api.src.publisher_handler import FilePublisher
from redbox.models import Chunk, File, FileStatus, Settings
from redbox.storage import ElasticsearchStorageHandler

@@ -23,8 +23,8 @@

# === Queues ===
router = RedisRouter(url=env.redis_url)
-publisher = router.publisher(env.ingest_queue_name)
+file_publisher = FilePublisher(router.broker, env.ingest_queue_name)

# === Storage ===

@@ -48,31 +48,22 @@


@file_app.post("/", tags=["file"])
-async def create_upload_file(name: str, type: str, location: AnyHttpUrl) -> UUID:
-    """Upload a file to the object store and create a record in the database
+async def add_file(file: File) -> File:
+    """Create a File record in the database

    Args:
-        name (str): The file name to be recorded
-        type (str): The file type to be recorded
-        location (AnyHttpUrl): The presigned file resource location
+        file (File): The file to be recorded

    Returns:
-        UUID: The file uuid from the elastic database
+        File: The file record created in the elastic database
    """

-    file = File(
-        name=name,
-        url=str(location), # avoids JSON serialisation error
-        content_type=type,
-    )
-
    storage_handler.write_item(file)

    log.info(f"publishing {file.uuid}")
-    await router.broker.connect()
-    await publisher.publish(file)
+    await file_publisher.publish(file)

-    return file.uuid
+    return file


@file_app.get("/{file_uuid}", response_model=File, tags=["file"])
@@ -99,7 +90,7 @@ def delete_file(file_uuid: UUID) -> File:
        File: The file that was deleted
    """
    file = storage_handler.read_item(file_uuid, model_type="File")
-    s3.delete_object(Bucket=env.bucket_name, Key=file.name)
+    s3.delete_object(Bucket=env.bucket_name, Key=file.key)
    storage_handler.delete_item(file)

    chunks = storage_handler.get_file_chunks(file.uuid)
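
The endpoint contract changes shape as well as name: file metadata and a presigned URL in query parameters give way to a File reference posted as JSON, and the response body is the full record rather than a bare UUID. A sketch of the client-side difference, with illustrative host, key, and bucket values:

import requests

# Before: file metadata and a presigned location as query parameters
requests.post(
    "http://localhost:5002/file",
    params={"name": "report.pdf", "type": ".pdf", "location": "https://..."},
)

# After: an s3 key/bucket pair as a JSON body; the uuid is read
# from the returned File record
response = requests.post(
    "http://localhost:5002/file",
    json={"key": "report.pdf", "bucket": "my-bucket"},  # illustrative values
)
file_uuid = response.json()["uuid"]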
15 changes: 1 addition & 14 deletions core_api/tests/conftest.py
@@ -48,11 +48,6 @@ def elasticsearch_storage_handler(es_client):

@pytest.fixture
def file(s3_client, file_pdf_path) -> YieldFixture[File]:
-    """
-    TODO: this is a cut and paste of core_api:create_upload_file
-    When we come to test core_api we should think about
-    the relationship between core_api and the ingester app
-    """
    file_name = os.path.basename(file_pdf_path)
    file_type = f'.{file_name.split(".")[-1]}'
@@ -64,15 +59,7 @@
        Tagging=f"file_type={file_type}",
    )

-    authenticated_s3_url = s3_client.generate_presigned_url(
-        "get_object",
-        Params={"Bucket": env.bucket_name, "Key": file_name},
-        ExpiresIn=3600,
-    )
-
-    # Strip off the query string (we don't need the keys)
-    simple_s3_url = authenticated_s3_url.split("?")[0]
-    file_record = File(name=file_name, url=simple_s3_url, content_type=file_type)
+    file_record = File(key=file_name, bucket=env.bucket_name)

    yield file_record

17 changes: 5 additions & 12 deletions core_api/tests/routes/test_file.py
@@ -25,19 +25,12 @@ async def test_post_file_upload(s3_client, app_client, elasticsearch_storage_han
        ExtraArgs={"Tagging": "file_type=pdf"},
    )

-    authenticated_s3_url = s3_client.generate_presigned_url(
-        "get_object",
-        Params={"Bucket": env.bucket_name, "Key": file_key},
-        ExpiresIn=3600,
-    )
-
    async with TestRedisBroker(router.broker):
        response = app_client.post(
            "/file",
-            params={
-                "name": "filename",
-                "type": ".pdf",
-                "location": authenticated_s3_url,
+            json={
+                "key": file_key,
+                "bucket": env.bucket_name,
            },
        )
        assert response.status_code == 200
@@ -61,7 +54,7 @@ def test_delete_file(s3_client, app_client, elasticsearch_storage_handler, chunk
    I expect to see it removed from s3 and elastic-search, including the chunks
    """
    # check assets exist
-    assert s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name)
+    assert s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.key)
    assert elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file")
    assert elasticsearch_storage_handler.get_file_chunks(chunked_file.uuid)

@@ -72,7 +65,7 @@

    # check assets dont exist
    with pytest.raises(Exception):
-        s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name)
+        s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.key)

    with pytest.raises(NotFoundError):
        elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file")
16 changes: 6 additions & 10 deletions django_app/redbox_app/redbox_core/client.py
@@ -44,23 +44,19 @@ def __init__(self, host: str, port: int):
    def url(self) -> str:
        return f"{self.host}:{self.port}"

-    def upload_file(self, s3_url: str, name: str, extension: str):
+    def upload_file(self, name: str):
        if self.host == "testserver":
            file = {
-                "url": "s3 url",
-                "content_type": "application/pdf",
-                "name": "my-test-file.pdf",
-                "text": "once upon a time....",
-                "processing_status": "uploaded",
+                "key": name,
+                "bucket": settings.BUCKET_NAME,
            }
            return file

        response = requests.post(
            f"{self.url}/file",
-            params={
-                "name": name,
-                "type": extension,
-                "location": s3_url,
+            json={
+                "key": name,
+                "bucket": settings.BUCKET_NAME,
            },
        )
        if response.status_code != 201:
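
With the presigned-URL plumbing gone, callers only supply the object key; the bucket comes from Django settings inside the client. Roughly how the Django app now drives it — a sketch based on the views.py change below, with an illustrative file name:

from django.conf import settings

from redbox_app.redbox_core.client import CoreApiClient

api = CoreApiClient(host=settings.CORE_API_HOST, port=settings.CORE_API_PORT)
# Only the key is passed; the client fills in settings.BUCKET_NAME itself
file = api.upload_file("report.pdf")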
18 changes: 1 addition & 17 deletions django_app/redbox_app/redbox_core/views.py
@@ -115,27 +115,11 @@ def upload_view(request):
        ),
    )

-    # TODO: Handle S3 upload errors
-    authenticated_s3_url = s3.generate_presigned_url(
-        "get_object",
-        Params={
-            "Bucket": settings.BUCKET_NAME,
-            "Key": file_key,
-        },
-        ExpiresIn=3600,
-    )
-    # Strip off the query string (we don't need the keys)
-    simple_s3_url = authenticated_s3_url.split("?")[0]
-
    # ingest file
    api = CoreApiClient(host=settings.CORE_API_HOST, port=settings.CORE_API_PORT)

    try:
-        api.upload_file(
-            uploaded_file.name,
-            file_extension,
-            simple_s3_url,
-        )
+        api.upload_file(uploaded_file.name)
        # TODO: update improved File object with elastic uuid
        uploaded = True
    except ValueError as value_error:
2 changes: 1 addition & 1 deletion django_app/redbox_app/settings.py
@@ -152,7 +152,7 @@
"s3.amazonaws.com",
)
CSP_SCRIPT_SRC = (
"'self'",
"'self'",
"plausible.io",
"'sha256-GUQ5ad8JK5KmEWmROf3LZd9ge94daqNvd8xy9YS1iDw='",
)
13 changes: 2 additions & 11 deletions ingester/tests/conftest.py
@@ -60,18 +60,9 @@ def file(s3_client, file_pdf_path):
        Tagging=f"file_type={file_type}",
    )

-    authenticated_s3_url = s3_client.generate_presigned_url(
-        "get_object",
-        Params={"Bucket": env.bucket_name, "Key": file_name},
-        ExpiresIn=3600,
-    )
-
-    # Strip off the query string (we don't need the keys)
-    simple_s3_url = authenticated_s3_url.split("?")[0]
    file_record = File(
-        name=file_name,
-        url=simple_s3_url,
-        content_type=file_type,
+        key=file_name,
+        bucket=env.bucket_name,
    )

    yield file_record
48 changes: 3 additions & 45 deletions redbox/models/file.py
@@ -18,53 +18,11 @@ class ProcessingStatusEnum(str, Enum):
    complete = "complete"


-class ContentType(str, Enum):
-    EML = ".eml"
-    HTML = ".html"
-    HTM = ".htm"
-    JSON = ".json"
-    MD = ".md"
-    MSG = ".msg"
-    RST = ".rst"
-    RTF = ".rtf"
-    TXT = ".txt"
-    XML = ".xml"
-    JPEG = ".jpeg"  # Must have tesseract installed
-    PNG = ".png"  # Must have tesseract installed
-    CSV = ".csv"
-    DOC = ".doc"
-    DOCX = ".docx"
-    EPUB = ".epub"
-    ODT = ".odt"
-    PDF = ".pdf"
-    PPT = ".ppt"
-    PPTX = ".pptx"
-    TSV = ".tsv"
-    XLSX = ".xlsx"
-
-
class File(PersistableModel):
-    url: AnyUrl = Field(description="s3 url")
-    content_type: ContentType = Field(description="content_type of file")
-    name: str = Field(description="file name")
-    text: Optional[str] = Field(description="file content", default=None)
-
-    @computed_field
-    def text_hash(self) -> str:
-        return hashlib.md5(
-            (self.text or "").encode(encoding="UTF-8", errors="strict"),
-            usedforsecurity=False,
-        ).hexdigest()
-
-    @computed_field
-    def token_count(self) -> int:
-        return len(encoding.encode(self.text or ""))
-
-    def to_document(self) -> Document:
-        return Document(
-            page_content=f"<Doc{self.uuid}>Title: {self.name}\n\n{self.text}</Doc{self.uuid}>\n\n",
-            metadata={"source": self.url},
-        )
+    """Reference to file stored on s3"""
+
+    key: str = Field(description="file key")
+    bucket: str = Field(description="s3 bucket")


class Chunk(PersistableModel):
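
The model becomes a pure pointer into s3 — content, hashing, token counting and Document conversion all leave the persistence layer. A minimal construction sketch, with illustrative key and bucket values:

from redbox.models import File

# Only the object's location is stored; anything needing the bytes
# derives access from key + bucket (e.g. a presigned URL, as below).
file = File(key="report.pdf", bucket="my-bucket")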
2 changes: 1 addition & 1 deletion redbox/parsing/chunkers.py
@@ -10,7 +10,7 @@
def other_chunker(file: File) -> list[Chunk]:
    authenticated_s3_url = s3_client.generate_presigned_url(
        "get_object",
-        Params={"Bucket": env.bucket_name, "Key": file.name},
+        Params={"Bucket": env.bucket_name, "Key": file.key},
        ExpiresIn=3600,
    )
29 changes: 28 additions & 1 deletion redbox/parsing/file_chunker.py
@@ -1,10 +1,37 @@
+from enum import Enum
+
from sentence_transformers import SentenceTransformer

-from redbox.models.file import Chunk, ContentType, File
+from redbox.models.file import Chunk, File
from redbox.parsing.chunk_clustering import cluster_chunks
from redbox.parsing.chunkers import other_chunker


+class ContentType(str, Enum):
+    EML = ".eml"
+    HTML = ".html"
+    HTM = ".htm"
+    JSON = ".json"
+    MD = ".md"
+    MSG = ".msg"
+    RST = ".rst"
+    RTF = ".rtf"
+    TXT = ".txt"
+    XML = ".xml"
+    JPEG = ".jpeg"  # Must have tesseract installed
+    PNG = ".png"  # Must have tesseract installed
+    CSV = ".csv"
+    DOC = ".doc"
+    DOCX = ".docx"
+    EPUB = ".epub"
+    ODT = ".odt"
+    PDF = ".pdf"
+    PPT = ".ppt"
+    PPTX = ".pptx"
+    TSV = ".tsv"
+    XLSX = ".xlsx"
+
+
class FileChunker:
    """A class to wrap unstructured and generate compliant chunks from files"""

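
Moving ContentType here keeps the list of supported extensions next to the only code that acts on it. A hypothetical helper (not in the diff) showing how the enum might gate incoming file keys:

from pathlib import Path

from redbox.parsing.file_chunker import ContentType

def is_chunkable(key: str) -> bool:
    # Compare a file key's extension against the enum's values
    suffix = Path(key).suffix.lower()
    return suffix in {content_type.value for content_type in ContentType}

assert is_chunkable("report.pdf")
assert not is_chunkable("archive.zip")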
15 changes: 4 additions & 11 deletions tests/test_e2e.py
@@ -23,22 +23,15 @@ def test_upload_to_elastic(file_pdf_path, s3_client):
        ExtraArgs={"Tagging": "file_type=pdf"},
    )

-    authenticated_s3_url = s3_client.generate_presigned_url(
-        "get_object",
-        Params={"Bucket": bucket_name, "Key": file_key},
-        ExpiresIn=3600,
-    )
-
    response = requests.post(
        url="http://localhost:5002/file",
-        params={
-            "name": "filename",
-            "type": ".pdf",
-            "location": authenticated_s3_url,
+        json={
+            "key": file_key,
+            "bucket": bucket_name,
        },
    )
    assert response.status_code == 200
-    file_uuid = response.json()
+    file_uuid = response.json()["uuid"]

    timeout = 120
    start_time = time.time()