Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 229 additions & 6 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
from __future__ import annotations

import asyncio
import hashlib
import json
from collections.abc import Callable
from logging import Logger
from typing import Any

from jupyter_events import EventLogger
from jupyter_ydoc import YNotebook
from jupyter_ydoc import ydocs as YDOCS
from pycrdt import (
Assoc,
Channel,
Decoder,
Doc,
Encoder,
)
Expand All @@ -30,6 +34,13 @@ class DocumentRoom(YRoom):

_background_tasks: set[asyncio.Task]

# Deterministic rebuilds author content under a "marked" Yjs client id
# (>= this value). Real collaborative clients are y-websocket/Yjs, which use
# uint32 client ids (< 2^32), so any author id below the marker denotes a
# genuine client edit. The marker stays well under 2^53 so the id round-trips
# through Yjs (which requires JSON-safe integers).
# _content_client_id keeps only the hash bits below the marker and ORs the
# marker bit in, so with this value rebuild ids fall in [2^47, 2^48).
_REBUILD_CLIENT_MARKER = 1 << 47

def __init__(
self,
room_id: str,
Expand Down Expand Up @@ -59,6 +70,10 @@ def __init__(
self._saving_document: asyncio.Task | None = None
self._messages: dict[str, asyncio.Lock] = {}
self._background_tasks = set()
self._deduplicating = False
# Client id used by the most recent deterministic rebuild from disk; lets
# the dedup logic recognise the authoritative on-disk copy of a cell.
self._rebuild_client_id: int | None = None

# Listen for document changes
self._document.observe(self._on_document_change)
Expand Down Expand Up @@ -182,17 +197,44 @@ async def initialize(self) -> None:
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")

@classmethod
def _content_client_id(cls, content: Any) -> int:
"""A deterministic, content-derived Yjs client id for a rebuild.

Identical content always yields the same id, so rebuilding an unchanged
document from disk reproduces the same Yjs history (a reconnecting client
sees no duplication). Crucially, *different* content yields a *different*
id: the new content then occupies coordinates a stale client lacks, so it
actually reaches the client instead of silently colliding with the old
content at the same (client_id, clock), which is what caused the "cell
reverts to a previous version" bug. The id is marked
(>= _REBUILD_CLIENT_MARKER) so it is distinguishable from real Yjs client ids.
"""
digest = hashlib.sha256(
json.dumps(content, sort_keys=True, default=str).encode("utf-8")
).digest()
marker = cls._REBUILD_CLIENT_MARKER
return (int.from_bytes(digest[:6], "big") & (marker - 1)) | marker

async def _apply_deterministic_source_content(self, content: Any) -> None:
    """Load source content using a deterministic, content-addressed update.

    Rooms rebuilt from disk must recreate the same Yjs history for identical
    content, otherwise reconnecting clients can merge duplicate content from a
    divergent history after server restart or room eviction.

    The client id is derived from the content (see _content_client_id) rather
    than fixed to 0: a fixed id makes *changed* content reuse the same
    (client_id, clock) coordinates a stale client still holds for the old
    content, so state-vector sync delivers nothing and the client silently
    stays on the previous version. A content-addressed id keeps the
    unchanged-content idempotency while letting changed content reach the
    client (resolved afterwards by _deduplicate_cells).
    See https://discuss.yjs.dev/t/initial-offline-value-of-a-shared-document/465

    Args:
        content: The source content to load; it is hashed to pick the client
            id and then written into a scratch document via ``aset``.
    """
    client_id = self._content_client_id(content)
    # Remember which id authored this rebuild so duplicate repair can
    # recognise the authoritative on-disk copy of a cell.
    self._rebuild_client_id = client_id
    # Author the content in a scratch document owned by the derived client id,
    # then replay that document's update into the room document.
    source_ydoc: Doc = Doc(client_id=client_id)
    source_document = YDOCS.get(self._file_type, YFILE)(source_ydoc)
    await source_document.aset(content)
    self.ydoc.apply_update(source_ydoc.get_update())
Expand Down Expand Up @@ -255,11 +297,187 @@ async def _handle_sync_message_error(
channel.path,
exc,
)
await channel.send(self._conflict_message())
return True

@staticmethod
def _conflict_message() -> bytes:
    """Build a RAW conflict notification for the frontend resolution dialog.

    Returns:
        The encoded message: the ``MessageType.RAW`` tag written as a
        var-uint, followed by the JSON payload ``{"type": "conflict"}``
        written as a var-string.
    """
    encoder = Encoder()
    encoder.write_var_uint(MessageType.RAW)
    encoder.write_var_string(json.dumps({"type": "conflict"}))
    # Only build the bytes here; sending is the caller's job (this static
    # method has no channel and is not a coroutine).
    return encoder.to_bytes()

async def _broadcast_conflict(self) -> None:
"""Send a RAW conflict notification to all connected clients."""
message = self._conflict_message()
for client in list(self.clients):
try:
await client.send(message)
except Exception as exc: # noqa: BLE001
self.log.warning("Failed to send conflict notification to %s: %s", client.path, exc)

def _has_duplicate_cell_ids(self) -> bool:
    """Report whether two or more cells share the same ``id``.

    Only the ``id`` entry of each cell is read (no full document conversion),
    which keeps the check cheap enough to run on every cell change; the
    costly content comparison in _deduplicate_cells is reserved for when
    this returns True.

    Returns:
        True as soon as a repeated, non-None cell id is found. False when
        the document is not a ``YNotebook`` or no id repeats.
    """
    document = self._document
    if not isinstance(document, YNotebook):
        return False

    ycells = document.ycells
    observed: set[str] = set()
    for position in range(len(ycells)):
        identifier = ycells[position].get("id")
        # Cells without an id cannot collide with anything.
        if identifier is not None:
            if identifier in observed:
                return True
            observed.add(identifier)
    return False

def _maybe_deduplicate_cells(self) -> None:
"""Schedule duplicate-cell repair when duplicate cell IDs are present.

Duplicate cell IDs appear when a client that edited the document during
a session reconnects after the room was rebuilt deterministically
(client_id=0) from disk: the same cells then exist both under
client_id=0 and under the client's own id, and the reconnect sync
appends the client's copies next to the rebuilt ones.

Repair must run outside the observer (mutating the document inside a
change callback raises "read-only transaction"), so it is deferred to a
background task, mirroring the autosave scheduling.
"""
if self._deduplicating or self._update_lock.locked():
return
if not self._has_duplicate_cell_ids():
return
self._deduplicating = True
self.create_task(self._deduplicate_cells())

def _cell_client_ids(self, count: int) -> list[int | None]:
    """Look up the authoring Yjs client id of the first ``count`` cells.

    For each index a sticky index is taken on the cell array and the
    ``client`` field of the item it is anchored to is read. A deterministic
    rebuild from disk authors its cells under ``self._rebuild_client_id``
    (see _content_client_id), so a cell whose id equals that value is the
    authoritative on-disk copy, whereas a reconnecting client's stale cell
    carries a different id.

    Args:
        count: Number of leading cells to inspect.

    Returns:
        One entry per inspected index: the client id, or None when no sticky
        index or anchoring item is available for that position.
    """
    authors: list[int | None] = []
    with self._document.ydoc.transaction():
        for position in range(count):
            anchor = self._document.ycells.sticky_index(position, Assoc.AFTER)
            if anchor is None:
                authors.append(None)
                continue
            anchored_item = anchor.to_json().get("item")
            if anchored_item:
                authors.append(anchored_item.get("client"))
            else:
                authors.append(None)
    return authors

def _doc_has_client_edits(self) -> bool:
    """Tell whether a real collaborative client authored content in the room.

    The document state vector is decoded as a count followed by
    (client id, clock) pairs. Deterministic rebuilds author content under
    marked ids (>= ``_REBUILD_CLIENT_MARKER``), while Yjs clients use uint32
    ids (< 2^32). Any listed id below the marker that is not the room
    document's own id (used only for server-side housekeeping such as dedup
    deletions) therefore indicates a genuine client edit, which must never
    be silently discarded.

    Returns:
        True when such a client id is listed, or when the state vector cannot
        be decoded (the conservative answer, so the client's copy is kept).
        False otherwise.
    """
    ydoc = self._document.ydoc
    try:
        state = Decoder(ydoc.get_state())
        entry_count = state.read_var_uint()
        server_id = ydoc.client_id
        for _ in range(entry_count):
            author = state.read_var_uint()
            state.read_var_uint()  # skip the clock; only the author matters
            authored_by_rebuild = author >= self._REBUILD_CLIENT_MARKER
            if not authored_by_rebuild and author != server_id:
                return True
    except Exception:  # noqa: BLE001
        # Undecodable state: assume edits exist rather than risk dropping them.
        return True
    return False

async def _deduplicate_cells(self) -> None:
    """Repair duplicate cells produced when a stale client reconnects.

    Duplicates arise when the room is rebuilt from disk (content-addressed
    client id) and a reconnecting client still holds an earlier copy of a
    cell under a different rebuild id. Cells are grouped by id and resolved:

    * Exact duplicates (identical content) collapse to one copy, preferring
      the authoritative on-disk copy. No conflict.
    * Divergent copies where the client made NO edits (a purely stale cache
      of an older on-disk version): adopt the authoritative on-disk copy and
      drop the stale one. No conflict; the client simply catches up.
    * Divergent copies where the client DID edit (or the on-disk copy cannot
      be identified): keep the client's copy and surface a conflict, so the
      user's in-memory edits are never silently discarded (Revert restores
      the on-disk version).

    A copy counts as "on-disk" only when a rebuild id has been recorded and
    the cell's author id equals it. When no rebuild id is recorded, a cell
    whose author could not be determined is NOT treated as on-disk, so that
    case falls into the conservative conflict branch.

    The ``_deduplicating`` flag is always cleared on exit, including on
    error. A conflict notification, if needed, is broadcast after the update
    lock has been released.
    """
    has_conflict = False
    try:
        async with self._update_lock:
            cells = self._document.get(deduplicate=False)["cells"]
            client_ids = self._cell_client_ids(len(cells))
            client_edited = self._doc_has_client_edits()
            rebuild_id = self._rebuild_client_id

            # Map each cell id to the positions at which it occurs.
            groups: dict[str, list[int]] = {}
            for index, cell in enumerate(cells):
                cell_id = cell.get("id")
                if cell_id is not None:
                    groups.setdefault(cell_id, []).append(index)

            to_delete: list[int] = []
            for indices in groups.values():
                if len(indices) < 2:
                    continue
                # Without a recorded rebuild id nothing can be identified as
                # the on-disk copy. Comparing against None would wrongly
                # match cells whose author lookup also returned None.
                if rebuild_id is None:
                    disk: list[int] = []
                else:
                    disk = [i for i in indices if client_ids[i] == rebuild_id]
                first = cells[indices[0]]
                if all(cells[i] == first for i in indices[1:]):
                    # Identical content: keep one copy (prefer the on-disk one).
                    keep = disk[0] if disk else indices[0]
                    to_delete.extend(i for i in indices if i != keep)
                elif disk and not client_edited:
                    # Purely stale cache: adopt the authoritative on-disk copy.
                    keep = disk[0]
                    to_delete.extend(i for i in indices if i != keep)
                else:
                    # Client edits diverge from disk (or no on-disk copy):
                    # keep the client's copy and surface a conflict.
                    client_copies = [i for i in indices if i not in disk]
                    keep = client_copies[0] if client_copies else indices[0]
                    to_delete.extend(i for i in indices if i != keep)
                    has_conflict = True

            if to_delete:
                with self._document.ydoc.transaction():
                    # Delete from the end so earlier indices stay valid.
                    for index in sorted(set(to_delete), reverse=True):
                        del self._document.ycells[index]
                self.log.warning(
                    "Resolved %d duplicate cell(s) in room %s after a "
                    "stale-client reconnect",
                    len(to_delete),
                    self._room_id,
                )
                self._emit(
                    LogLevel.WARNING,
                    "deduplicate",
                    f"Resolved {len(to_delete)} duplicate cell(s).",
                )
    finally:
        # Allow future repairs to be scheduled again, even after a failure.
        self._deduplicating = False

    if has_conflict:
        self.log.warning(
            "Divergent cell edits in room %s; surfacing conflict",
            self._room_id,
        )
        await self._broadcast_conflict()

async def _on_outofband_change(self) -> None:
"""
Expand Down Expand Up @@ -302,6 +520,11 @@ def _on_document_change(self, target: str, event: Any) -> None:
document. This tasks are debounced (60 seconds by default) so we
need to cancel previous tasks before creating a new one.
"""
# Repair duplicate cells from a divergent-history merge (e.g. a client
# reconnecting after a deterministic room rebuild).
if target == "cells":
self._maybe_deduplicate_cells()

# Collect autosave values from all clients
autosave_states = [
state.get("autosave", True)
Expand Down
Loading
Loading