Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

Commit

Permalink
Merge pull request #277 from gnes-ai/add-block-router
Browse files Browse the repository at this point in the history
feat(router): add a block router for benchmarking
  • Loading branch information
Han Xiao authored Sep 23, 2019
2 parents ea69135 + 2e326af commit 0492512
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 21 deletions.
13 changes: 7 additions & 6 deletions Dockerfiles/alpine.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ LABEL maintainer="[email protected]" \
org.label-schema.build-date=$BUILD_DATE \
org.label-schema.name="GNES is Generic Nerual Elastic Search"

RUN apk add --no-cache \
--virtual=.build-dependencies \
build-base g++ gfortran file binutils zeromq-dev \
musl-dev python3-dev py-pgen cython openblas-dev && \
apk add --no-cache libstdc++ openblas libzmq

WORKDIR /gnes/

ADD setup.py MANIFEST.in requirements.txt README.md ./
ADD gnes ./gnes/

RUN apk add --no-cache \
--virtual=.build-dependencies \
build-base g++ gfortran file binutils zeromq-dev \
musl-dev python3-dev py-pgen cython openblas-dev && \
apk add --no-cache libstdc++ openblas libzmq && \
ln -s locale.h /usr/include/xlocale.h && \
RUN ln -s locale.h /usr/include/xlocale.h && \
pip install . --no-cache-dir --compile && \
find /usr/lib/python3.7/ -name 'tests' -exec rm -r '{}' + && \
find /usr/lib/python3.7/site-packages/ -name '*.so' -print -exec sh -c 'file "{}" | grep -q "not stripped" && strip -s "{}"' \; && \
Expand Down
1 change: 0 additions & 1 deletion gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def set_composer_flask_parser(parser=None):
def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService, ParallelType
import random
import uuid
if not parser:
parser = set_base_parser()
min_port, max_port = 49152, 65536
Expand Down
16 changes: 8 additions & 8 deletions gnes/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,30 @@ def __init__(self, bar_len: int = 20, task_name: str = ''):
self.task_name = task_name

def update(self):
sys.stdout.write('\r')
self.num_bars += 1
if (self.num_bars + 1) % self.bar_len == 0:
sys.stdout.write('\n')
elapsed = time.perf_counter() - self.start_time
elapsed_str = colored('elapsed', 'yellow')
speed_str = colored('speed', 'yellow')
self.num_bars += 1
if self.num_bars > self.bar_len:
self.num_bars -= self.bar_len
sys.stdout.write('\n')

sys.stdout.write(
'{:>10} [{:<{}}] {:>8}: {:3.1f}s {:>8}: {:3.1f} batch/s'.format(
colored(self.task_name, 'cyan'),
colored('=' * self.num_bars, 'green'),
colored('=' * int(self.num_bars % self.bar_len), 'green'),
self.bar_len + 9,
elapsed_str,
elapsed,
speed_str,
self.num_bars / elapsed,
))

sys.stdout.flush()
sys.stdout.write('\r')

def __enter__(self):
self.start_time = time.perf_counter()
self.num_bars = -1
sys.stdout.write('\n')
self.num_bars = 0
self.update()
return self

Expand Down
6 changes: 3 additions & 3 deletions gnes/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,12 @@ def _path_import(absolute_path):
return module, spec


def make_route_table(routes, exclude_frontend: bool = False):
def make_route_table(routes, exclude_frontend: bool = False, jitter: float = 1e-8):
route_time = []
if exclude_frontend:
total_duration = get_duration(routes[0].start_time, routes[0].end_time)
total_duration = get_duration(routes[0].start_time, routes[0].end_time) + jitter
else:
total_duration = get_duration(routes[0].start_time, routes[-1].end_time)
total_duration = get_duration(routes[0].start_time, routes[-1].end_time) + jitter
sum_duration = 0
for k in routes[(1 if exclude_frontend else 0):]:
d = get_duration(k.start_time, k.end_time)
Expand Down
15 changes: 15 additions & 0 deletions gnes/router/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,22 @@
from ..proto import gnes_pb2


class BlockRouter(BaseMapRouter):
"""Wait for 'sleep_sec' seconds and forward messages, useful for benchmark"""

def __init__(self, sleep_sec: int = 5, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sleep_sec = sleep_sec

def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs):
import time
time.sleep(self.sleep_sec)


class PublishRouter(BaseMapRouter):
"""Copy a message 'num_part' time and forward it, useful for PUB-SUB sockets.
'num_part' is an indicator for downstream sync-barrier, e.g. a ReduceRouter
"""

def __init__(self, num_part: int, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
11 changes: 8 additions & 3 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]],

for fn, only_verbose in hooks:
if (only_verbose and self.service_context.args.verbose) or (not only_verbose):
fn(self.service_context, msg, *args, **kwargs)
try:
fn(self.service_context, msg, *args, **kwargs)
except Exception as ex:
self.logger.warning('hook %s throws an exception, '
'this wont affect the server but you may want to pay attention' % fn)
self.logger.error(ex, exc_info=True)

def call_routes(self, msg: 'gnes_pb2.Message'):
def get_default_fn(m_type):
Expand Down Expand Up @@ -433,8 +438,8 @@ def _run(self, ctx):
self.logger.info('break from the event loop')
except ComponentNotLoad:
self.logger.error('component can not be correctly loaded, terminated')
except Exception as e:
self.logger.error("exception occured: %s" % str(e), exc_info=True)
except Exception as ex:
self.logger.error('unknown exception: %s' % str(ex), exc_info=True)
finally:
self.is_ready.set()
self.is_event_loop.clear()
Expand Down

0 comments on commit 0492512

Please sign in to comment.