From 2e3d9ae61bef763042b2c93a0d6e3917d6a0274d Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Wed, 25 Mar 2026 09:40:47 -0600 Subject: [PATCH 01/12] Start of the new Braille APIs --- source/braille.py | 182 ++++++++++++++++- source/braillePipeServer.py | 386 ++++++++++++++++++++++++++++++++++++ 2 files changed, 566 insertions(+), 2 deletions(-) create mode 100644 source/braillePipeServer.py diff --git a/source/braille.py b/source/braille.py index c1c0abd4d8c..2e8ed075934 100644 --- a/source/braille.py +++ b/source/braille.py @@ -1,8 +1,8 @@ # A part of NonVisual Desktop Access (NVDA) # This file is covered by the GNU General Public License. # See the file COPYING for more details. -# Copyright (C) 2008-2025 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau, -# Leonard de Ruijter, Burman's Computer and Education Ltd., Julien Cochuyt +# Copyright (C) 2008-2026 NV Access Limited, Joseph Lee, Babbage B.V., Davy Kager, Bram Duvigneau, +# Leonard de Ruijter, Burman's Computer and Education Ltd., Julien Cochuyt, Pneuma Solutions from enum import StrEnum import itertools @@ -67,6 +67,7 @@ import hwPortUtils import bdDetect import queueHandler +import winUser import brailleViewer from autoSettingsUtils.driverSetting import BooleanDriverSetting, NumericDriverSetting from utils.security import objectBelowLockScreenAndWindowsIsLocked, post_sessionLockStateChanged @@ -3434,6 +3435,9 @@ def initialize(): handler = BrailleHandler() handler.handlePostConfigProfileSwitch() config.post_configProfileSwitch.register(handler.handlePostConfigProfileSwitch) + import braillePipeServer + + braillePipeServer.initialize() def pumpAll(): @@ -3443,6 +3447,9 @@ def pumpAll(): def terminate(): global handler + import braillePipeServer + + braillePipeServer.terminate() handler.terminate() handler = None @@ -3969,6 +3976,177 @@ def getDisplayTextForIdentifier(cls, identifier): inputCore.registerGestureSource("br", BrailleDisplayGesture) +class BrailleMirror: + """Abstract base class for a braille mirror. + + A mirror intercepts every braille display update and can optionally influence the negotiated display width. + Both the physical display and all registered mirrors receive the same cells simultaneously; the mirror does **not** suppress the local display. + Register an instance with :func:`registerMirror` and remove it with :func:`unregisterMirror`. To inject a gesture back into NVDA (e.g. a routing key received over a remote channel) use :func:`injectGesture`. + """ + + def display(self, cells: List[int]) -> None: + """Called with the full cell array on every display update. + + :param cells: The braille cells written to the display. + """ + + def numCells(self) -> int: + """Return the number of cells this mirror can show. + + Return 0 (the default) to have no effect on the negotiated display + width. A positive value caps the display width used by + :data:`filter_displayDimensions` to the smallest value across all registered mirrors and the physical display. + """ + return 0 + + +_registeredMirrors: List["BrailleMirror"] = [] + + +def _mirrorPreWriteCells(cells: List[int], **kwargs) -> None: + for mirror in _registeredMirrors: + mirror.display(cells) + + +def _mirrorFilterDisplayDimensions(value: DisplayDimensions) -> DisplayDimensions: + sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] + if not sizes: + return value + cap = min(sizes) + if cap >= value.numCols: + return value + return value._replace(numCols=cap) + + +def registerMirror(mirror: BrailleMirror) -> None: + """Register *mirror* to receive braille display updates. + + :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`filter_displayDimensions`. + """ + if not _registeredMirrors: + pre_writeCells.register(_mirrorPreWriteCells) + filter_displayDimensions.register(_mirrorFilterDisplayDimensions) + _registeredMirrors.append(mirror) + if handler: + handler._refreshEnabled(block=True) + + +def unregisterMirror(mirror: BrailleMirror) -> None: + """Remove a previously registered mirror. + + Safe to call even if *mirror* is not currently registered. + """ + try: + _registeredMirrors.remove(mirror) + except ValueError: + return + if not _registeredMirrors: + pre_writeCells.unregister(_mirrorPreWriteCells) + filter_displayDimensions.unregister(_mirrorFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + +def injectGesture(gesture: BrailleDisplayGesture) -> None: + """Inject *gesture* into NVDA's input pipeline. + + This is a thin wrapper around :func:`inputCore.manager.executeGesture` that silently swallows :class:`inputCore.NoInputGestureAction` so callers do not need to handle the common case where no script is bound. + + Thread safety: must be called on the main thread, or scheduled via ``wx.CallAfter`` from a background thread. + """ + try: + inputCore.manager.executeGesture(gesture) + except inputCore.NoInputGestureAction: + pass + + +class DirectBrailleWindow: + """Take over braille output and input while a specific window has focus. + + When the window identified by *hwnd* is the foreground window, NVDA's own braille rendering is suppressed. The application drives what appears on the physical display by calling :meth:`display`, and all braille gestures are forwarded to :meth:`onGesture` instead of being processed by NVDA. + When the window loses focus, normal NVDA rendering resumes automatically. + + :param hwnd: The HWND of the window that triggers direct braille mode. + :param numCells: Advertised display width; 0 means use whatever NVDA provides. A positive value caps the negotiated display width via :data:`filter_displayDimensions`. + """ + + def __init__(self, hwnd: int, numCells: int = 0) -> None: + self._hwnd = hwnd + self._numCells = numCells + self._active = False + + def _isForeground(self) -> bool: + """Return True if our registered window is currently in the foreground.""" + fg = winUser.getForegroundWindow() + return fg == self._hwnd or winUser.isDescendantWindow(self._hwnd, fg) + + def display(self, cells: List[int]) -> None: + """Push *cells* to the physical braille display. + + Has no effect if the registered window is not currently foreground or if no braille display is connected. + + Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. + """ + if handler and self._isForeground(): + wx.CallAfter(handler._writeCells, cells) + + def onGesture(self, gesture: BrailleDisplayGesture) -> None: + """Called when a braille gesture arrives while this window is active. + + The default implementation does nothing, so gestures are suppressed. Subclass and override to handle them. + + :param gesture: The braille gesture that was intercepted. + """ + + def _handleDecideEnabled(self) -> bool: + return not self._isForeground() + + def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: + if not self._isForeground(): + return True + if isinstance(gesture, BrailleDisplayGesture): + self.onGesture(gesture) + return False + return True + + def _handleFilterDisplayDimensions(self, value: DisplayDimensions) -> DisplayDimensions: + if self._numCells <= 0 or not self._isForeground(): + return value + if self._numCells >= value.numCols: + return value + return value._replace(numCols=self._numCells) + + def activate(self) -> None: + """Start intercepting braille output and input for the registered window. + + Safe to call even if already active (subsequent calls are no-ops). + """ + if self._active: + return + self._active = True + decide_enabled.register(self._handleDecideEnabled) + inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) + if self._numCells > 0: + filter_displayDimensions.register(self._handleFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + def deactivate(self) -> None: + """Stop intercepting braille, restoring NVDA's normal rendering. + + Safe to call even if not currently active. + """ + if not self._active: + return + self._active = False + decide_enabled.unregister(self._handleDecideEnabled) + inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) + if self._numCells > 0: + filter_displayDimensions.unregister(self._handleFilterDisplayDimensions) + if handler: + handler._refreshEnabled(block=True) + + def getSerialPorts(filterFunc=None) -> typing.Iterator[typing.Tuple[str, str]]: """Get available serial ports in a format suitable for L{BrailleDisplayDriver.getManualPorts}. @param filterFunc: a function executed on every dictionary retrieved using L{hwPortUtils.listComPorts}. diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py new file mode 100644 index 00000000000..6330fd72c4a --- /dev/null +++ b/source/braillePipeServer.py @@ -0,0 +1,386 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2026 Pneuma Solutions + +"""Named-pipe IPC server that exposes the Braille Mirror and Direct Braille Window APIs to external processes. + +Two pipe names are used: + +* ``\\\\.\\pipe\\nvda_braille`` – normal desktop instance +* ``\\\\.\\pipe\\nvda_braille_secure`` – secure desktop instance + +Wire protocol: +Every message is length-prefixed: 4 bytes little-endian uint32 body length, followed by a UTF-8 JSON object. + +One connection handles one role (mirror or direct braille). +""" + +import ctypes +import ctypes.wintypes +import json +import queue +import struct +import threading +from typing import Optional + +import wx + +import braille +import inputCore +from logHandler import log +from utils.security import isRunningOnSecureDesktop + +_kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] + +INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value +ERROR_PIPE_CONNECTED = 535 +ERROR_BROKEN_PIPE = 109 +ERROR_IO_PENDING = 997 +PIPE_ACCESS_DUPLEX = 0x00000003 +PIPE_TYPE_BYTE = 0x00000000 +PIPE_READMODE_BYTE = 0x00000000 +PIPE_WAIT = 0x00000000 +PIPE_UNLIMITED_INSTANCES = 255 +NMPWAIT_USE_DEFAULT_WAIT = 0 +FILE_FLAG_OVERLAPPED = 0x40000000 + + +def _createPipeInstance(name: str) -> ctypes.wintypes.HANDLE: + """Create a single named-pipe instance and return its handle.""" + handle = _kernel32.CreateNamedPipeW( + name, + PIPE_ACCESS_DUPLEX, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + NMPWAIT_USE_DEFAULT_WAIT, + None, + ) + if handle == INVALID_HANDLE_VALUE: + raise ctypes.WinError() + return handle + + +def _connectClient(handle: ctypes.wintypes.HANDLE) -> bool: + """Block until a client connects. Return False if the pipe is broken.""" + result = _kernel32.ConnectNamedPipe(handle, None) + if result: + return True + err = ctypes.GetLastError() + if err == ERROR_PIPE_CONNECTED: + return True + return False + + +def _readExact(handle: ctypes.wintypes.HANDLE, n: int) -> Optional[bytes]: + """Read exactly *n* bytes from *handle*. Return None on pipe break.""" + buf = (ctypes.c_char * n)() + total = 0 + while total < n: + read = ctypes.wintypes.DWORD(0) + ok = _kernel32.ReadFile(handle, ctypes.byref(buf, total), n - total, ctypes.byref(read), None) + if not ok or read.value == 0: + return None + total += read.value + return bytes(buf) + + +def _writeAll(handle: ctypes.wintypes.HANDLE, data: bytes) -> bool: + """Write all of *data* to *handle*. Return False on pipe break.""" + offset = 0 + while offset < len(data): + written = ctypes.wintypes.DWORD(0) + ok = _kernel32.WriteFile( + handle, + ctypes.c_char_p(data[offset:]), + len(data) - offset, + ctypes.byref(written), + None, + ) + if not ok or written.value == 0: + return False + offset += written.value + return True + + +def _sendMessage(handle: ctypes.wintypes.HANDLE, obj: dict) -> bool: + body = json.dumps(obj).encode("utf-8") + frame = struct.pack(" Optional[dict]: + header = _readExact(handle, 4) + if header is None: + return None + (length,) = struct.unpack(" None: + super().__init__() + self._source = source + self._model = model or None + self._id = id_ + self.routingIndex = routingIndex + self.dots = dots + self.space = space + + def _get_source(self) -> str: + return self._source + + def _get_model(self) -> str: + return self._model + + def _get_id(self) -> str: + return self._id + + +class _MirrorSession(braille.BrailleMirror): + """BrailleMirror that forwards display updates over the pipe.""" + + def __init__(self, handle: ctypes.wintypes.HANDLE, numCells: int) -> None: + self._handle = handle + self._numCells = numCells + braille.registerMirror(self) + if braille.handler: + self._notifyDisplaySize(braille.handler.displayDimensions.numCols) + + def numCells(self) -> int: + return self._numCells + + def display(self, cells: list) -> None: + _sendMessage(self._handle, {"type": "display", "cells": cells}) + + def _notifyDisplaySize(self, numCols: int) -> None: + _sendMessage(self._handle, {"type": "display_size", "numCols": numCols}) + + def handleMessage(self, msg: dict) -> None: + mtype = msg.get("type") + if mtype == "inject_gesture": + gesture = _PipeBrailleGesture( + source=msg.get("source", ""), + model=msg.get("model", ""), + id_=msg.get("id", ""), + routingIndex=msg.get("routingIndex"), + dots=msg.get("dots", 0), + space=msg.get("space", False), + ) + wx.CallAfter(braille.injectGesture, gesture) + else: + log.debug(f"braillePipeServer: unexpected mirror message type {mtype!r}") + + def close(self) -> None: + braille.unregisterMirror(self) + + +class _DirectSession(braille.DirectBrailleWindow): + """DirectBrailleWindow that forwards gestures over the pipe.""" + + def __init__(self, handle: ctypes.wintypes.HANDLE, hwnd: int, numCells: int) -> None: + super().__init__(hwnd=hwnd, numCells=numCells) + self._handle = handle + self.activate() + + def onGesture(self, gesture: braille.BrailleDisplayGesture) -> None: + msg: dict = { + "type": "gesture", + "source": getattr(gesture, "source", ""), + "model": getattr(gesture, "model", "") or "", + "id": getattr(gesture, "id", ""), + "routingIndex": getattr(gesture, "routingIndex", None), + "dots": getattr(gesture, "dots", 0), + "space": getattr(gesture, "space", False), + } + _sendMessage(self._handle, msg) + + def handleMessage(self, msg: dict) -> None: + mtype = msg.get("type") + if mtype == "display": + cells = msg.get("cells", []) + self.display(cells) + else: + log.debug(f"braillePipeServer: unexpected direct message type {mtype!r}") + + def close(self) -> None: + self.deactivate() + + +def _handleConnection(handle: ctypes.wintypes.HANDLE, pending_q: "queue.Queue") -> None: + """Thread that drives one client connection from registration to close.""" + session = None + try: + # First message must be a registration. + msg = _recvMessage(handle) + if msg is None: + return + mtype = msg.get("type") + if mtype == "register_mirror": + numCells = int(msg.get("numCells", 0)) + # If braille is not yet initialised, queue registration. + if braille.handler is None: + pending_q.put(("mirror", handle, numCells)) + return + session = _MirrorSession(handle, numCells) + elif mtype == "register_direct_braille": + hwnd = int(msg.get("hwnd", 0)) + numCells = int(msg.get("numCells", 0)) + if braille.handler is None: + pending_q.put(("direct", handle, hwnd, numCells)) + return + session = _DirectSession(handle, hwnd, numCells) + else: + log.warning(f"braillePipeServer: unexpected first message type {mtype!r}") + return + while True: + msg = _recvMessage(handle) + if msg is None: + break + session.handleMessage(msg) + except Exception: + log.exception("braillePipeServer: error in connection handler") + finally: + if session is not None: + session.close() + _kernel32.CloseHandle(handle) + + +class _PipeServer: + """Listens on a named pipe and spawns per-connection threads.""" + + def __init__(self, pipeName: str) -> None: + self._pipeName = pipeName + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + # Queue for registrations that arrive before braille.handler is ready. + self._pending: queue.Queue = queue.Queue() + + def start(self) -> None: + self._thread = threading.Thread(target=self._loop, name="braillePipeServer", daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop.set() + # Unblock the accept loop by opening a dummy connection. + try: + dummy = _kernel32.CreateFileW( + self._pipeName, + 0, # GENERIC_READ | GENERIC_WRITE not needed, just unblock + 0, + None, + 3, # OPEN_EXISTING + 0, + None, + ) + if dummy != INVALID_HANDLE_VALUE: + _kernel32.CloseHandle(dummy) + except Exception: + pass + if self._thread: + self._thread.join(timeout=2.0) + + def processPending(self) -> None: + """Process any registrations that arrived before braille was ready.""" + while not self._pending.empty(): + try: + item = self._pending.get_nowait() + except queue.Empty: + break + role = item[0] + if role == "mirror": + _, handle, numCells = item + try: + session = _MirrorSession(handle, numCells) + threading.Thread( + target=_driveSession, + args=(handle, session), + daemon=True, + ).start() + except Exception: + log.exception("braillePipeServer: error creating pending mirror session") + _kernel32.CloseHandle(handle) + elif role == "direct": + _, handle, hwnd, numCells = item + try: + session = _DirectSession(handle, hwnd, numCells) + threading.Thread( + target=_driveSession, + args=(handle, session), + daemon=True, + ).start() + except Exception: + log.exception("braillePipeServer: error creating pending direct session") + _kernel32.CloseHandle(handle) + + def _loop(self) -> None: + while not self._stop.is_set(): + try: + handle = _createPipeInstance(self._pipeName) + except OSError: + log.exception("braillePipeServer: failed to create pipe instance") + break + connected = _connectClient(handle) + if self._stop.is_set(): + _kernel32.CloseHandle(handle) + break + if not connected: + _kernel32.CloseHandle(handle) + continue + threading.Thread( + target=_handleConnection, + args=(handle, self._pending), + daemon=True, + ).start() + + +def _driveSession(handle: ctypes.wintypes.HANDLE, session) -> None: + """Message loop for a session created from a pending registration.""" + try: + while True: + msg = _recvMessage(handle) + if msg is None: + break + session.handleMessage(msg) + except Exception: + log.exception("braillePipeServer: error in deferred session") + finally: + session.close() + _kernel32.CloseHandle(handle) + + +_server: Optional[_PipeServer] = None + + +def initialize() -> None: + """Start the named pipe server.""" + global _server + if _server is not None: + return + pipeName = r"\\.\pipe\nvda_braille_secure" if isRunningOnSecureDesktop() else r"\\.\pipe\nvda_braille" + _server = _PipeServer(pipeName) + _server.start() + log.info(f"braillePipeServer: listening on {pipeName}") + # Resolve any registrations that beat us to it (shouldn't normally happen since we start early, but be defensive). + _server.processPending() + + +def terminate() -> None: + """Stop the named pipe server.""" + global _server + if _server is None: + return + _server.stop() + _server = None + log.info("braillePipeServer: stopped") From 13072dc5ded6f6e0bd7b95af426016e9bbb38020 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Wed, 25 Mar 2026 09:55:35 -0600 Subject: [PATCH 02/12] Port _remoteClient to use the braille mirror. --- source/_remoteClient/localMachine.py | 40 +---------------------- source/_remoteClient/session.py | 49 +++++++++++++++++++--------- 2 files changed, 35 insertions(+), 54 deletions(-) diff --git a/source/_remoteClient/localMachine.py b/source/_remoteClient/localMachine.py index 7e1f8274989..34a095920e6 100644 --- a/source/_remoteClient/localMachine.py +++ b/source/_remoteClient/localMachine.py @@ -27,7 +27,6 @@ import api import braille from config.registry import RegistryKey -import inputCore import nvwave import speech import tones @@ -126,9 +125,6 @@ def __init__(self) -> None: self.receivingBraille = False - self._cachedSizes: list[int] | None = None - """Cached braille display sizes from remote machines""" - self._showingLocalUiMessage: bool = False """Whether we're currently showing a `ui.message` while showing remote braille.""" @@ -257,41 +253,7 @@ def brailleInput(self, **kwargs: dict[str, Any]) -> None: :param kwargs: Gesture parameters passed to BrailleInputGesture :note: Silently ignores gestures that have no associated action. """ - try: - inputCore.manager.executeGesture(input.BrailleInputGesture(**kwargs)) - except inputCore.NoInputGestureAction: - pass - - def setBrailleDisplaySize(self, sizes: list[int]) -> None: - """Cache remote braille display sizes for size negotiation. - - :param sizes: List of display sizes (cells) from remote machines - """ - self._cachedSizes = sizes - - def _handleFilterDisplayDimensions(self, value: braille.DisplayDimensions) -> braille.DisplayDimensions: - """Filter the local display dimensions based on remote display dimensions. - - Determines the optimal display dimensions when sharing braille output by - finding the smallest positive width among local and remote displays. - - .. note:: - We can currently only support a single line of braille, - as sending display dimensions would require changing the Remote Access protocol. - - :param value: Local display dimensions - :return: The negotiated display dimensions to use. - """ - if not self._cachedSizes: - # We cannot support multiline displays without breaking the Remote Access protocol, - # so always force numRows to 1. - return value._replace(numRows=1) - # There is no point storing the number of rows if we are always going to set it to 1. - sizes = self._cachedSizes + [value.numCols] - try: - return braille.DisplayDimensions(numRows=1, numCols=min(i for i in sizes if i > 0)) - except ValueError: - return value._replace(numRows=1) + braille.injectGesture(input.BrailleInputGesture(**kwargs)) def handleDecideEnabled(self) -> bool: """Determine if the local braille display should be enabled. diff --git a/source/_remoteClient/session.py b/source/_remoteClient/session.py index d068b30a8c3..d73dfc6be6a 100644 --- a/source/_remoteClient/session.py +++ b/source/_remoteClient/session.py @@ -270,6 +270,26 @@ def connectedClientsCount(self) -> int: return self.connectedLeadersCount + self.connectedFollowersCount +class _FollowerBrailleMirror(braille.BrailleMirror): + """BrailleMirror that forwards display updates to connected leader machines. + + Registered while at least one leader with a braille display is connected. + :meth:`numCells` returns the smallest positive leader display size so NVDA + negotiates a compatible display width. + """ + + def __init__(self, session: "FollowerSession") -> None: + self._session = session + + def display(self, cells: list[int]) -> None: + if self._session.hasBrailleLeaders(): + self._session.transport.send(type=RemoteMessageType.DISPLAY, cells=cells) + + def numCells(self) -> int: + sizes = [s for s in self._session.leaderDisplaySizes if s > 0] + return min(sizes) if sizes else 0 + + class FollowerSession(RemoteSession): """Session that runs on the controlled (follower) NVDA instance. @@ -302,6 +322,9 @@ def __init__( self.leaders = defaultdict(dict) self.leaderDisplaySizes = [] self.followers = set() + self._brailleMirror = _FollowerBrailleMirror(self) + # The remote protocol only supports single-row braille; force numRows to 1. + braille.filter_displayDimensions.register(self._filterNumRowsToOne) self.transport.transportClosing.register(self.handleTransportClosing) self.transport.registerInbound( RemoteMessageType.CHANNEL_JOINED, @@ -315,9 +338,6 @@ def __init__( RemoteMessageType.SET_DISPLAY_SIZE, self.setDisplaySize, ) - braille.filter_displayDimensions.register( - self.localMachine._handleFilterDisplayDimensions, - ) self.transport.registerInbound( RemoteMessageType.BRAILLE_INPUT, self.localMachine.brailleInput, @@ -340,7 +360,7 @@ def registerCallbacks(self) -> None: ) self.transport.registerOutbound(decide_playWaveFile, RemoteMessageType.WAVE) self.transport.registerOutbound(post_speechPaused, RemoteMessageType.PAUSE_SPEECH) - braille.pre_writeCells.register(self.display) + braille.registerMirror(self._brailleMirror) pre_speechQueued.register(self.sendSpeech) self.callbacksAdded = True @@ -351,7 +371,7 @@ def unregisterCallbacks(self) -> None: self.transport.unregisterOutbound(RemoteMessageType.CANCEL) self.transport.unregisterOutbound(RemoteMessageType.WAVE) self.transport.unregisterOutbound(RemoteMessageType.PAUSE_SPEECH) - braille.pre_writeCells.unregister(self.display) + braille.unregisterMirror(self._brailleMirror) pre_speechQueued.unregister(self.sendSpeech) self.callbacksAdded = False @@ -382,6 +402,7 @@ def handleTransportClosing(self) -> None: to ensure clean shutdown of remote features. """ self.unregisterCallbacks() + braille.filter_displayDimensions.unregister(self._filterNumRowsToOne) def handleTransportDisconnected(self) -> None: """Handle disconnection from the transport layer. @@ -408,7 +429,14 @@ def setDisplaySize(self, sizes: list[int] | None = None) -> None: sizes if sizes else [info.get("braille_numCells", 0) for info in self.leaders.values()] ) log.debug(f"Setting follower display size to: {self.leaderDisplaySizes!r}") - self.localMachine.setBrailleDisplaySize(self.leaderDisplaySizes) + + def _filterNumRowsToOne(self, value: braille.DisplayDimensions) -> braille.DisplayDimensions: + """Force single-row braille output. + + The remote protocol does not support multi-row displays, + so we always constrain numRows to 1. + """ + return value._replace(numRows=1) def handleBrailleInfo( self, @@ -451,15 +479,6 @@ def pauseSpeech(self, switch: bool) -> None: """Toggle speech pause state on leader instances.""" self.transport.send(type=RemoteMessageType.PAUSE_SPEECH, switch=switch) - def display(self, cells: list[int]) -> None: - """Forward braille display content to leader instances. - - Only sends braille data if there are connected leaders with braille displays. - """ - # Only send braille data when there are controlling machines with a braille display - if self.hasBrailleLeaders(): - self.transport.send(type=RemoteMessageType.DISPLAY, cells=cells) - def hasBrailleLeaders(self) -> bool: """Check if any connected leaders have braille displays. From c64bb830b5ce6e88cd191dee46fa17f6093b806f Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Wed, 25 Mar 2026 13:06:28 -0600 Subject: [PATCH 03/12] Add unit tests for all the new braille APIs --- source/braillePipeServer.py | 78 +++-- .../test_mirrorAndDirectWindow.py | 280 ++++++++++++++++++ 2 files changed, 338 insertions(+), 20 deletions(-) create mode 100644 tests/unit/test_braille/test_mirrorAndDirectWindow.py diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py index 6330fd72c4a..89916bdad70 100644 --- a/source/braillePipeServer.py +++ b/source/braillePipeServer.py @@ -27,7 +27,6 @@ import wx import braille -import inputCore from logHandler import log from utils.security import isRunningOnSecureDesktop @@ -105,10 +104,13 @@ def _writeAll(handle: ctypes.wintypes.HANDLE, data: bytes) -> bool: return True -def _sendMessage(handle: ctypes.wintypes.HANDLE, obj: dict) -> bool: +def _frameMessage(obj: dict) -> bytes: body = json.dumps(obj).encode("utf-8") - frame = struct.pack(" bool: + return _writeAll(handle, _frameMessage(obj)) def _recvMessage(handle: ctypes.wintypes.HANDLE) -> Optional[dict]: @@ -148,24 +150,56 @@ def _get_id(self) -> str: return self._id +class _AsyncWriter: + """Background writer thread that drains a queue to a pipe handle. + + All sends from NVDA's main-thread callbacks (display updates, gesture + notifications) go through here so pipe I/O never blocks the core loop. + Sending ``None`` to the queue is the stop sentinel. + """ + + def __init__(self, handle: ctypes.wintypes.HANDLE) -> None: + self._handle = handle + self._queue: queue.SimpleQueue = queue.SimpleQueue() + self._thread = threading.Thread(target=self._loop, daemon=True, name="braillePipeWriter") + self._thread.start() + + def send(self, obj: dict) -> None: + """Enqueue *obj* for asynchronous delivery to the pipe client.""" + self._queue.put(_frameMessage(obj)) + + def stop(self) -> None: + """Signal the writer thread to stop after draining remaining items.""" + self._queue.put(None) + + def _loop(self) -> None: + while True: + frame = self._queue.get() + if frame is None: + break + if not _writeAll(self._handle, frame): + break + + class _MirrorSession(braille.BrailleMirror): """BrailleMirror that forwards display updates over the pipe.""" def __init__(self, handle: ctypes.wintypes.HANDLE, numCells: int) -> None: - self._handle = handle self._numCells = numCells + self._writer = _AsyncWriter(handle) braille.registerMirror(self) + braille.displaySizeChanged.register(self._handleDisplaySizeChanged) if braille.handler: - self._notifyDisplaySize(braille.handler.displayDimensions.numCols) + self._writer.send({"type": "display_size", "numCols": braille.handler.displayDimensions.numCols}) def numCells(self) -> int: return self._numCells def display(self, cells: list) -> None: - _sendMessage(self._handle, {"type": "display", "cells": cells}) + self._writer.send({"type": "display", "cells": cells}) - def _notifyDisplaySize(self, numCols: int) -> None: - _sendMessage(self._handle, {"type": "display_size", "numCols": numCols}) + def _handleDisplaySizeChanged(self, displaySize: int, numRows: int, numCols: int) -> None: + self._writer.send({"type": "display_size", "numCols": numCols}) def handleMessage(self, msg: dict) -> None: mtype = msg.get("type") @@ -183,7 +217,9 @@ def handleMessage(self, msg: dict) -> None: log.debug(f"braillePipeServer: unexpected mirror message type {mtype!r}") def close(self) -> None: + braille.displaySizeChanged.unregister(self._handleDisplaySizeChanged) braille.unregisterMirror(self) + self._writer.stop() class _DirectSession(braille.DirectBrailleWindow): @@ -191,20 +227,21 @@ class _DirectSession(braille.DirectBrailleWindow): def __init__(self, handle: ctypes.wintypes.HANDLE, hwnd: int, numCells: int) -> None: super().__init__(hwnd=hwnd, numCells=numCells) - self._handle = handle + self._writer = _AsyncWriter(handle) self.activate() def onGesture(self, gesture: braille.BrailleDisplayGesture) -> None: - msg: dict = { - "type": "gesture", - "source": getattr(gesture, "source", ""), - "model": getattr(gesture, "model", "") or "", - "id": getattr(gesture, "id", ""), - "routingIndex": getattr(gesture, "routingIndex", None), - "dots": getattr(gesture, "dots", 0), - "space": getattr(gesture, "space", False), - } - _sendMessage(self._handle, msg) + self._writer.send( + { + "type": "gesture", + "source": getattr(gesture, "source", ""), + "model": getattr(gesture, "model", "") or "", + "id": getattr(gesture, "id", ""), + "routingIndex": getattr(gesture, "routingIndex", None), + "dots": getattr(gesture, "dots", 0), + "space": getattr(gesture, "space", False), + } + ) def handleMessage(self, msg: dict) -> None: mtype = msg.get("type") @@ -216,6 +253,7 @@ def handleMessage(self, msg: dict) -> None: def close(self) -> None: self.deactivate() + self._writer.stop() def _handleConnection(handle: ctypes.wintypes.HANDLE, pending_q: "queue.Queue") -> None: diff --git a/tests/unit/test_braille/test_mirrorAndDirectWindow.py b/tests/unit/test_braille/test_mirrorAndDirectWindow.py new file mode 100644 index 00000000000..d1e92a9a191 --- /dev/null +++ b/tests/unit/test_braille/test_mirrorAndDirectWindow.py @@ -0,0 +1,280 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2026 Pneuma Solutions + +"""Unit tests for braille.BrailleMirror, braille.DirectBrailleWindow, braille.registerMirror, braille.unregisterMirror, and braille.injectGesture.""" + +import unittest +from unittest.mock import MagicMock, patch + +import braille +import inputCore + + +class _CountingMirror(braille.BrailleMirror): + """A mirror that records every display() call.""" + + def __init__(self, numCells_: int = 0) -> None: + self._numCells_ = numCells_ + self.received: list[list[int]] = [] + + def display(self, cells: list[int]) -> None: + self.received.append(list(cells)) + + def numCells(self) -> int: + return self._numCells_ + + +class TestBrailleMirrorRegistration(unittest.TestCase): + """Test register/unregisterMirror lifecycle.""" + + def tearDown(self) -> None: + # Ensure no mirrors leak between tests. + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_registerAddsToList(self): + m = _CountingMirror() + braille.registerMirror(m) + self.assertIn(m, braille._registeredMirrors) + + def test_unregisterRemovesFromList(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + self.assertNotIn(m, braille._registeredMirrors) + + def test_unregisterNonRegisteredIsSafe(self): + m = _CountingMirror() + # Should not raise. + braille.unregisterMirror(m) + + def test_extensionPointsHookedOnFirstRegister(self): + self.assertFalse(braille._registeredMirrors) + m = _CountingMirror() + braille.registerMirror(m) + # The shared handler should now be registered. + self.assertIn(braille._mirrorPreWriteCells, list(braille.pre_writeCells.handlers)) + + def test_extensionPointsUnhookedAfterLastUnregister(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + self.assertNotIn(braille._mirrorPreWriteCells, list(braille.pre_writeCells.handlers)) + + def test_multipleRegistrationsKeepHandlerOnce(self): + m1 = _CountingMirror() + m2 = _CountingMirror() + braille.registerMirror(m1) + braille.registerMirror(m2) + # The shared pre_writeCells handler should be registered exactly once. + self.assertEqual(list(braille.pre_writeCells.handlers).count(braille._mirrorPreWriteCells), 1) + braille.unregisterMirror(m1) + braille.unregisterMirror(m2) + + +class TestBrailleMirrorDisplay(unittest.TestCase): + def tearDown(self) -> None: + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_mirrorReceivesCellsOnWriteCells(self): + m = _CountingMirror() + braille.registerMirror(m) + cells = [0] * braille.handler.displaySize + braille.handler._writeCells(cells) + self.assertEqual(len(m.received), 1) + self.assertEqual(m.received[0], cells) + + def test_multipleMirrorsAllReceiveCells(self): + m1 = _CountingMirror() + m2 = _CountingMirror() + braille.registerMirror(m1) + braille.registerMirror(m2) + cells = [1] * braille.handler.displaySize + braille.handler._writeCells(cells) + self.assertEqual(len(m1.received), 1) + self.assertEqual(len(m2.received), 1) + + def test_unregisteredMirrorReceivesNothing(self): + m = _CountingMirror() + braille.registerMirror(m) + braille.unregisterMirror(m) + braille.handler._writeCells([0] * braille.handler.displaySize) + self.assertEqual(m.received, []) + + +class TestBrailleMirrorNumCells(unittest.TestCase): + def tearDown(self) -> None: + for m in list(braille._registeredMirrors): + braille.unregisterMirror(m) + + def test_numCellsZeroHasNoEffect(self): + """A mirror with numCells()==0 should not shrink the display width.""" + m = _CountingMirror(numCells_=0) + braille.registerMirror(m) + original = braille.handler.displayDimensions + # Apply the mirror filter manually. + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result, original) + + def test_numCellsShrinksCap(self): + """A mirror reporting fewer cells should cap numCols.""" + displayCols = braille.handler.displayDimensions.numCols + cap = max(1, displayCols - 4) + m = _CountingMirror(numCells_=cap) + braille.registerMirror(m) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, cap) + + def test_numCellsLargerThanDisplayHasNoEffect(self): + """A mirror reporting more cells than the display should not widen it.""" + displayCols = braille.handler.displayDimensions.numCols + m = _CountingMirror(numCells_=displayCols + 100) + braille.registerMirror(m) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, displayCols) + + def test_smallestMirrorWins(self): + """When multiple mirrors are registered, the smallest positive numCells wins.""" + displayCols = braille.handler.displayDimensions.numCols + small = max(1, displayCols - 10) + big = max(1, displayCols - 5) + m1 = _CountingMirror(numCells_=big) + m2 = _CountingMirror(numCells_=small) + braille.registerMirror(m1) + braille.registerMirror(m2) + original = braille.handler.displayDimensions + result = braille._mirrorFilterDisplayDimensions(original) + self.assertEqual(result.numCols, small) + + +class TestInjectGesture(unittest.TestCase): + def test_injectGestureCallsExecuteGesture(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + mock_manager = MagicMock() + with patch.object(inputCore, "manager", mock_manager): + braille.injectGesture(gesture) + mock_manager.executeGesture.assert_called_once_with(gesture) + + def test_injectGestureSwallowsNoInputGestureAction(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + mock_manager = MagicMock() + mock_manager.executeGesture.side_effect = inputCore.NoInputGestureAction + with patch.object(inputCore, "manager", mock_manager): + # Should not raise. + braille.injectGesture(gesture) + + +class TestDirectBrailleWindow(unittest.TestCase): + """Tests for DirectBrailleWindow using a mock foreground-window check.""" + + FAKE_HWND = 0xDEADBEEF + + def setUp(self) -> None: + self._win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + + def tearDown(self) -> None: + self._win.deactivate() + + def _patch_foreground(self, is_fg: bool): + return patch.object(self._win, "_isForeground", return_value=is_fg) + + def test_activateRegistersDecideEnabled(self): + self._win.activate() + self.assertIn(self._win._handleDecideEnabled, list(braille.decide_enabled.handlers)) + + def test_deactivateUnregistersDecideEnabled(self): + self._win.activate() + self._win.deactivate() + self.assertNotIn(self._win._handleDecideEnabled, list(braille.decide_enabled.handlers)) + + def test_doubleActivateIsNoop(self): + self._win.activate() + self._win.activate() + count = list(braille.decide_enabled.handlers).count(self._win._handleDecideEnabled) + self.assertEqual(count, 1) + self._win.deactivate() + + def test_doubleDeactivateIsNoop(self): + self._win.activate() + self._win.deactivate() + # Second deactivate should not raise. + self._win.deactivate() + + def test_handleDecideEnabledReturnsFalseWhenForeground(self): + with self._patch_foreground(True): + self.assertFalse(self._win._handleDecideEnabled()) + + def test_handleDecideEnabledReturnsTrueWhenNotForeground(self): + with self._patch_foreground(False): + self.assertTrue(self._win._handleDecideEnabled()) + + def test_gestureInterceptedWhenForeground(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + self._win.onGesture = MagicMock() + with self._patch_foreground(True): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertFalse(result) + self._win.onGesture.assert_called_once_with(gesture) + + def test_gesturePassedThroughWhenNotForeground(self): + gesture = MagicMock(spec=braille.BrailleDisplayGesture) + self._win.onGesture = MagicMock() + with self._patch_foreground(False): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertTrue(result) + self._win.onGesture.assert_not_called() + + def test_nonBrailleGestureAlwaysPassedThrough(self): + gesture = MagicMock() # Not a BrailleDisplayGesture + with self._patch_foreground(True): + result = self._win._handleDecideExecuteGesture(gesture) + self.assertTrue(result) + + def test_numCellsZeroSkipsFilter(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result, dims) + + def test_numCellsCapsWhenForeground(self): + displayCols = braille.handler.displayDimensions.numCols + cap = max(1, displayCols - 4) + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=cap) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result.numCols, cap) + + def test_numCellsIgnoredWhenNotForeground(self): + displayCols = braille.handler.displayDimensions.numCols + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=max(1, displayCols - 4)) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=False): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result, dims) + + def test_numCellsLargerThanDisplayHasNoEffect(self): + displayCols = braille.handler.displayDimensions.numCols + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=displayCols + 100) + dims = braille.handler.displayDimensions + with patch.object(win, "_isForeground", return_value=True): + result = win._handleFilterDisplayDimensions(dims) + self.assertEqual(result.numCols, displayCols) + + def test_filterRegisteredWhenNumCellsPositive(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=20) + win.activate() + self.assertIn(win._handleFilterDisplayDimensions, list(braille.filter_displayDimensions.handlers)) + win.deactivate() + + def test_filterNotRegisteredWhenNumCellsZero(self): + win = braille.DirectBrailleWindow(hwnd=self.FAKE_HWND, numCells=0) + win.activate() + self.assertNotIn(win._handleFilterDisplayDimensions, list(braille.filter_displayDimensions.handlers)) + win.deactivate() From 06e9a5cc7b041ab63ad6c63d91e5b946f27aa084 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Wed, 25 Mar 2026 13:23:28 -0600 Subject: [PATCH 04/12] Implement secure screen handling for the pipe IPC --- source/braillePipeServer.py | 68 +++++++++++++++++++++++++++++++++---- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py index 89916bdad70..ba27d5914c1 100644 --- a/source/braillePipeServer.py +++ b/source/braillePipeServer.py @@ -31,6 +31,7 @@ from utils.security import isRunningOnSecureDesktop _kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] +_advapi32 = ctypes.windll.advapi32 # type: ignore[attr-defined] INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value ERROR_PIPE_CONNECTED = 535 @@ -44,8 +45,46 @@ NMPWAIT_USE_DEFAULT_WAIT = 0 FILE_FLAG_OVERLAPPED = 0x40000000 +SDDL_REVISION_1 = 1 -def _createPipeInstance(name: str) -> ctypes.wintypes.HANDLE: +# SDDL for the normal-desktop pipe. +# The normal-desktop NVDA instance runs as the logged-in user. Its default token DACL grants that user and local Administrators access, but does NOT reliably include NT AUTHORITY\SYSTEM, which is the identity used by RIM's elevated service component. +_NORMAL_PIPE_SDDL = "D:(A;;GA;;;OW)(A;;GRGW;;;SY)" + + +class _SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [ + ("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.c_void_p), + ("bInheritHandle", ctypes.wintypes.BOOL), + ] + + def __init__(self, **kwargs): + super().__init__(nLength=ctypes.sizeof(self), **kwargs) + + +def _buildSystemAccessSA() -> tuple["_SECURITY_ATTRIBUTES", ctypes.c_void_p]: + """Build a SECURITY_ATTRIBUTES granting SYSTEM read/write on the normal-desktop pipe. + + Returns ``(sa, sd)`` where *sd* is the LocalAlloc'd security descriptor that must be freed with ``_kernel32.LocalFree(sd)`` when the pipe server stops. + """ + sd = ctypes.c_void_p() + ok = _advapi32.ConvertStringSecurityDescriptorToSecurityDescriptorW( + _NORMAL_PIPE_SDDL, + SDDL_REVISION_1, + ctypes.byref(sd), + None, + ) + if not ok: + raise ctypes.WinError() + sa = _SECURITY_ATTRIBUTES(lpSecurityDescriptor=sd) + return sa, sd + + +def _createPipeInstance( + name: str, + sa: Optional["_SECURITY_ATTRIBUTES"] = None, +) -> ctypes.wintypes.HANDLE: """Create a single named-pipe instance and return its handle.""" handle = _kernel32.CreateNamedPipeW( name, @@ -55,7 +94,7 @@ def _createPipeInstance(name: str) -> ctypes.wintypes.HANDLE: 65536, 65536, NMPWAIT_USE_DEFAULT_WAIT, - None, + ctypes.byref(sa) if sa is not None else None, ) if handle == INVALID_HANDLE_VALUE: raise ctypes.WinError() @@ -298,14 +337,25 @@ def _handleConnection(handle: ctypes.wintypes.HANDLE, pending_q: "queue.Queue") class _PipeServer: """Listens on a named pipe and spawns per-connection threads.""" - def __init__(self, pipeName: str) -> None: + def __init__(self, pipeName: str, useSystemDacl: bool = False) -> None: self._pipeName = pipeName + self._useSystemDacl = useSystemDacl self._stop = threading.Event() self._thread: Optional[threading.Thread] = None + self._sa: Optional[_SECURITY_ATTRIBUTES] = None + self._sd: Optional[ctypes.c_void_p] = None # Queue for registrations that arrive before braille.handler is ready. self._pending: queue.Queue = queue.Queue() def start(self) -> None: + if self._useSystemDacl: + try: + self._sa, self._sd = _buildSystemAccessSA() + except OSError: + log.exception( + "braillePipeServer: failed to build SYSTEM-access security descriptor;" + " falling back to default DACL (SYSTEM clients may be unable to connect)", + ) self._thread = threading.Thread(target=self._loop, name="braillePipeServer", daemon=True) self._thread.start() @@ -328,6 +378,10 @@ def stop(self) -> None: pass if self._thread: self._thread.join(timeout=2.0) + if self._sd is not None: + _kernel32.LocalFree(self._sd) + self._sd = None + self._sa = None def processPending(self) -> None: """Process any registrations that arrived before braille was ready.""" @@ -365,7 +419,7 @@ def processPending(self) -> None: def _loop(self) -> None: while not self._stop.is_set(): try: - handle = _createPipeInstance(self._pipeName) + handle = _createPipeInstance(self._pipeName, self._sa) except OSError: log.exception("braillePipeServer: failed to create pipe instance") break @@ -406,8 +460,10 @@ def initialize() -> None: global _server if _server is not None: return - pipeName = r"\\.\pipe\nvda_braille_secure" if isRunningOnSecureDesktop() else r"\\.\pipe\nvda_braille" - _server = _PipeServer(pipeName) + isSecure = isRunningOnSecureDesktop() + pipeName = r"\\.\pipe\nvda_braille_secure" if isSecure else r"\\.\pipe\nvda_braille" + # The secure-desktop instance runs as SYSTEM, so its default DACL already permits SYSTEM connections. Only the normal-desktop instance needs the explicit SYSTEM ACE. + _server = _PipeServer(pipeName, useSystemDacl=not isSecure) _server.start() log.info(f"braillePipeServer: listening on {pipeName}") # Resolve any registrations that beat us to it (shouldn't normally happen since we start early, but be defensive). From 6c7056e374e295228d1bee33b10528a55119b9f1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 25 Mar 2026 19:37:32 +0000 Subject: [PATCH 05/12] Pre-commit auto-fix --- source/braillePipeServer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py index ba27d5914c1..ff77819fea0 100644 --- a/source/braillePipeServer.py +++ b/source/braillePipeServer.py @@ -279,7 +279,7 @@ def onGesture(self, gesture: braille.BrailleDisplayGesture) -> None: "routingIndex": getattr(gesture, "routingIndex", None), "dots": getattr(gesture, "dots", 0), "space": getattr(gesture, "space", False), - } + }, ) def handleMessage(self, msg: dict) -> None: From b6deb2f90dc1b76ab25be5af77a16b8b9a4ddef8 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Fri, 27 Mar 2026 13:53:23 -0600 Subject: [PATCH 06/12] Change to generic pipe names --- source/braillePipeServer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py index ff77819fea0..3cbf5f240ef 100644 --- a/source/braillePipeServer.py +++ b/source/braillePipeServer.py @@ -7,8 +7,8 @@ Two pipe names are used: -* ``\\\\.\\pipe\\nvda_braille`` – normal desktop instance -* ``\\\\.\\pipe\\nvda_braille_secure`` – secure desktop instance +* ``\\\\.\\pipe\\screen_reader_braille`` – normal desktop instance +* ``\\\\.\\pipe\\screen_reader_braille_secure`` – secure desktop instance Wire protocol: Every message is length-prefixed: 4 bytes little-endian uint32 body length, followed by a UTF-8 JSON object. @@ -461,7 +461,7 @@ def initialize() -> None: if _server is not None: return isSecure = isRunningOnSecureDesktop() - pipeName = r"\\.\pipe\nvda_braille_secure" if isSecure else r"\\.\pipe\nvda_braille" + pipeName = r"\\.\pipe\screen_reader_braille_secure" if isSecure else r"\\.\pipe\screen_reader_braille" # The secure-desktop instance runs as SYSTEM, so its default DACL already permits SYSTEM connections. Only the normal-desktop instance needs the explicit SYSTEM ACE. _server = _PipeServer(pipeName, useSystemDacl=not isSecure) _server.start() From bbcae570e002a16d6c2b4b0c226890872ef69446 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Wed, 20 May 2026 13:59:42 -0600 Subject: [PATCH 07/12] Apply seanbudd review feedback on new braille APIs - Fix copyright headers to use the project-standard format (NV Access Limited, Pneuma Solutions + GPL v2 license text) in braillePipeServer.py, _brailleMirror.py, and the unit test file. - Replace ctypes.windll.kernel32/advapi32 direct usage in braillePipeServer.py with winBindings; add CreateNamedPipe, ConnectNamedPipe, ReadFile, LocalFree to winBindings/kernel32.py and ConvertStringSecurityDescriptorToSecurityDescriptorW to winBindings/advapi32.py. Remove the inline _SECURITY_ATTRIBUTES class in favour of winBindings.advapi32.SECURITY_ATTRIBUTES. - Replace Optional[X] with X | None throughout braillePipeServer.py. - Move BrailleMirror, DirectBrailleWindow, registerMirror, unregisterMirror, injectGesture and their private helpers out of the already-large braille.py into a new source/_brailleMirror.py; braille.py re-exports them so all callers are unaffected (closes #12772 concern). --- source/_brailleMirror.py | 193 ++++++++++++++++++ source/braille.py | 179 +--------------- source/braillePipeServer.py | 46 ++--- source/winBindings/advapi32.py | 19 ++ source/winBindings/kernel32.py | 68 ++++++ .../test_mirrorAndDirectWindow.py | 6 +- 6 files changed, 310 insertions(+), 201 deletions(-) create mode 100644 source/_brailleMirror.py diff --git a/source/_brailleMirror.py b/source/_brailleMirror.py new file mode 100644 index 00000000000..6db247b3f40 --- /dev/null +++ b/source/_brailleMirror.py @@ -0,0 +1,193 @@ +# A part of NonVisual Desktop Access (NVDA) +# Copyright (C) 2026 NV Access Limited, Pneuma Solutions +# This file may be used under the terms of the GNU General Public License, version 2 or later, as modified by the NVDA license. +# For full terms and any additional permissions, see the NVDA license file: https://github.com/nvaccess/nvda/blob/master/copying.txt + +"""BrailleMirror and DirectBrailleWindow public APIs, and their supporting infrastructure. + +Kept in a separate module to avoid further growth of braille.py (see #12772). +Imported into braille at the bottom of that module so callers continue to access +everything as ``braille.BrailleMirror`` etc. +""" + +from __future__ import annotations + +import inputCore +import winUser +import wx + +# Imported at module level for runtime isinstance checks and attribute access. +# All references to braille.X names are inside function/method bodies so the +# circular import (braille -> _brailleMirror -> braille) is safe. +import braille + + +class BrailleMirror: + """Abstract base class for a braille mirror. + + A mirror intercepts every braille display update and can optionally influence the negotiated display width. + Both the physical display and all registered mirrors receive the same cells simultaneously; the mirror does **not** suppress the local display. + Register an instance with :func:`registerMirror` and remove it with :func:`unregisterMirror`. To inject a gesture back into NVDA (e.g. a routing key received over a remote channel) use :func:`injectGesture`. + """ + + def display(self, cells: list[int]) -> None: + """Called with the full cell array on every display update. + + :param cells: The braille cells written to the display. + """ + + def numCells(self) -> int: + """Return the number of cells this mirror can show. + + Return 0 (the default) to have no effect on the negotiated display + width. A positive value caps the display width used by + :data:`braille.filter_displayDimensions` to the smallest value across all registered mirrors and the physical display. + """ + return 0 + + +_registeredMirrors: list[BrailleMirror] = [] + + +def _mirrorPreWriteCells(cells: list[int], **kwargs) -> None: + for mirror in _registeredMirrors: + mirror.display(cells) + + +def _mirrorFilterDisplayDimensions(value: braille.DisplayDimensions) -> braille.DisplayDimensions: + sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] + if not sizes: + return value + cap = min(sizes) + if cap >= value.numCols: + return value + return value._replace(numCols=cap) + + +def registerMirror(mirror: BrailleMirror) -> None: + """Register *mirror* to receive braille display updates. + + :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`braille.BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`braille.filter_displayDimensions`. + """ + if not _registeredMirrors: + braille.pre_writeCells.register(_mirrorPreWriteCells) + braille.filter_displayDimensions.register(_mirrorFilterDisplayDimensions) + _registeredMirrors.append(mirror) + if braille.handler: + braille.handler._refreshEnabled(block=True) + + +def unregisterMirror(mirror: BrailleMirror) -> None: + """Remove a previously registered mirror. + + Safe to call even if *mirror* is not currently registered. + """ + try: + _registeredMirrors.remove(mirror) + except ValueError: + return + if not _registeredMirrors: + braille.pre_writeCells.unregister(_mirrorPreWriteCells) + braille.filter_displayDimensions.unregister(_mirrorFilterDisplayDimensions) + if braille.handler: + braille.handler._refreshEnabled(block=True) + + +def injectGesture(gesture: braille.BrailleDisplayGesture) -> None: + """Inject *gesture* into NVDA's input pipeline. + + This is a thin wrapper around :func:`inputCore.manager.executeGesture` that silently swallows :class:`inputCore.NoInputGestureAction` so callers do not need to handle the common case where no script is bound. + + Thread safety: must be called on the main thread, or scheduled via ``wx.CallAfter`` from a background thread. + """ + try: + inputCore.manager.executeGesture(gesture) + except inputCore.NoInputGestureAction: + pass + + +class DirectBrailleWindow: + """Take over braille output and input while a specific window has focus. + + When the window identified by *hwnd* is the foreground window, NVDA's own braille rendering is suspended. The application drives what appears on the physical display by calling :meth:`display`, and all braille gestures are forwarded to :meth:`onGesture` instead of being processed by NVDA. + When the window loses focus, normal NVDA rendering resumes automatically. + + :param hwnd: The HWND of the window that triggers direct braille mode. + :param numCells: Advertised display width; 0 means use whatever NVDA provides. A positive value caps the negotiated display width via :data:`braille.filter_displayDimensions`. + """ + + def __init__(self, hwnd: int, numCells: int = 0) -> None: + self._hwnd = hwnd + self._numCells = numCells + self._active = False + + def _isForeground(self) -> bool: + """Return True if our registered window is currently in the foreground.""" + fg = winUser.getForegroundWindow() + return fg == self._hwnd or winUser.isDescendantWindow(self._hwnd, fg) + + def display(self, cells: list[int]) -> None: + """Push *cells* to the physical braille display. + + Has no effect if the registered window is not currently foreground or if no braille display is connected. + + Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. + """ + if braille.handler and self._isForeground(): + wx.CallAfter(braille.handler._writeCells, cells) + + def onGesture(self, gesture: braille.BrailleDisplayGesture) -> None: + """Called when a braille gesture arrives while this window is active. + + The default implementation does nothing, so gestures are suppressed. Subclass and override to handle them. + + :param gesture: The braille gesture that was intercepted. + """ + + def _handleDecideEnabled(self) -> bool: + return not self._isForeground() + + def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: + if not self._isForeground(): + return True + if isinstance(gesture, braille.BrailleDisplayGesture): + self.onGesture(gesture) + return False + return True + + def _handleFilterDisplayDimensions(self, value: braille.DisplayDimensions) -> braille.DisplayDimensions: + if self._numCells <= 0 or not self._isForeground(): + return value + if self._numCells >= value.numCols: + return value + return value._replace(numCols=self._numCells) + + def activate(self) -> None: + """Start intercepting braille output and input for the registered window. + + Safe to call even if already active (subsequent calls are no-ops). + """ + if self._active: + return + self._active = True + braille.decide_enabled.register(self._handleDecideEnabled) + inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) + if self._numCells > 0: + braille.filter_displayDimensions.register(self._handleFilterDisplayDimensions) + if braille.handler: + braille.handler._refreshEnabled(block=True) + + def deactivate(self) -> None: + """Stop intercepting braille, restoring NVDA's normal rendering. + + Safe to call even if not currently active. + """ + if not self._active: + return + self._active = False + braille.decide_enabled.unregister(self._handleDecideEnabled) + inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) + if self._numCells > 0: + braille.filter_displayDimensions.unregister(self._handleFilterDisplayDimensions) + if braille.handler: + braille.handler._refreshEnabled(block=True) diff --git a/source/braille.py b/source/braille.py index 2e8ed075934..bf6e733dd8a 100644 --- a/source/braille.py +++ b/source/braille.py @@ -3976,175 +3976,16 @@ def getDisplayTextForIdentifier(cls, identifier): inputCore.registerGestureSource("br", BrailleDisplayGesture) -class BrailleMirror: - """Abstract base class for a braille mirror. - - A mirror intercepts every braille display update and can optionally influence the negotiated display width. - Both the physical display and all registered mirrors receive the same cells simultaneously; the mirror does **not** suppress the local display. - Register an instance with :func:`registerMirror` and remove it with :func:`unregisterMirror`. To inject a gesture back into NVDA (e.g. a routing key received over a remote channel) use :func:`injectGesture`. - """ - - def display(self, cells: List[int]) -> None: - """Called with the full cell array on every display update. - - :param cells: The braille cells written to the display. - """ - - def numCells(self) -> int: - """Return the number of cells this mirror can show. - - Return 0 (the default) to have no effect on the negotiated display - width. A positive value caps the display width used by - :data:`filter_displayDimensions` to the smallest value across all registered mirrors and the physical display. - """ - return 0 - - -_registeredMirrors: List["BrailleMirror"] = [] - - -def _mirrorPreWriteCells(cells: List[int], **kwargs) -> None: - for mirror in _registeredMirrors: - mirror.display(cells) - - -def _mirrorFilterDisplayDimensions(value: DisplayDimensions) -> DisplayDimensions: - sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] - if not sizes: - return value - cap = min(sizes) - if cap >= value.numCols: - return value - return value._replace(numCols=cap) - - -def registerMirror(mirror: BrailleMirror) -> None: - """Register *mirror* to receive braille display updates. - - :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`filter_displayDimensions`. - """ - if not _registeredMirrors: - pre_writeCells.register(_mirrorPreWriteCells) - filter_displayDimensions.register(_mirrorFilterDisplayDimensions) - _registeredMirrors.append(mirror) - if handler: - handler._refreshEnabled(block=True) - - -def unregisterMirror(mirror: BrailleMirror) -> None: - """Remove a previously registered mirror. - - Safe to call even if *mirror* is not currently registered. - """ - try: - _registeredMirrors.remove(mirror) - except ValueError: - return - if not _registeredMirrors: - pre_writeCells.unregister(_mirrorPreWriteCells) - filter_displayDimensions.unregister(_mirrorFilterDisplayDimensions) - if handler: - handler._refreshEnabled(block=True) - - -def injectGesture(gesture: BrailleDisplayGesture) -> None: - """Inject *gesture* into NVDA's input pipeline. - - This is a thin wrapper around :func:`inputCore.manager.executeGesture` that silently swallows :class:`inputCore.NoInputGestureAction` so callers do not need to handle the common case where no script is bound. - - Thread safety: must be called on the main thread, or scheduled via ``wx.CallAfter`` from a background thread. - """ - try: - inputCore.manager.executeGesture(gesture) - except inputCore.NoInputGestureAction: - pass - - -class DirectBrailleWindow: - """Take over braille output and input while a specific window has focus. - - When the window identified by *hwnd* is the foreground window, NVDA's own braille rendering is suppressed. The application drives what appears on the physical display by calling :meth:`display`, and all braille gestures are forwarded to :meth:`onGesture` instead of being processed by NVDA. - When the window loses focus, normal NVDA rendering resumes automatically. - - :param hwnd: The HWND of the window that triggers direct braille mode. - :param numCells: Advertised display width; 0 means use whatever NVDA provides. A positive value caps the negotiated display width via :data:`filter_displayDimensions`. - """ - - def __init__(self, hwnd: int, numCells: int = 0) -> None: - self._hwnd = hwnd - self._numCells = numCells - self._active = False - - def _isForeground(self) -> bool: - """Return True if our registered window is currently in the foreground.""" - fg = winUser.getForegroundWindow() - return fg == self._hwnd or winUser.isDescendantWindow(self._hwnd, fg) - - def display(self, cells: List[int]) -> None: - """Push *cells* to the physical braille display. - - Has no effect if the registered window is not currently foreground or if no braille display is connected. - - Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. - """ - if handler and self._isForeground(): - wx.CallAfter(handler._writeCells, cells) - - def onGesture(self, gesture: BrailleDisplayGesture) -> None: - """Called when a braille gesture arrives while this window is active. - - The default implementation does nothing, so gestures are suppressed. Subclass and override to handle them. - - :param gesture: The braille gesture that was intercepted. - """ - - def _handleDecideEnabled(self) -> bool: - return not self._isForeground() - - def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: - if not self._isForeground(): - return True - if isinstance(gesture, BrailleDisplayGesture): - self.onGesture(gesture) - return False - return True - - def _handleFilterDisplayDimensions(self, value: DisplayDimensions) -> DisplayDimensions: - if self._numCells <= 0 or not self._isForeground(): - return value - if self._numCells >= value.numCols: - return value - return value._replace(numCols=self._numCells) - - def activate(self) -> None: - """Start intercepting braille output and input for the registered window. - - Safe to call even if already active (subsequent calls are no-ops). - """ - if self._active: - return - self._active = True - decide_enabled.register(self._handleDecideEnabled) - inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) - if self._numCells > 0: - filter_displayDimensions.register(self._handleFilterDisplayDimensions) - if handler: - handler._refreshEnabled(block=True) - - def deactivate(self) -> None: - """Stop intercepting braille, restoring NVDA's normal rendering. - - Safe to call even if not currently active. - """ - if not self._active: - return - self._active = False - decide_enabled.unregister(self._handleDecideEnabled) - inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) - if self._numCells > 0: - filter_displayDimensions.unregister(self._handleFilterDisplayDimensions) - if handler: - handler._refreshEnabled(block=True) +from _brailleMirror import ( + BrailleMirror, + DirectBrailleWindow, + injectGesture, + registerMirror, + unregisterMirror, + _mirrorFilterDisplayDimensions, + _mirrorPreWriteCells, + _registeredMirrors, +) def getSerialPorts(filterFunc=None) -> typing.Iterator[typing.Tuple[str, str]]: diff --git a/source/braillePipeServer.py b/source/braillePipeServer.py index 3cbf5f240ef..c65614586b6 100644 --- a/source/braillePipeServer.py +++ b/source/braillePipeServer.py @@ -1,7 +1,7 @@ # A part of NonVisual Desktop Access (NVDA) -# This file is covered by the GNU General Public License. -# See the file COPYING for more details. -# Copyright (C) 2026 Pneuma Solutions +# Copyright (C) 2026 NV Access Limited, Pneuma Solutions +# This file may be used under the terms of the GNU General Public License, version 2 or later, as modified by the NVDA license. +# For full terms and any additional permissions, see the NVDA license file: https://github.com/nvaccess/nvda/blob/master/copying.txt """Named-pipe IPC server that exposes the Braille Mirror and Direct Braille Window APIs to external processes. @@ -22,16 +22,15 @@ import queue import struct import threading -from typing import Optional import wx import braille from logHandler import log from utils.security import isRunningOnSecureDesktop - -_kernel32 = ctypes.windll.kernel32 # type: ignore[attr-defined] -_advapi32 = ctypes.windll.advapi32 # type: ignore[attr-defined] +from winBindings import advapi32 as _advapi32 +from winBindings import kernel32 as _kernel32 +from winBindings.advapi32 import SECURITY_ATTRIBUTES INVALID_HANDLE_VALUE = ctypes.wintypes.HANDLE(-1).value ERROR_PIPE_CONNECTED = 535 @@ -52,18 +51,7 @@ _NORMAL_PIPE_SDDL = "D:(A;;GA;;;OW)(A;;GRGW;;;SY)" -class _SECURITY_ATTRIBUTES(ctypes.Structure): - _fields_ = [ - ("nLength", ctypes.wintypes.DWORD), - ("lpSecurityDescriptor", ctypes.c_void_p), - ("bInheritHandle", ctypes.wintypes.BOOL), - ] - - def __init__(self, **kwargs): - super().__init__(nLength=ctypes.sizeof(self), **kwargs) - - -def _buildSystemAccessSA() -> tuple["_SECURITY_ATTRIBUTES", ctypes.c_void_p]: +def _buildSystemAccessSA() -> tuple[SECURITY_ATTRIBUTES, ctypes.c_void_p]: """Build a SECURITY_ATTRIBUTES granting SYSTEM read/write on the normal-desktop pipe. Returns ``(sa, sd)`` where *sd* is the LocalAlloc'd security descriptor that must be freed with ``_kernel32.LocalFree(sd)`` when the pipe server stops. @@ -77,16 +65,16 @@ def _buildSystemAccessSA() -> tuple["_SECURITY_ATTRIBUTES", ctypes.c_void_p]: ) if not ok: raise ctypes.WinError() - sa = _SECURITY_ATTRIBUTES(lpSecurityDescriptor=sd) + sa = SECURITY_ATTRIBUTES(nLength=ctypes.sizeof(SECURITY_ATTRIBUTES), lpSecurityDescriptor=sd) return sa, sd def _createPipeInstance( name: str, - sa: Optional["_SECURITY_ATTRIBUTES"] = None, + sa: SECURITY_ATTRIBUTES | None = None, ) -> ctypes.wintypes.HANDLE: """Create a single named-pipe instance and return its handle.""" - handle = _kernel32.CreateNamedPipeW( + handle = _kernel32.CreateNamedPipe( name, PIPE_ACCESS_DUPLEX, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, @@ -112,7 +100,7 @@ def _connectClient(handle: ctypes.wintypes.HANDLE) -> bool: return False -def _readExact(handle: ctypes.wintypes.HANDLE, n: int) -> Optional[bytes]: +def _readExact(handle: ctypes.wintypes.HANDLE, n: int) -> bytes | None: """Read exactly *n* bytes from *handle*. Return None on pipe break.""" buf = (ctypes.c_char * n)() total = 0 @@ -152,7 +140,7 @@ def _sendMessage(handle: ctypes.wintypes.HANDLE, obj: dict) -> bool: return _writeAll(handle, _frameMessage(obj)) -def _recvMessage(handle: ctypes.wintypes.HANDLE) -> Optional[dict]: +def _recvMessage(handle: ctypes.wintypes.HANDLE) -> dict | None: header = _readExact(handle, 4) if header is None: return None @@ -341,9 +329,9 @@ def __init__(self, pipeName: str, useSystemDacl: bool = False) -> None: self._pipeName = pipeName self._useSystemDacl = useSystemDacl self._stop = threading.Event() - self._thread: Optional[threading.Thread] = None - self._sa: Optional[_SECURITY_ATTRIBUTES] = None - self._sd: Optional[ctypes.c_void_p] = None + self._thread: threading.Thread | None = None + self._sa: SECURITY_ATTRIBUTES | None = None + self._sd: ctypes.c_void_p | None = None # Queue for registrations that arrive before braille.handler is ready. self._pending: queue.Queue = queue.Queue() @@ -363,7 +351,7 @@ def stop(self) -> None: self._stop.set() # Unblock the accept loop by opening a dummy connection. try: - dummy = _kernel32.CreateFileW( + dummy = _kernel32.CreateFile( self._pipeName, 0, # GENERIC_READ | GENERIC_WRITE not needed, just unblock 0, @@ -452,7 +440,7 @@ def _driveSession(handle: ctypes.wintypes.HANDLE, session) -> None: _kernel32.CloseHandle(handle) -_server: Optional[_PipeServer] = None +_server: _PipeServer | None = None def initialize() -> None: diff --git a/source/winBindings/advapi32.py b/source/winBindings/advapi32.py index 69f69764284..3c8bf293bb0 100644 --- a/source/winBindings/advapi32.py +++ b/source/winBindings/advapi32.py @@ -35,6 +35,7 @@ "RegQueryValueEx", "CreateProcessAsUser", "GetTokenInformation", + "ConvertStringSecurityDescriptorToSecurityDescriptorW", ) @@ -259,3 +260,21 @@ class TOKEN_ELEVATION_TYPE(IntEnum): POINTER(DWORD), # ReturnLength ) GetTokenInformation.restype = BOOL + + +ConvertStringSecurityDescriptorToSecurityDescriptorW = WINFUNCTYPE(None)( + ("ConvertStringSecurityDescriptorToSecurityDescriptorW", dll), +) +""" +Converts a string-format security descriptor into a valid, functional security descriptor. + +.. seealso:: + https://learn.microsoft.com/en-us/windows/win32/api/sddl/nf-sddl-convertstringsecuritydescriptortosecuritydescriptorw +""" +ConvertStringSecurityDescriptorToSecurityDescriptorW.restype = BOOL +ConvertStringSecurityDescriptorToSecurityDescriptorW.argtypes = ( + LPCWSTR, # StringSecurityDescriptor: The string-format security descriptor to convert + DWORD, # StringSDRevision: The revision level of the StringSecurityDescriptor string + POINTER(c_void_p), # SecurityDescriptor: Receives a pointer to the converted security descriptor + c_void_p, # SecurityDescriptorSize: Optional pointer to receive the size; pass None if not needed +) diff --git a/source/winBindings/kernel32.py b/source/winBindings/kernel32.py index 9832131d46e..b0adc6eaa72 100644 --- a/source/winBindings/kernel32.py +++ b/source/winBindings/kernel32.py @@ -134,6 +134,10 @@ "MoveFileEx", "LCIDToLocaleName", "GetACP", + "CreateNamedPipe", + "ConnectNamedPipe", + "ReadFile", + "LocalFree", ) @@ -1722,3 +1726,67 @@ class FILE_MAP(IntEnum): BOOL, # bInheritHandle LPCWSTR, # lpName ) + + +CreateNamedPipe = WINFUNCTYPE(None)(("CreateNamedPipeW", dll)) +""" +Creates an instance of a named pipe and returns a handle for subsequent pipe operations. + +.. seealso:: + https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipew +""" +CreateNamedPipe.restype = HANDLE +CreateNamedPipe.argtypes = ( + LPCWSTR, # lpName: The unique pipe name + DWORD, # dwOpenMode: The open mode + DWORD, # dwPipeMode: The pipe mode + DWORD, # nMaxInstances: The maximum number of instances that can be created for this pipe + DWORD, # nOutBufferSize: The number of bytes to reserve for the output buffer + DWORD, # nInBufferSize: The number of bytes to reserve for the input buffer + DWORD, # nDefaultTimeOut: The default time-out value, in milliseconds + LPSECURITY_ATTRIBUTES, # lpSecurityAttributes: A pointer to a SECURITY_ATTRIBUTES structure +) + + +ConnectNamedPipe = WINFUNCTYPE(None)(("ConnectNamedPipe", dll)) +""" +Enables a named pipe server process to wait for a client process to connect to an instance of a named pipe. + +.. seealso:: + https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe +""" +ConnectNamedPipe.restype = BOOL +ConnectNamedPipe.argtypes = ( + HANDLE, # hNamedPipe: A handle to the server end of a named pipe instance + LPOVERLAPPED, # lpOverlapped: A pointer to an OVERLAPPED structure (NULL for synchronous I/O) +) + + +ReadFile = WINFUNCTYPE(None)(("ReadFile", dll)) +""" +Reads data from the specified file or input/output (I/O) device. + +.. seealso:: + https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile +""" +ReadFile.restype = BOOL +ReadFile.argtypes = ( + HANDLE, # hFile: A handle to the device + LPVOID, # lpBuffer: A pointer to the buffer that receives the data read from a file or device + DWORD, # nNumberOfBytesToRead: The maximum number of bytes to be read + LPDWORD, # lpNumberOfBytesRead: A pointer to the variable that receives the number of bytes read + LPOVERLAPPED, # lpOverlapped: A pointer to an OVERLAPPED structure (NULL for synchronous I/O) +) + + +LocalFree = WINFUNCTYPE(None)(("LocalFree", dll)) +""" +Frees the specified local memory object and invalidates its handle. + +.. seealso:: + https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-localfree +""" +LocalFree.restype = HANDLE +LocalFree.argtypes = ( + HANDLE, # hMem: A handle to the local memory object +) diff --git a/tests/unit/test_braille/test_mirrorAndDirectWindow.py b/tests/unit/test_braille/test_mirrorAndDirectWindow.py index d1e92a9a191..3713a98d845 100644 --- a/tests/unit/test_braille/test_mirrorAndDirectWindow.py +++ b/tests/unit/test_braille/test_mirrorAndDirectWindow.py @@ -1,7 +1,7 @@ # A part of NonVisual Desktop Access (NVDA) -# This file is covered by the GNU General Public License. -# See the file COPYING for more details. -# Copyright (C) 2026 Pneuma Solutions +# Copyright (C) 2026 NV Access Limited, Pneuma Solutions +# This file may be used under the terms of the GNU General Public License, version 2 or later, as modified by the NVDA license. +# For full terms and any additional permissions, see the NVDA license file: https://github.com/nvaccess/nvda/blob/master/copying.txt """Unit tests for braille.BrailleMirror, braille.DirectBrailleWindow, braille.registerMirror, braille.unregisterMirror, and braille.injectGesture.""" From 16e78a4cdab18e1150afa1a5c5c5ed9871a3794e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 20 May 2026 20:03:30 +0000 Subject: [PATCH 08/12] Pre-commit auto-fix --- source/braille.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/source/braille.py b/source/braille.py index bf6e733dd8a..4f3f8aa662b 100644 --- a/source/braille.py +++ b/source/braille.py @@ -67,7 +67,6 @@ import hwPortUtils import bdDetect import queueHandler -import winUser import brailleViewer from autoSettingsUtils.driverSetting import BooleanDriverSetting, NumericDriverSetting from utils.security import objectBelowLockScreenAndWindowsIsLocked, post_sessionLockStateChanged @@ -3976,18 +3975,6 @@ def getDisplayTextForIdentifier(cls, identifier): inputCore.registerGestureSource("br", BrailleDisplayGesture) -from _brailleMirror import ( - BrailleMirror, - DirectBrailleWindow, - injectGesture, - registerMirror, - unregisterMirror, - _mirrorFilterDisplayDimensions, - _mirrorPreWriteCells, - _registeredMirrors, -) - - def getSerialPorts(filterFunc=None) -> typing.Iterator[typing.Tuple[str, str]]: """Get available serial ports in a format suitable for L{BrailleDisplayDriver.getManualPorts}. @param filterFunc: a function executed on every dictionary retrieved using L{hwPortUtils.listComPorts}. From d92217a0310ad906dba63e7cdb8daf74a05174e5 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Thu, 18 Jun 2026 10:02:16 -0600 Subject: [PATCH 09/12] Fix circular import in _brailleMirror causing CI AttributeError Python was returning a partially-initialized braille module when braillePipeServer tried to subclass braille.BrailleMirror, because _brailleMirror imported braille at module level during braille's own load (braille -> _brailleMirror -> import braille circular). Replace the module-level import with lazy per-function imports and a TYPE_CHECKING guard so Pyright retains full type information without creating a circular import at runtime. --- source/_brailleMirror.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/_brailleMirror.py b/source/_brailleMirror.py index 6db247b3f40..40662eaa550 100644 --- a/source/_brailleMirror.py +++ b/source/_brailleMirror.py @@ -15,11 +15,10 @@ import inputCore import winUser import wx +from typing import TYPE_CHECKING -# Imported at module level for runtime isinstance checks and attribute access. -# All references to braille.X names are inside function/method bodies so the -# circular import (braille -> _brailleMirror -> braille) is safe. -import braille +if TYPE_CHECKING: + import braille class BrailleMirror: @@ -55,6 +54,7 @@ def _mirrorPreWriteCells(cells: list[int], **kwargs) -> None: def _mirrorFilterDisplayDimensions(value: braille.DisplayDimensions) -> braille.DisplayDimensions: + import braille sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] if not sizes: return value @@ -69,6 +69,7 @@ def registerMirror(mirror: BrailleMirror) -> None: :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`braille.BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`braille.filter_displayDimensions`. """ + import braille if not _registeredMirrors: braille.pre_writeCells.register(_mirrorPreWriteCells) braille.filter_displayDimensions.register(_mirrorFilterDisplayDimensions) @@ -82,6 +83,7 @@ def unregisterMirror(mirror: BrailleMirror) -> None: Safe to call even if *mirror* is not currently registered. """ + import braille try: _registeredMirrors.remove(mirror) except ValueError: @@ -133,6 +135,7 @@ def display(self, cells: list[int]) -> None: Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. """ + import braille if braille.handler and self._isForeground(): wx.CallAfter(braille.handler._writeCells, cells) @@ -148,6 +151,7 @@ def _handleDecideEnabled(self) -> bool: return not self._isForeground() def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: + import braille if not self._isForeground(): return True if isinstance(gesture, braille.BrailleDisplayGesture): @@ -169,6 +173,7 @@ def activate(self) -> None: """ if self._active: return + import braille self._active = True braille.decide_enabled.register(self._handleDecideEnabled) inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) @@ -184,6 +189,7 @@ def deactivate(self) -> None: """ if not self._active: return + import braille self._active = False braille.decide_enabled.unregister(self._handleDecideEnabled) inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) From d0acda57b564241d56c9c2b40bd82d7270b8ede5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:07:48 +0000 Subject: [PATCH 10/12] Pre-commit auto-fix --- source/_brailleMirror.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/_brailleMirror.py b/source/_brailleMirror.py index 40662eaa550..64043ac7b5a 100644 --- a/source/_brailleMirror.py +++ b/source/_brailleMirror.py @@ -54,7 +54,6 @@ def _mirrorPreWriteCells(cells: list[int], **kwargs) -> None: def _mirrorFilterDisplayDimensions(value: braille.DisplayDimensions) -> braille.DisplayDimensions: - import braille sizes = [m.numCells() for m in _registeredMirrors if m.numCells() > 0] if not sizes: return value @@ -70,6 +69,7 @@ def registerMirror(mirror: BrailleMirror) -> None: :meth:`BrailleMirror.display` will be called on the main thread for every subsequent :meth:`braille.BrailleHandler._writeCells` call. If *mirror* returns a positive value from :meth:`BrailleMirror.numCells`, it will also participate in display-width negotiation via :data:`braille.filter_displayDimensions`. """ import braille + if not _registeredMirrors: braille.pre_writeCells.register(_mirrorPreWriteCells) braille.filter_displayDimensions.register(_mirrorFilterDisplayDimensions) @@ -84,6 +84,7 @@ def unregisterMirror(mirror: BrailleMirror) -> None: Safe to call even if *mirror* is not currently registered. """ import braille + try: _registeredMirrors.remove(mirror) except ValueError: @@ -136,6 +137,7 @@ def display(self, cells: list[int]) -> None: Thread safety: safe to call from any thread; the actual write is dispatched to the main thread via ``wx.CallAfter``. """ import braille + if braille.handler and self._isForeground(): wx.CallAfter(braille.handler._writeCells, cells) @@ -152,6 +154,7 @@ def _handleDecideEnabled(self) -> bool: def _handleDecideExecuteGesture(self, gesture: inputCore.InputGesture) -> bool: import braille + if not self._isForeground(): return True if isinstance(gesture, braille.BrailleDisplayGesture): @@ -174,6 +177,7 @@ def activate(self) -> None: if self._active: return import braille + self._active = True braille.decide_enabled.register(self._handleDecideEnabled) inputCore.decide_executeGesture.register(self._handleDecideExecuteGesture) @@ -190,6 +194,7 @@ def deactivate(self) -> None: if not self._active: return import braille + self._active = False braille.decide_enabled.unregister(self._handleDecideEnabled) inputCore.decide_executeGesture.unregister(self._handleDecideExecuteGesture) From f10c337e63a88e14832c3a92f8683f979a940023 Mon Sep 17 00:00:00 2001 From: Quin Gillespie Date: Thu, 18 Jun 2026 10:42:49 -0600 Subject: [PATCH 11/12] Add missing re-export of BrailleMirror API from braille.py The from _brailleMirror import (...) block was never committed, so braille.BrailleMirror, braille.DirectBrailleWindow etc. did not exist in the braille namespace. braillePipeServer then failed at module load time with AttributeError when it tried to subclass braille.BrailleMirror. --- source/braille.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/source/braille.py b/source/braille.py index e9cc07d2018..4230143e019 100644 --- a/source/braille.py +++ b/source/braille.py @@ -4274,3 +4274,17 @@ def _speakOnNavigatingByUnit(info: textInfos.TextInfo, readingUnit: str) -> None copy.expand(readingUnit) cancelSpeech() speakTextInfo(copy, unit=readingUnit, reason=controlTypes.OutputReason.CARET) + + +# Re-export the public braille mirror/direct-window API so callers can use +# braille.BrailleMirror etc. without importing _brailleMirror directly. +from _brailleMirror import ( + BrailleMirror, + DirectBrailleWindow, + injectGesture, + registerMirror, + unregisterMirror, + _mirrorFilterDisplayDimensions, + _mirrorPreWriteCells, + _registeredMirrors, +) From e62a857f1fbab02ea85c9c8a6be9a1ae704324a5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 16:44:38 +0000 Subject: [PATCH 12/12] Pre-commit auto-fix --- source/braille.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/source/braille.py b/source/braille.py index 4230143e019..dfc5eb45789 100644 --- a/source/braille.py +++ b/source/braille.py @@ -4278,13 +4278,3 @@ def _speakOnNavigatingByUnit(info: textInfos.TextInfo, readingUnit: str) -> None # Re-export the public braille mirror/direct-window API so callers can use # braille.BrailleMirror etc. without importing _brailleMirror directly. -from _brailleMirror import ( - BrailleMirror, - DirectBrailleWindow, - injectGesture, - registerMirror, - unregisterMirror, - _mirrorFilterDisplayDimensions, - _mirrorPreWriteCells, - _registeredMirrors, -)