Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #344 from gnes-ai/fix-flow-6
Browse files Browse the repository at this point in the history
docs(flow): add docs to train, add, index, query
  • Loading branch information
mergify[bot] authored Oct 18, 2019
2 parents 8b43398 + 82f4b58 commit 7865b45
Showing 1 changed file with 97 additions and 9 deletions.
106 changes: 97 additions & 9 deletions gnes/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ class Flow(TrainableBase):
flow.index()
...
You can also use the shortcuts, e.g. :py:meth:`add_encoder`, :py:meth:`add_preprocessor`.
You can also use `add('Encoder', ...)` or `add(Service.Encoder, ...)` to add service to the flow.
The generic :py:meth:`add` provides a convenient way to build the flow.
It is recommend to use flow in the context manner as showed above.
As shown above, it is recommend to use flow in the context manner as showed above,
as it manages all opened sockets/processes/threads automatically when exit from the context.
Note the different default copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
Note the different copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
:py:meth:`.add()` always copy the flow by default, whereas :py:meth:`.build()` modify the flow in place.
You can change this behavior by giving an argument `copy_flow=False`.
You can change this behavior by specifying th argument `copy_flow=False`.
"""

Expand Down Expand Up @@ -299,15 +301,70 @@ def train(self, bytes_gen: Iterator[bytes] = None, **kwargs):
It will start a :py:class:`CLIClient` and call :py:func:`train`.
Example,
.. highlight:: python
.. code-block:: python
with f.build(backend='thread') as flow:
flow.train(txt_file='aa.txt')
flow.train(image_zip_file='aa.zip', batch_size=64)
flow.train(video_zip_file='aa.zip')
...
This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.
One may also build a reader/generator on your own.
Example,
.. highlight:: python
.. code-block:: python
def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a document for training
with f.build(backend='thread') as flow:
flow.train(bytes_gen=my_reader())
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
self._get_client(bytes_gen, mode='train', **kwargs).start()


def index(self, bytes_gen: Iterator[bytes] = None, **kwargs):
"""Do indexing on the current flow
Example,
.. highlight:: python
.. code-block:: python
with f.build(backend='thread') as flow:
flow.index(txt_file='aa.txt')
flow.index(image_zip_file='aa.zip', batch_size=64)
flow.index(video_zip_file='aa.zip')
...
This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.
One may also build a reader/generator on your own.
Example,
.. highlight:: python
.. code-block:: python
def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a document to index
with f.build(backend='thread') as flow:
flow.index(bytes_gen=my_reader())
It will start a :py:class:`CLIClient` and call :py:func:`index`.
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
Expand All @@ -320,6 +377,35 @@ def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):
It will start a :py:class:`CLIClient` and call :py:func:`query`.
Example,
.. highlight:: python
.. code-block:: python
with f.build(backend='thread') as flow:
flow.query(txt_file='aa.txt')
flow.query(image_zip_file='aa.zip', batch_size=64)
flow.query(video_zip_file='aa.zip')
...
This will call the pre-built reader to read files into an iterator of bytes and feed to the flow.
One may also build a reader/generator on your own.
Example,
.. highlight:: python
.. code-block:: python
def my_reader():
for _ in range(10):
yield b'abcdfeg' # each yield generates a query for searching
with f.build(backend='thread') as flow:
flow.query(bytes_gen=my_reader())
:param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
:param kwargs: accepts all keyword arguments of `gnes client` CLI
"""
Expand Down Expand Up @@ -418,11 +504,11 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']
recv_from = op_flow._parse_service_endpoints(op_flow, name, recv_from, connect_to_last_service=True)

if clear_old_attr:
node['incomes'] = recv_from
# remove all edges point to this service
for n in op_flow._service_nodes.values():
if name in n['outgoings']:
n['outgoings'].remove(name)
node['incomes'] = recv_from
else:
node['incomes'] = node['incomes'].union(recv_from)

Expand All @@ -432,12 +518,12 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']

if send_to:
send_to = op_flow._parse_service_endpoints(op_flow, name, send_to, connect_to_last_service=False)
node['outgoings'] = send_to
if clear_old_attr:
# remove all edges this service point to
for n in op_flow._service_nodes.values():
if name in n['incomes']:
n['incomes'].remove(name)
node['outgoings'] = send_to
else:
node['outgoings'] = node['outgoings'].union(send_to)

Expand Down Expand Up @@ -576,7 +662,8 @@ def add(self, service: Union['Service', str],
return op_flow

@staticmethod
def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False):
def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False,
check_name_exist=True):
# parsing recv_from
if isinstance(service_endpoint, str):
service_endpoint = [service_endpoint]
Expand All @@ -592,7 +679,8 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec
if s == cur_service_name:
raise FlowTopologyError('the income of a service can not be itself')
if s not in op_flow._service_nodes:
raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
if check_name_exist:
raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
else:
raise ValueError('recv_from=%s is not parsable' % service_endpoint)
return set(service_endpoint)
Expand Down

0 comments on commit 7865b45

Please sign in to comment.