File device.py
File List > pyspacemouse > device.py
Go to the documentation of this file
"""SpaceMouse device representation.
This module contains the SpaceMouseDevice class which represents
a connected SpaceMouse device and handles reading/processing input.
Supports context manager protocol for safe resource cleanup:
with pyspacemouse.open() as device:
while True:
state = device.read()
"""
from __future__ import annotations
import timeit
from typing import TYPE_CHECKING, Callable, List, Optional, Sequence
from easyhid import HIDException
from .callbacks import ButtonCallback, Config, DofCallback
from .types import AXIS_NAMES, ButtonState, DeviceInfo, SpaceMouseState
if TYPE_CHECKING:
from easyhid import Device as HIDDevice
# High-accuracy clock for timing
high_acc_clock = timeit.default_timer
def _to_int16(y1: int, y2: int) -> int:
"""Convert two 8-bit bytes to a signed 16-bit integer."""
x = y1 | (y2 << 8)
if x >= 32768:
x = -(65536 - x)
return x
class SpaceMouseDevice:
"""Represents a connected SpaceMouse device.
This class handles reading input from the device, processing HID data,
and invoking callbacks based on state changes.
Supports context manager protocol:
with pyspacemouse.open() as device:
state = device.read()
Attributes:
info: Static device information (read-only)
state: Current device state
connected: Whether the device is currently connected
"""
__slots__ = (
"_info",
"_device",
"_state",
"_last_axis_time",
"_callback",
"_dof_callback",
"_dof_callbacks",
"_button_callback",
"_button_callbacks",
"_nonblocking",
"_product_name",
"_vendor_name",
"_version_number",
"_serial_number",
)
def __init__(self, info: DeviceInfo, device: Optional[HIDDevice] = None) -> None:
"""Initialize the SpaceMouseDevice.
Args:
info: Device specification from loader
device: Optional HID device instance
"""
self._info = info
self._device = device
# Initialize state
self._state = SpaceMouseState(buttons=ButtonState([0] * len(info.button_specs)))
self._last_axis_time = {axis: 0.0 for axis in AXIS_NAMES}
# Callbacks (None by default)
self._callback: Optional[Callable[[SpaceMouseState], None]] = None
self._dof_callback: Optional[Callable[[SpaceMouseState], None]] = None
self._dof_callbacks: Optional[Sequence[DofCallback]] = None
self._button_callback: Optional[Callable[[SpaceMouseState, List[int]], None]] = None
self._button_callbacks: Optional[Sequence[ButtonCallback]] = None
self._nonblocking = True
# Connection details (populated on open)
self._product_name: str = ""
self._vendor_name: str = ""
self._version_number: str = ""
self._serial_number: str = ""
# -------------------------------------------------------------------------
# Context manager protocol
# -------------------------------------------------------------------------
def __enter__(self) -> SpaceMouseDevice:
"""Enter context manager - device is already open."""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit context manager - close the device."""
self.close()
# -------------------------------------------------------------------------
# Properties
# -------------------------------------------------------------------------
@property
def info(self) -> DeviceInfo:
"""Get the static device information."""
return self._info
@property
def name(self) -> str:
"""Get the device name."""
return self._info.name
@property
def connected(self) -> bool:
"""Check if the device is connected."""
return self._device is not None
@property
def state(self) -> SpaceMouseState:
"""Get the current device state (triggers a read)."""
return self.read()
@property
def product_name(self) -> str:
"""Get the product name from the connected device."""
return self._product_name
@property
def vendor_name(self) -> str:
"""Get the vendor name from the connected device."""
return self._vendor_name
@property
def version_number(self) -> str:
"""Get the version number from the connected device."""
return self._version_number
@property
def serial_number(self) -> str:
"""Get the serial number from the connected device."""
return self._serial_number
# -------------------------------------------------------------------------
# Connection management
# -------------------------------------------------------------------------
def describe_connection(self) -> str:
"""Return a human-readable description of the connection."""
if not self.connected:
return f"{self.name} [disconnected]"
return (
f"{self.name} connected to {self._vendor_name} {self._product_name} "
f"version: {self._version_number} [serial: {self._serial_number}]"
)
def open(self) -> None:
"""Open the connection to the device."""
if self._device is None:
raise RuntimeError("No HID device assigned to this SpaceMouseDevice")
try:
self._device.open()
except HIDException as e:
raise RuntimeError("Failed to open device") from e
# Copy product details
self._product_name = self._device.product_string or ""
self._vendor_name = self._device.manufacturer_string or ""
self._version_number = str(self._device.release_number or "")
# Convert serial number to hex
serial = self._device.serial_number or ""
self._serial_number = "".join(f"{ord(c):02X}" for c in serial)
def close(self) -> None:
"""Close the connection to the device."""
if self._device is not None:
self._device.close()
self._device = None
# -------------------------------------------------------------------------
# Reading and processing
# -------------------------------------------------------------------------
def read(self) -> SpaceMouseState:
"""Read and process data from the device.
Returns:
The current state after processing any available data.
"""
if not self.connected:
return self._state
data = self._device.read(self._info.bytes_to_read)
if data:
self._process(data)
return self._state
def _process(self, data: bytes) -> None:
"""Process incoming HID data and update state."""
dof_changed = False
button_changed = False
# Process axis data
for axis_name, spec in self._info.mappings.items():
if data[0] == spec.channel:
dof_changed = True
if spec.byte1 < len(data) and spec.byte2 < len(data):
raw_value = _to_int16(data[spec.byte1], data[spec.byte2])
scaled_value = spec.scale * raw_value / self._info.axis_scale
setattr(self._state, axis_name, scaled_value)
# Process button data
for btn_idx, spec in enumerate(self._info.button_specs):
if spec.channel is None:
continue
if data[0] == spec.channel:
button_changed = True
mask = 1 << spec.bit
self._state.buttons[btn_idx] = 1 if (data[spec.byte] & mask) else 0
# Update timestamp
self._state.t = high_acc_clock()
# Invoke callbacks
self._invoke_callbacks(dof_changed, button_changed)
def _invoke_callbacks(self, dof_changed: bool, button_changed: bool) -> None:
"""Invoke registered callbacks based on state changes."""
state = self._state
# General callback
if self._callback:
self._callback(state)
# DoF callback
if self._dof_callback and dof_changed:
self._dof_callback(state)
# Per-axis DoF callbacks
if self._dof_callbacks and dof_changed:
now = high_acc_clock()
for dof_cb in self._dof_callbacks:
axis_name = dof_cb.axis
if now >= self._last_axis_time[axis_name] + dof_cb.sleep:
axis_val = getattr(state, axis_name)
if dof_cb.callback_minus is not None:
if axis_val > dof_cb.filter:
dof_cb.callback(state, axis_val)
elif axis_val < -dof_cb.filter:
dof_cb.callback_minus(state, axis_val)
elif abs(axis_val) > dof_cb.filter:
dof_cb.callback(state, axis_val)
self._last_axis_time[axis_name] = now
# General button callback
if self._button_callback and button_changed:
self._button_callback(state, list(state.buttons))
# Per-button callbacks
if self._button_callbacks and button_changed:
for btn_cb in self._button_callbacks:
buttons = btn_cb.buttons
if isinstance(buttons, int):
buttons = [buttons]
if all(state.buttons[b] for b in buttons):
btn_cb.callback(state, list(state.buttons), btn_cb.buttons)
def set_led(self, state: bool) -> None:
"""Set the LED state.
Controls the LED indicator on the SpaceMouse device.
Args:
state: True to turn LED on, False to turn off
Note:
Not all devices have LEDs. If the device doesn't support LED
control (info.led_id is None), this method does nothing.
LED control failures are silently ignored.
"""
if not self.connected:
return
if self._info.led_id is None:
return # Device has no LED
# Send HID output report to control LED
# led_id format: [report_id, on_value]
report_id, on_value = self._info.led_id
led_value = on_value if state else 0x00
try:
self._device.write(bytearray([report_id, led_value]))
except Exception:
pass # Silently fail if LED control not supported
# -------------------------------------------------------------------------
# Configuration
# -------------------------------------------------------------------------
def set_config(self, config: Config) -> None:
"""Apply a configuration object to set callbacks."""
self._callback = config.callback
self._dof_callback = config.dof_callback
self._dof_callbacks = config.dof_callbacks
self._button_callback = config.button_callback
self._button_callbacks = config.button_callbacks
def configure(
self,
callback: Optional[Callable[[SpaceMouseState], None]] = None,
dof_callback: Optional[Callable[[SpaceMouseState], None]] = None,
dof_callbacks: Optional[Sequence[DofCallback]] = None,
button_callback: Optional[Callable[[SpaceMouseState, List[int]], None]] = None,
button_callbacks: Optional[Sequence[ButtonCallback]] = None,
) -> None:
"""Configure callbacks individually."""
self._callback = callback
self._dof_callback = dof_callback
self._dof_callbacks = dof_callbacks
self._button_callback = button_callback
self._button_callbacks = button_callbacks
def clear_callbacks(self) -> None:
"""Remove all registered callbacks."""
self._callback = None
self._dof_callback = None
self._dof_callbacks = None
self._button_callback = None
self._button_callbacks = None
def get_button_name(self, index: int) -> str:
"""Get the name of a button by its index."""
return self._info.get_button_name(index)