Source code for restgdf._models._settings

"""Process-level runtime settings for restgdf (**deprecated shim**).

Since phase-2a BL-18 this module is a *backwards-compatibility shim* over
:mod:`restgdf._config`. The flat :class:`Settings` model and
:func:`get_settings` remain for existing callers but:

* :func:`get_settings` emits a :class:`DeprecationWarning` on first use and
  constructs its return value from :func:`restgdf.get_config` — the new
  source of truth.
* Deprecated flat env-var names (``RESTGDF_TIMEOUT_SECONDS``,
  ``RESTGDF_TOKEN_URL``, ``RESTGDF_REFRESH_THRESHOLD``, ``RESTGDF_USER_AGENT``,
  ``RESTGDF_LOG_LEVEL``, ``RESTGDF_MAX_CONCURRENT_REQUESTS``) are honoured by
  :class:`restgdf.Config` with their own ``DeprecationWarning``s — see
  ``MIGRATION.md`` ``phase-2a``.

Prefer :func:`restgdf.get_config` / :class:`restgdf.Config` in new code. The
``Settings`` class and ``get_settings`` function will be removed no earlier
than restgdf 3.0.

``Settings.from_env`` is **not** deprecated at the method level — it is the
legacy direct path and keeps its original semantics (reads the flat env-var
set, no alias translation, no deprecation warnings). Callers that invoke
``Settings.from_env()`` directly therefore do **not** receive the new-wins
alias semantics; use :class:`restgdf.Config.from_env` for that contract.
"""

from __future__ import annotations

import functools
import os
import re
import warnings
from collections.abc import Mapping
from importlib.metadata import PackageNotFoundError, version as package_version
from pathlib import Path
from typing import Any, Callable

from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    HttpUrl,
    TypeAdapter,
    ValidationError,
    field_validator,
)

from restgdf._models._errors import RestgdfResponseError


_VERSION_RE = re.compile(r"""^__version__\s*=\s*["']([^"']+)["']\s*$""", re.MULTILINE)


# Shared URL-validation adapter + log-level alias map. Kept in sync with the
# copies in :mod:`restgdf._config` so the legacy :class:`Settings` shim
# applies the same validation as the new layered :class:`restgdf.Config`.
_HTTP_URL_ADAPTER: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
_LOG_LEVEL_ALIASES: dict[str, str] = {"WARN": "WARNING", "FATAL": "CRITICAL"}


@functools.lru_cache(maxsize=1)
def _restgdf_version() -> str:
    """Resolve the installed/package version without importing ``restgdf``."""
    try:
        return package_version("restgdf")
    except PackageNotFoundError:
        init_py = Path(__file__).resolve().parents[1] / "__init__.py"
        match = _VERSION_RE.search(init_py.read_text(encoding="utf-8"))
        if match is None:  # pragma: no cover - defensive fallback
            raise RuntimeError("Could not determine restgdf version")
        return match.group(1)


def _default_user_agent() -> str:
    """Return ``"restgdf/<version>"`` without forcing ``restgdf`` at import."""
    return f"restgdf/{_restgdf_version()}"


