# Source code for pybragerone.models.menu_manager
"""Refactored menu system with clear separation of concerns.
New design:
1. MenuParser - parses raw JS and stores raw menu data
2. MenuProcessor - applies filtering, validation, i18n etc.
3. ProcessedMenu - final clean result with all transformations applied
"""
from __future__ import annotations

import copy
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any

from pydantic import ValidationError

from .menu import MenuParameter, MenuResult
@dataclass
class RawMenuData:
    """Raw menu data parsed from JavaScript asset.

    Holds the unprocessed route tree together with bookkeeping about where
    and when it was obtained.
    """

    # Top-level route dictionaries; nested routes live under each "children" key.
    routes: list[dict[str, Any]] = field(default_factory=list)
    # URL of the asset the routes came from, if known.
    asset_url: str | None = None
    # Identifier of the device menu these routes belong to, if known.
    device_menu: int | None = None
    # ``time.time()`` value captured when the instance is created.
    parsed_at: float = field(default_factory=time.time)

    def route_count(self) -> int:
        """Count total routes including nested.

        Every entry of a route list counts as one route. Only dict entries
        are descended into, and a ``children`` value that is missing or not
        a list contributes nothing, so malformed nodes never raise.

        Returns:
            Number of entries in ``routes`` plus all entries reachable
            through ``children`` lists.
        """
        total = 0
        # Explicit stack instead of recursion: depth is bounded only by the data.
        pending: list[Any] = [self.routes]
        while pending:
            level = pending.pop()
            if not isinstance(level, list):
                continue
            total += len(level)
            pending.extend(route.get("children") for route in level if isinstance(route, dict))
        return total
