diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py
index 98764ccb..fdabf87b 100644
--- a/gnes/flow/__init__.py
+++ b/gnes/flow/__init__.py
@@ -30,13 +30,15 @@ class Flow(TrainableBase):
         flow.index()
         ...
 
-    You can also use the shortcuts, e.g. :py:meth:`add_encoder`, :py:meth:`add_preprocessor`.
+    You can also use `add('Encoder', ...)` or `add(Service.Encoder, ...)` to add a service to the flow.
+    The generic :py:meth:`add` provides a convenient way to build the flow.
 
-    It is recommend to use flow in the context manner as showed above.
+    As shown above, it is recommended to use the flow as a context manager,
+    as it manages all opened sockets/processes/threads automatically when exiting the context.
 
-    Note the different default copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
+    Note the different copy behaviors in :py:meth:`.add()` and :py:meth:`.build()`:
     :py:meth:`.add()` always copy the flow by default, whereas :py:meth:`.build()` modify the flow in place.
-    You can change this behavior by giving an argument `copy_flow=False`.
+    You can change this behavior by specifying the argument `copy_flow=False`.
 
     """
 
@@ -299,15 +301,70 @@ def train(self, bytes_gen: Iterator[bytes] = None, **kwargs):
 
         It will start a :py:class:`CLIClient` and call :py:func:`train`.
 
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            with f.build(backend='thread') as flow:
+                flow.train(txt_file='aa.txt')
+                flow.train(image_zip_file='aa.zip', batch_size=64)
+                flow.train(video_zip_file='aa.zip')
+                ...
+
+
+        This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.
+
+        You may also build a reader/generator on your own.
+
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            def my_reader():
+                for _ in range(10):
+                    yield b'abcdfeg'  # each yield generates a document for training
+
+            with f.build(backend='thread') as flow:
+                flow.train(bytes_gen=my_reader())
+
         :param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
         :param kwargs: accepts all keyword arguments of `gnes client` CLI
         """
         self._get_client(bytes_gen, mode='train', **kwargs).start()
-
     def index(self, bytes_gen: Iterator[bytes] = None, **kwargs):
         """Do indexing on the current flow
 
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            with f.build(backend='thread') as flow:
+                flow.index(txt_file='aa.txt')
+                flow.index(image_zip_file='aa.zip', batch_size=64)
+                flow.index(video_zip_file='aa.zip')
+                ...
+
+
+        This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.
+
+        You may also build a reader/generator on your own.
+
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            def my_reader():
+                for _ in range(10):
+                    yield b'abcdfeg'  # each yield generates a document to index
+
+            with f.build(backend='thread') as flow:
+                flow.index(bytes_gen=my_reader())
+
         It will start a :py:class:`CLIClient` and call :py:func:`index`.
 
         :param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
         :param kwargs: accepts all keyword arguments of `gnes client` CLI
@@ -320,6 +377,35 @@ def query(self, bytes_gen: Iterator[bytes] = None, **kwargs):
 
         It will start a :py:class:`CLIClient` and call :py:func:`query`.
 
+
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            with f.build(backend='thread') as flow:
+                flow.query(txt_file='aa.txt')
+                flow.query(image_zip_file='aa.zip', batch_size=64)
+                flow.query(video_zip_file='aa.zip')
+                ...
+
+
+        This will call the pre-built reader to read files into an iterator of bytes and feed them to the flow.
+
+        You may also build a reader/generator on your own.
+
+        Example,
+
+        .. highlight:: python
+        .. code-block:: python
+
+            def my_reader():
+                for _ in range(10):
+                    yield b'abcdfeg'  # each yield generates a query for searching
+
+            with f.build(backend='thread') as flow:
+                flow.query(bytes_gen=my_reader())
+
         :param bytes_gen: An iterator of bytes. If not given, then you have to specify it in `kwargs`.
         :param kwargs: accepts all keyword arguments of `gnes client` CLI
         """
@@ -418,11 +504,11 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']
             recv_from = op_flow._parse_service_endpoints(op_flow, name, recv_from, connect_to_last_service=True)
 
             if clear_old_attr:
-                node['incomes'] = recv_from
                 # remove all edges point to this service
                 for n in op_flow._service_nodes.values():
                     if name in n['outgoings']:
                         n['outgoings'].remove(name)
+                node['incomes'] = recv_from
             else:
                 node['incomes'] = node['incomes'].union(recv_from)
 
@@ -432,12 +518,12 @@ def set(self, name: str, recv_from: Union[str, Tuple[str], List[str], 'Service']
 
         if send_to:
             send_to = op_flow._parse_service_endpoints(op_flow, name, send_to, connect_to_last_service=False)
-            node['outgoings'] = send_to
             if clear_old_attr:
                 # remove all edges this service point to
                 for n in op_flow._service_nodes.values():
                     if name in n['incomes']:
                         n['incomes'].remove(name)
+                node['outgoings'] = send_to
             else:
                 node['outgoings'] = node['outgoings'].union(send_to)
 
@@ -576,7 +662,8 @@ def add(self, service: Union['Service', str],
         return op_flow
 
     @staticmethod
-    def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False):
+    def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connect_to_last_service=False,
+                                 check_name_exist=True):
        # parsing recv_from
        if isinstance(service_endpoint, str):
            service_endpoint = [service_endpoint]
@@ -592,7 +679,8 @@ def _parse_service_endpoints(op_flow, cur_service_name, service_endpoint, connec
                if s == cur_service_name:
                    raise FlowTopologyError('the income of a service can not be itself')
                if s not in op_flow._service_nodes:
-                    raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
+                    if check_name_exist:
+                        raise FlowMissingNode('recv_from: %s can not be found in this Flow' % s)
        else:
            raise ValueError('recv_from=%s is not parsable' % service_endpoint)
        return set(service_endpoint)
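Taken together, the docstrings added in this patch describe a usage pattern along the lines of the sketch below. This is a minimal illustration assembled only from those docstrings: the import path, the `Service` members used, the `yaml_path` values, and `aa.txt` are assumptions/placeholders, not part of the patch itself.

```python
from gnes.flow import Flow, Service  # import path assumed from this module

# Build a flow with the generic add(); per the docstring, add() copies the
# flow by default, so the calls can be chained. The yaml_path values are
# hypothetical config files used only for illustration.
f = (Flow()
     .add(Service.Preprocessor, yaml_path='preprocess.yml')
     .add(Service.Encoder, yaml_path='encode.yml')
     .add(Service.Indexer, yaml_path='index.yml'))

# Use the flow as a context manager so opened sockets/processes/threads
# are cleaned up automatically when the context exits.
with f.build(backend='thread') as flow:
    flow.index(txt_file='aa.txt')  # pre-built reader turns the file into an iterator of bytes
```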