Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #319 from gnes-ai/fix-flow-2
Browse files Browse the repository at this point in the history
fix(frontend): fix frontend blocking behavior
  • Loading branch information
mergify[bot] authored Oct 11, 2019
2 parents f7e7791 + c23ea61 commit 8ca5ef0
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 20 deletions.
4 changes: 3 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
from os import path
import sys
from os import path

sys.path.insert(0, os.path.abspath('..'))

# -- Project information -----------------------------------------------------
Expand Down Expand Up @@ -51,6 +52,7 @@
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx_autodoc_typehints',
'sphinx.ext.viewcode',
'sphinxcontrib.apidoc',
'sphinxarg.ext',
Expand Down
3 changes: 2 additions & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
sphinx-argparse
sphinxcontrib-apidoc
sphinxcontrib-apidoc
sphinx-autodoc-typehints
3 changes: 3 additions & 0 deletions gnes/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def dump_full_path(self):
def yaml_full_path(self):
"""
Get the file path of the yaml config
:return:
"""
return os.path.join(self.work_dir, '%s.yml' % self.name)
Expand Down Expand Up @@ -247,6 +248,7 @@ def train(self, *args, **kwargs):
def dump(self, filename: str = None) -> None:
"""
Serialize the object to a binary file
:param filename: file path of the serialized file, if not given then :py:attr:`dump_full_path` is used
"""
f = filename or self.dump_full_path
Expand All @@ -260,6 +262,7 @@ def dump(self, filename: str = None) -> None:
def dump_yaml(self, filename: str = None) -> None:
"""
Serialize the object to a yaml file
:param filename: file path of the yaml file, if not given then :py:attr:`yaml_full_path` is used
"""
f = filename or self.yaml_full_path
Expand Down
7 changes: 4 additions & 3 deletions gnes/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import sys
import time
import zipfile
from typing import Generator
from typing import Iterator

from termcolor import colored

Expand Down Expand Up @@ -78,6 +78,7 @@ def query_callback(self, req: 'gnes_pb2.Request', resp: 'gnes_pb2.Response'):
"""
callback invoked after getting the query result
override this method to customize query behavior
:param resp: response
:param req: query
:return:
Expand All @@ -86,14 +87,14 @@ def query_callback(self, req: 'gnes_pb2.Request', resp: 'gnes_pb2.Response'):
print(resp)

@property
def bytes_generator(self) -> Generator[bytes, None, None]:
def bytes_generator(self) -> Iterator[bytes]:
if self._bytes_generator:
return self._bytes_generator
else:
raise ValueError('bytes_generator is empty or not set')

@bytes_generator.setter
def bytes_generator(self, bytes_gen: Generator[bytes, None, None]):
def bytes_generator(self, bytes_gen: Iterator[bytes]):
if self._bytes_generator:
self.logger.warning('bytes_generator is not empty, overrided')
self._bytes_generator = bytes_gen
Expand Down
48 changes: 35 additions & 13 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections import OrderedDict, defaultdict
from contextlib import ExitStack
from functools import wraps
from typing import Union, Tuple, List, Optional, Generator
from typing import Union, Tuple, List, Optional, Iterator

from ..cli.parser import set_router_parser, set_indexer_parser, \
set_frontend_parser, set_preprocessor_parser, \
Expand Down Expand Up @@ -78,9 +78,10 @@ class Flow:
flow.index()
...
You can also use the shortcuts, e.g. :py:meth:add_encoder , :py:meth:add_preprocessor
You can also use the shortcuts, e.g. :py:meth:`add_encoder`, :py:meth:`add_preprocessor`.
It is recommended to use the flow as a context manager, as shown above.
Note the different default copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
:py:meth:`.add()` always copies the flow by default, whereas :py:meth:`.build()` modifies the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.
Expand Down Expand Up @@ -169,18 +170,38 @@ def to_mermaid(self, left_right: bool = True):
'copy-paste the output and visualize it with: https://mermaidjs.github.io/mermaid-live-editor/')
return mermaid_str

def train(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):
def train(self, bytes_gen: Iterator[bytes] = None, **kwargs):
"""Do training on the current flow
It will start a :py:class:`CLIClient` and call :py:func:`train`.
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
self._call_client(bytes_gen, mode='train', **kwargs)

def index(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):
def index(self, bytes_gen: Iterator[bytes] = None, **kwargs):
"""Do indexing on the current flow
It will start a :py:class:`CLIClient` and call :py:func:`index`.
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
self._call_client(bytes_gen, mode='index', **kwargs)

def query(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):
def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):
"""Do querying on the current flow
It will start a :py:class:`CLIClient` and call :py:func:`query`.
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
self._call_client(bytes_gen, mode='query', **kwargs)

@_build_level(BuildLevel.RUNTIME)
def _call_client(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs):

def _call_client(self, bytes_gen: Iterator[bytes] = None, **kwargs):
os.unsetenv('http_proxy')
os.unsetenv('https_proxy')
args, p_args = self._get_parsed_args(self, set_client_cli_parser, kwargs)
Expand All @@ -192,26 +213,26 @@ def _call_client(self, bytes_gen: Generator[bytes, None, None] = None, **kwargs)
c.start()

