Skip to content

Commit

Permalink
added publisher-handler
Browse files Browse the repository at this point in the history
  • Loading branch information
George Burton committed Apr 13, 2024
1 parent bfcfd65 commit 8896f46
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 19 deletions.
23 changes: 23 additions & 0 deletions core_api/src/publisher_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from faststream.redis import RedisBroker

from redbox.models import File


class FilePublisher:
"""This class is a bit of a hack to overcome a shortcoming (bug?) in faststream
whereby the broker is not automatically connected in sub-applications.
TODO: fix this properly, or raise an issue against faststream
"""

def __init__(self, broker: RedisBroker, queue_name: str):
self.connected = False
self.broker = broker
self.queue_name = queue_name

async def publish(self, file: File):
if not self.connected:
# we only do this once
await self.broker.connect()
self.connected = True
return self.broker.publish(file, self.queue_name)
6 changes: 3 additions & 3 deletions core_api/src/routes/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from faststream.redis.fastapi import RedisRouter
from pydantic import AnyHttpUrl

from core_api.src.publisher_handler import FilePublisher
from redbox.models import Chunk, File, FileStatus, Settings
from redbox.storage import ElasticsearchStorageHandler

Expand All @@ -23,8 +24,8 @@

# === Queues ===
router = RedisRouter(url=env.redis_url)
publisher = router.publisher(env.ingest_queue_name)

file_publisher = FilePublisher(router.broker, env.ingest_queue_name)

# === Storage ===

Expand Down Expand Up @@ -69,8 +70,7 @@ async def create_upload_file(name: str, type: str, location: AnyHttpUrl) -> UUID
storage_handler.write_item(file)

log.info(f"publishing {file.uuid}")
await router.broker.connect()
await publisher.publish(file)
await file_publisher.publish(file)

return file.uuid

Expand Down
4 changes: 1 addition & 3 deletions core_api/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ def stored_file(elasticsearch_storage_handler, file) -> YieldFixture[File]:
@pytest.fixture
def chunked_file(elasticsearch_storage_handler, stored_file) -> YieldFixture[File]:
for i in range(5):
chunk = Chunk(
text="hello", index=i, parent_file_uuid=stored_file.uuid, metadata={}
)
chunk = Chunk(text="hello", index=i, parent_file_uuid=stored_file.uuid, metadata={})
elasticsearch_storage_handler.write_item(chunk)
elasticsearch_storage_handler.refresh()
yield stored_file
Expand Down
17 changes: 4 additions & 13 deletions core_api/tests/routes/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@


@pytest.mark.asyncio
async def test_post_file_upload(
s3_client, app_client, elasticsearch_storage_handler, file_pdf_path
):
async def test_post_file_upload(s3_client, app_client, elasticsearch_storage_handler, file_pdf_path):
"""
Given a new file
When I POST it to /file
Expand All @@ -20,7 +18,6 @@ async def test_post_file_upload(
file_key = os.path.basename(file_pdf_path)

with open(file_pdf_path, "rb") as f:

s3_client.upload_fileobj(
Bucket=env.bucket_name,
Fileobj=f,
Expand Down Expand Up @@ -57,19 +54,15 @@ def test_get_file(app_client, stored_file):
assert response.status_code == 200


def test_delete_file(
s3_client, app_client, elasticsearch_storage_handler, chunked_file
):
def test_delete_file(s3_client, app_client, elasticsearch_storage_handler, chunked_file):
"""
Given a previously saved file
When I DELETE it to /file
I Expect to see it removed from s3 and elastic-search, including the chunks
"""
# check assets exist
assert s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name)
assert elasticsearch_storage_handler.read_item(
item_uuid=chunked_file.uuid, model_type="file"
)
assert elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file")
assert elasticsearch_storage_handler.get_file_chunks(chunked_file.uuid)

response = app_client.delete(f"/file/{chunked_file.uuid}")
Expand All @@ -82,9 +75,7 @@ def test_delete_file(
s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name)

with pytest.raises(NotFoundError):
elasticsearch_storage_handler.read_item(
item_uuid=chunked_file.uuid, model_type="file"
)
elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file")

assert not elasticsearch_storage_handler.get_file_chunks(chunked_file.uuid)

Expand Down

0 comments on commit 8896f46

Please sign in to comment.