Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A better aio rwlock that guarantees the order #2547

Merged
merged 1 commit into from
Dec 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 72 additions & 66 deletions python/sglang/srt/aio_rwlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,93 +2,99 @@


class RWLock:
"""
A Read-Write Lock for asyncio:
- Multiple readers can hold the lock in parallel if no writer holds it.
- A writer has exclusive access.
"""

def __init__(self):
self._readers = 0 # How many readers currently hold the lock
# Protects internal state
self._lock = asyncio.Lock()

# Condition variable used to wait for state changes
self._cond = asyncio.Condition(self._lock)

# Number of readers currently holding the lock
self._readers = 0

# Whether a writer is currently holding the lock
self._writer_active = False
self._lock = asyncio.Lock() # Internal mutex to protect state
# Conditions associated with _lock:
self._readers_ok = asyncio.Condition(self._lock) # Notify blocked readers
self._writers_ok = asyncio.Condition(self._lock) # Notify blocked writers

# Expose two async context-manager helpers:
self.reader_lock = self._ReaderLock(self)
self.writer_lock = self._WriterLock(self)
# How many writers are queued waiting for a turn
self._waiting_writers = 0

async def _acquire_reader(self):
@property
def reader_lock(self):
"""
Wait until there is no active writer.
Then increment the count of active readers.
A context manager for acquiring a shared (reader) lock.

Example:
async with rwlock.reader_lock:
# read-only access
"""
async with self._lock:
# If a writer is active, wait until it's done.
while self._writer_active:
await self._readers_ok.wait()
self._readers += 1
return _ReaderLock(self)

async def _release_reader(self):
@property
def writer_lock(self):
"""
Decrement the count of active readers.
If this was the last active reader, wake up a possible waiting writer.
A context manager for acquiring an exclusive (writer) lock.

Example:
async with rwlock.writer_lock:
# exclusive access
"""
return _WriterLock(self)

async def acquire_reader(self):
async with self._lock:
# Wait until there is no active writer or waiting writer
# to ensure fairness.
while self._writer_active or self._waiting_writers > 0:
await self._cond.wait()
self._readers += 1

async def release_reader(self):
async with self._lock:
self._readers -= 1
# If no more readers, a writer could proceed.
# If this was the last reader, wake up anyone waiting
# (potentially a writer or new readers).
if self._readers == 0:
self._writers_ok.notify()
self._cond.notify_all()

async def _acquire_writer(self):
"""
Wait until there is no active writer and no active readers.
Then mark a writer as active.
"""
async def acquire_writer(self):
async with self._lock:
while self._writer_active or self._readers > 0:
await self._writers_ok.wait()
self._writer_active = True

async def _release_writer(self):
"""
Mark the writer as done and notify readers and writers.
"""
# Increment the count of writers waiting
self._waiting_writers += 1
try:
# Wait while either a writer is active or readers are present
while self._writer_active or self._readers > 0:
await self._cond.wait()
self._writer_active = True
finally:
# Decrement waiting writers only after we've acquired the writer lock
self._waiting_writers -= 1

async def release_writer(self):
async with self._lock:
self._writer_active = False
# Allow any waiting readers to proceed:
self._readers_ok.notify_all()
# Allow next waiting writer to proceed:
self._writers_ok.notify()
# Wake up anyone waiting (readers or writers)
self._cond.notify_all()

class _ReaderLock:
"""
A simple async context manager that acquires a reader lock
on entering and releases it on exit.
"""

def __init__(self, parent: "RWLock"):
self._parent = parent
class _ReaderLock:
def __init__(self, rwlock: RWLock):
self._rwlock = rwlock

async def __aenter__(self):
await self._parent._acquire_reader()
async def __aenter__(self):
await self._rwlock.acquire_reader()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._parent._release_reader()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._rwlock.release_reader()

class _WriterLock:
"""
A simple async context manager that acquires a writer lock
on entering and releases it on exit.
"""

def __init__(self, parent: "RWLock"):
self._parent = parent
class _WriterLock:
def __init__(self, rwlock: RWLock):
self._rwlock = rwlock

async def __aenter__(self):
await self._parent._acquire_writer()
async def __aenter__(self):
await self._rwlock.acquire_writer()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._parent._release_writer()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._rwlock.release_writer()
Loading