_VALID_LOG_LEVELS = frozenset(
    {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"},
)


[docs] class Settings(BaseModel): """Validated runtime configuration for restgdf. The model is **frozen**: treat a ``Settings`` instance as immutable. To change runtime configuration, mutate the environment (or pass an explicit mapping), then call :func:`reset_settings_cache` and re-fetch via :func:`get_settings`. """ model_config = ConfigDict(extra="forbid", frozen=True, populate_by_name=True) chunk_size: int = Field( default=100, gt=0, description=( "Default batch size for object-id chunking in feature queries. " "ArcGIS services advertise their own ``maxRecordCount``; this " "value is the library-level fallback." ), ) timeout_seconds: float = Field( default=30.0, gt=0, description="Default HTTP request timeout (seconds).", ) user_agent: str = Field( default_factory=_default_user_agent, min_length=1, description="User-Agent header sent on ArcGIS REST requests.", ) log_level: str = Field( default="WARNING", description=( "Log level applied to the ``restgdf.schema_drift`` logger when " "consumers opt in." ), ) token_url: str = Field( default="https://www.arcgis.com/sharing/rest/generateToken", description="Default ArcGIS ``generateToken`` endpoint.", ) refresh_threshold_seconds: int = Field( default=60, ge=0, description=( "Default token-refresh threshold for " ":class:`~restgdf.utils.token.ArcGISTokenSession`." ), ) max_concurrent_requests: int = Field( default=8, ge=1, description=( "Upper bound on the number of in-flight HTTP requests per " "top-level restgdf orchestration call (``service_metadata``, " "``fetch_all_data``, ``safe_crawl``). The default of 8 matches " "aiohttp's ``TCPConnector`` default connection-pool size; " "operators can raise or lower it via " "``RESTGDF_MAX_CONCURRENT_REQUESTS``. Saturation semantics = " "wait (plan.md §3c R-19)." ), ) default_headers_json: str | None = Field( default=None, description=( "Optional JSON-encoded dict merged into default request " "headers. Consumers parse this string at the HTTP boundary." ), ) @field_validator("log_level") @classmethod def _normalize_log_level(cls, value: str) -> str: upper = value.upper() upper = _LOG_LEVEL_ALIASES.get(upper, upper) if upper not in _VALID_LOG_LEVELS: raise ValueError( f"log_level must be one of {sorted(_VALID_LOG_LEVELS)!r}", ) return upper @field_validator("token_url") @classmethod def _check_token_url_scheme(cls, value: str) -> str: try: _HTTP_URL_ADAPTER.validate_python(value) except ValidationError as exc: raise ValueError( f"token_url must be a valid http(s) URL: {value!r} ({exc})", ) from exc return value @classmethod def from_env( cls, env: Mapping[str, str] | None = None, ) -> Settings: """Build :class:`Settings` from environment variables. Parameters ---------- env Mapping of env-var name to value. Defaults to ``os.environ``. Pass an explicit dict (including ``{}``) to bypass the real environment — primarily useful for tests. Raises ------ RestgdfResponseError If any ``RESTGDF_*`` variable contains a malformed value (bad cast or failed pydantic validation). The original exception is chained via ``__cause__``. """ source: Mapping[str, str] = os.environ if env is None else env kwargs: dict[str, Any] = {} def _coerce( env_key: str, field_name: str, caster: Callable[[str], Any], ) -> None: raw = source.get(env_key) if raw is None: return try: kwargs[field_name] = caster(raw) except (TypeError, ValueError) as exc: raise RestgdfResponseError( f"invalid value for {env_key}: {raw!r} ({exc})", model_name=cls.__name__, context=env_key, raw=raw, ) from exc _coerce("RESTGDF_CHUNK_SIZE", "chunk_size", int) _coerce("RESTGDF_TIMEOUT_SECONDS", "timeout_seconds", float) _coerce("RESTGDF_REFRESH_THRESHOLD", "refresh_threshold_seconds", int) _coerce( "RESTGDF_MAX_CONCURRENT_REQUESTS", "max_concurrent_requests", int, ) for env_key, field_name in ( ("RESTGDF_USER_AGENT", "user_agent"), ("RESTGDF_LOG_LEVEL", "log_level"), ("RESTGDF_TOKEN_URL", "token_url"), ("RESTGDF_DEFAULT_HEADERS_JSON", "default_headers_json"), ): raw = source.get(env_key) if raw is not None: kwargs[field_name] = raw try: return cls(**kwargs) except ValidationError as exc: raise RestgdfResponseError( f"Settings validation failed: {exc.errors()!r}", model_name=cls.__name__, context="Settings.from_env", raw=dict(kwargs), ) from exc
[docs] @functools.lru_cache(maxsize=1) def get_settings() -> Settings: """Return the process-wide cached :class:`Settings` instance. .. deprecated:: phase-2a Use ``restgdf.get_config()`` and ``restgdf.Config``. This shim constructs a :class:`Settings` from the cached ``Config`` and emits a single :class:`DeprecationWarning` per process. """ warnings.warn( "restgdf.get_settings() / restgdf.Settings are deprecated; " "use restgdf.get_config() / restgdf.Config instead. See " "MIGRATION.md phase-2a.", DeprecationWarning, stacklevel=2, ) from restgdf._config import get_config cfg = get_config() shim_kwargs: dict[str, Any] = { "timeout_seconds": cfg.timeout.total_s, "user_agent": cfg.transport.user_agent, "log_level": cfg.telemetry.log_level, "token_url": ( cfg.auth.token_url if cfg.auth.token_url is not None else Settings.model_fields["token_url"].get_default() ), "refresh_threshold_seconds": int(cfg.auth.refresh_threshold_s), "max_concurrent_requests": cfg.concurrency.max_concurrent_requests, } raw_chunk = os.environ.get("RESTGDF_CHUNK_SIZE") if raw_chunk is not None: try: shim_kwargs["chunk_size"] = int(raw_chunk) except ValueError as exc: raise RestgdfResponseError( f"invalid value for RESTGDF_CHUNK_SIZE: {raw_chunk!r} ({exc})", model_name="Settings", context="RESTGDF_CHUNK_SIZE", raw=raw_chunk, ) from exc raw_hdr = os.environ.get("RESTGDF_DEFAULT_HEADERS_JSON") if raw_hdr is not None: shim_kwargs["default_headers_json"] = raw_hdr try: return Settings(**shim_kwargs) except ValidationError as exc: raise RestgdfResponseError( f"Settings shim validation failed: {exc.errors()!r}", model_name="Settings", context="get_settings", raw=dict(shim_kwargs), ) from exc
def reset_settings_cache() -> None: """Clear the :func:`get_settings` cache *and* the new Config cache. Bidirectional cascade with ``restgdf.reset_config_cache`` so tests and long-lived processes can refresh all configuration with a single call regardless of which accessor they use. """ get_settings.cache_clear() from restgdf._config import get_config as _gc _gc.cache_clear() __all__ = [ "Settings", "get_settings", "reset_settings_cache", ]