From 29ee7cc7327f8f19fa59455b86b0aad0f81149c6 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Thu, 10 Oct 2019 17:57:29 +0800 Subject: [PATCH 1/4] fix(base): fix env expansion in gnes_config --- gnes/base/__init__.py | 6 ++++++ gnes/flow/__init__.py | 6 +++--- tests/test_gnes_flow.py | 8 ++++---- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/gnes/base/__init__.py b/gnes/base/__init__.py index b930fe84..3c253294 100644 --- a/gnes/base/__init__.py +++ b/gnes/base/__init__.py @@ -336,6 +336,12 @@ def _get_instance_from_yaml(cls, constructor, node, stop_on_import_error=False): data = ruamel.yaml.constructor.SafeConstructor.construct_mapping( constructor, node, deep=True) + _gnes_config = data.get('gnes_config', {}) + for k, v in _gnes_config.items(): + _gnes_config[k] = _expand_env_var(v) + if _gnes_config: + data['gnes_config'] = _gnes_config + dump_path = cls._get_dump_path_from_config(data.get('gnes_config', {})) load_from_dump = False if dump_path: diff --git a/gnes/flow/__init__.py b/gnes/flow/__init__.py index b9c142d6..27b3d480 100644 --- a/gnes/flow/__init__.py +++ b/gnes/flow/__init__.py @@ -430,7 +430,7 @@ def build(self, backend: Optional[str] = 'thread', copy_flow: bool = False, *arg # for thread and process backend which runs locally, host_in and host_out should not be set p_args.host_in = BaseService.default_host p_args.host_out = BaseService.default_host - op_flow._service_contexts.append(Flow._service2builder[v['service']](p_args)) + op_flow._service_contexts.append((Flow._service2builder[v['service']], p_args)) op_flow._build_level = Flow.BuildLevel.RUNTIME else: raise NotImplementedError('backend=%s is not supported yet' % backend) @@ -447,9 +447,9 @@ def __enter__(self): 'build the flow now via build() with default parameters' % self._build_level) self.build(copy_flow=False) self._service_stack = ExitStack() + for k, v in self._service_contexts: + self._service_stack.enter_context(k(v)) - for k in self._service_contexts: - self._service_stack.enter_context(k) self.logger.critical('flow is built and ready, current build level is %s' % self._build_level) return self diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 2a6edd72..1779bc97 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -127,7 +127,7 @@ def _test_index_flow(self): .add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter', num_part=2, service_in=['vec_idx', 'doc_idx'])) - with flow.build(backend='thread') as f: + with flow.build(backend='process') as f: f.index(txt_file=self.test_file, batch_size=20) for k in [self.indexer1_bin, self.indexer2_bin]: @@ -141,10 +141,10 @@ def _test_query_flow(self): .add(gfs.Router, name='scorer', yaml_path='yaml/flow-score.yml') .add(gfs.Indexer, name='doc_idx', yaml_path='yaml/flow-dictindex.yml')) - with flow.build(backend='thread') as f: - f.query(txt_file=self.test_file) + with flow.build(backend='process') as f, open(self.test_file, encoding='utf8') as fp: + f.query(bytes_gen=[v.encode() for v in fp][:10]) - @unittest.SkipTest + # @unittest.SkipTest def test_index_query_flow(self): self._test_index_flow() print('indexing finished') From 8c069303bb05642fb35968c7c1ba4e68b61c958f Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Thu, 10 Oct 2019 18:02:00 +0800 Subject: [PATCH 2/4] fix(base): fix env expansion in gnes_config --- gnes/service/indexer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/gnes/service/indexer.py b/gnes/service/indexer.py index 9e3ab046..33037f88 100644 --- a/gnes/service/indexer.py +++ b/gnes/service/indexer.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import threading import numpy as np @@ -27,7 +26,7 @@ def post_init(self): from ..indexer.base import BaseIndexer # print('id: %s, before: %r' % (threading.get_ident(), self._model)) self._model = self.load_model(BaseIndexer) - self._tmp_a = threading.get_ident() + # self._tmp_a = threading.get_ident() # print('id: %s, after: %r, self._tmp_a: %r' % (threading.get_ident(), self._model, self._tmp_a)) @handler.register(gnes_pb2.Request.IndexRequest) From a7a786253e9e8eaac33686ca9558b6bc8ae68f12 Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Thu, 10 Oct 2019 18:28:53 +0800 Subject: [PATCH 3/4] fix(base): fix env expansion in gnes_config --- tests/test_gnes_flow.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index 1779bc97..b4c68711 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -120,9 +120,9 @@ def _test_index_flow(self): flow = (Flow(check_version=False, route_table=False) .add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor') - .add(gfs.Encoder, yaml_path='yaml/flow-transformer.yml') - .add(gfs.Indexer, name='vec_idx', yaml_path='yaml/flow-vecindex.yml') - .add(gfs.Indexer, name='doc_idx', yaml_path='yaml/flow-dictindex.yml', + .add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml')) + .add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml')) + .add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'), service_in='prep') .add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter', num_part=2, service_in=['vec_idx', 'doc_idx'])) @@ -136,10 +136,10 @@ def _test_index_flow(self): def _test_query_flow(self): flow = (Flow(check_version=False, route_table=False) .add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor') - .add(gfs.Encoder, yaml_path='yaml/flow-transformer.yml') - .add(gfs.Indexer, name='vec_idx', yaml_path='yaml/flow-vecindex.yml') - .add(gfs.Router, name='scorer', yaml_path='yaml/flow-score.yml') - .add(gfs.Indexer, name='doc_idx', yaml_path='yaml/flow-dictindex.yml')) + .add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml')) + .add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml')) + .add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml')) + .add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml'))) with flow.build(backend='process') as f, open(self.test_file, encoding='utf8') as fp: f.query(bytes_gen=[v.encode() for v in fp][:10]) From bca5b5b7fdc99f0ca666a3b0bf4bc77ae732c19c Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Thu, 10 Oct 2019 18:46:38 +0800 Subject: [PATCH 4/4] fix(base): fix env expansion in gnes_config --- tests/test_gnes_flow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_gnes_flow.py b/tests/test_gnes_flow.py index b4c68711..a5ec5f3f 100644 --- a/tests/test_gnes_flow.py +++ b/tests/test_gnes_flow.py @@ -144,7 +144,7 @@ def _test_query_flow(self): with flow.build(backend='process') as f, open(self.test_file, encoding='utf8') as fp: f.query(bytes_gen=[v.encode() for v in fp][:10]) - # @unittest.SkipTest + @unittest.SkipTest def test_index_query_flow(self): self._test_index_flow() print('indexing finished')