
feat: add a decorator for monitoring method #4737

Merged: 12 commits, May 9, 2022
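For context, here is a minimal usage sketch of the decorator added by this PR, mirroring the integration tests below; the Executor name, endpoint, metric name, and ports are illustrative only.

from docarray import DocumentArray

from jina import Executor, Flow, monitor, requests


class MyExecutor(Executor):
    @requests(on='/foo')
    def foo(self, docs, **kwargs):
        self._preprocess(docs)

    # Creates a Prometheus Summary, exposed as jina_preprocess_seconds when monitoring is enabled.
    @monitor(name='preprocess_seconds', documentation='time spent preprocessing documents')
    def _preprocess(self, docs):
        ...


# With monitoring enabled, the metric can be scraped from the Executor's monitoring port (here 9091).
with Flow(monitoring=True, port_monitoring=9090).add(
    uses=MyExecutor, monitoring=True, port_monitoring=9091
) as f:
    f.post('/foo', inputs=DocumentArray.empty(4))
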
2 changes: 1 addition & 1 deletion jina/__init__.py
@@ -206,7 +206,7 @@ def _set_nofile(nofile_atleast=4096):

# Executor
from jina.serve.executors import BaseExecutor as Executor
from jina.serve.executors.decorators import requests
from jina.serve.executors.decorators import monitor, requests

__all__ = [_s for _s in dir() if not _s.startswith('_')]
__all__.extend(_names_with_underscore)
31 changes: 30 additions & 1 deletion jina/serve/executors/__init__.py
@@ -15,8 +15,9 @@
from jina.serve.executors.decorators import requests, store_init_kwargs, wrap_func

if TYPE_CHECKING:
from jina import DocumentArray
from prometheus_client import Summary

from docarray import DocumentArray

__all__ = ['BaseExecutor', 'ReducerExecutor']

@@ -132,8 +133,11 @@ def _init_monitoring(self):
namespace='jina',
labelnames=('executor', 'endpoint'),
)
self._metrics_buffer = {'process_request_seconds': self._summary_method}

else:
self._summary_method = None
self._metrics_buffer = None

def _add_requests(self, _requests: Optional[Dict]):
if not hasattr(self, 'requests'):
@@ -473,6 +477,31 @@ def to_docker_compose_yaml(
== BaseExecutor.StandaloneExecutorType.EXTERNAL,
)

def get_metrics(
self, name: Optional[str] = None, documentation: Optional[str] = None
) -> Optional['Summary']:
"""
Get a Prometheus metric by name; if it does not exist yet, it is created and stored in a buffer.
:param name: the name of the metric
:param documentation: the description of the metric

:return: the requested Prometheus metric, or None if monitoring is not enabled.
"""

if self._metrics_buffer:
if name not in self._metrics_buffer:
from prometheus_client import Summary

self._metrics_buffer[name] = Summary(
name,
documentation,
registry=self.runtime_args.metrics_registry,
namespace='jina',
)
return self._metrics_buffer[name]
else:
return None


class ReducerExecutor(BaseExecutor):
"""
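A hypothetical sketch of how an Executor method could use the new get_metrics helper directly for a custom measurement (the metric name and method are illustrative, not part of this diff). The Summary is created lazily on the first call and cached in self._metrics_buffer, and the helper returns None when monitoring is disabled, so callers should guard for that:

from jina import Executor, requests


class MyExecutor(Executor):
    @requests(on='/index')
    def index(self, docs, **kwargs):
        # Created on the first call, fetched from the buffer afterwards; exposed as jina_docs_per_request.
        metric = self.get_metrics('docs_per_request', 'number of documents received per request')
        if metric:  # None when monitoring is disabled
            metric.observe(len(docs))
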
42 changes: 42 additions & 0 deletions jina/serve/executors/decorators.py
@@ -136,3 +136,45 @@ def __set_name__(self, owner, name):
return FunctionMapper(func)
else:
return FunctionMapper


def monitor(
*,
name: Optional[str] = None,
documentation: Optional[str] = None,
):
"""
`@monitor()` allows you to monitor the internal methods of your Executor. You can access these metrics by enabling
monitoring on your Executor. It tracks the time spent calling the function and the number of times it has been
called. Under the hood it creates a Prometheus Summary: https://prometheus.io/docs/practices/histograms/.

:warning: Don't combine this decorator with the `@requests` decorator, as `@requests` already handles monitoring under the hood.

:param name: the name of the metric; by default it is derived from the name of the decorated method
:param documentation: the description of the metric; by default it is derived from the name of the decorated method

:return: a decorator which takes a single callable as input
"""

def _decorator(func: Callable):

name_ = name if name else f'{func.__name__}_seconds'
documentation_ = (
documentation
if documentation
else f'Time spent calling method {func.__name__}'
)

@functools.wraps(func)
def _f(self, *args, **kwargs):
metric = self.get_metrics(name_, documentation_)

if metric:
with metric.time():
return func(self, *args, **kwargs)
else:
return func(self, *args, **kwargs)

return _f

return _decorator
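In effect, decorating a method with @monitor() behaves roughly like the hand-written wrapper below, which combines get_metrics with the defaults computed in _decorator; this is a sketch only, and _process / _do_process are illustrative names:

from jina import Executor


class MyExecutor(Executor):
    def _process(self, docs):
        # Defaults used by @monitor(): name '<method>_seconds', documentation 'Time spent calling method <method>'.
        metric = self.get_metrics('_process_seconds', 'Time spent calling method _process')
        if metric:
            with metric.time():  # records both the call count and the duration in the Summary
                return self._do_process(docs)
        return self._do_process(docs)

    def _do_process(self, docs):
        ...
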
56 changes: 56 additions & 0 deletions tests/integration/monitoring/test_executor.py
@@ -0,0 +1,56 @@
import requests as req
from docarray import DocumentArray
from prometheus_client import Summary

from jina import Executor, Flow, monitor, requests


def test_prometheus_interface(port_generator):
class DummyExecutor(Executor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.summary = Summary(
'a', 'A', registry=self.runtime_args.metrics_registry
)

@requests(on='/foo')
def foo(self, docs, **kwargs):
with self.summary.time():
...

port = port_generator()
with Flow(monitoring=True, port_monitoring=port_generator()).add(
uses=DummyExecutor, monitoring=True, port_monitoring=port
) as f:
f.post('/foo', inputs=DocumentArray.empty(4))

resp = req.get(f'http://localhost:{port}/')
assert 'a_count 1.0' in str(  # foo is called once for the single request, so the summary records one observation
resp.content
)


def test_decorator_interface(port_generator):
class DummyExecutor(Executor):
@requests(on='/foo')
def foo(self, docs, **kwargs):
self._proces(docs)
self.proces_2(docs)

@monitor(name='metrics_name', documentation='metrics description')
def _proces(self, docs):
...

@monitor()
def proces_2(self, docs):
...

port = port_generator()
with Flow(monitoring=True, port_monitoring=port_generator()).add(
uses=DummyExecutor, monitoring=True, port_monitoring=port
) as f:
f.post('/foo', inputs=DocumentArray.empty(4))

resp = req.get(f'http://localhost:{port}/')
assert 'jina_metrics_name_count 1.0' in str(resp.content)
assert 'jina_proces_2_seconds_count 1.0' in str(resp.content)
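As a side note, the substring assertions above can be brittle; an optional alternative (not part of this PR) is to parse the scraped output with prometheus_client's text parser, for example:

import requests as req
from prometheus_client.parser import text_string_to_metric_families


def get_metric_value(port, family_name, sample_suffix='_count'):
    # Scrape the monitoring endpoint and return the value of the first matching sample.
    text = req.get(f'http://localhost:{port}/').text
    for family in text_string_to_metric_families(text):
        if family.name == family_name:
            for sample in family.samples:
                if sample.name.endswith(sample_suffix):
                    return sample.value
    return None

With this helper, the check above could be written as assert get_metric_value(port, 'jina_metrics_name') == 1.0.
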
50 changes: 27 additions & 23 deletions tests/integration/monitoring/test_monitoring.py
@@ -7,22 +7,26 @@
from jina import Executor, Flow, requests


class DummyExecutor(Executor):
@requests(on='/foo')
def foo(self, docs, **kwargs):
...
@pytest.fixture()
def executor():
class DummyExecutor(Executor):
@requests(on='/foo')
def foo(self, docs, **kwargs):
...

@requests(on='/bar')
def bar(self, docs, **kwargs):
...
@requests(on='/bar')
def bar(self, docs, **kwargs):
...

return DummyExecutor

def test_enable_monitoring_deployment(port_generator):

def test_enable_monitoring_deployment(port_generator, executor):
port1 = port_generator()
port2 = port_generator()

with Flow().add(uses=DummyExecutor, port_monitoring=port1, monitoring=True).add(
uses=DummyExecutor, port_monitoring=port2, monitoring=True
with Flow().add(uses=executor, port_monitoring=port1, monitoring=True).add(
uses=executor, port_monitoring=port2, monitoring=True
) as f:
for port in [port1, port2]:
resp = req.get(f'http://localhost:{port}/')
@@ -38,14 +42,14 @@ def test_enable_monitoring_deployment(port_generator):


@pytest.mark.parametrize('protocol', ['websocket', 'grpc', 'http'])
def test_enable_monitoring_gateway(protocol, port_generator):
def test_enable_monitoring_gateway(protocol, port_generator, executor):
port0 = port_generator()
port1 = port_generator()
port2 = port_generator()

with Flow(protocol=protocol, monitoring=True, port_monitoring=port0).add(
uses=DummyExecutor, port_monitoring=port1
).add(uses=DummyExecutor, port_monitoring=port2) as f:
uses=executor, port_monitoring=port1
).add(uses=executor, port_monitoring=port2) as f:
for port in [port0, port1, port2]:
resp = req.get(f'http://localhost:{port}/')
assert resp.status_code == 200
@@ -56,13 +60,13 @@ def test_enable_monitoring_gateway(protocol, port_generator):
assert f'jina_sending_request_seconds' in str(resp.content)


def test_monitoring_head(port_generator):
def test_monitoring_head(port_generator, executor):
port1 = port_generator()
port2 = port_generator()

with Flow(monitoring=True).add(uses=DummyExecutor, port_monitoring=port1).add(
uses=DummyExecutor, port_monitoring=port2, shards=2
) as f:
with Flow(monitoring=True, port_monitoring=port_generator()).add(
uses=executor, port_monitoring=port1
).add(uses=executor, port_monitoring=port2, shards=2) as f:
port3 = f._deployment_nodes['executor0'].pod_args['pods'][0][0].port_monitoring
port4 = f._deployment_nodes['executor1'].pod_args['pods'][0][0].port_monitoring

@@ -76,12 +80,12 @@ def test_monitoring_head(port_generator):
assert f'jina_sending_request_seconds' in str(resp.content)


def test_document_processed_total(port_generator):
def test_document_processed_total(port_generator, executor):
port0 = port_generator()
port1 = port_generator()

with Flow(monitoring=True, port_monitoring=port0).add(
uses=DummyExecutor, port_monitoring=port1
uses=executor, port_monitoring=port1
) as f:

resp = req.get(f'http://localhost:{port1}/')
@@ -117,12 +121,12 @@ def test_document_processed_total(port_generator):
)


def test_disable_monitoring_on_pods(port_generator):
def test_disable_monitoring_on_pods(port_generator, executor):
port0 = port_generator()
port1 = port_generator()

with Flow(monitoring=True, port_monitoring=port0).add(
uses=DummyExecutor,
uses=executor,
port_monitoring=port1,
monitoring=False,
):
@@ -133,12 +137,12 @@ def test_disable_monitoring_on_pods(port_generator):
assert resp.status_code == 200


def test_disable_monitoring_on_gatway_only(port_generator):
def test_disable_monitoring_on_gatway_only(port_generator, executor):
port0 = port_generator()
port1 = port_generator()

with Flow(monitoring=False, port_monitoring=port0).add(
uses=DummyExecutor,
uses=executor,
port_monitoring=port1,
monitoring=True,
):
66 changes: 65 additions & 1 deletion tests/unit/serve/runtimes/worker/test_worker_runtime.py
@@ -8,8 +8,9 @@

import grpc
import pytest

import requests as req
from docarray import Document

from jina import DocumentArray, Executor, requests
from jina.clients.request import request_generator
from jina.parsers import set_pod_parser
@@ -380,3 +381,66 @@ def start_runtime(args, cancel_event):

def _create_test_data_message(counter=0):
return list(request_generator('/', DocumentArray([Document(text=str(counter))])))[0]


@pytest.mark.asyncio
@pytest.mark.slow
@pytest.mark.timeout(5)
async def test_decorator_monitoring(port_generator):
from jina import monitor

class DummyExecutor(Executor):
@requests
def foo(self, docs, **kwargs):
self._proces(docs)
self.proces_2(docs)

@monitor(name='metrics_name', documentation='metrics description')
def _proces(self, docs):
...

@monitor()
def proces_2(self, docs):
...

port = port_generator()
args = set_pod_parser().parse_args(
['--monitoring', '--port-monitoring', str(port), '--uses', 'DummyExecutor']
)

cancel_event = multiprocessing.Event()

def start_runtime(args, cancel_event):
with WorkerRuntime(args, cancel_event=cancel_event) as runtime:
runtime.run_forever()

runtime_thread = Process(
target=start_runtime,
args=(args, cancel_event),
daemon=True,
)
runtime_thread.start()

assert AsyncNewLoopRuntime.wait_for_ready_or_shutdown(
timeout=5.0,
ctrl_address=f'{args.host}:{args.port}',
ready_or_shutdown_event=Event(),
)

result = await GrpcConnectionPool.send_request_async(
_create_test_data_message(), f'{args.host}:{args.port}', timeout=1.0
)

resp = req.get(f'http://localhost:{port}/')
assert 'jina_metrics_name_count 1.0' in str(resp.content)

cancel_event.set()
runtime_thread.join()

assert not AsyncNewLoopRuntime.is_ready(f'{args.host}:{args.port}')