"""Layered runtime configuration for restgdf (phase-2a BL-18).
Seven frozen pydantic 2.x sub-configs mirror the plan-obs §3 taxonomy:
:class:`TransportConfig`, :class:`TimeoutConfig`, :class:`RetryConfig`,
:class:`LimiterConfig`, :class:`ConcurrencyConfig`, :class:`AuthConfig`,
:class:`TelemetryConfig`. The aggregate :class:`Config` is resolved lazily via
:func:`get_config` (LRU-cached size-1; reset with :func:`reset_config_cache`).
Env-var naming
--------------
New names follow ``RESTGDF_<CATEGORY>_<FIELD>`` (uppercased field name). The
following flat legacy names stay wired as deprecated aliases:
* ``RESTGDF_TIMEOUT_SECONDS`` → ``RESTGDF_TIMEOUT_TOTAL_S``
* ``RESTGDF_TOKEN_URL`` → ``RESTGDF_AUTH_TOKEN_URL``
* ``RESTGDF_REFRESH_THRESHOLD`` → ``RESTGDF_AUTH_REFRESH_THRESHOLD_S``
* ``RESTGDF_USER_AGENT`` → ``RESTGDF_TRANSPORT_USER_AGENT``
* ``RESTGDF_LOG_LEVEL`` → ``RESTGDF_TELEMETRY_LOG_LEVEL``
* ``RESTGDF_MAX_CONCURRENT_REQUESTS`` →
``RESTGDF_CONCURRENCY_MAX_CONCURRENT_REQUESTS``
Precedence: new name wins over old alias wins over model defaults. When the
old alias is set a :class:`DeprecationWarning` names its replacement. When
both the old alias and its preferred new name are set, the new name wins and
the deprecation warning notes that the old value was ignored. See MIGRATION.md
for the full migration guide.
The legacy :class:`restgdf.Settings` model and :func:`restgdf.get_settings`
remain as deprecated shims delegating here; see
:mod:`restgdf._models._settings`.
"""
from __future__ import annotations
import functools
import os
import warnings
from collections.abc import Mapping
from typing import Any, Callable, Literal
from pydantic import (
BaseModel,
ConfigDict,
Field,
HttpUrl,
SecretStr,
TypeAdapter,
ValidationError,
field_validator,
model_validator,
)
from restgdf._models._errors import RestgdfResponseError
from restgdf._models._settings import _VALID_LOG_LEVELS, _default_user_agent
_FROZEN = ConfigDict(extra="forbid", frozen=True, populate_by_name=True)
# pydantic HttpUrl-based validator for ``token_url`` strings. We keep the
# public field type as ``str`` so consumers (e.g. TokenSessionConfig) that
# expect plain strings do not break, but we reuse pydantic's URL parser for
# validation so we reject malformed inputs consistently.
_HTTP_URL_ADAPTER: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
# Logging aliases we accept in addition to the canonical level names. The
# stdlib logging module treats ``WARN`` as a synonym for ``WARNING`` and
# ``FATAL`` as a synonym for ``CRITICAL``; we normalize both here so
# ``RESTGDF_TELEMETRY_LOG_LEVEL=WARN`` does not raise.
_LOG_LEVEL_ALIASES: Mapping[str, str] = {"WARN": "WARNING", "FATAL": "CRITICAL"}
class TransportConfig(BaseModel):
"""HTTP transport knobs (TLS, user agent)."""
model_config = _FROZEN
verify_ssl: bool = True
user_agent: str = Field(default_factory=_default_user_agent, min_length=1)
class TimeoutConfig(BaseModel):
"""HTTP timeout budget (total + optional connect/read splits)."""
model_config = _FROZEN
connect_s: float | None = Field(default=None, gt=0)
read_s: float | None = Field(default=None, gt=0)
total_s: float = Field(default=30.0, gt=0)
class RetryConfig(BaseModel):
"""Retry policy (disabled by default; phase-3a wires the executor)."""
model_config = _FROZEN
enabled: bool = False
max_attempts: int = Field(default=5, ge=1)
max_delay_s: float = Field(default=60.0, gt=0)
class LimiterConfig(BaseModel):
"""Rate-limiter configuration (disabled by default)."""
model_config = _FROZEN
enabled: bool = False
rate_per_host: float | None = Field(default=None, gt=0)
class ConcurrencyConfig(BaseModel):
"""Bounded-semaphore ceiling for top-level orchestration calls."""
model_config = _FROZEN
max_concurrent_requests: int = Field(default=8, ge=1)
class AuthConfig(BaseModel):
"""ArcGIS token-session defaults.
.. versionchanged:: 3.0
Default *transport* flipped from ``"body"`` to ``"header"``; default
*header_name* is ``"X-Esri-Authorization"``. Pass
``allow_query_transport=True`` to enable ``transport="query"``.
"""
model_config = _FROZEN
token_url: str | None = None
transport: Literal["header", "body", "query"] = "header"
header_name: str = Field(default="X-Esri-Authorization", min_length=1)
referer: str | None = None
refresh_threshold_s: float = Field(default=60.0, ge=0)
refresh_leeway_s: float = Field(default=120.0, ge=0.0, le=600.0)
clock_skew_s: float = Field(default=30.0, ge=0.0, le=120.0)
username: SecretStr | None = None
password: SecretStr | None = None
token: SecretStr | None = None
allow_query_transport: bool = False
@field_validator("token_url")
@classmethod
def _check_token_url_scheme(cls, value: str | None) -> str | None:
if value is None:
return value
try:
_HTTP_URL_ADAPTER.validate_python(value)
except ValidationError as exc:
raise ValueError(
f"token_url must be a valid http(s) URL: {value!r} ({exc})",
) from exc
return value
@model_validator(mode="after")
def _reject_query_without_flag(self) -> AuthConfig:
"""R-13 strict: ``transport='query'`` without ``allow_query_transport`` → error."""
if self.transport == "query" and not self.allow_query_transport:
raise ValueError(
"transport='query' is insecure and requires "
"allow_query_transport=True at AuthConfig construction.",
)
return self
class TelemetryConfig(BaseModel):
"""Optional telemetry + legacy ``schema_drift`` log-level routing."""
model_config = _FROZEN
enabled: bool = False
service_name: str = Field(default="restgdf", min_length=1)
log_level: str = Field(default="WARNING")
@field_validator("log_level")
@classmethod
def _normalize_log_level(cls, value: str) -> str:
upper = value.upper()
upper = _LOG_LEVEL_ALIASES.get(upper, upper)
if upper not in _VALID_LOG_LEVELS:
raise ValueError(
f"log_level must be one of {sorted(_VALID_LOG_LEVELS)!r}",
)
return upper
class ResilienceConfig(BaseModel):
"""Resilience adapter configuration (BL-31).
Controls the optional stamina-based retry wrapper and per-service-root
token-bucket rate limiter. Disabled by default; callers opt in via
``RESTGDF_RESILIENCE_ENABLED=1`` or by constructing explicitly.
"""
model_config = _FROZEN
enabled: bool = False
rate_per_service_root_per_second: float | None = Field(default=None, gt=0)
respect_retry_after_max_s: float = Field(default=60.0, gt=0)
fallback_retry_after_seconds: float = Field(default=5.0, gt=0)
backend: str = "stamina"
_Caster = Callable[[str], Any]
def _parse_bool(raw: str) -> bool:
s = raw.strip().lower()
if s in {"1", "true", "yes", "on"}:
return True
if s in {"0", "false", "no", "off"}:
return False
raise ValueError(f"not a boolean: {raw!r}")
_NEW_ENV_SPEC: tuple[tuple[str, str, _Caster], ...] = (
("RESTGDF_TRANSPORT_VERIFY_SSL", "transport.verify_ssl", _parse_bool),
("RESTGDF_TRANSPORT_USER_AGENT", "transport.user_agent", str),
("RESTGDF_TIMEOUT_CONNECT_S", "timeout.connect_s", float),
("RESTGDF_TIMEOUT_READ_S", "timeout.read_s", float),
("RESTGDF_TIMEOUT_TOTAL_S", "timeout.total_s", float),
("RESTGDF_RETRY_ENABLED", "retry.enabled", _parse_bool),
("RESTGDF_RETRY_MAX_ATTEMPTS", "retry.max_attempts", int),
("RESTGDF_RETRY_MAX_DELAY_S", "retry.max_delay_s", float),
("RESTGDF_LIMITER_ENABLED", "limiter.enabled", _parse_bool),
("RESTGDF_LIMITER_RATE_PER_HOST", "limiter.rate_per_host", float),
(
"RESTGDF_CONCURRENCY_MAX_CONCURRENT_REQUESTS",
"concurrency.max_concurrent_requests",
int,
),
("RESTGDF_AUTH_TOKEN_URL", "auth.token_url", str),
("RESTGDF_AUTH_REFRESH_THRESHOLD_S", "auth.refresh_threshold_s", float),
("RESTGDF_TELEMETRY_ENABLED", "telemetry.enabled", _parse_bool),
("RESTGDF_TELEMETRY_SERVICE_NAME", "telemetry.service_name", str),
("RESTGDF_TELEMETRY_LOG_LEVEL", "telemetry.log_level", str),
("RESTGDF_RESILIENCE_ENABLED", "resilience.enabled", _parse_bool),
(
"RESTGDF_RESILIENCE_RATE_PER_SERVICE_ROOT_PER_SECOND",
"resilience.rate_per_service_root_per_second",
float,
),
(
"RESTGDF_RESILIENCE_RESPECT_RETRY_AFTER_MAX_S",
"resilience.respect_retry_after_max_s",
float,
),
(
"RESTGDF_RESILIENCE_FALLBACK_RETRY_AFTER_SECONDS",
"resilience.fallback_retry_after_seconds",
float,
),
("RESTGDF_RESILIENCE_BACKEND", "resilience.backend", str),
)
_DEPRECATED_ALIASES: tuple[tuple[str, str, str, _Caster], ...] = (
(
"RESTGDF_TIMEOUT_SECONDS",
"RESTGDF_TIMEOUT_TOTAL_S",
"timeout.total_s",
float,
),
("RESTGDF_TOKEN_URL", "RESTGDF_AUTH_TOKEN_URL", "auth.token_url", str),
(
"RESTGDF_REFRESH_THRESHOLD",
"RESTGDF_AUTH_REFRESH_THRESHOLD_S",
"auth.refresh_threshold_s",
float,
),
(
"RESTGDF_USER_AGENT",
"RESTGDF_TRANSPORT_USER_AGENT",
"transport.user_agent",
str,
),
(
"RESTGDF_LOG_LEVEL",
"RESTGDF_TELEMETRY_LOG_LEVEL",
"telemetry.log_level",
str,
),
(
"RESTGDF_MAX_CONCURRENT_REQUESTS",
"RESTGDF_CONCURRENCY_MAX_CONCURRENT_REQUESTS",
"concurrency.max_concurrent_requests",
int,
),
)
[docs]
class Config(BaseModel):
"""Aggregate of the eight sub-configs. Frozen.
Use :func:`get_config` (process-cached) rather than instantiating directly
in production code; direct instantiation is useful for tests.
"""
model_config = _FROZEN
transport: TransportConfig = Field(default_factory=TransportConfig)
timeout: TimeoutConfig = Field(default_factory=TimeoutConfig)
retry: RetryConfig = Field(default_factory=RetryConfig)
limiter: LimiterConfig = Field(default_factory=LimiterConfig)
concurrency: ConcurrencyConfig = Field(default_factory=ConcurrencyConfig)
auth: AuthConfig = Field(default_factory=AuthConfig)
telemetry: TelemetryConfig = Field(default_factory=TelemetryConfig)
resilience: ResilienceConfig = Field(default_factory=ResilienceConfig)
@classmethod
def from_env(
cls,
env: Mapping[str, str] | None = None,
*,
_warn_stacklevel: int = 2,
) -> Config:
"""Build :class:`Config` from environment variables.
Parameters
----------
env
Mapping of env-var name to value. Defaults to ``os.environ``.
Pass an explicit mapping (including ``{}``) to bypass the real
environment, primarily for tests.
_warn_stacklevel
Internal hook controlling the ``stacklevel`` passed to
:func:`warnings.warn` for deprecated-alias warnings. Direct
callers of ``Config.from_env`` get the default (``2``), which
attributes the warning to the user's call site.
:func:`get_config` overrides this to ``3`` so the warning
surfaces past the cached accessor frame. Not part of the
public API.
Raises
------
RestgdfResponseError
If any ``RESTGDF_*`` env var contains a malformed value or fails
pydantic validation. The original exception chains via
``__cause__``.
"""
source: Mapping[str, str] = os.environ if env is None else env
sub_kwargs: dict[str, dict[str, Any]] = {
"transport": {},
"timeout": {},
"retry": {},
"limiter": {},
"concurrency": {},
"auth": {},
"telemetry": {},
"resilience": {},
}
def _assign(dotted: str, value: Any) -> None:
section, field_name = dotted.split(".", 1)
sub_kwargs[section][field_name] = value
def _coerce(env_key: str, dotted: str, caster: _Caster) -> None:
raw = source.get(env_key)
if raw is None:
return
try:
_assign(dotted, caster(raw))
except (TypeError, ValueError) as exc:
raise RestgdfResponseError(
f"invalid value for {env_key}: {raw!r} ({exc})",
model_name=cls.__name__,
context=env_key,
raw=raw,
) from exc
for env_key, dotted, caster in _NEW_ENV_SPEC:
_coerce(env_key, dotted, caster)
for old_key, new_key, dotted, caster in _DEPRECATED_ALIASES:
if old_key not in source:
continue
if new_key in source:
warnings.warn(
f"{old_key} is deprecated; {new_key} is set and "
f"takes precedence (old value ignored).",
DeprecationWarning,
stacklevel=_warn_stacklevel,
)
continue
warnings.warn(
f"{old_key} is deprecated; use {new_key} instead.",
DeprecationWarning,
stacklevel=_warn_stacklevel,
)
_coerce(old_key, dotted, caster)
try:
return cls(
transport=TransportConfig(**sub_kwargs["transport"]),
timeout=TimeoutConfig(**sub_kwargs["timeout"]),
retry=RetryConfig(**sub_kwargs["retry"]),
limiter=LimiterConfig(**sub_kwargs["limiter"]),
concurrency=ConcurrencyConfig(**sub_kwargs["concurrency"]),
auth=AuthConfig(**sub_kwargs["auth"]),
telemetry=TelemetryConfig(**sub_kwargs["telemetry"]),
resilience=ResilienceConfig(**sub_kwargs["resilience"]),
)
except ValidationError as exc:
raise RestgdfResponseError(
f"Config validation failed: {exc.errors()!r}",
model_name=cls.__name__,
context="Config.from_env",
raw=dict(sub_kwargs),
) from exc
[docs]
@functools.lru_cache(maxsize=1)
def get_config() -> Config:
"""Return the process-wide cached :class:`Config` instance.
Deprecated-alias warnings emitted during env resolution attribute to the
caller of :func:`get_config` (``stacklevel=3``: one extra frame past the
:meth:`Config.from_env` default so the warning surfaces at user code).
"""
return Config.from_env(_warn_stacklevel=3)
def reset_config_cache() -> None:
"""Clear the :func:`get_config` cache *and* the legacy Settings cache.
Bidirectional cascade avoids stale Settings after env changes: callers
that reset only the new Config cache still get a fresh Settings shim
view on the next :func:`restgdf.get_settings` call.
"""
get_config.cache_clear()
from restgdf._models._settings import get_settings as _gs
_gs.cache_clear()
__all__ = [
"AuthConfig",
"ConcurrencyConfig",
"Config",
"LimiterConfig",
"ResilienceConfig",
"RetryConfig",
"TelemetryConfig",
"TimeoutConfig",
"TransportConfig",
"get_config",
"reset_config_cache",
]