Merge branch 'bugfixes/358-unhandled-worker-thread-exc'
A DoS would happen in many situations, including TLS errors and
errors raised while closing the underlying sockets.

This patch aims to prevent situations where the worker threads are
killed by arbitrary exceptions that bubble up to their entry-point
layers and are handled improperly or not at all. A simplified sketch
of the resulting pattern follows below.

PR #649

Fixes #358
Fixes #354

Ref #310
Ref #346
Ref #375
Ref #599
Ref #641

Resolves #365
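
For orientation, here is a minimal, hedged sketch of the worker-loop pattern this merge introduces. It is an illustration of the approach only, not the actual cheroot.workers.threadpool implementation shown in the diff further down; the server object with its requests queue and interrupt attribute, the conn object, and the ResilientWorker name are hypothetical stand-ins assumed to behave like their cheroot counterparts.

import logging
import threading

log = logging.getLogger(__name__)


class ResilientWorker(threading.Thread):
    """Minimal sketch of the hardening pattern; not the cheroot code."""

    def __init__(self, server):
        """Keep a reference to a hypothetical server with a connection queue."""
        super().__init__(daemon=True)
        self.server = server

    def run(self):
        """Serve connections until a shutdown request or a fatal error."""
        try:
            self._process_connections_forever()
        except (KeyboardInterrupt, SystemExit) as interrupt_exc:
            # A shutdown request: escalate it to the whole server.
            self.server.interrupt = interrupt_exc
        except BaseException as underlying_exc:
            # Last-resort safety net: anything reaching this point is fatal,
            # so flag the server instead of silently losing the worker.
            log.critical(
                'Fatal worker error: %r', underlying_exc, exc_info=True,
            )
            self.server.interrupt = underlying_exc
            raise

    def _process_connections_forever(self):
        """Process queued connections one by one, surviving handler errors."""
        while True:
            conn = self.server.requests.get()
            try:
                conn.communicate()
            except ConnectionError as connection_error:
                # Broken client connections are expected; log and move on.
                log.info('Connection error: %s', connection_error)
            except (KeyboardInterrupt, SystemExit) as shutdown_request:
                # Turn an in-handler shutdown request into a SystemExit that
                # the outer handler in run() escalates to the server.
                raise SystemExit(str(shutdown_request)) from shutdown_request
            except BaseException:
                # Unexpected handler errors must not kill the worker thread,
                # or the pool would fill with dead threads (the DoS above).
                log.exception('Unhandled error while processing a connection')
            finally:
                # Errors raised here bubble up to run() and become fatal.
                conn.close()

The key design choice mirrored from the diff below: only shutdown requests (KeyboardInterrupt/SystemExit) escalate past the per-connection loop, while everything else is logged and the worker keeps serving.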
webknjaz committed Mar 31, 2024
2 parents 029600e + a7665f3 commit 9720c31
Showing 3 changed files with 334 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -110,7 +110,7 @@ per-file-ignores =
cheroot/test/test_dispatch.py: DAR101, DAR201, S101, WPS111, WPS121, WPS302, WPS422, WPS430
cheroot/test/test_ssl.py: C818, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, S101, S309, S404, S603, WPS100, WPS110, WPS111, WPS114, WPS121, WPS130, WPS201, WPS202, WPS204, WPS210, WPS211, WPS218, WPS219, WPS222, WPS226, WPS231, WPS300, WPS301, WPS317, WPS318, WPS324, WPS326, WPS335, WPS336, WPS337, WPS352, WPS408, WPS420, WPS421, WPS422, WPS432, WPS436, WPS440, WPS441, WPS442, WPS450, WPS509, WPS510, WPS608
cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS226, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS402, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
cheroot/test/webtest.py: B007, DAR101, DAR201, DAR401, I001, I003, I004, N802, RST303, RST304, S101, S104, WPS100, WPS110, WPS111, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS220, WPS221, WPS223, WPS229, WPS230, WPS231, WPS236, WPS301, WPS306, WPS317, WPS323, WPS326, WPS338, WPS361, WPS414, WPS420, WPS421, WPS422, WPS430, WPS432, WPS433, WPS437, WPS440, WPS501, WPS503, WPS505, WPS601
cheroot/testing.py: B014, C815, DAR101, DAR201, DAR301, I001, I003, S104, WPS100, WPS202, WPS211, WPS229, WPS301, WPS306, WPS317, WPS414, WPS420, WPS422, WPS430, WPS503, WPS526
cheroot/workers/threadpool.py: B007, DAR101, DAR201, E800, I001, I003, I004, RST201, RST203, RST301, WPS100, WPS110, WPS111, WPS121, WPS125, WPS211, WPS214, WPS220, WPS229, WPS230, WPS231, WPS304, WPS306, WPS317, WPS318, WPS322, WPS326, WPS335, WPS338, WPS362, WPS410, WPS414, WPS420, WPS422, WPS428, WPS432, WPS440, WPS462, WPS501, WPS505, WPS601, WPS602, WPS609
258 changes: 258 additions & 0 deletions cheroot/test/test_conn.py
@@ -1,6 +1,7 @@
"""Tests for TCP connection handling, including proper and timely close."""

import errno
from re import match as _matches_pattern
import socket
import time
import logging
@@ -700,6 +701,263 @@ def _close_kernel_socket(self):
assert _close_kernel_socket.exception_leaked is exception_leaks


def test_broken_connection_during_http_communication_fallback( # noqa: WPS118
monkeypatch,
test_client,
testing_server,
wsgi_server_thread,
):
"""Test that unhandled internal error cascades into shutdown."""
def _raise_connection_reset(*_args, **_kwargs):
raise ConnectionResetError(666)

def _read_request_line(self):
monkeypatch.setattr(self.conn.rfile, 'close', _raise_connection_reset)
monkeypatch.setattr(self.conn.wfile, 'write', _raise_connection_reset)
_raise_connection_reset()

monkeypatch.setattr(
test_client.server_instance.ConnectionClass.RequestHandlerClass,
'read_request_line',
_read_request_line,
)

test_client.get_connection().send(b'GET / HTTP/1.1')
wsgi_server_thread.join() # no extra logs upon server termination

actual_log_entries = testing_server.error_log.calls[:]
testing_server.error_log.calls.clear() # prevent post-test assertions

expected_log_entries = (
(logging.WARNING, r'^socket\.error 666$'),
(
logging.INFO,
'^Got a connection error while handling a connection '
r'from .*:\d{1,5} \(666\)',
),
(
logging.CRITICAL,
r'A fatal exception happened\. Setting the server interrupt flag '
r'to ConnectionResetError\(666\) and giving up\.\n\nPlease, '
'report this on the Cheroot tracker at '
r'<https://github\.com/cherrypy/cheroot/issues/new/choose>, '
'providing a full reproducer with as much context and details '
r'as possible\.$',
),
)

assert len(actual_log_entries) == len(expected_log_entries)

for ( # noqa: WPS352
(expected_log_level, expected_msg_regex),
(actual_msg, actual_log_level, _tb),
) in zip(expected_log_entries, actual_log_entries):
assert expected_log_level == actual_log_level
assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
f'{actual_msg !r} does not match {expected_msg_regex !r}'
)


def test_kb_int_from_http_handler(
test_client,
testing_server,
wsgi_server_thread,
):
"""Test that a keyboard interrupt from HTTP handler causes shutdown."""
def _trigger_kb_intr(_req, _resp):
raise KeyboardInterrupt('simulated test handler keyboard interrupt')
testing_server.wsgi_app.handlers['/kb_intr'] = _trigger_kb_intr

http_conn = test_client.get_connection()
http_conn.putrequest('GET', '/kb_intr', skip_host=True)
http_conn.putheader('Host', http_conn.host)
http_conn.endheaders()
wsgi_server_thread.join() # no extra logs upon server termination

actual_log_entries = testing_server.error_log.calls[:]
testing_server.error_log.calls.clear() # prevent post-test assertions

expected_log_entries = (
(
logging.DEBUG,
'^Got a server shutdown request while handling a connection '
r'from .*:\d{1,5} \(simulated test handler keyboard interrupt\)$',
),
(
logging.DEBUG,
'^Setting the server interrupt flag to KeyboardInterrupt'
r"\('simulated test handler keyboard interrupt'\)$",
),
(
logging.INFO,
'^Keyboard Interrupt: shutting down$',
),
)

assert len(actual_log_entries) == len(expected_log_entries)

for ( # noqa: WPS352
(expected_log_level, expected_msg_regex),
(actual_msg, actual_log_level, _tb),
) in zip(expected_log_entries, actual_log_entries):
assert expected_log_level == actual_log_level
assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
f'{actual_msg !r} does not match {expected_msg_regex !r}'
)


def test_unhandled_exception_in_request_handler(
mocker,
monkeypatch,
test_client,
testing_server,
wsgi_server_thread,
):
"""Ensure worker threads are resilient to in-handler exceptions."""

class SillyMistake(BaseException): # noqa: WPS418, WPS431
"""A simulated crash within an HTTP handler."""

def _trigger_scary_exc(_req, _resp):
raise SillyMistake('simulated unhandled exception 💣 in test handler')

testing_server.wsgi_app.handlers['/scary_exc'] = _trigger_scary_exc

server_connection_close_spy = mocker.spy(
test_client.server_instance.ConnectionClass,
'close',
)

http_conn = test_client.get_connection()
http_conn.putrequest('GET', '/scary_exc', skip_host=True)
http_conn.putheader('Host', http_conn.host)
http_conn.endheaders()

# NOTE: This spy ensures the log entries get recorded before we test
# NOTE: them and before server shutdown, preserving their order and
# NOTE: making the log entry presence non-flaky.
while not server_connection_close_spy.called: # noqa: WPS328
pass

assert len(testing_server.requests._threads) == 10
while testing_server.requests.idle < 10: # noqa: WPS328
pass
assert len(testing_server.requests._threads) == 10
testing_server.interrupt = SystemExit('test requesting shutdown')
assert not testing_server.requests._threads
wsgi_server_thread.join() # no extra logs upon server termination

actual_log_entries = testing_server.error_log.calls[:]
testing_server.error_log.calls.clear() # prevent post-test assertions

expected_log_entries = (
(
logging.ERROR,
'^Unhandled error while processing an incoming connection '
'SillyMistake'
r"\('simulated unhandled exception 💣 in test handler'\)$",
),
(
logging.INFO,
'^SystemExit raised: shutting down$',
),
)

assert len(actual_log_entries) == len(expected_log_entries)

for ( # noqa: WPS352
(expected_log_level, expected_msg_regex),
(actual_msg, actual_log_level, _tb),
) in zip(expected_log_entries, actual_log_entries):
assert expected_log_level == actual_log_level
assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
f'{actual_msg !r} does not match {expected_msg_regex !r}'
)


def test_remains_alive_post_unhandled_exception(
mocker,
monkeypatch,
test_client,
testing_server,
wsgi_server_thread,
):
"""Ensure worker threads are resilient to unhandled exceptions."""

class ScaryCrash(BaseException): # noqa: WPS418, WPS431
"""A simulated crash during HTTP parsing."""

_orig_read_request_line = (
test_client.server_instance.
ConnectionClass.RequestHandlerClass.
read_request_line
)

def _read_request_line(self):
_orig_read_request_line(self)
raise ScaryCrash(666)

monkeypatch.setattr(
test_client.server_instance.ConnectionClass.RequestHandlerClass,
'read_request_line',
_read_request_line,
)

server_connection_close_spy = mocker.spy(
test_client.server_instance.ConnectionClass,
'close',
)

# NOTE: The initial worker thread count is 10.
assert len(testing_server.requests._threads) == 10

test_client.get_connection().send(b'GET / HTTP/1.1')

# NOTE: This spy ensures the log entries get recorded before we test
# NOTE: them and before server shutdown, preserving their order and
# NOTE: making the log entry presence non-flaky.
while not server_connection_close_spy.called: # noqa: WPS328
pass

# NOTE: This checks whether there are any crashed threads.
while testing_server.requests.idle < 10: # noqa: WPS328
pass
assert len(testing_server.requests._threads) == 10
assert all(
worker_thread.is_alive()
for worker_thread in testing_server.requests._threads
)
testing_server.interrupt = SystemExit('test requesting shutdown')
assert not testing_server.requests._threads
wsgi_server_thread.join() # no extra logs upon server termination

actual_log_entries = testing_server.error_log.calls[:]
testing_server.error_log.calls.clear() # prevent post-test assertions

expected_log_entries = (
(
logging.ERROR,
'^Unhandled error while processing an incoming connection '
r'ScaryCrash\(666\)$',
),
(
logging.INFO,
'^SystemExit raised: shutting down$',
),
)

assert len(actual_log_entries) == len(expected_log_entries)

for ( # noqa: WPS352
(expected_log_level, expected_msg_regex),
(actual_msg, actual_log_level, _tb),
) in zip(expected_log_entries, actual_log_entries):
assert expected_log_level == actual_log_level
assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
f'{actual_msg !r} does not match {expected_msg_regex !r}'
)


@pytest.mark.parametrize(
'timeout_before_headers',
(
77 changes: 75 additions & 2 deletions cheroot/workers/threadpool.py
@@ -6,6 +6,7 @@
"""

import collections
import logging
import threading
import time
import socket
@@ -107,14 +108,38 @@ def run(self):
from the inner-layer code constitute a global server interrupt
request. When they happen, the worker thread exits.
:raises BaseException: when an unexpected non-interrupt
exception leaks from the inner layers
# noqa: DAR401 KeyboardInterrupt SystemExit
"""
self.server.stats['Worker Threads'][self.name] = self.stats
self.ready = True
try:
self._process_connections_until_interrupted()
except (KeyboardInterrupt, SystemExit) as ex:
self.server.interrupt = ex
except (KeyboardInterrupt, SystemExit) as interrupt_exc:
interrupt_cause = interrupt_exc.__cause__ or interrupt_exc
self.server.error_log(
f'Setting the server interrupt flag to {interrupt_cause !r}',
level=logging.DEBUG,
)
self.server.interrupt = interrupt_cause
except BaseException as underlying_exc: # noqa: WPS424
# NOTE: This is the last resort logging with the last dying breath
# NOTE: of the worker. It is only reachable when exceptions happen
# NOTE: in the `finally` branch of the internal try/except block.
self.server.error_log(
'A fatal exception happened. Setting the server interrupt flag'
f' to {underlying_exc !r} and giving up.'
'\N{NEW LINE}\N{NEW LINE}'
'Please, report this on the Cheroot tracker at '
'<https://github.com/cherrypy/cheroot/issues/new/choose>, '
'providing a full reproducer with as much context and details as possible.',
level=logging.CRITICAL,
traceback=True,
)
self.server.interrupt = underlying_exc
raise
finally:
self.ready = False

@@ -123,6 +148,9 @@ def _process_connections_until_interrupted(self):
Retrieves incoming connections from thread pool, processing
them one by one.
:raises SystemExit: on the internal requests to stop the
server instance
"""
while True:
conn = self.server.requests.get()
@@ -136,7 +164,52 @@ def _process_connections_until_interrupted(self):
keep_conn_open = False
try:
keep_conn_open = conn.communicate()
except ConnectionError as connection_error:
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a connection error while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({connection_error !s})',
level=logging.INFO,
)
continue
except (KeyboardInterrupt, SystemExit) as shutdown_request:
# Shutdown request
keep_conn_open = False # Drop the connection cleanly
self.server.error_log(
'Got a server shutdown request while handling a '
f'connection from {conn.remote_addr !s}:'
f'{conn.remote_port !s} ({shutdown_request !s})',
level=logging.DEBUG,
)
raise SystemExit(
str(shutdown_request),
) from shutdown_request
except BaseException as unhandled_error: # noqa: WPS424
# NOTE: Only a shutdown request should bubble up to the
# NOTE: external cleanup code. Otherwise, this thread dies.
# NOTE: If this were to happen, the threadpool would still
# NOTE: list a dead thread without knowing its state. And
# NOTE: the calling code would fail to schedule processing
# NOTE: of new requests.
self.server.error_log(
'Unhandled error while processing an incoming '
f'connection {unhandled_error !r}',
level=logging.ERROR,
traceback=True,
)
continue # Prevent the thread from dying
finally:
# NOTE: Any exceptions coming from within `finally` may
# NOTE: kill the thread, causing the threadpool to only
# NOTE: contain references to dead threads rendering the
# NOTE: server defunct, effectively meaning a DoS.
# NOTE: Ideally, things called here should process
# NOTE: everything recoverable internally. Any unhandled
# NOTE: errors will bubble up into the outer try/except
# NOTE: block. They will be treated as fatal and turned
# NOTE: into server shutdown requests and then reraised
# NOTE: unconditionally.
if keep_conn_open:
self.server.put_conn(conn)
else:
