# Source code for pybragerone.models.events
"""Event bus and event classes for pybragerone."""
from __future__ import annotations
import asyncio
import time
from collections.abc import AsyncGenerator
from contextlib import suppress
from dataclasses import dataclass, field
from typing import Any
# [docs]
@dataclass(frozen=True)
class FeatureChanged:
    """Immutable event reporting that a boolean device feature changed.

    Instances are frozen: attributes cannot be reassigned after construction.
    """

    #: Device identifier.
    devid: str

    #: Name of the feature that changed.
    feature: str

    #: New boolean value of the feature.
    value: bool
# [docs]
@dataclass(frozen=True)
class ParamUpdate:
    """Immutable event describing an update to a single device parameter.

    An update may carry a new value, new metadata, or both; ``value`` is
    ``None`` when only metadata changed.
    """

    #: Device identifier.
    devid: str

    #: Parameter pool name.
    pool: str

    #: Channel identifier (``v``, ``s``, ``u`` ...).
    chan: str

    #: Parameter index.
    idx: int

    #: New parameter value; ``None`` for meta-only updates.
    value: Any | None

    #: Additional metadata; every instance gets its own empty dict by default.
    meta: dict[str, Any] = field(default_factory=dict)

    #: Timestamp of the update; defaults to ``time.time()`` at construction.
    ts: float = field(default_factory=time.time)

    #: Sequence number assigned by :class:`EventBus`; ``0`` until published.
    seq: int = 0
# [docs]
class EventBus:
    """In-process publish/subscribe hub for :class:`ParamUpdate` events.

    Each published update is stamped with an increasing sequence number and
    fanned out to one unbounded :class:`asyncio.Queue` per active subscriber.

    The bus is intended for tasks running on a single asyncio event loop.
    It is built on :class:`asyncio.Lock` and :class:`asyncio.Queue`, which
    are not thread-safe, so an instance must not be shared across threads.
    """

    def __init__(self) -> None:
        """Initialize an empty bus with the sequence counter at zero."""
        # One queue per active subscriber; see subscribe().
        self._subs: list[asyncio.Queue[ParamUpdate]] = []
        # Sequence number that the *next* published update will receive.
        self._seq = 0
        # Guards the counter and the subscriber list between tasks.
        self._lock = asyncio.Lock()

    def last_seq(self) -> int:
        """Get the last sequence number.

        Returns:
            The last sequence number that was assigned, or -1 if no events
            have been published.
        """
        return max(self._seq - 1, -1)

    async def publish(self, upd: ParamUpdate) -> None:
        """Publish an event to all subscribers.

        The update is stamped in place with the next sequence number and then
        put on the queue of every subscriber registered at that moment.

        Args:
            upd: The parameter update event to publish. Its ``seq`` attribute
                is overwritten.
        """
        async with self._lock:
            # ParamUpdate is frozen, so bypass its __setattr__ the same way
            # frozen dataclasses do internally. Unlike poking upd.__dict__,
            # this also works for instances without a per-instance __dict__
            # (for example slotted classes).
            object.__setattr__(upd, "seq", self._seq)
            self._seq += 1
            # Snapshot the subscriber list so the lock is not held during put().
            targets: tuple[asyncio.Queue[ParamUpdate], ...] = tuple(self._subs)

        # Broadcast outside of the lock.
        for q in targets:
            await q.put(upd)

    async def subscribe(self) -> AsyncGenerator[ParamUpdate]:
        """Subscribe to events.

        This is an async generator: its queue is registered when the
        generator is first advanced, not when ``subscribe()`` is called, so
        updates published before the first iteration are not delivered.
        The queue is unregistered when the generator is closed or fails.

        Yields:
            Parameter update events, in the order they were published.
        """
        q: asyncio.Queue[ParamUpdate] = asyncio.Queue()
        async with self._lock:
            self._subs.append(q)
        try:
            while True:
                yield await q.get()
        finally:
            # Unsubscribe; tolerate the queue having been removed already.
            async with self._lock:
                with suppress(ValueError):
                    self._subs.remove(q)