Skip to content

Commit

Permalink
[3.9] Misc/app configure timeout ceil threshold (aio-libs#6316)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Svetlov <[email protected]>.
(cherry picked from commit d3cd8ba)

Co-authored-by: jb <[email protected]>
  • Loading branch information
hlecnt authored and Jean-Baptiste Estival committed Dec 7, 2021
1 parent b235105 commit c0f07d2
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGES/6316.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Configure timeout value ceil threshold at the application level.
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Jakob Ackermann
Jakub Wilk
Jan Buchar
Jashandeep Sohi
Jean-Baptiste Estival
Jens Steinhauser
Jeonghun Lee
Jeongkyu Shin
Expand Down
11 changes: 9 additions & 2 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class ClientTimeout:
connect: Optional[float] = None
sock_read: Optional[float] = None
sock_connect: Optional[float] = None
ceil_threshold: float = 5

# pool_queue_timeout: Optional[float] = None
# dns_resolution_timeout: Optional[float] = None
Expand Down Expand Up @@ -445,7 +446,9 @@ async def _request(
real_timeout = timeout
# timeout is cumulative for all request operations
# (request, redirects, responses, data consuming)
tm = TimeoutHandle(self._loop, real_timeout.total)
tm = TimeoutHandle(
self._loop, real_timeout.total, ceil_threshold=real_timeout.ceil_threshold
)
handle = tm.start()

if read_bufsize is None:
Expand Down Expand Up @@ -532,7 +535,10 @@ async def _request(

# connection timeout
try:
async with ceil_timeout(real_timeout.connect):
async with ceil_timeout(
real_timeout.connect,
ceil_threshold=real_timeout.ceil_threshold,
):
assert self._connector is not None
conn = await self._connector.connect(
req, traces=traces, timeout=real_timeout
Expand All @@ -552,6 +558,7 @@ async def _request(
auto_decompress=self._auto_decompress,
read_timeout=real_timeout.sock_read,
read_bufsize=read_bufsize,
timeout_ceil_threshold=self._connector._timeout_ceil_threshold,
)

try:
Expand Down
7 changes: 7 additions & 0 deletions aiohttp/client_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._read_timeout = None # type: Optional[float]
self._read_timeout_handle = None # type: Optional[asyncio.TimerHandle]

self._timeout_ceil_threshold = 5 # type: Optional[float]

self.closed = self._loop.create_future() # type: asyncio.Future[None]

@property
def upgraded(self) -> bool:
return self._upgraded
Expand Down Expand Up @@ -143,12 +147,15 @@ def set_response_params(
auto_decompress: bool = True,
read_timeout: Optional[float] = None,
read_bufsize: int = 2 ** 16,
timeout_ceil_threshold: float = 5,
) -> None:
self._skip_payload = skip_payload

self._read_timeout = read_timeout
self._reschedule_timeout()

self._timeout_ceil_threshold = timeout_ceil_threshold

self._parser = HttpResponseParser(
self,
self._loop,
Expand Down
14 changes: 12 additions & 2 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ def _reset_heartbeat(self) -> None:

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
self._send_heartbeat, self._heartbeat, self._loop
self._send_heartbeat,
self._heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _send_heartbeat(self) -> None:
Expand All @@ -96,7 +101,12 @@ def _send_heartbeat(self) -> None:
if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
self._pong_response_cb = call_later(
self._pong_not_received, self._pong_heartbeat, self._loop
self._pong_not_received,
self._pong_heartbeat,
self._loop,
timeout_ceil_threshold=self._conn._connector._timeout_ceil_threshold
if self._conn is not None
else 5,
)

def _pong_not_received(self) -> None:
Expand Down
45 changes: 37 additions & 8 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class BaseConnector:
limit_per_host - Number of simultaneous connections to one host.
enable_cleanup_closed - Enables clean-up closed ssl transports.
Disabled by default.
timeout_ceil_threshold - Trigger ceiling of timeout values when
it's above timeout_ceil_threshold.
loop - Optional event loop.
"""

Expand All @@ -220,6 +222,7 @@ def __init__(
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
timeout_ceil_threshold: float = 5,
) -> None:

if force_close:
Expand All @@ -232,6 +235,7 @@ def __init__(
keepalive_timeout = 15.0

loop = get_running_loop(loop)
self._timeout_ceil_threshold = timeout_ceil_threshold

self._closed = False
if loop.get_debug():
Expand Down Expand Up @@ -373,7 +377,11 @@ def _cleanup(self) -> None:

if self._conns:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", timeout, self._loop
self,
"_cleanup",
timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

def _drop_acquired_per_host(
Expand Down Expand Up @@ -403,7 +411,11 @@ def _cleanup_closed(self) -> None:

if not self._cleanup_closed_disabled:
self._cleanup_closed_handle = helpers.weakref_handle(
self, "_cleanup_closed", self._cleanup_closed_period, self._loop
self,
"_cleanup_closed",
self._cleanup_closed_period,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

def close(self) -> Awaitable[None]:
Expand Down Expand Up @@ -674,7 +686,11 @@ def _release(

if self._cleanup_handle is None:
self._cleanup_handle = helpers.weakref_handle(
self, "_cleanup", self._keepalive_timeout, self._loop
self,
"_cleanup",
self._keepalive_timeout,
self._loop,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)

async def _create_connection(
Expand Down Expand Up @@ -767,6 +783,7 @@ def __init__(
limit_per_host: int = 0,
enable_cleanup_closed: bool = False,
loop: Optional[asyncio.AbstractEventLoop] = None,
timeout_ceil_threshold: float = 5,
):
super().__init__(
keepalive_timeout=keepalive_timeout,
Expand All @@ -775,6 +792,7 @@ def __init__(
limit_per_host=limit_per_host,
enable_cleanup_closed=enable_cleanup_closed,
loop=loop,
timeout_ceil_threshold=timeout_ceil_threshold,
)

self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)
Expand Down Expand Up @@ -982,7 +1000,9 @@ async def _wrap_create_connection(
**kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
except cert_errors as exc:
raise ClientConnectorCertificateError(req.connection_key, exc) from exc
Expand Down Expand Up @@ -1090,7 +1110,9 @@ async def _start_tls_connection(
sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
try:
tls_transport = await self._loop.start_tls(
underlying_transport,
Expand Down Expand Up @@ -1271,7 +1293,10 @@ async def _create_proxy_connection(
# read_until_eof=True will ensure the connection isn't closed
# once the response is received and processed allowing
# START_TLS to work on the connection below.
protocol.set_response_params(read_until_eof=runtime_has_start_tls)
protocol.set_response_params(
read_until_eof=runtime_has_start_tls,
timeout_ceil_threshold=self._timeout_ceil_threshold,
)
resp = await proxy_resp.start(conn)
except BaseException:
proxy_resp.close()
Expand Down Expand Up @@ -1374,7 +1399,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_unix_connection(
self._factory, self._path
)
Expand Down Expand Up @@ -1432,7 +1459,9 @@ async def _create_connection(
self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
try:
async with ceil_timeout(timeout.sock_connect):
async with ceil_timeout(
timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
):
_, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501
self._factory, self._path
)
Expand Down
29 changes: 21 additions & 8 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,23 +595,30 @@ def _weakref_handle(info: "Tuple[weakref.ref[object], str]") -> None:


def weakref_handle(
ob: object, name: str, timeout: float, loop: asyncio.AbstractEventLoop
ob: object,
name: str,
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout >= 5:
if timeout >= timeout_ceil_threshold:
when = ceil(when)

return loop.call_at(when, _weakref_handle, (weakref.ref(ob), name))
return None


def call_later(
cb: Callable[[], Any], timeout: float, loop: asyncio.AbstractEventLoop
cb: Callable[[], Any],
timeout: float,
loop: asyncio.AbstractEventLoop,
timeout_ceil_threshold: float = 5,
) -> Optional[asyncio.TimerHandle]:
if timeout is not None and timeout > 0:
when = loop.time() + timeout
if timeout > 5:
if timeout > timeout_ceil_threshold:
when = ceil(when)
return loop.call_at(when, cb)
return None
Expand All @@ -621,10 +628,14 @@ class TimeoutHandle:
"""Timeout handle"""

def __init__(
self, loop: asyncio.AbstractEventLoop, timeout: Optional[float]
self,
loop: asyncio.AbstractEventLoop,
timeout: Optional[float],
ceil_threshold: float = 5,
) -> None:
self._timeout = timeout
self._loop = loop
self._ceil_threshold = ceil_threshold
self._callbacks = (
[]
) # type: List[Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]]
Expand All @@ -641,7 +652,7 @@ def start(self) -> Optional[asyncio.Handle]:
timeout = self._timeout
if timeout is not None and timeout > 0:
when = self._loop.time() + timeout
if timeout >= 5:
if timeout >= self._ceil_threshold:
when = ceil(when)
return self._loop.call_at(when, self.__call__)
else:
Expand Down Expand Up @@ -723,14 +734,16 @@ def timeout(self) -> None:
self._cancelled = True


def ceil_timeout(delay: Optional[float]) -> async_timeout.Timeout:
def ceil_timeout(
delay: Optional[float], ceil_threshold: float = 5
) -> async_timeout.Timeout:
if delay is None or delay <= 0:
return async_timeout.timeout(None)

loop = get_running_loop()
now = loop.time()
when = now + delay
if delay > 5:
if delay > ceil_threshold:
when = ceil(when)
return async_timeout.timeout_at(when)

Expand Down
15 changes: 14 additions & 1 deletion aiohttp/web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ class RequestHandler(BaseProtocol):
max_headers -- Optional maximum header size
timeout_ceil_threshold -- Optional value to specify
threshold to ceil() timeout
values
"""

KEEPALIVE_RESCHEDULE_DELAY = 1
Expand Down Expand Up @@ -157,6 +161,7 @@ class RequestHandler(BaseProtocol):
"_close",
"_force_close",
"_current_request",
"_timeout_ceil_threshold",
)

def __init__(
Expand All @@ -177,6 +182,7 @@ def __init__(
lingering_time: float = 10.0,
read_bufsize: int = 2 ** 16,
auto_decompress: bool = True,
timeout_ceil_threshold: float = 5,
):
super().__init__(loop)

Expand Down Expand Up @@ -213,6 +219,12 @@ def __init__(
auto_decompress=auto_decompress,
) # type: Optional[HttpRequestParser]

self._timeout_ceil_threshold = 5 # type: float
try:
self._timeout_ceil_threshold = float(timeout_ceil_threshold)
except (TypeError, ValueError):
pass

self.logger = logger
self.debug = debug
self.access_log = access_log
Expand Down Expand Up @@ -419,7 +431,8 @@ def _process_keepalive(self) -> None:
# not all request handlers are done,
# reschedule itself to next second
self._keepalive_handle = self._loop.call_later(
self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive
self.KEEPALIVE_RESCHEDULE_DELAY,
self._process_keepalive,
)

async def _handle_request(
Expand Down
11 changes: 10 additions & 1 deletion aiohttp/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,13 @@ async def shutdown(self, timeout: Optional[float] = None) -> None:
self._connections.clear()

def __call__(self) -> RequestHandler:
return RequestHandler(self, loop=self._loop, **self._kwargs)
try:
return RequestHandler(self, loop=self._loop, **self._kwargs)
except TypeError:
# Failsafe creation: remove all custom handler_args
kwargs = {
k: v
for k, v in self._kwargs.items()
if k in ["debug", "access_log_class"]
}
return RequestHandler(self, loop=self._loop, **kwargs)
Loading

0 comments on commit c0f07d2

Please sign in to comment.