Source code for pybragerone.api.ws

"""WebSocket (Socket.IO) client for BragerOne realtime events."""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import (
    Any,
    Protocol,
    TypedDict,
    runtime_checkable,
)

import socketio

from ..utils import spawn
from .constants import IO_BASE, ONE_BASE, SOCK_PATH, WS_NAMESPACE

log = logging.getLogger(__name__)
sio_log = logging.getLogger(__name__ + ".sio")
eio_log = logging.getLogger(__name__ + ".eio")


# Signature for a generic event handler used by the gateway.
EventHandler = Callable[[str, Any], None]
# Signature for a callback invoked on connection establishment.
ConnectedCb = Callable[[], None | Awaitable[None]]


@runtime_checkable
class EventDispatcher(Protocol):
    """Structural type of the callable that receives realtime events.

    Anything callable as ``dispatcher(event_name, payload)`` satisfies this
    protocol. The call may return ``None`` or an awaitable resolving to ``None``.
    """

    def __call__(self, event_name: str, payload: Any) -> None | Awaitable[None]:
        """Process a single event.

        Args:
            event_name: Name of the event being delivered.
            payload: Data attached to the event; its shape depends on the event.
        """
        ...
class _SubPayload(TypedDict, total=False):
    """Body sent with subscription ("listen") events.

    Declared with ``total=False``, so every key is optional.
    """

    # Identifiers sent under the ``modules`` key.
    modules: list[str]
    # Identifiers sent under the alternative ``devids`` key.
    devids: list[str]
    # Optional group identifier.
    group_id: int
