Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

docs(flow): add docs to train, add, index, query #344

Merged
merged 1 commit into from
Oct 18, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 97 additions & 9 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ class Flow(TrainableBase):
flow.index()
...

You can also use the shortcuts, e.g. :py:meth:`add_encoder`, :py:meth:`add_preprocessor`.
You can also use `add('Encoder', ...)` or `add(Service.Encoder, ...)` to add a service to the flow.
The generic :py:meth:`add` provides a convenient way to build the flow.

It is recommend to use flow in the context manner as showed above.
As shown above, it is recommended to use the flow as a context manager,
as it automatically manages all opened sockets/processes/threads when exiting the context.

Note the different default copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
Note the different copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
:py:meth:`.add()` always copies the flow by default, whereas :py:meth:`.build()` modifies the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.
You can change this behavior by specifying the argument `copy_flow=False`.

"""

Expand Down Expand Up @@ -299,15 +301,70 @@ def train(self, bytes_gen: Iterator[bytes] = None, **kwargs):

It will start a :py:class:`CLIClient` and call :py:func:`train`.

Example,

.. highlight:: python
.. code-block:: python

with f.build(backend='thread') as flow:
flow.train(txt_file='aa.txt')
flow.train(image_zip_file='aa.zip', batch_size=64)
flow.train(video_zip_file='aa.zip')
...


This will call the pre-built reader to read the files into an iterator of bytes and feed it to the flow.

You may also build your own reader/generator.

Example,

.. highlight:: python
.. code-block:: python

def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a document for training

with f.build(backend='thread') as flow:
flow.train(bytes_gen=my_reader())

:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
self._get_client(bytes_gen, mode='train', **kwargs).start()


def index(self, bytes_gen: Iterator[bytes] = None, **kwargs):
"""Do indexing on the current flow

Example,

.. highlight:: python
.. code-block:: python

with f.build(backend='thread') as flow:
flow.index(txt_file='aa.txt')
flow.index(image_zip_file='aa.zip', batch_size=64)
flow.index(video_zip_file='aa.zip')
...


This will call the pre-built reader to read the files into an iterator of bytes and feed it to the flow.

You may also build your own reader/generator.

Example,

.. highlight:: python
.. code-block:: python

def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a document to index

with f.build(backend='thread') as flow:
flow.index(bytes_gen=my_reader())

It will start a :py:class:`CLIClient` and call :py:func:`index`.

:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
Expand All @@ -320,6 +377,35 @@ def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):

It will start a :py:class:`CLIClient` and call :py:func:`query`.


Example,

.. highlight:: python
.. code-block:: python

with f.build(backend='thread') as flow:
flow.query(txt_file='aa.txt')
flow.query(image_zip_file='aa.zip', batch_size=64)
flow.query(video_zip_file='aa.zip')
...


This will call the pre-built reader to read the files into an iterator of bytes and feed it to the flow.

You may also build your own reader/generator.

Example,

.. highlight:: python
.. code-block:: python

def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a query for searching

with f.build(backend='thread') as flow:
flow.query(bytes_gen=my_reader())

:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
Expand Down Expand Up @@ -418,11 +504,11 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']
recv_from = op_flow._parse_service_endpoints(op_flow, name, recv_from, connect_to_last_service=True)

if clear_old_attr:
node['incomes'] = recv_from
# remove all edges point to this service
for n in op_flow._service_nodes.values():
if name in n['outgoings']:
n['outgoings'].remove(name)
node['incomes'] = recv_from
else:
node['incomes'] = node['incomes'].union(recv_from)

Expand All @@ -432,12 +518,12 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']

if send_to:
send_to = op_flow._parse_service_endpoints(op_flow, name, send_to, connect_to_last_service=False)
node['outgoings'] = send_to
if clear_old_attr:
# remove all edges this service point to
for n in op_flow._service_nodes.values():
if name in n['incomes']:
n['incomes'].remove(name)
node['outgoings'] = send_to
else:
node['outgoings'] = node['outgoings'].union(send_to)

Expand Down Expand Up @@ -576,7 +662,8 @@ def add(self, service: Union['Service', str],
return op_flow

@staticmethod
def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False):
def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False,
check_name_exist=True):
# parsing recv_from
if isinstance(service_endpoint, str):
service_endpoint = [service_endpoint]
Expand All @@ -592,7 +679,8 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec
if s == cur_service_name:
raise FlowTopologyError('the income of a service can not be itself')
if s not in op_flow._service_nodes:
raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
if check_name_exist:
raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
else:
raise ValueError('recv_from=%s is not parsable' % service_endpoint)
return set(service_endpoint)
Expand Down