Source code for restgdf.utils._stats

"""Async statistics/aggregation helpers for ArcGIS REST endpoints.

Private submodule; all public names are re-exported by
``restgdf.utils.getinfo`` to preserve import paths.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any


from restgdf._client._protocols import AsyncHTTPSession
from restgdf._client.request import build_conservative_query_data
from restgdf._models._drift import _parse_response
from restgdf._models.responses import FeaturesResponse
from restgdf.utils._deprecations import deprecated_alias
from restgdf.utils._http import _arcgis_request, default_headers, default_timeout
from restgdf.utils._optional import require_pandas_dataframe

if TYPE_CHECKING:
    from pandas import DataFrame


def _feature_attributes(feature: dict[str, Any]) -> dict[str, Any]:
    """Normalize a feature payload to its attributes dict."""
    return dict(feature.get("attributes") or {})


def _records_to_frame(
    records: list[dict[str, Any]],
    *,
    feature: str,
    columns: list[str] | None = None,
) -> DataFrame:
    """Build a pandas DataFrame only when a tabular result is requested."""
    DataFrame = require_pandas_dataframe(feature)
    return DataFrame.from_records(records, columns=columns)


def _sorted_scalar_values(values: list[Any | None]) -> list[Any | None]:
    """Sort scalar REST values, falling back to a stable repr-based order."""
    raw_values: list[Any] = list(values)
    try:
        return sorted(raw_values)
    except TypeError:
        return sorted(raw_values, key=lambda value: (value is None, repr(value)))


[docs] async def get_unique_values( url: str, fields: tuple | str, session: AsyncHTTPSession, sortby: str | None = None, **kwargs, ) -> list | DataFrame: """Get the unique values for a field.""" if not isinstance(fields, str) and len(fields) > 1: require_pandas_dataframe("get_unique_values() with multiple fields") datadict = build_conservative_query_data( { "where": "1=1", "f": "json", "returnGeometry": False, "returnDistinctValues": True, "outFields": fields if isinstance(fields, str) else ",".join(fields), }, kwargs.get("data"), ) xkwargs: dict = {k: v for k, v in kwargs.items() if k != "data"} xkwargs.setdefault("timeout", default_timeout()) response = await _arcgis_request( session, f"{url}/query", datadict, headers=default_headers(xkwargs.pop("headers", None)), **xkwargs, ) raw = await response.json(content_type=None) envelope = _parse_response(FeaturesResponse, raw, context=f"{url}/query") features = envelope.features or [] records = [_feature_attributes(feature) for feature in features] if isinstance(fields, str): res_l = [record.get(fields) for record in records] if sortby and sortby == fields: res_l = _sorted_scalar_values(res_l) return res_l if len(fields) == 1: res_l = [record.get(fields[0]) for record in records] if sortby and sortby == fields[0]: res_l = _sorted_scalar_values(res_l) return res_l res_df = _records_to_frame( records, feature="get_unique_values() with multiple fields", columns=list(fields), ) if sortby: res_df = res_df.sort_values(sortby).reset_index(drop=True) return res_df
[docs] async def get_value_counts( url: str, field: str, session: AsyncHTTPSession, **kwargs, ) -> DataFrame: """Get the value counts for a field.""" require_pandas_dataframe("get_value_counts()") statstr = f'[{{"statisticType":"count","onStatisticField":"{field}","outStatisticFieldName":"{field}_count"}}]' data = kwargs.pop("data", None) or {} data = { "where": "1=1", "f": "json", "returnGeometry": False, "outFields": field, "outStatistics": statstr, "groupByFieldsForStatistics": field, **data, } kwargs.setdefault("timeout", default_timeout()) response = await _arcgis_request( session, f"{url}/query", data, headers=default_headers(kwargs.pop("headers", None)), **kwargs, ) raw = await response.json(content_type=None) envelope = _parse_response(FeaturesResponse, raw, context=f"{url}/query") features = envelope.features or [] cc = _records_to_frame( [_feature_attributes(feature) for feature in features], feature="get_value_counts()", ) if cc.empty: return cc.reindex(columns=[field, f"{field}_count"]) return cc.sort_values(f"{field}_count", ascending=False).reset_index(drop=True)
[docs] async def nested_count( url: str, fields, session: AsyncHTTPSession, **kwargs, ) -> DataFrame: """Get the nested value counts for a field.""" require_pandas_dataframe("nested_count()") statstr = "".join( ( "[", ",".join( f'{{"statisticType":"count","onStatisticField":"{f}","outStatisticFieldName":"{f}_count"}}' for f in fields ), "]", ), ) data = kwargs.pop("data", None) or {} data = { "where": "1=1", "f": "json", "returnGeometry": False, "outFields": ",".join(fields), "outStatistics": statstr, "groupByFieldsForStatistics": ",".join(fields), **data, } kwargs.setdefault("timeout", default_timeout()) response = await _arcgis_request( session, f"{url}/query", data, headers=default_headers(kwargs.pop("headers", None)), **kwargs, ) raw = await response.json(content_type=None) envelope = _parse_response(FeaturesResponse, raw, context=f"{url}/query") features = envelope.features or [] cc = _records_to_frame( [_feature_attributes(feature) for feature in features], feature="nested_count()", ) if cc.empty: return cc.reindex(columns=[*fields, "Count"]) dropcol = [c for c in cc.columns if c.startswith(f"{fields[0]}_count")][0] rencol = [c for c in cc.columns if c.startswith(f"{fields[1]}_count")][0] return ( cc.drop(columns=dropcol) .rename(columns={rencol: "Count"}) .sort_values([fields[0], "Count"], ascending=[True, False]) .reset_index(drop=True) )
# Deprecated legacy aliases (Phase 6). See `_deprecations.deprecated_alias`. getuniquevalues = deprecated_alias( get_unique_values, "getuniquevalues", "get_unique_values", ) getvaluecounts = deprecated_alias( get_value_counts, "getvaluecounts", "get_value_counts", ) nestedcount = deprecated_alias(nested_count, "nestedcount", "nested_count")