Revert "feat: add traversal paths (#748)" (#749)
This reverts commit 4fe5a1b.
hanxiao authored Jun 10, 2022
1 parent 4fe5a1b commit d5be8c2
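
For context, a minimal sketch of the client-side call that the reverted feature enabled, assuming a clip_server reachable at grpc://0.0.0.0:51000 (the address and inputs are illustrative):

    from clip_client import Client

    c = Client('grpc://0.0.0.0:51000')

    # Before this revert, _get_post_payload forwarded two kwargs to the
    # server as request parameters:
    #   traversal_paths -> parameters['traversal_paths']  (e.g. '@c' to encode chunks)
    #   batch_size      -> parameters['minibatch_size']
    # After the revert, batch_size only sets the client-side request_size
    # (whose default drops back from 32 to 8) and traversal_paths is no
    # longer forwarded.
    embeddings = c.encode(['hello world', 'https://picsum.photos/200'], batch_size=8)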
Showing 8 changed files with 126 additions and 139 deletions.
18 changes: 6 additions & 12 deletions client/clip_client/client.py
@@ -132,7 +132,7 @@ def _gather_result(self, r):
 
     @property
     def _unboxed_result(self):
-        if self._return_plain and self._results.embeddings is None:
+        if self._results.embeddings is None:
             raise ValueError(
                 'empty embedding returned from the server. '
                 'This often due to a mis-config of the server, '
@@ -158,13 +158,14 @@ def _iter_doc(self, content) -> Generator['Document', None, None]:
                 else:
                     yield Document(text=c)
             elif isinstance(c, Document):
-                self._return_plain = False
-                if c.content_type in ('text', 'blob', 'tensor'):
+                if c.content_type in ('text', 'blob'):
+                    self._return_plain = False
                     yield c
                 elif not c.blob and c.uri:
                     c.load_uri_to_blob()
+                    self._return_plain = False
                     yield c
-                elif len(c.chunks) > 0 or len(c.matches) > 0:
+                elif c.tensor is not None:
                     yield c
                 else:
                     raise TypeError(f'unsupported input type {c!r} {c.content_type}')
@@ -183,17 +184,10 @@ def _iter_doc(self, content) -> Generator['Document', None, None]:
             )
 
     def _get_post_payload(self, content, kwargs):
-        parameters = {}
-        if 'traversal_paths' in kwargs:
-            parameters['traversal_paths'] = kwargs['traversal_paths']
-        if 'batch_size' in kwargs:
-            parameters['minibatch_size'] = kwargs['batch_size']
-
         return dict(
             on='/',
             inputs=self._iter_doc(content),
-            request_size=kwargs.get('batch_size', 32),
-            parameters=parameters,
+            request_size=kwargs.get('batch_size', 8),
             total_docs=len(content) if hasattr(content, '__len__') else None,
         )

50 changes: 18 additions & 32 deletions server/clip_server/executors/clip_hg.py
@@ -22,9 +22,9 @@ def __init__(
         use_default_preprocessing: bool = True,
         max_length: int = 77,
         device: str = 'cpu',
+        overwrite_embeddings: bool = False,
         num_worker_preprocess: int = 4,
         minibatch_size: int = 32,
-        traversal_paths: str = '@r',
         *args,
         **kwargs,
     ):
@@ -52,15 +52,12 @@ def __init__(
         :param num_worker_preprocess: Number of cpu processes used in preprocessing step.
         :param minibatch_size: Default batch size for encoding, used if the
             batch size is not passed as a parameter with the request.
-        :param traversal_paths: Default traversal paths for encoding, used if
-            the traversal path is not passed as a parameter with the request.
         """
         super().__init__(*args, **kwargs)
         self._minibatch_size = minibatch_size
 
         self._use_default_preprocessing = use_default_preprocessing
         self._max_length = max_length
-        self._traversal_paths = traversal_paths
 
         # self.device = device
         if not device:
@@ -113,36 +110,32 @@ def _preproc_images(self, docs: 'DocumentArray'):
             name='preprocess_images_seconds',
             documentation='images preprocess time in seconds',
         ):
-            if self._use_default_preprocessing:
-                tensors_batch = []
+            tensors_batch = []
 
-                for d in docs:
-                    content = d.content
+            for d in docs:
+                content = d.content
 
-                    if d.blob:
-                        d.convert_blob_to_image_tensor()
-                    elif d.tensor is None and d.uri:
-                        # in case user uses HTTP protocol and send data via curl not using .blob (base64), but in .uri
-                        d.load_uri_to_image_tensor()
+                if d.blob:
+                    d.convert_blob_to_image_tensor()
+                elif d.uri:
+                    d.load_uri_to_image_tensor()
 
-                    tensors_batch.append(d.tensor)
+                tensors_batch.append(d.tensor)
 
-                    # recover content
-                    d.content = content
+                # recover content
+                d.content = content
 
+            if self._use_default_preprocessing:
                 batch_data = self._vision_preprocessor(
                     images=tensors_batch,
                     return_tensors='pt',
                 )
-                batch_data = {
-                    k: v.type(torch.float32).to(self._device)
-                    for k, v in batch_data.items()
-                }
+                batch_data = {k: v.to(self._device) for k, v in batch_data.items()}
 
             else:
                 batch_data = {
                     'pixel_values': torch.tensor(
-                        docs.tensors, dtype=torch.float32, device=self._device
+                        tensors_batch, dtype=torch.float32, device=self._device
                     )
                 }

@@ -170,7 +163,7 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
         set_rank(docs)
 
     @requests
-    async def encode(self, docs: DocumentArray, parameters: Dict = {}, **kwargs):
+    async def encode(self, docs: DocumentArray, **kwargs):
         """
         Encode all documents with `text` or image content using the corresponding CLIP
         encoder. Store the embeddings in the `embedding` attribute.
@@ -188,25 +181,18 @@ async def encode(self, docs: DocumentArray, parameters: Dict = {}, **kwargs):
         the CLIP model was trained on images of the size ``224 x 224``, and that
         they are of the shape ``[3, H, W]`` with ``dtype=float32``. They should
         also be normalized (values between 0 and 1).
-        :param parameters: A dictionary that contains parameters to control encoding.
-            The accepted keys are ``traversal_paths`` and ``minibatch_size`` - in their
-            absence their corresponding default values are used.
         """
-
-        traversal_paths = parameters.get('traversal_paths', self._traversal_paths)
-        minibatch_size = parameters.get('minibatch_size', self._minibatch_size)
-
         _img_da = DocumentArray()
         _txt_da = DocumentArray()
-        for d in docs[traversal_paths]:
+        for d in docs:
             split_img_txt_da(d, _img_da, _txt_da)
 
         with torch.inference_mode():
             # for image
             if _img_da:
                 for minibatch, batch_data in _img_da.map_batch(
                     self._preproc_images,
-                    batch_size=minibatch_size,
+                    batch_size=self._minibatch_size,
                     pool=self._pool,
                 ):
                     with self.monitor(
@@ -224,7 +210,7 @@ async def encode(self, docs: DocumentArray, parameters: Dict = {}, **kwargs):
             if _txt_da:
                 for minibatch, batch_data in _txt_da.map_batch(
                     self._preproc_texts,
-                    batch_size=minibatch_size,
+                    batch_size=self._minibatch_size,
                     pool=self._pool,
                 ):
                     with self.monitor(
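For reference, the removed `traversal_paths` parameter selected documents through DocArray's traversal syntax; a minimal sketch of that syntax (docarray v0.x, illustrative only):

    from docarray import Document, DocumentArray

    da = DocumentArray([Document(chunks=[Document(text='hello')])])

    root_docs = da['@r']   # root documents only -- the behavior this revert restores
    chunk_docs = da['@c']  # chunks -- what a request with traversal_paths='@c' used to target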
51 changes: 32 additions & 19 deletions server/clip_server/executors/clip_onnx.py
@@ -21,18 +21,16 @@ def __init__(
         name: str = 'ViT-B/32',
         device: Optional[str] = None,
         num_worker_preprocess: int = 4,
-        minibatch_size: int = 32,
-        traversal_paths: str = '@r',
+        minibatch_size: int = 16,
         **kwargs,
     ):
         super().__init__(**kwargs)
 
-        self._minibatch_size = minibatch_size
-        self._traversal_paths = traversal_paths
-
         self._preprocess_tensor = clip._transform_ndarray(clip.MODEL_SIZE[name])
         self._pool = ThreadPool(processes=num_worker_preprocess)
 
+        self._minibatch_size = minibatch_size
+
         self._model = CLIPOnnxModel(name)
 
         import torch
@@ -61,7 +59,7 @@ def __init__(
             and hasattr(self.runtime_args, 'replicas')
         ):
             replicas = getattr(self.runtime_args, 'replicas', 1)
-            num_threads = max(1, torch.get_num_threads() * 2 // replicas)
+            num_threads = max(1, torch.get_num_threads() // replicas)
             if num_threads < 2:
                 warnings.warn(
                     f'Too many replicas ({replicas}) vs too few threads {num_threads} may result in '
@@ -100,40 +98,55 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
         set_rank(docs)
 
     @requests
-    async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs):
-
-        traversal_paths = parameters.get('traversal_paths', self._traversal_paths)
-        minibatch_size = parameters.get('minibatch_size', self._minibatch_size)
-
+    async def encode(self, docs: 'DocumentArray', **kwargs):
         _img_da = DocumentArray()
         _txt_da = DocumentArray()
-        for d in docs[traversal_paths]:
+        for d in docs:
             split_img_txt_da(d, _img_da, _txt_da)
 
         # for image
         if _img_da:
-            for minibatch, batch_data in _img_da.map_batch(
+            for minibatch, _contents in _img_da.map_batch(
                 self._preproc_images,
-                batch_size=minibatch_size,
+                batch_size=self._minibatch_size,
                 pool=self._pool,
             ):
                 with self.monitor(
                     name='encode_images_seconds',
                     documentation='images encode time in seconds',
                 ):
-                    minibatch.embeddings = self._model.encode_image(batch_data)
+                    minibatch.embeddings = self._model.encode_image(minibatch.tensors)
 
+                # recover original content
+                try:
+                    _ = iter(_contents)
+                    for _d, _ct in zip(minibatch, _contents):
+                        _d.content = _ct
+                except TypeError:
+                    pass
+
         # for text
         if _txt_da:
-            for minibatch, batch_data in _txt_da.map_batch(
+            for minibatch, _contents in _txt_da.map_batch(
                 self._preproc_texts,
-                batch_size=minibatch_size,
+                batch_size=self._minibatch_size,
                 pool=self._pool,
             ):
                 with self.monitor(
                     name='encode_texts_seconds',
                     documentation='texts encode time in seconds',
                 ):
-                    minibatch.embeddings = self._model.encode_text(batch_data)
+                    minibatch.embeddings = self._model.encode_text(minibatch.tensors)
 
+                # recover original content
+                try:
+                    _ = iter(_contents)
+                    for _d, _ct in zip(minibatch, _contents):
+                        _d.content = _ct
+                except TypeError:
+                    pass
+
+        # drop tensors
+        docs.tensors = None
+
         return docs
43 changes: 28 additions & 15 deletions server/clip_server/executors/clip_tensorrt.py
@@ -19,8 +19,7 @@ def __init__(
         name: str = 'ViT-B/32',
         device: str = 'cuda',
         num_worker_preprocess: int = 4,
-        minibatch_size: int = 32,
-        traversal_paths: str = '@r',
+        minibatch_size: int = 64,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -29,8 +28,6 @@ def __init__(
         self._pool = ThreadPool(processes=num_worker_preprocess)
 
         self._minibatch_size = minibatch_size
-        self._traversal_paths = traversal_paths
-
         self._device = device
 
         import torch
@@ -74,51 +71,67 @@ async def rank(self, docs: 'DocumentArray', parameters: Dict, **kwargs):
         set_rank(docs)
 
     @requests
-    async def encode(self, docs: 'DocumentArray', parameters: Dict = {}, **kwargs):
-        traversal_paths = parameters.get('traversal_paths', self._traversal_paths)
-        minibatch_size = parameters.get('minibatch_size', self._minibatch_size)
-
+    async def encode(self, docs: 'DocumentArray', **kwargs):
         _img_da = DocumentArray()
         _txt_da = DocumentArray()
-        for d in docs[traversal_paths]:
+        for d in docs:
             split_img_txt_da(d, _img_da, _txt_da)
 
         # for image
         if _img_da:
-            for minibatch, batch_data in _img_da.map_batch(
+            for minibatch, _contents in _img_da.map_batch(
                 self._preproc_images,
-                batch_size=minibatch_size,
+                batch_size=self._minibatch_size,
                 pool=self._pool,
            ):
                 with self.monitor(
                     name='encode_images_seconds',
                     documentation='images encode time in seconds',
                 ):
                     minibatch.embeddings = (
-                        self._model.encode_image(batch_data)
+                        self._model.encode_image(minibatch.tensors)
                         .detach()
                         .cpu()
                         .numpy()
                         .astype(np.float32)
                     )
 
+                # recover original content
+                try:
+                    _ = iter(_contents)
+                    for _d, _ct in zip(minibatch, _contents):
+                        _d.content = _ct
+                except TypeError:
+                    pass
+
         # for text
         if _txt_da:
-            for minibatch, batch_data in _txt_da.map_batch(
+            for minibatch, _contents in _txt_da.map_batch(
                 self._preproc_texts,
-                batch_size=minibatch_size,
+                batch_size=self._minibatch_size,
                 pool=self._pool,
             ):
                 with self.monitor(
                     name='encode_texts_seconds',
                     documentation='texts encode time in seconds',
                 ):
                     minibatch.embeddings = (
-                        self._model.encode_text(batch_data)
+                        self._model.encode_text(minibatch.tensors)
                         .detach()
                         .cpu()
                         .numpy()
                         .astype(np.float32)
                     )
 
+                # recover original content
+                try:
+                    _ = iter(_contents)
+                    for _d, _ct in zip(minibatch, _contents):
+                        _d.content = _ct
+                except TypeError:
+                    pass
+
+        # drop tensors
+        docs.tensors = None
+
         return docs
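
The restored `encode` methods rely on each `_preproc_*` callback returning the minibatch together with its original contents, so embeddings can be computed from `minibatch.tensors` and the documents' payloads restored afterwards. A self-contained sketch of that contract with a stand-in preprocessor (not the executor's actual one; docarray v0.x):

    from docarray import Document, DocumentArray


    def _preproc(batch: 'DocumentArray'):
        contents = batch.contents  # keep the originals so they can be restored
        # ... uri/blob-to-tensor conversion would happen here ...
        return batch, contents


    da = DocumentArray([Document(uri='https://picsum.photos/200')])
    for minibatch, _contents in da.map_batch(_preproc, batch_size=8):
        # embeddings would be computed from minibatch.tensors here
        for _d, _ct in zip(minibatch, _contents):
            _d.content = _ct  # recover original content
    da.tensors = None  # drop the tensors, keeping content and embeddings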
