Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,27 @@ async def get_document(
return None

async def stop_extension(self):
# Cancel tasks and clean up
await asyncio.wait(
self.log.info("Stopping collaboration extension...")
_, pending = await asyncio.wait(
[
asyncio.create_task(self.ywebsocket_server.clean()),
asyncio.create_task(self.file_loaders.clear()),
],
timeout=3,
)
if pending:
self.log.warning(
"Collaboration shutdown: %d task(s) still pending after 3s timeout, cancelling.",
len(pending),
)
for task in pending:
task.cancel()
# Wait briefly for cancellation to propagate
_, still_pending = await asyncio.wait(pending, timeout=1)
if still_pending:
self.log.error(
"Collaboration shutdown: %d task(s) could not be cancelled.",
len(still_pending),
)
else:
self.log.info("Collaboration extension stopped cleanly.")
5 changes: 4 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ def create_task(self, aw):
task = asyncio.create_task(aw)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
return task

async def prepare(self):
await ensure_async(super().prepare())
if self._websocket_server.stopping:
raise web.HTTPError(503, "Server is shutting down")
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
self._websocket_server._start_task = self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
Expand Down
63 changes: 45 additions & 18 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,38 +56,65 @@ def __init__(
self.connected_users: dict[Any, Any] = {}
# Async loop is not yet ready at the object instantiation
self.monitor_task: asyncio.Task | None = None
self._stopping = False
self._start_task: asyncio.Task | None = None

@property
def stopping(self) -> bool:
return self._stopping

async def clean(self):
# TODO: should we wait for any save task?
self.log.info("Deleting all rooms.")
# FIXME some clean up should be upstreamed and the following does not
# prevent hanging stop process - it also requires some thinking about
# should the ystore write action be cancelled; I guess not as it could
# results in corrupted data.
# room_tasks = list()
# for name, room in list(self.rooms.items()):
# for task in room.background_tasks:
# task.cancel() # FIXME should be upstreamed
# room_tasks.append(task)
# if room_tasks:
# _, pending = await asyncio.wait(room_tasks, timeout=3)
# if pending:
# msg = f"{len(pending)} room task(s) are pending."
# self.log.warning(msg)
# self.log.debug("Pending tasks: %r", pending)

await self.stop()
tasks = []
self.log.info("Cleaning up %d room(s).", len(self.rooms))
# Reject new WebSocket connections in YDocWebSocketHandler.prepare().
self._stopping = True

# Stop the server first to reject any new connections during shutdown.
# Guard: stop() raises RuntimeError if the server was never started.
if self.started.is_set():
await self.stop()

# Now disconnect existing clients so their serve() tasks complete.
tasks: list[asyncio.Task] = []
for room in self.rooms.values():
for client in list(room.clients):
tasks.extend(client._background_tasks) # type: ignore[attr-defined]
client._message_queue.put_nowait(b"") # type: ignore[attr-defined]

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice to have teh client have a disconnect method that just handles this. looks rather mysterious from here.


if self.monitor_task is not None:
self.monitor_task.cancel()
tasks.append(self.monitor_task)

if tasks:
_, pending = await asyncio.wait(tasks, timeout=3)
if pending:
msg = f"{len(pending)} task(s) are pending."
self.log.warning(msg)
self.log.debug("Pending tasks: %r", pending)
self.log.warning("%d task(s) still pending after shutdown.", len(pending))

# Rooms persist after client disconnect so users can reconnect without
# a full re-sync (see auto_clean_rooms=False in app.py). On shutdown
# we must stop them explicitly.
for room in self.rooms.values():
try:
await room.stop()
except RuntimeError as e:
if "not running" not in str(e):
raise
self.rooms.clear()

# Await the root asyncio task so anyio's task group __aexit__ runs
# and fires done-callbacks that clean up worker threads. Without this,
# non-daemon WorkerThread instances block Python's threading._shutdown().
if self._start_task is not None and not self._start_task.done():
try:
await asyncio.wait_for(self._start_task, timeout=2)
except asyncio.TimeoutError:
# Letting this propagate would be silently swallowed by
# asyncio.wait() in stop_extension(), so we log it here.
self.log.warning("Server start task did not complete within 2s.")

def room_exists(self, path: str) -> bool:
"""
Expand Down
Loading