"""Implementation of live asset catalog parsing from BragerOne web app.
This module provides classes and utilities for parsing and managing live assets
from the BragerOne web application, including:
- LiveAssetsCatalog: Main entry point for fetching and parsing assets
- AssetRef, AssetIndex: Data structures for tracking asset references
- ParamMap: Data structures for menu routes and parameter mappings
- TranslationConfig: Configuration for available translations
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import tree_sitter_javascript
from tree_sitter import Language, Node, Parser, Tree
from .menu import MenuResult
from .menu_manager import MenuManager, RawMenuData
if TYPE_CHECKING:
from ..api import BragerOneApiClient
JS_LANGUAGE = Language(tree_sitter_javascript.language())
LOG = logging.getLogger(__name__)
# ----------------------------- Data types -----------------------------
[docs]
@dataclass(slots=True)
class AssetRef:
"""Reference to a versioned asset from the BragerOne catalog.
Attributes:
url: The full URL to the asset file.
base: The base name without hash (e.g., 'module.menu' from 'module.menu-BNvCCsxi').
hash: The hash identifier for this version (e.g., 'BNvCCsxi').
etag: Optional ETag from HTTP headers for cache validation.
last_modified: Optional last modified timestamp from HTTP headers.
"""
url: str
base: str # BASENAME (np. 'module.menu-BNvCCsxi' → base='module.menu')
hash: str # 'BNvCCsxi'
etag: str | None = None
last_modified: str | None = None
[docs]
@dataclass(slots=True)
class AssetIndex:
"""Index of assets parsed from ``index-*.js`` file.
Attributes:
assets_by_basename: Full list of assets declared in ``index-*.js`` (exact basenames, no normalization).
Maps basename strings to lists of AssetRef objects.
menu_map: Mapping from deviceMenu integers to BASENAME strings (e.g., ``module.menu-<hash>.js``).
inline_param_candidates: List of (start_byte, end_byte) tuples indicating potential
inline parameter maps detected within ``index-*.js``.
index_bytes: Raw bytes of the index file for potential inline parsing.
"""
# full list of assets declared in index-*.js (exact basenames, no normalization)
assets_by_basename: dict[str, list[AssetRef]] = field(default_factory=dict)
# mapping from deviceMenu:int -> BASENAME('module.menu-<hash>.js')
menu_map: dict[int, str] = field(default_factory=dict)
# inline parameter maps detected within index-*.js
inline_param_candidates: list[tuple[int, int]] = field(default_factory=list) # (start_byte, end_byte)
# raw index for potential inline parsing
index_bytes: bytes = b""
[docs]
def find_asset_for_basename(self, basename: str) -> AssetRef | None:
"""Find an asset reference by its basename.
Args:
basename: The basename of the asset to search for.
Returns:
The last asset reference found for the given basename, or None if no
asset is found. When multiple assets share the same basename, returns
the last one (typically the newest hash).
Note:
Uses an alpha heuristic that assumes the last asset in the list is
the most recent version when multiple assets share the same basename.
"""
lst = self.assets_by_basename.get(basename)
if not lst:
return None
# Alpha heuristic: take the last found (usually the newest hash)
return lst[-1]
[docs]
def find_asset_for_full_name(self, full_name: str) -> AssetRef | None:
"""Find an asset reference by its full name with hash (e.g., 'module.menu-Dbo_n32n').
Args:
full_name: The full name of the asset with hash to search for.
Returns:
The asset reference if found, or None if not found.
"""
# Search through all assets by basename to find the one with matching full name
for basename_assets in self.assets_by_basename.values():
for asset in basename_assets:
if f"{asset.base}-{asset.hash}" == full_name:
return asset
return None
[docs]
@dataclass(slots=True)
class ParamMap:
"""Represents a parameter mapping in the BragerOne system.
This class encapsulates the mapping between parameter identifiers and their
metadata, including paths, units, limits, and status flags. It serves as a
structured representation of parameter configurations retrieved from the router.
Attributes:
key: The unique parameter identifier token from the router.
Examples: "URUCHOMIENIE_KOTLA", "PARAM_66".
group: Optional parameter group identifier. Example: "P6".
paths: Dictionary mapping path types to their respective path lists.
Entries are normalized lists of dictionaries describing pool/index/use
metadata for value/unit/status/command/min/max channels.
component_type: Optional type of the UI component associated with this parameter.
units: Optional measurement unit identifier. May be a localized string, numeric
code, or mapping used for enumerations; resolution happens via the units
i18n catalog where possible.
limits: Optional dictionary containing parameter limit definitions and constraints.
status_flags: List of dictionaries defining various status flags and their meanings.
origin: Source of the parameter map definition.
Format: "asset:<url>" for external sources or "inline:index" for inline definitions.
raw: The raw, unprocessed parameter map dictionary. Preserved for future use
in internationalization (i18n) and logging purposes.
Example:
>>> param_map = ParamMap()
>>> param_map.key = "PARAM_66"
>>> param_map.group = "P6"
>>> param_map.paths = {"v": ["status", "value"], "u": ["status", "unit"]}
"""
key: str # token from router: np. "URUCHOMIENIE_KOTLA" lub "PARAM_66"
group: str | None # np. "P6"
paths: dict[str, list[dict[str, Any]]] # value/unit/status/command/min/max channel descriptors
component_type: str | None
units: str | int | float | dict[str, Any] | list[Any] | None
limits: dict[str, Any] | None
status_flags: list[dict[str, Any]]
status_conditions: dict[str, list[dict[str, Any]]] | None
command_rules: list[dict[str, Any]]
origin: str # "asset:<url>" or "inline:index"
raw: dict[str, Any] # raw map (will be useful later for i18n/logging)
# ----------------------------- Parser helpers -----------------------------
class _TS:
    """Thin convenience wrapper around a tree-sitter parser for JavaScript.

    Attributes:
        parser: The underlying tree-sitter Parser bound to the JS grammar.
    """

    __slots__ = ("parser",)

    def __init__(self) -> None:
        """Create a Parser instance configured for the JavaScript grammar."""
        parser = Parser()
        parser.language = JS_LANGUAGE
        self.parser = parser

    def parse(self, code: bytes) -> Tree:
        """Parse JavaScript source *code* (bytes) into a syntax tree.

        Example:
            >>> tree = _TS().parse(b'const x = 42;')
            >>> tree.root_node.type
            'program'
        """
        return self.parser.parse(code)
def _node_text(code: bytes, n: Node) -> str:
"""Extract text content from a tree-sitter Node.
Args:
code: The source code bytes the node comes from.
n: The tree-sitter Node to extract text from.
Returns:
The decoded text content of the node.
"""
return code[n.start_byte : n.end_byte].decode("utf-8", errors="replace")
def _is_string(n: Node) -> bool:
"""Check if a Node represents a string literal.
Args:
n: The tree-sitter Node to check.
Returns:
True if the node is a string or template_string type.
"""
return n.type in {"string", "template_string"}
def _string_value(text: str) -> str:
"""Extract the value from a string literal, removing quotes.
Args:
text: The raw string text including quotes.
Returns:
The string value with surrounding quotes removed.
"""
if len(text) >= 2 and text[0] in "\"'`" and text[-1] == text[0]:
return text[1:-1]
return text
def _split_top_level_csv(raw: str) -> list[str]:
parts: list[str] = []
cur: list[str] = []
depth = 0
for ch in raw:
if ch in "([{":
depth += 1
elif ch in ")]}":
depth = max(0, depth - 1)
if ch == "," and depth == 0:
part = "".join(cur).strip()
if part:
parts.append(part)
cur = []
continue
cur.append(ch)
tail = "".join(cur).strip()
if tail:
parts.append(tail)
return parts
def _build_factory_callable(source: str, bindings: dict[str, Any] | None = None) -> Any:
    """Turn a minified JS object-factory arrow function into a Python callable.

    Only the specific bundler shape ``({key: var, key2: var2 = dflt}) => ({...})``
    is handled: destructured object parameters feeding an object-literal body.
    Any other shape is returned unchanged as the raw source string.

    Args:
        source: Raw arrow-function source text.
        bindings: Already-resolved module-level constants, used to resolve
            identifiers in the body and parameter defaults.

    Returns:
        A callable mapping one (mapping) argument to the evaluated body object,
        or *source* itself when the shape does not match.
    """
    text = source.strip()
    # Match '({ <params> }) => ( {<body>} )'; DOTALL because bodies span lines.
    match = re.match(
        r"^\(\s*\{(?P<params>.*)\}\s*\)\s*=>\s*\((?P<body>\{.*\})\)\s*$",
        text,
        flags=re.DOTALL,
    )
    if match is None:
        return source
    params_text = match.group("params")
    body_text = match.group("body")
    # Each spec: (key looked up in the call argument, local variable name,
    # optional default expression as raw text).
    param_specs: list[tuple[str, str, str | None]] = []
    for item in _split_top_level_csv(params_text):
        if ":" not in item:
            continue
        key_part, var_part = item.split(":", 1)
        src_key = key_part.strip()
        var_spec = var_part.strip()
        default_expr: str | None = None
        if "=" in var_spec:
            var_name, default_expr_raw = var_spec.split("=", 1)
            var_spec = var_name.strip()
            default_expr = default_expr_raw.strip() or None
        if src_key and var_spec:
            param_specs.append((src_key, var_spec, default_expr))
    # Snapshot the outer bindings so later mutation does not leak into calls.
    base_bindings = dict(bindings or {})

    def _factory(arg: Any) -> Any:
        # Bind destructured parameters from the call argument (a mapping),
        # falling back to defaults resolved against the captured bindings.
        arg_map = arg if isinstance(arg, Mapping) else {}
        local_bindings = dict(base_bindings)
        for src_key, var_name, default_expr in param_specs:
            if src_key in arg_map:
                local_bindings[var_name] = arg_map.get(src_key)
                continue
            if isinstance(default_expr, str) and default_expr:
                # Default may itself be a known identifier; otherwise keep
                # the literal default text.
                local_bindings[var_name] = local_bindings.get(default_expr, default_expr)
            else:
                local_bindings[var_name] = None
        # Re-parse the body as a JS declaration so identifiers resolve via
        # local_bindings during conversion.
        body_src = f"const __factory_obj = {body_text};"
        tree = _TS().parse(body_src.encode("utf-8"))
        for statement in tree.root_node.named_children:
            if statement.type not in {"lexical_declaration", "variable_declaration"}:
                continue
            for declarator in statement.named_children:
                if declarator.type != "variable_declarator":
                    continue
                name_node = declarator.child_by_field_name("name")
                value_node = declarator.child_by_field_name("value")
                if name_node is None or value_node is None:
                    continue
                if _node_text(body_src.encode("utf-8"), name_node).strip() != "__factory_obj":
                    continue
                return _node_to_python(body_src.encode("utf-8"), value_node, local_bindings)
        # Body did not parse to the expected declaration.
        return None

    return _factory
def _walk(node: Node) -> Iterable[Node]:
"""Depth-first traversal of tree-sitter Node tree.
Args:
node: The root node to start traversal from.
Yields:
Each node in the tree in depth-first order.
"""
stack = [node]
while stack:
cur = stack.pop()
yield cur
for i in range(cur.child_count - 1, -1, -1):
child = cur.child(i)
if child is not None:
stack.append(child)
def _find_export_root(code: bytes, root: Node) -> Node | None:
    """Locate the main exported object/array literal of a JS module.

    Strategy:
    1. Prefer the first object/array nested directly in an export statement.
    2. Otherwise fall back to the byte-wise largest object/array in the tree.

    Args:
        code: Source bytes (unused; kept for API symmetry with other helpers).
        root: Root node of the AST to search.

    Returns:
        The node of the main export object/array, or None when absent.
    """
    for candidate in _walk(root):
        if candidate.type != "export_statement":
            continue
        for child in candidate.named_children:
            if child.type in {"object", "array"}:
                return child
    largest: Node | None = None
    largest_size = -1
    for candidate in _walk(root):
        if candidate.type in {"object", "array"}:
            size = candidate.end_byte - candidate.start_byte
            if size > largest_size:
                largest, largest_size = candidate, size
    return largest
def _node_to_python(code: bytes, node: Node, bindings: dict[str, Any] | None = None) -> Any:
    """Convert an arbitrary AST node to a Python object.

    When *bindings* is provided, identifier nodes are resolved using the
    already-converted values stored in the mapping. This allows handling of the
    common pattern where large dictionaries reuse previously declared constants
    (e.g. i18n bundles exporting `const foo = "label"; const map = {key: foo};`).

    Any node type not explicitly handled falls through to its raw source text.
    """
    t = node.type
    if t == "object":
        obj: dict[str, Any] = {}
        for prop in node.named_children:
            if prop.type != "pair":
                continue
            key_node = prop.child_by_field_name("key")
            value_node = prop.child_by_field_name("value")
            if key_node is None or value_node is None:
                continue
            # String keys lose their quotes; identifier keys keep raw text.
            key = _string_value(_node_text(code, key_node)) if _is_string(key_node) else _node_text(code, key_node)
            obj[key] = _node_to_python(code, value_node, bindings)
        return obj
    if t == "array":
        return [_node_to_python(code, child, bindings) for child in node.named_children]
    if t == "parenthesized_expression" and node.named_children:
        # Unwrap '( expr )' to the inner expression.
        return _node_to_python(code, node.named_children[0], bindings)
    if node.type == "template_string":
        template = _node_text(code, node)
        if bindings:
            # Substitute `${identifier}` placeholders with known binding values;
            # unknown identifiers keep the placeholder text.
            template = re.sub(
                r"\$\{([A-Za-z_$][\w$]*)\}",
                lambda m: str(bindings.get(m.group(1), m.group(0))),
                template,
            )
        return _string_value(template)
    if _is_string(node):
        return _string_value(_node_text(code, node))
    if t == "number":
        text = _node_text(code, node)
        try:
            # '.', 'e' or 'E' marks a float literal; everything else is int.
            return float(text) if any(c in text for c in ".eE") else int(text)
        except Exception:
            # Unparseable numeric forms (hex with suffix etc.) stay as text.
            return text
    if t in {"true", "false"}:
        return t == "true"
    if t == "null":
        return None
    if t in {"identifier", "property_identifier"}:
        name = _node_text(code, node)
        if bindings and name in bindings:
            return bindings[name]
        return name
    if t == "arrow_function":
        raw_function = _node_text(code, node)
        # May return a Python callable (factory) or the raw source string.
        return _build_factory_callable(raw_function, bindings)
    if t == "call_expression":
        func_node = node.child_by_field_name("function")
        args_node = node.child_by_field_name("arguments")
        callee = _node_to_python(code, func_node, bindings) if func_node is not None else None
        if callable(callee) and args_node is not None:
            args = [_node_to_python(code, child, bindings) for child in args_node.named_children]
            # Single-argument calls pass the value directly; otherwise the list.
            if len(args) == 1:
                return callee(args[0])
            return callee(args)
    # Fallback: raw source text of the node.
    return _node_text(code, node)
def _object_to_python(code: bytes, node: Node, *, bindings: dict[str, Any] | None = None) -> Any:
    """Backward-compatible wrapper around :func:`_node_to_python`.

    Coerces any non-dict conversion result to an empty dict, so callers that
    expect an object always get one.
    """
    converted = _node_to_python(code, node, bindings)
    if isinstance(converted, dict):
        return converted
    return {}
def _collect_bindings(code: bytes, root: Node) -> dict[str, Any]:
    """Resolve simple top-level const/let/var declarations into Python values.

    Later declarations may reference earlier ones, because the mapping built
    so far is passed back into :func:`_node_to_python` while iterating.
    Failures are logged and skipped so one bad binding cannot abort parsing.
    """
    resolved: dict[str, Any] = {}
    for stmt in root.named_children:
        if stmt.type not in ("lexical_declaration", "variable_declaration"):
            continue
        for decl in stmt.named_children:
            if decl.type != "variable_declarator":
                continue
            name_node = decl.child_by_field_name("name")
            value_node = decl.child_by_field_name("value")
            if name_node is None or value_node is None:
                continue
            if name_node.type not in ("identifier", "property_identifier"):
                continue
            name = _node_text(code, name_node)
            if not name:
                continue
            try:
                resolved[name] = _node_to_python(code, value_node, resolved)
            except Exception as exc:  # pragma: no cover - defensive parsing fallback
                LOG.debug("Failed to resolve binding '%s': %s", name, exc)
    return resolved
# ----------------------------- TranslationConfig -----------------------------
@dataclass
class TranslationConfig:
    """Available translation bundles and the default language choice.

    Attributes:
        translations: Raw translation descriptors as exposed by the web app.
        default_translation: Identifier of the translation used by default.
    """

    translations: list[dict[str, Any]]
    default_translation: str
# ----------------------------- LiveAssetsCatalog -----------------------------
[docs]
class LiveAssetsCatalog:
"""Main entry point: fetches index-<hash>.js, parses router (module.menu-<hash>.js).
Only loads necessary parameter map files (based on router tokens).
"""
def __init__(
    self,
    api: BragerOneApiClient,
    logger: logging.Logger | None = None,
    visibility_strategy: str = "independent",  # "independent" | "parent_gates_children" (for future use)
    schemas_enabled: bool = False,  # hook (OFF)
    request_timeout: float = 8.0,
    concurrency: int = 8,
) -> None:
    """Initialize the LiveAssetsCatalog.

    Args:
        api: BragerOneApiClient used to fetch assets and index files.
        logger: Optional logger to use for informational and debug output.
        visibility_strategy: Strategy for gating menu visibility (default 'independent').
        schemas_enabled: Whether schema validation is enabled (currently unused).
        request_timeout: Timeout for network requests in seconds.
        concurrency: Maximum concurrent network operations (reserved for future use).

    Note:
        ``visibility_strategy``, ``schemas_enabled`` and ``concurrency`` are
        accepted but not stored here - they are reserved for future use.
    """
    self._api = api
    self._timeout = request_timeout
    self._ts = _TS()  # shared tree-sitter JS parser
    self._idx = AssetIndex()  # empty until refresh_index() runs
    self._log = logger or logging.getLogger(__name__)
    self._last_index_url: str | None = None
    # Cache for language-scoped i18n namespaces (from index-driven mapping)
    self._cache_i18n: dict[tuple[str, str], dict[str, Any]] = {}
    self._i18n_lock = asyncio.Lock()
    # Lazily built units descriptor table plus its guard lock
    self._units_descriptor_table: dict[str, dict[str, Any]] | None = None
    self._units_descriptor_lock = asyncio.Lock()
    # Token->raw-map cache extracted from the index, keyed by a signature
    # so it is invalidated when the index content changes
    self._index_token_raw_maps: dict[str, dict[str, Any]] | None = None
    self._index_token_raw_maps_sig: tuple[int, bytes, bytes] | None = None
    self._index_token_raw_maps_lock = asyncio.Lock()
    # New menu management system
    self._menu_manager = MenuManager(self._log)
    # Track auto-discovery attempts to help guard repeated network fetches
    self._index_autoload_attempts = 0
def _smart_urljoin(self, base_url: str, relative_url: str) -> str:
"""Smart urljoin that handles assets prefix correctly to avoid double /assets/ paths.
Args:
base_url: Base URL (usually an index file like /assets/index-hash.js)
relative_url: Relative URL, may start with assets/
Returns:
Properly joined URL without duplicate /assets/ segments
"""
from urllib.parse import urljoin
# If relative_url starts with assets/ and base_url contains /assets/,
# remove the assets/ prefix from relative_url to avoid duplication
if relative_url.startswith("assets/") and "/assets/" in base_url:
# Remove assets/ prefix
relative_url = relative_url[7:] # len('assets/') = 7
return urljoin(base_url, relative_url)
# ---------- lifecycle ----------
async def __aenter__(self) -> LiveAssetsCatalog:
"""Enter the async context manager and return self."""
return self
async def __aexit__(self, *exc: Any) -> None:
"""Exit the async context manager."""
pass
# ---------- INDEX ----------
async def refresh_index(self, index_url: str) -> None:
    """Download ``index-<hash>.js`` and rebuild the full asset index.

    Populates:
    - assets_by_basename: exact BASENAME -> [AssetRef]
    - menu_map: deviceMenu int -> BASENAME('module.menu-<hash>.js'), if present
    - inline_param_candidates: (start, end) spans of param-map-like objects

    Derived caches (units table, token raw maps) are invalidated because they
    were computed from the previous index contents.
    """
    raw = await self._api.get_bytes(index_url)
    # Remember the source URL so i18n asset URLs can be built relative to it.
    self._last_index_url = index_url
    self._idx = self._build_asset_index_from_index_js(index_url, raw)
    self._units_descriptor_table = None
    self._index_token_raw_maps = None
    self._index_token_raw_maps_sig = None
    self._log.info(
        "INDEX: assets=%d basenames=%d menus=%d inline_param_candidates=%d",
        sum(len(refs) for refs in self._idx.assets_by_basename.values()),
        len(self._idx.assets_by_basename),
        len(self._idx.menu_map),
        len(self._idx.inline_param_candidates),
    )
async def _auto_discover_and_load_index(self) -> None:
    """Auto-discover and load the index file.

    Scrapes the assets directory listing for an ``index-<hash>.js`` reference;
    if that fails, probes a couple of well-known fallback filenames. All
    failures are logged as warnings - this is best-effort only.
    """
    import re

    import httpx

    self._log.info("Auto-discovering index file...")
    try:
        base = self._api.one_base.rstrip("/")
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(f"{base}/assets/")
            resp.raise_for_status()
            listing = resp.text
        # Look for patterns like /assets/index-XXXXXXXX.js
        found = re.search(r"/assets/(index-[A-Za-z0-9_-]+\.js)", listing)
        if found:
            discovered_url = f"{base}/assets/{found.group(1)}"
            self._log.info("Discovered index: %s", discovered_url)
            await self.refresh_index(discovered_url)
            return
        # Discovery failed; probe alternative well-known names.
        for alt_url in (f"{base}/assets/index-main.js", f"{base}/assets/index.js"):
            try:
                self._log.debug("Trying alternative: %s", alt_url)
                await self.refresh_index(alt_url)
                self._log.info("Success with alternative: %s", alt_url)
                return
            except Exception as e:
                self._log.debug("Alternative failed: %s - %s", alt_url, e)
        self._log.warning("Failed to auto-discover index file")
    except Exception as e:
        self._log.warning("Error auto-discovering index: %s", e)
async def _ensure_index_loaded(self) -> None:
    """Best-effort ensure that the asset index is available.

    Counts attempts so callers can see how often auto-discovery was tried;
    failures are logged and swallowed.
    """
    # Nothing to do when either raw bytes or parsed assets already exist.
    if self._idx.index_bytes or self._idx.assets_by_basename:
        return
    self._index_autoload_attempts += 1
    attempt = self._index_autoload_attempts
    self._log.debug("Index not loaded yet. Auto-discovery attempt %d", attempt)
    try:
        await self._auto_discover_and_load_index()
    except Exception as e:  # pragma: no cover - should not happen but guard just in case
        self._log.warning("Auto-discovery attempt %d failed: %s", attempt, e)
def _build_asset_index_from_index_js(self, index_url: str, code: bytes) -> AssetIndex:
    """Parse ``index-<hash>.js`` bytes into an :class:`AssetIndex`.

    Three independent extraction passes:
    1. Regex scan for every ``<base>-<hash>.js`` literal (asset catalog).
    2. Regex scan for deviceMenu -> module.menu-<hash> import mappings
       (AST parsing fails on the minified Object.assign calls, so regex it is).
    3. Limited AST scan for large inline objects that look like param maps.

    Args:
        index_url: URL the index was fetched from; used to absolutize asset URLs.
        code: Raw bytes of the index file.

    Returns:
        A populated AssetIndex; ``code`` is retained for later inline parsing.
    """
    text = code.decode("utf-8", errors="replace")
    # Performance: most assets are declared early, so scan a leading slice first.
    # Parameters and i18n assets frequently live deeper in the bundle, so the
    # remainder is scanned afterwards as well.
    search_text = text[:50000] if len(text) > 50000 else text
    assets_by_basename: dict[str, list[AssetRef]] = {}
    simple_pattern = re.compile(r"([A-Za-z0-9._/-]+?)-([A-Za-z0-9_-]+)\.js")

    def _register(matches: Iterable[re.Match[str]]) -> None:
        # Record each '<base>-<hash>.js' literal once per (hash, url) pair.
        for m in matches:
            raw_base = m.group(1)
            h = m.group(2)
            # Normalise '../parameters/write/FOO' → 'FOO'
            norm_base = raw_base.rsplit("/", 1)[-1]
            full_url = self._smart_urljoin(index_url, f"{raw_base}-{h}.js")
            bucket = assets_by_basename.setdefault(norm_base, [])
            if all(existing.hash != h or existing.url != full_url for existing in bucket):
                bucket.append(AssetRef(url=full_url, base=norm_base, hash=h))

    _register(simple_pattern.finditer(search_text))
    if len(text) > len(search_text):
        self._log.debug("Scanning remaining index fragment for additional assets...")
        _register(simple_pattern.finditer(text[len(search_text):]))

    # 2) deviceMenu:int -> 'module.menu-<hash>' mapping via regex.
    # Pattern examples:
    #   "../config/router/deviceMenu/N/module.menu.ts":()=>d(()=>import("./module.menu-HASH.js"))
    #   "../../config/router/deviceMenu/N/module.menu.ts":()=>d(()=>import('./module.menu-HASH.js'))
    menu_map: dict[int, str] = {}
    try:
        device_menu_pattern = (
            r"['\"](?:\.\./)+config/router/deviceMenu/(\d+)/module\.menu\.ts['\"]"
            r"\s*:\s*\(\)\s*=>\s*d\s*\(\s*\(\)\s*=>\s*import\s*\(\s*"
            r"['\"]\./(module\.menu-[A-Za-z0-9_-]+)\.js['\"]\s*\)"
        )
        code_str = code.decode("utf-8")
        for match in re.finditer(device_menu_pattern, code_str):
            device_menu_num = int(match.group(1))
            menu_file_name = match.group(2)
            menu_map[device_menu_num] = menu_file_name
            self._log.debug("Found deviceMenu mapping: %d -> %s", device_menu_num, menu_file_name)
    except Exception as regex_e:
        self._log.debug("Regex deviceMenu parsing failed: %s", regex_e)

    # 3) Inline param-map candidates (AST on a bounded prefix of the bundle).
    ast_limit = min(1000000, len(code))
    limited_code = code[:ast_limit]
    inline_candidates: list[tuple[int, int]] = []
    if limited_code:
        try:
            # Fix: the previous `if "tree" not in locals():` guard implied an
            # earlier parse could be reused, but no `tree` is ever bound before
            # this point - parse unconditionally instead of probing locals().
            tree = self._ts.parse(limited_code)
            objects_checked = 0
            max_param_objects = 10  # cap the scan; param maps show up early
            for n in _walk(tree.root_node):
                if n.type == "object" and objects_checked < max_param_objects:
                    objects_checked += 1
                    approx_len = n.end_byte - n.start_byte
                    if approx_len > 200:  # heuristic: larger objects are more likely to be maps
                        # quick "shape" of param-map: look for keywords inside
                        obj_start = n.start_byte
                        obj_end = min(n.end_byte, len(limited_code))
                        snippet = limited_code[obj_start:obj_end]
                        if any(
                            key in snippet
                            for key in (b"group", b"pool", b"use", b"value", b"unit", b"status", b"componentType")
                        ):
                            inline_candidates.append((obj_start, obj_end))
        except Exception as param_e:
            self._log.debug("Param candidates AST parsing failed: %s", param_e)

    return AssetIndex(
        assets_by_basename=assets_by_basename,
        menu_map=menu_map,
        inline_param_candidates=inline_candidates,
        index_bytes=code,
    )
# ---------- MENU ----------
async def _load_and_cache_menu(self, device_menu: int) -> None:
    """Load raw menu data and cache it in MenuManager.

    Resolution order:
    1. Exact mapping ``device_menu -> module.menu-<hash>`` from the index.
    2. Generic 'module.menu' asset - some accounts/modules report
       device_menu=0 or values absent from the index mapping, but the app
       often still ships a generic ``module.menu-<hash>.js``.
    3. Nothing found: cache an empty route list so callers get a result.

    The previous implementation duplicated the fetch/parse/store tail in the
    fallback branch; both paths now share a single tail with identical behavior.
    """
    # Auto-load index if not loaded yet
    if not self._idx.menu_map and not self._idx.assets_by_basename:
        await self._auto_discover_and_load_index()
    menu_name = self._idx.menu_map.get(device_menu)
    if menu_name:
        asset = self._idx.find_asset_for_full_name(menu_name) or self._idx.find_asset_for_basename("module.menu")
    else:
        self._log.debug(
            "No menu mapping found for device_menu=%d; falling back to generic module.menu asset (if available)",
            device_menu,
        )
        asset = self._idx.find_asset_for_basename("module.menu")
    if not asset:
        self._log.warning("No menu asset found for device_menu=%d", device_menu)
        self._menu_manager.store_raw_menu(device_menu, [], None)
        return
    # Fetch and parse raw routes (no filtering, no processing), then cache.
    self._log.info("Loading menu asset: %s", asset.url)
    code = await self._api.get_bytes(asset.url)
    raw_routes = self._parse_menu_routes(code)
    self._menu_manager.store_raw_menu(device_menu=device_menu, routes=raw_routes, asset_url=asset.url)
    self._log.info("Cached raw menu for device_menu=%d: %d routes", device_menu, len(raw_routes))
def _parse_menu_routes(self, code: bytes) -> list[dict[str, Any]]:
    """Return the list of root routes (1:1 with the export in module.menu-*.js).

    Parses the JS module, converts its exported object to Python, then
    heuristically locates the route list and normalizes parameter tokens on
    every route via :meth:`_attach_parameters_tokens`.
    """
    tree = self._ts.parse(code)
    bindings = _collect_bindings(code, tree.root_node)
    root = _find_export_root(code, tree.root_node)
    if root is None:
        return []
    export_obj = _node_to_python(code, root, bindings)

    def extract_routes(value: Any, depth: int = 0) -> list[dict[str, Any]]:
        # Recursively look for a list of route-shaped dicts, bounded in depth
        # to avoid runaway recursion on pathological exports.
        if depth > 5:
            return []
        if isinstance(value, list):
            candidates = [item for item in value if isinstance(item, dict)]
            # Route dicts carry at least one of these router-ish keys.
            meaningful = [item for item in candidates if any(key in item for key in ("path", "name", "children", "meta"))]
            return meaningful or []
        if isinstance(value, dict):
            # Prefer well-known container keys before scanning every value.
            for key in ("routes", "deviceMenu", "menu", "items"):
                if key in value:
                    extracted = extract_routes(value[key], depth + 1)
                    if extracted:
                        return extracted
            for nested in value.values():
                extracted = extract_routes(nested, depth + 1)
                if extracted:
                    return extracted
        return []

    raw_routes = extract_routes(export_obj)
    return [self._attach_parameters_tokens(route) for route in raw_routes]
# Build output may rename helper functions; do not rely on single-letter identifiers.
# Matches any call of the form `fn(<first-arg>, '<TOKEN>')` and captures the quoted
# second argument as group 'tok' (e.g. "E(A.WRITE,'URUCHOMIENIE_KOTLA')" -> 'URUCHOMIENIE_KOTLA').
PARAM_CALL_RE = re.compile(r"""\b[A-Za-z_$][\w$]*\([^,]*?,\s*(['"])(?P<tok>[^'"]+)\1\)""")
def _attach_parameters_tokens(self, node: dict[str, Any]) -> dict[str, Any]:
    """Attach parameter tokens to a node by processing its parameters section.

    After AST to dict conversion, items in parameters.* sections can be:
    - Literal strings (already valid tokens)
    - Call expressions as raw strings, e.g., "E(A.WRITE,'URUCHOMIENIE_KOTLA')"
      from which the token is extracted with :attr:`PARAM_CALL_RE`

    All parameter sections (read, write, status, special) are normalized into
    dicts carrying a "token" key, and children are processed recursively.

    Args:
        node: A dictionary representing a catalog node that may contain a
            "parameters" section with read/write/status/special subsections,
            and optionally a "children" list.

    Returns:
        A new dictionary with the same structure, parameters normalized to
        dicts with "token" keys, and children recursively processed.
    """
    out = dict(node)  # shallow copy so the input node is never mutated
    params = out.get("parameters")
    if isinstance(params, dict):
        newp: dict[str, list[dict[str, Any]]] = {}
        for sec in ("read", "write", "status", "special"):
            items = params.get(sec, []) or []
            norm: list[dict[str, Any]] = []
            for it in items:
                if isinstance(it, str):
                    # Either a bare token or a raw call expression; try to
                    # extract the quoted token, otherwise keep the string.
                    original = it.strip()
                    m = self.PARAM_CALL_RE.search(original)
                    if m:
                        tok = m.group("tok")
                        norm.append({"token": tok, "parameter": original})
                    else:
                        norm.append({"token": original, "parameter": original})
                elif isinstance(it, dict):
                    entry = dict(it)
                    param_value = entry.get("parameter")
                    if isinstance(param_value, str):
                        # Derive 'token' from the call expression unless one
                        # was already present.
                        match = self.PARAM_CALL_RE.search(param_value)
                        if match and "token" not in entry:
                            entry["token"] = match.group("tok")
                    elif "token" in entry:
                        # Token without parameter: mirror it into 'parameter'.
                        token_val = str(entry["token"])
                        entry.setdefault("parameter", token_val)
                    else:
                        # Neither usable 'parameter' nor 'token': fall back to
                        # the dict's string form and try extraction on that.
                        fallback_text = str(it)
                        match = self.PARAM_CALL_RE.search(fallback_text)
                        if match:
                            entry["token"] = match.group("tok")
                        entry.setdefault("parameter", fallback_text)
                    if "parameter" not in entry and "token" in entry:
                        entry["parameter"] = str(entry["token"]) or ""
                    norm.append(entry)
                else:
                    # Any other type: stringify, then try token extraction.
                    s = str(it)
                    m = self.PARAM_CALL_RE.search(s)
                    if m:
                        tok = m.group("tok")
                        norm.append({"token": tok, "parameter": s})
                    elif s:
                        norm.append({"token": s, "parameter": s})
            newp[sec] = norm
        out["parameters"] = newp
    # recursion for children
    ch = out.get("children")
    if isinstance(ch, list):
        out["children"] = [self._attach_parameters_tokens(c) if isinstance(c, dict) else c for c in ch]
    elif ch is not None:
        # Fix: If children exists but is not a list (e.g., string reference like 'wA'),
        # convert to empty list to prevent MenuManager errors
        self._log.debug(f"Converting non-list children to empty list: {type(ch)} {ch}")
        out["children"] = []
    return out
# ---------- PARAM MAPS ----------
[docs]
async def get_param_mapping(self, tokens: Iterable[str]) -> dict[str, ParamMap]:
    """Retrieve parameter mappings for the given tokens.

    This method attempts to resolve parameter mappings for each provided token through
    a two-stage resolution process:

    1. First, it searches for a dedicated asset file named ``BASENAME-<hash>.js`` where
       BASENAME exactly matches the token.
    2. If no asset is found, and there is exactly one unresolved token with exactly
       one inline parameter candidate in the index, it attempts to use the inline
       parameter map from the ``index-*.js`` file as a fallback.
    3. Any tokens that cannot be resolved through either method are omitted from
       the results.

    Args:
        tokens: An iterable of token strings to resolve into parameter mappings.

    Returns:
        A dictionary mapping successfully resolved token strings to their
        corresponding ParamMap objects. Only tokens that were successfully
        resolved are included in the returned dictionary.

    Note:
        The method performs asset fetching concurrently using asyncio.gather for
        improved performance. Failed fetches are logged but do not raise exceptions.
    """
    t0_total = time.perf_counter()
    # De-duplicate while preserving caller order; drop falsy tokens.
    uniq_tokens = list(dict.fromkeys(str(t) for t in tokens if t))
    if not self._idx.assets_by_basename:
        await self._ensure_index_loaded()
    # 1) file assets: split tokens into those with a dedicated asset and the rest.
    file_jobs: list[tuple[str, AssetRef]] = []
    unresolved: list[str] = []
    for tok in uniq_tokens:
        a = self._idx.find_asset_for_basename(tok)
        if a:
            file_jobs.append((tok, a))
        else:
            unresolved.append(tok)
    results: dict[str, ParamMap] = {}
    # Per-token provenance label ("asset", "index-token-map", ..., "unresolved").
    source_by_token: dict[str, str] = {}
    # Counters/timings feeding the summary log emitted at the end.
    telemetry = {
        "token_count": len(uniq_tokens),
        "assets_queued": len(file_jobs),
        "assets_ok": 0,
        "assets_missing_parse": 0,
        "assets_failed": 0,
        "index_token_map_hits": 0,
        "index_chunk_hits": 0,
        "index_full_hits": 0,
        "index_ast_token_hits": 0,
        "unresolved": 0,
        "ms_assets": 0.0,
        "ms_index_fallback": 0.0,
    }

    async def fetch_and_parse(tok: str, a: AssetRef) -> tuple[str, ParamMap | None, str | None]:
        # Returns (token, parsed map or None, error message or None);
        # never raises, so gather() below can run without return_exceptions.
        try:
            code = await self._api.get_bytes(a.url)
            pm = self._parse_param_map_from_js(code, tok, origin=f"asset:{a.url}")
            if pm:
                return tok, pm, None
            self._log.debug("PARAM MAP not found in asset (export not object?): %s", a.url)
            return tok, None, None
        except Exception as e:
            self._log.warning("Param asset fetch failed %s: %s", a.url, e)
            return tok, None, str(e)

    t_assets = time.perf_counter()
    # Fetch all dedicated assets concurrently.
    asset_rows = await asyncio.gather(*(fetch_and_parse(tok, a) for tok, a in file_jobs), return_exceptions=False)
    telemetry["ms_assets"] = (time.perf_counter() - t_assets) * 1000.0
    for tok, pm, err in asset_rows:
        if pm is not None:
            results[tok] = pm
            source_by_token[tok] = "asset"
            telemetry["assets_ok"] += 1
        elif err is not None:
            telemetry["assets_failed"] += 1
        else:
            telemetry["assets_missing_parse"] += 1

    # 2) inline/index fallback for unresolved tokens
    def _looks_like_param_map(pm: ParamMap | None, token: str) -> bool:
        # Heuristic guard: accept only maps carrying actual param-like content
        # (group/units/paths, or recognizable raw keys), so arbitrary index
        # objects are not mistaken for token definitions.
        if pm is None:
            return False
        if pm.group is not None:
            return True
        if pm.units is not None:
            return True
        if any(pm.paths.get(name) for name in ("value", "unit", "status", "command", "min", "max")):
            return True
        raw = pm.raw
        if isinstance(raw, Mapping):
            if any(name in raw for name in ("name", "group", "pool", "use", "value", "unit", "units", "componentType")):
                return True
            nested = raw.get(token)
            if isinstance(nested, Mapping):
                return True
        return False

    unresolved_now = [tok for tok in unresolved if tok not in results]
    if unresolved_now and self._idx.index_bytes:
        t_fallback = time.perf_counter()
        token_raw_maps = await self._get_index_token_raw_maps(self._idx.index_bytes)
        chunk_token_maps: list[tuple[int, dict[str, dict[str, Any]]]] = []
        chunk_root_objects: list[tuple[int, dict[str, Any]]] = []
        full_root_obj = self._extract_root_object_from_js(self._idx.index_bytes)
        standard_token_name_re = re.compile(r"^[A-Z][A-Z0-9_]+$")
        # Pre-parse each inline candidate byte-range once; results are reused
        # for every unresolved token below.
        for idx, (start, end) in enumerate(self._idx.inline_param_candidates):
            if start < 0 or end <= start:
                continue
            chunk = self._idx.index_bytes[start:end]
            try:
                parsed_chunk = self._parse_index_token_raw_maps(chunk)
            except Exception:
                parsed_chunk = {}
            if parsed_chunk:
                chunk_token_maps.append((idx, parsed_chunk))
            root_obj = self._extract_root_object_from_js(chunk)
            if isinstance(root_obj, dict):
                chunk_root_objects.append((idx, root_obj))
        # Resolution order per token: cached token map -> per-chunk token maps
        # -> per-chunk root objects -> full-index root -> AST key lookup.
        for tok in unresolved_now:
            resolved_pm: ParamMap | None = None
            raw_map = token_raw_maps.get(tok)
            if isinstance(raw_map, Mapping):
                pm_cached = self._build_param_map_from_obj(dict(raw_map), tok, origin="inline:index-token")
                if _looks_like_param_map(pm_cached, tok):
                    resolved_pm = pm_cached
                    source_by_token[tok] = "index-token-map"
                    telemetry["index_token_map_hits"] += 1
            if resolved_pm is None:
                for idx, token_map in chunk_token_maps:
                    raw_map_chunk = token_map.get(tok)
                    if not isinstance(raw_map_chunk, Mapping):
                        continue
                    origin = "inline:index" if idx == 0 else f"inline:index[{idx}]"
                    pm_chunk = self._build_param_map_from_obj(dict(raw_map_chunk), tok, origin=origin)
                    if _looks_like_param_map(pm_chunk, tok):
                        resolved_pm = pm_chunk
                        source_by_token[tok] = "index-inline-chunk"
                        telemetry["index_chunk_hits"] += 1
                        break
            if resolved_pm is None:
                for idx, root_obj in chunk_root_objects:
                    origin = "inline:index" if idx == 0 else f"inline:index[{idx}]"
                    pm_chunk_obj = self._build_param_map_from_obj(dict(root_obj), tok, origin=origin)
                    if _looks_like_param_map(pm_chunk_obj, tok):
                        resolved_pm = pm_chunk_obj
                        source_by_token[tok] = "index-inline-chunk"
                        telemetry["index_chunk_hits"] += 1
                        break
            if resolved_pm is None:
                pm_full = None
                if isinstance(full_root_obj, dict):
                    pm_full = self._build_param_map_from_obj(dict(full_root_obj), tok, origin="inline:index-full")
                if _looks_like_param_map(pm_full, tok):
                    resolved_pm = pm_full
                    source_by_token[tok] = "index-full"
                    telemetry["index_full_hits"] += 1
            # token_raw_maps already covers the common token namespace; keep this expensive
            # fallback only for non-standard keys to preserve compatibility.
            if resolved_pm is None and not standard_token_name_re.match(tok):
                pm_token = self._parse_param_map_from_index_token(
                    self._idx.index_bytes,
                    tok,
                    origin="inline:index-token",
                )
                if _looks_like_param_map(pm_token, tok):
                    resolved_pm = pm_token
                    source_by_token[tok] = "index-token-ast"
                    telemetry["index_ast_token_hits"] += 1
            if resolved_pm is not None:
                results[tok] = resolved_pm
            else:
                self._log.debug("Index fallback didn't parse expected param map for %s", tok)
                source_by_token[tok] = "unresolved"
                telemetry["unresolved"] += 1
        telemetry["ms_index_fallback"] = (time.perf_counter() - t_fallback) * 1000.0

    # Ensure every requested token has a provenance entry, then recount misses.
    for tok in uniq_tokens:
        source_by_token.setdefault(tok, "unresolved")
    telemetry["unresolved"] = sum(1 for src in source_by_token.values() if src == "unresolved")
    total_ms = (time.perf_counter() - t0_total) * 1000.0
    self._log.info(
        "PARAM MAPS: tokens=%d resolved=%d unresolved=%d total_ms=%.2f assets_ms=%.2f index_ms=%.2f "
        "src(asset=%d idx-map=%d idx-chunk=%d idx-full=%d idx-ast=%d)",
        len(uniq_tokens),
        len(results),
        telemetry["unresolved"],
        total_ms,
        telemetry["ms_assets"],
        telemetry["ms_index_fallback"],
        telemetry["assets_ok"],
        telemetry["index_token_map_hits"],
        telemetry["index_chunk_hits"],
        telemetry["index_full_hits"],
        telemetry["index_ast_token_hits"],
    )
    self._log.debug(
        "PARAM MAPS detail: assets_queued=%d assets_failed=%d assets_missing_parse=%d",
        telemetry["assets_queued"],
        telemetry["assets_failed"],
        telemetry["assets_missing_parse"],
    )
    unresolved_preview = [tok for tok, src in source_by_token.items() if src == "unresolved"][:20]
    if unresolved_preview:
        self._log.debug("PARAM MAPS unresolved preview(%d): %s", len(unresolved_preview), unresolved_preview)
    return results
@staticmethod
def _index_signature(code: bytes) -> tuple[int, bytes, bytes]:
head = code[:64]
tail = code[-64:] if len(code) > 64 else code
return (len(code), head, tail)
async def _get_index_token_raw_maps(self, code: bytes) -> dict[str, dict[str, Any]]:
    """Return the token -> raw-object map parsed from ``code``, with caching.

    Uses double-checked locking: a lock-free fast path first, then a re-check
    under ``self._index_token_raw_maps_lock`` so concurrent callers parse the
    index at most once per content signature.
    """
    sig = self._index_signature(code)
    # Fast path: serve a cache hit without taking the lock.
    cached = self._index_token_raw_maps
    cached_sig = self._index_token_raw_maps_sig
    if isinstance(cached, dict) and cached_sig == sig:
        return cached
    async with self._index_token_raw_maps_lock:
        # Re-check: another task may have populated the cache while we waited.
        cached2 = self._index_token_raw_maps
        cached_sig2 = self._index_token_raw_maps_sig
        if isinstance(cached2, dict) and cached_sig2 == sig:
            return cached2
        parsed = self._parse_index_token_raw_maps(code)
        self._index_token_raw_maps = parsed
        self._index_token_raw_maps_sig = sig
        return parsed
def _parse_index_token_raw_maps(self, code: bytes) -> dict[str, dict[str, Any]]:
    """Collect every ``TOKEN_NAME: {...}`` pair found anywhere in the index AST.

    Only keys shaped like upper-case token identifiers (e.g. ``STATUS_P5_10``)
    whose values resolve to objects are kept. Returns ``{}`` when the source
    cannot be parsed at all.
    """
    try:
        tree = self._ts.parse(code)
        bindings = _collect_bindings(code, tree.root_node)
    except Exception:
        return {}
    token_name_re = re.compile(r"^[A-Z][A-Z0-9_]+$")
    collected: dict[str, dict[str, Any]] = {}
    for candidate in _walk(tree.root_node):
        if candidate.type != "pair":
            continue
        key_node = candidate.child_by_field_name("key")
        value_node = candidate.child_by_field_name("value")
        if key_node is None or value_node is None:
            continue
        raw_key = _node_text(code, key_node)
        key_text = _string_value(raw_key) if _is_string(key_node) else raw_key
        if not token_name_re.match(key_text):
            continue
        resolved = _node_to_python(code, value_node, bindings)
        if isinstance(resolved, Mapping):
            collected[key_text] = dict(resolved)
    return collected
def _parse_param_map_from_index_token(self, code: bytes, key: str, origin: str) -> ParamMap | None:
    """Look up a single token's object directly in the index AST by key.

    This handles bundles where token definitions are nested in large objects and
    may use shorthand factory calls (e.g. ``STATUS_P5_10: eE({...})``).
    """
    try:
        tree = self._ts.parse(code)
        bindings = _collect_bindings(code, tree.root_node)
    except Exception:
        return None

    def _pair_key(pair: Node) -> str | None:
        # Resolve a pair's key to plain text (unquoting string keys).
        key_node = pair.child_by_field_name("key")
        if key_node is None:
            return None
        text = _node_text(code, key_node)
        return _string_value(text) if _is_string(key_node) else text

    for pair in _walk(tree.root_node):
        if pair.type != "pair" or _pair_key(pair) != key:
            continue
        value_node = pair.child_by_field_name("value")
        if value_node is None:
            continue
        candidate = _node_to_python(code, value_node, bindings)
        if not isinstance(candidate, Mapping):
            continue
        built = self._build_param_map_from_obj(dict(candidate), key, origin=origin)
        if built is not None:
            return built
    return None
def _build_param_map_from_obj(self, obj: dict[str, Any], key: str, origin: str) -> ParamMap | None:
    """Normalize a raw JS-derived object into a :class:`ParamMap`.

    Handles both the "flat" shape (value/unit/status/command/min/max at the
    top level) and the compact ``use`` shape (single-letter keys v/u/s/n/x),
    plus conditional status tables and if/elseif/else command rules.
    """
    # Some bundles nest the payload one level deeper under the token key itself.
    candidate_for_key = obj.get(key)
    if isinstance(candidate_for_key, Mapping):
        obj = dict(candidate_for_key)
    group = obj.get("group") or obj.get("pool")

    def _ensure_mapping_list(value: Any) -> list[dict[str, Any]]:
        # Coerce a single mapping or a list of mappings into list[dict];
        # anything else yields an empty list.
        if isinstance(value, list):
            return [dict(item) for item in value if isinstance(item, Mapping)]
        if isinstance(value, Mapping):
            return [dict(value)]
        return []

    def _normalize_status(value: Any) -> tuple[dict[str, list[dict[str, Any]]], list[dict[str, Any]]]:
        # Returns (by-condition mapping, flattened entries tagged with their condition).
        conditions: dict[str, list[dict[str, Any]]] = {}
        flat: list[dict[str, Any]] = []
        if isinstance(value, Mapping):
            for raw_key, entries in value.items():
                key_str = str(raw_key)
                normalized_entries = _ensure_mapping_list(entries)
                if not normalized_entries:
                    continue
                conditions[key_str] = normalized_entries
                for entry in normalized_entries:
                    enriched = dict(entry)
                    # Tag each flat entry with the condition it came from.
                    enriched.setdefault("condition", key_str)
                    flat.append(enriched)
        elif isinstance(value, list):
            # Bare list: treat as the unconditional "default" bucket.
            normalized_entries = _ensure_mapping_list(value)
            if normalized_entries:
                conditions["default"] = normalized_entries
                flat.extend(normalized_entries)
        return conditions, flat

    def _normalize_literal(value: Any) -> Any:
        # Map minified JS literal spellings back to Python values
        # (void 0/undefined -> None, !0 -> True, !1 -> False).
        if isinstance(value, str):
            trimmed = value.strip()
            if trimmed == "void 0":
                return None
            if trimmed == "undefined":
                return None
            if trimmed == "!0":
                return True
            if trimmed == "!1":
                return False
        return value

    def _normalize_identifier(value: Any) -> str | None:
        # Strip computed-key brackets and short minifier member prefixes
        # (a./e./n./o./t.) from an identifier-like string.
        if not isinstance(value, str):
            return None
        cleaned = value.strip()
        if not cleaned:
            return None
        if cleaned.startswith("[") and cleaned.endswith("]") and len(cleaned) > 2:
            cleaned = cleaned[1:-1]
        parts = cleaned.split(".", 1)
        if len(parts) == 2 and parts[0] in {"a", "e", "n", "o", "t"}:
            cleaned = parts[1]
        return cleaned

    def _normalize_condition_entries(entries: Any) -> list[dict[str, Any]]:
        # Normalize a list of {operation, expected, value} condition objects.
        conditions_list: list[dict[str, Any]] = []
        if not isinstance(entries, list):
            return conditions_list
        for entry in entries:
            if not isinstance(entry, Mapping):
                continue
            condition = {
                "operation": _normalize_identifier(entry.get("operation")),
                "expected": _normalize_literal(entry.get("expected")),
                "targets": _ensure_mapping_list(entry.get("value")),
            }
            conditions_list.append(condition)
        return conditions_list

    def _normalize_command_action(raw_action: Any) -> dict[str, Any] | None:
        # Extract {command, value} from an action object; None when empty.
        if not isinstance(raw_action, Mapping):
            return None
        command = _normalize_identifier(raw_action.get("command"))
        value = _normalize_literal(raw_action.get("value"))
        action: dict[str, Any] = {}
        if command:
            action["command"] = command
        if value is not None:
            action["value"] = value
        return action if action else None

    def _normalize_command_branches(blocks: Any, logic_key: str) -> list[dict[str, Any]]:
        # Flatten if/elseif/else rule blocks into uniform dict entries; blocks
        # without a recognizable action are dropped.
        normalized: list[dict[str, Any]] = []
        if not isinstance(blocks, list):
            return normalized
        for entry in blocks:
            if not isinstance(entry, Mapping):
                continue
            branch_key = None
            for candidate in ("if", "elseif", "else"):
                if candidate in entry:
                    branch_key = candidate
                    break
            if branch_key is None:
                branch_key = "if"
            conditions = _normalize_condition_entries(entry.get(branch_key)) if branch_key != "else" else []
            action_source = entry.get("then") if branch_key != "else" else entry.get("else")
            action = _normalize_command_action(action_source)
            if action is None:
                continue
            normalized_entry: dict[str, Any] = {
                "logic": logic_key,
                "kind": branch_key,
                "conditions": conditions,
            }
            normalized_entry.update(action)
            normalized.append(normalized_entry)
        return normalized

    value_paths = _ensure_mapping_list(obj.get("value"))
    unit_field = obj.get("unit")
    unit_paths = _ensure_mapping_list(unit_field)
    command_paths = _ensure_mapping_list(obj.get("command"))
    min_paths = _ensure_mapping_list(obj.get("minValue") or obj.get("min"))
    max_paths = _ensure_mapping_list(obj.get("maxValue") or obj.get("max"))
    status_conditions, status_flat = _normalize_status(obj.get("status"))
    command_rules: list[dict[str, Any]] = []
    for logic_key in ("any", "all", "when"):
        command_rules.extend(_normalize_command_branches(obj.get(logic_key), logic_key))
    # Compact `use` shape: single-letter keys act as fallbacks for missing fields.
    use = obj.get("use")
    if isinstance(use, Mapping):
        value_paths = value_paths or _ensure_mapping_list(use.get("v"))
        unit_paths = unit_paths or _ensure_mapping_list(use.get("u"))
        if not status_flat:
            _, status_flat = _normalize_status(use.get("s"))
        min_paths = min_paths or _ensure_mapping_list(use.get("n"))
        max_paths = max_paths or _ensure_mapping_list(use.get("x"))
    paths = {
        "value": value_paths,
        "unit": unit_paths,
        "status": status_flat,
        "command": command_paths,
        "min": min_paths,
        "max": max_paths,
    }
    component = obj.get("componentType")
    # Units may appear under several names; a scalar `unit` field doubles as units.
    units_raw: Any = obj.get("units")
    if units_raw is None:
        units_raw = obj.get("unit_name")
    if units_raw is None and isinstance(unit_field, (str, int, float)):
        units_raw = unit_field
    limits = None
    for cand in ("limits", "range", "minmax"):
        if cand in obj and isinstance(obj[cand], dict):
            limits = obj[cand]
            break
    status_flags = obj.get("statusFlags") or obj.get("status_bits") or []
    return ParamMap(
        key=key,
        group=group if isinstance(group, str) else None,
        paths=paths,
        component_type=component if isinstance(component, str) else None,
        units=units_raw if isinstance(units_raw, (str, int, float)) else None,
        limits=limits,
        status_flags=status_flags if isinstance(status_flags, list) else [],
        status_conditions=status_conditions or None,
        command_rules=command_rules,
        origin=origin,
        raw=obj,
    )
def _parse_param_map_from_js(self, code: bytes, key: str, origin: str) -> ParamMap | None:
    """Extract a parameter map from a JavaScript module or index fragment.

    Locates the exported root object in ``code`` and, when it is an object
    literal, normalizes it into a :class:`ParamMap` via
    :meth:`_build_param_map_from_obj` (which also preserves the un-normalized
    object on the result's ``raw`` attribute).

    Args:
        code: JavaScript source bytes to analyze.
        key: Identifier assigned to the resulting parameter map.
        origin: Provenance label stored on the result.

    Returns:
        The parsed ParamMap, or ``None`` when no object-shaped export exists.
    """
    root = self._extract_root_object_from_js(code)
    if isinstance(root, dict):
        return self._build_param_map_from_obj(root, key, origin)
    return None
def _extract_root_object_from_js(self, code: bytes) -> dict[str, Any] | None:
    """Locate the exported root object in ``code`` and convert it to a dict.

    Returns ``None`` when no export root can be found or the converted value
    is not a plain object.
    """
    tree = self._ts.parse(code)
    bindings = _collect_bindings(code, tree.root_node)
    export_node = _find_export_root(code, tree.root_node)
    if export_node is None:
        return None
    converted = _object_to_python(code, export_node, bindings=bindings)
    return converted if isinstance(converted, dict) else None
# ---------- Permissions helper ----------
[docs]
async def list_symbols_for_permissions(
    self,
    device_menu: int,
    permissions: Iterable[str],
) -> set[str]:
    """List symbols visible for given permissions.

    Convenience shortcut: fetches the menu for ``device_menu`` and returns
    the tokens visible under the provided permissions. The schemas hook
    remains OFF.

    Args:
        device_menu: The device menu identifier.
        permissions: Permission strings to check visibility against.

    Returns:
        The set of token strings visible for the given permissions.
    """
    fetched_menu = await self.get_module_menu(device_menu=device_menu, permissions=permissions)
    return fetched_menu.all_tokens()
# ---------- i18n support ----------
[docs]
async def list_language_config(self) -> TranslationConfig | None:
    """Get translation configuration from assets.

    Parses the ``index-*.js`` payload structurally to locate the object
    carrying a ``translations`` array and a ``defaultTranslation`` field.
    Returns ``None`` when no index data is available or parsing fails.
    """
    if not self._idx.index_bytes:
        await self._ensure_index_loaded()
    index_payload = self._idx.index_bytes
    if not index_payload:
        self._log.warning("No index data available. Call refresh_index() or refresh_index_minimal() first.")
        return None
    try:
        return self._parse_language_config_from_js(index_payload)
    except Exception as e:
        self._log.warning("Failed to parse language config from index: %s", e)
        return None
[docs]
async def get_i18n(self, lang: str, namespace: str) -> dict[str, Any]:
    """Get i18n mapping for a given language and namespace.

    Parses the index file for dynamic language imports in the format:

        "../../resources/languages/{lang}/{namespace}.json":()=>d(()=>import("./file-hash.js"),[]).then(e=>e.default)

    Then fetches and parses the corresponding asset file. Results — including
    negative lookups — are cached per normalized ``(lang, namespace)`` pair.

    Args:
        lang: Language code (e.g., 'en', 'pl').
        namespace: Namespace (e.g., 'parameters', 'units').

    Returns:
        Dictionary with translation mappings, or empty dict if not found.
    """
    lang_norm = str(lang).strip().lower()
    namespace_norm = str(namespace).strip().lower()
    if not lang_norm or not namespace_norm:
        return {}
    cache_key = (lang_norm, namespace_norm)
    # Fast path: serve a cache hit without taking the lock.
    cached = self._cache_i18n.get(cache_key)
    if cached is not None:
        return cached
    async with self._i18n_lock:
        # Re-check under the lock: another task may have filled the cache.
        cached2 = self._cache_i18n.get(cache_key)
        if cached2 is not None:
            return cached2
        if not self._idx.index_bytes:
            await self._ensure_index_loaded()
        if not self._idx.index_bytes:
            self._log.warning("No index data available for i18n lookup")
            self._cache_i18n[cache_key] = {}
            return {}
        try:
            asset_ref = self._find_i18n_asset(lang_norm, namespace_norm)
            if not asset_ref:
                self._log.debug("No i18n asset found for %s/%s", lang_norm, namespace_norm)
                # Cache the miss so repeated lookups stay cheap.
                self._cache_i18n[cache_key] = {}
                return {}
            code = await self._api.get_bytes(asset_ref.url)
            translations = self._parse_i18n_from_js(code)
            result = translations if isinstance(translations, dict) else {}
            self._cache_i18n[cache_key] = result
            self._log.debug("Loaded i18n %s/%s: %d keys", lang_norm, namespace_norm, len(result))
            return result
        except Exception as e:
            self._log.warning("Failed to load i18n %s/%s: %s", lang_norm, namespace_norm, e)
            self._cache_i18n[cache_key] = {}
            return {}
@staticmethod
def _normalize_unit_key(raw_key: Any) -> str | None:
if isinstance(raw_key, int):
return str(raw_key)
if isinstance(raw_key, float):
if raw_key.is_integer():
return str(int(raw_key))
return None
if isinstance(raw_key, str):
key = raw_key.strip()
if not key:
return None
if key.isdigit():
return key
try:
as_float = float(key)
except ValueError:
return None
return str(int(as_float)) if as_float.is_integer() else None
return None
@staticmethod
def _is_unit_descriptor_entry(value: Any) -> bool:
if not isinstance(value, Mapping):
return False
return any(k in value for k in ("text", "options", "value", "valuePrepare"))
def _parse_units_descriptor_table_from_index(self, code: bytes) -> dict[str, dict[str, Any]]:
    """Find the largest unit-descriptor table declared at the index top level.

    Scans top-level variable declarations for object literals whose entries
    look like ``{<numeric key>: {text/options/value/valuePrepare ...}}`` and
    returns the biggest such table (minified bundles may contain smaller
    look-alike objects).
    """
    try:
        tree = self._ts.parse(code)
        bindings = _collect_bindings(code, tree.root_node)
    except Exception as exc:
        self._log.debug("Failed to parse units descriptor table from index: %s", exc)
        return {}

    def _table_from_object(value_node: Node) -> dict[str, dict[str, Any]]:
        # Convert one object literal, keeping only descriptor-shaped entries
        # keyed by a normalizable numeric unit code.
        resolved = _node_to_python(code, value_node, bindings)
        if not isinstance(resolved, Mapping):
            return {}
        table: dict[str, dict[str, Any]] = {}
        for raw_key, raw_value in resolved.items():
            norm_key = self._normalize_unit_key(raw_key)
            if norm_key is not None and self._is_unit_descriptor_entry(raw_value):
                table[norm_key] = dict(raw_value)
        return table

    best: dict[str, dict[str, Any]] = {}
    for statement in tree.root_node.named_children:
        if statement.type not in {"lexical_declaration", "variable_declaration"}:
            continue
        for declarator in statement.named_children:
            if declarator.type != "variable_declarator":
                continue
            value_node = declarator.child_by_field_name("value")
            if value_node is None or value_node.type != "object":
                continue
            candidate = _table_from_object(value_node)
            if len(candidate) > len(best):
                best = candidate
    return best
[docs]
async def get_unit_descriptor(self, unit_code: Any) -> dict[str, Any] | None:
    """Return unit descriptor for raw unit code from index-defined transform table.

    The BragerOne frontend keeps canonical unit behavior in an index-scoped table
    (text/options/value/valuePrepare). This helper exposes that table entry by raw
    unit code so runtime consumers can apply the same mappings/transforms.

    The table is parsed lazily on first use and cached on the instance; a copy
    of the entry is returned so callers cannot mutate the cache.
    """
    key = self._normalize_unit_key(unit_code)
    if key is None:
        return None
    # Fast path: table already built — no lock needed.
    cached = self._units_descriptor_table
    if isinstance(cached, dict):
        entry = cached.get(key)
        return dict(entry) if isinstance(entry, Mapping) else None
    async with self._units_descriptor_lock:
        # Re-check under the lock; another task may have built the table.
        cached_inner = self._units_descriptor_table
        if not isinstance(cached_inner, dict):
            if not self._idx.index_bytes:
                await self._ensure_index_loaded()
            if not self._idx.index_bytes:
                # No index available: cache an empty table so we don't re-parse forever.
                self._units_descriptor_table = {}
            else:
                self._units_descriptor_table = self._parse_units_descriptor_table_from_index(self._idx.index_bytes)
            cached_inner = self._units_descriptor_table
        entry = cached_inner.get(key)
        return dict(entry) if isinstance(entry, Mapping) else None
def _find_i18n_asset(self, lang: str, namespace: str) -> AssetRef | None:
    """Find i18n asset for given language and namespace.

    Looks for patterns in index like:
        "../../resources/languages/{lang}/{namespace}.json":()=>d(()=>import("./file-hash.js"),[])

    Args:
        lang: Language code.
        namespace: Namespace (e.g., 'parameters', 'units').

    Returns:
        AssetRef for the i18n file or None if not found.
    """
    if not self._idx.index_bytes:
        return None
    # Upstream namespaces are not consistently cased (e.g. `app.json` vs
    # `diodeState.json`), so scan all language imports and compare the
    # namespace case-insensitively.
    want = str(namespace).strip().lower()
    if not want:
        return None
    index_text = self._idx.index_bytes.decode("utf-8", errors="replace")
    # Capture groups: <ns> + import("./<file_base>-<hash>.js")
    import_pattern = re.compile(
        rf'["\']\.\.\/\.\.\/resources\/languages\/{re.escape(lang)}\/([^"\']+)\.json["\']'
        r':\s*\(\)\s*=>\s*\w+\s*\(\s*\(\)\s*=>\s*import\s*\(\s*["\']\.\/([^"\']+)-([A-Za-z0-9_]+)\.js["\']'
    )
    match = next(
        (m for m in import_pattern.finditer(index_text) if str(m.group(1)).strip().lower() == want),
        None,
    )
    if match is None:
        self._log.debug("No i18n asset match for %s/%s (case-insensitive)", lang, namespace)
        return None
    file_base = match.group(2)  # e.g., "parameters" or "diodeState"
    file_hash = match.group(3)  # e.g., "BNvCCsxi"
    # Prefer the catalogued asset when its hash agrees with the import site.
    asset_ref = self._idx.find_asset_for_basename(file_base)
    if asset_ref and asset_ref.hash == file_hash:
        return asset_ref
    # Fallback: derive the URL from the index URL, assuming i18n files live
    # alongside the index bundle.
    if hasattr(self, "_last_index_url") and self._last_index_url:
        asset_url = self._smart_urljoin(self._last_index_url, f"{file_base}-{file_hash}.js")
        return AssetRef(url=asset_url, base=file_base, hash=file_hash)
    return None
def _parse_i18n_from_js(self, code: bytes) -> dict[str, Any]:
    """Parse i18n translations from a JavaScript module.

    Expected format:
        export default { "key1": "value1", "key2": "value2", ... }

    Resolution order: a literal ``export default <expr>``, then a minified
    ``export { NAME as default }`` re-export, then a generic export-root scan.

    Args:
        code: JavaScript source code bytes.

    Returns:
        Dictionary of translations; empty dict on parse failure or when the
        export is not an object.
    """
    try:
        tree = self._ts.parse(code)
        root = tree.root_node
        code_text = code.decode("utf-8", errors="replace")
        bindings: dict[str, Any] = {}
        # Collect constant bindings so we can resolve identifier references
        for child in root.named_children:
            if child.type != "lexical_declaration":
                continue
            for declarator in child.named_children:
                if declarator.type != "variable_declarator":
                    continue
                name_node = declarator.child_by_field_name("name")
                value_node = declarator.child_by_field_name("value")
                if name_node is None or value_node is None:
                    continue
                name = _node_text(code, name_node).strip()
                if not name:
                    continue
                bindings[name] = _node_to_python(code, value_node, bindings)
        translations: Any | None = None
        # Prefer explicit `export default <expr>` if present.
        for child in root.named_children:
            if child.type != "export_statement":
                continue
            value_node = child.child_by_field_name("value")
            if value_node is not None:
                translations = _node_to_python(code, value_node, bindings)
                break
        if translations is None:
            # Minified bundles re-export a binding: `export { X as default }`.
            match = re.search(
                r"export\s*\{[^}]*?([A-Za-z0-9_$]+)\s+as\s+default",
                code_text,
            )
            if match:
                default_name = match.group(1)
                candidate = bindings.get(default_name)
                if candidate is not None:
                    translations = candidate
        if translations is None:
            # Last resort: structural export-root detection.
            export_root = _find_export_root(code, root)
            if export_root is not None:
                translations = _node_to_python(code, export_root, bindings)
        if isinstance(translations, dict):
            return translations
        if translations is not None:
            self._log.warning("i18n export is not an object: %s", type(translations))
        return {}
    except Exception as e:
        self._log.warning("Failed to parse i18n bundle: %s", e)
        return {}
def _is_translations_array_bytes(self, array_node: Node, js_bytes: bytes) -> bool:
    """Check if an array looks like a translations array (byte-safe).

    An element counts as valid when it is an object carrying both an ``id``
    and a ``flag`` key; the array qualifies when at least 70% of its object
    elements are valid (same heuristic as the text-based variant).
    """
    total_objects = 0
    valid_entries = 0
    for element in array_node.children:
        if element.type != "object":
            continue
        total_objects += 1
        seen_keys: set[str] = set()
        for pair in element.children:
            if pair.type != "pair":
                continue
            key_node = pair.child_by_field_name("key")
            if key_node is None:
                continue
            raw = js_bytes[key_node.start_byte : key_node.end_byte]
            seen_keys.add(raw.decode("utf-8", errors="replace"))
        if {"id", "flag"} <= seen_keys:
            valid_entries += 1
    if total_objects == 0:
        return False
    return valid_entries >= total_objects * 0.7
def _parse_language_config_from_js(self, js_bytes: bytes) -> TranslationConfig:
    """Parse TranslationConfig from index JS bytes.

    This is intentionally structural: it searches for an object literal containing:
    - `translations`: array of objects with `id` and `flag` keys
    - `defaultTranslation`: string

    Raises:
        ValueError: if no qualifying object is found, or it lacks the
            required fields on extraction.
    """
    tree = self._ts.parse(js_bytes)
    root = tree.root_node

    def is_language_config_object(obj_node: Node) -> bool:
        # An object qualifies when it carries both a translations array that
        # passes the structural heuristic and a string defaultTranslation.
        if obj_node.type != "object":
            return False
        has_translations = False
        has_default_translation = False
        for pair in obj_node.children:
            if pair.type != "pair":
                continue
            key_node = pair.child_by_field_name("key")
            value_node = pair.child_by_field_name("value")
            if key_node is None or value_node is None:
                continue
            key_bytes = js_bytes[key_node.start_byte : key_node.end_byte]
            key_text = key_bytes.decode("utf-8", errors="replace")
            if key_text == "translations" and value_node.type == "array":
                if self._is_translations_array_bytes(value_node, js_bytes):
                    has_translations = True
            elif key_text == "defaultTranslation" and value_node.type == "string":
                has_default_translation = True
        return has_translations and has_default_translation

    def visit_node(node: Node) -> Node | None:
        # Depth-first search for the first qualifying object literal.
        if node.type == "object" and is_language_config_object(node):
            return node
        for child in node.children:
            found = visit_node(child)
            if found is not None:
                return found
        return None

    obj_node = visit_node(root)
    if obj_node is None:
        raise ValueError("Language config object not found")
    translations = self._extract_translations_array_bytes(obj_node, js_bytes)
    default_translation = self._extract_default_translation_bytes(obj_node, js_bytes)
    if translations is None or default_translation is None:
        raise ValueError("Language config object missing required fields")
    return TranslationConfig(translations=translations, default_translation=default_translation)
def _is_translations_array(self, array_node: Node, text: str) -> bool:
    """Check if an array looks like a translations array (text variant).

    Mirrors the byte-based heuristic: objects carrying both 'id' and 'flag'
    keys count as valid, and the array qualifies when at least 70% of its
    object elements are valid.
    """
    total_objects = 0
    valid_entries = 0
    for element in array_node.children:
        if element.type != "object":
            continue
        total_objects += 1
        found_keys: set[str] = set()
        for pair in element.children:
            if pair.type != "pair":
                continue
            key_node = pair.child_by_field_name("key")
            if not key_node:
                continue
            found_keys.add(self._get_node_text(key_node, text))
        if "id" in found_keys and "flag" in found_keys:
            valid_entries += 1
    # Consider it a translations array if most objects have the expected structure
    return total_objects > 0 and valid_entries >= total_objects * 0.7
def _extract_translations_array_bytes(self, obj_node: Node, js_bytes: bytes) -> list[dict[str, Any]] | None:
    """Extract translations array from language configuration object (bytes version)."""
    for pair in obj_node.children:
        if pair.type != "pair":
            continue
        key_node = pair.child_by_field_name("key")
        if key_node is None:
            continue
        key_text = js_bytes[key_node.start_byte : key_node.end_byte].decode("utf-8", errors="replace")
        if key_text != "translations":
            continue
        value_node = pair.child_by_field_name("value")
        if value_node and value_node.type == "array":
            return self._parse_translations_array_bytes(value_node, js_bytes)
    return None
def _extract_default_translation_bytes(self, obj_node: Node, js_bytes: bytes) -> str | None:
    """Extract defaultTranslation from language configuration object (bytes version)."""
    for pair in obj_node.children:
        if pair.type != "pair":
            continue
        key_node = pair.child_by_field_name("key")
        if key_node is None:
            continue
        key_text = js_bytes[key_node.start_byte : key_node.end_byte].decode("utf-8", errors="replace")
        if key_text != "defaultTranslation":
            continue
        value_node = pair.child_by_field_name("value")
        if value_node and value_node.type == "string":
            # Drop the surrounding quote characters from the string literal.
            literal = js_bytes[value_node.start_byte : value_node.end_byte].decode("utf-8", errors="replace")
            return literal.strip("\"'")
    return None
def _parse_translations_array_bytes(self, array_node: Node, js_bytes: bytes) -> list[dict[str, Any]]:
    """Parse the translations array into Python list of dicts (bytes version)."""
    parsed_entries = (
        self._parse_translation_object_bytes(element, js_bytes)
        for element in array_node.children
        if element.type == "object"
    )
    # Drop objects that produced no key/value pairs.
    return [entry for entry in parsed_entries if entry]
def _parse_translation_object_bytes(self, obj_node: Node, js_bytes: bytes) -> dict[str, Any] | None:
    """Parse a single translation object (bytes version); None when empty."""
    parsed: dict[str, Any] = {}
    for pair in obj_node.children:
        if pair.type != "pair":
            continue
        key_node = pair.child_by_field_name("key")
        value_node = pair.child_by_field_name("value")
        if key_node and value_node:
            raw_key = js_bytes[key_node.start_byte : key_node.end_byte]
            parsed[raw_key.decode("utf-8", errors="replace")] = self._parse_js_value_bytes(value_node, js_bytes)
    return parsed or None
def _parse_js_value_bytes(self, node: Node, js_bytes: bytes) -> Any:
    """Parse a JavaScript value node into Python equivalent (bytes version).

    Handles strings, numbers, booleans, null, objects, and arrays; any other
    node type falls back to its raw source text.

    Args:
        node: The tree-sitter value node to convert.
        js_bytes: The full source buffer the node's byte offsets index into.

    Returns:
        The Python equivalent (str, int, float, bool, None, dict, or list),
        or the raw node text for unsupported node types and unparseable
        number literals.
    """
    if node.type == "string":
        val_bytes = js_bytes[node.start_byte : node.end_byte]
        val_text = val_bytes.decode("utf-8", errors="replace")
        return val_text.strip("\"'")
    elif node.type == "number":
        val_bytes = js_bytes[node.start_byte : node.end_byte]
        val_text = val_bytes.decode("utf-8", errors="replace")
        # JS number literals include forms a bare int() rejects (1e3, .5, 0x1f).
        # Previously this raised ValueError and aborted the whole parse; now we
        # try int, then float, and finally keep the raw text instead of raising.
        try:
            return int(val_text) if "." not in val_text else float(val_text)
        except ValueError:
            try:
                return float(val_text)
            except ValueError:
                return val_text
    elif node.type == "true":
        return True
    elif node.type == "false":
        return False
    elif node.type == "null":
        return None
    elif node.type == "object":
        obj: dict[str, Any] = {}
        for pair in node.children:
            if pair.type == "pair":
                key_node = pair.child_by_field_name("key")
                value_node = pair.child_by_field_name("value")
                if key_node and value_node:
                    key_bytes = js_bytes[key_node.start_byte : key_node.end_byte]
                    key = key_bytes.decode("utf-8", errors="replace")
                    obj[key] = self._parse_js_value_bytes(value_node, js_bytes)
        return obj
    elif node.type == "array":
        arr = []
        for child in node.children:
            # Skip punctuation tokens; everything else is an element.
            if child.type not in (",", "[", "]"):
                arr.append(self._parse_js_value_bytes(child, js_bytes))
        return arr
    else:
        # Fallback to raw text
        val_bytes = js_bytes[node.start_byte : node.end_byte]
        return val_bytes.decode("utf-8", errors="replace")
def _extract_translations_array(self, obj_node: Node, text: str) -> list[dict[str, Any]] | None:
"""Extract translations array from language configuration object."""
for pair in obj_node.children:
if pair.type == "pair":
key_node = pair.child_by_field_name("key")
if key_node and self._get_node_text(key_node, text) == "translations":
value_node = pair.child_by_field_name("value")
if value_node and value_node.type == "array":
return self._parse_translations_array(value_node, text)
return None
def _extract_default_translation(self, obj_node: Node, text: str) -> str | None:
"""Extract defaultTranslation from language configuration object."""
for pair in obj_node.children:
if pair.type == "pair":
key_node = pair.child_by_field_name("key")
if key_node and self._get_node_text(key_node, text) == "defaultTranslation":
value_node = pair.child_by_field_name("value")
if value_node and value_node.type == "string":
# Remove quotes from string literal
return self._get_node_text(value_node, text).strip("\"'")
return None
def _parse_translations_array(self, array_node: Node, text: str) -> list[dict[str, Any]]:
"""Parse the translations array into Python list of dicts."""
translations = []
for child in array_node.children:
if child.type == "object":
translation = self._parse_translation_object(child, text)
if translation:
translations.append(translation)
return translations
def _parse_translation_object(self, obj_node: Node, text: str) -> dict[str, Any] | None:
"""Parse a single translation object."""
translation = {}
for pair in obj_node.children:
if pair.type == "pair":
key_node = pair.child_by_field_name("key")
value_node = pair.child_by_field_name("value")
if key_node and value_node:
key = self._get_node_text(key_node, text)
value = self._parse_js_value(value_node, text)
translation[key] = value
return translation if translation else None
def _parse_js_value(self, node: Node, text: str) -> Any:
"""Parse a JavaScript value node into Python equivalent."""
if node.type == "string":
return self._get_node_text(node, text).strip("\"'")
elif node.type == "number":
val_text = self._get_node_text(node, text)
return int(val_text) if "." not in val_text else float(val_text)
elif node.type == "true":
return True
elif node.type == "false":
return False
elif node.type == "null":
return None
elif node.type == "object":
obj = {}
for pair in node.children:
if pair.type == "pair":
key_node = pair.child_by_field_name("key")
value_node = pair.child_by_field_name("value")
if key_node and value_node:
key = self._get_node_text(key_node, text)
value = self._parse_js_value(value_node, text)
obj[key] = value
return obj
elif node.type == "array":
arr = []
for child in node.children:
if child.type != "," and child.type != "[" and child.type != "]":
arr.append(self._parse_js_value(child, text))
return arr
else:
# Fallback to raw text
return self._get_node_text(node, text)
def _get_node_text(self, node: Node, text: str) -> str:
"""Extract text content of a node, handling quoted strings."""
node_text = text[node.start_byte : node.end_byte]
# Remove quotes from string literals
if (
node.type == "string"
and len(node_text) >= 2
and (
(node_text.startswith('"') and node_text.endswith('"')) or (node_text.startswith("'") and node_text.endswith("'"))
)
):
return node_text[1:-1]
return node_text