"""Diagnostic CLI for BragerOne (REST + WS)."""
from __future__ import annotations
import argparse
import asyncio
import contextlib
import json
import logging
import os
import re
import signal
import threading
import time
from collections import deque
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
from typing import Any, cast
import httpx
from .api import BragerOneApiClient, Platform, server_for
from .api.client import ApiError
from .gateway import BragerOneGateway
from .models import CLITokenStore, ParamResolver, ParamStore
from .utils import bg_tasks, spawn
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Matches a whole string made of one lowercase ASCII letter followed by one or more digits;
# group 1 captures the letter, group 2 the digits.
# NOTE(review): presumably a channel+index token such as "v12" — confirm against its users.
CHAN_RE = re.compile(r"^([a-z])(\d+)$")
@dataclass(slots=True)
class _WatchItem:
    """A single watched symbol shown in the TUI, together with its last resolved value."""

    # Symbol (token) this row was built for.
    symbol: str
    # Text shown in the name column: the symbol itself or its resolved label.
    label: str
    # Unit as returned by the resolver; the TUI only appends it to the value when it is a non-blank string.
    unit: str | dict[str, str] | None
    # Store key backing the symbol, when the resolver reports one; used to map updates back to symbols.
    address: str | None
    # Last resolved value.
    value: Any
    # Human-readable rendering of the value; preferred over ``value`` when rendering if not None.
    value_label: str | None
    # Resolver-reported kind; the TUI treats "computed" specially (dependency-driven refresh).
    kind: str
def _format_event_key_from_parts(pool_raw: Any, chan: Any, idx: Any) -> str:
pool = str(pool_raw or "")
if pool.isdigit() or (pool and not pool.startswith("P")):
pool = f"P{pool}"
return f"{pool}.{chan}{idx}"
def _format_event_key(upd: Any) -> str:
    """Build the store key for an update object from its ``pool``/``chan``/``idx`` attributes.

    Missing attributes are passed on as ``None``.
    """
    pool = getattr(upd, "pool", None)
    chan = getattr(upd, "chan", None)
    idx = getattr(upd, "idx", None)
    return _format_event_key_from_parts(pool, chan, idx)
def _parse_cli_value(raw: str) -> Any:
text = raw.strip()
if not text:
return ""
lowered = text.lower()
if lowered == "true":
return True
if lowered == "false":
return False
if lowered in {"null", "none"}:
return None
try:
return int(text)
except ValueError:
pass
try:
return float(text)
except ValueError:
pass
if (text.startswith("{") and text.endswith("}")) or (text.startswith("[") and text.endswith("]")):
with contextlib.suppress(Exception):
return json.loads(text)
return text
def _invert_value(current: Any) -> Any:
if isinstance(current, bool):
return not current
if isinstance(current, (int, float)):
return 0 if int(current) != 0 else 1
if isinstance(current, str):
val = current.strip().lower()
if val in {"on", "true", "1"}:
return "OFF"
if val in {"off", "false", "0"}:
return "ON"
return 1
def _normalize_expected(value: Any) -> Any:
if isinstance(value, str) and value.strip() == "void 0":
return None
return value
def _read_target_actual(target: Mapping[str, Any], flat_values: Mapping[str, Any]) -> Any:
address = target.get("address")
if not isinstance(address, str) or not address:
return None
raw_value = flat_values.get(address)
bit = target.get("bit")
if isinstance(bit, int) and isinstance(raw_value, int):
return 1 if ((raw_value >> bit) & 1) else 0
mask = target.get("mask")
if isinstance(mask, int) and isinstance(raw_value, int):
return raw_value & mask
return raw_value
def _compare_condition(*, operation: str, actual: Any, expected: Any) -> bool:
    """Evaluate one rule condition ``actual <operation> expected``.

    Args:
        operation: One of ``equalTo``, ``notEqualTo``, ``greaterThan``, ``greaterThanOrEqualTo``,
            ``lessThan``, ``lessThanOrEqualTo``. Any other name evaluates to ``False``.
        actual: Value currently read for the target.
        expected: Value from the rule; normalized with ``_normalize_expected`` first.

    Returns:
        The comparison result. Ordering comparisons are ``False`` when either side is ``None``
        or when the two operands cannot be ordered against each other.
    """
    expected_norm = _normalize_expected(expected)
    if operation == "equalTo":
        return bool(actual == expected_norm)
    if operation == "notEqualTo":
        return bool(actual != expected_norm)

    # Everything below is an ordering comparison, which is undefined against None.
    if actual is None or expected_norm is None:
        return False
    try:
        if operation == "greaterThan":
            return bool(actual > expected_norm)
        if operation == "greaterThanOrEqualTo":
            return bool(actual >= expected_norm)
        if operation == "lessThan":
            return bool(actual < expected_norm)
        if operation == "lessThanOrEqualTo":
            return bool(actual <= expected_norm)
    except TypeError:
        # Mismatched operand types (e.g. str vs int) cannot be ordered: treat as "condition not met"
        # instead of letting the error abort rule selection.
        return False
    return False
def _command_rule_matches(rule: Mapping[str, Any], flat_values: Mapping[str, Any]) -> bool:
conditions = rule.get("conditions")
if not isinstance(conditions, list) or not conditions:
return True
for cond in conditions:
if not isinstance(cond, Mapping):
return False
operation = cond.get("operation")
expected = cond.get("expected")
targets = cond.get("targets")
if not isinstance(operation, str) or not isinstance(targets, list) or not targets:
return False
validated_targets: list[Mapping[str, Any]] = []
for target in targets:
if not isinstance(target, Mapping):
continue
if all(isinstance(key, str) for key in target):
validated_targets.append(cast(Mapping[str, Any], target))
if not validated_targets:
return False
if not all(
_compare_condition(
operation=operation,
actual=_read_target_actual(target, flat_values),
expected=expected,
)
for target in validated_targets
):
return False
return True
def _select_command_rule(desc: Mapping[str, Any], flat_values: Mapping[str, Any]) -> Mapping[str, Any] | None:
mapping = desc.get("mapping")
if not isinstance(mapping, Mapping):
return None
command_rules = mapping.get("command_rules")
if not isinstance(command_rules, list):
return None
for rule in command_rules:
if isinstance(rule, Mapping) and _command_rule_matches(rule, flat_values):
normalized_rule: dict[str, Any] = {}
for key, value in rule.items():
normalized_rule[str(key)] = value
return normalized_rule
return None
def _toggle_value_for_symbol(*, desc: Mapping[str, Any], store: ParamStore) -> Any:
    """Work out the value to send in order to toggle the symbol described by ``desc``.

    Symbols with a direct ``pool``/``chan``/``idx`` address, or without command rules, get the
    inverse of their current value (``computed_value``, falling back to ``value``). Rule-driven
    symbols get the value of another rule that shares the active rule's command but carries a
    different value; ``None`` when there is no such alternative.
    """
    mapping = desc.get("mapping")
    raw_rules = mapping.get("command_rules") if isinstance(mapping, Mapping) else None
    command_rules: list[Any] | None = raw_rules if isinstance(raw_rules, list) and raw_rules else None

    addressed = (
        isinstance(desc.get("pool"), str)
        and isinstance(desc.get("chan"), str)
        and isinstance(desc.get("idx"), int)
    )

    if addressed or not command_rules:
        current = desc.get("computed_value")
        if current is None:
            current = desc.get("value")
        return _invert_value(current)

    active_rule = _select_command_rule(desc, store.flatten())
    if not isinstance(active_rule, Mapping):
        return None
    active_command = active_rule.get("command")
    if not isinstance(active_command, str):
        return None
    active_value = active_rule.get("value")

    for candidate in command_rules:
        if not isinstance(candidate, Mapping) or candidate is active_rule:
            continue
        if candidate.get("command") == active_command:
            alternative = candidate.get("value")
            if alternative != active_value:
                return alternative
    return None
async def _store_ingest_loop(gw: BragerOneGateway, store: ParamStore) -> None:
    """Copy every gateway bus update that carries a value into ``store``, keyed by its event key."""
    async for update in gw.bus.subscribe():
        # Updates without a value (missing attribute or None) are ignored.
        if getattr(update, "value", None) is not None:
            await store.upsert_async(_format_event_key(update), update.value)
def _mapping_parameter_name(desc: Mapping[str, Any]) -> str | None:
mapping = desc.get("mapping")
if not isinstance(mapping, Mapping):
return None
raw = mapping.get("raw")
if not isinstance(raw, Mapping):
return None
name = raw.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
return None
def _as_float(value: Any) -> float | None:
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
return float(value)
if isinstance(value, str):
text = value.strip().replace(",", ".")
if not text:
return None
with contextlib.suppress(ValueError):
return float(text)
return None
async def _symbol_numeric_transform_expr(resolver: ParamResolver, desc: Mapping[str, Any]) -> Any:
    """Find the numeric transform expression (the unit's ``value`` entry) for a described symbol.

    The unit metadata resolved from ``desc["unit_code"]`` takes precedence; otherwise
    ``desc["mapping"]["units_source"]`` is consulted. Returns ``None`` when neither has a
    ``value`` entry.
    """
    raw_code = desc.get("unit_code")
    if raw_code is not None:
        meta = await resolver._resolve_unit_meta(raw_unit_code=raw_code)
        if isinstance(meta, Mapping) and "value" in meta:
            return meta.get("value")

    mapping = desc.get("mapping")
    source = mapping.get("units_source") if isinstance(mapping, Mapping) else None
    if isinstance(source, Mapping) and "value" in source:
        return source.get("value")
    return None
async def _prepare_symbol_write_value(
    *,
    resolver: ParamResolver,
    desc: Mapping[str, Any],
    requested_value: Any,
) -> tuple[bool, Any, str | None]:
    """Convert a requested value to the raw value to send and validate it against the raw bounds.

    When the requested value is numeric and the symbol has a numeric transform with a non-zero
    factor, the raw value is ``requested / factor - shift`` (snapped to an integer when it is
    within 1e-9 of one). The result is then checked against ``desc["min"]`` / ``desc["max"]``.

    Args:
        resolver: Resolver used to look up the symbol's unit metadata.
        desc: Symbol description as returned by the resolver.
        requested_value: Value as entered by the user.

    Returns:
        ``(ok, prepared_value, error)``. ``error`` is ``None`` when ``ok`` is true; otherwise it
        explains why the value was rejected (non-finite number or out of range).
    """
    import math  # function-scope import keeps this fix independent of the file-level import block

    requested_num = _as_float(requested_value)
    # NaN/inf would make round() raise below and would slip through the bounds check
    # (every comparison with NaN is false), so reject them up front.
    if requested_num is not None and not math.isfinite(requested_num):
        return False, requested_value, f"value must be a finite number: requested={requested_value}"

    min_raw = desc.get("min")
    max_raw = desc.get("max")
    min_raw_num = _as_float(min_raw)
    max_raw_num = _as_float(max_raw)
    transform_expr = await _symbol_numeric_transform_expr(resolver, desc)
    transform = ParamResolver._parse_numeric_transform(transform_expr)
    prepared_value: Any = requested_value
    if requested_num is not None and transform is not None and transform.factor != 0.0:
        raw_num = (requested_num / transform.factor) - transform.shift
        if not math.isfinite(raw_num):
            # The inverse transform overflowed; round() cannot handle that either.
            return False, requested_value, f"value must be a finite number: requested={requested_value}"
        # Snap to an int when the inverse transform lands (numerically) on a whole number.
        prepared_value = round(raw_num) if abs(raw_num - round(raw_num)) < 1e-9 else raw_num
    prepared_num = _as_float(prepared_value)
    if prepared_num is not None:
        out_of_bounds = (min_raw_num is not None and prepared_num < min_raw_num) or (
            max_raw_num is not None and prepared_num > max_raw_num
        )
        if out_of_bounds:
            # Report the limits both raw and in display units so the user can correct the input.
            display_min = ParamResolver._apply_numeric_transform(min_raw_num, transform_expr) if min_raw_num is not None else None
            display_max = ParamResolver._apply_numeric_transform(max_raw_num, transform_expr) if max_raw_num is not None else None
            return (
                False,
                prepared_value,
                "value out of range: "
                f"raw[{min_raw}..{max_raw}] display[{display_min}..{display_max}] requested={requested_value}",
            )
    return True, prepared_value, None
async def _execute_symbol_write(
    *,
    api: BragerOneApiClient,
    resolver: ParamResolver,
    store: ParamStore,
    devid: str,
    symbol: str,
    value: Any,
) -> tuple[bool, str]:
    """Write ``value`` to ``symbol`` on module ``devid`` and report the outcome.

    The value is prepared/validated first. Symbols with a direct ``pool``/``chan``/``idx``
    address are written with ``api.module_command``; otherwise the matching command rule is sent
    with ``api.module_command_raw``.

    Returns:
        ``(ok, message)`` where ``ok`` is true for HTTP status 200/201/202/204 and ``message`` is
        a human-readable summary (or the reason the write could not be attempted).
    """
    success_statuses = (200, 201, 202, 204)

    desc = await resolver.describe_symbol(symbol)
    accepted, prepared_value, err = await _prepare_symbol_write_value(
        resolver=resolver,
        desc=desc,
        requested_value=value,
    )
    if not accepted:
        return False, f"{symbol}: {err}"

    pool, chan, idx = desc.get("pool"), desc.get("chan"), desc.get("idx")
    if isinstance(pool, str) and isinstance(chan, str) and isinstance(idx, int):
        # Directly addressable parameter.
        parameter = f"{chan}{idx}"
        response = await api.module_command(
            devid=devid,
            pool=pool,
            parameter=parameter,
            value=prepared_value,
            parameter_name=_mapping_parameter_name(desc),
            return_data=True,
        )
        status, data = cast(tuple[int, Any], response)
        summary = f"{symbol} -> command {pool}.{parameter}={prepared_value} (input={value}) status={status} data={data}"
        return status in success_statuses, summary

    # No direct address: fall back to the command rule matching the current store values.
    rule = _select_command_rule(desc, store.flatten())
    if isinstance(rule, Mapping):
        command = rule.get("command")
        if isinstance(command, str) and command.strip():
            send_value = rule.get("value") if prepared_value is None else prepared_value
            response = await api.module_command_raw(
                devid=devid,
                command=command.strip(),
                value=send_value,
                return_data=True,
            )
            status, data = cast(tuple[int, Any], response)
            summary = f"{symbol} -> raw {command} value={send_value} (input={value}) status={status} data={data}"
            return status in success_statuses, summary

    return False, f"{symbol}: unable to determine command route (no address and no command rule match)"
async def _run_send_only_actions(
    *,
    api: BragerOneApiClient,
    gw: BragerOneGateway,
    store: ParamStore,
    resolver: ParamResolver,
    module_ids: list[str],
    set_values: list[str],
    toggles: list[str],
) -> int:
    """Run the one-shot ``SYMBOL=VALUE`` writes and toggles against the first module, then return.

    The gateway is started and primed first so that the store holds current values. Each result
    is printed on its own line.

    Returns:
        The exit code left by the last problem encountered: ``2`` for a malformed ``--set``
        entry, ``1`` for a failed write, ``0`` when nothing went wrong.
    """
    ingest_task = asyncio.create_task(_store_ingest_loop(gw, store), name="store-ingest")
    try:
        await gw.start()
        await gw.wait_for_prime(timeout=30.0)
        await asyncio.sleep(0.2)
        devid = module_ids[0]

        async def _write_and_report(symbol_name: str, value: Any) -> bool:
            # Perform one write and print its outcome line.
            ok, message = await _execute_symbol_write(
                api=api,
                resolver=resolver,
                store=store,
                devid=devid,
                symbol=symbol_name,
                value=value,
            )
            print(("✔ " if ok else "✖ ") + message)
            return ok

        exit_code = 0
        for raw_entry in set_values:
            name_part, separator, raw_value = raw_entry.partition("=")
            if not separator:
                print(f"✖ Invalid --set value '{raw_entry}'. Expected SYMBOL=VALUE")
                exit_code = 2
                continue
            symbol_norm = name_part.strip()
            if not symbol_norm:
                print(f"✖ Invalid --set value '{raw_entry}'. Missing symbol")
                exit_code = 2
                continue
            if not await _write_and_report(symbol_norm, _parse_cli_value(raw_value)):
                exit_code = 1

        for entry in toggles:
            symbol_norm = entry.strip()
            if not symbol_norm:
                continue
            desc = await resolver.describe_symbol(symbol_norm)
            toggle_value = _toggle_value_for_symbol(desc=desc, store=store)
            if not await _write_and_report(symbol_norm, toggle_value):
                exit_code = 1
        return exit_code
    finally:
        # Always stop the ingest task, swallowing its cancellation.
        ingest_task.cancel()
        await asyncio.gather(ingest_task, return_exceptions=True)
def _maybe_load_dotenv() -> None:
"""Load environment variables from a `.env` file, if supported.
Notes:
The CLI reads defaults from `os.environ` (e.g. `PYBO_EMAIL`). Typical shells and `uv run` do not
automatically load `.env`, so we opportunistically load it when `python-dotenv` is installed.
Existing environment variables are not overridden.
"""
try:
from dotenv import find_dotenv, load_dotenv
except Exception:
return
dotenv_path = find_dotenv(usecwd=True)
if dotenv_path:
load_dotenv(dotenv_path, override=False)
def _setup_logging(debug: bool, quiet: bool) -> None:
"""Setup logging based on debug/quiet flags."""
level = logging.DEBUG if debug else (logging.WARNING if quiet else logging.INFO)
logging.basicConfig(level=level, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
async def _prompt_select_object(api: BragerOneApiClient) -> int | None:
    """Interactively ask the user to pick one of the objects returned by ``api.get_objects()``.

    Args:
        api: Authenticated API client.

    Returns:
        The ``id`` of the chosen object, or ``None`` when the API returned no objects.
    """
    items = await api.get_objects()
    if not items:
        log.warning("Failed to fetch the list of objects (/v1/objects). Provide --object-id.")
        return None
    print("\nSelect an object (installation):")
    for i, o in enumerate(items, 1):
        name = o.name or f"object-{o.id}"
        print(f"[{i}] {name} (id={o.id})")
    while True:
        sel = input("Item number: ").strip()
        # isdecimal(), not isdigit(): isdigit() also accepts characters such as "²" that int()
        # rejects, which would raise ValueError instead of re-prompting.
        if not sel.isdecimal():
            print("Enter a number from the list.")
            continue
        idx = int(sel)
        if 1 <= idx <= len(items):
            return items[idx - 1].id
        print("Out of range, try again.")
async def _prompt_select_modules(api: BragerOneApiClient, object_id: int) -> list[str]:
    """Interactively ask the user to pick modules of the given object.

    Args:
        api: Authenticated API client.
        object_id: Object whose modules are listed.

    Returns:
        Sorted list of selected module codes (``devid`` when set, otherwise ``id``, as strings).
        Empty when the object has no modules.
    """
    rows = await api.get_modules(object_id=object_id)
    if not rows:
        print("No modules for the selected object.")
        return []

    def _module_code(module: Any) -> str | None:
        # Single source of truth for a module's code, shared by the "*" and the index selection.
        code = module.devid or module.id
        return None if code is None else str(code)

    print("Available modules:")
    for i, m in enumerate(rows, start=1):
        name = str(m.name or "-")
        code = m.devid or str(m.id)
        ver = m.moduleVersion or m.gateway.version or "-"
        print(f"[{i}] {name:24} code={code} ver={ver}")
    print("Enter numbers separated by commas (e.g. 1,3) or * for all.")
    while True:
        sel = input("Selection: ").strip()
        if sel == "*":
            # Every module that has a usable code.
            all_codes = {code for code in map(_module_code, rows) if code is not None}
            return sorted(all_codes)
        try:
            idxs = [int(x) for x in sel.replace(" ", "").split(",") if x]
        except ValueError:
            print("Enter numbers from the list.")
            continue
        choices: set[str] = set()
        for idx in idxs:
            # Out-of-range numbers are ignored rather than rejected.
            if 1 <= idx <= len(rows):
                picked = _module_code(rows[idx - 1])
                if picked is not None:
                    choices.add(picked)
        if choices:
            return sorted(choices)
        print("Invalid selection — please try again.")
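# NOTE: a stray "[docs]" marker (Sphinx source-link extraction artifact, not program code) was removed here.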
async def run(args: argparse.Namespace) -> int:
    """Run the CLI with the given parsed arguments.

    Authenticates, determines the object and the modules (from the arguments or via interactive
    prompts), then either performs the one-shot set/toggle actions or starts the TUI together
    with the gateway and waits for SIGINT/SIGTERM.

    Args:
        args: Parsed command-line namespace. Attributes read here: ``debug``, ``quiet``,
            ``email``, ``password``, ``platform``, ``object_id``, ``modules``, ``lang``,
            ``set_values``, ``toggles``, ``json``, ``no_diff``, ``all_panels``,
            ``debug_panels`` and ``token_labels``.

    Returns:
        ``2`` when no object or no module could be determined, the exit code of
        ``_run_send_only_actions`` in send-only mode, otherwise ``0`` once a stop signal arrived.
    """
    _setup_logging(args.debug, args.quiet)
    store = CLITokenStore(args.email)
    # Temporary REST client for object/modules selection
    server = server_for(args.platform)
    api = BragerOneApiClient(server=server)
    api.set_token_store(store)
    # Created later; kept here so the cleanup in ``finally`` can tell whether it exists.
    gw: BragerOneGateway | None = None
    try:
        await api.ensure_auth(args.email, args.password)
        # Prompt only when no object id was supplied.
        object_id = args.object_id or (await _prompt_select_object(api))
        if not object_id:
            print("Missing object_id — exiting.")
            return 2
        modules: list[str]
        if args.modules:
            # you can provide: --module FOO --module BAR or PYBO_MODULES="FOO,BAR"
            if isinstance(args.modules, str):
                modules = [m for m in args.modules.split(",") if m]
            else:
                modules = [str(m) for m in args.modules if m]
        else:
            modules = await _prompt_select_modules(api, object_id)
        if not modules:
            print("No modules selected — exiting.")
            return 2
        # Runtime ParamStore is storage-only; opt into heavy resolution via ParamResolver.
        param_store = ParamStore()
        resolver = ParamResolver.from_api(api=api, store=param_store, lang=str(args.lang).strip().lower())
        # Keep a single authenticated ApiClient instance and inject it into the Gateway.
        gw = BragerOneGateway(api=api, object_id=object_id, modules=modules)
        set_values = list(args.set_values or [])
        toggles = list(args.toggles or [])
        # Send-only mode: perform the requested writes and exit without starting the TUI.
        if set_values or toggles:
            return await _run_send_only_actions(
                api=api,
                gw=gw,
                store=param_store,
                resolver=resolver,
                module_ids=modules,
                set_values=set_values,
                toggles=toggles,
            )
        # Start TUI subscriber BEFORE gateway prime so we don't miss the initial snapshot.
        spawn(
            _run_tui(
                api=api,
                gw=gw,
                store=param_store,
                resolver=resolver,
                object_id=object_id,
                module_ids=modules,
                json_events=bool(args.json),
                suppress_log=bool(args.no_diff),
                all_panels=bool(args.all_panels),
                debug_panels=bool(args.debug_panels),
                debug_logging=bool(args.debug),
                token_labels=bool(args.token_labels),
            ),
            "tui",
            log,
        )
        # Start gateway (REST prime + WS live)
        await gw.start()
        stop = asyncio.Event()
        for sig in (signal.SIGINT, signal.SIGTERM):
            # add_signal_handler raises NotImplementedError on event loops that do not support it.
            with contextlib.suppress(NotImplementedError):
                asyncio.get_running_loop().add_signal_handler(sig, stop.set)
        # Block until one of the signal handlers sets the event.
        await stop.wait()
        return 0
    finally:
        if gw is not None:
            with contextlib.suppress(Exception):
                await gw.stop()
        # Best-effort token revoke (keep behavior, but never crash on exit)
        with contextlib.suppress(Exception):
            await api.revoke()
        # Clean up background tasks
        for t in list(bg_tasks):
            t.cancel()
        with contextlib.suppress(Exception):
            await asyncio.gather(*bg_tasks, return_exceptions=True)
        # Unlike the steps above, errors from closing the client are not suppressed.
        await api.close()
def _route_title(route: Any) -> str:
meta = getattr(route, "meta", None)
dn = getattr(meta, "display_name", None) if meta is not None else None
if isinstance(dn, str) and dn.strip():
return dn.strip()
name = getattr(route, "name", None)
if isinstance(name, str) and name.strip():
return name.strip()
path = getattr(route, "path", None)
return str(path or "-")
def _iter_routes(routes: Iterable[Any]) -> Iterable[Any]:
stack = list(routes)[::-1]
while stack:
cur = stack.pop()
yield cur
children = getattr(cur, "children", None)
if isinstance(children, list):
for child in reversed(children):
stack.append(child)
def _collect_route_symbols(route: Any) -> set[str]:
symbols: set[str] = set()
def add_from_container(container: Any) -> None:
if container is None:
return
for kind in ("read", "write", "status", "special"):
items = getattr(container, kind, None)
if not isinstance(items, list):
continue
for item in items:
tok = getattr(item, "token", None)
if isinstance(tok, str) and tok:
symbols.add(tok)
meta = getattr(route, "meta", None)
if meta is not None:
add_from_container(getattr(meta, "parameters", None))
# Legacy / compatibility field
add_from_container(getattr(route, "parameters", None))
return symbols
async def _build_watch_groups(
    *,
    api: BragerOneApiClient,
    resolver: ParamResolver,
    object_id: int,
    module_ids: list[str],
    all_panels: bool,
) -> dict[str, list[str]]:
    """Build watch groups from library adapter (core or all-panels mode).

    Only the first entry of ``module_ids`` is used to pick the module whose ``deviceMenu`` and
    ``permissions`` drive ``resolver.build_panel_groups``.

    Args:
        api: Authenticated API client used to list the object's modules.
        resolver: Resolver that builds the panel groups.
        object_id: Object whose modules are listed.
        module_ids: Selected module ids (strings).
        all_panels: Passed through to ``resolver.build_panel_groups``.

    Returns:
        Mapping of panel name to symbols. When the first selected module cannot be found (or no
        module is selected) a placeholder with empty "Boiler", "DHW" and "Valve 1" groups.
    """
    if not module_ids:
        # Nothing selected: avoid indexing an empty list below.
        return {"Boiler": [], "DHW": [], "Valve 1": []}
    modules = await api.get_modules(object_id=object_id)
    wanted = set(module_ids)  # built once instead of once per module
    # Key by the *string* devid: the filter and the lookup below both use string ids, so a
    # non-string devid would otherwise never be found.
    selected = {str(m.devid): m for m in modules if m.devid is not None and str(m.devid) in wanted}
    mod = selected.get(module_ids[0])
    if mod is None:
        return {"Boiler": [], "DHW": [], "Valve 1": []}
    device_menu = int(mod.deviceMenu)
    perms = list(getattr(mod, "permissions", []) or [])
    return await resolver.build_panel_groups(device_menu=device_menu, permissions=perms, all_panels=all_panels)
async def _run_tui(
*,
api: BragerOneApiClient,
gw: BragerOneGateway,
store: ParamStore,
resolver: ParamResolver,
object_id: int,
module_ids: list[str],
json_events: bool,
suppress_log: bool,
all_panels: bool,
debug_panels: bool,
debug_logging: bool = False,
token_labels: bool = False,
) -> None:
"""Run a minimal Rich TUI with a live values area and a scrolling log."""
try:
from rich.console import Console, Group
from rich.layout import Layout
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
except ImportError as exc: # pragma: no cover
raise RuntimeError(
"TUI mode requires optional CLI dependencies. Install the 'cli' extra (e.g. `pip install pybragerone[cli]`)."
) from exc
console = Console()
log_lines: deque[str] = deque(maxlen=200)
log_lines.append("▶ Starting… waiting for prime and live updates. Ctrl+C to exit.")
termios_mod: Any | None = None
tty_mod: Any | None = None
loop_supports_add_reader = hasattr(asyncio.get_running_loop(), "add_reader")
if loop_supports_add_reader:
with contextlib.suppress(Exception):
import termios as _termios
import tty as _tty
termios_mod = _termios
tty_mod = _tty
keyboard_control_enabled = termios_mod is not None and tty_mod is not None and loop_supports_add_reader
if keyboard_control_enabled:
log_lines.append("⌨ Keys: j/k or arrows = select, t = toggle, s = set value")
loop = asyncio.get_running_loop()
log_lock = threading.Lock()
watch: dict[str, _WatchItem] = {}
group_symbols: dict[str, list[str]] = {}
visible_group_symbols: dict[str, list[str]] = {}
panel_order: list[str] = []
symbol_policy_visible: dict[str, bool] = {}
desc_cache: dict[str, dict[str, Any]] = {}
symbol_deps: dict[str, set[str]] = {}
symbol_groups: dict[str, set[str]] = {}
key_to_symbols: dict[str, set[str]] = {}
key_to_computed_symbols: dict[str, set[str]] = {}
computed_symbols: set[str] = set()
visible_computed_symbols: set[str] = set()
selected_symbol: str | None = None
dirty_keys: set[str] = set()
initial_refresh = True
content_ready = False
render_rows_limit = 40
debug_lines_limit = 25
periodic_refresh_seconds = 1.0
def _collect_symbol_dependencies(desc: dict[str, Any], resolved: _WatchItem) -> set[str]:
deps: set[str] = set()
if isinstance(resolved.address, str) and resolved.address:
deps.add(resolved.address)
mapping = desc.get("mapping")
if not isinstance(mapping, dict):
return deps
inputs = mapping.get("inputs")
if isinstance(inputs, list):
for entry in inputs:
if isinstance(entry, dict):
addr = entry.get("address")
if isinstance(addr, str) and addr:
deps.add(addr)
channels = mapping.get("channels")
if isinstance(channels, dict):
for entries in channels.values():
if not isinstance(entries, list):
continue
for entry in entries:
if isinstance(entry, dict):
addr = entry.get("address")
if isinstance(addr, str) and addr:
deps.add(addr)
status_conditions = mapping.get("status_conditions")
if isinstance(status_conditions, dict):
for entries in status_conditions.values():
if not isinstance(entries, list):
continue
for entry in entries:
if isinstance(entry, dict):
addr = entry.get("address")
if isinstance(addr, str) and addr:
deps.add(addr)
return deps
def _has_display_value(item: _WatchItem) -> bool:
if isinstance(item.value_label, str) and item.value_label.strip():
return True
if item.value is None:
return False
if isinstance(item.value, str):
return bool(item.value.strip())
return True
def _normalize_inline_unit_spacing(value: Any) -> Any:
if not isinstance(value, str):
return value
text = value.strip()
if not text:
return value
return re.sub(r"^([+-]?\d+(?:[.,]\d+)?)(°\S+)$", r"\1 \2", text)
def _recompute_visible_groups() -> None:
visible_computed_symbols.clear()
visible_group_symbols.clear()
for group_name, symbols in group_symbols.items():
visible = [
sym
for sym in symbols
if symbol_policy_visible.get(sym, True) and (watch.get(sym) is not None and _has_display_value(watch[sym]))
]
visible_group_symbols[group_name] = visible
for sym in visible:
if sym in computed_symbols:
visible_computed_symbols.add(sym)
# Keep original group order but hide panels without visible rows.
panel_order[:] = [name for name in group_symbols if visible_group_symbols.get(name)]
def _visible_symbols_in_order() -> list[str]:
ordered: list[str] = []
for panel_name in panel_order:
ordered.extend(visible_group_symbols.get(panel_name, []))
return ordered
def _ensure_selected_symbol() -> None:
nonlocal selected_symbol
ordered = _visible_symbols_in_order()
if not ordered:
selected_symbol = None
return
if selected_symbol not in ordered:
selected_symbol = ordered[0]
def _move_selection(delta: int) -> None:
nonlocal selected_symbol
ordered = _visible_symbols_in_order()
if not ordered:
selected_symbol = None
return
if selected_symbol not in ordered:
selected_symbol = ordered[0]
return
idx = ordered.index(selected_symbol)
selected_symbol = ordered[(idx + delta) % len(ordered)]
def render_group(name: str) -> Panel:
table = Table.grid(expand=True, padding=(0, 1))
table.add_column("Name", ratio=2, no_wrap=True, overflow="ellipsis")
table.add_column("Value", ratio=1, no_wrap=True, overflow="ellipsis")
if name not in visible_group_symbols:
table.add_row("Loading…", "")
return Panel(table, title=name)
symbols = visible_group_symbols.get(name, [])
if not symbols:
table.add_row("-", "-")
return Panel(table, title=name)
for sym in symbols[:render_rows_limit]:
it = watch.get(sym)
if it is None:
continue
val = it.value_label if it.value_label is not None else it.value
if isinstance(it.unit, str) and it.unit.strip():
val = f"{val} {it.unit}" if val is not None else "-"
else:
val = _normalize_inline_unit_spacing(val)
marker = "▶ " if sym == selected_symbol else " "
table.add_row(f"{marker}{it.label}", str(val) if val is not None else "-")
hidden_count = max(0, len(symbols) - render_rows_limit)
if hidden_count > 0:
table.add_row("…", f"+{hidden_count} more")
return Panel(table, title=name)
def render_log() -> Panel:
text = Text("\n".join(log_lines))
return Panel(text, title="Console")
def render_panels_grid() -> Any:
grid = Table.grid(expand=True, padding=(0, 0))
grid.add_column(ratio=1)
grid.add_column(ratio=1)
grid.add_column(ratio=1)
if not panel_order:
status_text = "Loading panels…" if not group_symbols else "No visible panels"
grid.add_row(Panel(status_text, title="Panels"), Group(), Group())
return grid
columns: list[list[Any]] = [[], [], []]
heights = [0, 0, 0]
for name in panel_order:
symbols = visible_group_symbols.get(name, [])
est_height = max(1, min(len(symbols), render_rows_limit)) + 3
col_idx = min(range(3), key=lambda idx: heights[idx])
columns[col_idx].append(render_group(name))
heights[col_idx] += est_height
renderables: list[Any] = []
for col in columns:
renderables.append(Group(*col) if col else Group())
grid.add_row(*renderables)
return grid
def build_layout() -> Layout:
root = Layout(name="root")
root.split_column(Layout(name="top", ratio=3), Layout(name="bottom", ratio=1))
root["bottom"].update(render_log())
root["top"].update(render_panels_grid())
return root
layout = build_layout()
def refresh_panels() -> None:
layout["bottom"].update(render_log())
layout["top"].update(render_panels_grid())
dirty = asyncio.Event()
log_state: dict[str, Any] = {"pending": 0, "sample_key": None, "sample_value": None}
def _append_log_line(line: str) -> None:
with log_lock:
log_lines.append(line)
dirty.set()
class _TuiLogHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
# Keep noise down by default.
min_level = logging.DEBUG if debug_logging else logging.WARNING
if record.levelno < min_level:
return
msg = self.format(record)
loop.call_soon_threadsafe(_append_log_line, msg)
async def bus_ingest() -> None:
async for upd in gw.bus.subscribe():
if getattr(upd, "value", None) is None:
continue
key = _format_event_key(upd)
await store.upsert_async(key, upd.value)
dirty_keys.add(key)
src = ""
if isinstance(upd.meta, dict):
src_raw = upd.meta.get("_source")
src = str(src_raw) if src_raw is not None else ""
if not suppress_log and src == "ws":
if json_events:
log_lines.append(json.dumps({"key": key, "value": upd.value}, ensure_ascii=False))
else:
log_state["pending"] = int(log_state["pending"]) + 1
log_state["sample_key"] = key
log_state["sample_value"] = upd.value
dirty.set()
async def log_flush_loop() -> None:
while True:
await asyncio.sleep(0.5)
pending = int(log_state["pending"])
if pending <= 0:
continue
sample_key = log_state.get("sample_key")
sample_value = log_state.get("sample_value")
log_state["pending"] = 0
if isinstance(sample_key, str):
if pending == 1:
log_lines.append(f"↺ {sample_key} = {sample_value}")
else:
log_lines.append(f"↺ {sample_key} = {sample_value} (+{pending - 1} more)")
else:
log_lines.append(f"↺ +{pending} ws updates")
dirty.set()
async def init_watch() -> None:
nonlocal content_ready
# Keep prime fast and avoid repeated expensive UI recomposition during bootstrap.
await gw.wait_for_prime(timeout=None)
modules = await api.get_modules(object_id=object_id)
selected = {m.devid: m for m in modules if m.devid is not None and str(m.devid) in set(module_ids)}
selected_module = selected.get(module_ids[0])
if selected_module is not None and selected_module.devid is not None:
devid_text = str(selected_module.devid)
resolver.set_runtime_context(
{
"devid": devid_text,
"modulesMap": {
devid_text: {
"connectedAt": selected_module.connectedAt,
}
},
}
)
else:
resolver.set_runtime_context(None)
groups = await _build_watch_groups(
api=api,
resolver=resolver,
object_id=object_id,
module_ids=module_ids,
all_panels=all_panels,
)
group_symbols.clear()
group_symbols.update(groups)
panel_order.clear()
panel_order.extend(list(groups.keys()))
if debug_panels:
mod = selected_module
if mod is not None:
perms = list(getattr(mod, "permissions", []) or [])
route_diag = await resolver.panel_route_diagnostics(
device_menu=int(mod.deviceMenu),
permissions=perms,
all_panels=all_panels,
)
accepted = sum(1 for row in route_diag if bool(row.get("accepted")))
rejected = len(route_diag) - accepted
log_lines.append(f"INFO: Panel route diagnostics: accepted={accepted}, rejected={rejected}")
rejected_rows = [row for row in route_diag if not bool(row.get("accepted"))]
for row in rejected_rows[:debug_lines_limit]:
if bool(row.get("accepted")):
continue
nm = str(row.get("name") or "-")
rsn = str(row.get("reason") or "-")
cnt = int(row.get("symbol_count") or 0)
log_lines.append(f"✖ {nm} ({cnt}) -> {rsn}")
extra = max(0, len(rejected_rows) - debug_lines_limit)
if extra:
log_lines.append(f"INFO: ... {extra} more rejected routes omitted")
# Build watch items (labels + mapping) once; values will be refreshed from ParamStore on dirty ticks.
all_symbols = [sym for symbols in group_symbols.values() for sym in symbols]
desc_by_symbol = await resolver.describe_symbols(all_symbols)
for _group_name, symbols in group_symbols.items():
for sym in symbols:
symbol_groups.setdefault(sym, set()).add(_group_name)
if sym in watch:
continue
desc = desc_by_symbol.get(sym) or await resolver.describe_symbol(sym)
desc_cache[sym] = desc
resolved = await resolver.resolve_value(sym)
display_label = sym if token_labels else str(desc.get("label") or sym)
watch[sym] = _WatchItem(
symbol=sym,
label=display_label,
unit=resolved.unit,
address=resolved.address,
value=resolved.value,
value_label=resolved.value_label,
kind=resolved.kind,
)
symbol_deps[sym] = _collect_symbol_dependencies(desc, watch[sym])
addr = watch[sym].address
if isinstance(addr, str) and addr:
key_to_symbols.setdefault(addr, set()).add(sym)
if watch[sym].kind == "computed":
computed_symbols.add(sym)
for dep in symbol_deps[sym]:
key_to_computed_symbols.setdefault(dep, set()).add(sym)
flat_values = store.flatten()
for sym, item in watch.items():
desc = desc_cache.get(sym, {})
visible, _reason = resolver.parameter_visibility_diagnostics(desc=desc, resolved=item, flat_values=flat_values)
symbol_policy_visible[sym] = visible
_recompute_visible_groups()
_ensure_selected_symbol()
content_ready = True
dirty.set()
async def refresh_loop(*, live: Live) -> None:
# Manual refresh to reduce blinking.
nonlocal initial_refresh
last_periodic_refresh = time.monotonic()
while True:
try:
await asyncio.wait_for(dirty.wait(), timeout=0.25)
except TimeoutError:
continue
dirty.clear()
await asyncio.sleep(0.05)
if not content_ready:
continue
updated_keys = set(dirty_keys)
dirty_keys.clear()
symbols_to_refresh: set[str] = set()
if initial_refresh:
symbols_to_refresh = set(visible_computed_symbols)
initial_refresh = False
elif updated_keys:
for key in updated_keys:
symbols_to_refresh.update(key_to_symbols.get(key, set()))
symbols_to_refresh.update(key_to_computed_symbols.get(key, set()))
now = time.monotonic()
if not symbols_to_refresh and now - last_periodic_refresh >= periodic_refresh_seconds:
symbols_to_refresh = set(visible_computed_symbols)
last_periodic_refresh = now
elif symbols_to_refresh:
last_periodic_refresh = now
# Re-resolve values from ParamStore so we don't depend on matching raw update keys.
for sym in symbols_to_refresh:
it = watch.get(sym)
if it is None:
continue
resolved = await resolver.resolve_value(sym)
it.value = resolved.value
it.value_label = resolved.value_label
_recompute_visible_groups()
_ensure_selected_symbol()
refresh_panels()
live.refresh()
async def _perform_toggle_selected() -> None:
    """Toggle the currently selected symbol on the first module and log the result.

    Does nothing when there are no module ids or no symbol is selected. The
    outcome message is appended to ``log_lines`` and ``dirty`` is set so the
    screen gets redrawn.
    """
    if not module_ids:
        return
    _ensure_selected_symbol()
    # Snapshot the selection before awaiting: refresh_loop() runs concurrently
    # and also calls _ensure_selected_symbol(), which may rebind
    # selected_symbol. The toggle value must be written to the same symbol it
    # was computed for.
    symbol = selected_symbol
    if not symbol:
        return
    desc = await resolver.describe_symbol(symbol)
    toggle_value = _toggle_value_for_symbol(desc=desc, store=store)
    ok, message = await _execute_symbol_write(
        api=api,
        resolver=resolver,
        store=store,
        devid=module_ids[0],
        symbol=symbol,
        value=toggle_value,
    )
    log_lines.append(("✔ " if ok else "✖ ") + message)
    dirty.set()
async def _perform_set_selected(raw_input: str) -> None:
    """Write a user-supplied value to the selected symbol and log the result.

    Args:
        raw_input: Text typed by the user; converted with ``_parse_cli_value``
            before being sent.
    """
    if module_ids:
        _ensure_selected_symbol()
        if selected_symbol:
            parsed = _parse_cli_value(raw_input)
            succeeded, detail = await _execute_symbol_write(
                api=api,
                resolver=resolver,
                store=store,
                devid=module_ids[0],
                symbol=selected_symbol,
                value=parsed,
            )
            marker = "✔ " if succeeded else "✖ "
            log_lines.append(marker + detail)
            # Ask the refresh loop to repaint with the new log line.
            dirty.set()
async def keyboard_loop() -> None:
    """Handle single-key TUI commands read from stdin.

    Keys: arrow up / ``k`` and arrow down / ``j`` move the selection, ``t``
    toggles the selected symbol, ``s`` prompts for a value and writes it.
    Returns immediately when keyboard control is disabled or the terminal
    modules are unavailable. The terminal attributes captured at start are
    restored when the loop exits.
    """
    if not keyboard_control_enabled or termios_mod is None or tty_mod is None:
        return
    loop = asyncio.get_running_loop()
    fd = 0  # stdin
    old_settings = termios_mod.tcgetattr(fd)

    async def _read_key_async() -> str:
        """Wait until stdin is readable and return up to 3 bytes decoded as text."""
        fut: asyncio.Future[str] = loop.create_future()

        def _on_readable() -> None:
            if fut.done():
                return
            try:
                # 3 bytes is enough for an arrow-key escape sequence ("\x1b[A").
                raw = os.read(fd, 3)
                key = raw.decode("utf-8", errors="ignore")
            except Exception:
                key = ""
            fut.set_result(key)

        loop.add_reader(fd, _on_readable)
        try:
            return await fut
        finally:
            with contextlib.suppress(Exception):
                loop.remove_reader(fd)

    def _prompt_value(symbol: str) -> str:
        """Prompt for a value using the original terminal settings; return '' on EOF."""
        termios_mod.tcsetattr(fd, termios_mod.TCSADRAIN, old_settings)
        try:
            console.print(f"\nSet value for {symbol}: ", end="")
            try:
                return input().strip()
            except EOFError:
                # Ctrl-D at the prompt: treat it as a cancelled edit instead of
                # letting the exception escape and tear down the task group.
                return ""
        finally:
            tty_mod.setcbreak(fd)

    try:
        tty_mod.setcbreak(fd)
        while True:
            key = await _read_key_async()
            if not key:
                continue
            if key in ("\x1b[A", "k"):
                _move_selection(-1)
                dirty.set()
                continue
            if key in ("\x1b[B", "j"):
                _move_selection(1)
                dirty.set()
                continue
            if key == "t":
                await _perform_toggle_selected()
                continue
            if key == "s":
                _ensure_selected_symbol()
                if selected_symbol is None:
                    continue
                raw_value = _prompt_value(selected_symbol)
                if raw_value:
                    await _perform_set_selected(raw_value)
                else:
                    log_lines.append("INFO: set cancelled (empty input)")
                # The prompt wrote over the screen, so always request a redraw.
                dirty.set()
    finally:
        termios_mod.tcsetattr(fd, termios_mod.TCSADRAIN, old_settings)
# Live display with auto_refresh disabled: refresh_loop() calls live.refresh() itself.
live = Live(layout, console=console, screen=True, auto_refresh=False)
# Snapshot the current logging setup so the finally block below can restore it.
root_logger = logging.getLogger()
prev_handlers = list(root_logger.handlers)
prev_level = root_logger.level
prev_httpx_level = logging.getLogger("httpx").level
prev_ws_eio_level = logging.getLogger("pybragerone.api.ws.eio").level
prev_ws_sio_level = logging.getLogger("pybragerone.api.ws.sio").level
prev_catalog_level = logging.getLogger("pybragerone.models.catalog").level
handler = _TuiLogHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
# In TUI mode, don't print logs to stdout/stderr.
root_logger.handlers = [handler]
root_logger.setLevel(logging.DEBUG if debug_logging else logging.WARNING)
# Unless debug logging was requested, cap these loggers at WARNING.
if not debug_logging:
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("pybragerone.api.ws.eio").setLevel(logging.WARNING)
logging.getLogger("pybragerone.api.ws.sio").setLevel(logging.WARNING)
logging.getLogger("pybragerone.models.catalog").setLevel(logging.WARNING)
try:
# CancelledError is swallowed here so cancelling the TUI exits without a traceback.
with live, contextlib.suppress(asyncio.CancelledError):
# Draw the initial (possibly empty) panels before any task has produced data.
refresh_panels()
live.refresh()
# Run all TUI workers concurrently; the block ends when they finish or one fails.
async with asyncio.TaskGroup() as tg:
tg.create_task(bus_ingest())
tg.create_task(init_watch())
tg.create_task(refresh_loop(live=live))
tg.create_task(log_flush_loop())
tg.create_task(keyboard_loop())
finally:
# Put the handlers and logger levels back exactly as they were captured above.
root_logger.handlers = prev_handlers
root_logger.setLevel(prev_level)
logging.getLogger("httpx").setLevel(prev_httpx_level)
logging.getLogger("pybragerone.api.ws.eio").setLevel(prev_ws_eio_level)
logging.getLogger("pybragerone.api.ws.sio").setLevel(prev_ws_sio_level)
logging.getLogger("pybragerone.models.catalog").setLevel(prev_catalog_level)
[docs]
def build_parser() -> argparse.ArgumentParser:
"""Build argument parser."""
p = argparse.ArgumentParser(prog="pybragerone", description="BragerOne — diagnostic CLI (REST + WS).")
# env-first: if not in CLI, take from environment (easy to debug in VSCode)
p.add_argument(
"--email",
default=os.environ.get("PYBO_EMAIL"),
help="Login email (ENV: PYBO_EMAIL)",
)
p.add_argument(
"--password",
default=os.environ.get("PYBO_PASSWORD"),
help="Password (ENV: PYBO_PASSWORD)",
)
p.add_argument(
"--platform",
choices=[p.value for p in Platform],
default=os.environ.get("PYBO_PLATFORM", Platform.BRAGERONE.value),
help="Backend platform: bragerone or tisconnect (ENV: PYBO_PLATFORM). Default: bragerone",
)
p.add_argument(
"--lang",
default=os.environ.get("PYBO_LANG", "en"),
help="Language code for asset-driven labels (ENV: PYBO_LANG). Default: en",
)
p.add_argument(
"--object-id",
type=int,
default=os.environ.get("PYBO_OBJECT_ID") and int(os.environ["PYBO_OBJECT_ID"]),
help="Object ID (ENV: PYBO_OBJECT_ID)",
)
p.add_argument(
"--module",
dest="modules",
action="append",
help="Module code (devid/code). Can be specified multiple times or via ENV PYBO_MODULES=FTTCTBSLCE,OTHER",
default=os.environ.get("PYBO_MODULES"),
)
p.add_argument(
"--json",
action="store_true",
help="Print events as JSON (one line per update)",
)
p.add_argument("--raw-ws", action="store_true", help="Print raw WS events (debug)")
p.add_argument("--raw-http", action="store_true", help="Trace HTTP (warning: logs may contain data)")
p.add_argument("--no-diff", action="store_true", help="Don't print changes (arrows) on STDOUT")
p.add_argument("--debug", action="store_true", help="More logs")
p.add_argument("--quiet", action="store_true", help="Fewer logs")
p.add_argument(
"--all-panels",
action="store_true",
help="Render all available menu panels (3-column grid) instead of Boiler/DHW/Valve 1 only",
)
p.add_argument(
"--debug-panels",
action="store_true",
help="Log panel route inclusion/exclusion reasons in TUI console",
)
p.add_argument(
"--token-labels",
action="store_true",
help="Display raw symbol tokens as names in TUI instead of localized labels",
)
p.add_argument(
"--set",
dest="set_values",
action="append",
default=[],
help="Send SYMBOL=VALUE update (repeatable). Example: --set PARAM_0=76",
)
p.add_argument(
"--toggle",
dest="toggles",
action="append",
default=[],
help="Toggle a symbol using current state/rules (repeatable). Example: --toggle URUCHOMIENIE_KOTLA",
)
return p
[docs]
def main() -> None:
    """Main entrypoint for CLI.

    Calls ``_maybe_load_dotenv()``, parses the command line, normalizes the
    module list, runs the async ``run`` coroutine and exits the process with
    its result.

    Raises:
        SystemExit: Always. Carries the value returned by ``run``, ``130`` on
            Ctrl-C, or an error message for missing credentials, API errors
            and network errors.
    """
    _maybe_load_dotenv()
    parser = build_parser()
    args = parser.parse_args()

    # PYBO_MODULES=FOO,BAR → convert to list if --module was not used.
    # Entries are stripped so "FOO, BAR" does not yield a code with a leading space.
    if isinstance(args.modules, str):
        stripped = (part.strip() for part in args.modules.split(","))
        args.modules = [module for module in stripped if module]

    if not args.email or not args.password:
        raise SystemExit("Missing credentials: set PYBO_EMAIL/PYBO_PASSWORD or pass --email/--password.")

    try:
        code = asyncio.run(run(args))
    except ApiError as exc:
        payload = exc.data
        body = json.dumps(payload, ensure_ascii=False) if isinstance(payload, (dict, list)) else str(payload)
        raise SystemExit(f"HTTP {exc.status}: {body}") from None
    except httpx.RequestError as exc:
        raise SystemExit(f"Network error: {exc.__class__.__name__}: {exc}") from None
    except KeyboardInterrupt:
        # Conventional shell exit status for termination by SIGINT (128 + 2).
        code = 130
    raise SystemExit(code)
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
main()