Skip to content

Commit

Permalink
refactor: monitor (#743)
Browse files Browse the repository at this point in the history
* refactor: monitor

* fix: errors

* fix: remove overrite_embedding

* fix: minior revision
  • Loading branch information
numb3r3 authored Jun 9, 2022
1 parent d675148 commit 5eb5d7e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 183 deletions.
174 changes: 71 additions & 103 deletions server/clip_server/executors/clip_hg.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import os
import warnings
from multiprocessing.pool import ThreadPool
from typing import Dict, Optional, Sequence
from typing import Dict, Optional
import numpy as np
import torch
from transformers import CLIPFeatureExtractor, CLIPModel, CLIPTokenizer
from clip_server.executors.helper import (
split_img_txt_da,
set_rank,
)
from clip_server.model import clip
from jina import Executor, requests, DocumentArray, monitor
from jina import Executor, requests, DocumentArray


class CLIPEncoder(Executor):
Expand Down Expand Up @@ -50,25 +49,15 @@ def __init__(
use 77 as the max length.
:param device: Pytorch device to put the model on, e.g. 'cpu', 'cuda',
'cuda:1'.
:param overwrite_embeddings: Whether to overwrite existing embeddings. By
default docs that have embeddings already are not processed. This value
can be overwritten if the same parameter is passed to the request.
:param num_worker_preprocess: Number of cpu processes used in preprocessing step.
:param minibatch_size: Default batch size for encoding, used if the
batch size is not passed as a parameter with the request.
"""
super().__init__(*args, **kwargs)
self.overwrite_embeddings = overwrite_embeddings
self._minibatch_size = minibatch_size
self.pretrained_model_name_or_path = pretrained_model_name_or_path
self.base_tokenizer_model = (
base_tokenizer_model or pretrained_model_name_or_path
)
self.use_default_preprocessing = use_default_preprocessing
self.base_feature_extractor = (
base_feature_extractor or pretrained_model_name_or_path
)
self.max_length = max_length

self._use_default_preprocessing = use_default_preprocessing
self._max_length = max_length

# self.device = device
if not device:
Expand All @@ -91,15 +80,16 @@ def __init__(
# NOTE: make sure to set the threads right after the torch import,
# and `torch.set_num_threads` always take precedence over environment variables `OMP_NUM_THREADS`.
# For more details, please see https://pytorch.org/docs/stable/generated/torch.set_num_threads.html
# FIXME: This hack would harm the performance in K8S deployment.
torch.set_num_threads(max(num_threads, 1))
torch.set_num_interop_threads(1)

self.vision_preprocessor = CLIPFeatureExtractor.from_pretrained(
self.base_feature_extractor
self._vision_preprocessor = CLIPFeatureExtractor.from_pretrained(
base_feature_extractor or pretrained_model_name_or_path
)
self._tokenizer = CLIPTokenizer.from_pretrained(
base_tokenizer_model or pretrained_model_name_or_path
)
self.tokenizer = CLIPTokenizer.from_pretrained(self.base_tokenizer_model)
self._model = CLIPModel.from_pretrained(self.pretrained_model_name_or_path)
self._model = CLIPModel.from_pretrained(pretrained_model_name_or_path)

if finetuned_checkpoint_path:
if finetuned_checkpoint_path.startswith(
Expand All @@ -115,44 +105,50 @@ def __init__(
self._model.eval().to(self._device)
self._pool = ThreadPool(processes=num_worker_preprocess)

@monitor(name='preprocess_images_seconds')
def _preproc_images(self, docs: 'DocumentArray'):
contents = docs.contents
tensors_batch = []
for d in docs:
if d.blob:
d.convert_blob_to_image_tensor()
elif d.uri:
d.load_uri_to_image_tensor()
tensors_batch.append(d.tensor)
if self.use_default_preprocessing:
docs.tensors = self._preprocess_images(tensors_batch)['pixel_values']
else:
docs.tensors = torch.tensor(
tensors_batch, dtype=torch.float32, device=self._device
)
return docs, contents

@monitor(name='encode_images_seconds')
def _encode_images(self, docs: DocumentArray):
docs.embeddings = (
self._model.get_image_features(docs.tensors)
.cpu()
.numpy()
.astype(np.float32)
)
with self.monitor('preprocess_images_seconds'):
tensors_batch = []

for d in docs:
content = d.content

if d.blob:
d.convert_blob_to_image_tensor()
elif d.uri:
d.load_uri_to_image_tensor()

tensors_batch.append(d.tensor)

# recover content
d.content = content

if self._use_default_preprocessing:
batch_data = self._vision_preprocessor(
images=tensors_batch,
return_tensors='pt',
)
batch_data = {k: v.to(self._device) for k, v in batch_data.items()}

else:
batch_data = {
'pixel_values': torch.tensor(
tensors_batch, dtype=torch.float32, device=self._device
)
}

return docs, batch_data

@monitor(name='preprocess_texts_seconds')
def _preproc_texts(self, docs: 'DocumentArray'):
contents = docs.contents
docs.tensors = self._tokenize_texts(docs.texts)['input_ids']
return docs, contents

@monitor(name='encode_texts_seconds')
def _encode_texts(self, docs: 'DocumentArray'):
docs.embeddings = (
self._model.get_text_features(docs.tensors).cpu().numpy().astype(np.float32)
)
with self.monitor('preprocess_texts_seconds'):
batch_data = self._tokenizer(
docs.texts,
max_length=self._max_length,
padding='longest',
truncation=True,
return_tensors='pt',
)
batch_data = {k: v.to(self._device) for k, v in batch_data.items()}
return docs, batch_data

@requests(on='/rank')
async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
Expand All @@ -164,9 +160,8 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
async def encode(self, docs: DocumentArray, **kwargs):
"""
Encode all documents with `text` or image content using the corresponding CLIP
encoder. Store the embeddings in the `embedding` attribute. Documents with
existing embeddings are not processed unless `overwrite_embeddings` is set to
True.
encoder. Store the embeddings in the `embedding` attribute.
:param docs: Documents sent to the encoder. The image docs must have
``tensor`` of the
shape ``Height x Width x 3``. By default, the input ``tensor`` must
Expand All @@ -189,59 +184,32 @@ async def encode(self, docs: DocumentArray, **kwargs):
with torch.inference_mode():
# for image
if _img_da:
for minibatch, _contents in _img_da.map_batch(
for minibatch, batch_data in _img_da.map_batch(
self._preproc_images,
batch_size=self._minibatch_size,
pool=self._pool,
):

self._encode_images(minibatch)

# recover original content
try:
_ = iter(_contents)
for _d, _ct in zip(minibatch, _contents):
_d.content = _ct
except TypeError:
pass
with self.monitor('encode_images_seconds'):
minibatch.embeddings = (
self._model.get_image_features(**batch_data)
.cpu()
.numpy()
.astype(np.float32)
)

# for text
if _txt_da:
for minibatch, _contents in _txt_da.map_batch(
for minibatch, batch_data in _txt_da.map_batch(
self._preproc_texts,
batch_size=self._minibatch_size,
pool=self._pool,
):
self._encode_texts(minibatch)

# recover original content
try:
_ = iter(_contents)
for _d, _ct in zip(minibatch, _contents):
_d.content = _ct
except TypeError:
pass

# drop tensors
if self.use_default_preprocessing:
docs.tensors = None
return docs
with self.monitor('encode_texts_seconds'):
minibatch.embeddings = (
self._model.get_text_features(**batch_data)
.cpu()
.numpy()
.astype(np.float32)
)

def _preprocess_images(self, images):
"""Preprocess images."""
x = self.vision_preprocessor(
images=images,
return_tensors='pt',
)
return {k: v.to(torch.device(self._device)) for k, v in x.items()}

def _tokenize_texts(self, texts: Sequence[str]):
"""Tokenize texts."""
x = self.tokenizer(
texts,
max_length=self.max_length,
padding='longest',
truncation=True,
return_tensors='pt',
)
return {k: v.to(self._device) for k, v in x.items()}
return docs
29 changes: 12 additions & 17 deletions server/clip_server/executors/clip_onnx.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)
from clip_server.model import clip
from clip_server.model.clip_onnx import CLIPOnnxModel
from jina import Executor, requests, DocumentArray, monitor
from jina import Executor, requests, DocumentArray


class CLIPEncoder(Executor):
Expand Down Expand Up @@ -75,23 +75,15 @@ def __init__(

self._model.start_sessions(sess_options=sess_options, providers=providers)

@monitor(name='preprocess_images_seconds')
def _preproc_images(self, docs: 'DocumentArray'):
return preproc_image(
docs, preprocess_fn=self._preprocess_tensor, return_np=True
)
with self.monitor('preprocess_images_seconds'):
return preproc_image(
docs, preprocess_fn=self._preprocess_tensor, return_np=True
)

@monitor(name='preprocess_texts_seconds')
def _preproc_texts(self, docs: 'DocumentArray'):
return preproc_text(docs, return_np=True)

@monitor(name='encode_images_seconds')
def _encode_images(self, docs: 'DocumentArray'):
docs.embeddings = self._model.encode_image(docs.tensors)

@monitor(name='encode_texts_seconds')
def _encode_texts(self, docs: 'DocumentArray'):
docs.embeddings = self._model.encode_text(docs.tensors)
with self.monitor('preprocess_texts_seconds'):
return preproc_text(docs, return_np=True)

@requests(on='/rank')
async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
Expand All @@ -113,7 +105,8 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
batch_size=self._minibatch_size,
pool=self._pool,
):
self._encode_images(minibatch)
with self.monitor('encode_images_seconds'):
minibatch.embeddings = self._model.encode_image(minibatch.tensors)

# recover original content
try:
Expand All @@ -130,7 +123,9 @@ async def encode(self, docs: 'DocumentArray', **kwargs):
batch_size=self._minibatch_size,
pool=self._pool,
):
self._encode_texts(minibatch)
with self.monitor('encode_texts_seconds'):
minibatch.embeddings = self._model.encode_text(minibatch.tensors)

# recover original content
try:
_ = iter(_contents)
Expand Down
Loading

0 comments on commit 5eb5d7e

Please sign in to comment.