Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better fix for scheduling logic for transport close() and abort() #2973

Merged
merged 4 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sanic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"FORWARDED_FOR_HEADER": "X-Forwarded-For",
"FORWARDED_SECRET": None,
"GRACEFUL_SHUTDOWN_TIMEOUT": 15.0,
"GRACEFUL_TCP_CLOSE_TIMEOUT": 5.0,
"INSPECTOR": False,
"INSPECTOR_HOST": "localhost",
"INSPECTOR_PORT": 6457,
Expand Down Expand Up @@ -104,6 +105,7 @@ class Config(dict, metaclass=DescriptorMeta):
FORWARDED_FOR_HEADER: str
FORWARDED_SECRET: Optional[str]
GRACEFUL_SHUTDOWN_TIMEOUT: float
GRACEFUL_TCP_CLOSE_TIMEOUT: float
INSPECTOR: bool
INSPECTOR_HOST: str
INSPECTOR_PORT: int
Expand Down
125 changes: 120 additions & 5 deletions sanic/server/protocols/base_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,47 @@ def close(self, timeout: Optional[float] = None):
"""
Attempt close the connection.
"""
# Cause a call to connection_lost where further cleanup occurs
if self.transport:
self.transport.close()
if self.transport is None or self.transport.is_closing():
# do not attempt to close again, already aborted or closing
return

# Check if write is already paused _before_ close() is called.
write_was_paused = not self._can_write.is_set()
# Trigger the UVLoop Stream Transport Close routine
# Causes a call to connection_lost where further cleanup occurs
# Close may fully close the connection now, but if there is still
# data in the libuv buffer, then close becomes an async operation
self.transport.close()
try:
# Check write-buffer data left _after_ close is called.
# in UVLoop, get the data in the libuv transport write-buffer
data_left = self.transport.get_write_buffer_size()
# Some asyncio implementations don't support get_write_buffer_size
except (AttributeError, NotImplementedError):
data_left = 0
if write_was_paused or data_left > 0:
# don't call resume_writing here, it gets called by the transport
# to unpause the protocol when it is ready for more data

# Schedule the async close checker, to close the connection
# after the transport is done, and clean everything up.
if timeout is None:
# This close timeout needs to be less than the graceful
# shutdown timeout. The graceful shutdown _could_ be waiting
# for this transport to close before shutting down the app.
timeout = self.app.config.GRACEFUL_TCP_CLOSE_TIMEOUT
# This is 5s by default.
else:
# Schedule the async close checker but with no timeout,
# this will ensure abort() is called if required.
if timeout is None:
timeout = self.app.config.GRACEFUL_SHUTDOWN_TIMEOUT
self.loop.call_later(timeout, self.abort)
timeout = 0
self.loop.call_soon(
_async_protocol_transport_close,
self,
self.loop,
timeout,
)

def abort(self):
"""
Expand All @@ -118,8 +153,21 @@ def connection_made(self, transport):
error_logger.exception("protocol.connect_made")

def connection_lost(self, exc):
"""
This is a callback handler that is called from the asyncio
transport layer implementation (eg, UVLoop's UVStreamTransport).
It is scheduled to be called async after the transport has closed.
When data is still in the send buffer, this call to connection_lost
will be delayed until _after_ the buffer is finished being sent.

So we can use this callback as a confirmation callback
that the async write-buffer transfer is finished.
"""
try:
self.connections.discard(self)
# unblock the send queue if it is paused,
# this allows the route handler to see
# the CancelledError exception
self.resume_writing()
self.conn_info.lost = True
if self._task:
Expand All @@ -143,3 +191,70 @@ def data_received(self, data: bytes):
self._data_received.set()
except BaseException:
error_logger.exception("protocol.data_received")


def _async_protocol_transport_close(
protocol: SanicProtocol,
loop: asyncio.AbstractEventLoop,
timeout: float,
):
"""
This function is scheduled to be called after close() is called.
It checks that the transport has shut down properly, or waits
for any remaining data to be sent, and aborts after a timeout.
This is required if the transport is closed while there is an async
large async transport write operation in progress.
This is observed when NGINX reverse-proxy is the client.
"""
if protocol.transport is None:
# abort() is the only method that can make
# protocol.transport be None, so abort was already called
return
# protocol.connection_lost does not set protocol.transport to None
# so to detect it a different way with conninfo.lost
elif protocol.conn_info is not None and protocol.conn_info.lost:
# Terminus. Most connections finish the protocol here!
# Connection_lost callback was executed already,
# so transport did complete and close itself properly.
# No need to call abort().

# This is the last part of cleanup to do
# that is not done by connection_lost handler.
# Ensure transport is cleaned up by GC.
protocol.transport = None
return
elif not protocol.transport.is_closing():
raise RuntimeError(
"You must call transport.close() before "
"protocol._async_transport_close() runs."
)

write_is_paused = not protocol._can_write.is_set()
try:
# in UVLoop, get the data in the libuv write-buffer
data_left = protocol.transport.get_write_buffer_size()
# Some asyncio implementations don't support get_write_buffer_size
except (AttributeError, NotImplementedError):
data_left = 0
if write_is_paused or data_left > 0:
# don't need to call resume_writing here to unpause
if timeout <= 0:
# timeout is 0 or less, so we can simply abort now
loop.call_soon(SanicProtocol.abort, protocol)
else:
next_check_interval = min(timeout, 0.1)
next_check_timeout = timeout - next_check_interval
loop.call_later(
# Recurse back in after the timeout, to check again
next_check_interval,
# this next time with reduced timeout.
_async_protocol_transport_close,
protocol,
loop,
next_check_timeout,
)
else:
# Not paused, and no data left in the buffer, but transport
# is still open, connection_lost has not been called yet.
# We can call abort() to fix that.
loop.call_soon(SanicProtocol.abort, protocol)
8 changes: 2 additions & 6 deletions sanic/server/protocols/http_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,10 @@ def close(self, timeout: Optional[float] = None):
"""
Requires to prevent checking timeouts for closed connections
"""
if timeout is not None:
super().close(timeout=timeout)
return

if self._callback_check_timeouts:
self._callback_check_timeouts.cancel()
if self.transport:
self.transport.close()
self.abort()
return super().close(timeout=timeout)

async def send(self, data): # no cov
"""
Expand Down
Loading