diff --git a/client/clip_client/client.py b/client/clip_client/client.py index 9436f25d4..15cb9b033 100644 --- a/client/clip_client/client.py +++ b/client/clip_client/client.py @@ -132,7 +132,7 @@ def _gather_result(self, r): @property def _unboxed_result(self): - if self._results.embeddings is None: + if self._return_plain and self._results.embeddings is None: raise ValueError( 'empty embedding returned from the server. ' 'This often due to a mis-config of the server, ' @@ -158,14 +158,13 @@ def _iter_doc(self, content) -> Generator['Document', None, None]: else: yield Document(text=c) elif isinstance(c, Document): - if c.content_type in ('text', 'blob'): - self._return_plain = False + self._return_plain = False + if c.content_type in ('text', 'blob', 'tensor'): yield c elif not c.blob and c.uri: c.load_uri_to_blob() - self._return_plain = False yield c - elif c.tensor is not None: + elif len(c.chunks) > 0 or len(c.matches) > 0: yield c else: raise TypeError(f'unsupported input type {c!r} {c.content_type}') @@ -184,10 +183,17 @@ def _iter_doc(self, content) -> Generator['Document', None, None]: ) def _get_post_payload(self, content, kwargs): + parameters = {} + if 'traversal_paths' in kwargs: + parameters['traversal_paths'] = kwargs['traversal_paths'] + if 'batch_size' in kwargs: + parameters['minibatch_size'] = kwargs['batch_size'] + return dict( on='/', inputs=self._iter_doc(content), - request_size=kwargs.get('batch_size', 8), + request_size=kwargs.get('batch_size', 32), + parameters=parameters, total_docs=len(content) if hasattr(content, '__len__') else None, ) diff --git a/server/clip_server/executors/clip_hg.py b/server/clip_server/executors/clip_hg.py index 6fe0fa953..b790caab3 100644 --- a/server/clip_server/executors/clip_hg.py +++ b/server/clip_server/executors/clip_hg.py @@ -22,9 +22,9 @@ def __init__( use_default_preprocessing: bool = True, max_length: int = 77, device: str = 'cpu', - overwrite_embeddings: bool = False, num_worker_preprocess: int = 4, minibatch_size: int = 32, + traversal_paths: str = '@r', *args, **kwargs, ): @@ -52,12 +52,15 @@ def __init__( :param num_worker_preprocess: Number of cpu processes used in preprocessing step. :param minibatch_size: Default batch size for encoding, used if the batch size is not passed as a parameter with the request. + :param traversal_paths: Default traversal paths for encoding, used if + the traversal path is not passed as a parameter with the request. """ super().__init__(*args, **kwargs) self._minibatch_size = minibatch_size self._use_default_preprocessing = use_default_preprocessing self._max_length = max_length + self._traversal_paths = traversal_paths # self.device = device if not device: @@ -110,32 +113,36 @@ def _preproc_images(self, docs: 'DocumentArray'): name='preprocess_images_seconds', documentation='images preprocess time in seconds', ): - tensors_batch = [] + if self._use_default_preprocessing: + tensors_batch = [] - for d in docs: - content = d.content + for d in docs: + content = d.content - if d.blob: - d.convert_blob_to_image_tensor() - elif d.uri: - d.load_uri_to_image_tensor() + if d.blob: + d.convert_blob_to_image_tensor() + elif d.tensor is None and d.uri: + # in case user uses HTTP protocol and send data via curl not using .blob (base64), but in .uri + d.load_uri_to_image_tensor() - tensors_batch.append(d.tensor) + tensors_batch.append(d.tensor) - # recover content - d.content = content + # recover content + d.content = content - if self._use_default_preprocessing: batch_data = self._vision_preprocessor( images=tensors_batch, return_tensors='pt', ) - batch_data = {k: v.to(self._device) for k, v in batch_data.items()} + batch_data = { + k: v.type(torch.float32).to(self._device) + for k, v in batch_data.items() + } else: batch_data = { 'pixel_values': torch.tensor( - tensors_batch, dtype=torch.float32, device=self._device + docs.tensors, dtype=torch.float32, device=self._device ) } @@ -163,7 +170,7 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: DocumentArray, **kwargs): + async def encode(self, docs: DocumentArray, parameters: Dict = {}, **kwargs): """ Encode all documents with `text` or image content using the corresponding CLIP encoder. Store the embeddings in the `embedding` attribute. @@ -181,10 +188,17 @@ async def encode(self, docs: DocumentArray, **kwargs): the CLIP model was trained on images of the size ``224 x 224``, and that they are of the shape ``[3, H, W]`` with ``dtype=float32``. They should also be normalized (values between 0 and 1). + :param parameters: A dictionary that contains parameters to control encoding. + The accepted keys are ``traversal_paths`` and ``minibatch_size`` - in their + absence their corresponding default values are used. """ + + traversal_paths = parameters.get('traversal_paths', self._traversal_paths) + minibatch_size = parameters.get('minibatch_size', self._minibatch_size) + _img_da = DocumentArray() _txt_da = DocumentArray() - for d in docs: + for d in docs[traversal_paths]: split_img_txt_da(d, _img_da, _txt_da) with torch.inference_mode(): @@ -192,7 +206,7 @@ async def encode(self, docs: DocumentArray, **kwargs): if _img_da: for minibatch, batch_data in _img_da.map_batch( self._preproc_images, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( @@ -210,7 +224,7 @@ async def encode(self, docs: DocumentArray, **kwargs): if _txt_da: for minibatch, batch_data in _txt_da.map_batch( self._preproc_texts, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( diff --git a/server/clip_server/executors/clip_onnx.py b/server/clip_server/executors/clip_onnx.py index 8c50362ac..deed22328 100644 --- a/server/clip_server/executors/clip_onnx.py +++ b/server/clip_server/executors/clip_onnx.py @@ -21,16 +21,18 @@ def __init__( name: str = 'ViT-B/32', device: Optional[str] = None, num_worker_preprocess: int = 4, - minibatch_size: int = 16, + minibatch_size: int = 32, + traversal_paths: str = '@r', **kwargs, ): super().__init__(**kwargs) + self._minibatch_size = minibatch_size + self._traversal_paths = traversal_paths + self._preprocess_tensor = clip._transform_ndarray(clip.MODEL_SIZE[name]) self._pool = ThreadPool(processes=num_worker_preprocess) - self._minibatch_size = minibatch_size - self._model = CLIPOnnxModel(name) import torch @@ -59,7 +61,7 @@ def __init__( and hasattr(self.runtime_args, 'replicas') ): replicas = getattr(self.runtime_args, 'replicas', 1) - num_threads = max(1, torch.get_num_threads() // replicas) + num_threads = max(1, torch.get_num_threads() * 2 // replicas) if num_threads < 2: warnings.warn( f'Too many replicas ({replicas}) vs too few threads {num_threads} may result in ' @@ -98,55 +100,40 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', **kwargs): + async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): + + traversal_paths = parameters.get('traversal_paths', self._traversal_paths) + minibatch_size = parameters.get('minibatch_size', self._minibatch_size) + _img_da = DocumentArray() _txt_da = DocumentArray() - for d in docs: + for d in docs[traversal_paths]: split_img_txt_da(d, _img_da, _txt_da) # for image if _img_da: - for minibatch, _contents in _img_da.map_batch( + for minibatch, batch_data in _img_da.map_batch( self._preproc_images, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( name='encode_images_seconds', documentation='images encode time in seconds', ): - minibatch.embeddings = self._model.encode_image(minibatch.tensors) - - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass + minibatch.embeddings = self._model.encode_image(batch_data) - # for text + # for text if _txt_da: - for minibatch, _contents in _txt_da.map_batch( + for minibatch, batch_data in _txt_da.map_batch( self._preproc_texts, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( name='encode_texts_seconds', documentation='texts encode time in seconds', ): - minibatch.embeddings = self._model.encode_text(minibatch.tensors) - - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass - - # drop tensors - docs.tensors = None + minibatch.embeddings = self._model.encode_text(batch_data) return docs diff --git a/server/clip_server/executors/clip_tensorrt.py b/server/clip_server/executors/clip_tensorrt.py index a97c5385d..5dc9af251 100644 --- a/server/clip_server/executors/clip_tensorrt.py +++ b/server/clip_server/executors/clip_tensorrt.py @@ -19,7 +19,8 @@ def __init__( name: str = 'ViT-B/32', device: str = 'cuda', num_worker_preprocess: int = 4, - minibatch_size: int = 64, + minibatch_size: int = 32, + traversal_paths: str = '@r', **kwargs, ): super().__init__(**kwargs) @@ -28,6 +29,8 @@ def __init__( self._pool = ThreadPool(processes=num_worker_preprocess) self._minibatch_size = minibatch_size + self._traversal_paths = traversal_paths + self._device = device import torch @@ -71,17 +74,20 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', **kwargs): + async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): + traversal_paths = parameters.get('traversal_paths', self._traversal_paths) + minibatch_size = parameters.get('minibatch_size', self._minibatch_size) + _img_da = DocumentArray() _txt_da = DocumentArray() - for d in docs: + for d in docs[traversal_paths]: split_img_txt_da(d, _img_da, _txt_da) # for image if _img_da: - for minibatch, _contents in _img_da.map_batch( + for minibatch, batch_data in _img_da.map_batch( self._preproc_images, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( @@ -89,26 +95,18 @@ async def encode(self, docs: 'DocumentArray', **kwargs): documentation='images encode time in seconds', ): minibatch.embeddings = ( - self._model.encode_image(minibatch.tensors) + self._model.encode_image(batch_data) .detach() .cpu() .numpy() .astype(np.float32) ) - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass - # for text if _txt_da: - for minibatch, _contents in _txt_da.map_batch( + for minibatch, batch_data in _txt_da.map_batch( self._preproc_texts, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( @@ -116,22 +114,11 @@ async def encode(self, docs: 'DocumentArray', **kwargs): documentation='texts encode time in seconds', ): minibatch.embeddings = ( - self._model.encode_text(minibatch.tensors) + self._model.encode_text(batch_data) .detach() .cpu() .numpy() .astype(np.float32) ) - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass - - # drop tensors - docs.tensors = None - return docs diff --git a/server/clip_server/executors/clip_torch.py b/server/clip_server/executors/clip_torch.py index c79af2f00..a4701004a 100644 --- a/server/clip_server/executors/clip_torch.py +++ b/server/clip_server/executors/clip_torch.py @@ -22,11 +22,15 @@ def __init__( device: Optional[str] = None, jit: bool = False, num_worker_preprocess: int = 4, - minibatch_size: int = 16, + minibatch_size: int = 32, + traversal_paths: str = '@r', **kwargs, ): super().__init__(**kwargs) + self._minibatch_size = minibatch_size + self._traversal_paths = traversal_paths + if not device: self._device = 'cuda' if torch.cuda.is_available() else 'cpu' else: @@ -50,7 +54,6 @@ def __init__( torch.set_num_threads(max(num_threads, 1)) torch.set_num_interop_threads(1) - self._minibatch_size = minibatch_size self._model, self._preprocess_tensor = clip.load( name, device=self._device, jit=jit ) @@ -83,18 +86,21 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', **kwargs): + async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): + traversal_paths = parameters.get('traversal_paths', self._traversal_paths) + minibatch_size = parameters.get('minibatch_size', self._minibatch_size) + _img_da = DocumentArray() _txt_da = DocumentArray() - for d in docs: + for d in docs[traversal_paths]: split_img_txt_da(d, _img_da, _txt_da) with torch.inference_mode(): # for image if _img_da: - for minibatch, _contents in _img_da.map_batch( + for minibatch, batch_data in _img_da.map_batch( self._preproc_images, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( @@ -102,25 +108,17 @@ async def encode(self, docs: 'DocumentArray', **kwargs): documentation='images encode time in seconds', ): minibatch.embeddings = ( - self._model.encode_image(minibatch.tensors) + self._model.encode_image(batch_data) .cpu() .numpy() .astype(np.float32) ) - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass - # for text if _txt_da: - for minibatch, _contents in _txt_da.map_batch( + for minibatch, batch_data in _txt_da.map_batch( self._preproc_texts, - batch_size=self._minibatch_size, + batch_size=minibatch_size, pool=self._pool, ): with self.monitor( @@ -128,20 +126,10 @@ async def encode(self, docs: 'DocumentArray', **kwargs): documentation='texts encode time in seconds', ): minibatch.embeddings = ( - self._model.encode_text(minibatch.tensors) + self._model.encode_text(batch_data) .cpu() .numpy() .astype(np.float32) ) - # recover original content - try: - _ = iter(_contents) - for _d, _ct in zip(minibatch, _contents): - _d.content = _ct - except TypeError: - pass - - # drop tensors - docs.tensors = None return docs diff --git a/server/clip_server/executors/helper.py b/server/clip_server/executors/helper.py index 023861462..4e1ddecb3 100644 --- a/server/clip_server/executors/helper.py +++ b/server/clip_server/executors/helper.py @@ -1,5 +1,5 @@ from typing import Tuple, List, Callable, Any - +import torch import numpy as np from docarray import Document, DocumentArray from docarray.math.distance.numpy import cosine @@ -21,39 +21,46 @@ def preproc_image( device: str = 'cpu', return_np: bool = False, ) -> Tuple['DocumentArray', List[Any]]: - contents = da.contents + + tensors_batch = [] for d in da: + content = d.content + if d.blob: d.convert_blob_to_image_tensor() elif d.tensor is None and d.uri: # in case user uses HTTP protocol and send data via curl not using .blob (base64), but in .uri d.load_uri_to_image_tensor() - d.tensor = preprocess_fn(d.tensor).detach() + tensors_batch.append(preprocess_fn(d.tensor).detach()) + + # recover doc content + d.content = content + + tensors_batch = torch.stack(tensors_batch).type(torch.float32) if return_np: - da.tensors = da.tensors.cpu().numpy().astype(np.float32) + tensors_batch = tensors_batch.cpu().numpy() else: - da.tensors = da.tensors.to(device) + tensors_batch = tensors_batch.to(device) - return da, contents + return da, tensors_batch def preproc_text( da: 'DocumentArray', device: str = 'cpu', return_np: bool = False ) -> Tuple['DocumentArray', List[Any]]: - contents = da.contents - da.tensors = clip.tokenize(contents).detach() + tensors_batch = clip.tokenize(da.texts).detach() if return_np: - da.tensors = da.tensors.cpu().numpy().astype(np.int64) + tensors_batch = tensors_batch.cpu().numpy().astype(np.int64) else: - da.tensors = da.tensors.to(device) + tensors_batch = tensors_batch.to(device) da[:, 'mime_type'] = 'text' - return da, contents + return da, tensors_batch def split_img_txt_da(doc: 'Document', img_da: 'DocumentArray', txt_da: 'DocumentArray'): diff --git a/tests/test_hg.py b/tests/test_hg.py index 0e939a8fc..b35858d49 100644 --- a/tests/test_hg.py +++ b/tests/test_hg.py @@ -22,6 +22,4 @@ def test_batch_no_preprocessing(make_hg_flow_no_default, inputs, port_generator): c = Client(server=f'grpc://0.0.0.0:{make_hg_flow_no_default.port}') r = c.encode(inputs if not callable(inputs) else inputs()) - assert len(r) == 2 - assert r[0].shape == (512,) - assert r[0].dtype == np.float32 + assert r.embeddings.shape == (len(r), 512) diff --git a/tests/test_simple.py b/tests/test_simple.py index 10ccbef8f..e960075d8 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -102,3 +102,29 @@ def test_docarray_preserve_original_inputs(make_flow, inputs, port_generator): assert isinstance(r, DocumentArray) assert r.embeddings.shape assert r.contents == inputs.contents + + +@pytest.mark.parametrize( + 'inputs', + [ + DocumentArray([Document(text='hello, world'), Document(text='goodbye, world')]), + DocumentArray( + [ + Document( + uri='https://docarray.jina.ai/_static/favicon.png', + text='hello, world', + ), + ] + ), + DocumentArray.from_files( + f'{os.path.dirname(os.path.abspath(__file__))}/**/*.jpg' + ), + ], +) +def test_docarray_traversal(make_flow, inputs, port_generator): + da = DocumentArray.empty(1) + da[0].chunks = inputs + + c = Client(server=f'grpc://0.0.0.0:{make_flow.port}') + r = c.encode(da, traversal_paths='@c') + assert r[0].chunks.embeddings.shape[0] == len(inputs)