diff --git a/core_api/src/publisher_handler.py b/core_api/src/publisher_handler.py new file mode 100644 index 000000000..efd34d086 --- /dev/null +++ b/core_api/src/publisher_handler.py @@ -0,0 +1,23 @@ +from faststream.redis import RedisBroker + +from redbox.models import File + + +class FilePublisher: + """This class is a bit of a hack to overcome a shortcoming (bug?) in faststream + whereby the broker is not automatically connected in sub-applications. + + TODO: fix this properly, or raise an issue against faststream + """ + + def __init__(self, broker: RedisBroker, queue_name: str): + self.connected = False + self.broker = broker + self.queue_name = queue_name + + async def publish(self, file: File): + if not self.connected: + # we only do this once + await self.broker.connect() + self.connected = True + return self.broker.publish(file, self.queue_name) diff --git a/core_api/src/routes/file.py b/core_api/src/routes/file.py index c8367cf71..5f153d28d 100644 --- a/core_api/src/routes/file.py +++ b/core_api/src/routes/file.py @@ -5,6 +5,7 @@ from faststream.redis.fastapi import RedisRouter from pydantic import AnyHttpUrl +from core_api.src.publisher_handler import FilePublisher from redbox.models import Chunk, File, FileStatus, Settings from redbox.storage import ElasticsearchStorageHandler @@ -23,8 +24,8 @@ # === Queues === router = RedisRouter(url=env.redis_url) -publisher = router.publisher(env.ingest_queue_name) +file_publisher = FilePublisher(router.broker, env.ingest_queue_name) # === Storage === @@ -69,8 +70,7 @@ async def create_upload_file(name: str, type: str, location: AnyHttpUrl) -> UUID storage_handler.write_item(file) log.info(f"publishing {file.uuid}") - await router.broker.connect() - await publisher.publish(file) + await file_publisher.publish(file) return file.uuid diff --git a/core_api/tests/conftest.py b/core_api/tests/conftest.py index 02630b0ad..c73a5611f 100644 --- a/core_api/tests/conftest.py +++ b/core_api/tests/conftest.py @@ -87,9 +87,7 @@ def stored_file(elasticsearch_storage_handler, file) -> YieldFixture[File]: @pytest.fixture def chunked_file(elasticsearch_storage_handler, stored_file) -> YieldFixture[File]: for i in range(5): - chunk = Chunk( - text="hello", index=i, parent_file_uuid=stored_file.uuid, metadata={} - ) + chunk = Chunk(text="hello", index=i, parent_file_uuid=stored_file.uuid, metadata={}) elasticsearch_storage_handler.write_item(chunk) elasticsearch_storage_handler.refresh() yield stored_file diff --git a/core_api/tests/routes/test_file.py b/core_api/tests/routes/test_file.py index f2de150ae..efff97d2a 100644 --- a/core_api/tests/routes/test_file.py +++ b/core_api/tests/routes/test_file.py @@ -8,9 +8,7 @@ @pytest.mark.asyncio -async def test_post_file_upload( - s3_client, app_client, elasticsearch_storage_handler, file_pdf_path -): +async def test_post_file_upload(s3_client, app_client, elasticsearch_storage_handler, file_pdf_path): """ Given a new file When I POST it to /file @@ -20,7 +18,6 @@ async def test_post_file_upload( file_key = os.path.basename(file_pdf_path) with open(file_pdf_path, "rb") as f: - s3_client.upload_fileobj( Bucket=env.bucket_name, Fileobj=f, @@ -57,9 +54,7 @@ def test_get_file(app_client, stored_file): assert response.status_code == 200 -def test_delete_file( - s3_client, app_client, elasticsearch_storage_handler, chunked_file -): +def test_delete_file(s3_client, app_client, elasticsearch_storage_handler, chunked_file): """ Given a previously saved file When I DELETE it to /file @@ -67,9 +62,7 @@ def test_delete_file( """ # check assets exist assert s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name) - assert elasticsearch_storage_handler.read_item( - item_uuid=chunked_file.uuid, model_type="file" - ) + assert elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file") assert elasticsearch_storage_handler.get_file_chunks(chunked_file.uuid) response = app_client.delete(f"/file/{chunked_file.uuid}") @@ -82,9 +75,7 @@ def test_delete_file( s3_client.get_object(Bucket=env.bucket_name, Key=chunked_file.name) with pytest.raises(NotFoundError): - elasticsearch_storage_handler.read_item( - item_uuid=chunked_file.uuid, model_type="file" - ) + elasticsearch_storage_handler.read_item(item_uuid=chunked_file.uuid, model_type="file") assert not elasticsearch_storage_handler.get_file_chunks(chunked_file.uuid)