Source code for pybragerone.api.client

"""HTTP API client for BragerOne."""

from __future__ import annotations

import asyncio
import contextlib
import json
import logging
import ssl
from collections.abc import Callable, Mapping
from typing import Any

import httpx

from pybragerone.models.api import (
    AuthResponse,
    BragerObject,
    Module,
    ModuleCard,
    ObjectDetails,
    Permission,
    SystemVersion,
    User,
)

from ..models.token import Token, TokenStore
from .constants import API_BASE, ONE_BASE
from .endpoints import (
    auth_revoke_url,
    auth_user_url,
    module_card_url,
    module_command_raw_url,
    module_command_url,
    modules_activity_quantity_url,
    modules_connect_url,
    modules_parameters_url,
    modules_url,
    object_permissions_url,
    object_url,
    objects_url,
    system_version_url,
    user_permissions_url,
    user_url,
)
from .server import BRAGERONE_SERVER, ServerConfig

LOG = logging.getLogger("pybragerone.api")
LOG_HTTP = logging.getLogger("pybragerone.http")


[docs] class ApiError(RuntimeError): """Raised for non-2xx HTTP responses. Attributes: status: HTTP status code. data: Response data. headers: Response headers. """ def __init__(self, status: int, data: Any, headers: dict[str, str] | None = None): """Initialize the API error. Args: status: HTTP status code. data: Response data. headers: Response headers dictionary. """ super().__init__(f"HTTP {status}: {data!r}") self.status = status self.data = data self.headers = headers or {}
[docs] class HttpCache: """Simple HTTP cache using ETag/Last-Modified headers with in-memory body storage. This cache implementation stores response bodies along with their cache headers to enable efficient HTTP caching with proper conditional request handling. """ def __init__(self) -> None: """Initialize an empty HTTP cache. Creates an empty cache store that maps URLs to tuples containing ETag, Last-Modified, and response body data. """ self._store: dict[str, tuple[str | None, str | None, bytes]] = {}
[docs] def headers_for(self, url: str) -> dict[str, str]: """Generate conditional request headers for cached URL. Returns appropriate If-None-Match and If-Modified-Since headers based on cached ETag and Last-Modified values for the given URL. Args: url: The URL to generate headers for. Returns: Dictionary of conditional headers to include in the request. """ etag, last_mod, _ = self._store.get(url, (None, None, b"")) headers = {} if etag: headers["If-None-Match"] = etag if last_mod: headers["If-Modified-Since"] = last_mod return headers
[docs] def update(self, url: str, headers: Mapping[str, str], body: bytes) -> None: """Update cache entry with response data. Stores the response body along with ETag and Last-Modified headers for the given URL to enable future conditional requests. Args: url: The URL being cached. headers: Response headers containing cache information. body: The response body to cache. """ etag = headers.get("ETag") last_mod = headers.get("Last-Modified") self._store[url] = (etag, last_mod, body)
[docs] def get_body(self, url: str) -> bytes | None: """Retrieve cached response body for URL. Args: url: The URL to retrieve the cached body for. Returns: Cached response body as bytes, or None if not cached. """ val = self._store.get(url) return val[2] if val else None
CredProvider = Callable[[], tuple[str, str]] # -> (email, password)
[docs] class BragerOneApiClient: """HTTP API client with idempotent login and auto-refresh. Provides a comprehensive HTTP client for the BragerOne API with automatic token management, authentication, and retry logic. """ def __init__( self, *, server: ServerConfig | None = None, api_base: str = API_BASE, one_base: str = ONE_BASE, io_base: str | None = None, token_store: TokenStore | None = None, enable_http_trace: bool = False, redact_secrets: bool = True, creds_provider: CredProvider | None = None, validate_on_start: bool = True, refresh_leeway: int = 90, timeout: float = 8.0, concurrency: int = 4, verify: bool | str | ssl.SSLContext = True, ): """Initialize the API client. Args: server: Optional server/platform configuration. If provided, it overrides `api_base`, `one_base`, and `io_base`. api_base: Base URL for the REST API. one_base: Base URL for the web app/assets origin. io_base: Base URL for the Engine.IO/Socket.IO server. When not provided, it is derived from `api_base`. token_store: Optional token storage for persistence. enable_http_trace: Whether to enable HTTP request/response tracing. redact_secrets: Whether to redact sensitive information in logs. creds_provider: Function that provides email/password credentials. validate_on_start: Whether to validate tokens on first use. refresh_leeway: Time in seconds before token expiry to refresh. timeout: Request timeout in seconds. concurrency: Maximum number of concurrent requests. verify: SSL verification configuration for httpx. """ self._session: httpx.AsyncClient | None = None effective_server = server or BRAGERONE_SERVER self._api_base = effective_server.api_base if server is not None else api_base.rstrip("/") self._one_base = effective_server.one_base if server is not None else one_base.rstrip("/") self._container = effective_server.container if server is not None else "BragerOne" if server is not None: self._io_base = effective_server.io_base.rstrip("/") else: self._io_base = (io_base or self._api_base.rsplit("/", 1)[0]).rstrip("/") self._enable_http_trace = enable_http_trace self._redact_secrets = redact_secrets self._validate_on_start = validate_on_start self._refresh_leeway = refresh_leeway self._token: Token | None = None self._validated: bool = False self._token_loader: Callable[[], Token | None] | None = None self._token_saver: Callable[[Token], None] | None = None self._token_clearer: Callable[[], None] | None = None # NEW self._skip_load_once = False if token_store is not None: self.set_token_store(token_store) self._creds_provider = creds_provider self._auth_lock = asyncio.Lock() self._connect_variant: dict[str, Any] | None = None self._cache = HttpCache() self._timeout = timeout self._sem = asyncio.Semaphore(concurrency) self._verify = verify # ----------------- session helpers -----------------
[docs] def set_token_store(self, store: TokenStore | None) -> None: """Wire token persistence to a store that exposes load/save/clear. Args: store: Token storage instance or None to disable persistence. """ if store is None: self._token_loader = None self._token_saver = None self._token_clearer = None return self._token_loader = store.load self._token_saver = store.save self._token_clearer = store.clear
async def _ensure_session(self) -> httpx.AsyncClient: """Ensure we have an AsyncClient (with optional HTTP tracing). Returns: An active httpx AsyncClient with configured headers and event hooks. """ if self._session and not self._session.is_closed: return self._session # Configure default headers headers = {"Origin": self._one_base, "Referer": f"{self._one_base}/"} # Configure timeout timeout = httpx.Timeout(self._timeout) # Create client self._session = httpx.AsyncClient( headers=headers, timeout=timeout, follow_redirects=True, verify=self._verify, ) # Add HTTP tracing event hooks if enabled if self._enable_http_trace: self._session.event_hooks = { "request": [self._log_request], "response": [self._log_response], } return self._session async def _log_request(self, request: httpx.Request) -> None: """Log HTTP request for tracing. Args: request: The httpx Request object. """ safe_headers = dict(request.headers) if self._redact_secrets and "Authorization" in safe_headers: safe_headers["Authorization"] = "<redacted>" LOG_HTTP.debug("→ %s %s headers=%s", request.method, request.url, safe_headers) async def _log_response(self, response: httpx.Response) -> None: """Log HTTP response for tracing. Args: response: The httpx Response object. """ LOG_HTTP.debug("← %s %s%s", response.request.method, response.request.url, response.status_code)
[docs] async def close(self) -> None: """Close the underlying HTTP session. This should be called when the client is no longer needed to properly release resources. """ if self._session and not self._session.is_closed: await self._session.aclose()
# ----------------- request ----------------- async def _req( self, method: str, path: str, *, json: Any | None = None, data: Any | None = None, headers: dict[str, str] | None = None, auth: bool = True, _retry: bool = True, ) -> tuple[int, Any, dict[str, str]]: """Perform an HTTP request with optional auth and auto-refresh on 401. Args: method: HTTP method (GET, POST, etc.). path: Request path/URL. json: JSON payload for the request body. data: Form data for the request body. headers: Additional HTTP headers. auth: Whether to include authentication headers. _retry: Whether to retry on authentication failure. Returns: Tuple of (status_code, response_data, response_headers). Raises: ApiError: For HTTP error responses (4xx/5xx). """ sess = await self._ensure_session() hdrs = dict(headers or {}) hdrs["Accept"] = "application/json" if auth: if not self._token or not self._token.access_token: raise ApiError(401, {"message": "No token"}, {}) hdrs["Authorization"] = f"Bearer {self._token.access_token}" resp = await sess.request(method, path, json=json, data=data, headers=hdrs) status = resp.status_code ctype = resp.headers.get("Content-Type", "") try: body = resp.json() if "application/json" in ctype else resp.text except Exception: body = None if status == 401 and auth and _retry: LOG.debug("401 received — attempting token refresh & single retry") em, pw = (None, None) if self._creds_provider: with contextlib.suppress(Exception): em, pw = self._creds_provider() with contextlib.suppress(Exception): await self.ensure_auth(em, pw) return await self._req( method, path, json=json, data=data, headers=headers, auth=auth, _retry=False, ) if status >= 400: raise ApiError(status, body, dict(resp.headers)) return status, body, dict(resp.headers) # -------- SYSTEM --------
[docs] async def get_system_version(self) -> SystemVersion: """Get system version information. Returns: System version information as a Pydantic model. Raises: ApiError: If the request fails. """ status, data, _ = await self._req( "GET", system_version_url(api_base=self._api_base, container=self._container), auth=False, ) if status != 200: raise ApiError(status, data) if not isinstance(data, dict): raise ApiError(500, {"message": "Unexpected version payload"}, {}) # API returns {"version": {...}}, extract the inner object version_data = data.get("version", data) return SystemVersion.model_validate(version_data)
# ----------------- AUTH -----------------
[docs] async def ensure_auth(self, email: str | None = None, password: str | None = None) -> Token: """Ensure valid token: use cache + validation, and if missing/expired — login. Args: email: User email for authentication. password: User password for authentication. Returns: Valid authentication token. Raises: ApiError: If authentication fails or credentials are missing. """ async with self._auth_lock: # 1) load from persistence (if loader provided) if self._token is None and self._token_loader and not self._skip_load_once: with contextlib.suppress(Exception): self._token = self._token_loader() self._skip_load_once = False # 2) if we have token and it's not expired — soft validation (optional) and done if self._token and not self._token.is_expired(leeway=self._refresh_leeway): if self._validate_on_start and not self._validated: try: await self._try_validate() return self._token except ApiError as e: if e.status in (401, 403): self._token = None self._validated = False else: raise else: return self._token # 3) no token → need credentials em = email pw = password if (not em or not pw) and self._creds_provider: em, pw = self._creds_provider() if not em or not pw: raise ApiError(401, {"message": "No credentials for (re)login"}, {}) # 4) classic login return await self._post_login(em, pw)
@property def access_token(self) -> str: """Get the current access token. Returns: Current access token string. Raises: RuntimeError: If the client is not authenticated yet. """ if self._token is None or not self._token.access_token: raise RuntimeError("ApiClient has no access token; call ensure_auth() first") return self._token.access_token @property def api_base(self) -> str: """Base URL for REST API calls.""" return self._api_base @property def one_base(self) -> str: """Base URL for web app/assets origin.""" return self._one_base @property def io_base(self) -> str: """Base URL for Engine.IO/Socket.IO server.""" return self._io_base async def _do_login_request(self, email: str, password: str) -> AuthResponse: """Execute login request to the authentication endpoint. Args: email: User email address. password: User password. Returns: Authentication response as a Pydantic model. Raises: ApiError: If the request fails. """ status, data, _ = await self._req( "POST", auth_user_url(api_base=self._api_base), json={"email": email, "password": password}, auth=False, ) if status != 200: raise ApiError(status, data) if not isinstance(data, dict): raise ApiError(500, {"message": "Unexpected login payload"}, {}) return AuthResponse.model_validate(data) async def _post_login(self, email: str, password: str) -> Token: """Login via /v1/auth/user. On 500/ER_DUP_ENTRY try short backoff and retry. Args: email: User email address. password: User password. Returns: Authentication token. Raises: ApiError: If login fails after all retries. """ import random delays = [0.2, 0.4, 0.8] # seconds for _, d in enumerate([*delays, None]): # last attempt without sleep after try: auth_response = await self._do_login_request(email, password) except ApiError as e: # only retry 500/duplicate errors if e.status == 500 and self._is_duplicate_token_error(e.data) and d is not None: jitter = random.uniform(0.0, 0.15) # nosec B311 - non-cryptographic jitter for retry backoff await asyncio.sleep(d + jitter) continue raise # Convert AuthResponse to Token using existing method tok = Token.from_login_payload(auth_response.model_dump()) if not tok.access_token: raise ApiError(500, {"message": "No accessToken in login payload"}, {}) self._token = tok self._validated = False if self._token_saver: with contextlib.suppress(Exception): self._token_saver(tok) return tok # practically won't reach here, but for safety: raise ApiError(500, {"message": "Login failed after retries"}, {}) async def _try_validate(self) -> None: """Try to validate the current token by making a test request. Raises: ApiError: If validation fails or token is invalid. """ if not self._token: raise ApiError(401, {"message": "No token"}, {}) try: status, _data, _hdrs = await self._req( "GET", user_url(api_base=self._api_base), auth=True, _retry=False, ) if status == 200: self._validated = True return except ApiError as e: if e.status == 401: self._token = None self._validated = False raise raise self._validated = True @staticmethod def _is_duplicate_token_error(data: Any) -> bool: """Check if error response indicates a duplicate token error. Args: data: Error response data. Returns: True if this is a duplicate token error. """ if not isinstance(data, dict): return False msg = str(data.get("message", "")).lower() return "duplicate entry" in msg or "er_dup_entry" in msg
[docs] async def revoke(self) -> None: """Server-side logout + local cleanup of token and persistence. Attempts to revoke the token on the server and cleans up local state regardless of server response. """ try: await self._req("POST", auth_revoke_url(api_base=self._api_base), auth=True) except ApiError as e: if e.status not in (401, 403, 404): raise finally: # RAM self._token = None self._validated = False # PERSISTENCE if self._token_clearer: with contextlib.suppress(Exception): self._token_clearer() # "don't load immediately" on the next ensure_auth in the same cycle self._skip_load_once = True
# -------- USER --------
[docs] async def get_user(self) -> User: """Get current user information. Returns: User information as a Pydantic model. Raises: ApiError: If the request fails. """ status, data, _ = await self._req("GET", user_url(api_base=self._api_base)) if status != 200: raise ApiError(status, data) if not isinstance(data, dict): raise ApiError(500, {"message": "Unexpected user payload"}, {}) # API returns {"user": {...}}, extract the inner object user_data = data.get("user", data) return User.model_validate(user_data)
[docs] async def get_user_permissions(self) -> list[Permission]: """Get current user permissions. Returns: List of user permissions. Raises: ApiError: If the request fails. """ status, data, _ = await self._req("GET", user_permissions_url(api_base=self._api_base)) if status != 200: raise ApiError(status, data) # API returns {"permissions": [...]} format permissions_list: list[str] = [] if isinstance(data, dict): permissions_list = data.get("permissions", []) elif isinstance(data, list): permissions_list = data # Convert strings to Permission objects return [Permission.model_validate(perm) for perm in permissions_list]
# -------- OBJECTS --------
[docs] async def get_objects(self) -> list[BragerObject]: """GET /v1/objects → list of objects (with tolerance for different shapes). Returns: List of BragerObject models. """ st, data, _ = await self._req("GET", objects_url(api_base=self._api_base)) if st != 200: return [] # Extract objects array from different response formats objects_data = [] if isinstance(data, dict) and isinstance(data.get("data"), list): objects_data = data["data"] elif isinstance(data, dict) and isinstance(data.get("objects"), list): objects_data = data["objects"] elif isinstance(data, list): objects_data = data # Convert to Pydantic models return [BragerObject.model_validate(obj) for obj in objects_data]
[docs] async def get_object(self, object_id: int) -> ObjectDetails: """Get object by ID. Args: object_id: The object identifier. Returns: Object details with operational status. Raises: ApiError: If the request fails. """ status, data, _ = await self._req("GET", object_url(object_id, api_base=self._api_base)) if status != 200: raise ApiError(status, data) if not isinstance(data, dict): raise ApiError(500, {"message": "Unexpected object payload"}, {}) return ObjectDetails.model_validate(data)
[docs] async def get_object_permissions(self, object_id: int) -> list[Permission]: """Get object-specific permissions for the current user. Args: object_id: The ID of the object to get permissions for. Returns: List of object-specific permissions. Raises: ApiError: If the request fails. """ status, data, _ = await self._req("GET", object_permissions_url(object_id, api_base=self._api_base)) if status != 200: raise ApiError(status, data) # API returns {"permissions": [...]} format permissions_list: list[str] = [] if isinstance(data, dict): permissions_list = data.get("permissions", []) elif isinstance(data, list): permissions_list = data # Convert strings to Permission objects return [Permission.model_validate(perm) for perm in permissions_list]
# -------- MODULES --------
[docs] async def get_modules(self, object_id: int) -> list[Module]: """GET /v1/modules?page=1&limit=999&group_id=<object_id> → list of modules. Args: object_id: The object/group identifier. Returns: List of Module models. """ st, data, _ = await self._req("GET", modules_url(object_id, api_base=self._api_base)) if st != 200: return [] # Extract modules array from different response formats modules_data = [] if isinstance(data, dict) and isinstance(data.get("data"), list): modules_data = data["data"] elif isinstance(data, list): modules_data = data # Convert to Pydantic models return [Module.model_validate(mod) for mod in modules_data]
[docs] async def get_module_card(self, code: str) -> ModuleCard: """Get module card information by code. Args: code: The module code identifier. Returns: Module card information as a Pydantic model. Raises: ApiError: If the request fails. """ status, data, _ = await self._req("GET", module_card_url(code, api_base=self._api_base)) if status != 200: raise ApiError(status, data) if not isinstance(data, dict): raise ApiError(500, {"message": "Unexpected module card payload"}, {}) return ModuleCard.model_validate(data)
[docs] async def modules_connect( self, wsid_ns: str, modules: list[str], group_id: int | None = None, engine_sid: str | None = None, ) -> bool: """Connect to modules via WebSocket. Args: wsid_ns: WebSocket namespace ID. modules: List of module codes to connect to. group_id: Optional group/object ID. engine_sid: Optional engine session ID as fallback. Returns: True if connection successful, False otherwise. """ if not modules: return True modules = sorted(set(modules)) headers = { "Content-Type": "application/json;charset=UTF-8", "X-Requested-With": "XMLHttpRequest", } def bodies(pref_sid: str) -> list[dict[str, Any]]: """Generate candidate request bodies for module connection.""" arr = [ {"wsid": pref_sid, "modules": modules}, {"sid": pref_sid, "modules": modules}, ] if group_id is not None: arr.append({"wsid": pref_sid, "group_id": str(group_id), "modules": modules}) return arr candidates: list[dict[str, Any]] = [] if self._connect_variant: candidates.append(self._connect_variant) # 1) namespace SID if wsid_ns: for body in bodies(wsid_ns): candidates.append(body) # 2) engine SID (fallback) if engine_sid: for body in bodies(engine_sid): candidates.append(body) # de-dupe seen: set[str] = set() uniq: list[dict[str, Any]] = [] for c in candidates: # hash payload "shape" as stable JSON key = json.dumps(c, sort_keys=True, separators=(",", ":")) if key not in seen: seen.add(key) uniq.append(c) for body in uniq: status, data, _ = await self._req( "POST", modules_connect_url(api_base=self._api_base), json=body, headers=headers, ) LOG.debug("modules.connect try %s%s %s", body, status, data if isinstance(data, dict) else "") if status in (200, 204): self._connect_variant = body return True return False
[docs] async def modules_parameters_prime(self, modules: list[str], *, return_data: bool = False) -> tuple[int, Any] | bool: """Prime modules parameters. Args: modules: List of module codes. return_data: Whether to return response data or just success status. Returns: Tuple of (status, data) if return_data=True, otherwise boolean success. """ payload = {"modules": modules} status, data, _ = await self._req("POST", modules_parameters_url(api_base=self._api_base), json=payload) # log_json_payload(LOG, "prime.modules.parameters", summarize_top_level(data)) return (status, data) if return_data else (status in (200, 204))
[docs] async def modules_activity_quantity_prime(self, modules: list[str], *, return_data: bool = False) -> tuple[int, Any] | bool: """Prime modules activity quantity. Args: modules: List of module codes. return_data: Whether to return response data or just success status. Returns: Tuple of (status, data) if return_data=True, otherwise boolean success. """ payload = {"modules": modules} status, data, _ = await self._req("POST", modules_activity_quantity_url(api_base=self._api_base), json=payload) # log_json_payload(LOG, "prime.modules.activity.quantity", summarize_top_level(data)) return (status, data) if return_data else (status in (200, 204))
[docs] async def module_command( self, *, devid: str, pool: str, parameter: str, value: Any, parameter_name: str | None = None, unit: int | None = None, extra_payload: Mapping[str, Any] | None = None, return_data: bool = False, ) -> tuple[int, Any] | bool: """Dispatch a parameter-like command payload to a module. This method posts to ``/v1/module/command`` and mirrors the web application's payload shape for non-raw writes. Args: devid: Module device identifier. pool: Parameter pool/group (for example ``"P4"``). parameter: Parameter channel/index token (for example ``"v8"``). value: Value to be written. parameter_name: Optional display/name token. unit: Optional unit identifier expected by backend for selected writes. extra_payload: Optional additional keys to include in the request payload. return_data: Whether to return ``(status, data)`` instead of only success. Returns: Tuple of ``(status, data)`` when ``return_data=True``; otherwise a boolean indicating request success. """ payload: dict[str, Any] = { "devid": devid, "pool": pool, "parameter": parameter, "value": value, } if parameter_name is not None: payload["parameterName"] = parameter_name if unit is not None: payload["unit"] = unit if extra_payload: payload.update(dict(extra_payload)) status, data, _ = await self._req("POST", module_command_url(api_base=self._api_base), json=payload) return (status, data) if return_data else (status in (200, 201, 202, 204))
[docs] async def module_command_raw( self, *, devid: str, command: str, value: Any | None = None, extra_payload: Mapping[str, Any] | None = None, return_data: bool = False, ) -> tuple[int, Any] | bool: """Dispatch a symbolic command to a module. This method matches the web application's command transport and posts to ``/v1/module/command/raw``. Args: devid: Module device identifier. command: Symbolic command name (for example ``"MODULE_RESTART"``). value: Optional command value (for example ``"ONLINE"`` or ``"OFF"``). extra_payload: Optional additional keys to include in the request payload. return_data: Whether to return ``(status, data)`` instead of only success. Returns: Tuple of ``(status, data)`` when ``return_data=True``; otherwise a boolean indicating request success. """ payload: dict[str, Any] = {"devid": devid, "command": command} if value is not None: payload["value"] = value if extra_payload: payload.update(dict(extra_payload)) status, data, _ = await self._req("POST", module_command_raw_url(api_base=self._api_base), json=payload) return (status, data) if return_data else (status in (200, 201, 202, 204))
[docs] async def module_command_auto( self, *, devid: str, value: Any | None = None, command: str | None = None, pool: str | None = None, parameter: str | None = None, parameter_name: str | None = None, unit: int | None = None, extra_payload: Mapping[str, Any] | None = None, return_data: bool = False, ) -> tuple[int, Any] | bool: """Automatically dispatch via ``/module/command`` or ``/module/command/raw``. Routing rules: - If ``command`` is provided, dispatches through ``/module/command/raw``. - If both ``pool`` and ``parameter`` are provided, dispatches through ``/module/command``. - If both route styles are provided at once, raises ``ValueError``. - If no complete route style is provided, raises ``ValueError``. Args: devid: Module device identifier. value: Command/write value. Optional for raw commands. command: Symbolic command for ``/module/command/raw``. pool: Parameter pool/group for ``/module/command``. parameter: Parameter channel/index token for ``/module/command``. parameter_name: Optional display/name token for ``/module/command``. unit: Optional unit identifier for ``/module/command``. extra_payload: Optional additional keys to include in request payload. return_data: Whether to return ``(status, data)`` instead of only success. Returns: Tuple of ``(status, data)`` when ``return_data=True``; otherwise a boolean indicating request success. """ has_raw_route = command is not None has_param_route = pool is not None or parameter is not None if has_raw_route and has_param_route: raise ValueError("Ambiguous command payload: provide either command or pool/parameter, not both") if has_raw_route: if command is None: raise ValueError("Missing raw command name") return await self.module_command_raw( devid=devid, command=command, value=value, extra_payload=extra_payload, return_data=return_data, ) if pool is None or parameter is None: raise ValueError("Missing command route data: provide command or both pool and parameter") return await self.module_command( devid=devid, pool=pool, parameter=parameter, value=value, parameter_name=parameter_name, unit=unit, extra_payload=extra_payload, return_data=return_data, )
# -------- ASSETS --------
[docs] async def fetch_text_one(self, path: str) -> tuple[int, str]: """Fetch text content from ONE_BASE. Args: path: The path to fetch from. Returns: Tuple of (status_code, text_content). """ status, data, _ = await self._req("GET", f"{self._one_base}/{path.lstrip('/')}") return (status, data) if isinstance(data, str) else (status, "")
[docs] async def fetch_json_one(self, path: str) -> tuple[int, dict[str, Any] | list[Any] | None]: """Fetch JSON content from ONE_BASE. Args: path: The path to fetch from. Returns: Tuple of (status_code, json_data). Note: Assets are JS, so JSON extraction should be done *catalog-side*, not here. This method is here only for symmetry; you may not use it. """ st, txt = await self.fetch_text_one(path) # NOTE: assets are JS, so JSON extraction should be *catalog-side*, not here # This method is here only for symmetry; you may not use it. try: return st, json.loads(txt) except Exception: return st, None
[docs] async def get_bytes(self, url: str) -> bytes: """Get raw bytes from URL with caching (ETag/Last-Modified). Args: url: The URL to fetch. Returns: The raw bytes of the response. Raises: ApiError: If the request fails. Note: This method uses an in-memory cache to store ETag and Last-Modified headers to optimize subsequent requests to the same URL. """ sess = await self._ensure_session() headers = self._cache.headers_for(url) async with self._sem: LOG.debug("HTTP GET %s", url) try: r = await sess.get(url, headers=headers) if r.status_code == 304: body = self._cache.get_body(url) if body is None: # 304 without previous body - fetch full content r2 = await sess.get(url) r2.raise_for_status() body = r2.content self._cache.update(url, r2.headers, body) return body # Body from cache is guaranteed to be bytes return body r.raise_for_status() body = r.content self._cache.update(url, r.headers, body) return body except Exception as e: LOG.warning("HTTP error for %s: %s", url, e) raise