diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
index eb995de0..34431443 100644
--- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
+++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
@@ -229,11 +229,35 @@ async def get_document(
         return None
 
     async def stop_extension(self):
-        # Cancel tasks and clean up
-        await asyncio.wait(
+        """Clean up the websocket server and file loaders, cancelling tasks that time out."""
+        self.log.info("Stopping collaboration extension...")
+        done, pending = await asyncio.wait(
             [
                 asyncio.create_task(self.ywebsocket_server.clean()),
                 asyncio.create_task(self.file_loaders.clear()),
             ],
             timeout=3,
         )
+        if pending:
+            self.log.warning(
+                "Collaboration shutdown: %d task(s) still pending after 3s timeout, cancelling.",
+                len(pending),
+            )
+            for task in pending:
+                task.cancel()
+            # Wait briefly for cancellation to propagate
+            finished, still_pending = await asyncio.wait(pending, timeout=1)
+            done |= finished
+            if still_pending:
+                self.log.error(
+                    "Collaboration shutdown: %d task(s) could not be cancelled.",
+                    len(still_pending),
+                )
+        # asyncio.wait() never raises a task's exception itself, so retrieve
+        # each outcome here: a failure must be logged, not hidden behind a
+        # "stopped cleanly" message.
+        failed = [t for t in done if not t.cancelled() and t.exception() is not None]
+        for task in failed:
+            self.log.error("Collaboration shutdown task failed.", exc_info=task.exception())
+        if not pending and not failed:
+            self.log.info("Collaboration extension stopped cleanly.")
diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
index dab437c1..60c2431b 100644
--- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
+++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
@@ -79,11 +79,21 @@ def create_task(self, aw):
         task = asyncio.create_task(aw)
         self._background_tasks.add(task)
         task.add_done_callback(self._background_tasks.discard)
+        return task
 
     async def prepare(self):
         await ensure_async(super().prepare())
+        if self._websocket_server.stopping:
+            raise web.HTTPError(503, "Server is shutting down")
         if not self._websocket_server.started.is_set():
-            self.create_task(self._websocket_server.start())
+            # Several handlers can reach this point before `started` is set; only
+            # launch the server once so that `_start_task` keeps pointing at the
+            # task that clean() has to await.
+            start_task = self._websocket_server._start_task
+            if start_task is None or start_task.done():
+                self._websocket_server._start_task = self.create_task(
+                    self._websocket_server.start()
+                )
             await self._websocket_server.started.wait()
 
         # Get room
diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py
index 13fb5375..48fb37a6 100644
--- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py
+++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py
@@ -56,28 +56,37 @@ def __init__(
         self.connected_users: dict[Any, Any] = {}
         # Async loop is not yet ready at the object instantiation
         self.monitor_task: asyncio.Task | None = None
+        self._stopping = False
+        self._start_task: asyncio.Task | None = None
+
+    @property
+    def stopping(self) -> bool:
+        """Whether clean() has started shutting the server down."""
+        return self._stopping
 
     async def clean(self):
+        """Stop the server, disconnect clients and stop all rooms on shutdown."""
         # TODO: should we wait for any save task?
-        self.log.info("Deleting all rooms.")
         # FIXME some clean up should be upstreamed and the following does not
         # prevent hanging stop process - it also requires some thinking about
         # should the ystore write action be cancelled; I guess not as it could
         # results in corrupted data.
-        # room_tasks = list()
-        # for name, room in list(self.rooms.items()):
-        #     for task in room.background_tasks:
-        #         task.cancel()  # FIXME should be upstreamed
-        #         room_tasks.append(task)
-        # if room_tasks:
-        #     _, pending = await asyncio.wait(room_tasks, timeout=3)
-        #     if pending:
-        #         msg = f"{len(pending)} room task(s) are pending."
-        #         self.log.warning(msg)
-        #         self.log.debug("Pending tasks: %r", pending)
-
-        await self.stop()
-        tasks = []
+        self.log.info("Cleaning up %d room(s).", len(self.rooms))
+        # Reject new WebSocket connections in YDocWebSocketHandler.prepare().
+        self._stopping = True
+
+        # Stop the server first to reject any new connections during shutdown.
+        # Guard: stop() raises RuntimeError if the server was never started.
+        if self.started.is_set():
+            await self.stop()
+
+        # Now disconnect existing clients so their serve() tasks complete.
+        tasks: list[asyncio.Task] = []
+        for room in self.rooms.values():
+            for client in list(room.clients):
+                tasks.extend(client._background_tasks)  # type: ignore[attr-defined]
+                client._message_queue.put_nowait(b"")  # type: ignore[attr-defined]
+
         if self.monitor_task is not None:
             self.monitor_task.cancel()
             tasks.append(self.monitor_task)
@@ -85,9 +94,31 @@ async def clean(self):
         if tasks:
             _, pending = await asyncio.wait(tasks, timeout=3)
             if pending:
-                msg = f"{len(pending)} task(s) are pending."
-                self.log.warning(msg)
-                self.log.debug("Pending tasks: %r", pending)
+                self.log.warning("%d task(s) still pending after shutdown.", len(pending))
+
+        # Rooms persist after client disconnect so users can reconnect without
+        # a full re-sync (see auto_clean_rooms=False in app.py). On shutdown
+        # we must stop them explicitly.
+        # Iterate over a snapshot: awaiting room.stop() yields to the event loop,
+        # and a mutation of self.rooms meanwhile would abort a live dict iteration.
+        for room in list(self.rooms.values()):
+            try:
+                await room.stop()
+            except RuntimeError as e:
+                if "not running" not in str(e):
+                    raise
+        self.rooms.clear()
+
+        # Await the root asyncio task so anyio's task group __aexit__ runs
+        # and fires done-callbacks that clean up worker threads. Without this,
+        # non-daemon WorkerThread instances block Python's threading._shutdown().
+        if self._start_task is not None and not self._start_task.done():
+            try:
+                await asyncio.wait_for(self._start_task, timeout=2)
+            except asyncio.TimeoutError:
+                # wait_for() has already cancelled the task at this point;
+                # report the overrun instead of failing the whole clean-up.
+                self.log.warning("Server start task did not complete within 2s.")
 
     def room_exists(self, path: str) -> bool:
         """