From d0ad3a40fc242fcae32207b4b83a52c4d3c3fa5a Mon Sep 17 00:00:00 2001 From: Ian Hunt-Isaak Date: Thu, 12 Feb 2026 14:22:07 -0500 Subject: [PATCH 1/3] FIX: prevent hanging on server shutdown On SIGTERM, non-daemon anyio WorkerThread instances blocked Python's threading._shutdown() because the root asyncio task was never awaited, so its done-callbacks (which send shutdown signals to worker threads) never fired. Fix: - Track the root asyncio task (server.start()) so clean() can await it - Stop all rooms explicitly during shutdown (auto_clean_rooms=False means pycrdt never stops them automatically) - Await the root task with a timeout so anyio's task group __aexit__ runs and worker threads get their shutdown signal - Cancel timed-out tasks in stop_extension() instead of ignoring them Co-Authored-By: Claude Opus 4.6 --- .../jupyter_server_ydoc/app.py | 20 +++++- .../jupyter_server_ydoc/handlers.py | 7 ++- .../jupyter_server_ydoc/websocketserver.py | 63 +++++++++++++------ 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py index 79b35b6d..52470314 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py @@ -226,11 +226,27 @@ async def get_document( return None async def stop_extension(self): - # Cancel tasks and clean up - await asyncio.wait( + self.log.info("Stopping collaboration extension...") + _, pending = await asyncio.wait( [ asyncio.create_task(self.ywebsocket_server.clean()), asyncio.create_task(self.file_loaders.clear()), ], timeout=3, ) + if pending: + self.log.warning( + "Collaboration shutdown: %d task(s) still pending after 3s timeout, cancelling.", + len(pending), + ) + for task in pending: + task.cancel() + # Wait briefly for cancellation to propagate + _, still_pending = await asyncio.wait(pending, timeout=1) + if still_pending: + self.log.error( + "Collaboration shutdown: %d task(s) could not be cancelled.", + len(still_pending), + ) + else: + self.log.info("Collaboration extension stopped cleanly.") diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 16a761d6..1e95aaa6 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -80,8 +80,13 @@ def create_task(self, aw): async def prepare(self): await ensure_async(super().prepare()) + if self._websocket_server.stopping: + raise web.HTTPError(503, "Server is shutting down") if not self._websocket_server.started.is_set(): - self.create_task(self._websocket_server.start()) + task = asyncio.create_task(self._websocket_server.start()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + self._websocket_server._start_task = task await self._websocket_server.started.wait() # Get room diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py index 376a4ad3..5ae75195 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py @@ -55,28 +55,35 @@ def __init__( self.connected_users: dict[Any, Any] = {} # Async loop is not yet ready at the object instantiation self.monitor_task: asyncio.Task | None = None + self._stopping = False + self._start_task: asyncio.Task | None = None + + @property + def stopping(self) -> bool: + return self._stopping async def clean(self): # TODO: should we wait for any save task? - self.log.info("Deleting all rooms.") # FIXME some clean up should be upstreamed and the following does not # prevent hanging stop process - it also requires some thinking about # should the ystore write action be cancelled; I guess not as it could # results in corrupted data. - # room_tasks = list() - # for name, room in list(self.rooms.items()): - # for task in room.background_tasks: - # task.cancel() # FIXME should be upstreamed - # room_tasks.append(task) - # if room_tasks: - # _, pending = await asyncio.wait(room_tasks, timeout=3) - # if pending: - # msg = f"{len(pending)} room task(s) are pending." - # self.log.warning(msg) - # self.log.debug("Pending tasks: %r", pending) - - await self.stop() - tasks = [] + self.log.info("Cleaning up %d room(s).", len(self.rooms)) + # Reject new WebSocket connections in YDocWebSocketHandler.prepare(). + self._stopping = True + + # Stop the server first to reject any new connections during shutdown. + # Guard: stop() raises RuntimeError if the server was never started. + if self.started.is_set(): + await self.stop() + + # Now disconnect existing clients so their serve() tasks complete. + tasks: list[asyncio.Task] = [] + for _, room in list(self.rooms.items()): + for client in list(room.clients): + tasks.extend(client._background_tasks) # type: ignore[attr-defined] + client._message_queue.put_nowait(b"") # type: ignore[attr-defined] + if self.monitor_task is not None: self.monitor_task.cancel() tasks.append(self.monitor_task) @@ -84,9 +91,29 @@ async def clean(self): if tasks: _, pending = await asyncio.wait(tasks, timeout=3) if pending: - msg = f"{len(pending)} task(s) are pending." - self.log.warning(msg) - self.log.debug("Pending tasks: %r", pending) + self.log.warning("%d task(s) still pending after shutdown.", len(pending)) + + # Rooms persist after client disconnect so users can reconnect without + # a full re-sync (see auto_clean_rooms=False in app.py). On shutdown + # we must stop them explicitly. + for _, room in list(self.rooms.items()): + try: + await room.stop() + except RuntimeError as e: + if "not running" not in str(e): + raise + self.rooms.clear() + + # Await the root asyncio task so anyio's task group __aexit__ runs + # and fires done-callbacks that clean up worker threads. Without this, + # non-daemon WorkerThread instances block Python's threading._shutdown(). + if self._start_task is not None and not self._start_task.done(): + try: + await asyncio.wait_for(self._start_task, timeout=2) + except asyncio.TimeoutError: + # Letting this propagate would be silently swallowed by + # asyncio.wait() in stop_extension(), so we log it here. + self.log.warning("Server start task did not complete within 2s.") def room_exists(self, path: str) -> bool: """ From 27572f9bbf50ebcfbd30763e086e382baa9016df Mon Sep 17 00:00:00 2001 From: Ian Hunt-Isaak Date: Tue, 17 Feb 2026 11:07:18 -0500 Subject: [PATCH 2/3] cleaner loop --- .../jupyter_server_ydoc/websocketserver.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py index 5ae75195..e767699c 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py @@ -79,7 +79,7 @@ async def clean(self): # Now disconnect existing clients so their serve() tasks complete. tasks: list[asyncio.Task] = [] - for _, room in list(self.rooms.items()): + for room in self.rooms.values(): for client in list(room.clients): tasks.extend(client._background_tasks) # type: ignore[attr-defined] client._message_queue.put_nowait(b"") # type: ignore[attr-defined] @@ -96,7 +96,7 @@ async def clean(self): # Rooms persist after client disconnect so users can reconnect without # a full re-sync (see auto_clean_rooms=False in app.py). On shutdown # we must stop them explicitly. - for _, room in list(self.rooms.items()): + for room in self.rooms.values(): try: await room.stop() except RuntimeError as e: From a5f0151422d9255cdfc5006e3b5940a243762c60 Mon Sep 17 00:00:00 2001 From: Ian Hunt-Isaak Date: Tue, 17 Feb 2026 11:09:06 -0500 Subject: [PATCH 3/3] simplify --- .../jupyter-server-ydoc/jupyter_server_ydoc/handlers.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 1e95aaa6..ced80190 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -77,16 +77,14 @@ def create_task(self, aw): task = asyncio.create_task(aw) self._background_tasks.add(task) task.add_done_callback(self._background_tasks.discard) + return task async def prepare(self): await ensure_async(super().prepare()) if self._websocket_server.stopping: raise web.HTTPError(503, "Server is shutting down") if not self._websocket_server.started.is_set(): - task = asyncio.create_task(self._websocket_server.start()) - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) - self._websocket_server._start_task = task + self._websocket_server._start_task = self.create_task(self._websocket_server.start()) await self._websocket_server.started.wait() # Get room