diff --git a/python/sglang/srt/aio_rwlock.py b/python/sglang/srt/aio_rwlock.py index e6c617d2e9f..deda1fe7903 100644 --- a/python/sglang/srt/aio_rwlock.py +++ b/python/sglang/srt/aio_rwlock.py @@ -2,93 +2,99 @@ class RWLock: - """ - A Read-Write Lock for asyncio: - - Multiple readers can hold the lock in parallel if no writer holds it. - - A writer has exclusive access. - """ - def __init__(self): - self._readers = 0 # How many readers currently hold the lock + # Protects internal state + self._lock = asyncio.Lock() + + # Condition variable used to wait for state changes + self._cond = asyncio.Condition(self._lock) + + # Number of readers currently holding the lock + self._readers = 0 + + # Whether a writer is currently holding the lock self._writer_active = False - self._lock = asyncio.Lock() # Internal mutex to protect state - # Conditions associated with _lock: - self._readers_ok = asyncio.Condition(self._lock) # Notify blocked readers - self._writers_ok = asyncio.Condition(self._lock) # Notify blocked writers - # Expose two async context-manager helpers: - self.reader_lock = self._ReaderLock(self) - self.writer_lock = self._WriterLock(self) + # How many writers are queued waiting for a turn + self._waiting_writers = 0 - async def _acquire_reader(self): + @property + def reader_lock(self): """ - Wait until there is no active writer. - Then increment the count of active readers. + A context manager for acquiring a shared (reader) lock. + + Example: + async with rwlock.reader_lock: + # read-only access """ - async with self._lock: - # If a writer is active, wait until it's done. - while self._writer_active: - await self._readers_ok.wait() - self._readers += 1 + return _ReaderLock(self) - async def _release_reader(self): + @property + def writer_lock(self): """ - Decrement the count of active readers. - If this was the last active reader, wake up a possible waiting writer. + A context manager for acquiring an exclusive (writer) lock. + + Example: + async with rwlock.writer_lock: + # exclusive access """ + return _WriterLock(self) + + async def acquire_reader(self): + async with self._lock: + # Wait until there is no active writer or waiting writer + # to ensure fairness. + while self._writer_active or self._waiting_writers > 0: + await self._cond.wait() + self._readers += 1 + + async def release_reader(self): async with self._lock: self._readers -= 1 - # If no more readers, a writer could proceed. + # If this was the last reader, wake up anyone waiting + # (potentially a writer or new readers). if self._readers == 0: - self._writers_ok.notify() + self._cond.notify_all() - async def _acquire_writer(self): - """ - Wait until there is no active writer and no active readers. - Then mark a writer as active. - """ + async def acquire_writer(self): async with self._lock: - while self._writer_active or self._readers > 0: - await self._writers_ok.wait() - self._writer_active = True - - async def _release_writer(self): - """ - Mark the writer as done and notify readers and writers. - """ + # Increment the count of writers waiting + self._waiting_writers += 1 + try: + # Wait while either a writer is active or readers are present + while self._writer_active or self._readers > 0: + await self._cond.wait() + self._writer_active = True + finally: + # Decrement waiting writers only after we've acquired the writer lock + self._waiting_writers -= 1 + + async def release_writer(self): async with self._lock: self._writer_active = False - # Allow any waiting readers to proceed: - self._readers_ok.notify_all() - # Allow next waiting writer to proceed: - self._writers_ok.notify() + # Wake up anyone waiting (readers or writers) + self._cond.notify_all() - class _ReaderLock: - """ - A simple async context manager that acquires a reader lock - on entering and releases it on exit. - """ - def __init__(self, parent: "RWLock"): - self._parent = parent +class _ReaderLock: + def __init__(self, rwlock: RWLock): + self._rwlock = rwlock - async def __aenter__(self): - await self._parent._acquire_reader() + async def __aenter__(self): + await self._rwlock.acquire_reader() + return self - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._parent._release_reader() + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._rwlock.release_reader() - class _WriterLock: - """ - A simple async context manager that acquires a writer lock - on entering and releases it on exit. - """ - def __init__(self, parent: "RWLock"): - self._parent = parent +class _WriterLock: + def __init__(self, rwlock: RWLock): + self._rwlock = rwlock - async def __aenter__(self): - await self._parent._acquire_writer() + async def __aenter__(self): + await self._rwlock.acquire_writer() + return self - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._parent._release_writer() + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self._rwlock.release_writer()