diff --git a/invokeai/app/api/sockets.py b/invokeai/app/api/sockets.py index 7e93c332d64..144a1e9a2c0 100644 --- a/invokeai/app/api/sockets.py +++ b/invokeai/app/api/sockets.py @@ -260,6 +260,21 @@ async def _handle_sub_bulk_download(self, sid: str, data: Any) -> None: async def _handle_unsub_bulk_download(self, sid: str, data: Any) -> None: await self._sio.leave_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id) + async def _broadcast_queue_counts_changed(self, queue_id: str): + """Broadcast a content-free signal that the queue's aggregate counts may have changed. + + The detailed queue item events (status changes, enqueues) are private to their owner + and admins, so non-admin subscribers never learn when *other* users' jobs are queued, + started, or finished. Without that signal their badge's global total — and the + in_progress count that drives the progress animation — go stale and get stuck once + their own jobs finish. + + This event carries nothing but the queue_id: no user_id, batch_id, session_id, or + counts. Every subscriber simply refetches GET /queue/{queue_id}/status, which already + redacts per-user data, so broadcasting it to the whole queue room leaks nothing. + """ + await self._sio.emit(event="queue_counts_changed", data={"queue_id": queue_id}, room=queue_id) + async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): """Handle queue events with user isolation. @@ -313,6 +328,10 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): logger.debug(f"Emitted private queue item event {event_name} to user room {user_room} and admin room") + # A status change shifts the global pending/in_progress totals, so nudge every + # subscriber to refetch their (redacted) queue status — see _broadcast_queue_counts_changed. + await self._broadcast_queue_counts_changed(event_data.queue_id) + # RecallParametersUpdatedEvent is private - only emit to owner + admins. # # Emit to the union of the owner room and the admin room in a SINGLE @@ -340,6 +359,10 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]): await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin") logger.debug(f"Emitted private batch_enqueued event to user room {user_room} and admin room") + # Newly enqueued items raise the global total, so nudge every subscriber to + # refetch their (redacted) queue status — see _broadcast_queue_counts_changed. + await self._broadcast_queue_counts_changed(event_data.queue_id) + else: # For remaining queue events (e.g. QueueClearedEvent) that do not # carry user identity, emit to all subscribers in the queue room. diff --git a/invokeai/frontend/web/src/services/events/onInvocationComplete.tsx b/invokeai/frontend/web/src/services/events/onInvocationComplete.tsx index ea6a237d4b2..0c83fa718f0 100644 --- a/invokeai/frontend/web/src/services/events/onInvocationComplete.tsx +++ b/invokeai/frontend/web/src/services/events/onInvocationComplete.tsx @@ -1,5 +1,6 @@ import { logger } from 'app/logging/logger'; import type { AppDispatch, AppGetState } from 'app/store/store'; +import { selectCurrentUser } from 'features/auth/store/authSlice'; import { canvasWorkflowIntegrationProcessingCompleted } from 'features/controlLayers/store/canvasWorkflowIntegrationSlice'; import { selectAutoSwitch, @@ -45,6 +46,20 @@ export const buildOnInvocationComplete = ( completedInvocationKeysByItemId: Map> ) => { const addImagesToGallery = async (data: S['InvocationCompleteEvent']) => { + // In multiuser mode, admins are subscribed to the "admin" socket room and therefore receive + // invocation events for *every* user, not just their own. Those images belong to another + // user's gallery and boards — we must not insert them into this client's gallery or, worse, + // auto-switch the selected board to the other user's board and select their image. + // + // Only gate this when we actually know who is logged in. In single-user mode there is no + // authenticated user (selectCurrentUser is null) and every event is the local user's own, so + // we fall through and preserve the original behavior. + const currentUser = selectCurrentUser(getState()); + if (currentUser && data.user_id !== currentUser.user_id) { + log.trace(`Skipping gallery update for image owned by another user (${data.user_id})`); + return; + } + if (nodeTypeDenylist.includes(data.invocation.type)) { log.trace(`Skipping denylisted node type (${data.invocation.type})`); return; diff --git a/invokeai/frontend/web/src/services/events/setEventListeners.tsx b/invokeai/frontend/web/src/services/events/setEventListeners.tsx index e6010ce4ca1..05b23e74428 100644 --- a/invokeai/frontend/web/src/services/events/setEventListeners.tsx +++ b/invokeai/frontend/web/src/services/events/setEventListeners.tsx @@ -451,6 +451,16 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis } }); + socket.on('queue_counts_changed', (data) => { + // A content-free broadcast that the queue's global counts may have changed — typically + // because *another* user enqueued or one of their jobs changed status. We never receive + // those users' private queue item events, so this is the only signal that lets a non-admin's + // badge keep its global total (the "/Y" in "X/Y") and the in_progress-driven progress bar + // current. Only the redacted SessionQueueStatus is refetched; no per-user/batch caches. + log.trace({ data }, 'Queue counts changed'); + dispatch(queueApi.util.invalidateTags(['SessionQueueStatus'])); + }); + socket.on('queue_cleared', (data) => { log.debug({ data }, 'Queue cleared'); dispatch( diff --git a/invokeai/frontend/web/src/services/events/types.ts b/invokeai/frontend/web/src/services/events/types.ts index 8937dcc451d..e8dff693cb4 100644 --- a/invokeai/frontend/web/src/services/events/types.ts +++ b/invokeai/frontend/web/src/services/events/types.ts @@ -26,6 +26,11 @@ export type ServerToClientEvents = { model_install_cancelled: (payload: S['ModelInstallCancelledEvent']) => void; model_load_complete: (payload: S['ModelLoadCompleteEvent']) => void; queue_item_status_changed: (payload: S['QueueItemStatusChangedEvent']) => void; + // Content-free broadcast to the whole queue room: the global queue counts may have changed + // (some user enqueued or a job changed status). Carries only queue_id — no per-user data — + // so every subscriber can refetch the redacted queue status. Emitted by the backend socket + // layer, not the event bus, so it has no generated `S[...]` schema type. + queue_counts_changed: (payload: { queue_id: string }) => void; queue_cleared: (payload: S['QueueClearedEvent']) => void; batch_enqueued: (payload: S['BatchEnqueuedEvent']) => void; queue_items_retried: (payload: S['QueueItemsRetriedEvent']) => void; diff --git a/tests/app/routers/test_multiuser_authorization.py b/tests/app/routers/test_multiuser_authorization.py index 8cca29dee69..4fcf5997054 100644 --- a/tests/app/routers/test_multiuser_authorization.py +++ b/tests/app/routers/test_multiuser_authorization.py @@ -1787,15 +1787,26 @@ def test_queue_item_status_changed_routed_privately(self, socketio: Any) -> None asyncio.run(socketio._handle_queue_event(("queue_item_status_changed", event))) - rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list] - assert "user:owner-xyz" in rooms_emitted_to - assert "admin" in rooms_emitted_to - # CRITICAL: must NOT emit to the queue_id room — that would leak to other users - assert "default" not in rooms_emitted_to + emits = [(call.kwargs.get("event"), call.kwargs.get("room")) for call in mock_emit.call_args_list] + + # The private queue_item_status_changed event carries unsanitized per-user + # metadata and must go ONLY to the owner + admin rooms. + private_rooms = [room for name, room in emits if name == "queue_item_status_changed"] + assert "user:owner-xyz" in private_rooms + assert "admin" in private_rooms + # CRITICAL: must NOT emit the private event to the queue_id room — that would leak to other users + assert "default" not in private_rooms + + # A redacted queue_counts_changed nudge (no per-user data) IS broadcast to the + # whole queue room so every subscriber's badge stays live across users. + counts_rooms = [room for name, room in emits if name == "queue_counts_changed"] + assert "default" in counts_rooms def test_batch_enqueued_routed_privately(self, socketio: Any) -> None: - """Verify that _handle_queue_event emits BatchEnqueuedEvent ONLY to - user:{user_id} and admin rooms, never to the queue_id room.""" + """Verify that _handle_queue_event emits the private BatchEnqueuedEvent ONLY to + user:{user_id} and admin rooms, never to the queue_id room. A redacted + queue_counts_changed nudge is still broadcast to the queue room so every + user's badge stays live.""" import asyncio from unittest.mock import AsyncMock @@ -1821,10 +1832,19 @@ def test_batch_enqueued_routed_privately(self, socketio: Any) -> None: asyncio.run(socketio._handle_queue_event(("batch_enqueued", event))) - rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list] - assert "user:owner-zzz" in rooms_emitted_to - assert "admin" in rooms_emitted_to - assert "default" not in rooms_emitted_to + emits = [(call.kwargs.get("event"), call.kwargs.get("room")) for call in mock_emit.call_args_list] + + # The private batch_enqueued event carries the enqueuing user's batch_id/origin/ + # counts and must go ONLY to the owner + admin rooms. + private_rooms = [room for name, room in emits if name == "batch_enqueued"] + assert "user:owner-zzz" in private_rooms + assert "admin" in private_rooms + assert "default" not in private_rooms + + # A redacted queue_counts_changed nudge (no per-user data) IS broadcast to the + # whole queue room so every subscriber's badge stays live across users. + counts_rooms = [room for name, room in emits if name == "queue_counts_changed"] + assert "default" in counts_rooms def test_queue_cleared_still_broadcast(self, socketio: Any) -> None: """QueueClearedEvent does not carry user identity and should still be broadcast