Skip to content

Commit

Permalink
feat: add monitoring
Browse files Browse the repository at this point in the history
feat: add monitoring
  • Loading branch information
Sami Jaghouar committed Apr 5, 2022
1 parent 9a68add commit 5fe72a5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 36 deletions.
96 changes: 60 additions & 36 deletions server/clip_server/executors/clip_torch.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import contextlib
import io
from multiprocessing.pool import ThreadPool, Pool
from typing import Optional, List, Tuple

import torch
from PIL import Image
from jina import Executor, requests, DocumentArray
from jina import Executor, requests
from docarray import DocumentArray
from prometheus_client import Summary

from clip_server.model import clip

Expand Down Expand Up @@ -32,49 +35,70 @@ def __init__(
else:
self._pool = Pool(processes=num_worker_preprocess)

self.summary_text = self.get_summary('text_preproc_second', 'Time to preprocess text')
self.summary_image = self.get_summary('image_preproc_second', 'Time to preprocess image')
self.summary_encode = self.get_summary('encode_second', 'Time to encode')
def get_summary(self, title, details):
    """Return a context manager used to time a section of work.

    :param title: name of the Prometheus ``Summary`` metric.
    :param details: human-readable description of the metric.
    :return: the timer obtained from ``Summary(...).time()`` when
        ``self.runtime_args.metrics_registry`` is set, otherwise a no-op
        context manager so callers can always write ``with <result>:``.
    """
    registry = self.runtime_args.metrics_registry
    if not registry:
        # Must be an *instance*: the bare ``contextlib.nullcontext`` class
        # does not support the context-manager protocol, so ``with`` on it
        # raises as soon as monitoring is disabled.
        return contextlib.nullcontext()
    # NOTE(review): the same timer object is reused for every ``with`` block
    # (and the preprocessing may run in a pool) -- confirm that sharing one
    # timer across overlapping sections is acceptable.
    return Summary(title, details, registry=registry).time()

# Image preprocessing step; `encode` passes this method to `map_batch`.
# For every document: when `.blob` is empty but `.uri` is set, the blob is
# loaded from the URI; the blob bytes are then opened with PIL, run through
# `self._preprocess`, and the result is stored in `.tensor`. Afterwards
# `da.tensors` is moved to `self._device` and the same `da` is returned.
# NOTE(review): this is a flattened diff view -- the statements appear twice,
# presumably the pre-commit body first and then the post-commit body wrapped
# in `with self.summary_image:`; indentation was lost in extraction.
def _preproc_image(self, da: 'DocumentArray') -> 'DocumentArray':
for d in da:
if not d.blob and d.uri:
# in case user uses HTTP protocol and send data via curl not using .blob (base64), but in .uri
d.load_uri_to_blob()
d.tensor = self._preprocess(Image.open(io.BytesIO(d.blob)))
da.tensors = da.tensors.to(self._device)
return da
# Second copy of the body: same statements, timed via `self.summary_image`.
with self.summary_image:
for d in da:
if not d.blob and d.uri:
# in case user uses HTTP protocol and send data via curl not using .blob (base64), but in .uri
d.load_uri_to_blob()
d.tensor = self._preprocess(Image.open(io.BytesIO(d.blob)))
da.tensors = da.tensors.to(self._device)
return da

# Text preprocessing step; `encode` passes this method to `map_batch`.
# Reads `da.texts`, tokenizes them with `clip.tokenize`, stores the result
# (moved to `self._device`) in `da.tensors`, sets 'mime_type' of every
# document to 'text', and returns `da` together with the texts read at the
# start (`encode` assigns them back to the minibatch after embedding).
# NOTE(review): this is a flattened diff view -- the statements appear twice,
# presumably the pre-commit body first and then the post-commit body wrapped
# in `with self.summary_text:`; indentation was lost in extraction.
def _preproc_text(self, da: 'DocumentArray') -> Tuple['DocumentArray', List[str]]:
texts = da.texts
da.tensors = clip.tokenize(texts).to(self._device)
da[:, 'mime_type'] = 'text'
return da, texts
# Second copy of the body: same statements, timed via `self.summary_text`.
with self.summary_text:
texts = da.texts
da.tensors = clip.tokenize(texts).to(self._device)
da[:, 'mime_type'] = 'text'
return da, texts





# Request handler that fills in `.embeddings` for the incoming documents.
# Documents are split into those that have a `blob` and those that have a
# `text`. Each group is processed in minibatches of `self._minibatch_size`
# through `map_batch`, using the matching `_preproc_*` method and `self._pool`;
# the model output is moved to CPU and converted to numpy before being stored.
# At the end `docs.tensors` is set to None.
# NOTE(review): this is a flattened diff view -- each changed region appears
# twice, presumably the pre-commit lines first and then the post-commit lines
# nested under `with self.summary_encode:`; indentation was lost in extraction.
@requests
async def encode(self, docs: 'DocumentArray', **kwargs):
_img_da = docs.find({'blob': {'$exists': True}})
_txt_da = docs.find({'text': {'$exists': True}})

with torch.inference_mode():
# for image
if _img_da:
for minibatch in _img_da.map_batch(
self._preproc_image,
batch_size=self._minibatch_size,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_image(minibatch.tensors).cpu().numpy()
)
# Second copy of the image branch, now timed via `self.summary_encode`.
with self.summary_encode:
with torch.inference_mode():
# for image
if _img_da:
for minibatch in _img_da.map_batch(
self._preproc_image,
batch_size=self._minibatch_size,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_image(minibatch.tensors).cpu().numpy()
)

# for text
if _txt_da:
for minibatch, _texts in _txt_da.map_batch(
self._preproc_text,
batch_size=self._minibatch_size,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_text(minibatch.tensors).cpu().numpy()
)
# assign back the texts returned by `_preproc_text`
minibatch.texts = _texts
# Second copy of the text branch.
# for text
if _txt_da:
for minibatch, _texts in _txt_da.map_batch(
self._preproc_text,
batch_size=self._minibatch_size,
pool=self._pool,
):
minibatch.embeddings = (
self._model.encode_text(minibatch.tensors).cpu().numpy()
)
minibatch.texts = _texts

# drop tensors
docs.tensors = None
# drop tensors
docs.tensors = None
4 changes: 4 additions & 0 deletions server/clip_server/torch-flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ jtype: Flow
version: '1'
with:
port: 51000
monitoring: true
port_monitoring: 8000
executors:
- name: clip_t
uses:
jtype: CLIPEncoder
metas:
py_modules:
- executors/clip_torch.py
monitoring: true
port_monitoring: 9000

0 comments on commit 5fe72a5

Please sign in to comment.