Source code for pybragerone.models.token

"""Token storage implementations for pybragerone.

This module provides token management and persistence strategies for the pybragerone
library. It includes:

- Token: A Pydantic model representing OAuth2-style authentication tokens
- TokenStore: A protocol defining the interface for token persistence
- CLITokenStore: A concrete implementation for CLI applications using keyring/file storage
- HATokenStore: An adapter for Home Assistant integration storage

The token storage implementations handle secure persistence of authentication tokens,
including access tokens, refresh tokens, and expiration information.

Classes:
    Token: Pydantic model representing an authentication token with expiration tracking.
    TokenStore: Protocol defining the interface for token storage implementations.
    CLITokenStore: Token storage for CLI applications with keyring and file fallback.
    HATokenStore: Adapter for Home Assistant storage with callable-based implementation.

Examples:
    Using CLITokenStore for CLI applications:
        >>> store = CLITokenStore(email="user@example.com")
        >>> token = store.load()
        >>> if token and not token.is_expired():
        ...     print("Token is valid")

    Using HATokenStore with custom storage callbacks:
        >>> def my_loader() -> Token | None:
        ...     # Load from HA storage
        ...     pass
        >>> def my_saver(token: Token) -> None:
        ...     # Save to HA storage
        ...     pass
        >>> def my_clearer() -> None:
        ...     # Clear from HA storage
        ...     pass
        >>> store = HATokenStore(loader=my_loader, saver=my_saver, clearer=my_clearer)

Notes:
    - CLITokenStore prefers system keyring when available, falling back to file storage
    - File storage uses 0600 permissions for security
    - All token storage operations suppress exceptions to prevent disruption
    - Token expiration includes a configurable leeway period (default 60 seconds)

Attributes:
    _HAS_KEYRING (bool): Whether the keyring library is available for secure storage.
    log (logging.Logger): Module logger for debugging and error reporting.
"""

from __future__ import annotations

import contextlib
import json
import logging
import os
import stat
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, Protocol, cast, runtime_checkable

from pydantic import BaseModel, ConfigDict, Field

try:
    import keyring

    _HAS_KEYRING = True
except ImportError:
    _HAS_KEYRING = False

log = logging.getLogger(__name__)