class RealtimeManager:
    """Thin Socket.IO wrapper for the BragerOne realtime channel.

    The manager keeps a single ``socketio.AsyncClient`` connection, exposes the
    Engine.IO SID and the namespace SID, and forwards selected events to a
    user-provided dispatcher (see :class:`EventDispatcher`). It does **not**
    interpret payloads; that is the gateway's responsibility.

    Notes:
        - Authentication is provided **only** via HTTP headers (Bearer token).
        - We always connect to the configured namespace
          (default: :data:`~.constants.WS_NAMESPACE`).
        - We listen to ``snapshot``, the various ``*:parameters:change`` events,
          the ``app:module:task:*`` events and their numeric aliases
          (``60``/``61``/``63``).
        - Subscriptions are emitted in a few payload variants
          (``modules`` / ``devids``) and optionally include ``group_id``.
    """

    def __init__(
        self,
        token: str,
        *,
        origin: str = ONE_BASE,
        referer: str = f"{ONE_BASE}/",
        io_base: str = IO_BASE,
        socket_path: str = SOCK_PATH,
        namespace: str = WS_NAMESPACE,
    ) -> None:
        """Initialize the realtime manager.

        Args:
            token: Bearer token sent in the ``Authorization`` header when connecting.
            origin: HTTP ``Origin`` header value (default: :data:`~.constants.ONE_BASE`).
            referer: HTTP ``Referer`` header value
                (default: :data:`~.constants.ONE_BASE` + ``/``).
            io_base: Base URL of the Engine.IO/Socket.IO server
                (default: :data:`~.constants.IO_BASE`). Trailing slashes are stripped.
            socket_path: Socket.IO path on the server
                (default: :data:`~.constants.SOCK_PATH`).
            namespace: The namespace to join (default: :data:`~.constants.WS_NAMESPACE`).
        """
        self._token = token
        self._origin = origin
        self._referer = referer
        self._io_base = io_base.rstrip("/")
        self._socket_path = socket_path
        self._namespace = namespace

        # Set by the "connect" handler, cleared by the "disconnect" handler.
        self._connected = asyncio.Event()
        self._on_connected: list[ConnectedCb] = []
        self._on_event: EventDispatcher | None = None
        # Last subscription target list, remembered for resubscribe().
        self._modules: list[str] = []
        self._group_id: int | None = None

        # Configure AsyncClient with reconnection enabled.
        self._sio: socketio.AsyncClient = socketio.AsyncClient(
            reconnection=True,
            reconnection_attempts=0,  # infinite
            reconnection_delay=1,
            reconnection_delay_max=10,
            logger=sio_log,  # pyright: ignore[reportArgumentType]  # route socket.io logs to a sub-logger
            engineio_logger=eio_log,  # route engine.io logs to a sub-logger
        )

        ns = self._namespace
        # Event name -> handler, registered in this order on the namespace.
        handlers: tuple[tuple[str, Callable[..., Any]], ...] = (
            # Built-in lifecycle / diagnostic events.
            ("connect", self._on_connect),
            ("disconnect", self._on_disconnect),
            ("connect_error", self._on_connect_error),
            ("reconnect", self._on_reconnect),
            ("reconnect_attempt", self._on_reconnect_attempt),
            ("reconnect_error", self._on_reconnect_error),
            ("error", self._on_error),
            ("message", self._on_message),
            # Key domain events.
            ("snapshot", self._on_snapshot),
            ("app:modules:parameters:change", self._on_app_modules_parameters_change),
            ("modules:parameters:change", self._on_modules_parameters_change),  # fallback alt name
            ("parameters:change", self._on_parameters_change),
            ("app:module:task:created", self._on_app_modules_task_created),
            ("app:module:task:status:changed", self._on_app_modules_task_status_changed),
            ("app:module:task:completed", self._on_app_modules_task_completed),
            # Numeric fallbacks observed in some builds.
            ("60", self._on_ev60),
            ("61", self._on_ev61),
            ("63", self._on_ev63),
        )
        for event, handler in handlers:
            self._sio.on(event, handler, namespace=ns)

    # ---- Built-in handlers ----

    async def _on_connect(self) -> None:
        """Log the SIDs, run the registered on-connected callbacks, then flag the connection as up."""
        ns_sid = self.sid()
        eng_sid = self.engine_sid()
        log.info("WS connected, SID=%s", eng_sid)
        log.info(
            "WS connected, namespace_sid=%s, engine_sid=%s, namespaces=%s",
            ns_sid,
            eng_sid,
            list(self._sio.namespaces),
        )
        # Iterate over a copy so a callback may register further callbacks safely.
        for cb in list(self._on_connected):
            try:
                res = cb()
                if asyncio.iscoroutine(res):
                    spawn(res, "on_connected_cb", log)
            except Exception:
                # One failing callback must not prevent the others from running.
                log.exception("Error in on_connected callback")
        self._connected.set()

    async def _on_disconnect(self) -> None:
        """Log the disconnect and clear the connected flag."""
        log.info("WS disconnected")
        self._connected.clear()

    async def _on_connect_error(self, data: Any | None = None) -> None:
        """Log a ``connect_error`` event."""
        log.warning("WS connect_error: %s", data)

    async def _on_reconnect(self) -> None:
        """Log a ``reconnect`` event."""
        log.info("WS reconnect OK")

    async def _on_reconnect_attempt(self, number: int) -> None:
        """Log a ``reconnect_attempt`` event with its attempt number."""
        log.info("WS reconnect attempt #%s", number)

    async def _on_reconnect_error(self, data: Any | None = None) -> None:
        """Log a ``reconnect_error`` event."""
        log.warning("WS reconnect_error: %s", data)

    async def _on_error(self, data: Any) -> None:
        """Log an ``error`` event."""
        log.error("WS ERROR: %s", data)

    async def _on_message(self, data: Any) -> None:
        """Log a generic ``message`` event (not forwarded to the dispatcher)."""
        log.debug("WS message → %s", data)

    # --- Key domain events ---

    async def _on_snapshot(self, payload: Any) -> None:
        """Forward a ``snapshot`` event."""
        log.debug("WS EVENT snapshot → %s", payload)
        self._dispatch("snapshot", payload)

    async def _on_app_modules_parameters_change(self, payload: Any) -> None:
        """Forward an ``app:modules:parameters:change`` event."""
        log.debug("WS EVENT app:modules:parameters:change → %s", payload)
        self._dispatch("app:modules:parameters:change", payload)

    # Fallback alt names occasionally seen in traces

    async def _on_modules_parameters_change(self, payload: Any) -> None:
        """Forward a ``modules:parameters:change`` event."""
        log.debug("WS EVENT modules:parameters:change → %s", payload)
        self._dispatch("modules:parameters:change", payload)

    async def _on_parameters_change(self, payload: Any) -> None:
        """Forward a ``parameters:change`` event."""
        log.debug("WS EVENT parameters:change → %s", payload)
        self._dispatch("parameters:change", payload)

    # Optional task-related events; kept for completeness/diagnostics.

    async def _on_app_modules_task_created(self, p: Any) -> None:
        """Forward an ``app:module:task:created`` event."""
        log.debug("WS EVENT app:module:task:created → %s", p)
        self._dispatch("app:module:task:created", p)

    async def _on_app_modules_task_status_changed(self, p: Any) -> None:
        """Forward an ``app:module:task:status:changed`` event."""
        log.debug("WS EVENT app:module:task:status:changed → %s", p)
        self._dispatch("app:module:task:status:changed", p)

    async def _on_app_modules_task_completed(self, p: Any) -> None:
        """Forward an ``app:module:task:completed`` event."""
        log.debug("WS EVENT app:module:task:completed → %s", p)
        self._dispatch("app:module:task:completed", p)

    # Numeric fallbacks observed in some builds

    async def _on_ev60(self, p: Any) -> None:
        """Forward numeric event ``60`` as ``app:module:task:status:changed``."""
        log.debug("WS EVENT 60 → %s", p)
        self._dispatch("app:module:task:status:changed", p)

    async def _on_ev61(self, p: Any) -> None:
        """Forward numeric event ``61`` as ``app:module:task:created``."""
        log.debug("WS EVENT 61 → %s", p)
        self._dispatch("app:module:task:created", p)

    async def _on_ev63(self, p: Any) -> None:
        """Forward numeric event ``63`` as ``app:module:task:completed``."""
        log.debug("WS EVENT 63 → %s", p)
        self._dispatch("app:module:task:completed", p)

    # ---------------- Public API ----------------

    async def connect(self) -> None:
        """Open a Socket.IO connection with appropriate headers and wait for join."""
        headers = {
            "Authorization": f"Bearer {self._token}",
            "Origin": self._origin,
            "Referer": self._referer,
            "Accept-Language": "pl-PL,pl;q=0.9",
            "x-appversion": "1.1.78",
        }
        await self._sio.connect(
            self._io_base,
            headers=headers,
            # This list used to read ["pooling", "websocket"]. "pooling" is not a
            # transport name; python-engineio discards unknown names, so the
            # effective transport was already WebSocket-only. That behaviour is
            # kept here explicitly. Add "polling" in front if a long-polling
            # handshake/fallback is actually wanted.
            transports=["websocket"],
            socketio_path=self._socket_path,
            namespaces=[self._namespace],
        )
        # Wait until the namespace "connect" handler has run.
        await self._connected.wait()
        # Short grace period to ensure the namespace is fully established
        await asyncio.sleep(0.1)

    def sid(self) -> str | None:
        """Return the namespace SID, or ``None`` when it is not available."""
        try:
            raw = self._sio.get_sid(self._namespace)
        except Exception:
            return None
        # Do not stringify a missing SID: str(None) would yield the text "None".
        return None if raw is None else str(raw)

    def engine_sid(self) -> str | None:
        """Return the underlying Engine.IO SID (transport-level)."""
        return getattr(self._sio, "sid", None)

    async def disconnect(self) -> None:
        """Close the Socket.IO connection if open."""
        if self._sio.connected:
            await self._sio.disconnect()

    def on_event(self, handler: EventDispatcher) -> None:
        """Register a single event dispatcher (gateway attaches here).

        A later call replaces the previously registered dispatcher.
        """
        self._on_event = handler

    def add_on_connected(self, cb: ConnectedCb) -> None:
        """Register a callback to be called when the connection is established."""
        self._on_connected.append(cb)

    @property
    def group_id(self) -> int | None:
        """Return the optional ``group_id`` included in subscription payloads."""
        return self._group_id

    @group_id.setter
    def group_id(self, group_id: int | None) -> None:
        """Set an optional ``group_id`` to be included in subscription payloads."""
        self._group_id = group_id

    async def subscribe(self, modules: list[str]) -> None:
        """Emit listen events for the provided devices (devids/modules).

        The given identifiers are de-duplicated, sorted and remembered for
        :meth:`resubscribe`. If the resulting list is empty nothing is emitted.
        A failed emit is logged and does not stop the remaining emits.

        Args:
            modules: Device/module identifiers to subscribe to.
        """
        self._modules = sorted(set(modules))
        if not self._modules:
            return

        # Base variants: "modules" and alt. "devids"
        base: _SubPayload = {"modules": self._modules}
        base_alt: _SubPayload = {"devids": self._modules}

        # Optionally include "group_id"
        if self.group_id is not None:
            base["group_id"] = self.group_id
            base_alt["group_id"] = self.group_id

        payloads: list[tuple[str, _SubPayload]] = [
            ("app:modules:parameters:listen", base),
            ("app:modules:parameters:listen", base_alt),
            ("app:modules:activity:quantity:listen", base),
            ("app:modules:activity:quantity:listen", base_alt),
        ]
        for ev, pl in payloads:
            log.debug("EMIT %s %s", ev, pl)
            try:
                await self._sio.emit(ev, pl, namespace=self._namespace)
            except Exception:
                # Best effort: keep emitting the remaining variants.
                log.exception("Emit failed for %s", ev)

    async def resubscribe(self) -> None:
        """Re-emit subscription events after a reconnect."""
        if self._modules:
            await self.subscribe(self._modules)

    # ---------------- Internal ----------------

    def _dispatch(self, name: str, payload: Any) -> None:
        """Forward an event to the registered dispatcher (if any).

        The dispatcher may be synchronous or ``async``. A coroutine result is
        scheduled as a background task rather than dropped. Exceptions raised
        while calling the dispatcher are logged and swallowed so that a faulty
        dispatcher cannot break the Socket.IO event loop.

        Args:
            name: Canonical event name passed to the dispatcher.
            payload: Event payload, forwarded unchanged.
        """
        handler = self._on_event
        if handler is None:
            return
        try:
            res = handler(name, payload)
            if asyncio.iscoroutine(res):
                # Same scheduling helper and call shape as the on-connected callbacks.
                spawn(res, "on_event_cb", log)
        except Exception:
            log.exception("Error in on_event(%s) callback", name)