Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,27 @@ async def get_document(
return None

async def stop_extension(self):
# Cancel tasks and clean up
await asyncio.wait(
self.log.info("Stopping collaboration extension...")
_, pending = await asyncio.wait(
[
asyncio.create_task(self.ywebsocket_server.clean()),
asyncio.create_task(self.file_loaders.clear()),
],
timeout=3,
)
if pending:
self.log.warning(
"Collaboration shutdown: %d task(s) still pending after 3s timeout, cancelling.",
len(pending),
)
for task in pending:
task.cancel()
# Wait briefly for cancellation to propagate
_, still_pending = await asyncio.wait(pending, timeout=1)
if still_pending:
self.log.error(
"Collaboration shutdown: %d task(s) could not be cancelled.",
len(still_pending),
)
else:
self.log.info("Collaboration extension stopped cleanly.")
7 changes: 6 additions & 1 deletion projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,13 @@ def create_task(self, aw):

async def prepare(self):
await ensure_async(super().prepare())
if self._websocket_server.stopping:
raise web.HTTPError(503, "Server is shutting down")
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
task = asyncio.create_task(self._websocket_server.start())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._websocket_server._start_task = task

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could have self.create_task() return the task:

Suggested change
task = asyncio.create_task(self._websocket_server.start())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
self._websocket_server._start_task = task
self._websocket_server._start_task = self.create_task(self._websocket_server.start())

await self._websocket_server.started.wait()

# Get room
Expand Down
63 changes: 45 additions & 18 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,38 +55,65 @@ def __init__(
self.connected_users: dict[Any, Any] = {}
# Async loop is not yet ready at the object instantiation
self.monitor_task: asyncio.Task | None = None
self._stopping = False
self._start_task: asyncio.Task | None = None

@property
def stopping(self) -> bool:
return self._stopping

async def clean(self):
# TODO: should we wait for any save task?
self.log.info("Deleting all rooms.")
# FIXME some clean up should be upstreamed and the following does not
# prevent hanging stop process - it also requires some thinking about
# should the ystore write action be cancelled; I guess not as it could
# results in corrupted data.
# room_tasks = list()
# for name, room in list(self.rooms.items()):
# for task in room.background_tasks:
# task.cancel() # FIXME should be upstreamed
# room_tasks.append(task)
# if room_tasks:
# _, pending = await asyncio.wait(room_tasks, timeout=3)
# if pending:
# msg = f"{len(pending)} room task(s) are pending."
# self.log.warning(msg)
# self.log.debug("Pending tasks: %r", pending)

await self.stop()
tasks = []
self.log.info("Cleaning up %d room(s).", len(self.rooms))
# Reject new WebSocket connections in YDocWebSocketHandler.prepare().
self._stopping = True

# Stop the server first to reject any new connections during shutdown.
# Guard: stop() raises RuntimeError if the server was never started.
if self.started.is_set():
await self.stop()

# Now disconnect existing clients so their serve() tasks complete.
tasks: list[asyncio.Task] = []
for _, room in list(self.rooms.items()):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for _, room in list(self.rooms.items()):
for room in list(self.rooms.values()):

for client in list(room.clients):
tasks.extend(client._background_tasks) # type: ignore[attr-defined]
client._message_queue.put_nowait(b"") # type: ignore[attr-defined]

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice to have teh client have a disconnect method that just handles this. looks rather mysterious from here.


if self.monitor_task is not None:
self.monitor_task.cancel()
tasks.append(self.monitor_task)

if tasks:
_, pending = await asyncio.wait(tasks, timeout=3)
if pending:
msg = f"{len(pending)} task(s) are pending."
self.log.warning(msg)
self.log.debug("Pending tasks: %r", pending)
self.log.warning("%d task(s) still pending after shutdown.", len(pending))

# Rooms persist after client disconnect so users can reconnect without
# a full re-sync (see auto_clean_rooms=False in app.py). On shutdown
# we must stop them explicitly.
for _, room in list(self.rooms.items()):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for _, room in list(self.rooms.items()):
for room in list(self.rooms.values()):

try:
await room.stop()
except RuntimeError as e:
if "not running" not in str(e):
raise
self.rooms.clear()

# Await the root asyncio task so anyio's task group __aexit__ runs
# and fires done-callbacks that clean up worker threads. Without this,
# non-daemon WorkerThread instances block Python's threading._shutdown().
if self._start_task is not None and not self._start_task.done():
try:
await asyncio.wait_for(self._start_task, timeout=2)
except asyncio.TimeoutError:
# Letting this propagate would be silently swallowed by
# asyncio.wait() in stop_extension(), so we log it here.
self.log.warning("Server start task did not complete within 2s.")

def room_exists(self, path: str) -> bool:
"""
Expand Down