Skip to content

Commit

Permalink
Misc/app configure timeout ceil threshold (aio-libs#6316)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Svetlov <[email protected]>
  • Loading branch information
hlecnt and asvetlov authored Dec 6, 2021
1 parent 674948f commit d3cd8ba
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGES/6316.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Configure timeout value ceil threshold at the application level.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ Jakob Ackermann
Jakub Wilk
Jan Buchar
Jashandeep Sohi
Jean-Baptiste Estival
Jens Steinhauser
Jeonghun Lee
Jeongkyu Shin
Expand Down
11 changes: 9 additions & 2 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class ClientTimeout:
connect: Optional[float] = None
sock_read: Optional[float] = None
sock_connect: Optional[float] = None
ceil_threshold: float = 5

# pool_queue_timeout: Optional[float] = None
# dns_resolution_timeout: Optional[float] = None
Expand Down Expand Up @@ -402,7 +403,9 @@ async def _request(
real_timeout = timeout
# timeout is cumulative for all request operations
# (request, redirects, responses, data consuming)
tm = TimeoutHandle(self._loop, real_timeout.total)
tm = TimeoutHandle(
self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
)
handle = tm.start()

if read_bufsize is None:
Expand Down Expand Up @@ -489,7 +492,10 @@ async def _request(

# connection timeout
try:
async with ceil_timeout(real_timeout.connect):
async with ceil_timeout(
real_timeout.connect,
ceil_threshold=real_timeout.ceil_threshold,
):
assert self._connector is not None
conn = await self._connector.connect(
req, traces=traces, timeout=real_timeout
Expand All @@ -509,6 +515,7 @@ async def _request(
auto_decompress=self._auto_decompress,
read_timeout=real_timeout.sock_read,
read_bufsize=read_bufsize,
timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
)

try:
Expand Down
5 changes: 5 additions & 0 deletions aiohttp/client_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._read_timeout = None # type: Optional[float]
self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle]

self._timeout_ceil_threshold = 5 # type: Optional[float]

self.closed = self._loop.create_future() # type: asyncio.Future[None]

@property
Expand Down Expand Up @@ -150,12 +152,15 @@ def set_response_params(
auto_decompress: bool = True,
read_timeout: Optional[float] = None,
read_bufsize: int = 2 ** 16,
timeout_ceil_threshold: float = 5,
) -> None:
self._skip_payload = skip_payload

self._read_timeout = read_timeout
self._reschedule_timeout()

self._timeout_ceil_threshold = timeout_ceil_threshold

self._parser = HttpResponseParser(
self,
self._loop,
Expand Down
14 changes: 12 additions & 2 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ def _reset_heartbeat(self) -> None:

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
self._send_heartbeat, self._heartbeat, self._loop
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _send_heartbeat(self) -> None:
Expand All @@ -107,7 +112,12 @@ def _send_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received, self._pong_heartbeat, self._loop
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _pong_not_received(self) -> None:
Expand Down
46 changes: 38 additions & 8 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class BaseConnector:
limit_per_host - Number of simultaneous connections to one host.
enable_cleanup_closed - Enables clean-up closed ssl transports.
Disabled by default.
timeout_ceil_threshold - Trigger ceiling of timeout values when
it's above timeout_ceil_threshold.
loop - Optional event loop.
"""

Expand All @@ -188,6 +190,7 @@ def __init__(
limit: int = 100,
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
timeout_ceil_threshold: float = 5,
) -> None:

if force_close:
Expand All @@ -199,6 +202,8 @@ def __init__(
if keepalive_timeout is sentinel:
keepalive_timeout = 15.0

self._timeout_ceil_threshold = timeout_ceil_threshold

loop = asyncio.get_running_loop()

self._closed = False
Expand Down Expand Up @@ -326,7 +331,11 @@ def _cleanup(self) -> None:

if self._conns:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", timeout, self._loop
self,
"_cleanup",
timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

def _drop_acquired_per_host(
Expand Down Expand Up @@ -356,7 +365,11 @@ def _cleanup_closed(self) -> None:

if not self._cleanup_closed_disabled:
self._cleanup_closed_handle = helpers.weakref_handle(
self, "_cleanup_closed", self._cleanup_closed_period, self._loop
self,
"_cleanup_closed",
self._cleanup_closed_period,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

async def close(self) -> None:
Expand Down Expand Up @@ -639,7 +652,11 @@ def _release(

if self._cleanup_handle is None:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", self._keepalive_timeout, self._loop
self,
"_cleanup",
self._keepalive_timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

async def _create_connection(
Expand Down Expand Up @@ -728,13 +745,15 @@ def __init__(
limit: int = 100,
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
timeout_ceil_threshold: float = 5,
) -> None:
super().__init__(
keepalive_timeout=keepalive_timeout,
force_close=force_close,
limit=limit,
limit_per_host=limit_per_host,
enable_cleanup_closed=enable_cleanup_closed,
timeout_ceil_threshold=timeout_ceil_threshold,
)

if not isinstance(ssl, SSL_ALLOWED_TYPES):
Expand Down Expand Up @@ -945,7 +964,9 @@ async def _wrap_create_connection(
**kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
except cert_errors as exc:
raise ClientConnectorCertificateError(req.connection_key, exc) from exc
Expand Down Expand Up @@ -1009,7 +1030,9 @@ async def _start_tls_connection(
sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
try:
tls_transport = await self._loop.start_tls(
underlying_transport,
Expand Down Expand Up @@ -1185,7 +1208,10 @@ async def _create_proxy_connection(
# read_until_eof=True will ensure the connection isn't closed
# once the response is received and processed allowing
# START_TLS to work on the connection below.
protocol.set_response_params(read_until_eof=True)
protocol.set_response_params(
read_until_eof=True,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)
resp = await proxy_resp.start(conn)
except BaseException:
proxy_resp.close()
Expand Down Expand Up @@ -1263,7 +1289,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_unix_connection(
self._factory, self._path
)
Expand Down Expand Up @@ -1319,7 +1347,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501
self._factory, self._path
)
Expand Down
29 changes: 21 additions & 8 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,23 +573,30 @@ def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:


def weakref_handle(
ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop
ob: object,
name: str,
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout >= 5:
if timeout >= timeout_ceil_threshold:
when = ceil(when)

return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
return None


def call_later(
cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop
cb: Callable[[], Any],
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout > 5:
if timeout > timeout_ceil_threshold:
when = ceil(when)
return loop.call_at(when, cb)
return None
Expand All @@ -599,10 +606,14 @@ class TimeoutHandle:
"""Timeout handle"""

def __init__(
self, loop: asyncio.AbstractEventLoop, timeout: Optional[float]
self,
loop: asyncio.AbstractEventLoop,
timeout: Optional[float],
ceil_threshold: float = 5,
) -> None:
self._timeout = timeout
self._loop = loop
self._ceil_threshold = ceil_threshold
self._callbacks = (
[]
) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]]
Expand All @@ -619,7 +630,7 @@ def start(self) -> Optional[asyncio.Handle]:
timeout = self._timeout
if timeout is not None and timeout > 0:
when = self._loop.time() + timeout
if timeout >= 5:
if timeout >= self._ceil_threshold:
when = ceil(when)
return self._loop.call_at(when, self.__call__)
else:
Expand Down Expand Up @@ -701,14 +712,16 @@ def timeout(self) -> None:
self._cancelled = True


def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout:
def ceil_timeout(
delay: Optional[float], ceil_threshold: float = 5
) -> async_timeout.Timeout:
if delay is None or delay <= 0:
return async_timeout.timeout(None)

loop = asyncio.get_running_loop()
now = loop.time()
when = now + delay
if delay > 5:
if delay > ceil_threshold:
when = ceil(when)
return async_timeout.timeout_at(when)

Expand Down
15 changes: 14 additions & 1 deletion aiohttp/web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ class RequestHandler(BaseProtocol):
max_headers -- Optional maximum header size
timeout_ceil_threshold -- Optional value to specify
threshold to ceil() timeout
values
"""

KEEPALIVE_RESCHEDULE_DELAY = 1
Expand Down Expand Up @@ -173,6 +177,7 @@ class RequestHandler(BaseProtocol):
"_close",
"_force_close",
"_current_request",
"_timeout_ceil_threshold",
)

def __init__(
Expand All @@ -192,6 +197,7 @@ def __init__(
lingering_time: float = 10.0,
read_bufsize: int = 2 ** 16,
auto_decompress: bool = True,
timeout_ceil_threshold: float = 5,
):
super().__init__(loop)

Expand Down Expand Up @@ -228,6 +234,12 @@ def __init__(
auto_decompress=auto_decompress,
) # type: Optional[HttpRequestParser]

self._timeout_ceil_threshold = 5 # type: float
try:
self._timeout_ceil_threshold = float(timeout_ceil_threshold)
except (TypeError, ValueError):
pass

self.logger = logger
self.access_log = access_log
if access_log:
Expand Down Expand Up @@ -440,7 +452,8 @@ def _process_keepalive(self) -> None:
# not all request handlers are done,
# reschedule itself to next second
self._keepalive_handle = self._loop.call_later(
self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive
self.KEEPALIVE_RESCHEDULE_DELAY,
self._process_keepalive,
)

async def _handle_request(
Expand Down
11 changes: 10 additions & 1 deletion aiohttp/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,13 @@ async def shutdown(self, timeout: Optional[float] = None) -> None:
self._connections.clear()

def __call__(self) -> RequestHandler:
return RequestHandler(self, loop=self._loop, **self._kwargs)
try:
return RequestHandler(self, loop=self._loop, **self._kwargs)
except TypeError:
# Failsafe creation: remove all custom handler_args
kwargs = {
k: v
for k, v in self._kwargs.items()
if k in ["debug", "access_log_class"]
}
return RequestHandler(self, loop=self._loop, **kwargs)
Loading

0 comments on commit d3cd8ba

Please sign in to comment.