diff --git a/client/setup.py b/client/setup.py index bc89240b0..6a15108c1 100644 --- a/client/setup.py +++ b/client/setup.py @@ -41,7 +41,7 @@ long_description_content_type='text/markdown', zip_safe=False, setup_requires=['setuptools>=18.0', 'wheel'], - install_requires=['jina>=3.8.0', 'docarray[common]>=0.16.1', 'packaging'], + install_requires=['jina>=3.12.0', 'docarray[common]>=0.19.0', 'packaging'], extras_require={ 'test': [ 'pytest', diff --git a/server/clip_server/executors/clip_onnx.py b/server/clip_server/executors/clip_onnx.py index 9dff2ff21..3262f0a8d 100644 --- a/server/clip_server/executors/clip_onnx.py +++ b/server/clip_server/executors/clip_onnx.py @@ -1,20 +1,21 @@ import os import warnings -from multiprocessing.pool import ThreadPool -from typing import Optional, Dict from functools import partial +from multiprocessing.pool import ThreadPool +from typing import Dict, Optional import onnxruntime as ort from clip_server.executors.helper import ( - split_img_txt_da, preproc_image, preproc_text, set_rank, + split_img_txt_da, ) from clip_server.model import clip from clip_server.model.clip_onnx import CLIPOnnxModel from clip_server.model.tokenization import Tokenizer -from jina import Executor, requests, DocumentArray +from jina import DocumentArray, Executor, requests +from opentelemetry.trace import NoOpTracer, Span class CLIPEncoder(Executor): @@ -51,6 +52,7 @@ def __init__( ) self._access_paths = kwargs['traversal_paths'] + self._num_worker_preprocess = num_worker_preprocess self._pool = ThreadPool(processes=num_worker_preprocess) self._model = CLIPOnnxModel(name, model_path) @@ -100,24 +102,29 @@ def __init__( self._model.start_sessions(sess_options=sess_options, providers=providers) + if not self.tracer: + self.tracer = NoOpTracer() + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', ): - return preproc_image( - docs, - preprocess_fn=self._image_transform, - return_np=True, - drop_image_content=drop_image_content, - ) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_image( + docs, + preprocess_fn=self._image_transform, + return_np=True, + drop_image_content=drop_image_content, + ) def _preproc_texts(self, docs: 'DocumentArray'): with self.monitor( name='preprocess_texts_seconds', documentation='texts preprocess time in seconds', ): - return preproc_text(docs, tokenizer=self._tokenizer, return_np=True) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_text(docs, tokenizer=self._tokenizer, return_np=True) @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): @@ -127,44 +134,73 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): - access_paths = parameters.get('access_paths', self._access_paths) - if 'traversal_paths' in parameters: - warnings.warn( - f'`traversal_paths` is deprecated. Use `access_paths` instead.' - ) - access_paths = parameters['traversal_paths'] - _drop_image_content = parameters.get('drop_image_content', False) - - _img_da = DocumentArray() - _txt_da = DocumentArray() - for d in docs[access_paths]: - split_img_txt_da(d, _img_da, _txt_da) - - # for image - if _img_da: - for minibatch, batch_data in _img_da.map_batch( - partial(self._preproc_images, drop_image_content=_drop_image_content), - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_images_seconds', - documentation='images encode time in seconds', - ): - minibatch.embeddings = self._model.encode_image(batch_data) - - # for text - if _txt_da: - for minibatch, batch_data in _txt_da.map_batch( - self._preproc_texts, - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_texts_seconds', - documentation='texts encode time in seconds', - ): - minibatch.embeddings = self._model.encode_text(batch_data) + async def encode( + self, + docs: 'DocumentArray', + tracing_context=None, + parameters: Dict = {}, + **kwargs, + ): + with self.tracer.start_as_current_span( + 'encode', context=tracing_context + ) as span: + span.set_attribute('device', self._device) + span.set_attribute('runtime', 'onnx') + access_paths = parameters.get('access_paths', self._access_paths) + if 'traversal_paths' in parameters: + warnings.warn( + f'`traversal_paths` is deprecated. Use `access_paths` instead.' + ) + access_paths = parameters['traversal_paths'] + _drop_image_content = parameters.get('drop_image_content', False) + + _img_da = DocumentArray() + _txt_da = DocumentArray() + for d in docs[access_paths]: + split_img_txt_da(d, _img_da, _txt_da) + + with self.tracer.start_as_current_span('inference') as inference_span: + inference_span.set_attribute('drop_image_content', _drop_image_content) + inference_span.set_attribute('minibatch_size', self._minibatch_size) + inference_span.set_attribute('has_img_da', True if _img_da else False) + inference_span.set_attribute('has_txt_da', True if _txt_da else False) + # for image + if _img_da: + with self.tracer.start_as_current_span( + 'img_minibatch_encoding' + ) as img_encode_span: + for minibatch, batch_data in _img_da.map_batch( + partial( + self._preproc_images, + drop_image_content=_drop_image_content, + ), + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_images_seconds', + documentation='images encode time in seconds', + ): + minibatch.embeddings = self._model.encode_image( + batch_data + ) + + # for text + if _txt_da: + with self.tracer.start_as_current_span( + 'txt_minibatch_encoding' + ) as txt_encode_span: + for minibatch, batch_data in _txt_da.map_batch( + self._preproc_texts, + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_texts_seconds', + documentation='texts encode time in seconds', + ): + minibatch.embeddings = self._model.encode_text( + batch_data + ) return docs diff --git a/server/clip_server/executors/clip_tensorrt.py b/server/clip_server/executors/clip_tensorrt.py index 24a9a6f7b..62176fe4c 100644 --- a/server/clip_server/executors/clip_tensorrt.py +++ b/server/clip_server/executors/clip_tensorrt.py @@ -1,19 +1,20 @@ import warnings -from multiprocessing.pool import ThreadPool -from typing import Optional, Dict from functools import partial +from multiprocessing.pool import ThreadPool +from typing import Dict, Optional import numpy as np from clip_server.executors.helper import ( - split_img_txt_da, preproc_image, preproc_text, set_rank, + split_img_txt_da, ) from clip_server.model import clip -from clip_server.model.tokenization import Tokenizer from clip_server.model.clip_trt import CLIPTensorRTModel -from jina import Executor, requests, DocumentArray +from clip_server.model.tokenization import Tokenizer +from jina import DocumentArray, Executor, requests +from opentelemetry.trace import NoOpTracer, Span class CLIPEncoder(Executor): @@ -38,6 +39,7 @@ def __init__( """ super().__init__(**kwargs) + self._num_worker_preprocess = num_worker_preprocess self._pool = ThreadPool(processes=num_worker_preprocess) self._minibatch_size = minibatch_size @@ -68,27 +70,35 @@ def __init__( self._tokenizer = Tokenizer(name) self._image_transform = clip._transform_ndarray(self._model.image_size) + if not self.tracer: + self.tracer = NoOpTracer() + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', ): - return preproc_image( - docs, - preprocess_fn=self._image_transform, - device=self._device, - return_np=False, - drop_image_content=drop_image_content, - ) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_image( + docs, + preprocess_fn=self._image_transform, + device=self._device, + return_np=False, + drop_image_content=drop_image_content, + ) def _preproc_texts(self, docs: 'DocumentArray'): with self.monitor( name='preprocess_texts_seconds', documentation='texts preprocess time in seconds', ): - return preproc_text( - docs, tokenizer=self._tokenizer, device=self._device, return_np=False - ) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_text( + docs, + tokenizer=self._tokenizer, + device=self._device, + return_np=False, + ) @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): @@ -98,56 +108,81 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): - access_paths = parameters.get('access_paths', self._access_paths) - if 'traversal_paths' in parameters: - warnings.warn( - f'`traversal_paths` is deprecated. Use `access_paths` instead.' - ) - access_paths = parameters['traversal_paths'] - _drop_image_content = parameters.get('drop_image_content', False) - - _img_da = DocumentArray() - _txt_da = DocumentArray() - for d in docs[access_paths]: - split_img_txt_da(d, _img_da, _txt_da) - - # for image - if _img_da: - for minibatch, batch_data in _img_da.map_batch( - partial(self._preproc_images, drop_image_content=_drop_image_content), - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_images_seconds', - documentation='images encode time in seconds', - ): - minibatch.embeddings = ( - self._model.encode_image(batch_data) - .detach() - .cpu() - .numpy() - .astype(np.float32) - ) - - # for text - if _txt_da: - for minibatch, batch_data in _txt_da.map_batch( - self._preproc_texts, - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_texts_seconds', - documentation='texts encode time in seconds', - ): - minibatch.embeddings = ( - self._model.encode_text(batch_data) - .detach() - .cpu() - .numpy() - .astype(np.float32) - ) + async def encode( + self, + docs: 'DocumentArray', + tracing_context=None, + parameters: Dict = {}, + **kwargs, + ): + with self.tracer.start_as_current_span( + 'encode', context=tracing_context + ) as span: + span.set_attribute('device', self._device) + span.set_attribute('runtime', 'tensorrt') + access_paths = parameters.get('access_paths', self._access_paths) + if 'traversal_paths' in parameters: + warnings.warn( + f'`traversal_paths` is deprecated. Use `access_paths` instead.' + ) + access_paths = parameters['traversal_paths'] + _drop_image_content = parameters.get('drop_image_content', False) + + _img_da = DocumentArray() + _txt_da = DocumentArray() + for d in docs[access_paths]: + split_img_txt_da(d, _img_da, _txt_da) + + with self.tracer.start_as_current_span('inference') as inference_span: + inference_span.set_attribute('drop_image_content', _drop_image_content) + inference_span.set_attribute('minibatch_size', self._minibatch_size) + inference_span.set_attribute('has_img_da', True if _img_da else False) + inference_span.set_attribute('has_txt_da', True if _txt_da else False) + # for image + if _img_da: + with self.tracer.start_as_current_span( + 'img_minibatch_encoding' + ) as img_encode_span: + for minibatch, batch_data in _img_da.map_batch( + partial( + self._preproc_images, + drop_image_content=_drop_image_content, + ), + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_images_seconds', + documentation='images encode time in seconds', + ): + minibatch.embeddings = ( + self._model.encode_image(batch_data) + .detach() + .cpu() + .numpy() + .astype(np.float32) + ) + + # for text + if _txt_da: + with self.tracer.start_as_current_span( + 'txt_minibatch_encoding' + ) as txt_encode_span: + for minibatch, batch_data in _txt_da.map_batch( + self._preproc_texts, + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_texts_seconds', + documentation='texts encode time in seconds', + ): + minibatch.embeddings = ( + self._model.encode_text(batch_data) + .detach() + .cpu() + .numpy() + .astype(np.float32) + ) return docs diff --git a/server/clip_server/executors/clip_torch.py b/server/clip_server/executors/clip_torch.py index d953cebc7..8b740d526 100644 --- a/server/clip_server/executors/clip_torch.py +++ b/server/clip_server/executors/clip_torch.py @@ -1,21 +1,22 @@ import os import warnings -from multiprocessing.pool import ThreadPool -from typing import Optional, Dict from functools import partial +from multiprocessing.pool import ThreadPool +from typing import Dict, Optional import numpy as np import torch from clip_server.executors.helper import ( - split_img_txt_da, preproc_image, preproc_text, set_rank, + split_img_txt_da, ) from clip_server.model import clip from clip_server.model.clip_model import CLIPModel from clip_server.model.tokenization import Tokenizer -from jina import Executor, requests, DocumentArray +from jina import DocumentArray, Executor, requests +from opentelemetry.trace import NoOpTracer, Span class CLIPEncoder(Executor): @@ -72,33 +73,43 @@ def __init__( # For more details, please see https://pytorch.org/docs/stable/generated/torch.set_num_threads.html torch.set_num_threads(max(num_threads, 1)) torch.set_num_interop_threads(1) + + self._num_worker_preprocess = num_worker_preprocess self._pool = ThreadPool(processes=num_worker_preprocess) self._model = CLIPModel(name, device=self._device, jit=jit, **kwargs) self._tokenizer = Tokenizer(name) self._image_transform = clip._transform_ndarray(self._model.image_size) + if not self.tracer: + self.tracer = NoOpTracer() + def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool): with self.monitor( name='preprocess_images_seconds', documentation='images preprocess time in seconds', ): - return preproc_image( - docs, - preprocess_fn=self._image_transform, - device=self._device, - return_np=False, - drop_image_content=drop_image_content, - ) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_image( + docs, + preprocess_fn=self._image_transform, + device=self._device, + return_np=False, + drop_image_content=drop_image_content, + ) def _preproc_texts(self, docs: 'DocumentArray'): with self.monitor( name='preprocess_texts_seconds', documentation='texts preprocess time in seconds', ): - return preproc_text( - docs, tokenizer=self._tokenizer, device=self._device, return_np=False - ) + with self.tracer.start_as_current_span('preprocess_images'): + return preproc_text( + docs, + tokenizer=self._tokenizer, + device=self._device, + return_np=False, + ) @requests(on='/rank') async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): @@ -108,57 +119,92 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs): set_rank(docs) @requests - async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs): - access_paths = parameters.get('access_paths', self._access_paths) - if 'traversal_paths' in parameters: - warnings.warn( - f'`traversal_paths` is deprecated. Use `access_paths` instead.' - ) - access_paths = parameters['traversal_paths'] - _drop_image_content = parameters.get('drop_image_content', False) - - _img_da = DocumentArray() - _txt_da = DocumentArray() - for d in docs[access_paths]: - split_img_txt_da(d, _img_da, _txt_da) - - with torch.inference_mode(): - # for image - if _img_da: - for minibatch, batch_data in _img_da.map_batch( - partial( - self._preproc_images, drop_image_content=_drop_image_content - ), - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_images_seconds', - documentation='images encode time in seconds', - ): - minibatch.embeddings = ( - self._model.encode_image(**batch_data) - .cpu() - .numpy() - .astype(np.float32) - ) - - # for text - if _txt_da: - for minibatch, batch_data in _txt_da.map_batch( - self._preproc_texts, - batch_size=self._minibatch_size, - pool=self._pool, - ): - with self.monitor( - name='encode_texts_seconds', - documentation='texts encode time in seconds', - ): - minibatch.embeddings = ( - self._model.encode_text(**batch_data) - .cpu() - .numpy() - .astype(np.float32) - ) + async def encode( + self, + docs: 'DocumentArray', + tracing_context=None, + parameters: Dict = {}, + **kwargs, + ): + with self.tracer.start_as_current_span( + 'encode', context=tracing_context + ) as span: + span.set_attribute('device', self._device) + span.set_attribute('runtime', 'torch') + access_paths = parameters.get('access_paths', self._access_paths) + if 'traversal_paths' in parameters: + warnings.warn( + f'`traversal_paths` is deprecated. Use `access_paths` instead.' + ) + access_paths = parameters['traversal_paths'] + _drop_image_content = parameters.get('drop_image_content', False) + + _img_da = DocumentArray() + _txt_da = DocumentArray() + for d in docs[access_paths]: + split_img_txt_da(d, _img_da, _txt_da) + + with self.tracer.start_as_current_span('inference') as inference_span: + with torch.inference_mode(): + inference_span.set_attribute( + 'drop_image_content', _drop_image_content + ) + inference_span.set_attribute('minibatch_size', self._minibatch_size) + inference_span.set_attribute( + 'has_img_da', True if _img_da else False + ) + inference_span.set_attribute( + 'has_txt_da', True if _txt_da else False + ) + # for image + if _img_da: + with self.tracer.start_as_current_span( + 'img_minibatch_encoding' + ) as img_encode_span: + img_encode_span.set_attribute( + 'num_pool_workers', self._num_worker_preprocess + ) + for minibatch, batch_data in _img_da.map_batch( + partial( + self._preproc_images, + drop_image_content=_drop_image_content, + ), + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_images_seconds', + documentation='images encode time in seconds', + ): + minibatch.embeddings = ( + self._model.encode_image(**batch_data) + .cpu() + .numpy() + .astype(np.float32) + ) + + # for text + if _txt_da: + with self.tracer.start_as_current_span( + 'txt_minibatch_encoding' + ) as txt_encode_span: + txt_encode_span.set_attribute( + 'num_pool_workers', self._num_worker_preprocess + ) + for minibatch, batch_data in _txt_da.map_batch( + self._preproc_texts, + batch_size=self._minibatch_size, + pool=self._pool, + ): + with self.monitor( + name='encode_texts_seconds', + documentation='texts encode time in seconds', + ): + minibatch.embeddings = ( + self._model.encode_text(**batch_data) + .cpu() + .numpy() + .astype(np.float32) + ) return docs diff --git a/server/setup.py b/server/setup.py index dfbf20a3e..f4a820706 100644 --- a/server/setup.py +++ b/server/setup.py @@ -1,8 +1,7 @@ import sys from os import path -from setuptools import find_packages -from setuptools import setup +from setuptools import find_packages, setup if sys.version_info < (3, 7, 0): raise OSError(f'CLIP-as-service requires Python >=3.7, but yours is {sys.version}') @@ -46,7 +45,7 @@ 'torch', 'regex', 'torchvision<=0.13.0' if sys.version_info <= (3, 7, 2) else 'torchvision', - 'jina>=3.8.0', + 'jina>=3.12.0', 'prometheus-client', 'open_clip_torch>=1.3.0', ],