feat: add custom tracing spans with jina>=3.12.0 #861

Merged
merged 6 commits on Dec 2, 2022
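This PR bumps the minimum `jina` pin to 3.12.0 and `docarray[common]` to 0.19.0, and instruments the ONNX CLIP executor with custom OpenTelemetry spans: `encode` is wrapped in a top-level span that records the device and runtime, preprocessing and per-minibatch encoding get their own child spans, and executors without a configured tracer fall back to a `NoOpTracer`.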
2 changes: 1 addition & 1 deletion client/setup.py
@@ -41,7 +41,7 @@
long_description_content_type='text/markdown',
zip_safe=False,
setup_requires=['setuptools>=18.0', 'wheel'],
install_requires=['jina>=3.8.0', 'docarray[common]>=0.16.1', 'packaging'],
install_requires=['jina>=3.12.0', 'docarray[common]>=0.19.0', 'packaging'],
extras_require={
'test': [
'pytest',
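Since both client pins moved, a quick runtime check can confirm an environment satisfies them. A minimal sketch using the `packaging` dependency that `install_requires` already lists; the thresholds simply mirror the new pins:

```python
from packaging.version import Version

import docarray
import jina

# Mirror the new install_requires pins from client/setup.py.
assert Version(jina.__version__) >= Version('3.12.0')
assert Version(docarray.__version__) >= Version('0.19.0')
```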
136 changes: 86 additions & 50 deletions server/clip_server/executors/clip_onnx.py
@@ -1,20 +1,21 @@
import os
import warnings
from multiprocessing.pool import ThreadPool
from typing import Optional, Dict
from functools import partial
from multiprocessing.pool import ThreadPool
from typing import Dict, Optional

import onnxruntime as ort
from clip_server.executors.helper import (
split_img_txt_da,
preproc_image,
preproc_text,
set_rank,
split_img_txt_da,
)
from clip_server.model import clip
from clip_server.model.clip_onnx import CLIPOnnxModel
from clip_server.model.tokenization import Tokenizer
from jina import Executor, requests, DocumentArray
from jina import DocumentArray, Executor, requests
from opentelemetry.trace import NoOpTracer, Span


class CLIPEncoder(Executor):
@@ -51,6 +52,7 @@ def __init__(
)
self._access_paths = kwargs['traversal_paths']

self._num_worker_preprocess = num_worker_preprocess
self._pool = ThreadPool(processes=num_worker_preprocess)

self._model = CLIPOnnxModel(name, model_path)
@@ -100,24 +102,29 @@ def __init__(

self._model.start_sessions(sess_options=sess_options, providers=providers)

if not self.tracer:
self.tracer = NoOpTracer()

def _preproc_images(self, docs: 'DocumentArray', drop_image_content: bool):
with self.monitor(
name='preprocess_images_seconds',
documentation='images preprocess time in seconds',
):
return preproc_image(
docs,
preprocess_fn=self._image_transform,
return_np=True,
drop_image_content=drop_image_content,
)
with self.tracer.start_as_current_span('preprocess_images'):
return preproc_image(
docs,
preprocess_fn=self._image_transform,
return_np=True,
drop_image_content=drop_image_content,
)

def _preproc_texts(self, docs: 'DocumentArray'):
with self.monitor(
name='preprocess_texts_seconds',
documentation='texts preprocess time in seconds',
):
return preproc_text(docs, tokenizer=self._tokenizer, return_np=True)
        with self.tracer.start_as_current_span('preprocess_texts'):
            return preproc_text(docs, tokenizer=self._tokenizer, return_np=True)

@requests(on='/rank')
async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
@@ -127,44 +134,73 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
set_rank(docs)

@requests
async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs):
access_paths = parameters.get('access_paths', self._access_paths)
if 'traversal_paths' in parameters:
warnings.warn(
f'`traversal_paths` is deprecated. Use `access_paths` instead.'
)
access_paths = parameters['traversal_paths']
_drop_image_content = parameters.get('drop_image_content', False)

_img_da = DocumentArray()
_txt_da = DocumentArray()
for d in docs[access_paths]:
split_img_txt_da(d, _img_da, _txt_da)

# for image
if _img_da:
for minibatch, batch_data in _img_da.map_batch(
partial(self._preproc_images, drop_image_content=_drop_image_content),
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_images_seconds',
documentation='images encode time in seconds',
):
minibatch.embeddings = self._model.encode_image(batch_data)

# for text
if _txt_da:
for minibatch, batch_data in _txt_da.map_batch(
self._preproc_texts,
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_texts_seconds',
documentation='texts encode time in seconds',
):
minibatch.embeddings = self._model.encode_text(batch_data)
async def encode(
self,
docs: 'DocumentArray',
tracing_context=None,
parameters: Dict = {},
**kwargs,
):
with self.tracer.start_as_current_span(
'encode', context=tracing_context
) as span:
span.set_attribute('device', self._device)
span.set_attribute('runtime', 'onnx')
access_paths = parameters.get('access_paths', self._access_paths)
if 'traversal_paths' in parameters:
warnings.warn(
f'`traversal_paths` is deprecated. Use `access_paths` instead.'
)
access_paths = parameters['traversal_paths']
_drop_image_content = parameters.get('drop_image_content', False)

_img_da = DocumentArray()
_txt_da = DocumentArray()
for d in docs[access_paths]:
split_img_txt_da(d, _img_da, _txt_da)

with self.tracer.start_as_current_span('inference') as inference_span:
inference_span.set_attribute('drop_image_content', _drop_image_content)
inference_span.set_attribute('minibatch_size', self._minibatch_size)
            inference_span.set_attribute('has_img_da', bool(_img_da))
            inference_span.set_attribute('has_txt_da', bool(_txt_da))
# for image
if _img_da:
with self.tracer.start_as_current_span(
'img_minibatch_encoding'
) as img_encode_span:
for minibatch, batch_data in _img_da.map_batch(
partial(
self._preproc_images,
drop_image_content=_drop_image_content,
),
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_images_seconds',
documentation='images encode time in seconds',
):
minibatch.embeddings = self._model.encode_image(
batch_data
)

# for text
if _txt_da:
with self.tracer.start_as_current_span(
'txt_minibatch_encoding'
) as txt_encode_span:
for minibatch, batch_data in _txt_da.map_batch(
self._preproc_texts,
batch_size=self._minibatch_size,
pool=self._pool,
):
with self.monitor(
name='encode_texts_seconds',
documentation='texts encode time in seconds',
):
minibatch.embeddings = self._model.encode_text(
batch_data
)

return docs
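For context on how these spans surface at runtime, here is a hedged deployment sketch. It assumes the `tracing`, `traces_exporter_host`, and `traces_exporter_port` Flow arguments from jina's 3.12-era OpenTelemetry support; verify the exact names against your installed version:

```python
from clip_server.executors.clip_onnx import CLIPEncoder
from jina import Flow

# Enable tracing on the Flow so the 'encode' -> 'inference' ->
# '*_minibatch_encoding' spans created in this PR are exported
# per request (e.g. to a local OTLP/gRPC collector on port 4317).
f = Flow(
    tracing=True,
    traces_exporter_host='localhost',
    traces_exporter_port=4317,
).add(uses=CLIPEncoder)

with f:
    f.block()
```

With tracing disabled (the default), the `NoOpTracer` fallback added in `__init__` keeps the same code path working with no exporter configured.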