Source code for pybragerone.cli

"""Diagnostic CLI for BragerOne (REST + WS)."""

from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import logging
import os
import re
import signal
import threading
import time
from collections import deque
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any, cast

import httpx

from .api import BragerOneApiClient, Platform, server_for
from .api.client import ApiError
from .gateway import BragerOneGateway
from .models import CLITokenStore, ParamResolver, ParamStore
from .utils import bg_tasks, spawn

log = logging.getLogger(__name__)
CHAN_RE = re.compile(r"^([a-z])(\d+)$")


@dataclass(slots=True)
class _WatchItem:
    """Slotted record describing one watched symbol and its latest value.

    NOTE(review): the per-field notes below are inferred from field names and
    types only; the code that populates this record is not visible in this
    section — confirm before relying on them.
    """

    symbol: str  # symbol identifier of the entry
    label: str  # presumably the human-readable label — confirm
    unit: str | dict[str, str] | None  # a plain string, a str->str mapping, or absent
    address: str | None  # presumably a flat store key such as "P4.v1" — confirm
    value: Any  # current value; the type is unconstrained
    value_label: str | None  # presumably a display label for the value — confirm
    kind: str  # category tag; allowed values are not visible in this section


def _format_event_key_from_parts(pool_raw: Any, chan: Any, idx: Any) -> str:
    pool = str(pool_raw or "")
    if pool.isdigit() or (pool and not pool.startswith("P")):
        pool = f"P{pool}"
    return f"{pool}.{chan}{idx}"


def _format_event_key(upd: Any) -> str:
    """Build the flat key for an update object from its ``pool``/``chan``/``idx`` attributes.

    Attributes that are missing on ``upd`` are passed on as ``None``.
    """
    pool, chan, idx = (getattr(upd, field, None) for field in ("pool", "chan", "idx"))
    return _format_event_key_from_parts(pool, chan, idx)


