diff --git a/CHANGES/6316.misc b/CHANGES/6316.misc new file mode 100644 index 00000000000..fa8519e12d3 --- /dev/null +++ b/CHANGES/6316.misc @@ -0,0 +1 @@ +Configure timeout value ceil threshold at the application level. diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index 61c1f049cea..96fda3f654d 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -152,6 +152,7 @@ Jakob Ackermann Jakub Wilk Jan Buchar Jashandeep Sohi +Jean-Baptiste Estival Jens Steinhauser Jeonghun Lee Jeongkyu Shin diff --git a/aiohttp/client.py b/aiohttp/client.py index 42e2fde7781..a273b2c4556 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -141,6 +141,7 @@ class ClientTimeout: connect: Optional[float] = None sock_read: Optional[float] = None sock_connect: Optional[float] = None + ceil_threshold: float = 5 # pool_queue_timeout: Optional[float] = None # dns_resolution_timeout: Optional[float] = None @@ -445,7 +446,9 @@ async def _request( real_timeout = timeout # timeout is cumulative for all request operations # (request, redirects, responses, data consuming) - tm = TimeoutHandle(self._loop, real_timeout.total) + tm = TimeoutHandle( + self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold + ) handle = tm.start() if read_bufsize is None: @@ -532,7 +535,10 @@ async def _request( # connection timeout try: - async with ceil_timeout(real_timeout.connect): + async with ceil_timeout( + real_timeout.connect, + ceil_threshold=real_timeout.ceil_threshold, + ): assert self._connector is not None conn = await self._connector.connect( req, traces=traces, timeout=real_timeout @@ -552,6 +558,7 @@ async def _request( auto_decompress=self._auto_decompress, read_timeout=real_timeout.sock_read, read_bufsize=read_bufsize, + timeout_ceil_threshold=self._connector._timeout_ceil_threshold, ) try: diff --git a/aiohttp/client_proto.py b/aiohttp/client_proto.py index f36863b8363..92af3b737c6 100644 --- a/aiohttp/client_proto.py +++ b/aiohttp/client_proto.py @@ -36,6 +36,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._read_timeout = None # type: Optional[float] self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle] + self._timeout_ceil_threshold = 5 # type: Optional[float] + + self.closed = self._loop.create_future() # type: asyncio.Future[None] + @property def upgraded(self) -> bool: return self._upgraded @@ -143,12 +147,15 @@ def set_response_params( auto_decompress: bool = True, read_timeout: Optional[float] = None, read_bufsize: int = 2 ** 16, + timeout_ceil_threshold: float = 5, ) -> None: self._skip_payload = skip_payload self._read_timeout = read_timeout self._reschedule_timeout() + self._timeout_ceil_threshold = timeout_ceil_threshold + self._parser = HttpResponseParser( self, self._loop, diff --git a/aiohttp/client_ws.py b/aiohttp/client_ws.py index 7c8121f659f..c949cfc088e 100644 --- a/aiohttp/client_ws.py +++ b/aiohttp/client_ws.py @@ -83,7 +83,12 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is not None: self._heartbeat_cb = call_later( - self._send_heartbeat, self._heartbeat, self._loop + self._send_heartbeat, + self._heartbeat, + self._loop, + timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold + if self._conn is not None + else 5, ) def _send_heartbeat(self) -> None: @@ -96,7 +101,12 @@ def _send_heartbeat(self) -> None: if self._pong_response_cb is not None: self._pong_response_cb.cancel() self._pong_response_cb = call_later( - self._pong_not_received, self._pong_heartbeat, self._loop + self._pong_not_received, + self._pong_heartbeat, + self._loop, + timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold + if self._conn is not None + else 5, ) def _pong_not_received(self) -> None: diff --git a/aiohttp/connector.py b/aiohttp/connector.py index d98176f83ea..ce113adb3f9 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -202,6 +202,8 @@ class BaseConnector: limit_per_host - Number of simultaneous connections to one host. enable_cleanup_closed - Enables clean-up closed ssl transports. Disabled by default. + timeout_ceil_threshold - Trigger ceiling of timeout values when + it's above timeout_ceil_threshold. loop - Optional event loop. """ @@ -220,6 +222,7 @@ def __init__( limit_per_host: int = 0, enable_cleanup_closed: bool = False, loop: Optional[asyncio.AbstractEventLoop] = None, + timeout_ceil_threshold: float = 5, ) -> None: if force_close: @@ -232,6 +235,7 @@ def __init__( keepalive_timeout = 15.0 loop = get_running_loop(loop) + self._timeout_ceil_threshold = timeout_ceil_threshold self._closed = False if loop.get_debug(): @@ -373,7 +377,11 @@ def _cleanup(self) -> None: if self._conns: self._cleanup_handle = helpers.weakref_handle( - self, "_cleanup", timeout, self._loop + self, + "_cleanup", + timeout, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) def _drop_acquired_per_host( @@ -403,7 +411,11 @@ def _cleanup_closed(self) -> None: if not self._cleanup_closed_disabled: self._cleanup_closed_handle = helpers.weakref_handle( - self, "_cleanup_closed", self._cleanup_closed_period, self._loop + self, + "_cleanup_closed", + self._cleanup_closed_period, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) def close(self) -> Awaitable[None]: @@ -674,7 +686,11 @@ def _release( if self._cleanup_handle is None: self._cleanup_handle = helpers.weakref_handle( - self, "_cleanup", self._keepalive_timeout, self._loop + self, + "_cleanup", + self._keepalive_timeout, + self._loop, + timeout_ceil_threshold=self._timeout_ceil_threshold, ) async def _create_connection( @@ -767,6 +783,7 @@ def __init__( limit_per_host: int = 0, enable_cleanup_closed: bool = False, loop: Optional[asyncio.AbstractEventLoop] = None, + timeout_ceil_threshold: float = 5, ): super().__init__( keepalive_timeout=keepalive_timeout, @@ -775,6 +792,7 @@ def __init__( limit_per_host=limit_per_host, enable_cleanup_closed=enable_cleanup_closed, loop=loop, + timeout_ceil_threshold=timeout_ceil_threshold, ) self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) @@ -982,7 +1000,9 @@ async def _wrap_create_connection( **kwargs: Any, ) -> Tuple[asyncio.Transport, ResponseHandler]: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa except cert_errors as exc: raise ClientConnectorCertificateError(req.connection_key, exc) from exc @@ -1090,7 +1110,9 @@ async def _start_tls_connection( sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req)) try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): try: tls_transport = await self._loop.start_tls( underlying_transport, @@ -1271,7 +1293,10 @@ async def _create_proxy_connection( # read_until_eof=True will ensure the connection isn't closed # once the response is received and processed allowing # START_TLS to work on the connection below. - protocol.set_response_params(read_until_eof=runtime_has_start_tls) + protocol.set_response_params( + read_until_eof=runtime_has_start_tls, + timeout_ceil_threshold=self._timeout_ceil_threshold, + ) resp = await proxy_resp.start(conn) except BaseException: proxy_resp.close() @@ -1374,7 +1399,9 @@ async def _create_connection( self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" ) -> ResponseHandler: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): _, proto = await self._loop.create_unix_connection( self._factory, self._path ) @@ -1432,7 +1459,9 @@ async def _create_connection( self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" ) -> ResponseHandler: try: - async with ceil_timeout(timeout.sock_connect): + async with ceil_timeout( + timeout.sock_connect, ceil_threshold=timeout.ceil_threshold + ): _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501 self._factory, self._path ) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 28b0c77b1f2..0975ccf76fb 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -595,11 +595,15 @@ def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None: def weakref_handle( - ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop + ob: object, + name: str, + timeout: float, + loop: asyncio.AbstractEventLoop, + timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: if timeout is not None and timeout > 0: when = loop.time() + timeout - if timeout >= 5: + if timeout >= timeout_ceil_threshold: when = ceil(when) return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name)) @@ -607,11 +611,14 @@ def weakref_handle( def call_later( - cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop + cb: Callable[[], Any], + timeout: float, + loop: asyncio.AbstractEventLoop, + timeout_ceil_threshold: float = 5, ) -> Optional[asyncio.TimerHandle]: if timeout is not None and timeout > 0: when = loop.time() + timeout - if timeout > 5: + if timeout > timeout_ceil_threshold: when = ceil(when) return loop.call_at(when, cb) return None @@ -621,10 +628,14 @@ class TimeoutHandle: """Timeout handle""" def __init__( - self, loop: asyncio.AbstractEventLoop, timeout: Optional[float] + self, + loop: asyncio.AbstractEventLoop, + timeout: Optional[float], + ceil_threshold: float = 5, ) -> None: self._timeout = timeout self._loop = loop + self._ceil_threshold = ceil_threshold self._callbacks = ( [] ) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]] @@ -641,7 +652,7 @@ def start(self) -> Optional[asyncio.Handle]: timeout = self._timeout if timeout is not None and timeout > 0: when = self._loop.time() + timeout - if timeout >= 5: + if timeout >= self._ceil_threshold: when = ceil(when) return self._loop.call_at(when, self.__call__) else: @@ -723,14 +734,16 @@ def timeout(self) -> None: self._cancelled = True -def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout: +def ceil_timeout( + delay: Optional[float], ceil_threshold: float = 5 +) -> async_timeout.Timeout: if delay is None or delay <= 0: return async_timeout.timeout(None) loop = get_running_loop() now = loop.time() when = now + delay - if delay > 5: + if delay > ceil_threshold: when = ceil(when) return async_timeout.timeout_at(when) diff --git a/aiohttp/web_protocol.py b/aiohttp/web_protocol.py index ad0c0498e39..78962e9ed70 100644 --- a/aiohttp/web_protocol.py +++ b/aiohttp/web_protocol.py @@ -127,6 +127,10 @@ class RequestHandler(BaseProtocol): max_headers -- Optional maximum header size + timeout_ceil_threshold -- Optional value to specify + threshold to ceil() timeout + values + """ KEEPALIVE_RESCHEDULE_DELAY = 1 @@ -157,6 +161,7 @@ class RequestHandler(BaseProtocol): "_close", "_force_close", "_current_request", + "_timeout_ceil_threshold", ) def __init__( @@ -177,6 +182,7 @@ def __init__( lingering_time: float = 10.0, read_bufsize: int = 2 ** 16, auto_decompress: bool = True, + timeout_ceil_threshold: float = 5, ): super().__init__(loop) @@ -213,6 +219,12 @@ def __init__( auto_decompress=auto_decompress, ) # type: Optional[HttpRequestParser] + self._timeout_ceil_threshold = 5 # type: float + try: + self._timeout_ceil_threshold = float(timeout_ceil_threshold) + except (TypeError, ValueError): + pass + self.logger = logger self.debug = debug self.access_log = access_log @@ -419,7 +431,8 @@ def _process_keepalive(self) -> None: # not all request handlers are done, # reschedule itself to next second self._keepalive_handle = self._loop.call_later( - self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive + self.KEEPALIVE_RESCHEDULE_DELAY, + self._process_keepalive, ) async def _handle_request( diff --git a/aiohttp/web_server.py b/aiohttp/web_server.py index 5657ed9c800..d0947a5971c 100644 --- a/aiohttp/web_server.py +++ b/aiohttp/web_server.py @@ -59,4 +59,13 @@ async def shutdown(self, timeout: Optional[float] = None) -> None: self._connections.clear() def __call__(self) -> RequestHandler: - return RequestHandler(self, loop=self._loop, **self._kwargs) + try: + return RequestHandler(self, loop=self._loop, **self._kwargs) + except TypeError: + # Failsafe creation: remove all custom handler_args + kwargs = { + k: v + for k, v in self._kwargs.items() + if k in ["debug", "access_log_class"] + } + return RequestHandler(self, loop=self._loop, **kwargs) diff --git a/aiohttp/web_ws.py b/aiohttp/web_ws.py index 16b0a1747cf..de7dfb7bb35 100644 --- a/aiohttp/web_ws.py +++ b/aiohttp/web_ws.py @@ -105,7 +105,12 @@ def _reset_heartbeat(self) -> None: if self._heartbeat is not None: assert self._loop is not None self._heartbeat_cb = call_later( - self._send_heartbeat, self._heartbeat, self._loop + self._send_heartbeat, + self._heartbeat, + self._loop, + timeout_ceil_threshold=self._req._protocol._timeout_ceil_threshold + if self._req is not None + else 5, ) def _send_heartbeat(self) -> None: @@ -119,7 +124,12 @@ def _send_heartbeat(self) -> None: if self._pong_response_cb is not None: self._pong_response_cb.cancel() self._pong_response_cb = call_later( - self._pong_not_received, self._pong_heartbeat, self._loop + self._pong_not_received, + self._pong_heartbeat, + self._loop, + timeout_ceil_threshold=self._req._protocol._timeout_ceil_threshold + if self._req is not None + else 5, ) def _pong_not_received(self) -> None: diff --git a/docs/client_quickstart.rst b/docs/client_quickstart.rst index 78be177d76c..d9a1d6cb20c 100644 --- a/docs/client_quickstart.rst +++ b/docs/client_quickstart.rst @@ -453,13 +453,17 @@ Supported :class:`ClientTimeout` fields are: The maximal number of seconds allowed for period between reading a new data portion from a peer. + ``ceil_threshold`` + + The threshold value to trigger ceiling of absolute timeout values. + All fields are floats, ``None`` or ``0`` disables a particular timeout check, see the :class:`ClientTimeout` reference for defaults and additional details. Thus the default timeout is:: aiohttp.ClientTimeout(total=5*60, connect=None, - sock_connect=None, sock_read=None) + sock_connect=None, sock_read=None, ceil_threshold=5) .. note:: @@ -476,4 +480,5 @@ Thus the default timeout is:: timeout expiration. Smaller timeouts are not rounded to help testing; in the real life network - timeouts usually greater than tens of seconds. + timeouts usually greater than tens of seconds. However, the default threshold + value of 5 seconds can be configured using the ``ceil_threshold`` parameter. diff --git a/docs/web_advanced.rst b/docs/web_advanced.rst index 56cf07d46f1..7dcd9b9c9bf 100644 --- a/docs/web_advanced.rst +++ b/docs/web_advanced.rst @@ -845,6 +845,25 @@ Signal handler may look like:: Both :func:`run_app` and :meth:`AppRunner.cleanup` call shutdown signal handlers. +.. _aiohttp-web-ceil-absolute-timeout: + +Ceil of absolute timeout value +------------------------------ + +*aiohttp* **ceils** internal timeout values if the value is equal or +greater than 5 seconds. The timeout expires at the next integer second +greater than ``current_time + timeout``. + +More details about ceiling absolute timeout values is available here +:ref:`aiohttp-client-timeouts`. + +The default threshold can be configured at :class:`aiohttp.web.Application` +level using the ``handler_args`` parameter. + +.. code-block:: python3 + + app = web.Application(handler_args={"timeout_ceil_threshold": 1}) + .. _aiohttp-web-background-tasks: Background tasks diff --git a/tests/test_helpers.py b/tests/test_helpers.py index d192545de7f..1d4ecca91f8 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -4,7 +4,7 @@ import gc import platform import tempfile -from math import isclose, modf +from math import isclose, modf, ceil from unittest import mock from urllib.request import getproxies_environment @@ -341,6 +341,18 @@ def test_when_timeout_smaller_second(loop) -> None: assert isclose(when - timer, 0, abs_tol=0.001) +def test_when_timeout_smaller_second_with_low_threshold(loop) -> None: + timeout = 0.1 + timer = loop.time() + timeout + + handle = helpers.TimeoutHandle(loop, timeout, 0.01) + when = handle.start()._when + handle.close() + + assert isinstance(when, int) + assert when == ceil(timer) + + def test_timeout_handle_cb_exc(loop) -> None: handle = helpers.TimeoutHandle(loop, 10.2) cb = mock.Mock() @@ -384,6 +396,16 @@ async def test_weakref_handle(loop) -> None: assert cb.test.called +async def test_weakref_handle_with_small_threshold(loop) -> None: + cb = mock.Mock() + loop = mock.Mock() + loop.time.return_value = 10 + helpers.weakref_handle(cb, "test", 0.1, loop, 0.01) + loop.call_at.assert_called_with( + 11, helpers._weakref_handle, (weakref.ref(cb), "test") + ) + + async def test_weakref_handle_weak(loop) -> None: cb = mock.Mock() helpers.weakref_handle(cb, "test", 0.01, loop) @@ -412,6 +434,32 @@ async def test_ceil_timeout_small() -> None: assert frac != 0 +def test_ceil_call_later_with_small_threshold() -> None: + cb = mock.Mock() + loop = mock.Mock() + loop.time.return_value = 10.1 + helpers.call_later(cb, 4.5, loop, 1) + loop.call_at.assert_called_with(15, cb) + + +def test_ceil_call_later_no_timeout() -> None: + cb = mock.Mock() + loop = mock.Mock() + helpers.call_later(cb, 0, loop) + assert not loop.call_at.called + + +async def test_ceil_timeout_none(loop) -> None: + async with helpers.ceil_timeout(None) as cm: + assert cm.deadline is None + + +async def test_ceil_timeout_small_with_overriden_threshold(loop) -> None: + async with helpers.ceil_timeout(1.5, ceil_threshold=1) as cm: + frac, integer = modf(cm.deadline) + assert frac == 0 + + # -------------------------------- ContentDisposition ------------------- diff --git a/tests/test_web_runner.py b/tests/test_web_runner.py index 8c08a5f5fbd..324b650e4f6 100644 --- a/tests/test_web_runner.py +++ b/tests/test_web_runner.py @@ -188,3 +188,33 @@ async def shutdown(): web.run_app(app) assert spy.called, "run_app() should work after asyncio.run()." + + +async def test_app_handler_args_failure() -> None: + app = web.Application(handler_args={"unknown_parameter": 5}) + runner = web.AppRunner(app) + await runner.setup() + assert runner._server + rh = runner._server() + assert rh._timeout_ceil_threshold == 5 + await runner.cleanup() + assert app + + +@pytest.mark.parametrize( + ("value", "expected"), + ( + (2, 2), + (None, 5), + ("2", 2), + ), +) +async def test_app_handler_args_ceil_threshold(value: Any, expected: Any) -> None: + app = web.Application(handler_args={"timeout_ceil_threshold": value}) + runner = web.AppRunner(app) + await runner.setup() + assert runner._server + rh = runner._server() + assert rh._timeout_ceil_threshold == expected + await runner.cleanup() + assert app