diff --git a/dlclivegui/cameras/backends/gentl_backend.py b/dlclivegui/cameras/backends/gentl_backend.py index eb28aea..b55abdf 100644 --- a/dlclivegui/cameras/backends/gentl_backend.py +++ b/dlclivegui/cameras/backends/gentl_backend.py @@ -1,9 +1,10 @@ """GenTL backend implemented using the Harvesters library.""" -# dlclivegui/cameras/backends/gentl_backend.py +# dlclivegui/cameras/backends/gentl_backend.py from __future__ import annotations import logging +import threading import time from pathlib import Path from typing import ClassVar @@ -31,112 +32,97 @@ @register_backend("gentl") class GenTLCameraBackend(CameraBackend): - """Capture frames from GenTL-compatible devices via Harvesters.""" + """Capture frames from GenTL-compatible devices via Harvesters. + + Notes + ----- + Multi-camera operation uses a shared Harvester per CTI set. Some GenTL + producers, including the Imaging Source USB3 Vision producer, can report no + devices if a second independent Harvester enumerates while another camera is + already open/streaming. Therefore open() acquires a shared Harvester and + never calls Harvester.update() during runtime open; initial enumeration is + handled by SharedHarvesterPool when the shared Harvester is created. + """ OPTIONS_KEY: ClassVar[str] = "gentl" - _LEGACY_DEFAULT_CTI_PATTERNS: tuple[str, ...] = ( # Windows-only, ignored on other platforms - r"C:\\Program Files\\The Imaging Source Europe GmbH\\IC4 GenTL Driver for USB3Vision Devices *\\bin\\*.cti", - r"C:\\Program Files\\The Imaging Source Europe GmbH\\TIS Grabber\\bin\\win64_x64\\*.cti", - r"C:\\Program Files\\The Imaging Source Europe GmbH\\TIS Camera SDK\\bin\\win64_x64\\*.cti", - r"C:\\Program Files (x86)\\The Imaging Source Europe GmbH\\TIS Grabber\\bin\\win64_x64\\*.cti", + _OPEN_LOCK: ClassVar[threading.RLock] = threading.RLock() + + _DEFAULT_CTI_PATTERNS: ClassVar[tuple[str, ...]] = ( + # Windows-only defaults; harmless/no-op on other platforms. + r"C:\Program Files\The Imaging Source Europe GmbH\IC4 GenTL Driver for USB3Vision Devices *\bin\*.cti", + r"C:\Program Files\The Imaging Source Europe GmbH\TIS Grabber\bin\win64_x64\*.cti", + r"C:\Program Files\The Imaging Source Europe GmbH\TIS Camera SDK\bin\win64_x64\*.cti", + r"C:\Program Files (x86)\The Imaging Source Europe GmbH\TIS Grabber\bin\win64_x64\*.cti", ) - # Source marker stored in properties["gentl"]["cti_files_source"] - # auto : persisted by auto-discovery (env vars, patterns, etc.). Cache, may be stale, re-discover if missing. - # user : explicitly set by user via properties.gentl.cti_file(s). Cache, strict raise if missing. + _COLOR_PIXEL_FORMATS: ClassVar[tuple[str, ...]] = ( + "BGR8", + "RGB8", + "BayerRG8", + "BayerGB8", + "BayerGR8", + "BayerBG8", + ) + _MONO_PIXEL_FORMATS: ClassVar[tuple[str, ...]] = ( + "Mono8", + "Mono10", + "Mono12", + "Mono16", + ) + + # Source marker stored in properties["gentl"]["cti_files_source"]. + # auto: persisted by auto-discovery; may be stale and can fall back. + # user: explicitly set by user; strict if stale/missing. _CTI_FILES_SOURCE_AUTO: ClassVar[str] = "auto" _CTI_FILES_SOURCE_USER: ClassVar[str] = "user" def __init__(self, settings): super().__init__(settings) - # --- Properties namespace handling (new UI stores backend options under properties["gentl"]) --- props = settings.properties if isinstance(settings.properties, dict) else {} ns = props.get(self.OPTIONS_KEY, {}) if not isinstance(ns, dict): ns = {} - # --- Fast probe mode (CameraProbeWorker sets this) --- - # When fast_start=True, open() should avoid starting acquisition if possible. self._fast_start: bool = bool(ns.get("fast_start", False)) - # --- Stable identity / serial selection --- - # New UI stores stable identity as ns["device_id"], with recommended formats: - # - "serial:" for true serials - # - "fp:" when serial is missing/ambiguous - # - # We keep legacy "serial_number"/"serial" behavior as fallback. raw_device_id = ns.get("device_id") or props.get("device_id") legacy_serial = ns.get("serial_number") or ns.get("serial") or props.get("serial_number") or props.get("serial") self._device_id: str | None = str(raw_device_id).strip() if raw_device_id else None + self._serial_number: str | None = self._serial_from_identity(self._device_id, legacy_serial) - # Decide what to use for actual device selection in open(): - # - If device_id is "serial:XXXX" -> use XXXX as serial_number - # - Otherwise, keep legacy serial if present; open() may still use index if serial is None - self._serial_number: str | None = None - if self._device_id: - did = self._device_id - if did.startswith("serial:"): - self._serial_number = did.split("serial:", 1)[1].strip() or None - elif did.startswith("fp:"): - # fingerprint: not directly usable as serial; rebind_settings should map fp -> index - self._serial_number = legacy_serial # keep legacy if any, otherwise None - else: - # If device_id is provided without prefix, treat it as a "serial-like" value for backward compatibility - self._serial_number = did - else: - self._serial_number = str(legacy_serial).strip() if legacy_serial else None - - # --- Pixel format / image transforms (legacy + backend options) --- - self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "Mono8") + self._pixel_format: str = ns.get("pixel_format") or props.get("pixel_format", "auto") + self._pixel_format = str(self._pixel_format).strip() self._rotate: int = int(ns.get("rotate", props.get("rotate", 0))) % 360 self._crop: tuple[int, int, int, int] | None = self._parse_crop(ns.get("crop", props.get("crop"))) - # --- Exposure / Gain: 0 means Auto (do not set) --- - exp_val = getattr(settings, "exposure", 0) - gain_val = getattr(settings, "gain", 0.0) - - self._exposure: float | None = ( - float(exp_val) if isinstance(exp_val, (int, float)) and float(exp_val) > 0 else None - ) + self._exposure: float | None = self._positive_float(getattr(settings, "exposure", 0)) if self._exposure is None: - v = ns.get("exposure", props.get("exposure")) - try: - self._exposure = float(v) if v is not None and float(v) > 0 else None - except Exception: - self._exposure = None + self._exposure = self._positive_float(ns.get("exposure", props.get("exposure"))) - self._gain: float | None = ( - float(gain_val) if isinstance(gain_val, (int, float)) and float(gain_val) > 0 else None - ) + self._gain: float | None = self._positive_float(getattr(settings, "gain", 0.0)) if self._gain is None: - v = ns.get("gain", props.get("gain")) - try: - self._gain = float(v) if v is not None and float(v) > 0 else None - except Exception: - self._gain = None + self._gain = self._positive_float(ns.get("gain", props.get("gain"))) - # --- Acquisition timeout --- self._timeout: float = float(ns.get("timeout", props.get("timeout", 2.0))) - - # --- Resolution request (None = device default / Auto) --- - # Uses settings.width/settings.height if set; falls back to legacy props["resolution"] if present. self._requested_resolution: tuple[int, int] | None = self._get_requested_resolution_or_none() - # --- Actuals for GUI --- self._actual_width: int | None = None self._actual_height: int | None = None self._actual_fps: float | None = None self._actual_gain: float | None = None self._actual_exposure: float | None = None - # --- Harvesters resources --- self._harvester = None self._acquirer = None + self._shared_entry = None self._device_label: str | None = None - self._cti_files_source_used: str | None = None + # ------------------------------------------------------------------ + # Public telemetry / capabilities + # ------------------------------------------------------------------ + @property def actual_resolution(self) -> tuple[int, int] | None: if self._actual_width and self._actual_height: @@ -170,12 +156,13 @@ def static_capabilities(cls) -> dict[str, SupportLevel]: "stable_identity": SupportLevel.SUPPORTED, } + # ------------------------------------------------------------------ + # Discovery + # ------------------------------------------------------------------ + @classmethod def get_device_count(cls) -> int: - """Get the number of GenTL devices detected by Harvester. - - Returns the number of devices found, or -1 if detection fails. - """ + """Return the number of GenTL devices, or -1 if detection fails.""" if Harvester is None: return -1 @@ -184,1159 +171,962 @@ def get_device_count(cls) -> int: harvester, _, _ = cls._build_harvester_for_discovery(strict_single=False) if harvester is None: return -1 - infos = harvester.device_info_list or [] - return len(infos) + return len(harvester.device_info_list or []) except Exception: return -1 finally: - if harvester is not None: - try: - harvester.reset() - except Exception: - pass - - @staticmethod - def _cti_preflight(path: str) -> tuple[bool, str | None]: - """ - Best-effort check right before calling Harvester.add_file(). - Still subject to race conditions (e.g. file deleted after this check), - but helps diagnose common issues like missing files or permission errors more gracefully and early. - Returns (ok, reason_if_not_ok). - """ - p = Path(str(path)) - try: - if not p.exists(): - return False, "missing at load time" - if not p.is_file(): - return False, "not a file at load time" - # Optional: try opening for read to detect permission/locking issues early - with p.open("rb"): - pass - return True, None - except PermissionError: - return False, "permission denied at load time" - except OSError as e: - return False, f"os error at load time: {e}" - - def _resolve_cti_files_for_settings(self) -> list[str]: - """ - Resolve CTI files to load. - - - User override (properties.gentl.cti_file/cti_files OR legacy properties.cti_file/cti_files): - * strict: must exist, otherwise raise - * source = "user" - - Auto-persisted cache (properties.gentl.cti_files_source == "auto"): - * try persisted ctis first - * if stale/missing, fall back to discovery - * source = "auto" - - Default: discovery (env + configured patterns/dirs) => source = "auto" - - NOTE : legacy properties.cti_file(s) always take strict precedence as user override if present, - even if source marker says "auto". - Never raise just because multiple CTIs exist. - Raise only when none are found (after allowed fallback). - """ - props = self.settings.properties if isinstance(self.settings.properties, dict) else {} - ns = props.get(self.OPTIONS_KEY, {}) - if not isinstance(ns, dict): - ns = {} + cls._safe_reset_harvester(harvester) - # Read source marker - source = ns.get("cti_files_source") - source = str(source).strip().lower() if source is not None else None + @classmethod + def discover_devices( + cls, + *, + max_devices: int = 10, + should_cancel: callable[[], bool] | None = None, + progress_cb: callable[[str], None] | None = None, + ): + """Rich discovery path for CameraFactory.detect_cameras().""" + if Harvester is None: + return [] - # Explicit CTIs (namespace first, then legacy top-level) - ns_cti_files = ns.get("cti_files") - ns_cti_file = ns.get("cti_file") - legacy_cti_files = props.get("cti_files") - legacy_cti_file = props.get("cti_file") + def _canceled() -> bool: + return bool(should_cancel and should_cancel()) - # ------------------------------------------------------------ - # 1) Legacy explicit CTIs: always treat as user override (strict) - # ------------------------------------------------------------ - if legacy_cti_files or legacy_cti_file: - self._cti_files_source_used = self._CTI_FILES_SOURCE_USER + harvester = None + try: + if progress_cb: + progress_cb("Initializing GenTL discovery…") - candidates, diag = cti_finder.discover_cti_files( - cti_file=str(legacy_cti_file) if legacy_cti_file else None, - cti_files=cti_finder.cti_files_as_list(legacy_cti_files) if legacy_cti_files else None, - include_env=False, - must_exist=True, - ) - if not candidates: - raise RuntimeError( - "No valid GenTL producer (.cti) found from properties.cti_file/cti_files.\n\n" - f"Discovery details:\n{diag.summarize()}" - ) - return list(candidates) + harvester, loaded, _ = cls._build_harvester_for_discovery(strict_single=False) + if harvester is None or not loaded: + if progress_cb: + progress_cb("No GenTL producers could be loaded.") + return [] - # ------------------------------------------------------------------------ - # 2) Namespace explicit CTIs: behavior depends on cti_files_source marker - # - source=="auto": treat as cache, stale => fallback to discovery - # - otherwise: strict user override - # ------------------------------------------------------------------------ - if ns_cti_files or ns_cti_file: - is_auto_cache = source == self._CTI_FILES_SOURCE_AUTO + if progress_cb: + progress_cb(f"Loaded {len(loaded)} GenTL producer(s). Scanning devices…") - # Default to "user" if the marker is missing/unknown. - self._cti_files_source_used = self._CTI_FILES_SOURCE_AUTO if is_auto_cache else self._CTI_FILES_SOURCE_USER + infos = list(harvester.device_info_list or []) + limit = min(len(infos), max_devices if max_devices > 0 else len(infos)) + out: list[DetectedCamera] = [] - candidates, diag = cti_finder.discover_cti_files( - cti_file=str(ns_cti_file) if ns_cti_file else None, - cti_files=cti_finder.cti_files_as_list(ns_cti_files) if ns_cti_files else None, - include_env=False, - must_exist=True, - ) + for idx in range(limit): + if _canceled(): + break - if candidates: - return list(candidates) + info = infos[idx] + label = cls._label_from_info(info, idx) + device_id = cls._device_id_from_info(info) - # If auto cache is stale, fall back to discovery - if is_auto_cache: - LOG.info( - "Auto-persisted GenTL CTIs appear stale/missing; falling back to discovery. " - "Persisted cti_file=%s cti_files=%s", - ns_cti_file, - ns_cti_files, - ) - # Fall through to discovery (below) - else: - # User override: strict failure - raise RuntimeError( - "No valid GenTL producer (.cti) found from properties.gentl.cti_file/cti_files.\n\n" - f"Discovery details:\n{diag.summarize()}" + out.append( + DetectedCamera( + index=idx, + label=label, + device_id=device_id, + vid=None, + pid=None, + path=None, + backend_hint=None, + ) ) - # ------------------------------------------------------------ - # 3) Discovery path: env vars + patterns/dirs (source = "auto") - # ------------------------------------------------------------ - self._cti_files_source_used = self._CTI_FILES_SOURCE_AUTO - - search_paths = ns.get("cti_search_paths", props.get("cti_search_paths")) - extra_dirs = ns.get("cti_dirs", props.get("cti_dirs")) + if progress_cb: + progress_cb(f"Found: {label}") - candidates, diag = cti_finder.discover_cti_files( - cti_search_paths=cti_finder.cti_files_as_list(search_paths) if search_paths is not None else None, - include_env=True, - extra_dirs=cti_finder.cti_files_as_list(extra_dirs) if extra_dirs is not None else None, - recursive_env_search=False, - recursive_extra_search=False, - must_exist=True, - ) + out.sort(key=lambda c: c.index) + return out + except Exception: + LOG.debug("GenTL rich discovery failed", exc_info=True) + return [] + finally: + cls._safe_reset_harvester(harvester) - if not candidates: - raise RuntimeError( - "Could not locate any GenTL producer (.cti) file.\n\n" - "Fix options:\n" - " - Set camera.properties.gentl.cti_file to the full path of a .cti file\n" - " - Or set GENICAM_GENTL64_PATH / GENICAM_GENTL32_PATH to include the producer directory\n" - " - Or provide camera.properties.gentl.cti_search_paths with glob patterns\n\n" - f"Discovery details:\n{diag.summarize()}" - ) + @classmethod + def quick_ping(cls, index: int, _unused=None) -> bool: + """Fast presence check by index using a temporary discovery Harvester.""" + if Harvester is None: + return False - return list(candidates) + harvester = None + try: + harvester, _, _ = cls._build_harvester_for_discovery(strict_single=False) + if harvester is None: + return False + infos = harvester.device_info_list or [] + return 0 <= int(index) < len(infos) + except Exception: + return False + finally: + cls._safe_reset_harvester(harvester) @classmethod - def _build_harvester_for_discovery( - cls, - *, - strict_single: bool = False, # retained for optional future use - ): - """ - Build a Harvester instance and load CTI producers for class-level operations - (discover_devices, quick_ping, get_device_count, rebind_settings). - - Default policy: try to load ALL discovered producers. - """ + def _build_harvester_for_discovery(cls, *, strict_single: bool = False): + """Build a temporary Harvester for discovery-only operations.""" if Harvester is None: return None, [], None candidates, diag = cti_finder.discover_cti_files( include_env=True, - cti_search_paths=list(cls._LEGACY_DEFAULT_CTI_PATTERNS), + cti_search_paths=list(cls._DEFAULT_CTI_PATTERNS), must_exist=True, ) - if not candidates: return None, [], diag - # Default: load all candidates cti_files = list(candidates) - - # Optional strict mode (off by default) if strict_single: - # If you ever want strict, use choose_cti_files here; otherwise ignore. cti_files = cti_finder.choose_cti_files( - cti_files, policy=cti_finder.GenTLDiscoveryPolicy.RAISE_IF_MULTIPLE, max_files=1 + cti_files, + policy=cti_finder.GenTLDiscoveryPolicy.RAISE_IF_MULTIPLE, + max_files=1, ) harvester = Harvester() loaded: list[str] = [] - failures: list[tuple[str, str]] = [] for cti in cti_files: ok, reason = cls._cti_preflight(cti) if not ok: - failures.append((str(cti), reason or "Check failed")) LOG.warning("Skipping CTI '%s' during discovery preflight: %s", cti, reason) continue - try: harvester.add_file(cti) loaded.append(cti) except Exception as exc: - failures.append((str(cti), str(exc))) LOG.warning("Failed to load CTI '%s' during discovery: %s", cti, exc) if not loaded: - try: - harvester.reset() - except Exception: - pass + cls._safe_reset_harvester(harvester) return None, [], diag try: harvester.update() except Exception as exc: - LOG.error( - "Harvester.update() failed during discovery: %s" - " Device list not usable, treating as discovery failure." - " CTIs loaded before failure : %s", - exc, - loaded, - ) - try: - harvester.reset() - except Exception: - pass - # Update failure + LOG.error("Harvester.update() failed during discovery: %s. CTIs loaded: %s", exc, loaded) + cls._safe_reset_harvester(harvester) return None, [], diag return harvester, loaded, diag - def open(self) -> None: - if Harvester is None: # pragma: no cover - raise RuntimeError( - "The 'harvesters' package is required for the GenTL backend. Install it via 'pip install harvesters'." - ) + # ------------------------------------------------------------------ + # Settings rebinding + # ------------------------------------------------------------------ - # Ensure properties namespace exists for persistence back to UI - if not isinstance(self.settings.properties, dict): - self.settings.properties = {} - props = self.settings.properties - ns = props.get(self.OPTIONS_KEY, {}) + @classmethod + def rebind_settings(cls, settings): + """Map stable identity to current index when necessary. + + Serial identities are stable enough for open() to select directly, so + they intentionally avoid extra Harvester enumeration during multi-camera + startup. + """ + if Harvester is None: + return settings + + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(cls.OPTIONS_KEY, {}) if not isinstance(ns, dict): ns = {} - props[self.OPTIONS_KEY] = ns - - # Resolve CTIs (may return many). This no longer raises just because there are multiple. - cti_files = self._resolve_cti_files_for_settings() - ns["cti_files_source"] = ( - self._cti_files_source_used or ns.get("cti_files_source") or self._CTI_FILES_SOURCE_AUTO - ) - self._harvester = Harvester() + target_id = ns.get("device_id") or ns.get("serial_number") or ns.get("serial") + if not target_id: + return settings - loaded: list[str] = [] - failed: list[tuple[str, str]] = [] + target_id_str = str(target_id).strip() + if target_id_str.startswith("serial:"): + cls._persist_serial_identity(settings, target_id_str) + return settings - for cti in cti_files: - ok, reason = self._cti_preflight(cti) - if not ok: - failed.append((str(cti), reason or "preflight failed")) - LOG.warning("Skipping CTI '%s': %s", cti, reason) - continue + # Non-serial fallback retained for older configs / fingerprint IDs. + harvester = None + try: + explicit_files = ns.get("cti_files") or props.get("cti_files") + explicit_file = ns.get("cti_file") or props.get("cti_file") + source = str(ns.get("cti_files_source", "")).strip().lower() + is_auto_cache = source == cls._CTI_FILES_SOURCE_AUTO - try: - self._harvester.add_file(cti) - loaded.append(cti) - except Exception as exc: - failed.append((str(cti), str(exc))) - LOG.warning("Failed to load CTI '%s': %s", cti, exc) + if explicit_files or explicit_file: + candidates, _ = cti_finder.discover_cti_files( + cti_file=explicit_file, + cti_files=cti_finder.cti_files_as_list(explicit_files), + include_env=False, + must_exist=True, + ) + if not candidates and is_auto_cache: + harvester, _, _ = cls._build_harvester_for_discovery(strict_single=False) + elif candidates: + harvester = Harvester() + loaded = [] + for cti in candidates: + try: + harvester.add_file(cti) + loaded.append(cti) + except Exception: + continue + if not loaded: + return settings + harvester.update() + else: + harvester, _, _ = cls._build_harvester_for_discovery(strict_single=False) - # Persist diagnostics for UI / debugging - ns["cti_files"] = [str(p) for p in cti_files] # all resolved candidates - ns["cti_files_loaded"] = [str(p) for p in loaded] # successfully added to harvester - ns["cti_files_failed"] = [{"cti": c, "error": e} for c, e in failed] # load failures + if harvester is None: + return settings - # Keep single-cti convenience key for backward compatibility / display - if loaded: - ns["cti_file"] = str(loaded[0]) - elif cti_files: - ns["cti_file"] = str(cti_files[0]) # best effort + infos = list(harvester.device_info_list or []) + match_index, match_serial = cls._match_device(infos, target_id_str) + if match_index is None: + return settings - if not loaded: - self._reset_harvester() - raise RuntimeError( - "No GenTL producer (.cti) could be loaded.\n\n" - f"Resolved CTIs: {cti_files}\n" - f"Failures: {failed}\n" - "Fix: remove/repair incompatible producers or " - "set properties.gentl.cti_file to a known working producer." - ) + settings.index = int(match_index) + ns2 = cls._ensure_ns_for_settings(settings) + ns2["device_id"] = target_id_str + if match_serial: + ns2["serial_number"] = str(match_serial) + return settings + except Exception: + return settings + finally: + cls._safe_reset_harvester(harvester) - # Update device list after loading producers - self._harvester.update() + # ------------------------------------------------------------------ + # Open / read / close + # ------------------------------------------------------------------ - if not self._harvester.device_info_list: - self._reset_harvester() + def open(self) -> None: + if Harvester is None: # pragma: no cover raise RuntimeError( - "No GenTL cameras detected via Harvesters after loading producers.\n\n" - f"Loaded CTIs: {loaded}\n" - f"Failed CTIs: {failed}\n" - "Fix: ensure your camera vendor's GenTL producer is installed and working." + "The 'harvesters' package is required for the GenTL backend. Install it via 'pip install harvesters'." ) - infos = list(self._harvester.device_info_list) - - # Helper: robustly read device_info fields (dict-like or attribute-like) - def _info_get(info, key: str, default=None): - try: - if hasattr(info, "get"): - v = info.get(key) - if v is not None: - return v - except Exception: - pass + with type(self)._OPEN_LOCK: + loaded, failed = self._resolve_and_persist_ctis() try: - v = getattr(info, key, None) - if v is not None: - return v - except Exception: - pass - return default - - # ------------------------------------------------------------------ - # Device selection (stable device_id > serial > index) - # ------------------------------------------------------------------ - requested_index = int(self.settings.index or 0) - selected_index: int | None = None - selected_serial: str | None = None - - target_device_id = self._device_id or ns.get("device_id") or props.get("device_id") - if target_device_id: - target_device_id = str(target_device_id).strip() + infos = self._acquire_shared_harvester(loaded) + if not infos: + self._reset_harvester() + raise RuntimeError( + "No GenTL cameras detected via Harvesters after loading producers.\n\n" + f"Loaded CTIs: {loaded}\n" + f"Failed CTIs: {failed}\n" + "Fix: ensure your camera vendor's GenTL producer is installed and working." + ) - # Exact match against computed device_id - for idx, info in enumerate(infos): + selected_index, selected_serial, selected_info = self._select_device(infos) + self.settings.index = int(selected_index) + + with self._shared_entry.lock: + self._acquirer = self._create_image_acquirer(selected_serial, int(selected_index)) + node_map = self._acquirer.remote_device.node_map + self._device_label = self._resolve_device_label(node_map) + + self._configure_pixel_format(node_map) + self._configure_trigger(node_map) + self._configure_resolution(node_map) + self._configure_exposure(node_map) + self._configure_gain(node_map) + self._configure_frame_rate(node_map) + self._read_telemetry(node_map) + self._persist_device_metadata(selected_info, selected_serial) + + if self._fast_start: + LOG.info("GenTL open() in fast_start probe mode: acquisition not started.") + return + + self._acquirer.start() + + LOG.debug( + "Opened GenTL camera index=%s serial=%s label=%s", + selected_index, + selected_serial, + self._device_label, + ) + except Exception as exc: try: - did = self._device_id_from_info(info) + self.close() except Exception: - did = None - if did and did == target_device_id: - selected_index = idx - selected_serial = _info_get(info, "serial_number", None) - selected_serial = str(selected_serial).strip() if selected_serial else None - break - - # If device_id is "serial:XXXX", match serial directly - if selected_index is None and target_device_id.startswith("serial:"): - serial_target = target_device_id.split("serial:", 1)[1].strip() - if serial_target: - exact = [] - for idx, info in enumerate(infos): - sn = _info_get(info, "serial_number", "") - sn = str(sn).strip() if sn is not None else "" - if sn == serial_target: - exact.append((idx, sn)) - if exact: - selected_index = exact[0][0] - selected_serial = exact[0][1] - else: - sub = [] - for idx, info in enumerate(infos): - sn = _info_get(info, "serial_number", "") - sn = str(sn).strip() if sn is not None else "" - if serial_target and serial_target in sn: - sub.append((idx, sn)) - if len(sub) == 1: - selected_index = sub[0][0] - selected_serial = sub[0][1] or None - elif len(sub) > 1: - candidates = [sn for _, sn in sub] - raise RuntimeError( - f"Ambiguous GenTL serial match for '{serial_target}'. Candidates: {candidates}" - ) - - # Legacy serial selection fallback - if selected_index is None: - serial = self._serial_number - if serial: - serial = str(serial).strip() - exact = [] - for idx, info in enumerate(infos): - sn = _info_get(info, "serial_number", "") - sn = str(sn).strip() if sn is not None else "" - if sn == serial: - exact.append((idx, sn)) - if exact: - selected_index = exact[0][0] - selected_serial = exact[0][1] - else: - sub = [] - for idx, info in enumerate(infos): - sn = _info_get(info, "serial_number", "") - sn = str(sn).strip() if sn is not None else "" - if serial and serial in sn: - sub.append((idx, sn)) - if len(sub) == 1: - selected_index = sub[0][0] - selected_serial = sub[0][1] or None - elif len(sub) > 1: - candidates = [sn for _, sn in sub] - raise RuntimeError(f"Ambiguous GenTL serial match for '{serial}'. Candidates: {candidates}") - else: - available = [str(_info_get(i, "serial_number", "")).strip() for i in infos] - raise RuntimeError(f"Camera with serial '{serial}' not found. Available cameras: {available}") - - # Index fallback - if selected_index is None: - device_count = len(infos) - if requested_index < 0 or requested_index >= device_count: - raise RuntimeError(f"Camera index {requested_index} out of range for {device_count} GenTL device(s)") - selected_index = requested_index - sn = _info_get(infos[selected_index], "serial_number", "") - selected_serial = str(sn).strip() if sn else None + pass + raise RuntimeError( + f"Failed to open GenTL camera.\n\nLoaded CTIs: {loaded}\nFailed CTIs: {failed}\nReason: {exc}" + ) from exc - # Update settings.index to actual selected index (UI stability) - self.settings.index = int(selected_index) - selected_info = infos[int(selected_index)] + def read(self) -> tuple[np.ndarray, float]: + if self._acquirer is None: + raise RuntimeError("GenTL image acquirer not initialised") - # Create ImageAcquirer via Harvester.create(...) try: - if selected_serial: - self._acquirer = self._harvester.create({"serial_number": str(selected_serial)}) - else: - self._acquirer = self._harvester.create(int(selected_index)) - except TypeError: - if selected_serial: - self._acquirer = self._harvester.create({"serial_number": str(selected_serial)}) - else: - self._acquirer = self._harvester.create(index=int(selected_index)) - - remote = self._acquirer.remote_device - node_map = remote.node_map + with self._acquirer.fetch(timeout=self._timeout) as buffer: + component = buffer.payload.components[0] + channels = 3 if self._pixel_format in {"RGB8", "BGR8"} else 1 + array = np.asarray(component.data) + expected = component.height * component.width * channels + if array.size != expected: + array = np.frombuffer(bytes(component.data), dtype=array.dtype) - self._device_label = self._resolve_device_label(node_map) + try: + if channels > 1: + frame = array.reshape(component.height, component.width, channels).copy() + else: + frame = array.reshape(component.height, component.width).copy() + except ValueError: + frame = array.copy() + except HarvesterTimeoutError as exc: + raise TimeoutError(str(exc) + " (GenTL timeout)") from exc - # Apply configuration - self._configure_pixel_format(node_map) - self._configure_resolution(node_map) - self._configure_exposure(node_map) - self._configure_gain(node_map) - self._configure_frame_rate(node_map) + frame = self._convert_frame(frame) + timestamp = time.time() - # Read back telemetry - try: - self._actual_width = int(node_map.Width.value) - self._actual_height = int(node_map.Height.value) - except Exception: - pass + if self._actual_width is None or self._actual_height is None: + h, w = frame.shape[:2] + self._actual_width = int(w) + self._actual_height = int(h) - try: - self._actual_fps = float(node_map.ResultingFrameRate.value) - except Exception: - self._actual_fps = None + if self._actual_exposure is None or self._actual_gain is None: + try: + self._read_telemetry(self._acquirer.remote_device.node_map) + except Exception: + pass - try: - self._actual_exposure = float(node_map.ExposureTime.value) - except Exception: - self._actual_exposure = None + return frame, timestamp - try: - self._actual_gain = float(node_map.Gain.value) - except Exception: - self._actual_gain = None + def stop(self) -> None: + if self._acquirer is not None: + try: + self._call_with_optional_lock(self._acquirer.stop) + except Exception: + pass - # Persist identity + metadata - computed_id = None - try: - computed_id = self._device_id_from_info(selected_info) - except Exception: - computed_id = None + def close(self) -> None: + if self._acquirer is not None: + try: + self._call_with_optional_lock(self._acquirer.stop) + except Exception: + pass - if computed_id: - ns["device_id"] = computed_id - elif selected_serial: - ns["device_id"] = f"serial:{selected_serial}" + try: + destroy = getattr(self._acquirer, "destroy", None) + if destroy is not None: + self._call_with_optional_lock(destroy) + finally: + self._acquirer = None - if selected_serial: - ns["serial_number"] = str(selected_serial) - ns["device_serial_number"] = str(selected_serial) + if self._harvester is not None or self._shared_entry is not None: + self._reset_harvester() - if self._device_label: - ns["device_name"] = str(self._device_label) + self._device_label = None - ns["device_display_name"] = str(_info_get(selected_info, "display_name", "") or "") - ns["device_info_id"] = str(_info_get(selected_info, "id_", "") or "") - ns["device_vendor"] = str(_info_get(selected_info, "vendor", "") or "") - ns["device_model"] = str(_info_get(selected_info, "model", "") or "") - ns["device_tl_type"] = str(_info_get(selected_info, "tl_type", "") or "") - ns["device_user_defined_name"] = str(_info_get(selected_info, "user_defined_name", "") or "") - ns["device_version"] = str(_info_get(selected_info, "version", "") or "") - ns["device_access_status"] = _info_get(selected_info, "access_status", None) - - # Start acquisition unless fast_start - if getattr(self, "_fast_start", False): - LOG.info("GenTL open() in fast_start probe mode: acquisition not started.") - return + # ------------------------------------------------------------------ + # CTI / shared Harvester helpers + # ------------------------------------------------------------------ - self._acquirer.start() + def _resolve_and_persist_ctis(self) -> tuple[list[str], list[tuple[str, str]]]: + ns = self._ensure_settings_ns() + ns.setdefault("cti_search_paths", list(self._DEFAULT_CTI_PATTERNS)) + ns.setdefault("cti_files_source", self._CTI_FILES_SOURCE_AUTO) - @staticmethod - def _device_id_from_info(info) -> str | None: - """ - Build a stable-ish device identifier from Harvester device_info_list entries. - This helper supports both dict-like and attribute-like representations. - """ + cti_files = self._resolve_cti_files_for_settings() + ns["cti_files_source"] = ( + self._cti_files_source_used or ns.get("cti_files_source") or self._CTI_FILES_SOURCE_AUTO + ) - def _read(name: str): - # dict-like - try: - if hasattr(info, "get"): - v = info.get(name) # type: ignore[attr-defined] - if v is not None: - return v - except Exception: - pass - # attribute-like - try: - return getattr(info, name, None) - except Exception: - return None - - def _get(*names: str) -> str | None: - for n in names: - v = _read(n) - if v is None: - continue - s = str(v).strip() - if s: - return s - return None + loaded: list[str] = [] + failed: list[tuple[str, str]] = [] + for cti in cti_files: + ok, reason = self._cti_preflight(cti) + if ok: + loaded.append(str(cti)) + else: + failed.append((str(cti), reason or "preflight failed")) + LOG.warning("Skipping CTI '%s': %s", cti, reason) - # Prefer serial if present (best stable key when available) - serial = _get("serial_number", "SerialNumber", "device_serial_number", "sn", "serial") - if serial: - return f"serial:{serial}" + ns["cti_files"] = [str(p) for p in cti_files] + ns["cti_files_loaded"] = loaded[:] + ns["cti_files_failed"] = [{"cti": c, "error": e} for c, e in failed] + if loaded: + ns["cti_file"] = loaded[0] + elif cti_files: + ns["cti_file"] = str(cti_files[0]) - # Fallback components (best-effort; names may vary per producer) - vendor = _get("vendor", "vendor_name", "manufacturer", "DeviceVendorName") - model = _get("model", "model_name", "DeviceModelName") - user_id = _get("user_defined_name", "user_id", "DeviceUserID", "DeviceUserId", "device_user_id") - tl_type = _get("tl_type", "transport_layer_type", "DeviceTLType") + if not loaded: + self._reset_harvester() + raise RuntimeError( + "No GenTL producer (.cti) could be loaded.\n\n" + f"Resolved CTIs: {cti_files}\n" + f"Failures: {failed}\n" + "Fix: remove/repair incompatible producers " + "or set properties.gentl.cti_file to a known working producer." + ) - unique = _get("id_", "id", "device_id", "uid", "guid", "mac_address", "interface_id", "display_name") + return loaded, failed - parts = [] - for k, v in (("vendor", vendor), ("model", model), ("user", user_id), ("tl", tl_type), ("uid", unique)): - if v: - parts.append(f"{k}={v}") + def _acquire_shared_harvester(self, loaded: list[str]) -> list: + ns = self._ensure_settings_ns() + try: + self._shared_entry = cti_finder.SharedHarvesterPool.acquire(loaded) + self._harvester = self._shared_entry.harvester + + actual_loaded = list(getattr(self._shared_entry, "loaded_files", loaded)) + actual_failed = dict(getattr(self._shared_entry, "failed_files", {})) + + ns["cti_files_loaded"] = actual_loaded + if actual_failed: + existing_failed = ns.get("cti_files_failed") + merged_failed = list(existing_failed) if isinstance(existing_failed, list) else [] + merged_failed.extend({"cti": str(cti), "error": str(error)} for cti, error in actual_failed.items()) + ns["cti_files_failed"] = merged_failed + + with self._shared_entry.lock: + infos = list(self._harvester.device_info_list or []) + + LOG.debug( + "Using shared GenTL Harvester for %d device(s), refcount=%s", + len(infos), + cti_finder.SharedHarvesterPool.get_refcount(self._shared_entry), + ) + return infos - if not parts: - return None + except Exception as exc: + exc_loaded = list(getattr(exc, "loaded_files", [])) + exc_failed = dict(getattr(exc, "failed_files", {})) - return "fp:" + "|".join(parts) + if exc_loaded or exc_failed: + ns["cti_files_loaded"] = [str(p) for p in exc_loaded] + existing_failed = ns.get("cti_files_failed") + merged_failed = list(existing_failed) if isinstance(existing_failed, list) else [] + merged_failed.extend({"cti": str(cti), "error": str(error)} for cti, error in exc_failed.items()) + ns["cti_files_failed"] = merged_failed - @classmethod - def discover_devices( - cls, - *, - max_devices: int = 10, - should_cancel: callable[[], bool] | None = None, - progress_cb: callable[[str], None] | None = None, - ): - """ - Rich discovery path for CameraFactory.detect_cameras(). - Returns a list of DetectedCamera with device_id filled when possible. + if self._shared_entry is not None: + try: + cti_finder.SharedHarvesterPool.release(self._shared_entry) + except Exception: + pass - Cross-platform CTI discovery: - - Uses GENICAM_GENTL64_PATH / GENICAM_GENTL32_PATH when available - - Falls back to built-in Windows patterns - - Best-effort loads multiple CTI producers - """ - if Harvester is None: - return [] + self._shared_entry = None + self._harvester = None - def _canceled() -> bool: - return bool(should_cancel and should_cancel()) + raise RuntimeError( + f"Failed to initialize shared GenTL producer state.\n\nCTIs: {loaded}\nReason: {exc}" + ) from exc - harvester = None + def _reset_harvester(self) -> None: try: - if progress_cb: - progress_cb("Initializing GenTL discovery…") - - harvester, loaded, _ = cls._build_harvester_for_discovery(strict_single=False) + if self._shared_entry is not None: + cti_finder.SharedHarvesterPool.release(self._shared_entry) + self._shared_entry = None + else: + self._reset_select_harvester(self._harvester) + finally: + self._harvester = None - if harvester is None or not loaded: - if progress_cb: - progress_cb("No GenTL producers could be loaded.") - return [] + @staticmethod + def _reset_select_harvester(harvester) -> None: + GenTLCameraBackend._safe_reset_harvester(harvester) - if progress_cb: - progress_cb(f"Loaded {len(loaded)} GenTL producer(s). Scanning devices…") + @staticmethod + def _safe_reset_harvester(harvester) -> None: + if harvester is not None: + try: + harvester.reset() + except Exception: + pass - infos = list(harvester.device_info_list or []) - if not infos: - return [] + @staticmethod + def _cti_preflight(path: str) -> tuple[bool, str | None]: + p = Path(str(path)) + try: + if not p.exists(): + return False, "missing at load time" + if not p.is_file(): + return False, "not a file at load time" + with p.open("rb"): + pass + return True, None + except PermissionError: + return False, "permission denied at load time" + except OSError as e: + return False, f"os error at load time: {e}" - out: list[DetectedCamera] = [] - limit = min(len(infos), max_devices if max_devices > 0 else len(infos)) + def _resolve_cti_files_for_settings(self) -> list[str]: + """Resolve CTI files using explicit user overrides, auto cache, then discovery.""" + props = self.settings.properties if isinstance(self.settings.properties, dict) else {} + ns = props.get(self.OPTIONS_KEY, {}) + if not isinstance(ns, dict): + ns = {} - for idx in range(limit): - if _canceled(): - break + source = ns.get("cti_files_source") + source = str(source).strip().lower() if source is not None else None - info = infos[idx] + ns_cti_files = ns.get("cti_files") + ns_cti_file = ns.get("cti_file") + legacy_cti_files = props.get("cti_files") + legacy_cti_file = props.get("cti_file") - display_name = None - try: - display_name = ( - info.get("display_name") if hasattr(info, "get") else getattr(info, "display_name", None) - ) - except Exception: - display_name = None + if legacy_cti_files or legacy_cti_file: + self._cti_files_source_used = self._CTI_FILES_SOURCE_USER + candidates, diag = cti_finder.discover_cti_files( + cti_file=str(legacy_cti_file) if legacy_cti_file else None, + cti_files=cti_finder.cti_files_as_list(legacy_cti_files) if legacy_cti_files else None, + include_env=False, + must_exist=True, + ) + if not candidates: + raise RuntimeError( + "No valid GenTL producer (.cti) found from properties.cti_file/cti_files.\n\n" + f"Discovery details:\n{diag.summarize()}" + ) + return list(candidates) - if display_name: - label = str(display_name).strip() - else: - vendor = ( - getattr(info, "vendor", None) or (info.get("vendor") if hasattr(info, "get") else None) or "" - ) - model = getattr(info, "model", None) or (info.get("model") if hasattr(info, "get") else None) or "" - serial = ( - getattr(info, "serial_number", None) - or (info.get("serial_number") if hasattr(info, "get") else None) - or "" - ) - vendor, model, serial = str(vendor).strip(), str(model).strip(), str(serial).strip() - label = f"{vendor} {model}".strip() if (vendor or model) else f"GenTL device {idx}" - if serial: - label = f"{label} ({serial})" + if ns_cti_files or ns_cti_file: + is_auto_cache = source == self._CTI_FILES_SOURCE_AUTO + self._cti_files_source_used = self._CTI_FILES_SOURCE_AUTO if is_auto_cache else self._CTI_FILES_SOURCE_USER + candidates, diag = cti_finder.discover_cti_files( + cti_file=str(ns_cti_file) if ns_cti_file else None, + cti_files=cti_finder.cti_files_as_list(ns_cti_files) if ns_cti_files else None, + include_env=False, + must_exist=True, + ) + if candidates: + return list(candidates) + if not is_auto_cache: + raise RuntimeError( + "No valid GenTL producer (.cti) found from properties.gentl.cti_file/cti_files.\n\n" + f"Discovery details:\n{diag.summarize()}" + ) + LOG.info("Auto-persisted GenTL CTIs stale/missing; falling back to discovery.") - device_id = cls._device_id_from_info(info) + self._cti_files_source_used = self._CTI_FILES_SOURCE_AUTO + search_paths = ns.get("cti_search_paths", props.get("cti_search_paths")) + extra_dirs = ns.get("cti_dirs", props.get("cti_dirs")) + search_patterns = ( + cti_finder.cti_files_as_list(search_paths) if search_paths is not None else list(self._DEFAULT_CTI_PATTERNS) + ) - out.append( - DetectedCamera( - index=idx, - label=label, - device_id=device_id, - vid=None, - pid=None, - path=None, - backend_hint=None, - ) - ) + candidates, diag = cti_finder.discover_cti_files( + cti_search_paths=search_patterns, + include_env=True, + extra_dirs=cti_finder.cti_files_as_list(extra_dirs) if extra_dirs is not None else None, + recursive_env_search=False, + recursive_extra_search=False, + must_exist=True, + ) + if not candidates: + raise RuntimeError( + "Could not locate any GenTL producer (.cti) file.\n\n" + "Fix options:\n" + " - Set camera.properties.gentl.cti_file to the full path of a .cti file\n" + " - Or set GENICAM_GENTL64_PATH / GENICAM_GENTL32_PATH to include the producer directory\n" + " - Or provide camera.properties.gentl.cti_search_paths with glob patterns\n\n" + f"Discovery details:\n{diag.summarize(redact_env=False)}" + ) + return list(candidates) - if progress_cb: - progress_cb(f"Found: {label}") + # ------------------------------------------------------------------ + # Device selection / identity helpers + # ------------------------------------------------------------------ - out.sort(key=lambda c: c.index) - return out + def _select_device(self, infos: list) -> tuple[int, str | None, object]: + requested_index = int(self.settings.index or 0) + target_device_id = self._device_id or self._ensure_settings_ns().get("device_id") - except Exception: - return [] - finally: - if harvester is not None: - try: - harvester.reset() - except Exception: - pass + selected_index: int | None = None + selected_serial: str | None = None - @classmethod - def rebind_settings(cls, settings): - """ - If a stable identity exists in settings.properties['gentl'], map it to the - correct current index (and serial_number if available). - - Strategy: - - If CTIs were persisted: - * if source == "auto" and they are stale -> fall back to discovery - * otherwise use them (best stability) - - Otherwise, fall back to env-var + pattern discovery (best-effort). - """ - if Harvester is None: - return settings + if target_device_id: + target = str(target_device_id).strip() + selected_index, selected_serial = self._match_device(infos, target) + if selected_index is None: + available = [str(self._info_get(i, "serial_number", "") or "").strip() for i in infos] + raise RuntimeError(f"GenTL device '{target}' not found. Available serials: {available}") + + elif self._serial_number: + serial = str(self._serial_number).strip() + selected_index, selected_serial = self._match_device(infos, serial) + if selected_index is None: + available = [str(self._info_get(i, "serial_number", "") or "").strip() for i in infos] + raise RuntimeError(f"GenTL camera with serial '{serial}' not found. Available serials: {available}") - props = settings.properties if isinstance(settings.properties, dict) else {} - ns = props.get(cls.OPTIONS_KEY, {}) - if not isinstance(ns, dict): - ns = {} + else: + if requested_index < 0 or requested_index >= len(infos): + raise RuntimeError(f"Camera index {requested_index} out of range for {len(infos)} GenTL device(s)") + selected_index = requested_index + serial = self._info_get(infos[selected_index], "serial_number", "") + selected_serial = str(serial).strip() if serial else None - target_id = ns.get("device_id") or ns.get("serial_number") or ns.get("serial") - if not target_id: - return settings + return int(selected_index), selected_serial, infos[int(selected_index)] - source = ns.get("cti_files_source") - source = str(source).strip().lower() if source is not None else None - is_auto_cache = source == cls._CTI_FILES_SOURCE_AUTO + @classmethod + def _match_device(cls, infos: list, target: str) -> tuple[int | None, str | None]: + if not target: + return None, None + + serial_target = target.split("serial:", 1)[1].strip() if target.startswith("serial:") else target + + for idx, info in enumerate(infos): + if cls._device_id_from_info(info) == target: + serial = cls._info_get(info, "serial_number", None) + return idx, str(serial).strip() if serial else None + + exact: list[tuple[int, str]] = [] + for idx, info in enumerate(infos): + sn = str(cls._info_get(info, "serial_number", "") or "").strip() + if sn == serial_target: + exact.append((idx, sn)) + if exact: + return exact[0] + + partial = [] + for idx, info in enumerate(infos): + sn = str(cls._info_get(info, "serial_number", "") or "").strip() + if serial_target and serial_target in sn: + partial.append((idx, sn)) + if len(partial) == 1: + return partial[0] + if len(partial) > 1: + raise RuntimeError( + f"Ambiguous GenTL serial match for '{serial_target}'. Candidates: {[sn for _, sn in partial]}" + ) - harvester = None - try: - explicit_files = ns.get("cti_files") or props.get("cti_files") - explicit_file = ns.get("cti_file") or props.get("cti_file") + return None, None - if explicit_files or explicit_file: - candidates, _diag = cti_finder.discover_cti_files( - cti_file=explicit_file, - cti_files=cti_finder.cti_files_as_list(explicit_files), - include_env=False, - must_exist=True, - ) + @staticmethod + def _device_id_from_info(info) -> str | None: + serial = GenTLCameraBackend._first_info_value( + info, + "serial_number", + "SerialNumber", + "device_serial_number", + "sn", + "serial", + ) + if serial: + return f"serial:{serial}" - if not candidates and is_auto_cache: - # Auto cache stale -> fallback to discovery - harvester, _loaded, _diag2 = cls._build_harvester_for_discovery(strict_single=False) - if harvester is None: - return settings - elif not candidates: - # User override stale or unknown -> no rebind - return settings - else: - harvester = Harvester() - loaded: list[str] = [] - for cti in candidates: - try: - harvester.add_file(cti) - loaded.append(cti) - except Exception: - continue - if not loaded: - cls._reset_select_harvester(harvester) - if is_auto_cache: - harvester, _loaded, _diag2 = cls._build_harvester_for_discovery(strict_single=False) - if harvester is None: - return settings - else: - return settings - else: - harvester.update() - else: - harvester, _loaded, _diag = cls._build_harvester_for_discovery(strict_single=False) - if harvester is None: - return settings + parts = [] + for key, names in ( + ("vendor", ("vendor", "vendor_name", "manufacturer", "DeviceVendorName")), + ("model", ("model", "model_name", "DeviceModelName")), + ("user", ("user_defined_name", "user_id", "DeviceUserID", "DeviceUserId", "device_user_id")), + ("tl", ("tl_type", "transport_layer_type", "DeviceTLType")), + ("uid", ("id_", "id", "device_id", "uid", "guid", "mac_address", "interface_id", "display_name")), + ): + value = GenTLCameraBackend._first_info_value(info, *names) + if value: + parts.append(f"{key}={value}") + return "fp:" + "|".join(parts) if parts else None - infos = list(harvester.device_info_list or []) - if not infos: - return settings + @staticmethod + def _first_info_value(info, *names: str) -> str | None: + for name in names: + value = GenTLCameraBackend._info_get(info, name, None) + if value is not None and str(value).strip(): + return str(value).strip() + return None - target_id_str = str(target_id).strip() - match_index = None - match_serial = None + @staticmethod + def _info_get(info, key: str, default=None): + try: + if hasattr(info, "get"): + value = info.get(key) + if value is not None: + return value + except Exception: + pass + try: + value = getattr(info, key, None) + if value is not None: + return value + except Exception: + pass + return default - # 1) Exact match by computed device_id - for idx, info in enumerate(infos): - dev_id = cls._device_id_from_info(info) - if dev_id and dev_id == target_id_str: - match_index = idx - match_serial = getattr(info, "serial_number", None) - break + @staticmethod + def _label_from_info(info, index: int) -> str: + display = GenTLCameraBackend._info_get(info, "display_name", None) + if display: + return str(display).strip() - # 2) Fallback: treat target as serial-ish substring - if match_index is None: - for idx, info in enumerate(infos): - serial = getattr(info, "serial_number", None) - if serial and target_id_str in str(serial): - match_index = idx - match_serial = serial - break + vendor = str(GenTLCameraBackend._info_get(info, "vendor", "") or "").strip() + model = str(GenTLCameraBackend._info_get(info, "model", "") or "").strip() + serial = str(GenTLCameraBackend._info_get(info, "serial_number", "") or "").strip() + label = f"{vendor} {model}".strip() if (vendor or model) else f"GenTL device {index}" + return f"{label} ({serial})" if serial else label - if match_index is None: - return settings + @staticmethod + def _serial_from_identity(device_id: str | None, legacy_serial) -> str | None: + if device_id: + did = str(device_id).strip() + if did.startswith("serial:"): + return did.split("serial:", 1)[1].strip() or None + if not did.startswith("fp:"): + return did + return str(legacy_serial).strip() if legacy_serial else None - # Apply rebinding - settings.index = int(match_index) + @classmethod + def _persist_serial_identity(cls, settings, device_id: str) -> None: + serial = device_id.split("serial:", 1)[1].strip() + if not serial: + return + ns = cls._ensure_ns_for_settings(settings) + ns["device_id"] = device_id + ns["serial_number"] = serial - # Ensure namespace exists - if not isinstance(settings.properties, dict): - settings.properties = {} - ns2 = settings.properties.setdefault(cls.OPTIONS_KEY, {}) - if not isinstance(ns2, dict): - ns2 = {} - settings.properties[cls.OPTIONS_KEY] = ns2 + def _persist_device_metadata(self, selected_info, selected_serial: str | None) -> None: + ns = self._ensure_settings_ns() + computed_id = self._device_id_from_info(selected_info) - if match_serial: - ns2["serial_number"] = str(match_serial) - ns2["device_id"] = target_id_str + if computed_id: + ns["device_id"] = computed_id + elif selected_serial: + ns["device_id"] = f"serial:{selected_serial}" - return settings + if selected_serial: + ns["serial_number"] = str(selected_serial) + ns["device_serial_number"] = str(selected_serial) - except Exception: - return settings - finally: - if harvester is not None: - try: - harvester.reset() - except Exception: - pass + if self._device_label: + ns["device_name"] = str(self._device_label) + + for out_key, info_key in ( + ("device_display_name", "display_name"), + ("device_info_id", "id_"), + ("device_vendor", "vendor"), + ("device_model", "model"), + ("device_tl_type", "tl_type"), + ("device_user_defined_name", "user_defined_name"), + ("device_version", "version"), + ("device_access_status", "access_status"), + ): + value = self._info_get(selected_info, info_key, "") + ns[out_key] = value if out_key == "device_access_status" else str(value or "") @classmethod - def quick_ping(cls, index: int, _unused=None) -> bool: - """ - Fast check: is there a device at this index according to Harvester? - Does not open/start acquisition. - """ - if Harvester is None: - return False + def _ensure_ns_for_settings(cls, settings) -> dict: + if not isinstance(settings.properties, dict): + settings.properties = {} + ns = settings.properties.get(cls.OPTIONS_KEY, {}) + if not isinstance(ns, dict): + ns = {} + settings.properties[cls.OPTIONS_KEY] = ns + return ns - harvester = None - try: - harvester, _, _ = cls._build_harvester_for_discovery(strict_single=False) - if harvester is None: - return False - infos = harvester.device_info_list or [] - return 0 <= int(index) < len(infos) - except Exception: - return False - finally: - if harvester is not None: - try: - harvester.reset() - except Exception: - pass + def _ensure_settings_ns(self) -> dict: + return self._ensure_ns_for_settings(self.settings) - def read(self) -> tuple[np.ndarray, float]: - if self._acquirer is None: - raise RuntimeError("GenTL image acquirer not initialised") + # ------------------------------------------------------------------ + # Existing compatibility helpers + # ------------------------------------------------------------------ - try: - with self._acquirer.fetch(timeout=self._timeout) as buffer: - component = buffer.payload.components[0] - channels = 3 if self._pixel_format in {"RGB8", "BGR8"} else 1 - array = np.asarray(component.data) - expected = component.height * component.width * channels - if array.size != expected: - array = np.frombuffer(bytes(component.data), dtype=array.dtype) - try: - if channels > 1: - frame = array.reshape(component.height, component.width, channels).copy() - else: - frame = array.reshape(component.height, component.width).copy() - except ValueError: - frame = array.copy() - except HarvesterTimeoutError as exc: - raise TimeoutError(str(exc) + " (GenTL timeout)") from exc + def _call_with_optional_lock(self, func, *args, **kwargs): + if self._shared_entry is not None: + with self._shared_entry.lock: + return func(*args, **kwargs) + return func(*args, **kwargs) - frame = self._convert_frame(frame) - timestamp = time.time() + def _create_image_acquirer(self, selected_serial: str | None, selected_index: int): + if self._harvester is None: + raise RuntimeError("Harvester is not initialized") + try: + if selected_serial: + return self._harvester.create({"serial_number": str(selected_serial)}) + return self._harvester.create(int(selected_index)) + except TypeError: + if selected_serial: + return self._harvester.create({"serial_number": str(selected_serial)}) + return self._harvester.create(index=int(selected_index)) - if self._actual_width is None or self._actual_height is None: - h, w = frame.shape[:2] - self._actual_width = int(w) - self._actual_height = int(h) + def _available_serials(self) -> list[str]: + assert self._harvester is not None + return [ + str(s).strip() + for s in (self._info_get(i, "serial_number", "") for i in self._harvester.device_info_list) + if s + ] - if self._actual_exposure is None: - try: - self._actual_exposure = float(self._acquirer.node_map.ExposureTime.value) - except Exception: - self._actual_exposure = None + def _create_acquirer(self, serial: str | None, index: int): + """Compatibility wrapper for older code/tests.""" + return self._create_image_acquirer(serial, index) - if self._actual_gain is None: - try: - self._actual_gain = float(self._acquirer.node_map.Gain.value) - except Exception: - self._actual_gain = None + # ------------------------------------------------------------------ + # Camera configuration helpers + # ------------------------------------------------------------------ - return frame, timestamp + def _configure_pixel_format(self, node_map) -> None: + try: + pixel_format_node = getattr(node_map, "PixelFormat", None) + if pixel_format_node is None: + return - def stop(self) -> None: - if self._acquirer is not None: - try: - self._acquirer.stop() - except Exception: - pass + available = list(getattr(pixel_format_node, "symbolics", []) or []) + if not available: + return - @staticmethod - def _reset_select_harvester(harvester) -> None: - if harvester is not None: - try: - harvester.reset() - except Exception: - pass + requested = str(self._pixel_format or "auto").strip() - def _reset_harvester(self) -> None: - try: - self._reset_select_harvester(self._harvester) - finally: - self._harvester = None + if requested.lower() == "auto": + selected = None - def close(self) -> None: - if self._acquirer is not None: - try: - self._acquirer.stop() - except Exception: - pass - try: - destroy = getattr(self._acquirer, "destroy", None) - if destroy is not None: - destroy() - finally: - self._acquirer = None + for fmt in self._COLOR_PIXEL_FORMATS: + if fmt in available: + selected = fmt + break - if self._harvester is not None: - try: - self._harvester.reset() - finally: - self._harvester = None + if selected is None: + for fmt in self._MONO_PIXEL_FORMATS: + if fmt in available: + selected = fmt + break - self._device_label = None + if selected is None: + selected = available[0] - # ------------------------------------------------------------------ - # Helpers - # ------------------------------------------------------------------ + else: + selected = requested + if selected not in available: + LOG.warning( + "Pixel format '%s' not available. Available formats: %s. Falling back to auto.", + selected, + available, + ) + selected = None + for fmt in self._COLOR_PIXEL_FORMATS + self._MONO_PIXEL_FORMATS: + if fmt in available: + selected = fmt + break + if selected is None: + selected = available[0] - def _parse_crop(self, crop) -> tuple[int, int, int, int] | None: - if isinstance(crop, (list, tuple)) and len(crop) == 4: - return tuple(int(v) for v in crop) - return None + pixel_format_node.value = selected + self._pixel_format = str(pixel_format_node.value) - def _get_requested_resolution_or_none(self) -> tuple[int, int] | None: - """ - Return (w, h) if user explicitly requested a resolution. - Return None to keep device defaults. - """ - props = self.settings.properties if isinstance(self.settings.properties, dict) else {} + LOG.debug("GenTL pixel format selected: %s", self._pixel_format) - legacy = props.get("resolution") - if isinstance(legacy, (list, tuple)) and len(legacy) == 2: - try: - w, h = int(legacy[0]), int(legacy[1]) - if w > 0 and h > 0: - return (w, h) - except Exception: - pass + except Exception as e: + LOG.warning("Failed to configure pixel format '%s': %s", self._pixel_format, e) + def _configure_trigger(self, node_map) -> None: try: - w = int(getattr(self.settings, "width", 0) or 0) - h = int(getattr(self.settings, "height", 0) or 0) - if w > 0 and h > 0: - return (w, h) - except Exception: - pass - - return None + trigger_mode = getattr(node_map, "TriggerMode", None) + if trigger_mode is not None and "Off" in getattr(trigger_mode, "symbolics", []): + trigger_mode.value = "Off" + except Exception as e: + LOG.warning("Failed to disable trigger mode: %s", e) def _configure_resolution(self, node_map) -> None: - """ - Configure camera resolution only if explicitly requested. - If None, keep device defaults. - """ - req = self._requested_resolution - if req is None: - LOG.info("Resolution: using device default.") + if self._requested_resolution is None: return - requested_width, requested_height = req + requested_width, requested_height = self._requested_resolution actual_width, actual_height = None, None - # Width try: node = node_map.Width - min_w, max_w = node.min, node.max - inc_w = getattr(node, "inc", 1) - width = self._adjust_to_increment(requested_width, min_w, max_w, inc_w) + width = self._adjust_to_increment(requested_width, node.min, node.max, getattr(node, "inc", 1)) node.value = int(width) - actual_width = node.value + actual_width = int(node.value) except Exception as e: - LOG.warning(f"Failed to set width: {e}") + LOG.warning("Failed to set width: %s", e) - # Height try: node = node_map.Height - min_h, max_h = node.min, node.max - inc_h = getattr(node, "inc", 1) - height = self._adjust_to_increment(requested_height, min_h, max_h, inc_h) + height = self._adjust_to_increment(requested_height, node.min, node.max, getattr(node, "inc", 1)) node.value = int(height) - actual_height = node.value + actual_height = int(node.value) except Exception as e: - LOG.warning(f"Failed to set height: {e}") + LOG.warning("Failed to set height: %s", e) if actual_width is not None and actual_height is not None: - self._actual_width = int(actual_width) - self._actual_height = int(actual_height) + self._actual_width = actual_width + self._actual_height = actual_height if (actual_width, actual_height) != (requested_width, requested_height): LOG.warning( - f"Resolution mismatch: requested {requested_width}x{requested_height}, " - f"got {actual_width}x{actual_height}" - ) - else: - LOG.info(f"Resolution set to {actual_width}x{actual_height}") - - def _available_serials(self) -> list[str]: - assert self._harvester is not None - serials: list[str] = [] - for info in self._harvester.device_info_list: - serial = getattr(info, "serial_number", "") - if serial: - serials.append(serial) - return serials - - def _create_acquirer(self, serial: str | None, index: int): - assert self._harvester is not None - methods = [ - getattr(self._harvester, "create", None), - getattr(self._harvester, "create_image_acquirer", None), - ] - methods = [m for m in methods if m is not None] - errors: list[str] = [] - device_info = None - if not serial: - device_list = self._harvester.device_info_list - if 0 <= index < len(device_list): - device_info = device_list[index] - for create in methods: - try: - if serial: - return create({"serial_number": serial}) - except Exception as exc: - errors.append(f"{create.__name__} serial: {exc}") - for create in methods: - try: - return create(index=index) - except TypeError: - try: - return create(index) - except Exception as exc: - errors.append(f"{create.__name__} index positional: {exc}") - except Exception as exc: - errors.append(f"{create.__name__} index: {exc}") - if device_info is not None: - for create in methods: - try: - return create(device_info) - except Exception as exc: - errors.append(f"{create.__name__} device_info: {exc}") - if not serial and index == 0: - for create in methods: - try: - return create() - except Exception as exc: - errors.append(f"{create.__name__} default: {exc}") - joined = "; ".join(errors) or "no creation methods available" - raise RuntimeError(f"Failed to initialise GenTL image acquirer ({joined})") - - def _configure_pixel_format(self, node_map) -> None: - try: - if self._pixel_format in node_map.PixelFormat.symbolics: - node_map.PixelFormat.value = self._pixel_format - actual = node_map.PixelFormat.value - if actual != self._pixel_format: - LOG.warning(f"Pixel format mismatch: requested '{self._pixel_format}', got '{actual}'") - else: - LOG.info(f"Pixel format set to '{actual}'") - else: - LOG.warning( - f"Pixel format '{self._pixel_format}' not in available formats: {node_map.PixelFormat.symbolics}" + "Resolution mismatch: requested %sx%s, got %sx%s", + requested_width, + requested_height, + actual_width, + actual_height, ) - except Exception as e: - LOG.warning(f"Failed to set pixel format '{self._pixel_format}': {e}") def _configure_exposure(self, node_map) -> None: if self._exposure is None: return - # Try to disable auto exposure first - for attr in ("ExposureAuto",): - try: - node = getattr(node_map, attr) - node.value = "Off" - LOG.info("Auto exposure disabled") - break - except AttributeError: - continue - except Exception as e: - LOG.warning(f"Failed to disable auto exposure: {e}") + try: + node_map.ExposureAuto.value = "Off" + except Exception: + pass - # Set exposure value for attr in ("ExposureTime", "Exposure"): try: node = getattr(node_map, attr) - except AttributeError: - continue - try: node.value = float(self._exposure) - actual = node.value - if abs(actual - self._exposure) > 1.0: # Allow 1μs tolerance - LOG.warning(f"Exposure mismatch: requested {self._exposure}μs, got {actual}μs") - else: - LOG.info(f"Exposure set to {actual}μs") return - except Exception as e: - LOG.warning(f"Failed to set exposure via {attr}: {e}") + except AttributeError: continue - - LOG.warning(f"Could not set exposure to {self._exposure}μs (no compatible attribute found)") + except Exception as e: + LOG.warning("Failed to set exposure via %s: %s", attr, e) + LOG.warning("Could not set exposure to %s µs", self._exposure) def _configure_gain(self, node_map) -> None: if self._gain is None: return - # Try to disable auto gain first - for attr in ("GainAuto",): - try: - node = getattr(node_map, attr) - node.value = "Off" - LOG.info("Auto gain disabled") - break - except AttributeError: - continue - except Exception as e: - LOG.warning(f"Failed to disable auto gain: {e}") - - # Set gain value - for attr in ("Gain",): - try: - node = getattr(node_map, attr) - except AttributeError: - continue - try: - node.value = float(self._gain) - actual = node.value - if abs(actual - self._gain) > 0.1: # Allow 0.1 tolerance - LOG.warning(f"Gain mismatch: requested {self._gain}, got {actual}") - else: - LOG.info(f"Gain set to {actual}") - return - except Exception as e: - LOG.warning(f"Failed to set gain via {attr}: {e}") - continue + try: + node_map.GainAuto.value = "Off" + except Exception: + pass - LOG.warning(f"Could not set gain to {self._gain} (no compatible attribute found)") + try: + node_map.Gain.value = float(self._gain) + except Exception as e: + LOG.warning("Could not set gain to %s: %s", self._gain, e) def _configure_frame_rate(self, node_map) -> None: if not self.settings.fps: return target = float(self.settings.fps) - - # Try to enable frame rate control for attr in ("AcquisitionFrameRateEnable", "AcquisitionFrameRateControlEnable"): try: getattr(node_map, attr).value = True - LOG.info(f"Frame rate control enabled via {attr}") break except Exception: - continue + pass - # Set frame rate value for attr in ("AcquisitionFrameRate", "ResultingFrameRate", "AcquisitionFrameRateAbs"): try: - node = getattr(node_map, attr) + getattr(node_map, attr).value = target + return except AttributeError: continue - try: - node.value = target - actual = node.value - if abs(actual - target) > 0.1: - LOG.warning(f"FPS mismatch: requested {target:.2f}, got {actual:.2f}") - else: - LOG.info(f"Frame rate set to {actual:.2f} FPS") - return except Exception as e: - LOG.warning(f"Failed to set frame rate via {attr}: {e}") - continue + LOG.warning("Failed to set frame rate via %s: %s", attr, e) + LOG.warning("Could not set frame rate to %s FPS", target) + + def _read_telemetry(self, node_map) -> None: + try: + self._actual_width = int(node_map.Width.value) + self._actual_height = int(node_map.Height.value) + except Exception: + pass + + try: + self._actual_fps = float(node_map.ResultingFrameRate.value) + except Exception: + self._actual_fps = None + + try: + self._actual_exposure = float(node_map.ExposureTime.value) + except Exception: + self._actual_exposure = None + + try: + self._actual_gain = float(node_map.Gain.value) + except Exception: + self._actual_gain = None - LOG.warning(f"Could not set frame rate to {target} FPS (no compatible attribute found)") + # ------------------------------------------------------------------ + # Frame conversion / local helpers + # ------------------------------------------------------------------ def _convert_frame(self, frame: np.ndarray) -> np.ndarray: if frame.dtype != np.uint8: @@ -1344,10 +1134,24 @@ def _convert_frame(self, frame: np.ndarray) -> np.ndarray: scale = 255.0 / max_val if max_val > 0.0 else 1.0 frame = np.clip(frame * scale, 0, 255).astype(np.uint8) + fmt = str(self._pixel_format or "").strip() + if frame.ndim == 2: - frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) - elif frame.ndim == 3 and frame.shape[2] == 3 and self._pixel_format == "RGB8": - frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + if fmt == "BayerRG8": + frame = cv2.cvtColor(frame, cv2.COLOR_BayerRG2BGR) + elif fmt == "BayerGB8": + frame = cv2.cvtColor(frame, cv2.COLOR_BayerGB2BGR) + elif fmt == "BayerGR8": + frame = cv2.cvtColor(frame, cv2.COLOR_BayerGR2BGR) + elif fmt == "BayerBG8": + frame = cv2.cvtColor(frame, cv2.COLOR_BayerBG2BGR) + else: + frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) + + elif frame.ndim == 3 and frame.shape[2] == 3: + if fmt == "RGB8": + frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) + # BGR8 is already OpenCV-native. if self._crop is not None: top, bottom, left, right = (int(v) for v in self._crop) @@ -1355,9 +1159,7 @@ def _convert_frame(self, frame: np.ndarray) -> np.ndarray: left = max(0, left) bottom = bottom if bottom > 0 else frame.shape[0] right = right if right > 0 else frame.shape[1] - bottom = min(frame.shape[0], bottom) - right = min(frame.shape[1], right) - frame = frame[top:bottom, left:right] + frame = frame[top : min(frame.shape[0], bottom), left : min(frame.shape[1], right)] if self._rotate in (90, 180, 270): rotations = { @@ -1370,11 +1172,10 @@ def _convert_frame(self, frame: np.ndarray) -> np.ndarray: return frame.copy() def _resolve_device_label(self, node_map) -> str | None: - candidates = [ + for name_attr, serial_attr in ( ("DeviceModelName", "DeviceSerialNumber"), ("DeviceDisplayName", "DeviceSerialNumber"), - ] - for name_attr, serial_attr in candidates: + ): try: model = getattr(node_map, name_attr).value except AttributeError: @@ -1385,18 +1186,48 @@ def _resolve_device_label(self, node_map) -> str | None: except AttributeError: pass if model: - model_str = str(model) - serial_str = str(serial) if serial else None - return f"{model_str} ({serial_str})" if serial_str else model_str + return f"{model} ({serial})" if serial else str(model) + return None + + def _parse_crop(self, crop) -> tuple[int, int, int, int] | None: + if isinstance(crop, (list, tuple)) and len(crop) == 4: + return tuple(int(v) for v in crop) + return None + + def _get_requested_resolution_or_none(self) -> tuple[int, int] | None: + props = self.settings.properties if isinstance(self.settings.properties, dict) else {} + legacy = props.get("resolution") + if isinstance(legacy, (list, tuple)) and len(legacy) == 2: + try: + w, h = int(legacy[0]), int(legacy[1]) + if w > 0 and h > 0: + return (w, h) + except Exception: + pass + + try: + w = int(getattr(self.settings, "width", 0) or 0) + h = int(getattr(self.settings, "height", 0) or 0) + if w > 0 and h > 0: + return (w, h) + except Exception: + pass return None - def _adjust_to_increment(self, value: int, minimum: int, maximum: int, increment: int) -> int: + @staticmethod + def _adjust_to_increment(value: int, minimum: int, maximum: int, increment: int) -> int: value = max(minimum, min(maximum, int(value))) if increment <= 0: return value - offset = value - minimum - steps = offset // increment - return minimum + steps * increment + return minimum + ((value - minimum) // increment) * increment + + @staticmethod + def _positive_float(value) -> float | None: + try: + number = float(value) + return number if number > 0 else None + except Exception: + return None def device_name(self) -> str: if self._device_label: diff --git a/dlclivegui/cameras/backends/utils/gentl_discovery.py b/dlclivegui/cameras/backends/utils/gentl_discovery.py index ebce734..9d15a82 100644 --- a/dlclivegui/cameras/backends/utils/gentl_discovery.py +++ b/dlclivegui/cameras/backends/utils/gentl_discovery.py @@ -5,7 +5,9 @@ from __future__ import annotations import glob +import logging import os +import threading from collections.abc import Iterable, Sequence from dataclasses import dataclass, field from enum import Enum, auto @@ -18,6 +20,125 @@ class GenTLDiscoveryPolicy(Enum): RAISE_IF_MULTIPLE = auto() # if > N candidates, raise an error to avoid ambiguity (forces explicit config) +try: # pragma: no cover - optional dependency + from harvesters.core import Harvester # type: ignore +except Exception: # pragma: no cover - optional dependency + Harvester = None # type: ignore + +logger = logging.getLogger(__name__) + + +class SharedHarvesterEntry: + """ + A shared Harvester instance keyed by a canonical tuple of CTI files. + """ + + def __init__(self, cti_files: list[str]): + if Harvester is None: # pragma: no cover + raise RuntimeError( + "The 'harvesters' package is required for the GenTL backend. Install it via 'pip install harvesters'." + ) + + self.lock = threading.RLock() + self.key = tuple(sorted(_normalize_path(p, casefold_windows=True) for p in cti_files)) + self.refcount = 0 + self.harvester = Harvester() + self.loaded_files: list[str] = [] + self.failed_files: dict[str, str] = {} + + for cti in self.key: + try: + self.harvester.add_file(cti) + self.loaded_files.append(cti) + except Exception as e: + logger.exception(f"Failed to load CTI file: {cti}. Skipping.") + self.failed_files[cti] = str(e) + + if not self.loaded_files: + e = RuntimeError("No GenTL producer (.cti) could be loaded by shared Harvester.") + self._raise_and_reset_harvester(e) + + # Initial device enumeration. + try: + self.harvester.update() + except Exception as e: + self._raise_and_reset_harvester(e) + + def _raise_and_reset_harvester(self, exc: Exception) -> None: + exc.loaded_files = self.loaded_files[:] + exc.failed_files = dict(self.failed_files) + try: + self.harvester.reset() + except Exception: + pass + raise exc + + +class SharedHarvesterPool: + """ + Process-local pool of shared Harvester instances. + + Keyed by the canonicalized CTI file set. + """ + + _lock = threading.RLock() + _entries: dict[tuple[str, ...], SharedHarvesterEntry] = {} + + @classmethod + def acquire(cls, cti_files: list[str]) -> SharedHarvesterEntry: + key = tuple(sorted(_normalize_path(p, casefold_windows=True) for p in cti_files)) + with cls._lock: + entry = cls._entries.get(key) + if entry is None: + entry = SharedHarvesterEntry(list(key)) + cls._entries[key] = entry + entry.refcount += 1 + return entry + + @classmethod + def release(cls, entry: SharedHarvesterEntry | None) -> None: + if entry is None: + return + + with cls._lock: + current = cls._entries.get(entry.key) + if current is None: + # Already released/reset. + return + + current.refcount -= 1 + if current.refcount > 0: + return + + try: + with current.lock: + try: + current.harvester.reset() + except Exception: + pass + finally: + cls._entries.pop(entry.key, None) + + @classmethod + def refresh(cls, entry: SharedHarvesterEntry | None) -> None: + """ + Optional helper when callers want to re-enumerate the device list + on an already-shared Harvester instance. + """ + if entry is None: + return + with entry.lock: + entry.harvester.update() + + @classmethod + def get_refcount(cls, entry: SharedHarvesterEntry | None) -> int: + if entry is None: + return 0 + with cls._lock: + current = cls._entries.get(entry.key) + return int(current.refcount) if current is not None else 0 + + @dataclass class CTIDiscoveryDiagnostics: explicit_files: list[str] = field(default_factory=list) @@ -81,18 +202,18 @@ def _expand_user_and_env(value: str) -> str: return s -def _normalize_path(p: str) -> str: - """ - Normalize a filesystem path in a cross-platform way: - - expands ~ and environment variables - - resolves to absolute where possible (without requiring existence) - """ +def _normalize_path(p: str, *, casefold_windows: bool = False) -> str: expanded = _expand_user_and_env(p) pp = Path(expanded) try: - return str(pp.resolve(strict=False)) + out = str(pp.resolve(strict=False)) except Exception: - return str(pp.absolute()) + out = str(pp.absolute()) + + if casefold_windows: + out = os.path.normcase(out) + + return out def _iter_cti_files_in_dir(directory: str, recursive: bool = False) -> Iterable[str]: diff --git a/dlclivegui/main.py b/dlclivegui/main.py index eb444aa..eace8bd 100644 --- a/dlclivegui/main.py +++ b/dlclivegui/main.py @@ -3,6 +3,7 @@ import argparse import logging +import os import signal import sys @@ -54,6 +55,32 @@ def _sigint_handler(_signum, _frame) -> None: app._sig_timer = sig_timer # Store on app to keep it alive and allow cleanup on exit +def configure_logging(debug: bool = False) -> None: + """Configure local application logging.""" + env_debug = os.environ.get("DLCLIVEGUI_DEBUG_LOGGING", "").strip().lower() in ( + "1", + "true", + "yes", + "on", + "debug", + ) + + enabled = bool(debug or env_debug) + level = logging.DEBUG if enabled else logging.INFO + + logging.basicConfig( + level=level, + format="%(asctime)s.%(msecs)03d %(levelname)-8s [%(threadName)s] %(name)s:%(lineno)d - %(message)s", + datefmt="%H:%M:%S", + force=True, + ) + + logging.getLogger("dlclivegui").setLevel(level) + + if enabled: + logging.debug("Debug logging enabled.") + + def parse_args(argv=None): if argv is None: argv = sys.argv[1:] @@ -77,12 +104,13 @@ def parse_args(argv=None): formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("--no-art", action="store_true", help="Disable ASCII art in help and when launching.") + parser.add_argument("--debug-log", action="store_true", help="Enable debug logging.") return parser.parse_known_args(argv) def main() -> None: args, _unknown = parse_args() - + configure_logging(debug=args.debug_log) logging.info("Starting DeepLabCut-Live GUI...") # If you want a startup banner, PRINT it (not log), and only in TTY contexts. diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 8db469f..7058d59 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -2,6 +2,7 @@ from __future__ import annotations +import copy import logging import time from dataclasses import dataclass @@ -14,6 +15,7 @@ from dlclivegui.cameras import CameraFactory from dlclivegui.cameras.base import CameraBackend +from dlclivegui.cameras.factory import camera_identity_key # from dlclivegui.config import CameraSettings from dlclivegui.config import CameraSettings @@ -42,7 +44,7 @@ class SingleCameraWorker(QObject): def __init__(self, camera_id: str, settings: CameraSettings): super().__init__() self._camera_id = camera_id - self._settings = settings + self._settings = copy.deepcopy(settings) self._stop_event = Event() self._backend: CameraBackend | None = None self._max_consecutive_errors = 5 @@ -53,7 +55,24 @@ def run(self) -> None: self._stop_event.clear() try: + LOGGER.debug( + "[Worker %s] before create: backend=%s index=%s properties=%s", + self._camera_id, + self._settings.backend, + self._settings.index, + self._settings.properties, + ) + self._backend = CameraFactory.create(self._settings) + + LOGGER.debug( + "[Worker %s] after create: backend=%s index=%s properties=%s", + self._camera_id, + self._backend.settings.backend, + self._backend.settings.index, + self._backend.settings.properties, + ) + self._backend.open() except Exception as exc: LOGGER.exception(f"Failed to initialize camera {self._camera_id}", exc_info=exc) @@ -103,8 +122,20 @@ def stop(self) -> None: def get_camera_id(settings: CameraSettings) -> str: - """Generate a unique camera ID from settings.""" - return f"{settings.backend}:{settings.index}" + """Generate a unique camera ID from stable backend identity.""" + backend = (settings.backend or "").lower() + props = settings.properties if isinstance(settings.properties, dict) else {} + ns = props.get(backend, {}) if isinstance(props.get(backend), dict) else {} + + device_id = ns.get("device_id") + if device_id: + return f"{backend}:{device_id}" + + serial = ns.get("serial_number") or ns.get("device_serial_number") or ns.get("serial") + if serial: + return f"{backend}:serial:{serial}" + + return f"{backend}:index:{int(settings.index)}" class MultiCameraController(QObject): @@ -153,6 +184,32 @@ def start(self, camera_settings: list[CameraSettings]) -> None: LOGGER.warning("No active cameras to start") return + # Check for dupes + seen = {} + for s in active_settings: + camera_id = get_camera_id(s) + try: + key = camera_identity_key(s) + except Exception: + LOGGER.exception( + "Failed to compute camera identity key for %s; falling back to camera_id", + camera_id, + ) + key = camera_id + + if key in seen: + self.initialization_failed.emit( + [ + ( + camera_id, + f"Duplicate camera configuration. Conflicts with {seen[key]}", + ) + ] + ) + return + + seen[key] = camera_id + self._running = True self._frames.clear() self._timestamps.clear() @@ -165,13 +222,16 @@ def start(self, camera_settings: list[CameraSettings]) -> None: def _start_camera(self, settings: CameraSettings) -> None: """Start a single camera.""" - cam_id = get_camera_id(settings) + settings_copy = copy.deepcopy(settings) + cam_id = get_camera_id(settings_copy) if cam_id in self._workers: LOGGER.warning(f"Camera {cam_id} already has a worker") return + LOGGER.info(f"[MultiCameraController] Starting {cam_id} with settings: {settings_copy}") + # Normalize and store the dataclass once - self._settings[cam_id] = settings + self._settings[cam_id] = settings_copy dc = self._settings[cam_id] worker = SingleCameraWorker(cam_id, dc) thread = QThread() diff --git a/tests/cameras/backends/conftest.py b/tests/cameras/backends/conftest.py index 2d965a2..9459c35 100644 --- a/tests/cameras/backends/conftest.py +++ b/tests/cameras/backends/conftest.py @@ -4,6 +4,7 @@ import importlib import logging import os +import threading from dataclasses import dataclass from typing import Any @@ -641,6 +642,122 @@ def __exit__(self, exc_type, exc, tb): return False +class FakeSharedHarvesterPoolAcquireError(RuntimeError): + """Raised by the fake shared pool when no CTI can be loaded.""" + + def __init__(self, message: str, *, loaded_files=None, failed_files=None): + super().__init__(message) + self.loaded_files = list(loaded_files or []) + self.failed_files = list(failed_files or []) + + +class FakeSharedEntry: + def __init__(self, harvester, loaded_files, failed_files=None): + self.harvester = harvester + self.loaded_files = list(loaded_files or []) + self.failed_files = list(failed_files or []) + self.lock = threading.RLock() + + +class FakeSharedHarvesterPool: + """ + Test double for cti_finder.SharedHarvesterPool. + + Important behavior: + - Reuses one Harvester per normalized CTI set. + - Calls update() only when creating the shared Harvester. + - Does not call update() when reusing an existing shared Harvester. + - Tracks loaded_files/failed_files so backend diagnostics can be tested. + """ + + _entries: dict[tuple[str, ...], FakeSharedEntry] = {} + _refcounts: dict[tuple[str, ...], int] = {} + _harvester_factory = None + + @classmethod + def configure(cls, harvester_factory): + cls.reset() + cls._harvester_factory = harvester_factory + + @staticmethod + def _key(cti_files) -> tuple[str, ...]: + # Stable across case/path spelling on Windows while preserving loaded_files separately. + return tuple(sorted(os.path.normcase(os.path.abspath(str(p))) for p in cti_files)) + + @classmethod + def acquire(cls, cti_files): + key = cls._key(cti_files) + + if key in cls._entries: + cls._refcounts[key] += 1 + return cls._entries[key] + + if cls._harvester_factory is None: + raise RuntimeError("FakeSharedHarvesterPool is not configured") + + h = cls._harvester_factory() + + loaded: list[str] = [] + failed: list[tuple[str, str]] = [] + + for cti in cti_files: + cti_str = str(cti) + try: + h.add_file(cti_str) + loaded.append(cti_str) + except Exception as exc: + failed.append((cti_str, str(exc))) + + if not loaded: + try: + h.reset() + except Exception: + pass + raise FakeSharedHarvesterPoolAcquireError( + "No fake CTIs could be loaded", + loaded_files=[], + failed_files=failed, + ) + + h.update() + + entry = FakeSharedEntry(h, loaded_files=loaded, failed_files=failed) + cls._entries[key] = entry + cls._refcounts[key] = 1 + return entry + + @classmethod + def release(cls, entry): + for key, value in list(cls._entries.items()): + if value is entry: + cls._refcounts[key] -= 1 + if cls._refcounts[key] <= 0: + try: + entry.harvester.reset() + except Exception: + pass + del cls._entries[key] + del cls._refcounts[key] + return + + @classmethod + def get_refcount(cls, entry): + for key, value in cls._entries.items(): + if value is entry: + return cls._refcounts[key] + return 0 + + @classmethod + def reset(cls): + for entry in list(cls._entries.values()): + try: + entry.harvester.reset() + except Exception: + pass + cls._entries.clear() + cls._refcounts.clear() + + @dataclass class FakeImageAcquirer: """ @@ -691,6 +808,7 @@ def _enqueue_default_frame(self): def start(self): self.start_calls += 1 self._started = True + self._queue.clear() def stop(self): self.stop_calls += 1 @@ -707,10 +825,28 @@ def fetch(self, timeout: float = 2.0): if not self._started: raise FakeGenTLTimeoutException("fetch called while not started") - if not self._queue: - raise FakeGenTLTimeoutException(f"timeout after {timeout}s") + if self._queue: + payload = self._queue.pop(0) + else: + # Generate from the current node map, because backend may have changed + # PixelFormat/Width/Height during open(). + pf = str(self.node_map.PixelFormat.value or "Mono8") + if pf in ("RGB8", "BGR8"): + channels, dtype = 3, np.uint8 + elif pf in ("Mono16", "Mono12", "Mono10"): + channels, dtype = 1, np.uint16 + else: + # Mono8 and Bayer*8 are single-channel uint8 + channels, dtype = 1, np.uint8 + + comp = _FakeComponent( + int(self.node_map.Width.value), + int(self.node_map.Height.value), + channels, + dtype=dtype, + ) + payload = _FakePayload(comp) - payload = self._queue.pop(0) return _FakeFetchedBufferCtx(payload) @@ -854,18 +990,37 @@ def gentl_fail_add_file_for(): def patch_gentl_sdk(monkeypatch, fake_harvester_factory, gentl_fail_add_file_for, tmp_path): """ Patch dlclivegui.cameras.backends.gentl_backend to use FakeHarvester + Fake timeout. - Ensure CTI discovery succeeds for classmethods by creating a real dummy .cti and - exposing it via GENICAM_GENTL64_PATH. + + Important: + The production backend now uses cti_finder.SharedHarvesterPool.acquire() + during open(), so tests must patch that pool too. """ import dlclivegui.cameras.backends.gentl_backend as gb - # Patch Harvester symbol (the backend calls Harvester() directly) + # Reset and expose test counters/state. + gb.update_count = 0 + gb.fail_add_file_for = gentl_fail_add_file_for + + # Patch Harvester symbol for discovery/rebind paths. monkeypatch.setattr(gb, "Harvester", lambda: fake_harvester_factory(), raising=False) - # Keep timeout contract + # Count all fake update() calls. + original_update = FakeHarvester.update + + def update_with_count(self): + gb.update_count += 1 + return original_update(self) + + monkeypatch.setattr(FakeHarvester, "update", update_with_count, raising=True) + + # Keep timeout contract. monkeypatch.setattr(gb, "HarvesterTimeoutError", FakeGenTLTimeoutException, raising=False) - # Create a real CTI file and advertise it via env var + # Patch the shared pool used by open(). + FakeSharedHarvesterPool.configure(fake_harvester_factory) + monkeypatch.setattr(gb.cti_finder, "SharedHarvesterPool", FakeSharedHarvesterPool, raising=False) + + # Create a real CTI file and advertise it via env var. cti_file = tmp_path / "dummy.cti" if not cti_file.exists(): cti_file.write_text("fake", encoding="utf-8") @@ -873,10 +1028,11 @@ def patch_gentl_sdk(monkeypatch, fake_harvester_factory, gentl_fail_add_file_for monkeypatch.setenv("GENICAM_GENTL64_PATH", str(tmp_path)) monkeypatch.delenv("GENICAM_GENTL32_PATH", raising=False) - # OPTIONAL: expose failure control so tests can do gb.fail_add_file_for.add(...) - gb.fail_add_file_for = gentl_fail_add_file_for - - return gb + try: + yield gb + finally: + FakeSharedHarvesterPool.reset() + gb.fail_add_file_for = set() @pytest.fixture() diff --git a/tests/cameras/backends/test_gentl_backend.py b/tests/cameras/backends/test_gentl_backend.py index ecef1b7..3ffdab2 100644 --- a/tests/cameras/backends/test_gentl_backend.py +++ b/tests/cameras/backends/test_gentl_backend.py @@ -318,11 +318,12 @@ def test_quick_ping_true_for_existing_false_for_missing(patch_gentl_sdk, gentl_i assert gb.GenTLCameraBackend.quick_ping(1) is False -def test_rebind_settings_updates_index_using_device_id_with_attribute_entries( +def test_rebind_settings_serial_device_id_persists_serial_without_enumeration( patch_gentl_sdk, gentl_settings_factory, gentl_inventory ): """ - rebind_settings has some getattr(...) usage; feed attribute-like entries to match that path. + Serial GenTL IDs are handled directly by open(), so rebind_settings() + intentionally avoids Harvester enumeration and leaves index unchanged. """ gb = patch_gentl_sdk @@ -338,9 +339,12 @@ def test_rebind_settings_updates_index_using_device_id_with_attribute_entries( settings = gentl_settings_factory(index=0, properties={"gentl": {"device_id": "serial:SER1"}}) out = gb.GenTLCameraBackend.rebind_settings(settings) - assert int(out.index) == 1 + # New behavior: no enumeration/rebind for serial IDs. + assert int(out.index) == 0 + ns = out.properties.get("gentl", {}) assert ns.get("device_id") == "serial:SER1" + assert ns.get("serial_number") == "SER1" def test_rebind_settings_no_device_id_no_change(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): @@ -470,102 +474,6 @@ def create(self, *args, **kwargs): assert acq == "ACQ_POS_INDEX" -def test__create_acquirer_falls_back_to_create_image_acquirer_when_create_fails( - patch_gentl_sdk, gentl_settings_factory -): - gb = patch_gentl_sdk - - settings = gentl_settings_factory() - be = gb.GenTLCameraBackend(settings) - - class H: - device_info_list = [{"serial_number": "SER0"}] - - def create(self, *args, **kwargs): - raise RuntimeError("create fails") - - def create_image_acquirer(self, selector=None, index=None): - # Succeeds here - if isinstance(selector, dict) and selector.get("serial_number") == "SERX": - return "ACQ_CIA_SERIAL" - if index == 1: - return "ACQ_CIA_INDEX" - return "ACQ_CIA_OTHER" - - be._harvester = H() - acq = be._create_acquirer("SERX", 1) - assert acq == "ACQ_CIA_SERIAL" - - -def test__create_acquirer_uses_device_info_fallback_when_available(patch_gentl_sdk, gentl_settings_factory): - gb = patch_gentl_sdk - - settings = gentl_settings_factory() - be = gb.GenTLCameraBackend(settings) - - device_info_obj = {"serial_number": "SER0", "id_": "ID0"} - - class H: - device_info_list = [device_info_obj] - - def create(self, *args, **kwargs): - # Fail index, succeed if given device_info object - if "index" in kwargs or (len(args) == 1 and isinstance(args[0], int)): - raise RuntimeError("index path fails") - if len(args) == 1 and args[0] is device_info_obj: - return "ACQ_DEVICE_INFO" - raise RuntimeError("unexpected call") - - be._harvester = H() - acq = be._create_acquirer(None, 0) - assert acq == "ACQ_DEVICE_INFO" - - -def test__create_acquirer_tries_default_create_when_index0_and_no_serial(patch_gentl_sdk, gentl_settings_factory): - gb = patch_gentl_sdk - - settings = gentl_settings_factory() - be = gb.GenTLCameraBackend(settings) - - class H: - device_info_list = [{"serial_number": "SER0"}] - - def create(self, *args, **kwargs): - # Fail index attempts; succeed only on no-arg create() - if args or kwargs: - raise RuntimeError("only no-arg create works") - return "ACQ_DEFAULT" - - be._harvester = H() - acq = be._create_acquirer(None, 0) - assert acq == "ACQ_DEFAULT" - - -def test__create_acquirer_raises_runtimeerror_with_joined_errors(patch_gentl_sdk, gentl_settings_factory): - gb = patch_gentl_sdk - - settings = gentl_settings_factory() - be = gb.GenTLCameraBackend(settings) - - class H: - device_info_list = [{"serial_number": "SER0"}] - - def create(self, *args, **kwargs): - raise RuntimeError("create boom") - - def create_image_acquirer(self, *args, **kwargs): - raise RuntimeError("cia boom") - - be._harvester = H() - - with pytest.raises(RuntimeError) as ei: - be._create_acquirer("SERX", 0) - - # Error message should include some context about attempted creation methods - msg = str(ei.value).lower() - assert "failed to initialise gentl image acquirer" in msg - - # ---------------------------------- # CTI discovery and selection logic # ---------------------------------- @@ -767,3 +675,128 @@ def test_open_persists_cti_load_diagnostics_complete_failure(patch_gentl_sdk, ge assert sorted(d["cti"] for d in failed) == sorted([str(b1), str(b2)]) for d in failed: assert isinstance(d.get("error"), str) and d["error"] + + +def test_two_gentl_backends_share_same_harvester(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + {"display_name": "Dev0", "serial_number": "SER0"}, + {"display_name": "Dev1", "serial_number": "SER1"}, + ] + + s0 = gentl_settings_factory(index=0, properties={"gentl": {"device_id": "serial:SER0"}}) + s1 = gentl_settings_factory(index=1, properties={"gentl": {"device_id": "serial:SER1"}}) + + b0 = gb.GenTLCameraBackend(s0) + b1 = gb.GenTLCameraBackend(s1) + + b0.open() + b1.open() + + assert b0._harvester is b1._harvester + assert b0._shared_entry is b1._shared_entry + + b1.close() + b0.close() + + +def test_second_open_reuses_shared_harvester_without_update(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [ + {"display_name": "Dev0", "serial_number": "SER0"}, + {"display_name": "Dev1", "serial_number": "SER1"}, + ] + + s0 = gentl_settings_factory(index=0, properties={"gentl": {"device_id": "serial:SER0"}}) + s1 = gentl_settings_factory(index=1, properties={"gentl": {"device_id": "serial:SER1"}}) + + b0 = gb.GenTLCameraBackend(s0) + b1 = gb.GenTLCameraBackend(s1) + + b0.open() + update_count_after_first = gb.update_count + + b1.open() + + assert gb.update_count == update_count_after_first + + b1.close() + b0.close() + + +def test_open_failure_releases_shared_harvester(patch_gentl_sdk, gentl_settings_factory, gentl_inventory): + gb = patch_gentl_sdk + + gentl_inventory[:] = [{"display_name": "Dev0", "serial_number": "SER0"}] + + settings = gentl_settings_factory(properties={"gentl": {"device_id": "serial:DOES_NOT_EXIST"}}) + be = gb.GenTLCameraBackend(settings) + + with pytest.raises(RuntimeError): + be.open() + + assert be._harvester is None + assert be._shared_entry is None + assert be._acquirer is None + + +def test__create_acquirer_serial_create_runtimeerror_propagates(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + raise RuntimeError("create fails") + + def create_image_acquirer(self, *args, **kwargs): + return "SHOULD_NOT_BE_USED" + + be._harvester = H() + + with pytest.raises(RuntimeError, match="create fails"): + be._create_acquirer("SERX", 1) + + +def test__create_acquirer_index_runtimeerror_propagates(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + if len(args) == 1 and args[0] == 0: + raise RuntimeError("index path fails") + return "UNEXPECTED" + + be._harvester = H() + + with pytest.raises(RuntimeError, match="index path fails"): + be._create_acquirer(None, 0) + + +def test__create_acquirer_positional_typeerror_falls_back_to_index_keyword(patch_gentl_sdk, gentl_settings_factory): + gb = patch_gentl_sdk + + be = gb.GenTLCameraBackend(gentl_settings_factory()) + + class H: + device_info_list = [{"serial_number": "SER0"}] + + def create(self, *args, **kwargs): + if args: + raise TypeError("positional index not supported") + if kwargs.get("index") == 2: + return "ACQ_KW_INDEX" + raise RuntimeError("unexpected call") + + be._harvester = H() + + acq = be._create_acquirer(None, 2) + assert acq == "ACQ_KW_INDEX"