From cff986eae5ccafe3bb8714aee9a72a89dda0086f Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Wed, 10 Aug 2022 18:36:29 +0800 Subject: [PATCH 1/9] feat: do not send blob from server when it is loaded in client --- client/clip_client/client.py | 5 ++++- server/clip_server/executors/helper.py | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 425f3bae7..1904cfdfd 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -160,7 +160,8 @@ def _iter_doc(self, content) -> Generator['Document', None, None]: _mime = mimetypes.guess_type(c)[0] if _mime and _mime.startswith('image'): yield Document( - tags={'__created_by_CAS__': True}, uri=c + tags={'__created_by_CAS__': True, '__loaded_by_CAS__': True}, + uri=c, ).load_uri_to_blob() else: yield Document(tags={'__created_by_CAS__': True}, text=c) @@ -169,6 +170,7 @@ def _iter_doc(self, content) -> Generator['Document', None, None]: yield c elif not c.blob and c.uri: c.load_uri_to_blob() + c.tags['__loaded_by_CAS__'] = True yield c elif c.tensor is not None: yield c @@ -331,6 +333,7 @@ def _prepare_single_doc(d: 'Document'): return d elif not d.blob and d.uri: d.load_uri_to_blob() + d.tags['__loaded_by_CAS__'] = True return d elif d.tensor is not None: return d diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 3c97a34de..7a8c1c888 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -38,6 +38,8 @@ def preproc_image( # recover doc content d.content = content + if d.content.tags.pop('__loaded_by_CAS__'): + d.blob = None tensors_batch = torch.stack(tensors_batch).type(torch.float32) From 1fe3028b0dfb8a5bdad8856669ca0e0f66ff4b9c Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 11:23:33 +0800 Subject: [PATCH 2/9] fix: pop tensor --- server/clip_server/executors/helper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 7a8c1c888..421ebccba 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -38,8 +38,8 @@ def preproc_image( # recover doc content d.content = content - if d.content.tags.pop('__loaded_by_CAS__'): - d.blob = None + if d.tags.pop('__loaded_by_CAS__'): + d.pop('tensor') tensors_batch = torch.stack(tensors_batch).type(torch.float32) From 341b7862d78b7c1ac4f16396e924618f0919f6c4 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 11:44:05 +0800 Subject: [PATCH 3/9] fix: pop tensor --- server/clip_server/executors/helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 421ebccba..04ad91bc5 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -38,7 +38,7 @@ def preproc_image( # recover doc content d.content = content - if d.tags.pop('__loaded_by_CAS__'): + if d.tags.pop('__loaded_by_CAS__', None): d.pop('tensor') tensors_batch = torch.stack(tensors_batch).type(torch.float32) From 801174304f3e533ec18dc742fad4dab7d5aaab07 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 12:09:41 +0800 Subject: [PATCH 4/9] test: remove unused tags --- tests/test_ranker.py | 4 ++++ tests/test_simple.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/tests/test_ranker.py b/tests/test_ranker.py index 2269035e3..202779322 100644 --- a/tests/test_ranker.py +++ b/tests/test_ranker.py @@ -30,8 +30,10 @@ async def test_torch_executor_rank_img2texts(encoder_class): for d in da: for c in d.matches: assert c.scores['clip_score'].value is not None + assert '__loaded_by_CAS__' not in c.tags org_score = d.matches[:, 'scores__clip_score__value'] assert org_score == list(sorted(org_score, reverse=True)) + assert '__loaded_by_CAS__' not in d.tags @pytest.mark.asyncio @@ -53,9 +55,11 @@ async def test_torch_executor_rank_text2imgs(encoder_class): for c in d.matches: assert c.scores['clip_score'].value is not None assert c.scores['clip_score_cosine'].value is not None + assert '__loaded_by_CAS__' not in c.tags np.testing.assert_almost_equal( sum(c.scores['clip_score'].value for c in d.matches), 1 ) + assert '__loaded_by_CAS__' not in d.tags @pytest.mark.parametrize( diff --git a/tests/test_simple.py b/tests/test_simple.py index 0f19bacc9..f651ccb01 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -78,6 +78,7 @@ def test_docarray_inputs(make_flow, inputs, port_generator): assert isinstance(r, DocumentArray) assert r.embeddings.shape assert '__created_by_CAS__' not in r[0].tags + assert '__loaded_by_CAS__' not in r[0].tags @pytest.mark.parametrize( @@ -104,6 +105,7 @@ def test_docarray_preserve_original_inputs(make_flow, inputs, port_generator): assert r.embeddings.shape assert r.contents == inputs.contents assert '__created_by_CAS__' not in r[0].tags + assert '__loaded_by_CAS__' not in r[0].tags @pytest.mark.parametrize( @@ -134,5 +136,7 @@ def test_docarray_traversal(make_flow, inputs, port_generator): r2 = c.post(on='/', inputs=da, parameters={'access_paths': '@c'}) assert r1[0].chunks.embeddings.shape[0] == len(inputs) assert '__created_by_CAS__' not in r1[0].tags + assert '__loaded_by_CAS__' not in r1[0].tags assert r2[0].chunks.embeddings.shape[0] == len(inputs) assert '__created_by_CAS__' not in r2[0].tags + assert '__loaded_by_CAS__' not in r2[0].tags From 3ecf61f57385e311f9f8d49e1277a22b0fa74c8d Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 13:14:43 +0800 Subject: [PATCH 5/9] test: preseve original docs --- server/clip_server/executors/helper.py | 3 ++- tests/test_ranker.py | 8 ++++++++ tests/test_simple.py | 10 ++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 04ad91bc5..8588f4fda 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -37,9 +37,10 @@ def preproc_image( tensors_batch.append(preprocess_fn(d.tensor).detach()) # recover doc content - d.content = content if d.tags.pop('__loaded_by_CAS__', None): d.pop('tensor') + else: + d.content = content tensors_batch = torch.stack(tensors_batch).type(torch.float32) diff --git a/tests/test_ranker.py b/tests/test_ranker.py index 202779322..2cacb7338 100644 --- a/tests/test_ranker.py +++ b/tests/test_ranker.py @@ -31,9 +31,13 @@ async def test_torch_executor_rank_img2texts(encoder_class): for c in d.matches: assert c.scores['clip_score'].value is not None assert '__loaded_by_CAS__' not in c.tags + assert not c.tensor + assert not c.blob org_score = d.matches[:, 'scores__clip_score__value'] assert org_score == list(sorted(org_score, reverse=True)) assert '__loaded_by_CAS__' not in d.tags + assert not d.tensor + assert not d.blob @pytest.mark.asyncio @@ -56,10 +60,14 @@ async def test_torch_executor_rank_text2imgs(encoder_class): assert c.scores['clip_score'].value is not None assert c.scores['clip_score_cosine'].value is not None assert '__loaded_by_CAS__' not in c.tags + assert not c.tensor + assert not c.blob np.testing.assert_almost_equal( sum(c.scores['clip_score'].value for c in d.matches), 1 ) assert '__loaded_by_CAS__' not in d.tags + assert not d.tensor + assert not d.blob @pytest.mark.parametrize( diff --git a/tests/test_simple.py b/tests/test_simple.py index f651ccb01..32a1edbdf 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -79,6 +79,8 @@ def test_docarray_inputs(make_flow, inputs, port_generator): assert r.embeddings.shape assert '__created_by_CAS__' not in r[0].tags assert '__loaded_by_CAS__' not in r[0].tags + assert not r[0].tensor + assert not r[0].blob @pytest.mark.parametrize( @@ -137,6 +139,14 @@ def test_docarray_traversal(make_flow, inputs, port_generator): assert r1[0].chunks.embeddings.shape[0] == len(inputs) assert '__created_by_CAS__' not in r1[0].tags assert '__loaded_by_CAS__' not in r1[0].tags + assert not r1[0].tensor + assert not r1[0].blob + assert not r1[0].chunks[0].tensor + assert not r1[0].chunks[0].blob assert r2[0].chunks.embeddings.shape[0] == len(inputs) assert '__created_by_CAS__' not in r2[0].tags assert '__loaded_by_CAS__' not in r2[0].tags + assert not r2[0].tensor + assert not r2[0].blob + assert not r2[0].chunks[0].tensor + assert not r2[0].chunks[0].blob From 4d61e28704ac8e3929d2773adb05998b0a5faccf Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 15:15:06 +0800 Subject: [PATCH 6/9] fix: reset input --- client/clip_client/client.py | 22 ++++++++++++++++++++++ server/clip_server/executors/helper.py | 2 +- tests/conftest.py | 2 +- tests/test_ranker.py | 6 ++++++ tests/test_simple.py | 2 ++ 5 files changed, 32 insertions(+), 2 deletions(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 1904cfdfd..5ede61448 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -104,6 +104,8 @@ def encode( ... def encode(self, content, **kwargs): + from docarray import Document + if isinstance(content, str): raise TypeError( f'content must be an Iterable of [str, Document], try `.encode(["{content}"])` instead' @@ -119,6 +121,11 @@ def encode(self, content, **kwargs): **self._get_post_payload(content, kwargs), on_done=partial(self._gather_result, results=results), ) + + for c in content: + if isinstance(c, Document) and c.tags.pop('__loaded_by_CAS__', False): + c.pop('blob') + return self._unboxed_result(results) def _gather_result(self, response, results: 'DocumentArray'): @@ -349,6 +356,18 @@ def _prepare_rank_doc(d: 'Document', _source: str = 'matches'): setattr(d, _source, [Client._prepare_single_doc(c) for c in _get(d)]) return d + @staticmethod + def _reset_rank_doc(d: 'Document', _source: str = 'matches'): + _get = lambda d: getattr(d, _source) + + if d.tags.pop('__loaded_by_CAS__', False): + d.pop('blob') + + for c in _get(d): + if c.tags.pop('__loaded_by_CAS__', False): + c.pop('blob') + return d + def _iter_rank_docs( self, content, _source='matches' ) -> Generator['Document', None, None]: @@ -411,6 +430,9 @@ def rank(self, docs: Iterable['Document'], **kwargs) -> 'DocumentArray': **self._get_rank_payload(docs, kwargs), on_done=partial(self._gather_result, results=results), ) + for d in docs: + self._reset_rank_doc(d, _source=kwargs.get('source', 'matches')) + return results async def arank(self, docs: Iterable['Document'], **kwargs) -> 'DocumentArray': diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 8588f4fda..bfe852d7c 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -37,7 +37,7 @@ def preproc_image( tensors_batch.append(preprocess_fn(d.tensor).detach()) # recover doc content - if d.tags.pop('__loaded_by_CAS__', None): + if d.tags.pop('__loaded_by_CAS__', False): d.pop('tensor') else: d.content = content diff --git a/tests/conftest.py b/tests/conftest.py index 280970a27..f7c668d04 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ def random_port(): return random_port -@pytest.fixture(scope='session', params=['onnx', 'torch', 'onnx_custom']) +@pytest.fixture(scope='session', params=['onnx']) def make_flow(port_generator, request): if request.param != 'onnx_custom': if request.param == 'onnx': diff --git a/tests/test_ranker.py b/tests/test_ranker.py index 2cacb7338..d7c39af34 100644 --- a/tests/test_ranker.py +++ b/tests/test_ranker.py @@ -91,6 +91,12 @@ async def test_torch_executor_rank_text2imgs(encoder_class): def test_docarray_inputs(make_flow, d): c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') r = c.rank([d]) + assert '__loaded_by_CAS__' not in d.tags + assert not d.blob + assert not d.tensor + assert '__loaded_by_CAS__' not in d.matches[0].tags + assert not d.matches[0].blob + assert not d.matches[0].tensor assert isinstance(r, DocumentArray) rv1 = r['@m', 'scores__clip_score__value'] rv2 = r['@m', 'scores__clip_score_cosine__value'] diff --git a/tests/test_simple.py b/tests/test_simple.py index 32a1edbdf..066026822 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -108,6 +108,8 @@ def test_docarray_preserve_original_inputs(make_flow, inputs, port_generator): assert r.contents == inputs.contents assert '__created_by_CAS__' not in r[0].tags assert '__loaded_by_CAS__' not in r[0].tags + assert not r[0].tensor + assert not r[0].blob @pytest.mark.parametrize( From 7f6fb2a921d1c1c13de24aca7e0091e0f2a6eb6e Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 11 Aug 2022 15:17:14 +0800 Subject: [PATCH 7/9] chore: clean up --- tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/conftest.py b/tests/conftest.py index f7c668d04..280970a27 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ def random_port(): return random_port -@pytest.fixture(scope='session', params=['onnx']) +@pytest.fixture(scope='session', params=['onnx', 'torch', 'onnx_custom']) def make_flow(port_generator, request): if request.param != 'onnx_custom': if request.param == 'onnx': From f4f31468ef8b93750612da6be0538e63ab6c0979 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 12 Aug 2022 18:41:26 +0800 Subject: [PATCH 8/9] test: preproc_image --- client/clip_client/client.py | 4 +--- tests/test_helper.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 5ede61448..991de3af4 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -104,8 +104,6 @@ def encode( ... def encode(self, content, **kwargs): - from docarray import Document - if isinstance(content, str): raise TypeError( f'content must be an Iterable of [str, Document], try `.encode(["{content}"])` instead' @@ -123,7 +121,7 @@ def encode(self, content, **kwargs): ) for c in content: - if isinstance(c, Document) and c.tags.pop('__loaded_by_CAS__', False): + if hasattr(c, 'tags') and c.tags.pop('__loaded_by_CAS__', False): c.pop('blob') return self._unboxed_result(results) diff --git a/tests/test_helper.py b/tests/test_helper.py index f7d79ac62..49cbfe88f 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -2,6 +2,7 @@ import numpy as np from clip_server.executors.helper import numpy_softmax from clip_server.executors.helper import split_img_txt_da +from clip_server.executors.helper import preproc_image from docarray import Document, DocumentArray @@ -77,3 +78,32 @@ def test_split_img_txt_da(inputs): split_img_txt_da(doc, img_da, txt_da) assert len(txt_da) == inputs[1][0] assert len(img_da) == inputs[1][1] + + +@pytest.mark.parametrize( + 'inputs', + [ + DocumentArray( + [ + Document( + uri='https://docarray.jina.ai/_static/favicon.png', + tags={'__loaded_by_CAS__': True}, + ).load_uri_to_blob(), + Document( + uri='https://docarray.jina.ai/_static/favicon.png', + ).load_uri_to_blob(), + ] + ) + ], +) +def test_preproc_image(inputs): + from clip_server.model import clip + + preprocess_fn = clip._transform_ndarray(224) + da, pixel_values = preproc_image(inputs, preprocess_fn) + assert len(da) == 2 + assert not da[0].blob + assert da[1].blob + assert not da[0].tensor + assert not da[1].tensor + assert pixel_values.get('pixel_values') is not None From 05786c80b05b41d4356f9837d4f1df8624d87b2a Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 12 Aug 2022 19:08:25 +0800 Subject: [PATCH 9/9] test: rank preserve input --- client/clip_client/client.py | 7 +++++++ tests/test_asyncio.py | 37 +++++++++++++++++++++++++++++++++++- tests/test_ranker.py | 4 ++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 991de3af4..417566f6d 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -308,6 +308,10 @@ async def aencode(self, content, **kwargs): ), ) + for c in content: + if hasattr(c, 'tags') and c.tags.pop('__loaded_by_CAS__', False): + c.pop('blob') + return self._unboxed_result(results) def _prepare_streaming(self, disable, total): @@ -458,4 +462,7 @@ async def arank(self, docs: Iterable['Document'], **kwargs) -> 'DocumentArray': ), ) + for d in docs: + self._reset_rank_doc(d, _source=kwargs.get('source', 'matches')) + return results diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index dd52b2acd..3ab177746 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -1,8 +1,9 @@ import asyncio - +import os import pytest from clip_client import Client +from docarray import Document, DocumentArray async def another_heavylifting_job(): @@ -16,3 +17,37 @@ async def test_async_encode(make_flow): t2 = asyncio.create_task(c.aencode(['hello world'] * 10)) await asyncio.gather(t1, t2) assert t2.result().shape + + +@pytest.mark.parametrize( + 'inputs', + [ + DocumentArray([Document(text='hello, world'), Document(text='goodbye, world')]), + DocumentArray( + [ + Document( + uri='https://docarray.jina.ai/_static/favicon.png', + text='hello, world', + ), + ] + ), + DocumentArray.from_files( + f'{os.path.dirname(os.path.abspath(__file__))}/**/*.jpg' + ), + ], +) +@pytest.mark.asyncio +async def test_async_docarray_preserve_original_inputs( + make_flow, inputs, port_generator +): + c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') + t1 = asyncio.create_task(another_heavylifting_job()) + t2 = asyncio.create_task(c.aencode(inputs if not callable(inputs) else inputs())) + await asyncio.gather(t1, t2) + assert isinstance(t2.result(), DocumentArray) + assert t2.result().embeddings.shape + assert t2.result().contents == inputs.contents + assert '__created_by_CAS__' not in t2.result()[0].tags + assert '__loaded_by_CAS__' not in t2.result()[0].tags + assert not t2.result()[0].tensor + assert not t2.result()[0].blob diff --git a/tests/test_ranker.py b/tests/test_ranker.py index d7c39af34..f5f177484 100644 --- a/tests/test_ranker.py +++ b/tests/test_ranker.py @@ -91,6 +91,8 @@ async def test_torch_executor_rank_text2imgs(encoder_class): def test_docarray_inputs(make_flow, d): c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') r = c.rank([d]) + assert r[0].content == d.content + assert r[0].matches.contents == d.matches.contents assert '__loaded_by_CAS__' not in d.tags assert not d.blob assert not d.tensor @@ -130,6 +132,8 @@ async def test_async_arank(make_flow, d): c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') r = await c.arank([d]) assert isinstance(r, DocumentArray) + assert r[0].content == d.content + assert r[0].matches.contents == d.matches.contents rv = r['@m', 'scores__clip_score__value'] for v in rv: assert v is not None