[docs] class Token(BaseModel): """Authentication token payload returned by POST /v1/auth/user.""" access_token: str | None refresh_token: str | None = None token_type: str = "bearer" expires_at: datetime | None = None user_id: int | None = None objects: list[dict[str, Any]] = Field(default_factory=list) model_config = ConfigDict(frozen=False)
[docs] @classmethod def from_login_payload(cls, data: dict[str, Any]) -> Token: """Create a Token instance from a login response payload.""" exp_raw = data.get("expiresAt") exp_dt: datetime | None = None if exp_raw: with contextlib.suppress(Exception): exp_dt = datetime.fromisoformat(str(exp_raw).replace("Z", "+00:00")) # Extract user.id with proper type narrowing user_obj = data.get("user", {}) or {} user_dict = cast(dict[str, Any], user_obj) if isinstance(user_obj, dict) else {} return cls( access_token=data.get("accessToken") or data.get("token") or "", refresh_token=data.get("refreshToken"), token_type=(data.get("type") or "bearer"), expires_at=exp_dt, user_id=user_dict.get("id") or None, objects=data.get("objects") or [], )
[docs] def is_expired(self, *, leeway: int = 60) -> bool: """Return True if the token is expired or will expire within `leeway` seconds.""" if not self.expires_at: return False now = datetime.now(UTC) return now + timedelta(seconds=leeway) >= self.expires_at
[docs] @runtime_checkable class TokenStore(Protocol): """Abstract persistence for auth tokens."""
[docs] def load(self) -> Token | None: """Return a cached Token or None if not present.""" ...
[docs] def save(self, token: Token) -> None: """Persist the Token atomically.""" ...
[docs] def clear(self) -> None: """Remove any persisted token.""" ...
# ---------- CLI IMPLEMENTATION ----------
[docs] @dataclass class CLITokenStore: """Token persistence for CLI. Preference order: 1) system keyring (if available) 2) file at ~/.config/pybragerone/token-<email>.json (0600) """ email: str service: str = "pybragerone" def _file_path(self) -> Path: """Return the path to the fallback token file.""" base = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) d = base / "pybragerone" d.mkdir(parents=True, exist_ok=True) return d / f"token-{self.email}.json" @staticmethod def _write_file_secure(p: Path, content: str) -> None: """Write a file with 0600 permissions, ignoring errors.""" p.write_text(content, encoding="utf-8") with contextlib.suppress(Exception): os.chmod(p, stat.S_IRUSR | stat.S_IWUSR) # 0o600
[docs] def load(self) -> Token | None: """Return a cached Token or None if not present.""" # 1) keyring if _HAS_KEYRING: raw = keyring.get_password(self.service, self.email) # -type: ignore[attr-defined] if raw: with contextlib.suppress(Exception): data = json.loads(raw) return self._to_token(data) # 2) fallback file p = self._file_path() if p.exists(): with contextlib.suppress(Exception): data = json.loads(p.read_text("utf-8")) return self._to_token(data) return None
[docs] def save(self, token: Token) -> None: """Persist the Token atomically.""" payload = json.dumps( { "access_token": getattr(token, "access_token", None), "token_type": getattr(token, "token_type", "bearer"), "refresh_token": getattr(token, "refresh_token", None), "expires_at": getattr(token, "expires_at", None), "objects": getattr(token, "objects", []) or [], }, ensure_ascii=False, ) if _HAS_KEYRING: keyring.set_password(self.service, self.email, payload) return self._write_file_secure(self._file_path(), payload)
[docs] def clear(self) -> None: """Remove any persisted token.""" if _HAS_KEYRING: with contextlib.suppress(Exception): keyring.delete_password(self.service, self.email) with contextlib.suppress(Exception): self._file_path().unlink()
@staticmethod def _to_token(data: dict[str, Any]) -> Token: """Convert a dict to a Token instance, ignoring errors.""" return Token( access_token=data.get("access_token"), token_type=data.get("token_type", "bearer"), refresh_token=data.get("refresh_token"), expires_at=data.get("expires_at"), objects=data.get("objects") or [], )
# ---------- HA EXAMPLE IMPLEMENTATION ----------
[docs] class HATokenStore(TokenStore): """Minimal example adapter for Home Assistant storage. Expects an object with async load/save/clear; wraps sync API expected by :class:`~pybragerone.api.BragerOneApiClient`. Replace with your concrete implementation in the HA integration. """ def __init__( self, loader: Callable[[], Token | None], saver: Callable[[Token], None], clearer: Callable[[], None], ) -> None: """Initialize with callables to load/save/clear the token. Args: loader: Callable that returns a Token or None. saver: Callable that takes a Token and persists it. clearer: Callable that removes any persisted token. Returns: None """ # Callables supplied by HA layer: # loader: () -> Optional[dict] # saver: (dict) -> None # clearer: () -> None self._loader = loader self._saver = saver self._clearer = clearer
[docs] def load(self) -> Token | None: """Return a cached Token or None if not present.""" data: Any = None with contextlib.suppress(Exception): data = self._loader() if not isinstance(data, dict): return None # Type narrowing: after isinstance check, cast to dict[str, Any] for proper typing token_data = cast(dict[str, Any], data) return Token( access_token=token_data.get("access_token"), token_type=token_data.get("token_type", "bearer"), refresh_token=token_data.get("refresh_token"), expires_at=token_data.get("expires_at"), objects=token_data.get("objects") or [], )
[docs] def save(self, token: Token) -> None: """Persist the Token atomically.""" payload: dict[str, Any] = { "access_token": getattr(token, "access_token", None), "token_type": getattr(token, "token_type", "bearer"), "refresh_token": getattr(token, "refresh_token", None), "expires_at": getattr(token, "expires_at", None), "objects": getattr(token, "objects", []) or [], } with contextlib.suppress(Exception): self._saver(Token.from_login_payload(payload))
[docs] def clear(self) -> None: """Remove any persisted token.""" with contextlib.suppress(Exception): self._clearer()