Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions invokeai/app/api/sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,21 @@ async def _handle_sub_bulk_download(self, sid: str, data: Any) -> None:
async def _handle_unsub_bulk_download(self, sid: str, data: Any) -> None:
await self._sio.leave_room(sid, BulkDownloadSubscriptionEvent(**data).bulk_download_id)

async def _broadcast_queue_counts_changed(self, queue_id: str):
"""Broadcast a content-free signal that the queue's aggregate counts may have changed.

The detailed queue item events (status changes, enqueues) are private to their owner
and admins, so non-admin subscribers never learn when *other* users' jobs are queued,
started, or finished. Without that signal their badge's global total — and the
in_progress count that drives the progress animation — go stale and get stuck once
their own jobs finish.

This event carries nothing but the queue_id: no user_id, batch_id, session_id, or
counts. Every subscriber simply refetches GET /queue/{queue_id}/status, which already
redacts per-user data, so broadcasting it to the whole queue room leaks nothing.
"""
await self._sio.emit(event="queue_counts_changed", data={"queue_id": queue_id}, room=queue_id)

async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
"""Handle queue events with user isolation.

Expand Down Expand Up @@ -313,6 +328,10 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):

logger.debug(f"Emitted private queue item event {event_name} to user room {user_room} and admin room")

# A status change shifts the global pending/in_progress totals, so nudge every
# subscriber to refetch their (redacted) queue status — see _broadcast_queue_counts_changed.
await self._broadcast_queue_counts_changed(event_data.queue_id)

# RecallParametersUpdatedEvent is private - only emit to owner + admins.
#
# Emit to the union of the owner room and the admin room in a SINGLE
Expand Down Expand Up @@ -340,6 +359,10 @@ async def _handle_queue_event(self, event: FastAPIEvent[QueueEventBase]):
await self._sio.emit(event=event_name, data=event_data.model_dump(mode="json"), room="admin")
logger.debug(f"Emitted private batch_enqueued event to user room {user_room} and admin room")

# Newly enqueued items raise the global total, so nudge every subscriber to
# refetch their (redacted) queue status — see _broadcast_queue_counts_changed.
await self._broadcast_queue_counts_changed(event_data.queue_id)

else:
# For remaining queue events (e.g. QueueClearedEvent) that do not
# carry user identity, emit to all subscribers in the queue room.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { logger } from 'app/logging/logger';
import type { AppDispatch, AppGetState } from 'app/store/store';
import { selectCurrentUser } from 'features/auth/store/authSlice';
import { canvasWorkflowIntegrationProcessingCompleted } from 'features/controlLayers/store/canvasWorkflowIntegrationSlice';
import {
selectAutoSwitch,
Expand Down Expand Up @@ -45,6 +46,20 @@ export const buildOnInvocationComplete = (
completedInvocationKeysByItemId: Map<number, Set<string>>
) => {
const addImagesToGallery = async (data: S['InvocationCompleteEvent']) => {
// In multiuser mode, admins are subscribed to the "admin" socket room and therefore receive
// invocation events for *every* user, not just their own. Those images belong to another
// user's gallery and boards — we must not insert them into this client's gallery or, worse,
// auto-switch the selected board to the other user's board and select their image.
//
// Only gate this when we actually know who is logged in. In single-user mode there is no
// authenticated user (selectCurrentUser is null) and every event is the local user's own, so
// we fall through and preserve the original behavior.
const currentUser = selectCurrentUser(getState());
if (currentUser && data.user_id !== currentUser.user_id) {
log.trace(`Skipping gallery update for image owned by another user (${data.user_id})`);
return;
}

if (nodeTypeDenylist.includes(data.invocation.type)) {
log.trace(`Skipping denylisted node type (${data.invocation.type})`);
return;
Expand Down
10 changes: 10 additions & 0 deletions invokeai/frontend/web/src/services/events/setEventListeners.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,16 @@ export const setEventListeners = ({ socket, store, setIsConnected }: SetEventLis
}
});

socket.on('queue_counts_changed', (data) => {
// A content-free broadcast that the queue's global counts may have changed — typically
// because *another* user enqueued or one of their jobs changed status. We never receive
// those users' private queue item events, so this is the only signal that lets a non-admin's
// badge keep its global total (the "/Y" in "X/Y") and the in_progress-driven progress bar
// current. Only the redacted SessionQueueStatus is refetched; no per-user/batch caches.
log.trace({ data }, 'Queue counts changed');
dispatch(queueApi.util.invalidateTags(['SessionQueueStatus']));
});

socket.on('queue_cleared', (data) => {
log.debug({ data }, 'Queue cleared');
dispatch(
Expand Down
5 changes: 5 additions & 0 deletions invokeai/frontend/web/src/services/events/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export type ServerToClientEvents = {
model_install_cancelled: (payload: S['ModelInstallCancelledEvent']) => void;
model_load_complete: (payload: S['ModelLoadCompleteEvent']) => void;
queue_item_status_changed: (payload: S['QueueItemStatusChangedEvent']) => void;
// Content-free broadcast to the whole queue room: the global queue counts may have changed
// (some user enqueued or a job changed status). Carries only queue_id — no per-user data —
// so every subscriber can refetch the redacted queue status. Emitted by the backend socket
// layer, not the event bus, so it has no generated `S[...]` schema type.
queue_counts_changed: (payload: { queue_id: string }) => void;
queue_cleared: (payload: S['QueueClearedEvent']) => void;
batch_enqueued: (payload: S['BatchEnqueuedEvent']) => void;
queue_items_retried: (payload: S['QueueItemsRetriedEvent']) => void;
Expand Down
42 changes: 31 additions & 11 deletions tests/app/routers/test_multiuser_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1787,15 +1787,26 @@ def test_queue_item_status_changed_routed_privately(self, socketio: Any) -> None

asyncio.run(socketio._handle_queue_event(("queue_item_status_changed", event)))

rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list]
assert "user:owner-xyz" in rooms_emitted_to
assert "admin" in rooms_emitted_to
# CRITICAL: must NOT emit to the queue_id room — that would leak to other users
assert "default" not in rooms_emitted_to
emits = [(call.kwargs.get("event"), call.kwargs.get("room")) for call in mock_emit.call_args_list]

# The private queue_item_status_changed event carries unsanitized per-user
# metadata and must go ONLY to the owner + admin rooms.
private_rooms = [room for name, room in emits if name == "queue_item_status_changed"]
assert "user:owner-xyz" in private_rooms
assert "admin" in private_rooms
# CRITICAL: must NOT emit the private event to the queue_id room — that would leak to other users
assert "default" not in private_rooms

# A redacted queue_counts_changed nudge (no per-user data) IS broadcast to the
# whole queue room so every subscriber's badge stays live across users.
counts_rooms = [room for name, room in emits if name == "queue_counts_changed"]
assert "default" in counts_rooms

def test_batch_enqueued_routed_privately(self, socketio: Any) -> None:
"""Verify that _handle_queue_event emits BatchEnqueuedEvent ONLY to
user:{user_id} and admin rooms, never to the queue_id room."""
"""Verify that _handle_queue_event emits the private BatchEnqueuedEvent ONLY to
user:{user_id} and admin rooms, never to the queue_id room. A redacted
queue_counts_changed nudge is still broadcast to the queue room so every
user's badge stays live."""
import asyncio
from unittest.mock import AsyncMock

