feat: add custom tracing spans with jina>=3.12.0 (#861)
* feat: add custom tracing spans with jina>=3.12.0

* feat: update jina version to 3.12.0

* fix: revert unnecessary README formatting

* fix: provide default value for tracing_context

* fix: revert README to main branch

* feat: add inference attributes to torchrt and onnx executor
girishc13 authored Dec 2, 2022
1 parent f251539 commit 1eebdd7
Showing 5 changed files with 302 additions and 186 deletions.
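The pattern the commit message describes, in outline: each executor gets an OpenTelemetry tracer (with a NoOpTracer fallback when tracing is disabled), and encode accepts an optional tracing_context so its spans join the caller's trace. Below is a minimal sketch of that pattern with the CLIP-specific batching trimmed away; TracedEncoder and its argument names are illustrative, not part of the repository:

from typing import Dict, Optional

from opentelemetry.trace import NoOpTracer


class TracedEncoder:
    """Illustrative executor skeleton showing the span pattern from this commit."""

    def __init__(self, tracer=None):
        # jina>=3.12.0 injects a tracer when tracing is enabled;
        # fall back to a no-op tracer so span calls never raise.
        self.tracer = tracer or NoOpTracer()

    def encode(self, docs, tracing_context=None, parameters: Optional[Dict] = None, **kwargs):
        # `tracing_context` defaults to None so the method still works
        # when no upstream trace context is propagated.
        with self.tracer.start_as_current_span('encode', context=tracing_context) as span:
            span.set_attribute('runtime', 'onnx')
            with self.tracer.start_as_current_span('inference') as inference_span:
                inference_span.set_attribute('minibatch_size', 32)
                ...  # model inference happens inside the nested span
        return docs


encoder = TracedEncoder()
encoder.encode(docs=[])

The nested 'inference' span mirrors where the executors below attach attributes such as minibatch_size and runtime.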
2 changes: 1 addition & 1 deletion client/setup.py
@@ -41,7 +41,7 @@
long_description_content_type='text/markdown',
zip_safe=False,
setup_requires=['setuptools>=18.0', 'wheel'],
install_requires=['jina>=3.8.0', 'docarray[common]>=0.16.1', 'packaging'],
install_requires=['jina>=3.12.0', 'docarray[common]>=0.19.0', 'packaging'],
extras_require={
'test': [
'pytest',
136 changes: 86 additions & 50 deletions server/clip_server/executors/clip_onnx.py
@@ -1,20 +1,21 @@
import os
import warnings
from multiprocessing.pool import ThreadPool
from typing import Optional, Dict
from functools import partial
from multiprocessing.pool import ThreadPool
from typing import Dict, Optional

import onnxruntime as ort
from clip_server.executors.helper import (
split_img_txt_da,
preproc_image,
preproc_text,
set_rank,
split_img_txt_da,
)
from clip_server.model import clip
from clip_server.model.clip_onnx import CLIPOnnxModel
from clip_server.model.tokenization import Tokenizer
from jina import Executor, requests, DocumentArray
from jina import DocumentArray, Executor, requests
from opentelemetry.trace import NoOpTracer, Span


class CLIPEncoder(Executor):
@@ -51,6 +52,7 @@ def __init__(
)
self._access_paths = kwargs['traversal_paths']

self._num_worker_preprocess = num_worker_preprocess
self._pool = ThreadPool(processes=num_worker_preprocess)

self._model = CLIPOnnxModel(name, model_path)
@@ -100,24 +102,29 @@ def __init__(

self._model.start_sessions(sess_options=sess_options, providers=providers)

        # tracing may be disabled at runtime; fall back to a no-op tracer
        # so the span context managers below are always safe to enter
        if not self.tracer:
            self.tracer = NoOpTracer()

def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool):
with self.monitor(
name='preprocess_images_seconds',
documentation='images preprocess time in seconds',
):
return preproc_image(
docs,
preprocess_fn=self._image_transform,
return_np=True,
drop_image_content=drop_image_content,
)
with self.tracer.start_as_current_span('preprocess_images'):
return preproc_image(
docs,
preprocess_fn=self._image_transform,
return_np=True,
drop_image_content=drop_image_content,
)

def _preproc_texts(self, docs: 'DocumentArray'):
with self.monitor(
name='preprocess_texts_seconds',
documentation='texts preprocess time in seconds',
):
return preproc_text(docs, tokenizer=self._tokenizer, return_np=True)
        with self.tracer.start_as_current_span('preprocess_texts'):
return preproc_text(docs, tokenizer=self._tokenizer, return_np=True)

@requests(on='/rank')
async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
@@ -127,44 +134,73 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
set_rank(docs)

@requests
async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs):
access_paths = parameters.get('access_paths', self._access_paths)
if 'traversal_paths' in parameters:
warnings.warn(
f'`traversal_paths` is deprecated. Use `access_paths` instead.'
)
access_paths = parameters['traversal_paths']
_drop_image_content = parameters.get('drop_image_content', False)

_img_da = DocumentArray()
_txt_da = DocumentArray()
for d in docs[access_paths]:
split_img_txt_da(d, _img_da, _txt_da)

# for image
if _img_da:
for minibatch, batch_data in _img_da.map_batch(
partial(self._preproc_images, drop_image_content=_drop_image_content),
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_images_seconds',
documentation='images encode time in seconds',
):
minibatch.embeddings = self._model.encode_image(batch_data)

# for text
if _txt_da:
for minibatch, batch_data in _txt_da.map_batch(
self._preproc_texts,
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_texts_seconds',
documentation='texts encode time in seconds',
):
minibatch.embeddings = self._model.encode_text(batch_data)
async def encode(
self,
docs: 'DocumentArray',
tracing_context=None,
parameters: Dict = {},
**kwargs,
):
with self.tracer.start_as_current_span(
'encode', context=tracing_context
) as span:
span.set_attribute('device', self._device)
span.set_attribute('runtime', 'onnx')
access_paths = parameters.get('access_paths', self._access_paths)
if 'traversal_paths' in parameters:
warnings.warn(
f'`traversal_paths` is deprecated. Use `access_paths` instead.'
)
access_paths = parameters['traversal_paths']
_drop_image_content = parameters.get('drop_image_content', False)

_img_da = DocumentArray()
_txt_da = DocumentArray()
for d in docs[access_paths]:
split_img_txt_da(d, _img_da, _txt_da)

with self.tracer.start_as_current_span('inference') as inference_span:
inference_span.set_attribute('drop_image_content', _drop_image_content)
inference_span.set_attribute('minibatch_size', self._minibatch_size)
inference_span.set_attribute('has_img_da', True if _img_da else False)
inference_span.set_attribute('has_txt_da', True if _txt_da else False)
# for image
if _img_da:
with self.tracer.start_as_current_span(
'img_minibatch_encoding'
) as img_encode_span:
for minibatch, batch_data in _img_da.map_batch(
partial(
self._preproc_images,
drop_image_content=_drop_image_content,
),
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_images_seconds',
documentation='images encode time in seconds',
):
minibatch.embeddings = self._model.encode_image(
batch_data
)

# for text
if _txt_da:
with self.tracer.start_as_current_span(
'txt_minibatch_encoding'
) as txt_encode_span:
for minibatch, batch_data in _txt_da.map_batch(
self._preproc_texts,
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_texts_seconds',
documentation='texts encode time in seconds',
):
minibatch.embeddings = self._model.encode_text(
batch_data
)

return docs
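
To check the resulting span hierarchy ('encode' → 'inference' → 'img_minibatch_encoding' / 'txt_minibatch_encoding') without running a collector, the OpenTelemetry SDK's console exporter can be used. A minimal sketch, assuming the opentelemetry-sdk package is installed; this is standard OpenTelemetry wiring, not clip_server API:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# print finished spans to stdout so the nesting is visible
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer('clip_server.demo')

# mirrors the span names and attributes set in encode() above
with tracer.start_as_current_span('encode') as span:
    span.set_attribute('runtime', 'onnx')
    with tracer.start_as_current_span('inference') as inference_span:
        inference_span.set_attribute('minibatch_size', 32)
        with tracer.start_as_current_span('img_minibatch_encoding'):
            pass  # image minibatches would be encoded here

Each finished span is printed as JSON, with its parent_id linking the nested spans back to 'encode'.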