def add_frontend(self, *args, **kwargs) -> 'Flow':
"""Add a frontend to the current flow, a shortcut of add(Service.Frontend)
"""Add a frontend to the current flow, a shortcut of :py:meth:`add(Service.Frontend)`.
Usually you don't need to call this function explicitly, a flow object contains a frontend service by default.
This function is useful when you build a flow without the frontend and want to customize the frontend later.
"""
return self.add(Service.Frontend, *args, **kwargs)

def add_encoder(self, *args, **kwargs) -> 'Flow':
"""Add an encoder to the current flow, a shortcut of add(Service.Encoder)"""
"""Add an encoder to the current flow, a shortcut of :py:meth:`add(Service.Encoder)`"""
return self.add(Service.Encoder, *args, **kwargs)

def add_indexer(self, *args, **kwargs) -> 'Flow':
"""Add an indexer to the current flow, a shortcut of add(Service.Indexer)"""
"""Add an indexer to the current flow, a shortcut of :py:meth:`add(Service.Indexer)`"""
return self.add(Service.Indexer, *args, **kwargs)

def add_preprocessor(self, *args, **kwargs) -> 'Flow':
"""Add a router to the current flow, a shortcut of add(Service.Preprocessor)"""
"""Add a preprocessor to the current flow, a shortcut of :py:meth:`add(Service.Preprocessor)`"""
return self.add(Service.Preprocessor, *args, **kwargs)

def add_router(self, *args, **kwargs) -> 'Flow':
"""Add a preprocessor to the current flow, a shortcut of add(Service.Router)"""
"""Add a router to the current flow, a shortcut of :py:meth:`add(Service.Router)`"""
return self.add(Service.Router, *args, **kwargs)

def add(self, service: 'Service',
Expand Down Expand Up @@ -416,7 +437,8 @@ def _build_graph(self, copy_flow: bool) -> 'Flow':
def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *args, **kwargs) -> 'Flow':
"""
Build the current flow and make it ready to use
:param backend: supported 'thread', 'process', 'swarm', 'k8s', 'shell'
:param backend: supported 'thread', 'process', 'swarm', 'k8s', 'shell'; if None then only the graph is built
:param copy_flow: return the copy of the current flow
:return: the current flow (by default)
"""
Expand Down
2 changes: 2 additions & 0 deletions gnes/indexer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def __init__(self, helper_indexer: 'BaseChunkIndexerHelper' = None, *args, **kwa
def add(self, keys: List[Tuple[int, int]], vectors: np.ndarray, weights: List[float], *args, **kwargs):
"""
adding new chunks and their vector representations
:param keys: list of (doc_id, offset) tuple
:param vectors: vector representations
:param weights: weight of the chunks
Expand Down Expand Up @@ -160,6 +161,7 @@ class BaseDocIndexer(BaseIndexer):
def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs):
"""
adding new docs and their protobuf representation
:param keys: list of doc_id
:param docs: list of protobuf Document objects
"""
Expand Down
1 change: 1 addition & 0 deletions gnes/indexer/chunk/annoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AnnoyIndexer(BCI):
def __init__(self, num_dim: int, data_path: str, metric: str = 'angular', n_trees: int = 10, *args, **kwargs):
"""
Initialize an AnnoyIndexer
:param num_dim: when set to -1, then num_dim is auto decided on first .add()
:param data_path: index data file managed by the annoy indexer
:param metric:
Expand Down
1 change: 1 addition & 0 deletions gnes/indexer/chunk/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class FaissIndexer(BCI):
def __init__(self, num_dim: int, index_key: str, data_path: str, *args, **kwargs):
"""
Initialize a FaissIndexer
:param num_dim: when set to -1, then num_dim is auto decided on first .add()
:param data_path: index data file managed by the faiss indexer
"""
Expand Down
3 changes: 3 additions & 0 deletions gnes/indexer/doc/filesys.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs)
"""
write GIFs of each document into disk
folder structure: /data_path/doc_id/0.gif, 1.gif...
:param keys: list of doc id
:param docs: list of docs
"""
Expand All @@ -55,6 +56,8 @@ def add(self, keys: List[int], docs: List['gnes_pb2.Document'], *args, **kwargs)

def query(self, keys: List[int], *args, **kwargs) -> List['gnes_pb2.Document']:
"""
Find the doc according to the keys
:param keys: list of doc id
:return: list of documents whose chunks field contains all the GIFs of this doc (one GIF per chunk)
"""
Expand Down
1 change: 1 addition & 0 deletions gnes/router/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def reduce_embedding(self, accum_msgs: List['gnes_pb2.Message'], msg_type: str,
def apply(self, msg: 'gnes_pb2.Message', accum_msgs: List['gnes_pb2.Message'], *args, **kwargs) -> None:
"""
reduce embeddings from encoders (means, concat ....)
:param msg: the current message
:param accum_msgs: accumulated messages
"""
Expand Down
4 changes: 2 additions & 2 deletions gnes/service/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ def get_response(num_recv, blocked=False):
with self.zmq_context as zmq_client:

for request in request_iterator:
num_recv = max(self.pending_request - self.args.max_pending_request, 0)
yield from get_response(num_recv, num_recv > 0)
num_recv = max(self.pending_request - self.args.max_pending_request, 1)
yield from get_response(num_recv, num_recv > 1)

zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs)
self.pending_request += 1
Expand Down

0 comments on commit 8ca5ef0

Please sign in to comment.