Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
fix(service): make service handler thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
hanhxiao committed Oct 11, 2019
1 parent a3da058 commit 51581bf
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
15 changes: 8 additions & 7 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ def register(self, msg_type: Union[List, Tuple, type]):
def decorator(f):
if isinstance(msg_type, list) or isinstance(msg_type, tuple):
for m in msg_type:
self.routes[m] = f
self.routes[m] = f.__name__
else:
self.routes[msg_type] = f
self.routes[msg_type] = f.__name__
return f

return decorator
Expand All @@ -187,11 +187,12 @@ def register_hook(self, hook_type: Union[str, Tuple[str]], only_when_verbose: bo

def decorator(f):
if isinstance(hook_type, str) and hook_type in self.hooks:
self.hooks[hook_type].append((f, only_when_verbose))
self.hooks[hook_type].append((f.__name__, only_when_verbose))
return f
elif isinstance(hook_type, list) or isinstance(hook_type, tuple):
for h in set(hook_type):
if h in self.hooks:
self.hooks[h].append((f, only_when_verbose))
self.hooks[h].append((f.__name__, only_when_verbose))
else:
raise AttributeError('hook type: %s is not supported' % h)
return f
Expand Down Expand Up @@ -222,7 +223,7 @@ def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]],
for fn, only_verbose in hooks:
if (only_verbose and self.service_context.args.verbose) or (not only_verbose):
try:
fn(self.service_context, msg, *args, **kwargs)
fn(msg, *args, **kwargs)
except Exception as ex:
self.logger.warning('hook %s throws an exception, '
'this wont affect the server but you may want to pay attention' % fn)
Expand All @@ -249,7 +250,7 @@ def get_default_fn(m_type):
fn = get_default_fn(type(msg))

self.logger.info('handling message with %s' % fn.__name__)
return fn(self.service_context, msg)
return fn(msg)

def call_routes_send_back(self, msg: 'gnes_pb2.Message', out_sock):
try:
Expand Down Expand Up @@ -334,7 +335,7 @@ def __init__(self, args):
check_version=self.args.check_version,
timeout=self.args.timeout,
squeeze_pb=self.args.squeeze_pb)
# self._override_handler()
self._override_handler()

def _override_handler(self):
# replace the function name by the function itself
Expand Down
2 changes: 1 addition & 1 deletion gnes/service/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def post_init(self):
def _handler_index(self, msg: 'gnes_pb2.Message'):
# print('tid: %s, model: %r, self._tmp_a: %r' % (threading.get_ident(), self._model, self._tmp_a))
# if self._tmp_a != threading.get_ident():
# print('tid: %s, tmp_a: %r !!! %r' % (threading.get_ident(), self._tmp_a, self._handler_index))
# print('!!! tid: %s, tmp_a: %r %r' % (threading.get_ident(), self._tmp_a, self._handler_index))
from ..indexer.base import BaseChunkIndexer, BaseDocIndexer
if isinstance(self._model, BaseChunkIndexer):
self._handler_chunk_index(msg)
Expand Down
19 changes: 11 additions & 8 deletions tests/test_gnes_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def test_flow5(self):
print(f._service_edges)
print(f.to_mermaid())

def _test_index_flow(self):
def _test_index_flow(self, backend):
for k in [self.indexer1_bin, self.indexer2_bin, self.encoder_bin]:
self.assertFalse(os.path.exists(k))

Expand All @@ -127,25 +127,28 @@ def _test_index_flow(self):
.add(gfs.Router, name='sync_barrier', yaml_path='BaseReduceRouter',
num_part=2, service_in=['vec_idx', 'doc_idx']))

with flow.build(backend='process') as f:
with flow.build(backend=backend) as f:
f.index(txt_file=self.test_file, batch_size=20)

for k in [self.indexer1_bin, self.indexer2_bin]:
self.assertTrue(os.path.exists(k))

def _test_query_flow(self):
def _test_query_flow(self, backend):
flow = (Flow(check_version=False, route_table=False)
.add(gfs.Preprocessor, name='prep', yaml_path='SentSplitPreprocessor')
.add(gfs.Encoder, yaml_path=os.path.join(self.dirname, 'yaml/flow-transformer.yml'))
.add(gfs.Indexer, name='vec_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-vecindex.yml'))
.add(gfs.Router, name='scorer', yaml_path=os.path.join(self.dirname, 'yaml/flow-score.yml'))
.add(gfs.Indexer, name='doc_idx', yaml_path=os.path.join(self.dirname, 'yaml/flow-dictindex.yml')))

with flow.build(backend='process') as f, open(self.test_file, encoding='utf8') as fp:
f.query(bytes_gen=[v.encode() for v in fp][:10])
with flow.build(backend=backend) as f, open(self.test_file, encoding='utf8') as fp:
f.query(bytes_gen=[v.encode() for v in fp][:3])

# @unittest.SkipTest
def test_index_query_flow(self):
self._test_index_flow()
print('indexing finished')
self._test_query_flow()
self._test_index_flow('thread')
self._test_query_flow('thread')

def test_indexe_query_flow_proc(self):
self._test_index_flow('process')
self._test_query_flow('process')

0 comments on commit 51581bf

Please sign in to comment.