From 997bfb6c34a5b18123fd2c454f6c48725f6a6a99 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 8 Sep 2022 11:25:33 +0800 Subject: [PATCH 1/9] test: clip_client in place --- tests/test_asyncio.py | 1 + tests/test_tensorrt.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 0501a98f8..21951575b 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -43,6 +43,7 @@ async def test_async_docarray_preserve_original_inputs(make_flow, inputs): t2 = asyncio.create_task(c.aencode(inputs if not callable(inputs) else inputs())) await asyncio.gather(t1, t2) assert isinstance(t2.result(), DocumentArray) + assert inputs[0] is t2.result()[0] assert t2.result().embeddings.shape assert t2.result().contents == inputs.contents assert '__created_by_CAS__' not in t2.result()[0].tags diff --git a/tests/test_tensorrt.py b/tests/test_tensorrt.py index 7752073bb..d6343a54a 100644 --- a/tests/test_tensorrt.py +++ b/tests/test_tensorrt.py @@ -36,6 +36,7 @@ def test_docarray_inputs(make_trt_flow, inputs): c = Client(server=f'grpc://0.0.0.0:{make_trt_flow.port}') r = c.encode(inputs if not callable(inputs) else inputs()) assert isinstance(r, DocumentArray) + assert inputs[0] is r[0] assert r.embeddings.shape if hasattr(inputs, '__len__'): assert inputs[0] is r[0] From 4ee3dcddeac10b42071a9c5cc711e5a620b2cb9d Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 8 Sep 2022 11:48:43 +0800 Subject: [PATCH 2/9] test: empty input --- tests/test_tensorrt.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_tensorrt.py b/tests/test_tensorrt.py index d6343a54a..7752073bb 100644 --- a/tests/test_tensorrt.py +++ b/tests/test_tensorrt.py @@ -36,7 +36,6 @@ def test_docarray_inputs(make_trt_flow, inputs): c = Client(server=f'grpc://0.0.0.0:{make_trt_flow.port}') r = c.encode(inputs if not callable(inputs) else inputs()) assert isinstance(r, DocumentArray) - assert inputs[0] is r[0] assert r.embeddings.shape if hasattr(inputs, '__len__'): assert inputs[0] is r[0] From eadd19ed8bc0e876cba55109376a6205a18da05c Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 16 Sep 2022 16:24:05 +0800 Subject: [PATCH 3/9] test: generator rank --- client/clip_client/client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 4b91583e7..f689a5b62 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -443,8 +443,12 @@ def _prepare_rank_doc(d: 'Document', _source: str = 'matches'): def _reset_rank_doc(d: 'Document', _source: str = 'matches'): _get = lambda d: getattr(d, _source) + print(123123) if d.tags.pop('__loaded_by_CAS__', False): + print(111) d.pop('blob') + else: + print(222) for c in _get(d): if c.tags.pop('__loaded_by_CAS__', False): From c2e8e8e7a39b67d96fc92f536beab64e210ad375 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 16 Sep 2022 17:21:29 +0800 Subject: [PATCH 4/9] fix: wrong input exception --- client/clip_client/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index f689a5b62..4b91583e7 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -443,12 +443,8 @@ def _prepare_rank_doc(d: 'Document', _source: str = 'matches'): def _reset_rank_doc(d: 'Document', _source: str = 'matches'): _get = lambda d: getattr(d, _source) - print(123123) if d.tags.pop('__loaded_by_CAS__', False): - print(111) d.pop('blob') - else: - print(222) for c in _get(d): if c.tags.pop('__loaded_by_CAS__', False): From e57d21e33ad23d446dec89ef090713f2614440ab Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Thu, 8 Sep 2022 23:30:56 +0800 Subject: [PATCH 5/9] feat: option to remove image content after encoding to save space --- server/clip_server/executors/clip_onnx.py | 21 +++++++++++++++---- server/clip_server/executors/clip_tensorrt.py | 17 ++++++++++++--- server/clip_server/executors/clip_torch.py | 19 ++++++++++++++--- server/clip_server/executors/helper.py | 7 ++++--- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/server/clip_server/executors/clip_onnx.py b/server/clip_server/executors/clip_onnx.py index c14da999d..7e466695f 100644 --- a/server/clip_server/executors/clip_onnx.py +++ b/server/clip_server/executors/clip_onnx.py @@ -2,6 +2,7 @@ import warnings from multiprocessing.pool import ThreadPool from typing import Optional, Dict +from functools import partial import onnxruntime as ort from clip_server.executors.helper import ( @@ -25,6 +26,7 @@ def __init__( minibatch_size: int = 32, access_paths: str = '@r', model_path: Optional[str] = None, + drop_image_content: bool = False, **kwargs, ): """ @@ -39,10 +41,12 @@ def __init__( :param model_path: The path to the model to be used. If not specified, the model will be downloaded or loaded from the local cache. Visit https://clip-as-service.jina.ai/user-guides/server/#use-custom-model-for-onnx to learn how to finetune custom models. + :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._minibatch_size = minibatch_size + self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -99,13 +103,16 @@ def __init__( self._model.start_sessions(sess_options=sess_options, providers=providers) - def _preproc_images(self, docs: 'DocumentArray'): + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', ): return preproc_image( - docs, preprocess_fn=self._image_transform, return_np=True + docs, + preprocess_fn=self._image_transform, + return_np=True, + drop_image_content=drop_image_content, ) def _preproc_texts(self, docs: 'DocumentArray'): @@ -117,7 +124,10 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - await self.encode(docs['@r,m']) + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) + await self.encode(docs['@r,m'], drop_image_content=drop_image_content) set_rank(docs) @@ -129,6 +139,9 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -138,7 +151,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): # for image if _img_da: for minibatch, batch_data in _img_da.map_batch( - self._preproc_images, + partial(self._preproc_images, drop_image_content=drop_image_content), batch_size=self._minibatch_size, pool=self._pool, ): diff --git a/server/clip_server/executors/clip_tensorrt.py b/server/clip_server/executors/clip_tensorrt.py index 0f13bd52e..a55bba283 100644 --- a/server/clip_server/executors/clip_tensorrt.py +++ b/server/clip_server/executors/clip_tensorrt.py @@ -1,6 +1,7 @@ import warnings from multiprocessing.pool import ThreadPool from typing import Optional, Dict +from functools import partial import numpy as np from clip_server.executors.helper import ( @@ -23,6 +24,7 @@ def __init__( num_worker_preprocess: int = 4, minibatch_size: int = 32, access_paths: str = '@r', + drop_image_content: bool = False, **kwargs, ): """ @@ -34,12 +36,14 @@ def __init__( number if you encounter OOM errors. :param access_paths: The access paths to traverse on the input documents to get the images and texts to be processed. Visit https://docarray.jina.ai/fundamentals/documentarray/access-elements for more details. + :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._pool = ThreadPool(processes=num_worker_preprocess) self._minibatch_size = minibatch_size + self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -67,7 +71,7 @@ def __init__( self._tokenizer = Tokenizer(name) self._image_transform = clip._transform_ndarray(self._model.image_size) - def _preproc_images(self, docs: 'DocumentArray'): + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', @@ -77,6 +81,7 @@ def _preproc_images(self, docs: 'DocumentArray'): preprocess_fn=self._image_transform, device=self._device, return_np=False, + drop_image_content=drop_image_content, ) def _preproc_texts(self, docs: 'DocumentArray'): @@ -90,7 +95,10 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - await self.encode(docs['@r,m']) + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) + await self.encode(docs['@r,m'], drop_image_content=drop_image_content) set_rank(docs) @@ -102,6 +110,9 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -111,7 +122,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): # for image if _img_da: for minibatch, batch_data in _img_da.map_batch( - self._preproc_images, + partial(self._preproc_images, drop_image_content=drop_image_content), batch_size=self._minibatch_size, pool=self._pool, ): diff --git a/server/clip_server/executors/clip_torch.py b/server/clip_server/executors/clip_torch.py index 64edc8236..7fabad538 100644 --- a/server/clip_server/executors/clip_torch.py +++ b/server/clip_server/executors/clip_torch.py @@ -2,6 +2,7 @@ import warnings from multiprocessing.pool import ThreadPool from typing import Optional, Dict +from functools import partial import numpy as np import torch @@ -26,6 +27,7 @@ def __init__( num_worker_preprocess: int = 4, minibatch_size: int = 32, access_paths: str = '@r', + drop_image_content: bool = False, **kwargs, ): """ @@ -38,10 +40,12 @@ def __init__( number if you encounter OOM errors. :param access_paths: The access paths to traverse on the input documents to get the images and texts to be processed. Visit https://docarray.jina.ai/fundamentals/documentarray/access-elements for more details. + :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._minibatch_size = minibatch_size + self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -77,7 +81,7 @@ def __init__( self._tokenizer = Tokenizer(name) self._image_transform = clip._transform_ndarray(self._model.image_size) - def _preproc_images(self, docs: 'DocumentArray'): + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', @@ -87,6 +91,7 @@ def _preproc_images(self, docs: 'DocumentArray'): preprocess_fn=self._image_transform, device=self._device, return_np=False, + drop_image_content=drop_image_content, ) def _preproc_texts(self, docs: 'DocumentArray'): @@ -100,7 +105,10 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - await self.encode(docs['@r,m']) + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) + await self.encode(docs['@r,m'], drop_image_content=drop_image_content) set_rank(docs) @@ -112,6 +120,9 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] + drop_image_content = parameters.get( + 'drop_image_content', self._drop_image_content + ) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -122,7 +133,9 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): # for image if _img_da: for minibatch, batch_data in _img_da.map_batch( - self._preproc_images, + partial( + self._preproc_images, drop_image_content=drop_image_content + ), batch_size=self._minibatch_size, pool=self._pool, ): diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index bfe852d7c..ba9014b97 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -21,6 +21,7 @@ def preproc_image( preprocess_fn: Callable, device: str = 'cpu', return_np: bool = False, + drop_image_content: bool = False, ) -> Tuple['DocumentArray', Dict]: tensors_batch = [] @@ -37,10 +38,10 @@ def preproc_image( tensors_batch.append(preprocess_fn(d.tensor).detach()) # recover doc content - if d.tags.pop('__loaded_by_CAS__', False): + d.content = content + if drop_image_content: + d.pop('blob') d.pop('tensor') - else: - d.content = content tensors_batch = torch.stack(tensors_batch).type(torch.float32) From d5978a12fecabbf7edecf2d02f20f538c8d4712d Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 9 Sep 2022 16:46:40 +0800 Subject: [PATCH 6/9] fix: tmp remove test --- tests/test_helper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_helper.py b/tests/test_helper.py index 49cbfe88f..a1c98826b 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -102,7 +102,7 @@ def test_preproc_image(inputs): preprocess_fn = clip._transform_ndarray(224) da, pixel_values = preproc_image(inputs, preprocess_fn) assert len(da) == 2 - assert not da[0].blob + # assert not da[0].blob assert da[1].blob assert not da[0].tensor assert not da[1].tensor From 2b55f302811202c59423879871797d38b0b48616 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 9 Sep 2022 16:49:20 +0800 Subject: [PATCH 7/9] fix: remove drop_image_content from init --- server/clip_server/executors/clip_onnx.py | 15 ++++----------- server/clip_server/executors/clip_tensorrt.py | 15 ++++----------- server/clip_server/executors/clip_torch.py | 15 ++++----------- 3 files changed, 12 insertions(+), 33 deletions(-) diff --git a/server/clip_server/executors/clip_onnx.py b/server/clip_server/executors/clip_onnx.py index 7e466695f..9dff2ff21 100644 --- a/server/clip_server/executors/clip_onnx.py +++ b/server/clip_server/executors/clip_onnx.py @@ -26,7 +26,6 @@ def __init__( minibatch_size: int = 32, access_paths: str = '@r', model_path: Optional[str] = None, - drop_image_content: bool = False, **kwargs, ): """ @@ -41,12 +40,10 @@ def __init__( :param model_path: The path to the model to be used. If not specified, the model will be downloaded or loaded from the local cache. Visit https://clip-as-service.jina.ai/user-guides/server/#use-custom-model-for-onnx to learn how to finetune custom models. - :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._minibatch_size = minibatch_size - self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -124,10 +121,8 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) - await self.encode(docs['@r,m'], drop_image_content=drop_image_content) + _drop_image_content = parameters.get('drop_image_content', False) + await self.encode(docs['@r,m'], drop_image_content=_drop_image_content) set_rank(docs) @@ -139,9 +134,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) + _drop_image_content = parameters.get('drop_image_content', False) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -151,7 +144,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): # for image if _img_da: for minibatch, batch_data in _img_da.map_batch( - partial(self._preproc_images, drop_image_content=drop_image_content), + partial(self._preproc_images, drop_image_content=_drop_image_content), batch_size=self._minibatch_size, pool=self._pool, ): diff --git a/server/clip_server/executors/clip_tensorrt.py b/server/clip_server/executors/clip_tensorrt.py index a55bba283..24a9a6f7b 100644 --- a/server/clip_server/executors/clip_tensorrt.py +++ b/server/clip_server/executors/clip_tensorrt.py @@ -24,7 +24,6 @@ def __init__( num_worker_preprocess: int = 4, minibatch_size: int = 32, access_paths: str = '@r', - drop_image_content: bool = False, **kwargs, ): """ @@ -36,14 +35,12 @@ def __init__( number if you encounter OOM errors. :param access_paths: The access paths to traverse on the input documents to get the images and texts to be processed. Visit https://docarray.jina.ai/fundamentals/documentarray/access-elements for more details. - :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._pool = ThreadPool(processes=num_worker_preprocess) self._minibatch_size = minibatch_size - self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -95,10 +92,8 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) - await self.encode(docs['@r,m'], drop_image_content=drop_image_content) + _drop_image_content = parameters.get('drop_image_content', False) + await self.encode(docs['@r,m'], drop_image_content=_drop_image_content) set_rank(docs) @@ -110,9 +105,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) + _drop_image_content = parameters.get('drop_image_content', False) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -122,7 +115,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): # for image if _img_da: for minibatch, batch_data in _img_da.map_batch( - partial(self._preproc_images, drop_image_content=drop_image_content), + partial(self._preproc_images, drop_image_content=_drop_image_content), batch_size=self._minibatch_size, pool=self._pool, ): diff --git a/server/clip_server/executors/clip_torch.py b/server/clip_server/executors/clip_torch.py index 7fabad538..d953cebc7 100644 --- a/server/clip_server/executors/clip_torch.py +++ b/server/clip_server/executors/clip_torch.py @@ -27,7 +27,6 @@ def __init__( num_worker_preprocess: int = 4, minibatch_size: int = 32, access_paths: str = '@r', - drop_image_content: bool = False, **kwargs, ): """ @@ -40,12 +39,10 @@ def __init__( number if you encounter OOM errors. :param access_paths: The access paths to traverse on the input documents to get the images and texts to be processed. Visit https://docarray.jina.ai/fundamentals/documentarray/access-elements for more details. - :param drop_image_content: Whether to drop the image content from the input documents. Default is False. """ super().__init__(**kwargs) self._minibatch_size = minibatch_size - self._drop_image_content = drop_image_content self._access_paths = access_paths if 'traversal_paths' in kwargs: warnings.warn( @@ -105,10 +102,8 @@ def _preproc_texts(self, docs: 'DocumentArray'): @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) - await self.encode(docs['@r,m'], drop_image_content=drop_image_content) + _drop_image_content = parameters.get('drop_image_content', False) + await self.encode(docs['@r,m'], drop_image_content=_drop_image_content) set_rank(docs) @@ -120,9 +115,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): f'`traversal_paths` is deprecated. Use `access_paths` instead.' ) access_paths = parameters['traversal_paths'] - drop_image_content = parameters.get( - 'drop_image_content', self._drop_image_content - ) + _drop_image_content = parameters.get('drop_image_content', False) _img_da = DocumentArray() _txt_da = DocumentArray() @@ -134,7 +127,7 @@ async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): if _img_da: for minibatch, batch_data in _img_da.map_batch( partial( - self._preproc_images, drop_image_content=drop_image_content + self._preproc_images, drop_image_content=_drop_image_content ), batch_size=self._minibatch_size, pool=self._pool, From 9a86d99df27a78494e5e45ca440844d4ed3cf7d1 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Fri, 16 Sep 2022 23:31:48 +0800 Subject: [PATCH 8/9] fix: default value for drop_image_content in client --- client/clip_client/client.py | 86 ++++++++++---------------- server/clip_server/executors/helper.py | 3 +- tests/test_asyncio.py | 3 - tests/test_client.py | 16 +++++ tests/test_helper.py | 12 +--- tests/test_ranker.py | 11 ---- tests/test_simple.py | 10 --- 7 files changed, 53 insertions(+), 88 deletions(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 4b91583e7..8e5c7d040 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -176,17 +176,15 @@ def _iter_doc( _mime = mimetypes.guess_type(c)[0] if _mime and _mime.startswith('image'): d = Document( - tags={'__created_by_CAS__': True, '__loaded_by_CAS__': True}, uri=c, ).load_uri_to_blob() else: - d = Document(tags={'__created_by_CAS__': True}, text=c) + d = Document(text=c) elif isinstance(c, Document): if c.content_type in ('text', 'blob'): d = c elif not c.blob and c.uri: c.load_uri_to_blob() - c.tags['__loaded_by_CAS__'] = True d = c elif c.tensor is not None: d = c @@ -288,8 +286,10 @@ def encode(self, content, **kwargs): results = DocumentArray() with self._pbar: - parameters = kwargs.pop('parameters', None) + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True model_name = parameters.pop('model_name', '') if parameters else '' + self._client.post( on=f'/encode/{model_name}'.rstrip('/'), **self._get_post_payload(content, results, kwargs), @@ -299,10 +299,6 @@ def encode(self, content, **kwargs): parameters=parameters, ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - unbox = hasattr(content, '__len__') and isinstance(content[0], str) return self._unboxed_result(results, unbox) @@ -345,7 +341,8 @@ async def aencode(self, content, **kwargs): results = DocumentArray() with self._pbar: - parameters = kwargs.pop('parameters', None) + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True model_name = parameters.get('model_name', '') if parameters else '' async for da in self._async_client.post( @@ -367,10 +364,6 @@ async def aencode(self, content, **kwargs): ), ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - unbox = hasattr(content, '__len__') and isinstance(content[0], str) return self._unboxed_result(results, unbox) @@ -423,7 +416,6 @@ def _prepare_single_doc(d: 'Document'): return d elif not d.blob and d.uri: d.load_uri_to_blob() - d.tags['__loaded_by_CAS__'] = True return d elif d.tensor is not None: return d @@ -439,18 +431,6 @@ def _prepare_rank_doc(d: 'Document', _source: str = 'matches'): setattr(d, _source, [Client._prepare_single_doc(c) for c in _get(d)]) return d - @staticmethod - def _reset_rank_doc(d: 'Document', _source: str = 'matches'): - _get = lambda d: getattr(d, _source) - - if d.tags.pop('__loaded_by_CAS__', False): - d.pop('blob') - - for c in _get(d): - if c.tags.pop('__loaded_by_CAS__', False): - c.pop('blob') - return d - def rank( self, docs: Union['DocumentArray', Iterable['Document']], **kwargs ) -> 'DocumentArray': @@ -474,8 +454,10 @@ def rank( results = DocumentArray() with self._pbar: - parameters = kwargs.pop('parameters', None) + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True model_name = parameters.get('model_name', '') if parameters else '' + self._client.post( on=f'/rank/{model_name}'.rstrip('/'), **self._get_rank_payload(docs, results, kwargs), @@ -485,9 +467,6 @@ def rank( parameters=parameters, ) - for r in results: - self._reset_rank_doc(r, _source=kwargs.get('source', 'matches')) - return results async def arank( @@ -507,8 +486,10 @@ async def arank( results = DocumentArray() with self._pbar: - parameters = kwargs.pop('parameters', None) + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True model_name = parameters.get('model_name', '') if parameters else '' + async for da in self._async_client.post( on=f'/rank/{model_name}'.rstrip('/'), **self._get_rank_payload(docs, results, kwargs), @@ -528,9 +509,6 @@ async def arank( ), ) - for r in results: - self._reset_rank_doc(r, _source=kwargs.get('source', 'matches')) - return results @overload @@ -581,14 +559,19 @@ def index(self, content, **kwargs): raise TypeError( f'content must be an Iterable of [str, Document], try `.index(["{content}"])` instead' ) + if hasattr(content, '__len__') and len(content) == 0: + return DocumentArray() self._prepare_streaming( not kwargs.get('show_progress'), total=len(content) if hasattr(content, '__len__') else None, ) + results = DocumentArray() with self._pbar: - parameters = kwargs.pop('parameters', None) + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True + self._client.post( on='/index', **self._get_post_payload(content, results, kwargs), @@ -598,10 +581,6 @@ def index(self, content, **kwargs): parameters=parameters, ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - return results @overload @@ -633,17 +612,23 @@ async def aindex(self, content, **kwargs): raise TypeError( f'content must be an Iterable of [str, Document], try `.aindex(["{content}"])` instead' ) + if hasattr(content, '__len__') and len(content) == 0: + return DocumentArray() self._prepare_streaming( not kwargs.get('show_progress'), total=len(content) if hasattr(content, '__len__') else None, ) + results = DocumentArray() with self._pbar: + parameters = kwargs.pop('parameters', {}) + parameters['drop_image_content'] = True + async for da in self._async_client.post( on='/index', **self._get_post_payload(content, results, kwargs), - parameters=kwargs.pop('parameters', None), + parameters=parameters, ): results[da[:, 'id']].embeddings = da.embeddings @@ -659,10 +644,6 @@ async def aindex(self, content, **kwargs): ), ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - return results @overload @@ -716,15 +697,19 @@ def search(self, content, limit: int = 10, **kwargs) -> 'DocumentArray': raise TypeError( f'content must be an Iterable of [str, Document], try `.search(["{content}"])` instead' ) + if hasattr(content, '__len__') and len(content) == 0: + return DocumentArray() self._prepare_streaming( not kwargs.get('show_progress'), total=len(content) if hasattr(content, '__len__') else None, ) + results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) parameters['limit'] = limit + parameters['drop_image_content'] = True self._client.post( on='/search', @@ -735,10 +720,6 @@ def search(self, content, limit: int = 10, **kwargs) -> 'DocumentArray': ), ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - return results @overload @@ -772,16 +753,19 @@ async def asearch(self, content, limit: int = 10, **kwargs): raise TypeError( f'content must be an Iterable of [str, Document], try `.asearch(["{content}"])` instead' ) + if hasattr(content, '__len__') and len(content) == 0: + return DocumentArray() self._prepare_streaming( not kwargs.get('show_progress'), total=len(content) if hasattr(content, '__len__') else None, ) - results = DocumentArray() + results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) parameters['limit'] = limit + parameters['drop_image_content'] = True async for da in self._async_client.post( on='/search', @@ -802,8 +786,4 @@ async def asearch(self, content, limit: int = 10, **kwargs): ), ) - for r in results: - if hasattr(r, 'tags') and r.tags.pop('__loaded_by_CAS__', False): - r.pop('blob') - return results diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index ba9014b97..c7ac6a555 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -40,8 +40,7 @@ def preproc_image( # recover doc content d.content = content if drop_image_content: - d.pop('blob') - d.pop('tensor') + d.pop('blob', 'tensor') tensors_batch = torch.stack(tensors_batch).type(torch.float32) diff --git a/tests/test_asyncio.py b/tests/test_asyncio.py index 21951575b..cf3618be4 100644 --- a/tests/test_asyncio.py +++ b/tests/test_asyncio.py @@ -46,10 +46,7 @@ async def test_async_docarray_preserve_original_inputs(make_flow, inputs): assert inputs[0] is t2.result()[0] assert t2.result().embeddings.shape assert t2.result().contents == inputs.contents - assert '__created_by_CAS__' not in t2.result()[0].tags - assert '__loaded_by_CAS__' not in t2.result()[0].tags assert not t2.result()[0].tensor - assert not t2.result()[0].blob assert inputs[0] is t2.result()[0] diff --git a/tests/test_client.py b/tests/test_client.py index c15563924..c3bf4511b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -108,6 +108,22 @@ async def test_client_empty_input(make_torch_flow, inputs): assert isinstance(r, list) assert len(r) == 0 + r = c.index(inputs if not callable(inputs) else inputs()) + assert isinstance(r, DocumentArray) + assert len(r) == 0 + + r = await c.aindex(inputs if not callable(inputs) else inputs()) + assert isinstance(r, DocumentArray) + assert len(r) == 0 + + r = c.search(inputs if not callable(inputs) else inputs()) + assert isinstance(r, DocumentArray) + assert len(r) == 0 + + r = await c.asearch(inputs if not callable(inputs) else inputs()) + assert isinstance(r, DocumentArray) + assert len(r) == 0 + @pytest.mark.asyncio async def test_wrong_input_type(make_torch_flow): diff --git a/tests/test_helper.py b/tests/test_helper.py index a1c98826b..acce92ad2 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -85,10 +85,6 @@ def test_split_img_txt_da(inputs): [ DocumentArray( [ - Document( - uri='https://docarray.jina.ai/_static/favicon.png', - tags={'__loaded_by_CAS__': True}, - ).load_uri_to_blob(), Document( uri='https://docarray.jina.ai/_static/favicon.png', ).load_uri_to_blob(), @@ -100,10 +96,8 @@ def test_preproc_image(inputs): from clip_server.model import clip preprocess_fn = clip._transform_ndarray(224) - da, pixel_values = preproc_image(inputs, preprocess_fn) - assert len(da) == 2 - # assert not da[0].blob - assert da[1].blob + da, pixel_values = preproc_image(inputs, preprocess_fn, drop_image_content=True) + assert len(da) == 1 + assert not da[0].blob assert not da[0].tensor - assert not da[1].tensor assert pixel_values.get('pixel_values') is not None diff --git a/tests/test_ranker.py b/tests/test_ranker.py index 60662af1b..70f8165b3 100644 --- a/tests/test_ranker.py +++ b/tests/test_ranker.py @@ -30,14 +30,10 @@ async def test_torch_executor_rank_img2texts(encoder_class): for d in da: for c in d.matches: assert c.scores['clip_score'].value is not None - assert '__loaded_by_CAS__' not in c.tags assert not c.tensor - assert not c.blob org_score = d.matches[:, 'scores__clip_score__value'] assert org_score == list(sorted(org_score, reverse=True)) - assert '__loaded_by_CAS__' not in d.tags assert not d.tensor - assert not d.blob @pytest.mark.asyncio @@ -59,13 +55,10 @@ async def test_torch_executor_rank_text2imgs(encoder_class): for c in d.matches: assert c.scores['clip_score'].value is not None assert c.scores['clip_score_cosine'].value is not None - assert '__loaded_by_CAS__' not in c.tags assert not c.tensor - assert not c.blob np.testing.assert_almost_equal( sum(c.scores['clip_score'].value for c in d.matches), 1 ) - assert '__loaded_by_CAS__' not in d.tags assert not d.tensor assert not d.blob @@ -135,8 +128,6 @@ async def test_torch_executor_rank_text2imgs(encoder_class): def test_docarray_inputs(make_flow, inputs): c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') r = c.rank(inputs if not callable(inputs) else inputs()) - assert '__loaded_by_CAS__' not in r[0].tags - assert not r[0].blob assert not r[0].tensor assert isinstance(r, DocumentArray) rv1 = r['@m', 'scores__clip_score__value'] @@ -200,8 +191,6 @@ def test_docarray_inputs(make_flow, inputs): async def test_async_arank(make_flow, inputs): c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') r = await c.arank(inputs if not callable(inputs) else inputs()) - assert '__loaded_by_CAS__' not in r[0].tags - assert not r[0].blob assert not r[0].tensor assert isinstance(r, DocumentArray) rv = r['@m', 'scores__clip_score__value'] diff --git a/tests/test_simple.py b/tests/test_simple.py index 5648aa8be..dcc8ef2c6 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -76,10 +76,7 @@ def test_docarray_inputs(make_flow, inputs): r = c.encode(inputs if not callable(inputs) else inputs()) assert isinstance(r, DocumentArray) assert r.embeddings.shape - assert '__created_by_CAS__' not in r[0].tags - assert '__loaded_by_CAS__' not in r[0].tags assert not r[0].tensor - assert not r[0].blob if hasattr(inputs, '__len__'): assert inputs[0] is r[0] @@ -107,10 +104,7 @@ def test_docarray_preserve_original_inputs(make_flow, inputs): assert isinstance(r, DocumentArray) assert r.embeddings.shape assert r.contents == inputs.contents - assert '__created_by_CAS__' not in r[0].tags - assert '__loaded_by_CAS__' not in r[0].tags assert not r[0].tensor - assert not r[0].blob assert inputs[0] is r[0] @@ -141,8 +135,6 @@ def test_docarray_traversal(make_flow, inputs): r1 = c.post(on='/', inputs=da, parameters={'traversal_paths': '@c'}) assert isinstance(r1, DocumentArray) assert r1[0].chunks.embeddings.shape[0] == len(inputs) - assert '__created_by_CAS__' not in r1[0].tags - assert '__loaded_by_CAS__' not in r1[0].tags assert not r1[0].tensor assert not r1[0].blob assert not r1[0].chunks[0].tensor @@ -151,8 +143,6 @@ def test_docarray_traversal(make_flow, inputs): r2 = c.post(on='/', inputs=da, parameters={'access_paths': '@c'}) assert isinstance(r2, DocumentArray) assert r2[0].chunks.embeddings.shape[0] == len(inputs) - assert '__created_by_CAS__' not in r2[0].tags - assert '__loaded_by_CAS__' not in r2[0].tags assert not r2[0].tensor assert not r2[0].blob assert not r2[0].chunks[0].tensor From c34c4baa7f09262d16419ceefadf664f440e63e0 Mon Sep 17 00:00:00 2001 From: ZiniuYu Date: Mon, 19 Sep 2022 14:44:44 +0800 Subject: [PATCH 9/9] fix: set drop_image_content default true --- client/clip_client/client.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 8e5c7d040..69056818b 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -287,7 +287,9 @@ def encode(self, content, **kwargs): results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) model_name = parameters.pop('model_name', '') if parameters else '' self._client.post( @@ -342,7 +344,9 @@ async def aencode(self, content, **kwargs): results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) model_name = parameters.get('model_name', '') if parameters else '' async for da in self._async_client.post( @@ -455,7 +459,9 @@ def rank( results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) model_name = parameters.get('model_name', '') if parameters else '' self._client.post( @@ -487,7 +493,9 @@ async def arank( results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) model_name = parameters.get('model_name', '') if parameters else '' async for da in self._async_client.post( @@ -570,7 +578,9 @@ def index(self, content, **kwargs): results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) self._client.post( on='/index', @@ -623,7 +633,9 @@ async def aindex(self, content, **kwargs): results = DocumentArray() with self._pbar: parameters = kwargs.pop('parameters', {}) - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) async for da in self._async_client.post( on='/index', @@ -709,7 +721,9 @@ def search(self, content, limit: int = 10, **kwargs) -> 'DocumentArray': with self._pbar: parameters = kwargs.pop('parameters', {}) parameters['limit'] = limit - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) self._client.post( on='/search', @@ -765,7 +779,9 @@ async def asearch(self, content, limit: int = 10, **kwargs): with self._pbar: parameters = kwargs.pop('parameters', {}) parameters['limit'] = limit - parameters['drop_image_content'] = True + parameters['drop_image_content'] = parameters.get( + 'drop_image_content', True + ) async for da in self._async_client.post( on='/search',