Expand All @@ -1821,10 +1832,19 @@ def test_batch_enqueued_routed_privately(self, socketio: Any) -> None:

asyncio.run(socketio._handle_queue_event(("batch_enqueued", event)))

rooms_emitted_to = [call.kwargs.get("room") for call in mock_emit.call_args_list]
assert "user:owner-zzz" in rooms_emitted_to
assert "admin" in rooms_emitted_to
assert "default" not in rooms_emitted_to
emits = [(call.kwargs.get("event"), call.kwargs.get("room")) for call in mock_emit.call_args_list]

# The private batch_enqueued event carries the enqueuing user's batch_id/origin/
# counts and must go ONLY to the owner + admin rooms.
private_rooms = [room for name, room in emits if name == "batch_enqueued"]
assert "user:owner-zzz" in private_rooms
assert "admin" in private_rooms
assert "default" not in private_rooms

# A redacted queue_counts_changed nudge (no per-user data) IS broadcast to the
# whole queue room so every subscriber's badge stays live across users.
counts_rooms = [room for name, room in emits if name == "queue_counts_changed"]
assert "default" in counts_rooms

def test_queue_cleared_still_broadcast(self, socketio: Any) -> None:
"""QueueClearedEvent does not carry user identity and should still be broadcast
Expand Down
Loading