diff --git a/pyproject.toml b/pyproject.toml index c42d2c6f58a..151ca23ce4d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ readme = "readme.md" license = {file = "copying.txt"} dependencies = [ # NVDA's runtime dependencies + "bleak==1.1.0", "comtypes==1.4.11", "cryptography==44.0.1", "pyserial==3.5", @@ -127,6 +128,14 @@ ignore_packages = [ "Markdown", # BSD-3-Clause, but not in PyPI correctly "markdown-link-attr-modifier", # GPLV3 license, but not in PyPI correctly "pycaw", # MIT license, but not in PyPI + "winrt-Windows.Devices.Bluetooth", # MIT license, but not in PyPI + "winrt-Windows.Devices.Bluetooth.Advertisement", # MIT license, but not in PyPI + "winrt-Windows.Devices.Bluetooth.GenericAttributeProfile", # MIT license, but not in PyPI + "winrt-Windows.Devices.Enumeration", # MIT license, but not in PyPI + "winrt-Windows.Foundation", # MIT license, but not in PyPI + "winrt-Windows.Foundation.Collections", # MIT license, but not in PyPI + "winrt-Windows.Storage.Streams", # MIT license, but not in PyPI + "winrt-runtime", # MIT license, but not in PyPI "wxPython", # wxWindows Library License ] diff --git a/source/asyncioEventLoop.py b/source/asyncioEventLoop.py new file mode 100644 index 00000000000..345db5d62dc --- /dev/null +++ b/source/asyncioEventLoop.py @@ -0,0 +1,89 @@ +# A part of NonVisual Desktop Access (NVDA) +# Copyright (C) 2025 NV Access Limited, Dot Incorporated, Bram Duvigneau +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. + +""" +Provide an asyncio event loop +""" + +import asyncio +from collections.abc import Coroutine +from threading import Thread + +from logHandler import log + +TERMINATE_TIMEOUT_SECONDS = 5 +"Time to wait for tasks to finish while terminating the event loop." + +eventLoop: asyncio.BaseEventLoop +"The asyncio event loop used by NVDA." +asyncioThread: Thread +"Thread running the asyncio event loop." + + +def initialize(): + """Initialize and start the asyncio event loop.""" + global eventLoop, asyncioThread + log.info("Initializing asyncio event loop") + eventLoop = asyncio.new_event_loop() + asyncio.set_event_loop(eventLoop) + asyncioThread = Thread(target=eventLoop.run_forever, daemon=True) + asyncioThread.start() + + +def terminate(): + global eventLoop, asyncioThread + log.info("Terminating asyncio event loop") + + async def cancelAllTasks(): + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + log.debug(f"Stopping {len(tasks)} tasks") + [task.cancel() for task in tasks] + await asyncio.gather(*tasks, return_exceptions=True) + log.debug("Done stopping tasks") + + try: + runCoroutineSync(cancelAllTasks(), TERMINATE_TIMEOUT_SECONDS) + except TimeoutError: + log.debugWarning("Timeout while stopping async tasks") + finally: + eventLoop.call_soon_threadsafe(eventLoop.stop) + + asyncioThread.join() + asyncioThread = None + eventLoop.close() + + +def runCoroutine(coro: Coroutine) -> asyncio.Future: + """Schedule a coroutine to be run on the asyncio event loop. + + :param coro: The coroutine to run. + """ + if asyncioThread is None or not asyncioThread.is_alive(): + raise RuntimeError("Asyncio event loop thread is not running") + return asyncio.run_coroutine_threadsafe(coro, eventLoop) + + +def runCoroutineSync(coro: Coroutine, timeout: float | None = None): + """Schedule a coroutine to be run on the asyncio event loop and wait for the result. + + This is a synchronous wrapper around runCoroutine() that blocks until the coroutine + completes and returns the result directly, or raises any exception that occurred. + + :param coro: The coroutine to run. + :param timeout: Optional timeout in seconds. If None, waits indefinitely. + :return: The result of the coroutine. + :raises: Any exception raised by the coroutine. + :raises TimeoutError: If the timeout is exceeded. + :raises RuntimeError: If the asyncio event loop thread is not running. + """ + future = runCoroutine(coro) + try: + # Wait for the future to complete and get the result + # This will raise any exception that occurred in the coroutine + return future.result(timeout) + except asyncio.TimeoutError as e: + # Cancel the coroutine since it timed out + future.cancel() + raise TimeoutError(f"Coroutine execution timed out after {timeout} seconds") from e diff --git a/source/bdDetect.py b/source/bdDetect.py index 50c112ee637..f52baa73f88 100644 --- a/source/bdDetect.py +++ b/source/bdDetect.py @@ -1,7 +1,7 @@ # A part of NonVisual Desktop Access (NVDA) # This file is covered by the GNU General Public License. # See the file COPYING for more details. -# Copyright (C) 2013-2025 NV Access Limited, Babbage B.V., Leonard de Ruijter, Christian Comaschi +# Copyright (C) 2013-2025 NV Access Limited, Babbage B.V., Leonard de Ruijter, Christian Comaschi, Dot Incorporated, Bram Duvigneau """Support for braille display detection. This allows devices to be automatically detected and used when they become available, @@ -16,6 +16,7 @@ from functools import partial import itertools import threading +import time from concurrent.futures import ThreadPoolExecutor, Future from enum import StrEnum from typing import ( @@ -24,6 +25,10 @@ ) from collections import OrderedDict from collections.abc import Callable, Generator, Iterable, Iterator +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData +import hwIo +import hwIo.ble import hwPortUtils import NVDAState import braille @@ -52,6 +57,8 @@ class ProtocolType(StrEnum): """Serial devices (COM ports)""" CUSTOM = "custom" """Devices with a manufacturer specific protocol""" + BLE = "ble" + """Bluetooth Low Energy devices using BLE commands and notifications""" class CommunicationType(StrEnum): @@ -59,6 +66,8 @@ class CommunicationType(StrEnum): """Bluetooth devices""" USB = "usb" """USB devices""" + BLE = "ble" + """Bluetooth Low Energy devices""" class _DeviceTypeMeta(type): @@ -169,8 +178,10 @@ def matches(self, deviceMatch: DeviceMatch) -> bool: Handlers are called with these keyword arguments: @param usb: Whether the handler is expected to yield USB devices. @type usb: bool -@param bluetooth: Whether the handler is expected to yield USB devices. +@param bluetooth: Whether the handler is expected to yield Bluetooth devices. @type bluetooth: bool +@param ble: Whether the handler is expected to yield BLE devices. +@type ble: bool @param limitToDevices: Drivers to which detection should be limited. ``None`` if no driver filtering should occur. @type limitToDevices: Optional[List[str]] @@ -406,12 +417,16 @@ def __init__(self): self._stopEvent = threading.Event() self._detectUsb = True self._detectBluetooth = True + self._detectBle = True self._limitToDevices: list[str] | None = None + # Register for real-time BLE device discovery notifications + hwIo.ble.scanner.deviceDiscovered.register(self._onBleDeviceDiscovered) def _queueBgScan( self, usb: bool = False, bluetooth: bool = False, + ble: bool = False, limitToDevices: list[str] | None = None, preferredDevice: DriverAndDeviceMatch | None = None, ): @@ -420,6 +435,7 @@ def _queueBgScan( To explicitely cancel a scan in progress, use L{rescan}. :param usb: Whether USB devices should be detected for this and subsequent scans. :param bluetooth: Whether Bluetooth devices should be detected for this and subsequent scans. + :param ble: Whether BLE devices should be detected for this and subsequent scans. :param limitToDevices: Drivers to which detection should be limited for this and subsequent scans. ``None`` if default driver filtering according to config should occur. :param preferredDevice: An optional preferred device to use for detection before scanning. @@ -427,15 +443,17 @@ def _queueBgScan( """ if _isDebug(): log.debug( - "Queuing background scan: usb=%r, bluetooth=%r, limitToDevices=%r, preferredDevice=%r", + "Queuing background scan: usb=%r, bluetooth=%r, ble=%r, limitToDevices=%r, preferredDevice=%r", usb, bluetooth, + ble, limitToDevices, preferredDevice, ) self._detectUsb = usb self._detectBluetooth = bluetooth + self._detectBle = ble if limitToDevices is None and config.conf["braille"]["auto"]["excludedDisplays"]: limitToDevices = list(getBrailleDisplayDriversEnabledForDetection()) if limitToDevices and _isDebug(): @@ -455,6 +473,7 @@ def _queueBgScan( self._bgScan, usb, bluetooth, + ble, limitToDevices, preferredDevice, ) @@ -470,6 +489,11 @@ def _stopBgScan(self): if _isDebug(): log.debug("Cancelling queued future for next background scan") self._queuedFuture.cancel() + # Stop the BLE scanner if it's running + if hwIo.ble.scanner.isScanning: + if _isDebug(): + log.debug("Stopping BLE scanner") + hwIo.ble.scanner.stop() @staticmethod def _bgScanUsb( @@ -507,10 +531,23 @@ def _bgScanBluetooth( if btDevsCache is not btDevs: deviceInfoFetcher.btDevsCache = btDevsCache + @staticmethod + def _bgScanBle( + ble: bool = True, + limitToDevices: list[str] | None = None, + ): + """Handler for L{scanForDevices} that yields BLE devices. + See the L{scanForDevices} documentation for information about the parameters. + """ + if not ble: + return + yield from getDriversForBleDevices(limitToDevices) + def _bgScan( self, usb: bool, bluetooth: bool, + ble: bool, limitToDevices: list[str] | None, preferredDevice: DriverAndDeviceMatch | None, ): @@ -518,6 +555,7 @@ def _bgScan( this function should be run on a background thread. :param usb: Whether USB devices should be detected for this particular scan. :param bluetooth: Whether Bluetooth devices should be detected for this particular scan. + :param ble: Whether BLE devices should be detected for this particular scan. :param limitToDevices: Drivers to which detection should be limited for this scan. ``None`` if no driver filtering should occur. :param preferredDevice: An optional preferred device to use for detection before scanning. @@ -525,21 +563,35 @@ def _bgScan( """ if _isDebug(): log.debug( - "Starting background scan: usb=%r, bluetooth=%r, limitToDevices=%r, preferredDevice=%r", + "Starting background scan: usb=%r, bluetooth=%r, ble=%r, limitToDevices=%r, preferredDevice=%r", usb, bluetooth, + ble, limitToDevices, preferredDevice, ) # Clear the stop event before a scan is started. # Since a scan can take some time to complete, another thread can set the stop event to cancel it. self._stopEvent.clear() + + # Start BLE scanner if needed for this scan + if ble and not hwIo.ble.scanner.isScanning: + if _isDebug(): + log.debug("Starting BLE scanner for background scan") + hwIo.ble.scanner.start() + # Give the scanner some time to get initial results + time.sleep(0.2) if preferredDevice: if _isDebug(): log.debug("Trying preferred device first: %r", preferredDevice) if braille.handler.setDisplayByName(preferredDevice[0], detected=preferredDevice[1]): if _isDebug(): log.debug("Switched to preferred device: %r", preferredDevice[0]) + # Device connected - stop BLE scanner to save resources + if hwIo.ble.scanner.isScanning: + if _isDebug(): + log.debug("Stopping BLE scanner after device connected") + hwIo.ble.scanner.stop() return elif _isDebug(): log.debug("Failed to switch to preferred device, continuing scan: %r", preferredDevice) @@ -550,6 +602,7 @@ def _bgScan( iterator = scanForDevices.iter( usb=usb, bluetooth=bluetooth, + ble=ble, limitToDevices=limitToDevices, ) for driver, match in iterator: @@ -560,6 +613,11 @@ def _bgScan( if braille.handler.setDisplayByName(driver, detected=match): if _isDebug(): log.debug("Switched to driver %r, match %r", driver, match) + # Device connected - stop BLE scanner to save resources + if hwIo.ble.scanner.isScanning: + if _isDebug(): + log.debug("Stopping BLE scanner after device connected") + hwIo.ble.scanner.stop() return elif _isDebug(): log.debug("Failed to switch to driver %r, match %r. Continuing", driver, match) @@ -570,12 +628,14 @@ def rescan( self, usb: bool = True, bluetooth: bool = True, + ble: bool = True, limitToDevices: list[str] | None = None, preferredDevice: DriverAndDeviceMatch | None = None, ): """Stop a current scan when in progress, and start scanning from scratch. :param usb: Whether USB devices should be detected for this and subsequent scans. :param bluetooth: Whether Bluetooth devices should be detected for this and subsequent scans. + :param ble: Whether BLE devices should be detected for this and subsequent scans. :param limitToDevices: Drivers to which detection should be limited for this and subsequent scans. ``None`` if default driver filtering according to config should occur. :param preferredDevice: An optional preferred device to use for detection before scanning. @@ -587,13 +647,18 @@ def rescan( self._queueBgScan( usb=usb, bluetooth=bluetooth, + ble=ble, limitToDevices=limitToDevices, preferredDevice=preferredDevice, ) def handleWindowMessage(self, msg=None, wParam=None): if msg == winUser.WM_DEVICECHANGE and wParam == DBT_DEVNODES_CHANGED: - self.rescan(bluetooth=self._detectBluetooth, limitToDevices=self._limitToDevices) + self.rescan( + bluetooth=self._detectBluetooth, + ble=self._detectBle, + limitToDevices=self._limitToDevices, + ) def pollBluetoothDevices(self): """Poll bluetooth devices that might be in range. @@ -603,11 +668,91 @@ def pollBluetoothDevices(self): return if not deviceInfoFetcher.btDevsCache: return - self._queueBgScan(bluetooth=self._detectBluetooth, limitToDevices=self._limitToDevices) + self._queueBgScan( + bluetooth=self._detectBluetooth, + ble=self._detectBle, + limitToDevices=self._limitToDevices, + ) + + def _getBleDeviceMatch(self, device: BLEDevice) -> DriverAndDeviceMatch | None: + """Check if BLE device matches any registered driver. + + :param device: The BLE device to check + :return: Tuple of (driver_name, DeviceMatch) if match found, None otherwise + """ + # Create DeviceMatch for this device + match = DeviceMatch( + ProtocolType.BLE, + device.name or device.address, + device.address, + { + "name": device.name or "", + "address": device.address, + "provider": CommunicationType.BLE, + }, + ) + + # Check against registered drivers (respect limitToDevices filter) + driversToCheck = ( + ((driver, devs) for driver, devs in _driverDevices.items() if driver in self._limitToDevices) + if self._limitToDevices + else _driverDevices.items() + ) + + for driver, devs in driversToCheck: + matchFunc = devs.get(CommunicationType.BLE) + if callable(matchFunc) and matchFunc(match): + return (driver, match) + + return None + + def _onBleDeviceDiscovered( + self, + device: BLEDevice, + advertisementData: AdvertisementData, + isNew: bool, + ) -> None: + """Handler for real-time BLE device discoveries. + + Immediately attempts to connect when a new BLE device matching + a registered driver is discovered, providing much faster connection + than waiting for periodic app-switch polling. + + :param device: The BLE device that was discovered + :param advertisementData: Advertisement data from the device + :param isNew: True if this is the first time seeing this device + """ + # Only react to newly discovered devices + if not isNew: + return + + # Check if BLE detection is enabled + if not self._detectBle: + return + + # Find matching driver for this device + match = self._getBleDeviceMatch(device) + if not match: + return + + # Queue scan with this device as preferred + driver, deviceMatch = match + if _isDebug(): + log.debug( + f"New BLE device {device.name or device.address} matches driver {driver}, " + f"queueing connection attempt", + ) + + self._queueBgScan( + ble=True, + limitToDevices=self._limitToDevices, + preferredDevice=(driver, deviceMatch), + ) def terminate(self): appModuleHandler.post_appSwitch.unregister(self.pollBluetoothDevices) messageWindow.pre_handleWindowMessage.unregister(self.handleWindowMessage) + hwIo.ble.scanner.deviceDiscovered.unregister(self._onBleDeviceDiscovered) self._stopBgScan() # Clear the cache of bluetooth devices so new devices can be picked up with a new instance. deviceInfoFetcher.btDevsCache = None @@ -655,6 +800,83 @@ def getConnectedUsbDevicesForDriver(driver: str) -> Iterator[DeviceMatch]: yield match +def getDriversForBleDevices( + limitToDevices: list[str] | None = None, +) -> Iterator[DriverAndDeviceMatch]: + """Get any matching drivers for BLE devices. + :param limitToDevices: Drivers to which detection should be limited. + ``None`` if no driver filtering should occur. + :return: Generator of pairs of drivers and device information. + """ + if limitToDevices and _isDebug(): + log.debug("Limiting BLE device detection to drivers: %r", limitToDevices) + + # Check if any drivers support BLE before starting the scanner + driversToCheck = ( + ((driver, devs) for driver, devs in _driverDevices.items() if driver in limitToDevices) + if limitToDevices + else _driverDevices.items() + ) + hasBleDrivers = any(callable(devs.get(CommunicationType.BLE)) for _, devs in driversToCheck) + if not hasBleDrivers: + if _isDebug(): + log.debug("No drivers with BLE support registered, skipping BLE scan") + return + + # Use the module-level singleton scanner + scanner = hwIo.ble.scanner + if not scanner.isScanning: + if _isDebug(): + log.debugWarning("BLE scanner not running, results may be incomplete") + + scanResults = scanner.results() + for device in scanResults: + # Create DeviceMatch for each BLE device + # id: Display name for UI (device name if available, otherwise address) + # port: BLE address for unique identification and connection + # deviceInfo: All information as strings for backwards compatibility + match = DeviceMatch( + ProtocolType.BLE, + device.name or device.address, + device.address, + { + "name": device.name or "", + "address": device.address, + "provider": CommunicationType.BLE, + }, + ) + + for driver, devs in _driverDevices.items(): + if limitToDevices and driver not in limitToDevices: + if _isDebug(): + log.debug("Skipping excluded driver %r for BLE device match: %r", driver, match) + continue + + matchFunc = devs[CommunicationType.BLE] + if not callable(matchFunc): + if _isDebug(): + log.debugWarning( + "Skipping non-callable matchFunc %r for BLE device match: %r", + matchFunc, + match, + ) + continue + + if matchFunc(match): + if _isDebug(): + log.debug("Found BLE device match: %r for driver %r", match, driver) + yield (driver, match) + + +def getBleDevicesForDriver(driver: str) -> Iterator[DeviceMatch]: + """Get any BLE devices associated with a particular driver. + :param driver: The name of the driver. + :return: Generator of device information for matching BLE devices. + """ + for _driverName, match in getDriversForBleDevices(limitToDevices=[driver]): + yield match + + def getPossibleBluetoothDevicesForDriver(driver: str) -> Iterator[DeviceMatch]: """Get any possible Bluetooth devices associated with a particular driver. @param driver: The name of the driver. @@ -695,6 +917,7 @@ def driverHasPossibleDevices(driver: str) -> bool: itertools.chain( getConnectedUsbDevicesForDriver(driver), getPossibleBluetoothDevicesForDriver(driver), + getBleDevicesForDriver(driver), ), None, ), @@ -748,6 +971,7 @@ def initialize(): scanForDevices.register(_Detector._bgScanUsb) scanForDevices.register(_Detector._bgScanBluetooth) + scanForDevices.register(_Detector._bgScanBle) # Add devices for display in getSupportedBrailleDisplayDrivers(): @@ -760,8 +984,12 @@ def initialize(): def terminate(): global deviceInfoFetcher _driverDevices.clear() + scanForDevices.unregister(_Detector._bgScanBle) scanForDevices.unregister(_Detector._bgScanBluetooth) scanForDevices.unregister(_Detector._bgScanUsb) + # Stop BLE scanner if running + if hwIo.ble.scanner.isScanning: + hwIo.ble.scanner.stop() deviceInfoFetcher = None @@ -860,6 +1088,15 @@ def addBluetoothDevices(self, matchFunc: MatchFuncT): devs = self._getDriverDict() devs[CommunicationType.BLUETOOTH] = matchFunc + def addBleDevices(self, matchFunc: MatchFuncT): + """Associate BLE devices with the driver on this instance. + @param matchFunc: A function which determines whether a given BLE device matches. + It takes a L{DeviceMatch} as its only argument + and returns a C{bool} indicating whether it matched. + """ + devs = self._getDriverDict() + devs[CommunicationType.BLE] = matchFunc + def addDeviceScanner( self, scanFunc: Callable[..., Iterable[DriverAndDeviceMatch]], @@ -871,8 +1108,10 @@ def addDeviceScanner( The callable is called with these keyword arguments: @param usb: Whether the handler is expected to yield USB devices. @type usb: bool - @param bluetooth: Whether the handler is expected to yield USB devices. + @param bluetooth: Whether the handler is expected to yield Bluetooth devices. @type bluetooth: bool + @param ble: Whether the handler is expected to yield BLE devices. + @type ble: bool @param limitToDevices: Drivers to which detection should be limited. ``None`` if no driver filtering should occur. @type limitToDevices: Optional[List[str]] diff --git a/source/braille.py b/source/braille.py index 80d85f119db..f12b2f5f5cf 100644 --- a/source/braille.py +++ b/source/braille.py @@ -2788,10 +2788,11 @@ def setDisplayByName( elif ( "bluetoothName" in detected.deviceInfo or detected.deviceInfo.get("provider") == "bluetooth" + or detected.deviceInfo.get("provider") in ("bluetooth", "ble") ): # As USB devices have priority over Bluetooth, keep a detector running to switch to USB when connected. # Note that the detector should always be running in this situation, so we can trigger a rescan. - self._detector.rescan(bluetooth=False, limitToDevices=[newDisplayClass.name]) + self._detector.rescan(bluetooth=False, ble=False, limitToDevices=[newDisplayClass.name]) else: self._disableDetection() return True @@ -3325,6 +3326,7 @@ def _enableDetection( self, usb: bool = True, bluetooth: bool = True, + ble: bool = True, limitToDevices: Optional[List[str]] = None, preferredDevice: bdDetect.DriverAndDeviceMatch | None = None, ): @@ -3334,6 +3336,7 @@ def _enableDetection( In that case, it is triggered by L{setDisplayByname}. :param usb: Whether to scan for USB devices :param bluetooth: Whether to scan for Bluetooth devices. + :param ble: Whether to scan for Bluetooth Low Energy devices. :param limitToDevices: An optional list of driver names a scan should be limited to. This is used when a Bluetooth device is detected, in order to switch to USB when an USB device for the same driver is found. @@ -3346,6 +3349,7 @@ def _enableDetection( self._detector.rescan( usb=usb, bluetooth=bluetooth, + ble=ble, limitToDevices=limitToDevices, preferredDevice=preferredDevice, ) @@ -3355,6 +3359,7 @@ def _enableDetection( self._detector._queueBgScan( usb=usb, bluetooth=bluetooth, + ble=ble, limitToDevices=limitToDevices, preferredDevice=preferredDevice, ) @@ -3631,6 +3636,23 @@ def getPossiblePorts(cls) -> typing.OrderedDict[str, str]: ports.update((USB_PORT,)) if bluetooth: ports.update((BLUETOOTH_PORT,)) + # Add individual BLE devices + try: + bleDevices = list(bdDetect.getBleDevicesForDriver(cls.name)) + if bleDevices: + # Ensure "auto" option is present if we have BLE devices + if AUTOMATIC_PORT[0] not in ports: + ports.update((AUTOMATIC_PORT,)) + # Add each BLE device as a selectable port + for match in bleDevices: + # Format: "ble:DeviceName@Address" for unique identification + portKey = f"{match.type}:{match.id}@{match.port}" + # Translators: Name of a Bluetooth Low Energy braille display port + portName = _("Bluetooth: {deviceName}").format(deviceName=match.id) + ports[portKey] = portName + except Exception: + # If BLE scanning fails, continue without BLE devices + pass try: ports.update(cls.getManualPorts()) except NotImplementedError: @@ -3681,6 +3703,37 @@ def _getTryPorts( if isinstance(port, bdDetect.DeviceMatch): yield port elif isinstance(port, str): + # Check if this is a specific BLE device port (format: "ble:DeviceName@Address" or legacy "ble:DeviceName") + if port.startswith("ble:"): + portContent = port[4:] # Remove "ble:" prefix + + # Parse name@address format (new) or plain name (legacy) + if "@" in portContent: + deviceName, address = portContent.rsplit("@", 1) + else: + # Legacy format without address + deviceName = portContent + address = None + + # Try to find device in current scan results first (preferred) + for match in bdDetect.getBleDevicesForDriver(cls.name): + if match.id == deviceName or (address and match.port == address): + yield match + return + + # Fallback: If we have an address but device not in scan, create DeviceMatch from config + if address: + log.debug( + f"BLE device {deviceName} not in scan results, " + f"attempting connection by address {address}", + ) + yield bdDetect.DeviceMatch( + bdDetect.ProtocolType.BLE, + deviceName, # id + address, # port + {"name": deviceName, "address": address}, + ) + return isUsb = port in (AUTOMATIC_PORT[0], USB_PORT[0]) isBluetooth = port in (AUTOMATIC_PORT[0], BLUETOOTH_PORT[0]) if not isUsb and not isBluetooth: diff --git a/source/brailleDisplayDrivers/dotPad/defs.py b/source/brailleDisplayDrivers/dotPad/defs.py index c4215d0d0cd..064078dd219 100644 --- a/source/brailleDisplayDrivers/dotPad/defs.py +++ b/source/brailleDisplayDrivers/dotPad/defs.py @@ -106,3 +106,8 @@ class DP_PerkinsKey(enum.IntEnum): DP_CHECKSUM_BASE = 0xA5 + +# BLE service and characteristic UUIDs +BLE_SERVICE_UUID = "49535343-fe7d-4ae5-8fa9-9fafd205e455" +BLE_READ_CHARACTERISTIC_UUID = "49535343-1e4d-4bd9-ba61-23c647249616" +BLE_WRITE_CHARACTERISTIC_UUID = "49535343-8841-43f4-a8d4-ecbe34729bb3" diff --git a/source/brailleDisplayDrivers/dotPad/driver.py b/source/brailleDisplayDrivers/dotPad/driver.py index d7b0f5d51bb..30c9311954b 100644 --- a/source/brailleDisplayDrivers/dotPad/driver.py +++ b/source/brailleDisplayDrivers/dotPad/driver.py @@ -10,10 +10,10 @@ import enum from dataclasses import dataclass import serial +import hwIo.ble import inputCore import braille import winBindings.kernel32 -import hwIo import bdDetect from logHandler import log from autoSettingsUtils.driverSetting import DriverSetting @@ -29,6 +29,9 @@ DP_PerkinsKey, DP_BoardInformation, DP_CHECKSUM_BASE, + BLE_SERVICE_UUID, + BLE_READ_CHARACTERISTIC_UUID, + BLE_WRITE_CHARACTERISTIC_UUID, ) @@ -96,6 +99,27 @@ class BrailleDisplayDriver(braille.BrailleDisplayDriver): def getManualPorts(cls): return braille.getSerialPorts() + @classmethod + def check(cls) -> bool: + """DotPad is available if BLE is supported or manual ports exist. + + This allows DotPad to appear in the braille display list even when + no devices are currently detected, enabling users to manually select + BLE devices after the GUI triggers a scan. + """ + # Check if BLE is available on this system + if hwIo.ble.isAvailable(): + return True + + # Fallback: check if manual serial ports exist + try: + next(cls.getManualPorts()) + return True + except (StopIteration, NotImplementedError): + pass + + return False + @classmethod def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar): driverRegistrar.addUsbDevices( @@ -104,6 +128,16 @@ def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar): "VID_0403&PID_6010", # FTDI Dual RS232 as used in DotPad320A }, ) + driverRegistrar.addBleDevices(cls._isBleDotPad) + + @staticmethod + def _isBleDotPad(match: bdDetect.DeviceMatch) -> bool: + """Check if a BLE device is a DotPad display. + + :param match: DeviceMatch object containing BLE device information + :return: True if device ID (name or address) starts with "DotPad" + """ + return match.id.startswith("DotPad") supportedSettings = [ DriverSetting( @@ -115,6 +149,8 @@ def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar): ] _lastResponse: dict[int, CommandResponse] = {} + _receiveBuffer: bytearray = bytearray() + MAX_PACKET_SIZE = 512 # Safety limit to prevent memory issues with malformed streams def _sendCommand( self, @@ -157,20 +193,76 @@ def _sendCommand( return response.data return b"" - def _onReceive(self, header1: bytes): - if ord(header1) != DP_PacketSyncByte.SYNC1: - raise RuntimeError(f"Bad {header1=}") - header2 = self._dev.read(1) - if ord(header2) != DP_PacketSyncByte.SYNC2: - raise RuntimeError(f"bad {header2=}") - length = struct.unpack(">H", self._dev.read(2))[0] - packetBody = self._dev.read(length) + def _onReceive(self, data: bytes): + """Handle received data from either Serial (1 byte) or BLE (full packets). + + This method buffers incoming data and extracts complete packets as they arrive. + It works with both byte-at-a-time delivery (Serial) and packet-based delivery (BLE). + + :param data: Received data (1 byte for Serial, variable length for BLE) + """ + self._receiveBuffer.extend(data) + + # Safety check: prevent unbounded buffer growth from malformed streams + if len(self._receiveBuffer) > self.MAX_PACKET_SIZE: + log.warning( + f"Receive buffer exceeded {self.MAX_PACKET_SIZE} bytes, discarding data and resyncing", + ) + self._receiveBuffer.clear() + return + + # Extract and process all complete packets from the buffer + while len(self._receiveBuffer) >= 4: # Minimum: SYNC1 + SYNC2 + length (2 bytes) + # Check for first sync byte + if self._receiveBuffer[0] != DP_PacketSyncByte.SYNC1: + # Discard bad byte and try to resynchronize + log.debug(f"Bad first sync byte: 0x{self._receiveBuffer[0]:02x}, discarding") + self._receiveBuffer.pop(0) + continue + + # Check for second sync byte + if self._receiveBuffer[1] != DP_PacketSyncByte.SYNC2: + # Discard first byte and try again + log.debug(f"Bad second sync byte: 0x{self._receiveBuffer[1]:02x}, discarding") + self._receiveBuffer.pop(0) + continue + + # Extract packet length from header + packetLength = struct.unpack(">H", bytes(self._receiveBuffer[2:4]))[0] + totalLength = 4 + packetLength # header (4 bytes) + body + + # Check if we have the complete packet + if len(self._receiveBuffer) < totalLength: + # Not enough data yet, wait for more + break + + # Extract complete packet + packet = bytes(self._receiveBuffer[:totalLength]) + self._receiveBuffer = self._receiveBuffer[totalLength:] + + # Process the packet body (skip 4-byte header: SYNC1, SYNC2, length) + try: + self._processPacket(packet[4:]) + except Exception: + log.error("Error processing packet", exc_info=True) + + def _processPacket(self, packetBody: bytes): + """Process a complete packet body (after sync bytes and length header). + + :param packetBody: The packet body containing dest, command, sequence, data, and checksum + """ dest, cmdHigh, cmdLow, seqNum, *data, checksum = packetBody data = bytes(data) + + # Verify checksum if checksum != functools.reduce(operator.xor, packetBody[:-1], DP_CHECKSUM_BASE): raise RuntimeError("bad checksum") + + # Parse command cmd = DP_Command(struct.unpack(">H", bytes([cmdHigh, cmdLow]))[0]) - log.debug(f"Received responce {cmd.name}, {dest=}, {seqNum=}, data={bytes(data)}") + log.debug(f"Received response {cmd.name}, {dest=}, {seqNum=}, data={bytes(data)}") + + # Route to appropriate handler if cmd.name.startswith("RSP_"): self._recordCommandResponse(cmd, data, dest, seqNum) elif cmd.name.startswith("NTF_"): @@ -224,45 +316,55 @@ def _handleNotification(self, cmd: DP_Command, data: bytes, dest: int = 0, seqNu pass def __init__(self, port: str = "auto"): - if port == "auto": - # Try autodetection - for portType, portId, port, portInfo in self._getTryPorts(port): - if self._tryConnect(port): - break - else: - raise RuntimeError("No DotPad device found") + # _getTryPorts handles all port types: "auto", "ble:DeviceName@Address", COM ports, etc. + # It yields DeviceMatch objects with type, id, port, and deviceInfo fields + for match in self._getTryPorts(port): + if self._tryConnect(match.port, match.type, match.deviceInfo): + break else: - # Direct port connection - if not self._tryConnect(port): - raise RuntimeError(f"Could not connect to DotPad on port {port}") + raise RuntimeError("No DotPad device found") super().__init__() - def _tryConnect(self, port: str) -> bool: + def _tryConnect(self, port: str, portType: str, portInfo: dict) -> bool: """Try to connect to a DotPad device on the given port. - Attempts to open a serial connection to the specified port and verifies that - the connected device is a DotPad. Updates internal state with device model, - board information, and braille destination if successful. - - Side effects: - - Sets self._dev to the opened serial device on success. - - Sets self.model and self._boardInformation based on the connected device. - - Sets self._brailleDestination depending on device features. - - Closes self._dev and resets related attributes on failure. - - :param port: The port to connect to. + :param port: The port to connect to (COM port or BLE address). + :param portType: The protocol type (from bdDetect.ProtocolType). + :param portInfo: Additional port information dictionary. :return: True if connection successful, False otherwise. """ try: - self._dev = hwIo.Serial( - port=port, - baudrate=self.SERIAL_BAUD_RATE, - parity=self.SERIAL_PARITY, - timeout=self.timeout, - writeTimeout=self.timeout, - onReceive=self._onReceive, - ) + if portType == bdDetect.ProtocolType.BLE: + address = portInfo.get("address") or port + + # Try to get BLEDevice from scanner first (preferred - avoids implicit discovery) + device = hwIo.ble.findDeviceByAddress(address) + + if device is None: + # Fallback: Use address string directly + # Note: This triggers implicit discovery in Bleak, but ensures connection succeeds + log.debug(f"BLE device {address} not in scan results, using address for connection") + device = address # Pass string address to Ble class + + self._dev = hwIo.ble.Ble( + device=device, # Can be BLEDevice or str + writeServiceUuid=BLE_SERVICE_UUID, + writeCharacteristicUuid=BLE_WRITE_CHARACTERISTIC_UUID, + readServiceUuid=BLE_SERVICE_UUID, + readCharacteristicUuid=BLE_READ_CHARACTERISTIC_UUID, + onReceive=self._onReceive, + ) + else: + self._dev = hwIo.Serial( + port=port, + baudrate=self.SERIAL_BAUD_RATE, + parity=self.SERIAL_PARITY, + timeout=self.timeout, + writeTimeout=self.timeout, + onReceive=self._onReceive, + ) + # Verify this is actually a DotPad device self.model = self._requestDeviceName() self._boardInformation = self._requestBoardInformation() @@ -275,6 +377,7 @@ def _tryConnect(self, port: str) -> bool: return True except Exception: # Clean up on failure + log.debugWarning("Failed to connect", exc_info=True) try: self._dev.close() except Exception: diff --git a/source/core.py b/source/core.py index 40c20289e7b..d9d21685c23 100644 --- a/source/core.py +++ b/source/core.py @@ -757,6 +757,11 @@ def main(): log.debug("Initializing appModule Handler") appModuleHandler.initialize() + import asyncioEventLoop + + log.debug("Initializing asyncio event loop") + asyncioEventLoop.initialize() + log.debug("initializing background i/o") import hwIo @@ -1096,6 +1101,7 @@ def _doPostNvdaStartupAction(): _terminate(characterProcessing) _terminate(bdDetect) _terminate(hwIo) + _terminate(asyncioEventLoop, name="asyncio event loop") _terminate(addonHandler) _terminate(dataManager, name="addon dataManager") _terminate(garbageHandler) diff --git a/source/gui/settingsDialogs.py b/source/gui/settingsDialogs.py index b58ddb7da5c..89608aa58ee 100644 --- a/source/gui/settingsDialogs.py +++ b/source/gui/settingsDialogs.py @@ -4606,6 +4606,16 @@ def makeSettings(self, settingsSizer): self.updateStateDependentControls() def postInit(self): + # Start BLE scanner if not already running to populate device list + import hwIo.ble + + if not hwIo.ble.scanner.isScanning: + try: + hwIo.ble.scanner.start() # Starts in background mode + log.debug("Started BLE scanner for braille display selection") + except Exception: + log.error("Failed to start BLE scanner", exc_info=True) + # Finally, ensure that focus is on the list of displays. self.displayList.SetFocus() @@ -4669,10 +4679,20 @@ def updateStateDependentControls(self): # Display name not in config or port not valid selection = 0 self.portsList.SetSelection(selection) - # If no port selection is possible or only automatic selection is available, disable the port selection control - enable = len(self.possiblePorts) > 0 and not ( - len(self.possiblePorts) == 1 and self.possiblePorts[0][0] == "auto" - ) + enable = True + else: + # No ports available - show helpful message + # Translators: Message shown when no devices are available for a braille display + noDevicesMessage = _("(No devices found - switch to another display and back to refresh)") + self.portsList.SetItems([noDevicesMessage]) + self.portsList.SetSelection(0) + enable = False + + # Special case: If only "auto" port exists, disable manual selection + # (This means devices are detected but no manual selection is needed) + if len(self.possiblePorts) == 1 and self.possiblePorts[0][0] == "auto": + enable = False + self.portsList.Enable(enable) self.autoDetectList.Enable(isAutoDisplaySelected) @@ -4718,8 +4738,26 @@ def onOk(self, evt): # Hack: we need to update the display in our parent window before closing. # Otherwise, NVDA will report the old display even though the new display is reflected visually. self.Parent.updateCurrentDisplay() + self._stopBleScanner() super(BrailleDisplaySelectionDialog, self).onOk(evt) + def onCancel(self, evt): + """Stop BLE scanner when dialog is cancelled if background detection is not active.""" + self._stopBleScanner() + super().onCancel(evt) + + def _stopBleScanner(self): + """Stop BLE scanner if it's running and background detection is not active.""" + import hwIo.ble + + # Only stop if we're not in automatic detection mode + if hwIo.ble.scanner.isScanning and config.conf["braille"]["display"] != braille.AUTO_DISPLAY_NAME: + try: + hwIo.ble.scanner.stop() + log.debug("Stopped BLE scanner after braille display selection dialog closed") + except Exception: + log.error("Failed to stop BLE scanner", exc_info=True) + class BrailleSettingsSubPanel(AutoSettingsMixin, SettingsPanel): helpId = "BrailleSettings" diff --git a/source/hwIo/ble/__init__.py b/source/hwIo/ble/__init__.py new file mode 100644 index 00000000000..498eb532880 --- /dev/null +++ b/source/hwIo/ble/__init__.py @@ -0,0 +1,84 @@ +# A part of NonVisual Desktop Access (NVDA) +# Copyright (C) 2025 NV Access Limited, Dot Incorporated, Bram Duvigneau +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. + +"""Raw I/O for Bluetooth Low Energy (BLE) devices + +This module provides classes for scanning for BLE devices and communicating with them. +It uses the Bleak library for BLE communication. + +Only use this if you need access to a device that only implements BLE and not Bluetooth Classic. +Bluetooth Classic devices should be paired through Windows' Bluetooth settings and accessed through the related serial/HID device. +""" + +import time +import bleak +from bleak.backends.device import BLEDevice +from logHandler import log + +from ._scanner import Scanner # noqa: F401 +from ._io import Ble # noqa: F401 + +# Module-level singleton scanner for all BLE operations +scanner = Scanner() + + +def findDeviceByAddress(address: str, timeout: float = 5.0, pollInterval: float = 0.1) -> BLEDevice | None: + """Find a BLE device by its address. + + Checks already-discovered devices first, then scans if needed. + + :param address: The BLE device address (MAC address) + :param timeout: Maximum time to scan in seconds (default 5.0) + :param pollInterval: How often to check results in seconds (default 0.1) + :return: The BLE device object if found, None otherwise + """ + log.debug(f"Searching for BLE device with address {address}") + + # Check if device already discovered + for device in scanner.results(): + if device.address == address: + log.debug(f"Found BLE device {address} in existing results") + return device + + # Not found - start scanning if not already running + if not scanner.isScanning: + try: + scanner.start() # Start in background mode + except Exception: + log.error(f"Failed to start BLE scanner while searching for device {address}", exc_info=True) + return None + + try: + startTime = time.time() + while time.time() - startTime < timeout: + time.sleep(pollInterval) + + # Check if device appeared + for device in scanner.results(): + if device.address == address: + elapsed = time.time() - startTime + log.debug(f"Found BLE device {address} after {elapsed:.2f}s") + return device + + # Timeout - device not found + log.debug(f"BLE device {address} not found after {timeout}s timeout") + return None + except Exception: + log.error(f"Error while scanning for BLE device {address}", exc_info=True) + return None + + +def isAvailable() -> bool: + """Check if BLE (Bluetooth Low Energy) is available on this system. + + :return: True if BLE adapter is available and Bleak can scan, False otherwise + """ + try: + # Try to create a scanner - this will fail if no BLE adapter present + bleak.BleakScanner() + return True + except Exception: + log.debugWarning("BLE not available on this system", exc_info=True) + return False diff --git a/source/hwIo/ble/_io.py b/source/hwIo/ble/_io.py new file mode 100644 index 00000000000..9667f1f6f36 --- /dev/null +++ b/source/hwIo/ble/_io.py @@ -0,0 +1,227 @@ +# A part of NonVisual Desktop Access (NVDA) +# Copyright (C) 2025 NV Access Limited, Dot Incorporated, Bram Duvigneau +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +import time +from itertools import count, takewhile +from queue import Empty, Queue +from threading import Event, Thread +from typing import Callable, Iterator +import weakref + +from asyncioEventLoop import runCoroutineSync +from ..base import _isDebug, IoBase +from ..ioThread import IoThread +from logHandler import log + +import bleak +from bleak.backends.device import BLEDevice +from bleak.backends.characteristic import BleakGATTCharacteristic +from bleak.backends.winrt.client import WinRTClientArgs + +CONNECT_TIMEOUT_SECONDS: int = 2 +WINRT_CLIENT_ARGS = WinRTClientArgs(use_cached_services=True) + + +def queueReader( + queue: Queue[bytes], + onReceive: Callable[[bytes], None], + stopEvent: Event, + ioThread: IoThread, +) -> None: + while True: + try: + if stopEvent.is_set(): + log.debug("Reader thread got stop event") + break + try: + data: bytes = queue.get(timeout=0.2) + except Empty: + continue + + def apc(_x: int = 0): + return onReceive(data) + + ioThread.queueAsApc(apc) + queue.task_done() + except Exception: + log.error("Reader thread got exception", exc_info=True) + + +def sliced(data: bytes, n: int) -> Iterator[bytes]: + """Split data into chunks of size n (last chunk may be smaller).""" + return takewhile(len, (data[i : i + n] for i in count(0, n))) + + +class Ble(IoBase): + """I/O for Bluetooth Low Energy (BLE) devices + + This implementation expects a service/characteristic pair to send raw data to as a BLE command + and receive raw data through a BLE notify on a service/characteristic pair. + """ + + _client: bleak.BleakClient + "The Bleak client to use for BLE communication" + _writeServiceUuid: str + "The service UUID to use for writing data to the peripheral, this should accept BLE commands" + _writeCharacteristicUuid: str + "The characteristic UUID to use for writing data to the peripheral, this should accept BLE commands" + _readServiceUuid: str + "The service UUID to use for reading data from the peripheral, this should generate BLE notifications" + _readCharacteristicUuid: str + """The characteristic UUID to use for reading data from the peripheral, + this should generate BLE notifications""" + _onReceive: Callable[[bytes], None] | None + "The callback to call when data is received" + _queuedData: Queue[bytes | bytearray] + "A queue of received data, this is processed by the onReceive handler" + _readEvent: Event + "An event that is set when data is received" + _readerThread: Thread + "Thread that processes the queue of read data" + _stopReaderEvent: Event + "Event that is set to stop the reader thread" + _ioThreadRef: weakref.ReferenceType[IoThread] + "Reference to the I/O thread" + + def __init__( + self, + device: BLEDevice | str, + writeServiceUuid: str, + writeCharacteristicUuid: str, + readServiceUuid: str, + readCharacteristicUuid: str, + onReceive: Callable[[bytes], None], + ioThread: IoThread | None = None, + ) -> None: + if isinstance(device, str): + # String address provided - Bleak will perform implicit discovery + address = device + log.info(f"Connecting to BLE device at address {address}") + self._client = bleak.BleakClient(address, winrt=WINRT_CLIENT_ARGS) + else: + # BLEDevice object provided (preferred) + log.info(f"Connecting to {device.name} ({device.address})") + self._client = bleak.BleakClient(device, winrt=WINRT_CLIENT_ARGS) + self._writeServiceUuid = writeServiceUuid + self._writeCharacteristicUuid = writeCharacteristicUuid + self._readServiceUuid = readServiceUuid + self._readCharacteristicUuid = readCharacteristicUuid + self._onReceive = onReceive + if ioThread is None: + from .. import bgThread as ioThread + self._ioThreadRef = weakref.ref(ioThread) + self._queuedData = Queue() + self._readEvent = Event() + self._stopReaderEvent = Event() + self._readerThread = Thread( + target=queueReader, + args=(self._queuedData, self._onReceive, self._stopReaderEvent, ioThread), + daemon=True, + ) + self._readerThread.start() + runCoroutineSync(self._initAndConnect(), timeout=CONNECT_TIMEOUT_SECONDS) + self.waitForConnection(CONNECT_TIMEOUT_SECONDS) + + async def _initAndConnect(self) -> None: + await self._client.connect() + # Listen for notifications + await self._client.start_notify(self._readCharacteristicUuid, self._notifyReceive) + + def waitForRead(self, timeout: int | float) -> bool: + """Wait for data to be received from the peripheral.""" + self._readEvent.clear() + return self._readEvent.wait(timeout) + + def write(self, data: bytes): + """Write data to the connected BLE peripheral. + + Data is automatically split into MTU-sized chunks if needed. + + :param data: The data to write to the peripheral. + :raises RuntimeError: If not connected or service/characteristic not found. + """ + if not self._client.is_connected: + raise RuntimeError("Not connected to peripheral") + service = self._client.services.get_service(self._writeServiceUuid) + if not service: + raise RuntimeError(f"Service {self._writeServiceUuid} not found") + characteristic = service.get_characteristic(self._writeCharacteristicUuid) + if not characteristic: + raise RuntimeError(f"Characteristic {self._writeCharacteristicUuid} not found") + if _isDebug(): + log.debug(f"Write: {data!r}") + + # Split the data into chunks that fit within the MTU + for s in sliced(data, characteristic.max_write_without_response_size): + runCoroutineSync( + self._client.write_gatt_char(characteristic, s, response=False), + ) + + def close(self) -> None: + """Disconnect the BLE peripheral and release resources.""" + if _isDebug(): + log.debug("Closing BLE connection") + if self._client.is_connected: + runCoroutineSync(self._client.disconnect()) + self._queuedData.join() + self._stopReaderEvent.set() + self._readerThread.join() + + self._onReceive = None + + def __del__(self): + """Ensure the BLE connection is closed before object destruction.""" + try: + self.close() + except AttributeError: + if _isDebug(): + log.debugWarning("Couldn't delete object gracefully", exc_info=True) + + def isConnected(self) -> bool: + """Check if the BLE peripheral is currently connected.""" + return self._client.is_connected + + def waitForConnection(self, maxWait: int | float): + """Wait for connection and service discovery. + + :param maxWait: Maximum time to wait in seconds. + :raises RuntimeError: If connection not established within maxWait. + """ + numTries = 0 + sleepTime = 0.1 + + while (sleepTime * numTries) < maxWait: + if _isDebug(): + services = [ + ( + s.uuid, + s.description, + ) + for s in self._client.services.services.values() + ] + log.debug( + f"Waiting for connection, {numTries} tries, " + f"is connected {self.isConnected()}, services {services}", + ) + if self._client.is_connected and len(self._client.services.services) > 0: + return + time.sleep(sleepTime) + numTries += 1 + raise RuntimeError("Connection timed out") + + def _notifyReceive(self, _char: BleakGATTCharacteristic, data: bytearray): + if _isDebug(): + log.debug(f"Read: {data!r}") + self._readEvent.set() + self._queuedData.put(data) + + def read(self, num_bytes: int = 1) -> bytes: + """Not implemented for BLE. + + BLE communication uses a push model with notifications rather than polling reads. + Data is received asynchronously via the onReceive callback provided during initialization. + + :raises NotImplementedError: Always, as BLE doesn't support synchronous reads + """ + raise NotImplementedError("BLE uses notification-based communication, not polling reads") diff --git a/source/hwIo/ble/_scanner.py b/source/hwIo/ble/_scanner.py new file mode 100644 index 00000000000..5afa808ab5d --- /dev/null +++ b/source/hwIo/ble/_scanner.py @@ -0,0 +1,83 @@ +import time +from threading import Event +from typing import Callable + +from asyncioEventLoop import runCoroutine +import extensionPoints +from logHandler import log + +import bleak +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData + + +class Scanner: + """Scan for BLE devices + + This is a small synchronous wrapper around Bleak's Scanner. + It allows starting and stopping scans, retrieving results, and checking if scanning is active. + """ + + _scanner: bleak.BleakScanner + _discoveredDevices: dict[str, BLEDevice] + _isScanning: Event + + def __init__(self): + self._discoveredDevices = {} + self._scanner = bleak.BleakScanner(self._onDeviceAdvertised) + self._isScanning = Event() + #: Action called when a BLE device is discovered or re-advertises. + #: Handlers receive: device (BLEDevice), advertisementData (AdvertisementData), isNew (bool) + self.deviceDiscovered = extensionPoints.Action() + + def _onDeviceAdvertised(self, device: BLEDevice, adv: AdvertisementData) -> None: + # Check if this is a new device before updating the dict + isNew = device.address not in self._discoveredDevices + + # Store all devices, even those without a local_name + # Devices without names can still be found by address in findDeviceByAddress() + self._discoveredDevices[device.address] = device + + # Notify extension point handlers + self.deviceDiscovered.notify(device=device, advertisementData=adv, isNew=isNew) + + if isNew: + log.debug(f"Discovered BLE device: {device.name or device.address}") + + def start(self, duration: float = 0): + """Start scanning for BLE devices. + + :param duration: If 0 (default), scan continues in background until stop() is called. + If > 0, scan for specified duration in seconds then stop automatically. + """ + log.debug("Scanning for devices") + # Clear device cache only on first start to allow multiple callers to share results + if not self._isScanning.is_set(): + self._discoveredDevices.clear() + self._isScanning.set() + runCoroutine(self._scanner.start()) + if duration > 0: + time.sleep(duration) + runCoroutine(self._scanner.stop()) + self._isScanning.clear() + + def stop(self): + """Stop scanning""" + runCoroutine(self._scanner.stop()) + self._isScanning.clear() + + def results(self, filterFunc: Callable[[BLEDevice], bool] | None = None) -> list[BLEDevice]: + """Get the discovered BLE devices. + + :param filterFunc: Optional filter function to select specific devices. + :return: List of BLE devices found during the scan, optionally filtered. + """ + results = list(self._discoveredDevices.values()) + if filterFunc: + results = [device for device in results if filterFunc(device)] + return results + + @property + def isScanning(self) -> bool: + """Check if scanning is currently active""" + return self._isScanning.is_set() diff --git a/tests/unit/brailleDisplayDrivers/test_dotPad.py b/tests/unit/brailleDisplayDrivers/test_dotPad.py new file mode 100644 index 00000000000..e592f00bcee --- /dev/null +++ b/tests/unit/brailleDisplayDrivers/test_dotPad.py @@ -0,0 +1,211 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2025 NV Access Limited, Dot Incorporated, Bram Duvigneau + +"""Unit tests for the dotPad braille display driver.""" + +import unittest +from unittest.mock import MagicMock +import struct +import functools +import operator + + +class TestDotPadBufferedReceive(unittest.TestCase): + """Tests for the buffered receive logic in the DotPad driver.""" + + def setUp(self): + """Set up test fixtures.""" + # Import after patching to avoid hardware dependencies + from brailleDisplayDrivers.dotPad.driver import BrailleDisplayDriver + from brailleDisplayDrivers.dotPad.defs import ( + DP_Command, + DP_PacketSyncByte, + DP_CHECKSUM_BASE, + ) + + self.BrailleDisplayDriver = BrailleDisplayDriver + self.DP_Command = DP_Command + self.DP_PacketSyncByte = DP_PacketSyncByte + self.DP_CHECKSUM_BASE = DP_CHECKSUM_BASE + + # Create a minimal driver instance for testing receive logic + self.driver = MagicMock(spec=BrailleDisplayDriver) + self.driver._receiveBuffer = bytearray() + self.driver.MAX_PACKET_SIZE = 512 + self.driver._lastResponse = {} + + # Track processed packets + self.processedPackets = [] + + def mockProcessPacket(packetBody): + self.processedPackets.append(bytes(packetBody)) + + self.driver._processPacket = mockProcessPacket + + # Bind the actual _onReceive method + self.driver._onReceive = BrailleDisplayDriver._onReceive.__get__(self.driver, type(self.driver)) + + def _createPacket(self, dest=0, cmd=0x0101, seqNum=0, data=b""): + """Helper to create a valid DotPad packet. + + :param dest: Destination address + :param cmd: Command code + :param seqNum: Sequence number + :param data: Packet data payload + :return: Complete packet as bytes + """ + # Build packet body + packetBody = bytearray([dest]) + packetBody.extend(struct.pack(">H", cmd)) + packetBody.append(seqNum) + packetBody.extend(data) + + # Calculate checksum + checksum = functools.reduce(operator.xor, packetBody, self.DP_CHECKSUM_BASE) + packetBody.append(checksum) + + # Add header + packet = bytearray( + [ + self.DP_PacketSyncByte.SYNC1, + self.DP_PacketSyncByte.SYNC2, + ], + ) + packet.extend(struct.pack(">H", len(packetBody))) + packet.extend(packetBody) + + return bytes(packet) + + def test_completePacketAtOnce(self): + """Test receiving a complete packet in a single call (BLE behavior).""" + packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"test") + + # Receive entire packet at once + self.driver._onReceive(packet) + + # Verify packet was processed + self.assertEqual(len(self.processedPackets), 1) + # Verify buffer is empty after processing + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_byteAtATime(self): + """Test receiving a packet one byte at a time (Serial behavior).""" + packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"AB") + + # Send packet byte by byte + for byte in packet: + self.driver._onReceive(bytes([byte])) + + # Verify packet was processed after all bytes arrived + self.assertEqual(len(self.processedPackets), 1) + # Verify buffer is empty + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_partialPacket(self): + """Test receiving a packet in multiple chunks.""" + packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"test data") + + # Split packet in the middle + chunk1 = packet[: len(packet) // 2] + chunk2 = packet[len(packet) // 2 :] + + # Send first chunk + self.driver._onReceive(chunk1) + # No packet processed yet + self.assertEqual(len(self.processedPackets), 0) + # Buffer should contain first chunk + self.assertGreater(len(self.driver._receiveBuffer), 0) + + # Send second chunk + self.driver._onReceive(chunk2) + # Now packet should be processed + self.assertEqual(len(self.processedPackets), 1) + # Buffer should be empty + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_multiplePacketsAtOnce(self): + """Test receiving multiple complete packets in a single call.""" + packet1 = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"A") + packet2 = self._createPacket(dest=0, cmd=0x0102, seqNum=2, data=b"B") + packet3 = self._createPacket(dest=0, cmd=0x0103, seqNum=3, data=b"C") + + # Concatenate all packets + allPackets = packet1 + packet2 + packet3 + + # Receive all at once + self.driver._onReceive(allPackets) + + # Verify all three packets were processed + self.assertEqual(len(self.processedPackets), 3) + # Buffer should be empty + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_badSyncByte_resynchronize(self): + """Test that bad sync bytes are discarded and driver resynchronizes.""" + # Bad data followed by valid packet + badData = b"\x00\x11\x22" + goodPacket = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"OK") + + # Receive bad data + good packet + self.driver._onReceive(badData + goodPacket) + + # Only the good packet should be processed + self.assertEqual(len(self.processedPackets), 1) + # Buffer should be empty + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_bufferOverflow_cleared(self): + """Test that buffer is cleared when it exceeds MAX_PACKET_SIZE.""" + # Create garbage data exceeding limit + garbageData = b"\xff" * (self.driver.MAX_PACKET_SIZE + 10) + + # Send garbage + self.driver._onReceive(garbageData) + + # No packets should be processed + self.assertEqual(len(self.processedPackets), 0) + # Buffer should be cleared (safety mechanism) + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_incompletePacketInBuffer(self): + """Test that incomplete packet stays in buffer.""" + packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"test") + + # Send only first 6 bytes (header is 4 bytes, need more for complete packet) + partialData = packet[:6] + self.driver._onReceive(partialData) + + # No packets processed yet + self.assertEqual(len(self.processedPackets), 0) + # Buffer should contain the partial data + self.assertEqual(len(self.driver._receiveBuffer), 6) + self.assertEqual(bytes(self.driver._receiveBuffer), partialData) + + def test_emptyData(self): + """Test receiving empty data doesn't cause errors.""" + self.driver._onReceive(b"") + + # Nothing processed + self.assertEqual(len(self.processedPackets), 0) + # Buffer remains empty + self.assertEqual(len(self.driver._receiveBuffer), 0) + + def test_partialHeaderOnly(self): + """Test receiving only partial header (less than 4 bytes).""" + # Send only 3 bytes (need 4 for complete header) + self.driver._onReceive( + bytes( + [ + self.DP_PacketSyncByte.SYNC1, + self.DP_PacketSyncByte.SYNC2, + 0x00, + ], + ), + ) + + # No packets processed + self.assertEqual(len(self.processedPackets), 0) + # Buffer contains the 3 bytes + self.assertEqual(len(self.driver._receiveBuffer), 3) diff --git a/tests/unit/test_asyncioEventLoop.py b/tests/unit/test_asyncioEventLoop.py new file mode 100644 index 00000000000..f3bc09f71c1 --- /dev/null +++ b/tests/unit/test_asyncioEventLoop.py @@ -0,0 +1,95 @@ +# A part of NonVisual Desktop Access (NVDA) +# Copyright (C) 2025 NV Access Limited, Bram Duvigneau, Dot Incorporated +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. + +"""Unit tests for asyncioEventLoop module.""" + +import asyncio +import unittest + +import asyncioEventLoop + + +class TestRunCoroutineSync(unittest.TestCase): + """Tests for runCoroutineSync function.""" + + @classmethod + def setUpClass(cls): + """Initialize the asyncio event loop before tests.""" + asyncioEventLoop.initialize() + + @classmethod + def tearDownClass(cls): + """Terminate the asyncio event loop after tests.""" + asyncioEventLoop.terminate() + + def test_returnsResult(self): + """Test that runCoroutineSync returns the coroutine's result.""" + + async def simpleCoroutine(): + return 42 + + result = asyncioEventLoop.runCoroutineSync(simpleCoroutine()) + self.assertEqual(result, 42) + + def test_returnsComplexResult(self): + """Test that runCoroutineSync returns complex objects.""" + + async def complexCoroutine(): + await asyncio.sleep(0.01) + return {"key": "value", "number": 123} + + result = asyncioEventLoop.runCoroutineSync(complexCoroutine()) + self.assertEqual(result, {"key": "value", "number": 123}) + + def test_raisesException(self): + """Test that runCoroutineSync raises exceptions from the coroutine.""" + + async def failingCoroutine(): + await asyncio.sleep(0.01) + raise ValueError("Test error message") + + with self.assertRaises(ValueError) as cm: + asyncioEventLoop.runCoroutineSync(failingCoroutine()) + self.assertEqual(str(cm.exception), "Test error message") + + def test_timeoutRaisesTimeoutError(self): + """Test that runCoroutineSync raises TimeoutError when timeout is exceeded.""" + + async def slowCoroutine(): + await asyncio.sleep(10) + return "Should not reach here" + + with self.assertRaises(TimeoutError) as cm: + asyncioEventLoop.runCoroutineSync(slowCoroutine(), timeout=0.1) + self.assertIn("timed out", str(cm.exception).lower()) + + def test_noTimeoutWaitsIndefinitely(self): + """Test that runCoroutineSync waits indefinitely when no timeout is specified.""" + + async def delayedCoroutine(): + await asyncio.sleep(0.1) + return "completed" + + # This should complete successfully even though it takes some time + result = asyncioEventLoop.runCoroutineSync(delayedCoroutine()) + self.assertEqual(result, "completed") + + def test_raisesRuntimeErrorWhenEventLoopNotRunning(self): + """Test that runCoroutineSync raises RuntimeError when event loop is not running.""" + # Save original thread reference + originalThread = asyncioEventLoop.asyncioThread + + # Temporarily set to None to simulate not running + asyncioEventLoop.asyncioThread = None + + async def anyCoroutine(): + return "test" + + with self.assertRaises(RuntimeError) as cm: + asyncioEventLoop.runCoroutineSync(anyCoroutine()) + self.assertIn("not running", str(cm.exception).lower()) + + # Restore original thread + asyncioEventLoop.asyncioThread = originalThread diff --git a/tests/unit/test_bdDetect.py b/tests/unit/test_bdDetect.py index 478f7b6a15e..6c7d9d7f31c 100644 --- a/tests/unit/test_bdDetect.py +++ b/tests/unit/test_bdDetect.py @@ -16,7 +16,7 @@ class TestBdDetectExtensionPoints(unittest.TestCase): """A test for the extension points on the bdDetect module.""" def test_scanForDevices(self): - kwargs = dict(usb=False, bluetooth=False, limitToDevices=["noBraille"]) + kwargs = dict(usb=False, bluetooth=False, ble=False, limitToDevices=["noBraille"]) with chainTester( self, bdDetect.scanForDevices, @@ -99,3 +99,55 @@ def matchFunc(match: bdDetect.DeviceMatch) -> bool: registrar.addBluetoothDevices(matchFunc) self.assertEqual(registrar._getDriverDict().get(bdDetect.CommunicationType.BLUETOOTH), matchFunc) + + def test_addBleDevices(self): + """Test adding a BLE match function.""" + from brailleDisplayDrivers import dotPad + + registrar = bdDetect.DriverRegistrar(dotPad.BrailleDisplayDriver.name) + + def matchFunc(match: bdDetect.DeviceMatch) -> bool: + return match.id.startswith("DotPad") + + registrar.addBleDevices(matchFunc) + + # Verify the match function was stored + stored_match_func = registrar._getDriverDict().get(bdDetect.CommunicationType.BLE) + self.assertEqual(stored_match_func, matchFunc) + + # Verify it's callable + self.assertTrue(callable(stored_match_func)) + + def test_bleDeviceMatching(self): + """Test that BLE device matching works correctly.""" + from brailleDisplayDrivers import dotPad + + registrar = bdDetect.DriverRegistrar(dotPad.BrailleDisplayDriver.name) + + # Register the dotPad BLE match function + registrar.addBleDevices(dotPad.BrailleDisplayDriver._isBleDotPad) + + # Create a matching DeviceMatch (DotPad device) + matching_device = bdDetect.DeviceMatch( + type=bdDetect.ProtocolType.BLE, + id="DotPad320", + port="AA:BB:CC:DD:EE:FF", + deviceInfo={"name": "DotPad320", "address": "AA:BB:CC:DD:EE:FF"}, + ) + + # Create a non-matching DeviceMatch + non_matching_device = bdDetect.DeviceMatch( + type=bdDetect.ProtocolType.BLE, + id="SomeOtherDevice", + port="11:22:33:44:55:66", + deviceInfo={"name": "SomeOtherDevice", "address": "11:22:33:44:55:66"}, + ) + + # Get the match function + match_func = registrar._getDriverDict().get(bdDetect.CommunicationType.BLE) + + # Test matching device + self.assertTrue(match_func(matching_device)) + + # Test non-matching device + self.assertFalse(match_func(non_matching_device)) diff --git a/tests/unit/test_hwIo_ble.py b/tests/unit/test_hwIo_ble.py new file mode 100644 index 00000000000..4ebcfaed026 --- /dev/null +++ b/tests/unit/test_hwIo_ble.py @@ -0,0 +1,474 @@ +# A part of NonVisual Desktop Access (NVDA) +# This file is covered by the GNU General Public License. +# See the file COPYING for more details. +# Copyright (C) 2025 NV Access Limited, Dot Incorporated, Bram Duvigneau + +"""Unit tests for the hwIo.ble module.""" + +import unittest +from unittest.mock import AsyncMock, MagicMock, patch +from bleak.backends.device import BLEDevice +from bleak.backends.scanner import AdvertisementData + + +class TestScanner(unittest.TestCase): + """Tests for hwIo.ble.Scanner""" + + def setUp(self): + """Set up patches and create Scanner instance.""" + # Patch bleak.BleakScanner before importing + self.bleakScannerPatcher = patch("hwIo.ble._scanner.bleak.BleakScanner") + self.mockBleakScannerClass = self.bleakScannerPatcher.start() + + # Patch runCoroutine to avoid async event loop issues in tests + self.runCoroutinePatcher = patch("hwIo.ble._scanner.runCoroutine") + self.mockRunCoroutine = self.runCoroutinePatcher.start() + # Make runCoroutine return a completed future + mockFuture = MagicMock() + mockFuture.result.return_value = None + mockFuture.exception.return_value = None + self.mockRunCoroutine.return_value = mockFuture + + # Create mock scanner instance + self.mockScannerInstance = MagicMock() + self.mockScannerInstance.start = AsyncMock() + self.mockScannerInstance.stop = AsyncMock() + self.mockBleakScannerClass.return_value = self.mockScannerInstance + + # Import Scanner after patching + from hwIo.ble._scanner import Scanner + + self.Scanner = Scanner + + def tearDown(self): + """Clean up patches.""" + self.runCoroutinePatcher.stop() + self.bleakScannerPatcher.stop() + + def test_startScanning(self): + """Test that starting scan calls Bleak scanner and sets isScanning flag.""" + scanner = self.Scanner() + + # Initially not scanning + self.assertFalse(scanner.isScanning) + + # Start scanning in background mode + scanner.start(duration=0) + + # Verify Bleak scanner was started + self.mockScannerInstance.start.assert_called_once() + + # Verify isScanning flag is set + self.assertTrue(scanner.isScanning) + + def test_stopScanning(self): + """Test that stopping scan calls Bleak scanner and clears isScanning flag.""" + scanner = self.Scanner() + scanner.start(duration=0) + + # Verify scanning + self.assertTrue(scanner.isScanning) + + # Stop scanning + scanner.stop() + + # Verify Bleak scanner was stopped + self.mockScannerInstance.stop.assert_called_once() + + # Verify isScanning flag is cleared + self.assertFalse(scanner.isScanning) + + def test_deviceDiscoveredExtensionPoint(self): + """Test that deviceDiscovered extension point fires when device is advertised.""" + scanner = self.Scanner() + + # Track handler calls + handlerCalls = [] + + def testHandler(device, advertisementData, isNew): + handlerCalls.append( + { + "device": device, + "advertisementData": advertisementData, + "isNew": isNew, + }, + ) + + # Register handler - keep reference to prevent weak ref cleanup + scanner.deviceDiscovered.register(testHandler) + + # Create fake device and advertisement data + fakeDevice = MagicMock(spec=BLEDevice) + fakeDevice.address = "AA:BB:CC:DD:EE:FF" + fakeDevice.name = "TestDevice" + fakeAdvData = MagicMock(spec=AdvertisementData) + + # Trigger device discovery + scanner._onDeviceAdvertised(fakeDevice, fakeAdvData) + + # Verify handler was called once + self.assertEqual(len(handlerCalls), 1) + + # Verify handler received correct arguments + self.assertEqual(handlerCalls[0]["device"], fakeDevice) + self.assertEqual(handlerCalls[0]["advertisementData"], fakeAdvData) + self.assertTrue(handlerCalls[0]["isNew"]) + + # Trigger same device again - should not be new + scanner._onDeviceAdvertised(fakeDevice, fakeAdvData) + + # Verify handler was called again + self.assertEqual(len(handlerCalls), 2) + self.assertFalse(handlerCalls[1]["isNew"]) + + def test_deviceTracking(self): + """Test that devices are tracked in internal dict and returned by results().""" + scanner = self.Scanner() + + # Create fake device + fakeDevice = MagicMock(spec=BLEDevice) + fakeDevice.address = "AA:BB:CC:DD:EE:FF" + fakeDevice.name = "TestDevice" + fakeAdvData = MagicMock(spec=AdvertisementData) + + # Initially no devices + self.assertEqual(len(scanner.results()), 0) + + # Trigger device discovery + scanner._onDeviceAdvertised(fakeDevice, fakeAdvData) + + # Verify device is tracked + self.assertIn(fakeDevice.address, scanner._discoveredDevices) + self.assertEqual(scanner._discoveredDevices[fakeDevice.address], fakeDevice) + + # Verify device is in results + results = scanner.results() + self.assertEqual(len(results), 1) + self.assertEqual(results[0], fakeDevice) + + def test_resultsFiltering(self): + """Test that results() filter function works correctly.""" + scanner = self.Scanner() + + # Create multiple fake devices + device1 = MagicMock(spec=BLEDevice) + device1.address = "AA:BB:CC:DD:EE:01" + device1.name = "TestDevice1" + + device2 = MagicMock(spec=BLEDevice) + device2.address = "AA:BB:CC:DD:EE:02" + device2.name = "OtherDevice" + + device3 = MagicMock(spec=BLEDevice) + device3.address = "AA:BB:CC:DD:EE:03" + device3.name = "TestDevice2" + + fakeAdvData = MagicMock(spec=AdvertisementData) + + # Add devices + scanner._onDeviceAdvertised(device1, fakeAdvData) + scanner._onDeviceAdvertised(device2, fakeAdvData) + scanner._onDeviceAdvertised(device3, fakeAdvData) + + # Get all results + allResults = scanner.results() + self.assertEqual(len(allResults), 3) + + # Filter for devices starting with "Test" + filteredResults = scanner.results(filterFunc=lambda d: d.name.startswith("Test")) + self.assertEqual(len(filteredResults), 2) + self.assertIn(device1, filteredResults) + self.assertIn(device3, filteredResults) + self.assertNotIn(device2, filteredResults) + + +class TestBle(unittest.TestCase): + """Tests for hwIo.ble.Ble""" + + def setUp(self): + """Set up patches for Ble testing.""" + # Patch BleakClient + self.bleakClientPatcher = patch("hwIo.ble._io.bleak.BleakClient") + self.mockBleakClientClass = self.bleakClientPatcher.start() + + # Create mock client instance + self.mockClient = MagicMock() + self.mockClient.connect = AsyncMock() + self.mockClient.disconnect = AsyncMock() + self.mockClient.start_notify = AsyncMock() + self.mockClient.write_gatt_char = AsyncMock() + self.mockClient.is_connected = True + self.mockBleakClientClass.return_value = self.mockClient + + # Mock services - needs to have at least one service for waitForConnection + self.mockService = MagicMock() + self.mockCharacteristic = MagicMock() + self.mockCharacteristic.max_write_without_response_size = 20 + self.mockService.get_characteristic.return_value = self.mockCharacteristic + + # Create mock services object + self.mockServices = MagicMock() + self.mockServices.get_service.return_value = self.mockService + # Create a mock services collection + mockServicesDict = MagicMock() + mockServicesDict.__len__ = lambda self: 1 # Non-empty to pass waitForConnection + mockServicesDict.values.return_value = [self.mockService] + self.mockServices.services = mockServicesDict + self.mockClient.services = self.mockServices + + # Patch runCoroutineSync (just returns None, as it's a synchronous wrapper) + self.runCoroutineSyncPatcher = patch("hwIo.ble._io.runCoroutineSync") + self.mockRunCoroutineSync = self.runCoroutineSyncPatcher.start() + self.mockRunCoroutineSync.return_value = None + + # Import Ble after patching + from hwIo.ble._io import Ble + + self.Ble = Ble + + def tearDown(self): + """Clean up patches.""" + self.runCoroutineSyncPatcher.stop() + self.bleakClientPatcher.stop() + + def test_connectionSuccess(self): + """Test that Ble connects successfully and starts notifications.""" + # Create mock device and IoThread + mockDevice = MagicMock(spec=BLEDevice) + mockDevice.address = "AA:BB:CC:DD:EE:FF" + mockDevice.name = "TestDevice" + + mockIoThread = MagicMock() + + # Track onReceive calls + receivedData = [] + + def onReceive(data): + receivedData.append(data) + + # Create Ble instance + ble = self.Ble( + device=mockDevice, + writeServiceUuid="service-uuid", + writeCharacteristicUuid="write-char-uuid", + readServiceUuid="service-uuid", + readCharacteristicUuid="read-char-uuid", + onReceive=onReceive, + ioThread=mockIoThread, + ) + + # Verify BleakClient was created with device + self.mockBleakClientClass.assert_called_once() + callArgs = self.mockBleakClientClass.call_args + self.assertEqual(callArgs[0][0], mockDevice) + + # Verify runCoroutineSync was called for initialization + self.mockRunCoroutineSync.assert_called() + + # Verify the connection is established + self.assertTrue(ble.isConnected()) + + def test_writeData(self): + """Test writing data to BLE characteristic.""" + mockDevice = MagicMock(spec=BLEDevice) + mockDevice.address = "AA:BB:CC:DD:EE:FF" + mockDevice.name = "TestDevice" + mockIoThread = MagicMock() + + ble = self.Ble( + device=mockDevice, + writeServiceUuid="service-uuid", + writeCharacteristicUuid="write-char-uuid", + readServiceUuid="service-uuid", + readCharacteristicUuid="read-char-uuid", + onReceive=lambda data: None, + ioThread=mockIoThread, + ) + + # Write data + testData = b"test data" + ble.write(testData) + + # Verify service and characteristic were retrieved + self.mockServices.get_service.assert_called_with("service-uuid") + self.mockService.get_characteristic.assert_called_with("write-char-uuid") + + # Verify write was called (through runCoroutineSync) + self.assertGreater(self.mockRunCoroutineSync.call_count, 1) # At least init + write + + def test_writeDataChunking(self): + """Test that large data is split into MTU-sized chunks.""" + mockDevice = MagicMock(spec=BLEDevice) + mockDevice.address = "AA:BB:CC:DD:EE:FF" + mockDevice.name = "TestDevice" + mockIoThread = MagicMock() + + # Set MTU size to 10 + self.mockCharacteristic.max_write_without_response_size = 10 + + ble = self.Ble( + device=mockDevice, + writeServiceUuid="service-uuid", + writeCharacteristicUuid="write-char-uuid", + readServiceUuid="service-uuid", + readCharacteristicUuid="read-char-uuid", + onReceive=lambda data: None, + ioThread=mockIoThread, + ) + + # Reset call count after initialization + initialCallCount = self.mockRunCoroutineSync.call_count + + # Write 25 bytes (should split into 3 chunks: 10, 10, 5) + testData = b"A" * 25 + ble.write(testData) + + # Verify runCoroutineSync was called 3 times for writes + writeCalls = self.mockRunCoroutineSync.call_count - initialCallCount + self.assertEqual(writeCalls, 3) + + def test_receiveNotification(self): + """Test receiving data via BLE notification.""" + mockDevice = MagicMock(spec=BLEDevice) + mockDevice.address = "AA:BB:CC:DD:EE:FF" + mockDevice.name = "TestDevice" + mockIoThread = MagicMock() + + # Track received data + receivedData = [] + + def onReceive(data): + receivedData.append(data) + + ble = self.Ble( + device=mockDevice, + writeServiceUuid="service-uuid", + writeCharacteristicUuid="write-char-uuid", + readServiceUuid="service-uuid", + readCharacteristicUuid="read-char-uuid", + onReceive=onReceive, + ioThread=mockIoThread, + ) + + # Verify initialization occurred + self.mockRunCoroutineSync.assert_called() + + # Simulate notification by calling _notifyReceive directly + testData = bytearray(b"notification data") + mockChar = MagicMock() + ble._notifyReceive(mockChar, testData) + + # Verify data was queued + self.assertFalse(ble._queuedData.empty()) + + def test_closeCleanup(self): + """Test that close() properly disconnects and cleans up resources.""" + mockDevice = MagicMock(spec=BLEDevice) + mockDevice.address = "AA:BB:CC:DD:EE:FF" + mockDevice.name = "TestDevice" + mockIoThread = MagicMock() + + ble = self.Ble( + device=mockDevice, + writeServiceUuid="service-uuid", + writeCharacteristicUuid="write-char-uuid", + readServiceUuid="service-uuid", + readCharacteristicUuid="read-char-uuid", + onReceive=lambda data: None, + ioThread=mockIoThread, + ) + + # Close the connection + ble.close() + + # Verify disconnect was called via runCoroutineSync (init + close) + self.assertGreater(self.mockRunCoroutineSync.call_count, 1) + + # Verify onReceive callback was cleared + self.assertIsNone(ble._onReceive) + + +class TestFindDeviceByAddress(unittest.TestCase): + """Tests for hwIo.ble.findDeviceByAddress""" + + def setUp(self): + """Set up patches for findDeviceByAddress testing.""" + # Patch the scanner module-level instance + self.scannerPatcher = patch("hwIo.ble.scanner") + self.mockScanner = self.scannerPatcher.start() + + # Import function after patching + from hwIo.ble import findDeviceByAddress + + self.findDeviceByAddress = findDeviceByAddress + + def tearDown(self): + """Clean up patches.""" + self.scannerPatcher.stop() + + def test_deviceAlreadyInResults(self): + """Test finding device that's already in scanner results.""" + # Create fake device + fakeDevice = MagicMock(spec=BLEDevice) + fakeDevice.address = "AA:BB:CC:DD:EE:FF" + fakeDevice.name = "TestDevice" + + # Mock scanner to return device immediately + self.mockScanner.results.return_value = [fakeDevice] + self.mockScanner.isScanning = False + + # Find device + result = self.findDeviceByAddress("AA:BB:CC:DD:EE:FF") + + # Verify device was found + self.assertEqual(result, fakeDevice) + + # Verify scanner was not started (device already in results) + self.mockScanner.start.assert_not_called() + + def test_deviceNotFound(self): + """Test that None is returned when device is not found after timeout.""" + # Mock scanner to return empty results + self.mockScanner.results.return_value = [] + self.mockScanner.isScanning = False + + # Find device with short timeout + result = self.findDeviceByAddress("AA:BB:CC:DD:EE:FF", timeout=0.1) + + # Verify None was returned + self.assertIsNone(result) + + # Verify scanner was started + self.mockScanner.start.assert_called_once() + + def test_deviceFoundDuringScan(self): + """Test finding device that appears during scanning.""" + # Create fake device + fakeDevice = MagicMock(spec=BLEDevice) + fakeDevice.address = "AA:BB:CC:DD:EE:FF" + fakeDevice.name = "TestDevice" + + # Simulate device appearing after a delay + callCount = 0 + + def mockResults(): + nonlocal callCount + callCount += 1 + if callCount <= 1: + # First call - no devices + return [] + else: + # Subsequent calls - device appears + return [fakeDevice] + + self.mockScanner.results.side_effect = mockResults + self.mockScanner.isScanning = False + + # Find device with reasonable timeout + result = self.findDeviceByAddress("AA:BB:CC:DD:EE:FF", timeout=0.5, pollInterval=0.05) + + # Verify device was found + self.assertEqual(result, fakeDevice) + + # Verify scanner was started + self.mockScanner.start.assert_called_once() diff --git a/user_docs/en/changes.md b/user_docs/en/changes.md index 8528678faff..c3832588253 100644 --- a/user_docs/en/changes.md +++ b/user_docs/en/changes.md @@ -31,6 +31,10 @@ This can be enabled using the "Report when lists support multiple selection" set * VirusTotal scan results are now available in the details for an add-on in the Add-on Store. An action has been added to view the full scan results on the VirusTotal website. (#18974) * In the Add-on Store, a new action has been added to see the latest changes for the current version of add-ons. (#14041, @josephsl, @nvdaes) +* Dot Pad displays can now be connected via Bluetooth Low Energy (BLE) in addition to USB. (#19122, @bramd) + * When automatic detection is enabled, Dot Pad devices will be discovered and connected automatically when in range. + * No Bluetooth pairing in Windows settings is required. + * Bluetooth Low Energy support requires Windows 10 version 1703 (Creators Update) or later. * In browse mode, the number of items in a list is now reported in braille. (#7455, @nvdaes) ### Changes diff --git a/user_docs/en/userGuide.md b/user_docs/en/userGuide.md index 33901054ce5..991967837e5 100644 --- a/user_docs/en/userGuide.md +++ b/user_docs/en/userGuide.md @@ -4342,6 +4342,7 @@ The following displays support this automatic detection functionality. * Nattiq nBraille displays * Seika Notetaker: MiniSeika (16, 24 cells), V6, and V6Pro (40 cells) * Tivomatic Caiku Albatross 46/80 displays +* Dot Pad displays (USB and Bluetooth) * NLS eReader Zoomax * Any Display that supports the Standard HID Braille protocol @@ -5587,11 +5588,28 @@ You can configure whether NVDA displays braille on the dedicated braille display Panning keys are supported, but due to limited buttons on the device, other commands and routing capabilities are currently not available. -The Dot Pad driver supports automatic detection of USB-connected devices. +#### Connecting to Dot Pad {#dotPadConnecting} + +The Dot Pad can be connected via USB or Bluetooth Low Energy (BLE). + +The Dot Pad driver supports automatic detection via both USB and Bluetooth. However, automatic detection is disabled by default due to the device using generic USB identifiers that could conflict with other devices. To enable automatic detection, go to NVDA's Braille settings and check "Dot Pad" in the automatic detection list. -When automatic detection is enabled and a compatible device is detected, NVDA will automatically connect to it. -You can also manually select a specific USB or Bluetooth virtual serial port if needed. + +When automatic detection is enabled: + +* USB connections are detected immediately when plugged in +* Bluetooth connections are discovered and connected automatically when the device is powered on and in range +* No Bluetooth pairing in Windows settings is required for Bluetooth Low Energy connections +* NVDA will automatically connect to the first compatible device found +* Bluetooth devices will appear in the connection list with the prefix "Bluetooth:" followed by the device name (e.g., "Bluetooth: DotPad A320") + +Note: Bluetooth Low Energy support requires Windows 10 version 1703 (Creators Update) or later. + +You can also manually select a specific connection: + +* A USB port (e.g., COM3) +* A Bluetooth device (e.g., "Bluetooth: DotPad A320") Please note that due to hardware limitations, the Dot Pad will not refresh all dots correctly while your hand is on the device. Make sure to lift your hand entirely off the device when navigating with NVDA, and only start reading again once it has fully updated. diff --git a/uv.lock b/uv.lock index ee81edb10c2..811f7ab7f4c 100644 --- a/uv.lock +++ b/uv.lock @@ -50,6 +50,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/b8/3fe70c75fe32afc4bb507f75563d39bc5642255d1d94f1f23604725780bf/babel-2.17.0-py3-none-any.whl", hash = "sha256:4d0b53093fdfb4b21c92b5213dba5a1b23885afa8383709427046b21c366e5f2", size = 10182537, upload-time = "2025-02-01T15:17:37.39Z" }, ] +[[package]] +name = "bleak" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-devices-bluetooth", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-devices-bluetooth-advertisement", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-devices-bluetooth-genericattributeprofile", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-devices-enumeration", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-foundation", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-foundation-collections", marker = "sys_platform == 'win32'" }, + { name = "winrt-windows-storage-streams", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c1/84/a7d5056e148b02b7a3398fe122eea5b1585f0439d95958f019867a2ec4b6/bleak-1.1.0.tar.gz", hash = "sha256:0ace59c8cf5a2d8aa66a2493419b59ac6a119c2f72f6e57be8dbdd3f2c0270e0", size = 116100, upload-time = "2025-08-10T22:50:23.129Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fe/7a/fbfffec2f7839fa779a11a3d1d46edcd6cf790c135ff3a2eaa3777906fea/bleak-1.1.0-py3-none-any.whl", hash = "sha256:174e7836e1ab0879860cd24ddd0ac604bd192bcc1acb978892e27359f3f18304", size = 136236, upload-time = "2025-08-10T22:50:21.74Z" }, +] + [[package]] name = "boolean-py" version = "5.0" @@ -528,6 +547,7 @@ wheels = [ name = "nvda" source = { editable = "." } dependencies = [ + { name = "bleak", marker = "sys_platform == 'win32'" }, { name = "comtypes", marker = "sys_platform == 'win32'" }, { name = "configobj", marker = "sys_platform == 'win32'" }, { name = "crowdin-api-client", marker = "sys_platform == 'win32'" }, @@ -582,6 +602,7 @@ unit-tests = [ [package.metadata] requires-dist = [ + { name = "bleak", specifier = "==1.1.0" }, { name = "comtypes", specifier = "==1.4.11" }, { name = "configobj", git = "https://github.com/DiffSK/configobj?rev=8be54629ee7c26acb5c865b74c76284e80f3aa31" }, { name = "crowdin-api-client", specifier = "==1.21.0" }, @@ -1279,6 +1300,118 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083, upload-time = "2024-12-07T15:28:26.465Z" }, ] +[[package]] +name = "winrt-runtime" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/16/dd/acdd527c1d890c8f852cc2af644aa6c160974e66631289420aa871b05e65/winrt_runtime-3.2.1.tar.gz", hash = "sha256:c8dca19e12b234ae6c3dadf1a4d0761b51e708457492c13beb666556958801ea", size = 21721, upload-time = "2025-06-06T14:40:27.593Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/79/d4/1a555d8bdcb8b920f8e896232c82901cc0cda6d3e4f92842199ae7dff70a/winrt_runtime-3.2.1-cp313-cp313-win32.whl", hash = "sha256:44e2733bc709b76c554aee6c7fe079443b8306b2e661e82eecfebe8b9d71e4d1", size = 210022, upload-time = "2025-06-06T06:44:11.767Z" }, + { url = "https://files.pythonhosted.org/packages/aa/24/2b6e536ca7745d788dfd17a2ec376fa03a8c7116dc638bb39b035635484f/winrt_runtime-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:3c1fdcaeedeb2920dc3b9039db64089a6093cad2be56a3e64acc938849245a6d", size = 241349, upload-time = "2025-06-06T06:44:12.661Z" }, + { url = "https://files.pythonhosted.org/packages/d4/7f/6d72973279e2929b2a71ed94198ad4a5d63ee2936e91a11860bf7b431410/winrt_runtime-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:28f3dab083412625ff4d2b46e81246932e6bebddf67bea7f05e01712f54e6159", size = 415126, upload-time = "2025-06-06T06:44:13.702Z" }, +] + +[[package]] +name = "winrt-windows-devices-bluetooth" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/a0/1c8a0c469abba7112265c6cb52f0090d08a67c103639aee71fc690e614b8/winrt_windows_devices_bluetooth-3.2.1.tar.gz", hash = "sha256:db496d2d92742006d5a052468fc355bf7bb49e795341d695c374746113d74505", size = 23732, upload-time = "2025-06-06T14:41:20.489Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/cc/797516c5c0f8d7f5b680862e0ed7c1087c58aec0bcf57a417fa90f7eb983/winrt_windows_devices_bluetooth-3.2.1-cp313-cp313-win32.whl", hash = "sha256:12b0a16fb36ce0b42243ca81f22a6b53fbb344ed7ea07a6eeec294604f0505e4", size = 105757, upload-time = "2025-06-06T07:00:13.269Z" }, + { url = "https://files.pythonhosted.org/packages/05/6d/f60588846a065e69a2ec5e67c5f85eb45cb7edef2ee8974cd52fa8504de6/winrt_windows_devices_bluetooth-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:6703dfbe444ee22426738830fb305c96a728ea9ccce905acfdf811d81045fdb3", size = 113363, upload-time = "2025-06-06T07:00:14.135Z" }, + { url = "https://files.pythonhosted.org/packages/2c/13/2d3c4762018b26a9f66879676ea15d7551cdbf339c8e8e0c56ea05ea31ef/winrt_windows_devices_bluetooth-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:2cf8a0bfc9103e32dc7237af15f84be06c791f37711984abdca761f6318bbdb2", size = 104722, upload-time = "2025-06-06T07:00:14.999Z" }, +] + +[[package]] +name = "winrt-windows-devices-bluetooth-advertisement" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/fc/7ffe66ca4109b9e994b27c00f3d2d506e6e549e268791f755287ad9106d8/winrt_windows_devices_bluetooth_advertisement-3.2.1.tar.gz", hash = "sha256:0223852a7b7fa5c8dea3c6a93473bd783df4439b1ed938d9871f947933e574cc", size = 16906, upload-time = "2025-06-06T14:41:21.448Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/34/01/8fc8e57605ea08dd0723c035ed0c2d0435dace2bc80a66d33aecfea49a56/winrt_windows_devices_bluetooth_advertisement-3.2.1-cp313-cp313-win32.whl", hash = "sha256:4122348ea525a914e85615647a0b54ae8b2f42f92cdbf89c5a12eea53ef6ed90", size = 90037, upload-time = "2025-06-06T07:00:25.818Z" }, + { url = "https://files.pythonhosted.org/packages/86/83/503cf815d84c5ba8c8bc61480f32e55579ebf76630163405f7df39aa297b/winrt_windows_devices_bluetooth_advertisement-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:b66410c04b8dae634a7e4b615c3b7f8adda9c7d4d6902bcad5b253da1a684943", size = 95822, upload-time = "2025-06-06T07:00:26.666Z" }, + { url = "https://files.pythonhosted.org/packages/32/13/052be8b6642e6f509b30c194312b37bfee8b6b60ac3bd5ca2968c3ea5b80/winrt_windows_devices_bluetooth_advertisement-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:07af19b1d252ddb9dd3eb2965118bc2b7cabff4dda6e499341b765e5038ca61d", size = 89326, upload-time = "2025-06-06T07:00:27.477Z" }, +] + +[[package]] +name = "winrt-windows-devices-bluetooth-genericattributeprofile" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/44/21/aeeddc0eccdfbd25e543360b5cc093233e2eab3cdfb53ad3cabae1b5d04d/winrt_windows_devices_bluetooth_genericattributeprofile-3.2.1.tar.gz", hash = "sha256:cdf6ddc375e9150d040aca67f5a17c41ceaf13a63f3668f96608bc1d045dde71", size = 38896, upload-time = "2025-06-06T14:41:22.687Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/93/30b45ce473d1a604908221a1fa035fe8d5e4bb9008e820ae671a21dab94c/winrt_windows_devices_bluetooth_genericattributeprofile-3.2.1-cp313-cp313-win32.whl", hash = "sha256:b1879c8dcf46bd2110b9ad4b0b185f4e2a5f95170d014539203a5fee2b2115f0", size = 183342, upload-time = "2025-06-06T07:00:56.16Z" }, + { url = "https://files.pythonhosted.org/packages/5b/3b/eb9d99b82a36002d7885206d00ea34f4a23db69c16c94816434ded728fa3/winrt_windows_devices_bluetooth_genericattributeprofile-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:8d8d89f01e9b6931fb48217847caac3227a0aeb38a5b7782af71c2e7b262ec30", size = 187844, upload-time = "2025-06-06T07:00:57.134Z" }, + { url = "https://files.pythonhosted.org/packages/84/9b/ebbbe9be9a3e640dcfc5f166eb48f2f9d8ce42553f83aa9f4c5dcd9eb5f5/winrt_windows_devices_bluetooth_genericattributeprofile-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:4e71207bb89798016b1795bb15daf78afe45529f2939b3b9e78894cfe650b383", size = 184540, upload-time = "2025-06-06T07:00:58.081Z" }, +] + +[[package]] +name = "winrt-windows-devices-enumeration" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9e/dd/75835bfbd063dffa152109727dedbd80f6e92ea284855f7855d48cdf31c9/winrt_windows_devices_enumeration-3.2.1.tar.gz", hash = "sha256:df316899e39bfc0ffc1f3cb0f5ee54d04e1d167fbbcc1484d2d5121449a935cf", size = 23538, upload-time = "2025-06-06T14:41:26.787Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ff/7d/ebd712ab8ccd599c593796fbcd606abe22b5a8e20db134aa87987d67ac0e/winrt_windows_devices_enumeration-3.2.1-cp313-cp313-win32.whl", hash = "sha256:14a71cdcc84f624c209cbb846ed6bd9767a9a9437b2bf26b48ac9a91599da6e9", size = 130276, upload-time = "2025-06-06T07:02:05.178Z" }, + { url = "https://files.pythonhosted.org/packages/70/de/f30daaaa0e6f4edb6bd7ddb3e058bd453c9ad90c032a4545c4d4639338aa/winrt_windows_devices_enumeration-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:6ca40d334734829e178ad46375275c4f7b5d6d2d4fc2e8879690452cbfb36015", size = 141536, upload-time = "2025-06-06T07:02:06.067Z" }, + { url = "https://files.pythonhosted.org/packages/75/4b/9a6aafdc74a085c550641a325be463bf4b811f6f605766c9cd4f4b5c19d2/winrt_windows_devices_enumeration-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:2d14d187f43e4409c7814b7d1693c03a270e77489b710d92fcbbaeca5de260d4", size = 135362, upload-time = "2025-06-06T07:02:06.997Z" }, +] + +[[package]] +name = "winrt-windows-foundation" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0c/55/098ce7ea0679efcc1298b269c48768f010b6c68f90c588f654ec874c8a74/winrt_windows_foundation-3.2.1.tar.gz", hash = "sha256:ad2f1fcaa6c34672df45527d7c533731fdf65b67c4638c2b4aca949f6eec0656", size = 30485, upload-time = "2025-06-06T14:41:53.344Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7b/71/5e87131e4aecc8546c76b9e190bfe4e1292d028bda3f9dd03b005d19c76c/winrt_windows_foundation-3.2.1-cp313-cp313-win32.whl", hash = "sha256:3998dc58ed50ecbdbabace1cdef3a12920b725e32a5806d648ad3f4829d5ba46", size = 112184, upload-time = "2025-06-06T07:11:04.459Z" }, + { url = "https://files.pythonhosted.org/packages/ba/7f/8d5108461351d4f6017f550af8874e90c14007f9122fa2eab9f9e0e9b4e1/winrt_windows_foundation-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:6e98617c1e46665c7a56ce3f5d28e252798416d1ebfee3201267a644a4e3c479", size = 118672, upload-time = "2025-06-06T07:11:05.55Z" }, + { url = "https://files.pythonhosted.org/packages/44/f5/2edf70922a3d03500dab17121b90d368979bd30016f6dbca0d043f0c71f1/winrt_windows_foundation-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:2a8c1204db5c352f6a563130a5a41d25b887aff7897bb677d4ff0b660315aad4", size = 109673, upload-time = "2025-06-06T07:11:06.398Z" }, +] + +[[package]] +name = "winrt-windows-foundation-collections" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ef/62/d21e3f1eeb8d47077887bbf0c3882c49277a84d8f98f7c12bda64d498a07/winrt_windows_foundation_collections-3.2.1.tar.gz", hash = "sha256:0eff1ad0d8d763ad17e9e7bbd0c26a62b27215016393c05b09b046d6503ae6d5", size = 16043, upload-time = "2025-06-06T14:41:53.983Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a6/cd/99ef050d80bea2922fa1ded93e5c250732634095d8bd3595dd808083e5ca/winrt_windows_foundation_collections-3.2.1-cp313-cp313-win32.whl", hash = "sha256:4267a711b63476d36d39227883aeb3fb19ac92b88a9fc9973e66fbce1fd4aed9", size = 60063, upload-time = "2025-06-06T07:11:18.65Z" }, + { url = "https://files.pythonhosted.org/packages/94/93/4f75fd6a4c96f1e9bee198c5dc9a9b57e87a9c38117e1b5e423401886353/winrt_windows_foundation_collections-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:5e12a6e75036ee90484c33e204b85fb6785fcc9e7c8066ad65097301f48cdd10", size = 69057, upload-time = "2025-06-06T07:11:19.446Z" }, + { url = "https://files.pythonhosted.org/packages/40/76/de47ccc390017ec5575e7e7fd9f659ee3747c52049cdb2969b1b538ce947/winrt_windows_foundation_collections-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:34b556255562f1b36d07fba933c2bcd9f0db167fa96727a6cbb4717b152ad7a2", size = 58792, upload-time = "2025-06-06T07:11:20.24Z" }, +] + +[[package]] +name = "winrt-windows-storage-streams" +version = "3.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "winrt-runtime", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/00/50/f4488b07281566e3850fcae1021f0285c9653992f60a915e15567047db63/winrt_windows_storage_streams-3.2.1.tar.gz", hash = "sha256:476f522722751eb0b571bc7802d85a82a3cae8b1cce66061e6e758f525e7b80f", size = 34335, upload-time = "2025-06-06T14:43:23.905Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d9/d2/24d9f59bdc05e741261d5bec3bcea9a848d57714126a263df840e2b515a8/winrt_windows_storage_streams-3.2.1-cp313-cp313-win32.whl", hash = "sha256:401bb44371720dc43bd1e78662615a2124372e7d5d9d65dfa8f77877bbcb8163", size = 127774, upload-time = "2025-06-06T14:02:04.752Z" }, + { url = "https://files.pythonhosted.org/packages/15/59/601724453b885265c7779d5f8025b043a68447cbc64ceb9149d674d5b724/winrt_windows_storage_streams-3.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:202c5875606398b8bfaa2a290831458bb55f2196a39c1d4e5fa88a03d65ef915", size = 131827, upload-time = "2025-06-06T14:02:05.601Z" }, + { url = "https://files.pythonhosted.org/packages/fb/c2/a419675a6087c9ea496968c9b7805ef234afa585b7483e2269608a12b044/winrt_windows_storage_streams-3.2.1-cp313-cp313-win_arm64.whl", hash = "sha256:ca3c5ec0aab60895006bf61053a1aca6418bc7f9a27a34791ba3443b789d230d", size = 128180, upload-time = "2025-06-06T14:02:06.759Z" }, +] + [[package]] name = "wrapt" version = "1.17.2"