def _parse_cli_value(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return ""

    lowered = text.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    if lowered in {"null", "none"}:
        return None

    try:
        return int(text)
    except ValueError:
        pass

    try:
        return float(text)
    except ValueError:
        pass

    if (text.startswith("{") and text.endswith("}")) or (text.startswith("[") and text.endswith("]")):
        with contextlib.suppress(Exception):
            return json.loads(text)

    return text


def _invert_value(current: Any) -> Any:
    if isinstance(current, bool):
        return not current
    if isinstance(current, (int, float)):
        return 0 if int(current) != 0 else 1
    if isinstance(current, str):
        val = current.strip().lower()
        if val in {"on", "true", "1"}:
            return "OFF"
        if val in {"off", "false", "0"}:
            return "ON"
    return 1


def _normalize_expected(value: Any) -> Any:
    if isinstance(value, str) and value.strip() == "void 0":
        return None
    return value


def _read_target_actual(target: Mapping[str, Any], flat_values: Mapping[str, Any]) -> Any:
    address = target.get("address")
    if not isinstance(address, str) or not address:
        return None
    raw_value = flat_values.get(address)

    bit = target.get("bit")
    if isinstance(bit, int) and isinstance(raw_value, int):
        return 1 if ((raw_value >> bit) & 1) else 0

    mask = target.get("mask")
    if isinstance(mask, int) and isinstance(raw_value, int):
        return raw_value & mask

    return raw_value


def _compare_condition(*, operation: str, actual: Any, expected: Any) -> bool:
    """Evaluate one condition operator against an actual and an expected value.

    Args:
        operation: One of ``equalTo``, ``notEqualTo``, ``greaterThan``,
            ``greaterThanOrEqualTo``, ``lessThan``, ``lessThanOrEqualTo``.
        actual: The value read for the target.
        expected: The value from the rule; normalised with ``_normalize_expected`` first.

    Returns:
        The comparison result. Ordering comparisons are ``False`` when either
        side is ``None`` or when the two values cannot be ordered against each
        other. Unknown operations are ``False``.
    """
    expected_norm = _normalize_expected(expected)

    if operation == "equalTo":
        return bool(actual == expected_norm)
    if operation == "notEqualTo":
        return bool(actual != expected_norm)

    # Everything below is an ordering comparison (or an unknown operation).
    if actual is None or expected_norm is None:
        return False

    try:
        if operation == "greaterThan":
            return bool(actual > expected_norm)
        if operation == "greaterThanOrEqualTo":
            return bool(actual >= expected_norm)
        if operation == "lessThan":
            return bool(actual < expected_norm)
        if operation == "lessThanOrEqualTo":
            return bool(actual <= expected_norm)
    except TypeError:
        # Unorderable operands (e.g. str vs int) mean the condition does not hold;
        # they must not abort rule evaluation.
        return False
    return False


def _command_rule_matches(rule: Mapping[str, Any], flat_values: Mapping[str, Any]) -> bool:
    conditions = rule.get("conditions")
    if not isinstance(conditions, list) or not conditions:
        return True

    for cond in conditions:
        if not isinstance(cond, Mapping):
            return False

        operation = cond.get("operation")
        expected = cond.get("expected")
        targets = cond.get("targets")
        if not isinstance(operation, str) or not isinstance(targets, list) or not targets:
            return False

        validated_targets: list[Mapping[str, Any]] = []
        for target in targets:
            if not isinstance(target, Mapping):
                continue
            if all(isinstance(key, str) for key in target):
                validated_targets.append(cast(Mapping[str, Any], target))

        if not validated_targets:
            return False

        if not all(
            _compare_condition(
                operation=operation,
                actual=_read_target_actual(target, flat_values),
                expected=expected,
            )
            for target in validated_targets
        ):
            return False

    return True


def _select_command_rule(desc: Mapping[str, Any], flat_values: Mapping[str, Any]) -> Mapping[str, Any] | None:
    mapping = desc.get("mapping")
    if not isinstance(mapping, Mapping):
        return None
    command_rules = mapping.get("command_rules")
    if not isinstance(command_rules, list):
        return None

    for rule in command_rules:
        if isinstance(rule, Mapping) and _command_rule_matches(rule, flat_values):
            normalized_rule: dict[str, Any] = {}
            for key, value in rule.items():
                normalized_rule[str(key)] = value
            return normalized_rule
    return None


def _toggle_value_for_symbol(*, desc: Mapping[str, Any], store: ParamStore) -> Any:
    """Work out the value that flips a symbol from its current state.

    Symbols with a direct address (string ``pool`` and ``chan``, integer
    ``idx``) and symbols without command rules get the inverse of their
    ``computed_value``, or of ``value`` when that is ``None``. Symbols that
    have only command rules get the value of the first other rule that shares
    the active rule's command but carries a different value. ``None`` is
    returned when no such rule exists.
    """
    has_direct_address = (
        isinstance(desc.get("pool"), str)
        and isinstance(desc.get("chan"), str)
        and isinstance(desc.get("idx"), int)
    )

    mapping = desc.get("mapping")
    raw_rules = mapping.get("command_rules") if isinstance(mapping, Mapping) else None
    command_rules: list[Any] | None = raw_rules if isinstance(raw_rules, list) and raw_rules else None

    current_value = desc.get("computed_value")
    if current_value is None:
        current_value = desc.get("value")

    if has_direct_address or not command_rules:
        return _invert_value(current_value)

    active_rule = _select_command_rule(desc, store.flatten())
    if not isinstance(active_rule, Mapping):
        return None

    active_command = active_rule.get("command")
    if not isinstance(active_command, str):
        return None
    active_value = active_rule.get("value")

    for candidate in command_rules:
        # NOTE(review): _select_command_rule returns a copy, so this identity check
        # looks like it can never be true; the value comparison below is what
        # actually skips the active rule.
        if not isinstance(candidate, Mapping) or candidate is active_rule:
            continue
        if candidate.get("command") != active_command:
            continue
        candidate_value = candidate.get("value")
        if candidate_value != active_value:
            return candidate_value
    return None


async def _store_ingest_loop(gw: BragerOneGateway, store: ParamStore) -> None:
    """Copy every gateway bus update that carries a value into the store.

    Runs for as long as ``gw.bus.subscribe()`` keeps yielding. Updates whose
    ``value`` is missing or ``None`` are ignored.
    """
    async for upd in gw.bus.subscribe():
        if getattr(upd, "value", None) is not None:
            key = _format_event_key(upd)
            await store.upsert_async(key, upd.value)


def _mapping_parameter_name(desc: Mapping[str, Any]) -> str | None:
    mapping = desc.get("mapping")
    if not isinstance(mapping, Mapping):
        return None
    raw = mapping.get("raw")
    if not isinstance(raw, Mapping):
        return None
    name = raw.get("name")
    if isinstance(name, str) and name.strip():
        return name.strip()
    return None


def _as_float(value: Any) -> float | None:
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        text = value.strip().replace(",", ".")
        if not text:
            return None
        with contextlib.suppress(ValueError):
            return float(text)
    return None


async def _symbol_numeric_transform_expr(resolver: ParamResolver, desc: Mapping[str, Any]) -> Any:
    unit_code = desc.get("unit_code")
    if unit_code is not None:
        unit_meta = await resolver._resolve_unit_meta(raw_unit_code=unit_code)
        if isinstance(unit_meta, Mapping) and "value" in unit_meta:
            return unit_meta.get("value")

    mapping = desc.get("mapping")
    if isinstance(mapping, Mapping):
        unit_source = mapping.get("units_source")
        if isinstance(unit_source, Mapping) and "value" in unit_source:
            return unit_source.get("value")

    return None


async def _prepare_symbol_write_value(
    *,
    resolver: ParamResolver,
    desc: Mapping[str, Any],
    requested_value: Any,
) -> tuple[bool, Any, str | None]:
    """Turn a user-supplied value into the raw value to send, and range-check it.

    A numeric request is mapped back through the symbol's numeric transform
    (``requested / factor - shift``) when one exists with a non-zero factor.
    Results within ``1e-9`` of an integer are rounded to that integer. The raw
    number is then checked against ``desc["min"]`` / ``desc["max"]``.

    Returns:
        ``(ok, prepared_value, error)``. ``error`` is ``None`` when ``ok`` is
        true, otherwise a message describing the violated range.
    """
    lower_raw = desc.get("min")
    upper_raw = desc.get("max")
    lower = _as_float(lower_raw)
    upper = _as_float(upper_raw)

    transform_expr = await _symbol_numeric_transform_expr(resolver, desc)
    transform = ParamResolver._parse_numeric_transform(transform_expr)

    prepared: Any = requested_value
    requested_num = _as_float(requested_value)
    if requested_num is not None and transform is not None and transform.factor != 0.0:
        raw_num = (requested_num / transform.factor) - transform.shift
        nearest = round(raw_num)
        # Snap away floating-point noise so integral raw values are sent as ints.
        prepared = nearest if abs(raw_num - nearest) < 1e-9 else raw_num

    prepared_num = _as_float(prepared)
    if prepared_num is None:
        # Non-numeric values are not range-checked.
        return True, prepared, None

    too_low = lower is not None and prepared_num < lower
    too_high = upper is not None and prepared_num > upper
    if not (too_low or too_high):
        return True, prepared, None

    display_min = None if lower is None else ParamResolver._apply_numeric_transform(lower, transform_expr)
    display_max = None if upper is None else ParamResolver._apply_numeric_transform(upper, transform_expr)
    message = (
        "value out of range: "
        f"raw[{lower_raw}..{upper_raw}] display[{display_min}..{display_max}] requested={requested_value}"
    )
    return False, prepared, message


async def _execute_symbol_write(
    *,
    api: BragerOneApiClient,
    resolver: ParamResolver,
    store: ParamStore,
    devid: str,
    symbol: str,
    value: Any,
) -> tuple[bool, str]:
    """Write one symbol through the API and report the outcome.

    The value is validated and converted first. Symbols with a direct address
    are written with ``api.module_command``. Otherwise the first matching
    command rule is sent with ``api.module_command_raw``, falling back to the
    rule's own value when the prepared value is ``None``.

    Returns:
        ``(ok, message)``. ``ok`` is true for HTTP status 200, 201, 202 or 204.
        ``message`` is a human-readable summary or the reason for failure.
    """
    success_statuses = (200, 201, 202, 204)

    desc = await resolver.describe_symbol(symbol)
    value_ok, prepared_value, err = await _prepare_symbol_write_value(
        resolver=resolver,
        desc=desc,
        requested_value=value,
    )
    if not value_ok:
        return False, f"{symbol}: {err}"

    pool, chan, idx = desc.get("pool"), desc.get("chan"), desc.get("idx")
    if isinstance(pool, str) and isinstance(chan, str) and isinstance(idx, int):
        parameter = f"{chan}{idx}"
        response = await api.module_command(
            devid=devid,
            pool=pool,
            parameter=parameter,
            value=prepared_value,
            parameter_name=_mapping_parameter_name(desc),
            return_data=True,
        )
        status, data = cast(tuple[int, Any], response)
        summary = f"{symbol} -> command {pool}.{parameter}={prepared_value} (input={value}) status={status} data={data}"
        return status in success_statuses, summary

    rule = _select_command_rule(desc, store.flatten())
    if isinstance(rule, Mapping):
        command = rule.get("command")
        if isinstance(command, str) and command.strip():
            send_value = rule.get("value") if prepared_value is None else prepared_value
            response = await api.module_command_raw(
                devid=devid,
                command=command.strip(),
                value=send_value,
                return_data=True,
            )
            status, data = cast(tuple[int, Any], response)
            summary = f"{symbol} -> raw {command} value={send_value} (input={value}) status={status} data={data}"
            return status in success_statuses, summary

    return False, f"{symbol}: unable to determine command route (no address and no command rule match)"


async def _run_send_only_actions(
    *,
    api: BragerOneApiClient,
    gw: BragerOneGateway,
    store: ParamStore,
    resolver: ParamResolver,
    module_ids: list[str],
    set_values: list[str],
    toggles: list[str],
) -> int:
    """Perform the requested set/toggle writes once, then return an exit code.

    The store ingest loop runs in the background while the gateway is started
    and primed. All writes go to the first entry of ``module_ids``.

    Returns:
        ``0`` when everything succeeded. The code is set to ``2`` by a
        malformed ``SYMBOL=VALUE`` entry and to ``1`` by a failed write; the
        most recent problem determines the final value.
    """
    ingest_task = asyncio.create_task(_store_ingest_loop(gw, store), name="store-ingest")
    try:
        await gw.start()
        await gw.wait_for_prime(timeout=30.0)
        await asyncio.sleep(0.2)

        devid = module_ids[0]
        exit_code = 0

        async def write_and_report(symbol_name: str, value: Any) -> bool:
            """Write one symbol, print the result line, and return success."""
            ok, message = await _execute_symbol_write(
                api=api,
                resolver=resolver,
                store=store,
                devid=devid,
                symbol=symbol_name,
                value=value,
            )
            print(("✔ " if ok else "✖ ") + message)
            return ok

        for raw_entry in set_values:
            symbol_part, separator, raw_value = raw_entry.partition("=")
            if not separator:
                print(f"✖ Invalid --set value '{raw_entry}'. Expected SYMBOL=VALUE")
                exit_code = 2
                continue
            symbol_norm = symbol_part.strip()
            if not symbol_norm:
                print(f"✖ Invalid --set value '{raw_entry}'. Missing symbol")
                exit_code = 2
                continue

            if not await write_and_report(symbol_norm, _parse_cli_value(raw_value)):
                exit_code = 1

        for symbol in toggles:
            symbol_norm = symbol.strip()
            if not symbol_norm:
                continue

            desc = await resolver.describe_symbol(symbol_norm)
            toggle_value = _toggle_value_for_symbol(desc=desc, store=store)
            if not await write_and_report(symbol_norm, toggle_value):
                exit_code = 1

        return exit_code
    finally:
        # Always stop the background ingest loop, even when a write raised.
        ingest_task.cancel()
        await asyncio.gather(ingest_task, return_exceptions=True)


def _maybe_load_dotenv() -> None:
    """Load environment variables from a `.env` file, if supported.

    Notes:
        The CLI reads defaults from `os.environ` (e.g. `PYBO_EMAIL`). Typical shells and `uv run` do not
        automatically load `.env`, so we opportunistically load it when `python-dotenv` is installed.

        Existing environment variables are not overridden.
    """
    try:
        from dotenv import find_dotenv, load_dotenv
    except Exception:
        return

    dotenv_path = find_dotenv(usecwd=True)
    if dotenv_path:
        load_dotenv(dotenv_path, override=False)


def _setup_logging(debug: bool, quiet: bool) -> None:
    """Setup logging based on debug/quiet flags."""
    level = logging.DEBUG if debug else (logging.WARNING if quiet else logging.INFO)
    logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s: %(message)s")


async def _prompt_select_object(api: BragerOneApiClient) -> int | None:
    """Prompt the user to pick one object (installation) from the account.

    Args:
        api: Client used to fetch the object list.

    Returns:
        The ``id`` of the chosen object, or ``None`` when no objects could be
        fetched or standard input is exhausted before a valid choice is made.
    """
    items = await api.get_objects()
    if not items:
        log.warning("Failed to fetch the list of objects (/v1/objects). Provide --object-id.")
        return None

    print("\nSelect an object (installation):")
    for i, o in enumerate(items, 1):
        name = o.name or f"object-{o.id}"
        print(f"[{i}] {name}  (id={o.id})")
    while True:
        try:
            sel = input("Item number: ").strip()
        except EOFError:
            # Non-interactive stdin: give up instead of crashing with a traceback.
            log.warning("No interactive input available. Provide --object-id.")
            return None
        # isdecimal(), not isdigit(): the latter accepts characters such as "²"
        # that int() rejects with ValueError.
        if not sel.isdecimal():
            print("Enter a number from the list.")
            continue
        idx = int(sel)
        if 1 <= idx <= len(items):
            return items[idx - 1].id
        print("Out of range, try again.")


async def _prompt_select_modules(api: BragerOneApiClient, object_id: int) -> list[str]:
    """Prompt the user to pick one or more modules of an object.

    Args:
        api: Client used to fetch the module list.
        object_id: Object whose modules are offered.

    Returns:
        Sorted, de-duplicated module codes (``devid``, falling back to ``id``).
        The list is empty when the object has no modules or standard input is
        exhausted before a valid selection is made.
    """
    rows = await api.get_modules(object_id=object_id)
    if not rows:
        print("No modules for the selected object.")
        return []

    def module_code(module: Any) -> str | None:
        """Return the selectable code of a module, or ``None`` when it has none."""
        raw_code = module.devid or module.id
        return None if raw_code is None else str(raw_code)

    print("Available modules:")
    for i, m in enumerate(rows, start=1):
        name = str(m.name or "-")
        code = m.devid or str(m.id)
        ver = m.moduleVersion or m.gateway.version or "-"
        print(f"[{i}] {name:24} code={code}  ver={ver}")

    print("Enter numbers separated by commas (e.g. 1,3) or * for all.")
    while True:
        try:
            sel = input("Selection: ").strip()
        except EOFError:
            # Non-interactive stdin: report "nothing selected" instead of crashing.
            print("No interactive input available; no modules selected.")
            return []

        if sel == "*":
            every_code = {code for code in map(module_code, rows) if code is not None}
            return sorted(every_code)

        try:
            idxs = [int(x) for x in sel.replace(" ", "").split(",") if x]
        except ValueError:
            print("Enter numbers from the list.")
            continue

        choices: set[str] = set()
        for idx in idxs:
            if 1 <= idx <= len(rows):
                # Same code rule as the "*" branch, so modules without any code are skipped.
                picked = module_code(rows[idx - 1])
                if picked is not None:
                    choices.add(picked)

        if choices:
            return sorted(choices)

        print("Invalid selection — please try again.")


async def run(args: argparse.Namespace) -> int:
    """Run the CLI with given args.

    Authenticates, resolves the object and module selection (prompting when
    they are not supplied on ``args``), then either performs the requested
    set/toggle writes and returns, or spawns the TUI, starts the gateway and
    blocks until SIGINT/SIGTERM.

    Args:
        args: Parsed command-line namespace.

    Returns:
        ``0`` after a signal-triggered shutdown, ``2`` when no object or no
        module is selected, otherwise the exit code of the set/toggle run.
    """
    _setup_logging(args.debug, args.quiet)

    store = CLITokenStore(args.email)

    # Temporary REST client for object/modules selection
    server = server_for(args.platform)
    api = BragerOneApiClient(server=server)
    api.set_token_store(store)

    gw: BragerOneGateway | None = None

    try:
        await api.ensure_auth(args.email, args.password)

        object_id = args.object_id or (await _prompt_select_object(api))
        if not object_id:
            print("Missing object_id — exiting.")
            return 2

        modules: list[str]
        if args.modules:
            # you can provide: --module FOO --module BAR or PYBO_MODULES="FOO,BAR"
            if isinstance(args.modules, str):
                # Strip each entry so "FOO, BAR" does not yield the id " BAR".
                modules = [m.strip() for m in args.modules.split(",") if m.strip()]
            else:
                modules = [str(m) for m in args.modules if m]
        else:
            modules = await _prompt_select_modules(api, object_id)

        if not modules:
            print("No modules selected — exiting.")
            return 2

        # Runtime ParamStore is storage-only; opt into heavy resolution via ParamResolver.
        param_store = ParamStore()
        resolver = ParamResolver.from_api(api=api, store=param_store, lang=str(args.lang).strip().lower())

        # Keep a single authenticated ApiClient instance and inject it into the Gateway.
        gw = BragerOneGateway(api=api, object_id=object_id, modules=modules)

        set_values = list(args.set_values or [])
        toggles = list(args.toggles or [])
        if set_values or toggles:
            return await _run_send_only_actions(
                api=api,
                gw=gw,
                store=param_store,
                resolver=resolver,
                module_ids=modules,
                set_values=set_values,
                toggles=toggles,
            )

        # Start TUI subscriber BEFORE gateway prime so we don't miss the initial snapshot.
        spawn(
            _run_tui(
                api=api,
                gw=gw,
                store=param_store,
                resolver=resolver,
                object_id=object_id,
                module_ids=modules,
                json_events=bool(args.json),
                suppress_log=bool(args.no_diff),
                all_panels=bool(args.all_panels),
                debug_panels=bool(args.debug_panels),
                debug_logging=bool(args.debug),
                token_labels=bool(args.token_labels),
            ),
            "tui",
            log,
        )

        # Start gateway (REST prime + WS live)
        await gw.start()

        stop = asyncio.Event()
        for sig in (signal.SIGINT, signal.SIGTERM):
            # Loops without signal-handler support raise NotImplementedError here.
            with contextlib.suppress(NotImplementedError):
                asyncio.get_running_loop().add_signal_handler(sig, stop.set)

        await stop.wait()
        return 0
    finally:
        if gw is not None:
            with contextlib.suppress(Exception):
                await gw.stop()
        # Best-effort token revoke (keep behavior, but never crash on exit)
        with contextlib.suppress(Exception):
            await api.revoke()
        # Clean up background tasks
        for t in list(bg_tasks):
            t.cancel()
        with contextlib.suppress(Exception):
            await asyncio.gather(*bg_tasks, return_exceptions=True)
        await api.close()


def _route_title(route: Any) -> str:
    """Return a display title for *route*.

    Preference order: non-blank ``route.meta.display_name``, non-blank
    ``route.name``, then ``str(route.path)``; ``"-"`` when the path is falsy.
    """
    meta = getattr(route, "meta", None)
    dn = getattr(meta, "display_name", None) if meta is not None else None
    if isinstance(dn, str) and dn.strip():
        return dn.strip()
    name = getattr(route, "name", None)
    if isinstance(name, str) and name.strip():
        return name.strip()
    path = getattr(route, "path", None)
    return str(path or "-")


def _iter_routes(routes: Iterable[Any]) -> Iterable[Any]:
    """Yield every route in *routes* and its descendants, depth-first, parents first.

    Only ``children`` attributes that are lists are descended into.
    """
    # The stack is kept reversed so that pop() returns items in original order.
    stack = list(routes)[::-1]
    while stack:
        cur = stack.pop()
        yield cur
        children = getattr(cur, "children", None)
        if isinstance(children, list):
            stack.extend(reversed(children))


def _collect_route_symbols(route: Any) -> set[str]:
    """Collect the non-empty string ``token`` values referenced by *route*.

    Tokens are read from the ``read`` / ``write`` / ``status`` / ``special``
    lists of both ``route.meta.parameters`` and ``route.parameters``.
    """
    symbols: set[str] = set()

    def add_from_container(container: Any) -> None:
        if container is None:
            return
        for kind in ("read", "write", "status", "special"):
            items = getattr(container, kind, None)
            if not isinstance(items, list):
                continue
            for item in items:
                tok = getattr(item, "token", None)
                if isinstance(tok, str) and tok:
                    symbols.add(tok)

    meta = getattr(route, "meta", None)
    if meta is not None:
        add_from_container(getattr(meta, "parameters", None))
    # Legacy / compatibility field
    add_from_container(getattr(route, "parameters", None))
    return symbols


def _pick_primary_module(modules: Iterable[Any], module_ids: list[str]) -> Any | None:
    """Return the module matching the first requested id, or ``None``.

    Modules are keyed by ``str(devid)`` so the lookup agrees with the string
    comparison used for filtering even when ``devid`` is not a ``str``.
    """
    if not module_ids:
        return None
    wanted = set(module_ids)  # built once instead of once per module
    by_id = {str(m.devid): m for m in modules if m.devid is not None and str(m.devid) in wanted}
    return by_id.get(str(module_ids[0]))


async def _build_watch_groups(
    *,
    api: BragerOneApiClient,
    resolver: ParamResolver,
    object_id: int,
    module_ids: list[str],
    all_panels: bool,
) -> dict[str, list[str]]:
    """Build watch groups from library adapter (core or all-panels mode).

    Returns:
        Mapping of panel name to symbols as produced by
        ``resolver.build_panel_groups`` for the first requested module, or
        empty ``Boiler`` / ``DHW`` / ``Valve 1`` groups when ``module_ids`` is
        empty or that module is not among the object's modules.
    """
    fallback: dict[str, list[str]] = {"Boiler": [], "DHW": [], "Valve 1": []}
    if not module_ids:
        return fallback
    modules = await api.get_modules(object_id=object_id)
    mod = _pick_primary_module(modules, module_ids)
    if mod is None:
        return fallback
    device_menu = int(mod.deviceMenu)
    perms = list(getattr(mod, "permissions", []) or [])
    return await resolver.build_panel_groups(device_menu=device_menu, permissions=perms, all_panels=all_panels)


async def _run_tui(
    *,
    api: BragerOneApiClient,
    gw: BragerOneGateway,
    store: ParamStore,
    resolver: ParamResolver,
    object_id: int,
    module_ids: list[str],
    json_events: bool,
    suppress_log: bool,
    all_panels: bool,
    debug_panels: bool,
    debug_logging: bool = False,
    token_labels: bool = False,
) -> None:
    """Run a minimal Rich TUI with a live values area and a scrolling log.

    Args:
        api: Authenticated REST client (module listing and writes).
        gw: Gateway whose event bus is consumed and whose prime is awaited.
        store: Parameter store that receives every bus update.
        resolver: Resolver used for panel groups, descriptions and values.
        object_id: Object whose modules are listed.
        module_ids: Selected module ids; the first one is the write target and
            the source of the panel layout.
        json_events: Log each ``ws`` update as one JSON line instead of a
            coalesced summary line.
        suppress_log: Do not log ``ws`` updates at all.
        all_panels: Forwarded to the resolver when building panel groups.
        debug_panels: Log accepted/rejected panel route diagnostics.
        debug_logging: Show log records from DEBUG up (default: WARNING up).
        token_labels: Show raw symbol tokens instead of resolved labels.

    Raises:
        RuntimeError: If the optional ``rich`` dependency is not installed.
    """
    try:
        from rich.console import Console, Group
        from rich.layout import Layout
        from rich.live import Live
        from rich.panel import Panel
        from rich.table import Table
        from rich.text import Text
    except ImportError as exc:  # pragma: no cover
        raise RuntimeError(
            "TUI mode requires optional CLI dependencies. Install the 'cli' extra (e.g. `pip install pybragerone[cli]`)."
        ) from exc

    console = Console()
    log_lines: deque[str] = deque(maxlen=200)
    log_lines.append("▶ Starting… waiting for prime and live updates. Ctrl+C to exit.")

    termios_mod: Any | None = None
    tty_mod: Any | None = None
    loop_supports_add_reader = hasattr(asyncio.get_running_loop(), "add_reader")
    if loop_supports_add_reader:
        # termios/tty are POSIX-only; any import failure just disables keyboard control.
        with contextlib.suppress(Exception):
            import termios as _termios
            import tty as _tty

            termios_mod = _termios
            tty_mod = _tty

    # stdin must be a real terminal: termios.tcgetattr() raises on pipes/files,
    # which would otherwise abort the whole task group from keyboard_loop().
    keyboard_control_enabled = (
        termios_mod is not None and tty_mod is not None and loop_supports_add_reader and os.isatty(0)
    )
    if keyboard_control_enabled:
        log_lines.append("⌨ Keys: j/k or arrows = select, t = toggle, s = set value")

    loop = asyncio.get_running_loop()
    log_lock = threading.Lock()

    # --- shared UI state (mutated by the nested coroutines below) ---
    watch: dict[str, _WatchItem] = {}
    group_symbols: dict[str, list[str]] = {}
    visible_group_symbols: dict[str, list[str]] = {}
    panel_order: list[str] = []
    symbol_policy_visible: dict[str, bool] = {}
    desc_cache: dict[str, dict[str, Any]] = {}
    symbol_deps: dict[str, set[str]] = {}
    symbol_groups: dict[str, set[str]] = {}
    key_to_symbols: dict[str, set[str]] = {}
    key_to_computed_symbols: dict[str, set[str]] = {}
    computed_symbols: set[str] = set()
    visible_computed_symbols: set[str] = set()
    selected_symbol: str | None = None
    dirty_keys: set[str] = set()
    initial_refresh = True
    content_ready = False
    render_rows_limit = 40
    debug_lines_limit = 25
    periodic_refresh_seconds = 1.0

    def _entry_addresses(entries: Any) -> Iterable[str]:
        """Yield non-empty string ``address`` values from a list of dict entries."""
        if not isinstance(entries, list):
            return
        for entry in entries:
            if isinstance(entry, dict):
                addr = entry.get("address")
                if isinstance(addr, str) and addr:
                    yield addr

    def _collect_symbol_dependencies(desc: dict[str, Any], resolved: _WatchItem) -> set[str]:
        """Return every store address the symbol's value may depend on."""
        deps: set[str] = set()
        if isinstance(resolved.address, str) and resolved.address:
            deps.add(resolved.address)

        mapping = desc.get("mapping")
        if not isinstance(mapping, dict):
            return deps

        deps.update(_entry_addresses(mapping.get("inputs")))
        # "channels" and "status_conditions" are dicts whose values are entry lists.
        for section in ("channels", "status_conditions"):
            grouped = mapping.get(section)
            if isinstance(grouped, dict):
                for entries in grouped.values():
                    deps.update(_entry_addresses(entries))
        return deps

    def _has_display_value(item: _WatchItem) -> bool:
        """Return True when the item has a non-blank label or a non-empty value."""
        if isinstance(item.value_label, str) and item.value_label.strip():
            return True
        if item.value is None:
            return False
        if isinstance(item.value, str):
            return bool(item.value.strip())
        return True

    def _normalize_inline_unit_spacing(value: Any) -> Any:
        """Insert a space between a number and a directly attached ``°`` unit."""
        if not isinstance(value, str):
            return value
        text = value.strip()
        if not text:
            return value
        return re.sub(r"^([+-]?\d+(?:[.,]\d+)?)(°\S+)$", r"\1 \2", text)

    def _recompute_visible_groups() -> None:
        """Rebuild the per-panel visible symbol lists and the panel order."""
        visible_computed_symbols.clear()
        visible_group_symbols.clear()
        for group_name, symbols in group_symbols.items():
            visible = [
                sym
                for sym in symbols
                if symbol_policy_visible.get(sym, True)
                and (watch.get(sym) is not None and _has_display_value(watch[sym]))
            ]
            visible_group_symbols[group_name] = visible
            for sym in visible:
                if sym in computed_symbols:
                    visible_computed_symbols.add(sym)

        # Keep original group order but hide panels without visible rows.
        panel_order[:] = [name for name in group_symbols if visible_group_symbols.get(name)]

    def _visible_symbols_in_order() -> list[str]:
        ordered: list[str] = []
        for panel_name in panel_order:
            ordered.extend(visible_group_symbols.get(panel_name, []))
        return ordered

    def _ensure_selected_symbol() -> None:
        """Keep the selection on a visible symbol (first one by default)."""
        nonlocal selected_symbol
        ordered = _visible_symbols_in_order()
        if not ordered:
            selected_symbol = None
            return
        if selected_symbol not in ordered:
            selected_symbol = ordered[0]

    def _move_selection(delta: int) -> None:
        """Move the selection by *delta* rows, wrapping around at both ends."""
        nonlocal selected_symbol
        ordered = _visible_symbols_in_order()
        if not ordered:
            selected_symbol = None
            return
        if selected_symbol not in ordered:
            selected_symbol = ordered[0]
            return
        idx = ordered.index(selected_symbol)
        selected_symbol = ordered[(idx + delta) % len(ordered)]

    def render_group(name: str) -> Panel:
        table = Table.grid(expand=True, padding=(0, 1))
        table.add_column("Name", ratio=2, no_wrap=True, overflow="ellipsis")
        table.add_column("Value", ratio=1, no_wrap=True, overflow="ellipsis")

        if name not in visible_group_symbols:
            table.add_row("Loading…", "")
            return Panel(table, title=name)

        symbols = visible_group_symbols.get(name, [])
        if not symbols:
            table.add_row("-", "-")
            return Panel(table, title=name)

        for sym in symbols[:render_rows_limit]:
            it = watch.get(sym)
            if it is None:
                continue
            val = it.value_label if it.value_label is not None else it.value
            if isinstance(it.unit, str) and it.unit.strip():
                val = f"{val} {it.unit}" if val is not None else "-"
            else:
                val = _normalize_inline_unit_spacing(val)
            # Pad unselected rows to the marker's width so labels stay aligned.
            marker = "▶ " if sym == selected_symbol else "  "
            table.add_row(f"{marker}{it.label}", str(val) if val is not None else "-")

        hidden_count = max(0, len(symbols) - render_rows_limit)
        if hidden_count > 0:
            table.add_row("…", f"+{hidden_count} more")
        return Panel(table, title=name)

    def render_log() -> Panel:
        text = Text("\n".join(log_lines))
        return Panel(text, title="Console")

    def render_panels_grid() -> Any:
        grid = Table.grid(expand=True, padding=(0, 0))
        grid.add_column(ratio=1)
        grid.add_column(ratio=1)
        grid.add_column(ratio=1)

        if not panel_order:
            status_text = "Loading panels…" if not group_symbols else "No visible panels"
            grid.add_row(Panel(status_text, title="Panels"), Group(), Group())
            return grid

        # Greedy balancing: each panel goes into the currently shortest column.
        columns: list[list[Any]] = [[], [], []]
        heights = [0, 0, 0]
        for name in panel_order:
            symbols = visible_group_symbols.get(name, [])
            est_height = max(1, min(len(symbols), render_rows_limit)) + 3
            col_idx = min(range(3), key=lambda idx: heights[idx])
            columns[col_idx].append(render_group(name))
            heights[col_idx] += est_height

        grid.add_row(*[Group(*col) for col in columns])
        return grid

    def build_layout() -> Layout:
        root = Layout(name="root")
        root.split_column(Layout(name="top", ratio=3), Layout(name="bottom", ratio=1))
        root["bottom"].update(render_log())
        root["top"].update(render_panels_grid())
        return root

    layout = build_layout()

    def refresh_panels() -> None:
        layout["bottom"].update(render_log())
        layout["top"].update(render_panels_grid())

    dirty = asyncio.Event()
    log_state: dict[str, Any] = {"pending": 0, "sample_key": None, "sample_value": None}

    def _append_log_line(line: str) -> None:
        with log_lock:
            log_lines.append(line)
        dirty.set()

    class _TuiLogHandler(logging.Handler):
        """Route log records into the TUI console instead of stdout/stderr."""

        def emit(self, record: logging.LogRecord) -> None:
            # Keep noise down by default.
            min_level = logging.DEBUG if debug_logging else logging.WARNING
            if record.levelno < min_level:
                return
            msg = self.format(record)
            # Records may come from other threads; hop onto the event loop.
            loop.call_soon_threadsafe(_append_log_line, msg)

    async def bus_ingest() -> None:
        """Copy gateway bus updates into the store and mark their keys dirty."""
        async for upd in gw.bus.subscribe():
            if getattr(upd, "value", None) is None:
                continue
            key = _format_event_key(upd)
            await store.upsert_async(key, upd.value)
            dirty_keys.add(key)

            src = ""
            if isinstance(upd.meta, dict):
                src_raw = upd.meta.get("_source")
                src = str(src_raw) if src_raw is not None else ""

            if not suppress_log and src == "ws":
                if json_events:
                    log_lines.append(json.dumps({"key": key, "value": upd.value}, ensure_ascii=False))
                else:
                    # Coalesced: log_flush_loop() prints one summary line per tick.
                    log_state["pending"] = int(log_state["pending"]) + 1
                    log_state["sample_key"] = key
                    log_state["sample_value"] = upd.value
            dirty.set()

    async def log_flush_loop() -> None:
        """Every 0.5 s emit one summary line for the ws updates seen since last tick."""
        while True:
            await asyncio.sleep(0.5)
            pending = int(log_state["pending"])
            if pending <= 0:
                continue
            sample_key = log_state.get("sample_key")
            sample_value = log_state.get("sample_value")
            log_state["pending"] = 0
            if isinstance(sample_key, str):
                if pending == 1:
                    log_lines.append(f"↺ {sample_key} = {sample_value}")
                else:
                    log_lines.append(f"↺ {sample_key} = {sample_value} (+{pending - 1} more)")
            else:
                log_lines.append(f"↺ +{pending} ws updates")
            dirty.set()

    async def init_watch() -> None:
        """After prime: build panel groups, watch items and the dependency indexes."""
        nonlocal content_ready
        # Keep prime fast and avoid repeated expensive UI recomposition during bootstrap.
        await gw.wait_for_prime(timeout=None)

        modules = await api.get_modules(object_id=object_id)
        selected_module = _pick_primary_module(modules, module_ids)
        if selected_module is not None and selected_module.devid is not None:
            devid_text = str(selected_module.devid)
            resolver.set_runtime_context(
                {
                    "devid": devid_text,
                    "modulesMap": {
                        devid_text: {
                            "connectedAt": selected_module.connectedAt,
                        }
                    },
                }
            )
        else:
            resolver.set_runtime_context(None)

        groups = await _build_watch_groups(
            api=api,
            resolver=resolver,
            object_id=object_id,
            module_ids=module_ids,
            all_panels=all_panels,
        )
        group_symbols.clear()
        group_symbols.update(groups)
        panel_order.clear()
        panel_order.extend(list(groups.keys()))

        if debug_panels:
            mod = selected_module
            if mod is not None:
                perms = list(getattr(mod, "permissions", []) or [])
                route_diag = await resolver.panel_route_diagnostics(
                    device_menu=int(mod.deviceMenu),
                    permissions=perms,
                    all_panels=all_panels,
                )
                accepted = sum(1 for row in route_diag if bool(row.get("accepted")))
                rejected = len(route_diag) - accepted
                log_lines.append(f"INFO: Panel route diagnostics: accepted={accepted}, rejected={rejected}")
                rejected_rows = [row for row in route_diag if not bool(row.get("accepted"))]
                for row in rejected_rows[:debug_lines_limit]:
                    nm = str(row.get("name") or "-")
                    rsn = str(row.get("reason") or "-")
                    cnt = int(row.get("symbol_count") or 0)
                    log_lines.append(f"✖ {nm} ({cnt}) -> {rsn}")
                extra = max(0, len(rejected_rows) - debug_lines_limit)
                if extra:
                    log_lines.append(f"INFO: ... {extra} more rejected routes omitted")

        # Build watch items (labels + mapping) once; values will be refreshed from ParamStore on dirty ticks.
        all_symbols = [sym for symbols in group_symbols.values() for sym in symbols]
        desc_by_symbol = await resolver.describe_symbols(all_symbols)
        for _group_name, symbols in group_symbols.items():
            for sym in symbols:
                symbol_groups.setdefault(sym, set()).add(_group_name)
                if sym in watch:
                    continue  # already built for another panel

                desc = desc_by_symbol.get(sym) or await resolver.describe_symbol(sym)
                desc_cache[sym] = desc
                resolved = await resolver.resolve_value(sym)
                display_label = sym if token_labels else str(desc.get("label") or sym)
                watch[sym] = _WatchItem(
                    symbol=sym,
                    label=display_label,
                    unit=resolved.unit,
                    address=resolved.address,
                    value=resolved.value,
                    value_label=resolved.value_label,
                    kind=resolved.kind,
                )
                symbol_deps[sym] = _collect_symbol_dependencies(desc, watch[sym])

                addr = watch[sym].address
                if isinstance(addr, str) and addr:
                    key_to_symbols.setdefault(addr, set()).add(sym)
                if watch[sym].kind == "computed":
                    computed_symbols.add(sym)
                    for dep in symbol_deps[sym]:
                        key_to_computed_symbols.setdefault(dep, set()).add(sym)

        flat_values = store.flatten()
        for sym, item in watch.items():
            desc = desc_cache.get(sym, {})
            visible, _reason = resolver.parameter_visibility_diagnostics(
                desc=desc, resolved=item, flat_values=flat_values
            )
            symbol_policy_visible[sym] = visible

        _recompute_visible_groups()
        _ensure_selected_symbol()
        content_ready = True
        dirty.set()

    async def refresh_loop(*, live: Live) -> None:
        """Re-resolve changed symbols and redraw whenever the dirty flag is set."""
        # Manual refresh to reduce blinking.
        nonlocal initial_refresh
        last_periodic_refresh = time.monotonic()
        while True:
            try:
                await asyncio.wait_for(dirty.wait(), timeout=0.25)
            except TimeoutError:
                # NOTE(review): because of this ``continue`` the periodic refresh
                # below only runs when something else sets ``dirty``; confirm
                # whether an idle-time refresh was intended.
                continue
            dirty.clear()
            await asyncio.sleep(0.05)  # brief pause so bursts of updates coalesce
            if not content_ready:
                continue

            updated_keys = set(dirty_keys)
            dirty_keys.clear()

            symbols_to_refresh: set[str] = set()
            if initial_refresh:
                symbols_to_refresh = set(visible_computed_symbols)
                initial_refresh = False
            elif updated_keys:
                for key in updated_keys:
                    symbols_to_refresh.update(key_to_symbols.get(key, set()))
                    symbols_to_refresh.update(key_to_computed_symbols.get(key, set()))

            now = time.monotonic()
            if not symbols_to_refresh and now - last_periodic_refresh >= periodic_refresh_seconds:
                symbols_to_refresh = set(visible_computed_symbols)
                last_periodic_refresh = now
            elif symbols_to_refresh:
                last_periodic_refresh = now

            # Re-resolve values from ParamStore so we don't depend on matching raw update keys.
            for sym in symbols_to_refresh:
                it = watch.get(sym)
                if it is None:
                    continue
                resolved = await resolver.resolve_value(sym)
                it.value = resolved.value
                it.value_label = resolved.value_label

            _recompute_visible_groups()
            _ensure_selected_symbol()
            refresh_panels()
            live.refresh()

    async def _perform_toggle_selected() -> None:
        """Toggle the selected symbol on the first module and log the outcome."""
        if not module_ids:
            return
        _ensure_selected_symbol()
        if not selected_symbol:
            return
        desc = await resolver.describe_symbol(selected_symbol)
        toggle_value = _toggle_value_for_symbol(desc=desc, store=store)
        ok, message = await _execute_symbol_write(
            api=api,
            resolver=resolver,
            store=store,
            devid=module_ids[0],
            symbol=selected_symbol,
            value=toggle_value,
        )
        log_lines.append(("✔ " if ok else "✖ ") + message)
        dirty.set()

    async def _perform_set_selected(raw_input: str) -> None:
        """Write the parsed *raw_input* to the selected symbol and log the outcome."""
        if not module_ids:
            return
        _ensure_selected_symbol()
        if not selected_symbol:
            return
        requested_value = _parse_cli_value(raw_input)
        ok, message = await _execute_symbol_write(
            api=api,
            resolver=resolver,
            store=store,
            devid=module_ids[0],
            symbol=selected_symbol,
            value=requested_value,
        )
        log_lines.append(("✔ " if ok else "✖ ") + message)
        dirty.set()

    async def keyboard_loop() -> None:
        """Read single keys from stdin (cbreak mode) and dispatch TUI actions."""
        if not keyboard_control_enabled or termios_mod is None or tty_mod is None:
            return

        loop = asyncio.get_running_loop()
        fd = 0  # stdin
        old_settings = termios_mod.tcgetattr(fd)

        async def _read_key_async() -> str:
            fut: asyncio.Future[str] = loop.create_future()

            def _on_readable() -> None:
                if fut.done():
                    return
                try:
                    # Up to 3 bytes so an arrow-key escape sequence arrives whole.
                    raw = os.read(fd, 3)
                    key = raw.decode("utf-8", errors="ignore")
                except Exception:
                    key = ""
                fut.set_result(key)

            loop.add_reader(fd, _on_readable)
            try:
                return await fut
            finally:
                with contextlib.suppress(Exception):
                    loop.remove_reader(fd)

        def _prompt_value(symbol: str) -> str:
            # Restore the saved terminal settings for line input, then return to cbreak.
            # NOTE(review): input() blocks the event loop until Enter is pressed.
            termios_mod.tcsetattr(fd, termios_mod.TCSADRAIN, old_settings)
            try:
                console.print(f"\nSet value for {symbol}: ", end="")
                return input().strip()
            finally:
                tty_mod.setcbreak(fd)

        try:
            tty_mod.setcbreak(fd)
            while True:
                key = await _read_key_async()
                if not key:
                    continue
                if key in ("\x1b[A", "k"):
                    _move_selection(-1)
                    dirty.set()
                    continue
                if key in ("\x1b[B", "j"):
                    _move_selection(1)
                    dirty.set()
                    continue
                if key == "t":
                    await _perform_toggle_selected()
                    continue
                if key == "s":
                    _ensure_selected_symbol()
                    if selected_symbol is None:
                        continue
                    raw_value = _prompt_value(selected_symbol)
                    if raw_value:
                        await _perform_set_selected(raw_value)
                    else:
                        log_lines.append("INFO: set cancelled (empty input)")
                        dirty.set()
        finally:
            termios_mod.tcsetattr(fd, termios_mod.TCSADRAIN, old_settings)

    live = Live(layout, console=console, screen=True, auto_refresh=False)

    # Remember the logging configuration so it can be restored on exit.
    root_logger = logging.getLogger()
    prev_handlers = list(root_logger.handlers)
    prev_level = root_logger.level
    quiet_logger_names = (
        "httpx",
        "pybragerone.api.ws.eio",
        "pybragerone.api.ws.sio",
        "pybragerone.models.catalog",
    )
    prev_logger_levels = {name: logging.getLogger(name).level for name in quiet_logger_names}

    handler = _TuiLogHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))

    # In TUI mode, don't print logs to stdout/stderr.
    root_logger.handlers = [handler]
    root_logger.setLevel(logging.DEBUG if debug_logging else logging.WARNING)
    if not debug_logging:
        for name in quiet_logger_names:
            logging.getLogger(name).setLevel(logging.WARNING)

    try:
        with live, contextlib.suppress(asyncio.CancelledError):
            refresh_panels()
            live.refresh()
            async with asyncio.TaskGroup() as tg:
                tg.create_task(bus_ingest())
                tg.create_task(init_watch())
                tg.create_task(refresh_loop(live=live))
                tg.create_task(log_flush_loop())
                tg.create_task(keyboard_loop())
    finally:
        root_logger.handlers = prev_handlers
        root_logger.setLevel(prev_level)
        for name, level in prev_logger_levels.items():
            logging.getLogger(name).setLevel(level)
