Skip to content

Commit

Permalink
Start recv_events only after attributes are initialized.
Browse files Browse the repository at this point in the history
Else, a race condition could lead to accessing self.pong_waiters
before it is defined.
  • Loading branch information
aaugustin committed Feb 1, 2025
1 parent 3dac6c4 commit e4a3db8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
16 changes: 8 additions & 8 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,6 @@ def __init__(
# Protect sending fragmented messages.
self.fragmented_send_waiter: asyncio.Future[None] | None = None

# Exception raised while reading from the connection, to be chained to
# ConnectionClosed in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Completed when the TCP connection is closed and the WebSocket
# connection state becomes CLOSED.
self.connection_lost_waiter: asyncio.Future[None] = self.loop.create_future()

# Mapping of ping IDs to pong waiters, in chronological order.
self.pong_waiters: dict[bytes, tuple[asyncio.Future[float], float]] = {}

Expand All @@ -128,6 +120,14 @@ def __init__(
# Task that sends keepalive pings. None when ping_interval is None.
self.keepalive_task: asyncio.Task[None] | None = None

# Exception raised while reading from the connection, to be chained to
# ConnectionClosed in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Completed when the TCP connection is closed and the WebSocket
# connection state becomes CLOSED.
self.connection_lost_waiter: asyncio.Future[None] = self.loop.create_future()

# Adapted from asyncio.FlowControlMixin
self.paused: bool = False
self.drain_waiters: collections.deque[asyncio.Future[None]] = (
Expand Down
28 changes: 15 additions & 13 deletions src/websockets/sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,6 @@ def __init__(
# Whether we are busy sending a fragmented message.
self.send_in_progress = False

# Exception raised in recv_events, to be chained to ConnectionClosed
# in the user thread in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Receiving events from the socket. This thread is marked as daemon to
# allow creating a connection in a non-daemon thread and using it in a
# daemon thread. This mustn't prevent the interpreter from exiting.
self.recv_events_thread = threading.Thread(
target=self.recv_events,
daemon=True,
)
self.recv_events_thread.start()

# Mapping of ping IDs to pong waiters, in chronological order.
self.pong_waiters: dict[bytes, tuple[threading.Event, float, bool]] = {}

Expand All @@ -133,6 +120,21 @@ def __init__(
# Thread that sends keepalive pings. None when ping_interval is None.
self.keepalive_thread: threading.Thread | None = None

# Exception raised in recv_events, to be chained to ConnectionClosed
# in the user thread in order to show why the TCP connection dropped.
self.recv_exc: BaseException | None = None

# Receiving events from the socket. This thread is marked as daemon to
# allow creating a connection in a non-daemon thread and using it in a
# daemon thread. This mustn't prevent the interpreter from exiting.
self.recv_events_thread = threading.Thread(
target=self.recv_events,
daemon=True,
)

# Start recv_events only after all attributes are initialized.
self.recv_events_thread.start()

# Public attributes

@property
Expand Down

0 comments on commit e4a3db8

Please sign in to comment.