class MenuProcessor:
    """Processes raw menu data with various filters and transformations.

    The helper steps operate on plain route dictionaries. A route may carry
    its permission and its ``parameters`` mapping either directly on the
    route or inside its ``meta`` mapping; both locations are handled.

    ``_prune_invalid_routes`` and ``_apply_permission_filter`` return new
    dictionaries and leave their input untouched. ``_apply_i18n`` and
    ``_resolve_tokens`` update the given route dictionaries in place.
    """

    # Buckets that may appear inside a ``parameters`` mapping.
    _PARAMETER_SECTIONS = ("read", "write", "status", "special")
    # One to three ASCII letters followed by a dot at the start of a permission.
    _PREFIX_RE = re.compile(r"^(?P<prefix>[A-Za-z]{1,3}\.)")

    def __init__(self, raw_menu: RawMenuData, logger: logging.Logger | None = None) -> None:
        """Initialise processor with raw menu data and optional logger."""
        self.raw_menu = raw_menu
        self.logger = logger or logging.getLogger(__name__)

    @staticmethod
    def _strip_permission_prefix(value: str, prefixes: list[str]) -> str:
        """Return ``value`` without the first of ``prefixes`` it starts with."""
        for prefix in prefixes:
            if value.startswith(prefix):
                return value[len(prefix) :]
        return value

    def _prune_invalid_routes(self, routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Remove route nodes that can't be validated as MenuRoute.

        A valid route must be a dict with non-empty `path` and `name` strings
        (whitespace-only values count as empty). Invalid nodes are dropped
        together with their whole subtree.

        Args:
            routes: Route nodes to check; entries that are not dicts are dropped.

        Returns:
            A new list of shallow-copied valid nodes whose ``children`` lists
            (when present as lists) have been pruned recursively.
        """

        def is_valid(node: Any) -> bool:
            if not isinstance(node, dict):
                return False
            path = node.get("path")
            name = node.get("name")
            return isinstance(path, str) and bool(path.strip()) and isinstance(name, str) and bool(name.strip())

        pruned: list[dict[str, Any]] = []
        for node in routes:
            if not is_valid(node):
                continue
            clean = dict(node)
            children = clean.get("children")
            if isinstance(children, list):
                clean["children"] = self._prune_invalid_routes(children)
            pruned.append(clean)
        return pruned

    def get_debug_info(self) -> dict[str, Any]:
        """Get debug information about the menu.

        Returns:
            A dict with the total route count, the asset URL, the device menu
            id, the parse timestamp, the first top-level route (or ``None``)
            and the names of the top-level routes (``"unnamed"`` when a route
            has no ``name`` key or is not a dict).
        """
        routes = self.raw_menu.routes
        return {
            "raw_routes_count": self.raw_menu.route_count(),
            "asset_url": self.raw_menu.asset_url,
            "device_menu": self.raw_menu.device_menu,
            "parsed_at": self.raw_menu.parsed_at,
            "sample_route": routes[0] if routes else None,
            "route_names": [
                route.get("name", "unnamed") if isinstance(route, dict) else "unnamed" for route in routes
            ],
        }

    def _deep_copy_routes(self, routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Create deep copy of routes for safe processing."""
        return copy.deepcopy(routes)

    def _apply_permission_filter(
        self,
        routes: list[dict[str, Any]],
        permissions: set[str],
        include_invisible: bool,
    ) -> list[dict[str, Any]]:
        """Filter routes respecting the provided permission set.

        Permissions found on routes and parameter entries are compared with
        ``permissions`` after removing any auto-detected prefix. A missing or
        empty permission always grants access.

        Args:
            routes: Route nodes to filter; entries that are not dicts are dropped.
            permissions: Permission names (without prefix) that are granted.
            include_invisible: When true, nothing is removed for lack of
                permission; the tree is only normalised.

        Returns:
            A new route tree. Every returned node has a ``children`` list,
            each ``parameters`` mapping has all four sections as lists, and
            a ``meta`` value that is not a dict is removed. The input
            dictionaries are not modified.
        """
        detected_prefixes = self._detect_permission_prefixes(routes)
        self.logger.debug("Detected permission prefixes: %s", detected_prefixes)

        def has_permission(required: Any) -> bool:
            if not required:
                return True
            normalized = self._strip_permission_prefix(str(required), detected_prefixes)
            return normalized in permissions

        def filter_parameters(container: dict[str, Any]) -> dict[str, Any]:
            # Build a fresh mapping so the caller's parameter dict stays intact.
            filtered = dict(container)
            for section in self._PARAMETER_SECTIONS:
                kept: list[dict[str, Any] | str] = []
                for item in container.get(section) or []:
                    if not isinstance(item, dict):
                        # Non-dict entries carry no permission and are always kept.
                        kept.append(item)
                    elif include_invisible or has_permission(item.get("permissionModule")):
                        kept.append(dict(item))
                filtered[section] = kept
            return filtered

        def gate(node: dict[str, Any]) -> dict[str, Any] | None:
            raw_meta = node.get("meta")
            meta = raw_meta if isinstance(raw_meta, dict) else None

            # Route permission may live either in meta.permissionModule (common) or
            # directly on the route object depending on the bundler/build.
            route_perm = None
            if meta is not None and meta.get("permissionModule") is not None:
                route_perm = meta.get("permissionModule")
            elif node.get("permissionModule") is not None:
                route_perm = node.get("permissionModule")

            # An invisible route hides its entire subtree.
            if not include_invisible and not has_permission(route_perm):
                return None

            clean = dict(node)
            # Drop a stale internal marker if the input carries one.
            clean.pop("_visible", None)

            params = clean.get("parameters")
            if isinstance(params, dict):
                clean["parameters"] = filter_parameters(params)

            if meta is None:
                clean.pop("meta", None)
            else:
                meta = dict(meta)
                meta_params = meta.get("parameters")
                if isinstance(meta_params, dict):
                    meta["parameters"] = filter_parameters(meta_params)
                clean["meta"] = meta

            clean["children"] = gate_all(clean.get("children") or [])
            return clean

        def gate_all(nodes: Any) -> list[dict[str, Any]]:
            result: list[dict[str, Any]] = []
            for node in nodes:
                if not isinstance(node, dict):
                    continue
                gated = gate(node)
                if gated is not None:
                    result.append(gated)
            return result

        return gate_all(routes)

    def _apply_i18n(self, routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Perform minimal i18n cleanup such as trimming display names.

        Strips surrounding whitespace from every string ``meta.displayName``
        in the tree. The route dictionaries are updated in place and the same
        list object is returned.
        """

        def apply(node: dict[str, Any]) -> None:
            meta = node.get("meta")
            if isinstance(meta, dict) and isinstance(meta.get("displayName"), str):
                meta["displayName"] = meta["displayName"].strip()
            children = node.get("children")
            if isinstance(children, list):
                for child in children:
                    if isinstance(child, dict):
                        apply(child)

        for route in routes:
            if isinstance(route, dict):
                apply(route)
        return routes

    def _resolve_tokens(self, routes: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Normalize parameter entries to guarantee clean tokens before validation.

        Dict entries that already have non-empty string ``token`` and
        ``parameter`` values are copied as-is, with any detected prefix
        removed from a string ``permissionModule``. Other dict entries are
        passed through ``MenuParameter`` validation and dumped back to a
        dict; entries that fail validation are kept as plain copies.
        Non-dict entries are kept unchanged.

        Args:
            routes: Route nodes to process; entries that are not dicts are dropped.

        Returns:
            The processed route dictionaries. They are the same objects as
            the input nodes, updated in place; ``children`` lists are rebuilt
            without non-dict entries.
        """
        detected_prefixes = self._detect_permission_prefixes(routes)

        def normalize_list(items: list[Any]) -> list[Any]:
            normalized: list[Any] = []
            for item in items:
                if not isinstance(item, dict):
                    normalized.append(item)
                    continue

                token = item.get("token")
                parameter = item.get("parameter")
                if isinstance(token, str) and token and isinstance(parameter, str) and parameter:
                    # Fast path: already well-formed, skip model validation.
                    fast_item = dict(item)
                    permission_module = fast_item.get("permissionModule")
                    if isinstance(permission_module, str) and permission_module:
                        fast_item["permissionModule"] = self._strip_permission_prefix(
                            permission_module, detected_prefixes
                        )
                    normalized.append(fast_item)
                    continue

                try:
                    model = MenuParameter.model_validate(item)
                except ValidationError:
                    # Best effort: keep the entry rather than lose it.
                    normalized.append(dict(item))
                else:
                    normalized.append(model.model_dump(mode="json", by_alias=True))
            return normalized

        def normalize_sections(container: dict[str, Any]) -> None:
            for section in self._PARAMETER_SECTIONS:
                container[section] = normalize_list(container.get(section) or [])

        def process(node: dict[str, Any]) -> dict[str, Any]:
            params = node.get("parameters")
            if isinstance(params, dict):
                normalize_sections(params)

            meta = node.get("meta")
            if isinstance(meta, dict):
                meta_params = meta.get("parameters")
                if isinstance(meta_params, dict):
                    normalize_sections(meta_params)

            children = node.get("children")
            if isinstance(children, list):
                node["children"] = [process(child) for child in children if isinstance(child, dict)]
            return node

        return [process(route) for route in routes if isinstance(route, dict)]

    def _detect_permission_prefixes(self, routes: list[dict[str, Any]]) -> list[str]:
        """Auto-detect permission prefixes used in the menu.

        A prefix is one to three ASCII letters followed by a dot at the start
        of a ``permissionModule`` value. Route permissions (in ``meta`` or on
        the route itself) and parameter permissions (in ``meta.parameters``
        and in route-level ``parameters``) are inspected across the tree.
        Nodes, ``meta`` values and parameter containers of an unexpected type
        are skipped instead of raising.

        Returns:
            The distinct prefixes found, sorted.
        """
        prefixes: set[str] = set()

        def record(permission: Any) -> None:
            if not permission:
                return
            match = self._PREFIX_RE.match(str(permission))
            if match:
                prefixes.add(match.group("prefix"))

        def scan_parameters(container: Any) -> None:
            if not isinstance(container, dict):
                return
            for section in self._PARAMETER_SECTIONS:
                for item in container.get(section) or []:
                    if isinstance(item, dict):
                        record(item.get("permissionModule"))

        def scan_route(route: Any) -> None:
            if not isinstance(route, dict):
                return
            raw_meta = route.get("meta")
            meta = raw_meta if isinstance(raw_meta, dict) else {}

            # Check route permission
            record(meta.get("permissionModule") or route.get("permissionModule"))

            # Check parameter permissions in both places the other steps handle
            scan_parameters(meta.get("parameters"))
            scan_parameters(route.get("parameters"))

            # Scan children
            for child in route.get("children") or []:
                scan_route(child)

        for route in routes:
            scan_route(route)

        # Return in consistent order
        return sorted(prefixes)
class MenuManager:
    """High-level menu management with caching and convenience methods.

    Keeps two caches keyed by the ``device_menu`` identifier: the raw menu
    data and the processor built for it.
    """

    def __init__(self, logger: logging.Logger | None = None) -> None:
        """Create a manager with optional externally provided logger."""
        self.logger = logger or logging.getLogger(__name__)
        self._raw_cache: dict[int, RawMenuData] = {}
        self._processor_cache: dict[int, MenuProcessor] = {}

    def get_debug_info(self, device_menu: int) -> dict[str, Any]:
        """Get debug information for device_menu.

        Args:
            device_menu: Identifier of the cached menu to inspect.

        Returns:
            Whatever the cached processor's ``get_debug_info()`` returns.

        Raises:
            ValueError: If no processor is cached for ``device_menu``.
        """
        processor = self._processor_cache.get(device_menu)
        if processor is None:
            raise ValueError(f"No menu data for device_menu={device_menu}")
        return processor.get_debug_info()

    def clear_cache(self) -> None:
        """Clear all cached menu data."""
        self._raw_cache.clear()
        self._processor_cache.clear()
        self.logger.info("Cleared menu cache")