Skip to content
This repository has been archived by the owner on Feb 22, 2020. It is now read-only.

feat(router): add a block router for benchmarking #277

Merged
merged 7 commits into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions Dockerfiles/alpine.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ LABEL maintainer="[email protected]" \
org.label-schema.build-date=$BUILD_DATE \
org.label-schema.name="GNES is Generic Nerual Elastic Search"

RUN apk add --no-cache \
--virtual=.build-dependencies \
build-base g++ gfortran file binutils zeromq-dev \
musl-dev python3-dev py-pgen cython openblas-dev && \
apk add --no-cache libstdc++ openblas libzmq

WORKDIR /gnes/

ADD setup.py MANIFEST.in requirements.txt README.md ./
ADD gnes ./gnes/

RUN apk add --no-cache \
--virtual=.build-dependencies \
build-base g++ gfortran file binutils zeromq-dev \
musl-dev python3-dev py-pgen cython openblas-dev && \
apk add --no-cache libstdc++ openblas libzmq && \
ln -s locale.h /usr/include/xlocale.h && \
RUN ln -s locale.h /usr/include/xlocale.h && \
pip install . --no-cache-dir --compile && \
find /usr/lib/python3.7/ -name 'tests' -exec rm -r '{}' + && \
find /usr/lib/python3.7/site-packages/ -name '*.so' -print -exec sh -c 'file "{}" | grep -q "not stripped" && strip -s "{}"' \; && \
Expand Down
1 change: 0 additions & 1 deletion gnes/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ def set_composer_flask_parser(parser=None):
def set_service_parser(parser=None):
from ..service.base import SocketType, BaseService, ParallelType
import random
import uuid
if not parser:
parser = set_base_parser()
min_port, max_port = 49152, 65536
Expand Down
16 changes: 8 additions & 8 deletions gnes/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,30 @@ def __init__(self, bar_len: int = 20, task_name: str = ''):
self.task_name = task_name

def update(self):
sys.stdout.write('\r')
self.num_bars += 1
if (self.num_bars + 1) % self.bar_len == 0:
sys.stdout.write('\n')
elapsed = time.perf_counter() - self.start_time
elapsed_str = colored('elapsed', 'yellow')
speed_str = colored('speed', 'yellow')
self.num_bars += 1
if self.num_bars > self.bar_len:
self.num_bars -= self.bar_len
sys.stdout.write('\n')

sys.stdout.write(
'{:>10} [{:<{}}] {:>8}: {:3.1f}s {:>8}: {:3.1f} batch/s'.format(
colored(self.task_name, 'cyan'),
colored('=' * self.num_bars, 'green'),
colored('=' * int(self.num_bars % self.bar_len), 'green'),
self.bar_len + 9,
elapsed_str,
elapsed,
speed_str,
self.num_bars / elapsed,
))

sys.stdout.flush()
sys.stdout.write('\r')

def __enter__(self):
self.start_time = time.perf_counter()
self.num_bars = -1
sys.stdout.write('\n')
self.num_bars = 0
self.update()
return self

Expand Down
6 changes: 3 additions & 3 deletions gnes/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,12 +586,12 @@ def _path_import(absolute_path):
return module, spec


def make_route_table(routes, exclude_frontend: bool = False):
def make_route_table(routes, exclude_frontend: bool = False, jitter: float = 1e-8):
route_time = []
if exclude_frontend:
total_duration = get_duration(routes[0].start_time, routes[0].end_time)
total_duration = get_duration(routes[0].start_time, routes[0].end_time) + jitter
else:
total_duration = get_duration(routes[0].start_time, routes[-1].end_time)
total_duration = get_duration(routes[0].start_time, routes[-1].end_time) + jitter
sum_duration = 0
for k in routes[(1 if exclude_frontend else 0):]:
d = get_duration(k.start_time, k.end_time)
Expand Down
15 changes: 15 additions & 0 deletions gnes/router/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,22 @@
from ..proto import gnes_pb2


class BlockRouter(BaseMapRouter):
"""Wait for 'sleep_sec' seconds and forward messages, useful for benchmark"""

def __init__(self, sleep_sec: int = 5, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sleep_sec = sleep_sec

def apply(self, msg: 'gnes_pb2.Message', *args, **kwargs):
import time
time.sleep(self.sleep_sec)


class PublishRouter(BaseMapRouter):
"""Copy a message 'num_part' time and forward it, useful for PUB-SUB sockets.
'num_part' is an indicator for downstream sync-barrier, e.g. a ReduceRouter
"""

def __init__(self, num_part: int, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
11 changes: 8 additions & 3 deletions gnes/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,12 @@ def call_hooks(self, msg: 'gnes_pb2.Message', hook_type: Union[str, Tuple[str]],

for fn, only_verbose in hooks:
if (only_verbose and self.service_context.args.verbose) or (not only_verbose):
fn(self.service_context, msg, *args, **kwargs)
try:
fn(self.service_context, msg, *args, **kwargs)
except Exception as ex:
self.logger.warning('hook %s throws an exception, '
'this wont affect the server but you may want to pay attention' % fn)
self.logger.error(ex, exc_info=True)

def call_routes(self, msg: 'gnes_pb2.Message'):
def get_default_fn(m_type):
Expand Down Expand Up @@ -433,8 +438,8 @@ def _run(self, ctx):
self.logger.info('break from the event loop')
except ComponentNotLoad:
self.logger.error('component can not be correctly loaded, terminated')
except Exception as e:
self.logger.error("exception occured: %s" % str(e), exc_info=True)
except Exception as ex:
self.logger.error('unknown exception: %s' % str(ex), exc_info=True)
finally:
self.is_ready.set()
self.is_event_loop.clear()
Expand Down