"""Runtime-light parameter store.
This module intentionally contains *only* the minimal structures and logic
needed to store and update raw parameter values (e.g. ``P5.s0``).
All asset-driven behavior (mappings, menu grouping, i18n, computed STATUS rule
evaluation, and rich "describe" helpers) is implemented in
:class:`pybragerone.models.param_resolver.ParamResolver`.
"""
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from .events import EventBus
class ParamFamilyModel(BaseModel):
    """One parameter "family" (e.g., P4 index 1) collecting channels: v/s/u/n/x...

    A family is addressed by its ``pool`` and ``idx`` and keeps every raw
    channel value in the ``channels`` mapping, keyed by the channel name.
    """

    model_config = ConfigDict(frozen=False, validate_assignment=True)

    # Pool part of the address (the "P4" in "P4.v1").
    pool: str
    # Index part of the address (the trailing "1" in "P4.v1").
    idx: int
    # Raw values keyed by channel name; starts out empty.
    channels: dict[str, Any] = Field(default_factory=dict)

    def set(self, chan: str, value: Any) -> None:
        """Store ``value`` under channel ``chan``, replacing any previous value."""
        self.channels[chan] = value

    def get(self, chan: str, default: Any = None) -> Any:
        """Return the raw value stored for ``chan``, or ``default`` if absent."""
        return self.channels.get(chan, default)

    @property
    def value(self) -> Any:
        """Raw content of the ``"v"`` channel, or ``None`` when it is not set."""
        return self.get("v")

    @property
    def unit_code(self) -> Any:
        """Raw content of the ``"u"`` channel, or ``None`` when it is not set."""
        return self.get("u")

    @property
    def status_raw(self) -> Any:
        """Raw content of the ``"s"`` channel, or ``None`` when it is not set."""
        return self.get("s")
class ParamStore(BaseModel):
    """Store of live parameter values.

    Values are grouped into :class:`ParamFamilyModel` instances, one per
    ``(pool, idx)`` pair, held in ``families`` under the ID built by
    :meth:`_fid` (e.g. ``"P4:1"``).

    Notes:
        Keys use the BragerOne addressing format: ``P<n>.<chan><idx>``
        (e.g. ``P5.s0``, ``P4.v1``, ``P4.u1``).
        This class is designed to be safe and fast for HA runtime.
    """

    # Families keyed by the "<pool>:<idx>" ID produced by ``_fid``.
    families: dict[str, ParamFamilyModel] = Field(default_factory=dict)

    model_config = ConfigDict(frozen=False, validate_assignment=True)

    async def run_with_bus(self, bus: EventBus) -> None:
        """Consume ParamUpdate events from EventBus and upsert into ParamStore.

        Iterates ``bus.subscribe()`` and stores every event under the key
        ``<pool>.<chan><idx>`` built from the event's attributes. Events whose
        ``value`` attribute is missing or ``None`` are skipped.

        Args:
            bus: Event source; only its ``subscribe()`` async iterator is used.
        """
        async for upd in bus.subscribe():
            if getattr(upd, "value", None) is None:
                continue
            await self.upsert_async(f"{upd.pool}.{upd.chan}{upd.idx}", upd.value)

    def _fid(self, pool: str, idx: int) -> str:
        """Unique family ID for (pool, idx), e.g. 'P4:1'."""
        return f"{pool}:{idx}"

    def upsert(self, key: str, value: Any) -> ParamFamilyModel | None:
        """Upsert a single parameter value by full key, e.g. ``P4.v1``.

        The key is split at the first ``"."`` into the pool and the remainder;
        the first character of the remainder is the channel and the rest is
        parsed as the integer index. The family is created on first use.

        Args:
            key: Full parameter address in ``<pool>.<chan><idx>`` form.
            value: Raw value to store on the addressed channel.

        Returns:
            The family that received the value, or ``None`` when ``key``
            cannot be parsed (nothing is stored in that case).
        """
        try:
            pool, rest = key.split(".", 1)
            chan = rest[0]
            idx = int(rest[1:])
        except (AttributeError, IndexError, TypeError, ValueError):
            # Only the failures key parsing can produce are treated as
            # "malformed key": a non-string key, a missing ".", an empty
            # remainder, or a non-integer index.
            return None

        fid = self._fid(pool, idx)
        fam = self.families.get(fid)
        if fam is None:
            fam = ParamFamilyModel(pool=pool, idx=idx)
            self.families[fid] = fam
        fam.set(chan, value)
        return fam

    async def upsert_async(self, key: str, value: Any) -> ParamFamilyModel | None:
        """Async upsert wrapper for convenience in async code.

        Calls :meth:`upsert` directly and returns its result; nothing is
        awaited inside.
        """
        return self.upsert(key, value)

    def get_family(self, pool: str, idx: int) -> ParamFamilyModel | None:
        """Get ParamFamilyModel by (pool, idx) address, or None if not found."""
        return self.families.get(self._fid(pool, idx))

    def flatten(self) -> dict[str, Any]:
        """Flattened view of all parameters as ``{ 'P4.v1': value, ... }``.

        Every entry of every family's ``channels`` mapping yields one item
        keyed ``<pool>.<channel><idx>``.
        """
        return {
            f"{fam.pool}.{ch}{fam.idx}": val
            for fam in self.families.values()
            for ch, val in fam.channels.items()
        }

    def ingest_prime_payload(self, payload: Mapping[str, Any]) -> None:
        """Ingest REST prime payload (modules/parameters) into the store.

        The payload is walked three levels deep: each top-level value is
        expected to map pool names to mappings of ``<chan><idx>`` entries.
        Top-level keys are not inspected (presumably module identifiers;
        confirm against the REST client). Anything that does not have the
        expected shape is skipped silently.

        Args:
            payload: Nested mapping as described above.
        """
        for pools in payload.values():
            if not isinstance(pools, Mapping):
                continue
            for pool, entries in pools.items():
                if not isinstance(pool, str) or not isinstance(entries, Mapping):
                    continue
                for chan_idx, body in entries.items():
                    self._ingest_prime_entry(pool, chan_idx, body)

    def _ingest_prime_entry(self, pool: str, chan_idx: Any, body: Any) -> None:
        """Store one ``<chan><idx>`` entry of a prime payload.

        Args:
            pool: Pool name the entry belongs to.
            chan_idx: Entry key; must be a string of one channel character
                followed by an integer index, otherwise the entry is ignored.
            body: Either a mapping (optionally holding ``"value"`` plus
                metadata fields) or a bare raw value.
        """
        if not isinstance(chan_idx, str) or len(chan_idx) < 2:
            return
        try:
            idx = int(chan_idx[1:])
        except ValueError:
            return
        chan_key = f"{pool}.{chan_idx[0]}{idx}"

        if not isinstance(body, Mapping):
            # A bare body is itself the raw channel value.
            self.upsert(chan_key, body)
            return

        fam: ParamFamilyModel | None
        if "value" in body:
            fam = self.upsert(chan_key, body["value"])
        else:
            # No value supplied: reuse the existing family untouched, or
            # create it with the channel set to None so metadata has a home.
            fam = self.get_family(pool, idx)
            if fam is None:
                fam = self.upsert(chan_key, None)
        if fam is None:
            return

        # Metadata fields are copied onto the family under their own names,
        # next to the regular channels.
        meta_keys = (
            "storable",
            "createdAt",
            "previousCreatedAt",
            "updatedAt",
            "updatedAtClient",
            "expire",
            "average",
        )
        for meta_key in meta_keys:
            if meta_key in body:
                fam.set(meta_key, body[meta_key])