class _EnvDefaultAppend(argparse.Action):
    """Append-style action whose default may be a raw (non-list) env string.

    The built-in ``append`` action calls ``.append`` on a copy of the current
    value, which raises ``AttributeError`` when the default is a ``str``.
    Here the first explicit occurrence of the flag starts a fresh list, so
    command-line values replace the environment default instead of crashing.
    """

    def __call__(
        self,
        parser: argparse.ArgumentParser,
        namespace: argparse.Namespace,
        values: Any,
        option_string: str | None = None,
    ) -> None:
        current = getattr(namespace, self.dest, None)
        items = list(current) if isinstance(current, list) else []
        items.append(values)
        setattr(namespace, self.dest, items)


def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser.

    Most options fall back to ``PYBO_*`` environment variables when they are
    not given on the command line.

    Returns:
        The configured parser. ``modules`` is ``None``, the raw
        ``PYBO_MODULES`` string (comma-separated), or a list of ``--module``
        values.
    """
    p = argparse.ArgumentParser(prog="pybragerone", description="BragerOne — diagnostic CLI (REST + WS).")

    # env-first: if not in CLI, take from environment (easy to debug in VSCode)
    p.add_argument(
        "--email",
        default=os.environ.get("PYBO_EMAIL"),
        help="Login email (ENV: PYBO_EMAIL)",
    )
    p.add_argument(
        "--password",
        default=os.environ.get("PYBO_PASSWORD"),
        help="Password (ENV: PYBO_PASSWORD)",
    )
    p.add_argument(
        "--platform",
        choices=[platform.value for platform in Platform],
        default=os.environ.get("PYBO_PLATFORM", Platform.BRAGERONE.value),
        help="Backend platform: bragerone or tisconnect (ENV: PYBO_PLATFORM). Default: bragerone",
    )
    p.add_argument(
        "--lang",
        default=os.environ.get("PYBO_LANG", "en"),
        help="Language code for asset-driven labels (ENV: PYBO_LANG). Default: en",
    )
    p.add_argument(
        "--object-id",
        type=int,
        # argparse applies ``type`` to string defaults, so a malformed value is
        # reported as a normal usage error; an empty variable counts as unset.
        default=os.environ.get("PYBO_OBJECT_ID") or None,
        help="Object ID (ENV: PYBO_OBJECT_ID)",
    )
    p.add_argument(
        "--module",
        dest="modules",
        action=_EnvDefaultAppend,
        help="Module code (devid/code). Can be specified multiple times or via ENV PYBO_MODULES=FTTCTBSLCE,OTHER",
        default=os.environ.get("PYBO_MODULES"),
    )
    p.add_argument(
        "--json",
        action="store_true",
        help="Print events as JSON (one line per update)",
    )
    p.add_argument("--raw-ws", action="store_true", help="Print raw WS events (debug)")
    p.add_argument("--raw-http", action="store_true", help="Trace HTTP (warning: logs may contain data)")
    p.add_argument("--no-diff", action="store_true", help="Don't print changes (arrows) on STDOUT")
    p.add_argument("--debug", action="store_true", help="More logs")
    p.add_argument("--quiet", action="store_true", help="Fewer logs")
    p.add_argument(
        "--all-panels",
        action="store_true",
        help="Render all available menu panels (3-column grid) instead of Boiler/DHW/Valve 1 only",
    )
    p.add_argument(
        "--debug-panels",
        action="store_true",
        help="Log panel route inclusion/exclusion reasons in TUI console",
    )
    p.add_argument(
        "--token-labels",
        action="store_true",
        help="Display raw symbol tokens as names in TUI instead of localized labels",
    )
    p.add_argument(
        "--set",
        dest="set_values",
        action="append",
        default=[],
        help="Send SYMBOL=VALUE update (repeatable). Example: --set PARAM_0=76",
    )
    p.add_argument(
        "--toggle",
        dest="toggles",
        action="append",
        default=[],
        help="Toggle a symbol using current state/rules (repeatable). Example: --toggle URUCHOMIENIE_KOTLA",
    )
    return p
def main() -> None:
    """Console entry point: parse arguments, run the async CLI and exit.

    Always terminates via ``SystemExit``: with the code returned by ``run``,
    with ``130`` on Ctrl+C, or with an error message for missing credentials,
    API errors and network errors.
    """
    _maybe_load_dotenv()
    args = build_parser().parse_args()

    # PYBO_MODULES=FOO,BAR → convert to list if --module was not used
    env_modules = args.modules
    if isinstance(env_modules, str):
        args.modules = list(filter(None, env_modules.split(",")))

    if not (args.email and args.password):
        raise SystemExit("Missing credentials: set PYBO_EMAIL/PYBO_PASSWORD or pass --email/--password.")

    try:
        exit_code = asyncio.run(run(args))
    except ApiError as exc:
        details = exc.data
        if isinstance(details, (dict, list)):
            rendered = json.dumps(details, ensure_ascii=False)
        else:
            rendered = str(details)
        raise SystemExit(f"HTTP {exc.status}: {rendered}") from None
    except httpx.RequestError as exc:
        raise SystemExit(f"Network error: {exc.__class__.__name__}: {exc}") from None
    except KeyboardInterrupt:
        exit_code = 130

    raise SystemExit(exit_code)
# Run the CLI only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()