Source code for restgdf.featurelayer.featurelayer

"""A package for getting GeoDataFrames from ArcGIS FeatureLayers."""

from __future__ import annotations

import random
import warnings
from collections.abc import AsyncIterable, AsyncIterator
from typing import TYPE_CHECKING, Any, Literal


from restgdf._client._protocols import AsyncHTTPSession
from restgdf._compat import _warn_deprecated, aclosing
from restgdf._models.responses import LayerMetadata
from restgdf.errors import FieldDoesNotExistError
from restgdf.utils._optional import require_geo_stack
from restgdf.utils.getgdf import (
    _feature_to_row_dict,
    _iter_pages_raw,
    chunk_generator,
    get_gdf,
    row_dict_generator,
)
from restgdf.utils.getinfo import (
    default_data,
    get_feature_count,
    get_fields,
    get_fields_frame,
    get_metadata,
    get_name,
    get_object_id_field,
    get_unique_values,
    get_value_counts,
    nested_count,
)

# Deprecated names re-imported at module scope so callers can still patch
# them via ``unittest.mock.patch("restgdf.featurelayer.featurelayer.<old>")``.
# These look unused to linters but are required by backward-compat tests
# (see ``tests/test_compat.py::test_featurelayer_patch_targets``).
# Do NOT remove — emit DeprecationWarning via the shim, not by deletion.
from restgdf.utils.getinfo import (  # noqa: F401
    getuniquevalues,
    getvaluecounts,
    nestedcount,
)

# Keep the deprecated names reachable via ``__all__`` so static-analysis tools
# that respect ``__all__`` treat them as public re-exports.
__all__ = [
    "FeatureLayer",
    "get_unique_values",
    "get_value_counts",
    "nested_count",
    "getuniquevalues",
    "getvaluecounts",
    "nestedcount",
]
from restgdf.utils.utils import where_var_in_list, ends_with_num

if TYPE_CHECKING:
    from geopandas import GeoDataFrame
    from pandas import DataFrame


def _require_featurelayer_geo_support(feature: str) -> None:
    """Fail fast for FeatureLayer GeoDataFrame helpers on base installs."""
    require_geo_stack(feature)


[docs] class FeatureLayer: """A class for interacting with an ArcGIS REST FeatureLayer. Attributes ---------- metadata : restgdf.LayerMetadata Pydantic-validated layer metadata (name, fields, max record count, advanced query capabilities, ...). Replaces the pre-2.0 raw ``dict``. Extra keys sent by the server are preserved via ``extra="allow"`` and reachable through ``metadata.model_extra``. name : str Convenience alias for ``metadata.name``. fields : tuple[str, ...] Field names consumed by restgdf. object_id_field : str Resolved object-id field name (``"OBJECTID"`` when the server omits it). count : int Feature count, validated via ``CountResponse`` at prep time. """ def __init__( self, url: str, session: AsyncHTTPSession, where: str = "1=1", token: str | None = None, **kwargs, ): """Initialize a FeatureLayer instance. Prefer :meth:`from_url` for most use-cases — it calls :meth:`prep` automatically so metadata attributes are immediately available. Parameters ---------- url : str ArcGIS REST FeatureLayer endpoint URL. Must end with a numeric layer ID (e.g. ``".../MapServer/0"``). session : AsyncHTTPSession An aiohttp-compatible async HTTP session used for transport. where : str, default "1=1" ArcGIS WHERE clause filter applied to all queries. The default ``"1=1"`` selects all features. token : str or None, optional Optional ArcGIS token for secured services. If also supplied inside ``kwargs["data"]["token"]`` with a *different* value, a :class:`ValueError` is raised. **kwargs Additional keyword arguments forwarded to the underlying HTTP requests (e.g. ``data``, ``params``). Raises ------ ValueError If *url* does not end with a numeric layer ID, or if conflicting token values are provided via *token* and ``data["token"]``. See Also -------- from_url : Recommended async constructor that also calls :meth:`prep`. """ if not ends_with_num(url): raise ValueError( "The url must end with a number, which is the layer id of the FeatureLayer.", ) self.url = url self.session = session self.wherestr = where self.kwargs = kwargs self.datadict = default_data(kwargs.pop("data", {})) self.datadict["where"] = self.wherestr if token is not None: existing_token = self.datadict.get("token") if existing_token is not None and existing_token != token: raise ValueError( "Pass token either via token= or data['token'], not both with different values.", ) self.datadict["token"] = token self.kwargs["data"] = self.datadict self.uniquevalues: dict[ tuple[str | tuple, str | None], list | DataFrame, ] = {} self.valuecounts: dict = {} self.nestedcount: dict = {} self.gdf: GeoDataFrame | None = None self._fieldtypes_frame: DataFrame | None = None self.metadata: LayerMetadata self.name: str self.fields: tuple[str, ...] self.object_id_field: str self.count: int
[docs] async def prep(self): """Fetch and validate layer metadata from the server. Populates :attr:`metadata`, :attr:`name`, :attr:`fields`, :attr:`object_id_field`, and :attr:`count`. Must be called before accessing any metadata attributes unless the instance was created via :meth:`from_url` (which calls this method automatically). Raises ------ ValueError If the URL does not point to a Feature Layer (e.g. a Table or Map Service root). See Also -------- from_url : Recommended constructor that calls ``prep`` automatically. """ raw = await get_metadata( self.url, self.session, token=self.kwargs["data"].get("token"), ) self.metadata = ( raw if isinstance(raw, LayerMetadata) else LayerMetadata.model_validate(raw) ) if self.metadata.type != "Feature Layer": raise ValueError("The url must point to a FeatureLayer.") self.name = get_name(self.metadata) self.fields = get_fields(self.metadata) self._fieldtypes_frame = None self.object_id_field = get_object_id_field(self.metadata) self.count = await get_feature_count(self.url, self.session, **self.kwargs)
@property def fieldtypes(self) -> DataFrame: """Return field metadata as a DataFrame when pandas is available.""" if not hasattr(self, "metadata"): raise AttributeError("fieldtypes") if self._fieldtypes_frame is None: self._fieldtypes_frame = get_fields_frame(self.metadata) return self._fieldtypes_frame
[docs] @classmethod async def from_url(cls, url: str, **kwargs) -> FeatureLayer: """Create a prepared FeatureLayer from a URL. This is the recommended constructor. It instantiates the class and calls :meth:`prep` so all metadata attributes are immediately available on the returned instance. Parameters ---------- url : str ArcGIS REST FeatureLayer endpoint URL (must end with a numeric layer ID). **kwargs Forwarded to :meth:`__init__` — accepts ``session``, ``where``, ``token``, and any extra HTTP request arguments. Returns ------- FeatureLayer A fully prepared instance with metadata loaded. Raises ------ ValueError If *url* does not end with a number, conflicting tokens are provided, or the endpoint is not a Feature Layer. See Also -------- __init__ : Low-level constructor (requires a manual ``prep`` call). """ self = cls(url, **kwargs) await self.prep() return self
[docs] async def get_oids(self) -> list[int]: """Return all object IDs matching the current WHERE filter. Delegates to :meth:`get_unique_values` using the resolved :attr:`object_id_field`. Returns ------- list[int] Sorted list of object ID values for the filtered feature set. """ object_id_field = getattr(self, "object_id_field", "OBJECTID") return await self.get_unique_values(object_id_field)
[docs] async def sample_gdf(self, n: int = 10) -> GeoDataFrame: """Get n random features as a GeoDataFrame.""" _require_featurelayer_geo_support("FeatureLayer.sample_gdf()") oids = await get_unique_values( self.url, self.object_id_field, self.session, **self.kwargs, ) sample_oids = random.sample(oids, min(n, len(oids))) wherestr = where_var_in_list(self.object_id_field, sample_oids) new_rest = await self.where(wherestr) return await new_rest.get_gdf()
[docs] async def head_gdf(self, n: int = 10) -> GeoDataFrame: """Get the n first features as a GeoDataFrame.""" _require_featurelayer_geo_support("FeatureLayer.head_gdf()") oids = await get_unique_values( self.url, self.object_id_field, self.session, **self.kwargs, ) head_oids = oids[:n] wherestr = where_var_in_list(self.object_id_field, head_oids) new_rest = await self.where(wherestr) return await new_rest.get_gdf()
[docs] async def get_gdf(self) -> GeoDataFrame: """Get a GeoDataFrame from an ArcGIS FeatureLayer. The returned ``GeoDataFrame`` carries ``gdf.attrs["spatial_reference"]`` populated from the layer's metadata envelope (R-65) when the layer advertises a spatial reference via ``extent.spatialReference`` or top-level ``spatialReference``. """ if self.gdf is None: _require_featurelayer_geo_support("FeatureLayer.get_gdf()") self.gdf = await get_gdf(self.url, self.session, **self.kwargs) return self.gdf
# ----------------------------------------------------------------- # Streaming primitives (BL-24 / Q-A11). ``iter_pages`` is the single # low-level async generator every public streaming helper composes # on top of. ``stream_features`` is a deliberate alias of # ``iter_features`` so callers have a single canonical name. # -----------------------------------------------------------------
[docs] async def iter_pages( self, *, order: Literal["request", "completion"] = "request", max_concurrent_pages: int | None = None, on_truncation: Literal["raise", "ignore", "split"] = "raise", **kwargs: Any, ) -> AsyncIterator[dict[str, Any]]: """Yield raw ArcGIS query-page envelopes from this FeatureLayer. Parameters ---------- order ``"request"`` (default) yields pages in submit order. ``"completion"`` yields pages as the underlying fetches complete (may reorder relative to the pagination plan). max_concurrent_pages Upper bound on concurrent in-flight page fetches. ``None`` (default) leaves concurrency unbounded. on_truncation Behavior when a page reports ``exceededTransferLimit=true``: * ``"raise"`` (default) — raise :class:`restgdf.errors.RestgdfResponseError` with ``context='exceededTransferLimit'``. * ``"ignore"`` — log a ``restgdf.pagination`` warning and yield the truncated page anyway. * ``"split"`` — bisect the predicate's OID list and recurse (max depth 32; irreducible partitions raise). Yields ------ dict The full raw response envelope for each page (``features``, ``objectIdFieldName``, ``exceededTransferLimit``, etc.). Notes ----- When telemetry is enabled, emits exactly ONE INTERNAL parent span named ``feature_layer.stream`` wrapping the per-page loop (R-61). No per-page restgdf child spans are emitted. """ merged_kwargs = {**self.kwargs, **kwargs} if "data" in self.kwargs or "data" in kwargs: merged_kwargs["data"] = default_data( kwargs.get("data"), self.kwargs.get("data"), ) metadata = getattr(self, "metadata", None) layer_id = getattr(metadata, "id", None) if metadata is not None else None out_fields = self.datadict.get("outFields") span_where = self.wherestr if self.wherestr and self.wherestr != "1=1" else None # ``aclosing`` ensures the underlying async generator's ``finally`` # (which ends the R-61 INTERNAL span) runs when the consumer breaks # early or calls ``aclose()``. Without it, GC-deferred cleanup would # leak the span until the next event-loop tick. async with aclosing( _iter_pages_raw( self.url, self.session, order=order, max_concurrent_pages=max_concurrent_pages, on_truncation=on_truncation, span_layer_id=layer_id, span_out_fields=out_fields, span_where=span_where, **merged_kwargs, ), ) as pages: async for page in pages: yield page
[docs] async def iter_features( self, *, order: Literal["request", "completion"] = "request", max_concurrent_pages: int | None = None, on_truncation: Literal["raise", "ignore", "split"] = "raise", **kwargs: Any, ) -> AsyncIterator[dict[str, Any]]: """Yield one raw ArcGIS feature dict at a time. Thin wrapper over :meth:`iter_pages` that flattens each page's ``features`` list. See :meth:`iter_pages` for parameter semantics. """ async with aclosing( self.iter_pages( # type: ignore[type-var] order=order, max_concurrent_pages=max_concurrent_pages, on_truncation=on_truncation, **kwargs, ), ) as pages: async for page in pages: for feature in page.get("features", []): yield feature
# ``stream_features`` is the canonical public name; ``iter_features`` # is the lower-level iterator primitive. Keep them as the same # coroutine function so introspection and import paths agree # (see tests/test_feature_layer_streaming_public.py). stream_features = iter_features
[docs] async def stream_feature_batches( self, *, order: Literal["request", "completion"] = "request", max_concurrent_pages: int | None = None, on_truncation: Literal["raise", "ignore", "split"] = "raise", **kwargs: Any, ) -> AsyncIterator[list[dict[str, Any]]]: """Yield one list of raw feature dicts per page. See :meth:`iter_pages` for parameter semantics. """ async with aclosing( self.iter_pages( # type: ignore[type-var] order=order, max_concurrent_pages=max_concurrent_pages, on_truncation=on_truncation, **kwargs, ), ) as pages: async for page in pages: yield list(page.get("features", []))
[docs] async def stream_rows( self, *, order: Literal["request", "completion"] = "request", max_concurrent_pages: int | None = None, on_truncation: Literal["raise", "ignore", "split"] = "raise", **kwargs: Any, ) -> AsyncIterator[dict[str, Any]]: """Yield row-shaped dicts (attributes plus raw geometry). Each row is the layer feature's ``attributes`` merged with a ``geometry`` key holding the ArcGIS geometry dict verbatim. See :meth:`iter_pages` for parameter semantics. """ async with aclosing( self.iter_features( # type: ignore[type-var] order=order, max_concurrent_pages=max_concurrent_pages, on_truncation=on_truncation, **kwargs, ), ) as features: async for feature in features: yield _feature_to_row_dict(feature)
[docs] async def stream_gdf_chunks( self, **kwargs: Any, ) -> AsyncIterator[GeoDataFrame]: """Yield ``GeoDataFrame`` chunks; each chunk's ``attrs`` carries spatial_reference (R-65). Requires the optional geo stack (``geopandas`` / ``pyogrio``). """ _require_featurelayer_geo_support("FeatureLayer.stream_gdf_chunks()") merged_kwargs = {**self.kwargs, **kwargs} if "data" in self.kwargs or "data" in kwargs: merged_kwargs["data"] = default_data( kwargs.get("data"), self.kwargs.get("data"), ) async for chunk in chunk_generator(self.url, self.session, **merged_kwargs): yield chunk
[docs] async def get_df(self, resolve_domains: bool = False) -> DataFrame: """Get a pandas DataFrame from an ArcGIS FeatureLayer. Tabular row view: attributes plus any raw ``geometry`` dict returned by the server, with no geopandas/pyogrio dependency. Raises :class:`restgdf.errors.OptionalDependencyError` when ``pandas`` is not installed. This is the pandas-only counterpart to :meth:`get_gdf` — prefer it when callers only need tabular access and want to avoid the full geo dependency stack. Parameters ---------- resolve_domains: When ``True``, coded-value domain fields are post-processed so the DataFrame contains the human-readable ``name`` rather than the raw ``code``. Codes absent from the domain's ``codedValues`` table pass through unchanged. Range domains are not validated or coerced. Defaults to ``False`` — the historical behavior where the DataFrame faithfully mirrors the server payload. No additional HTTP traffic is issued; resolution uses the already-loaded :attr:`FeatureLayer.metadata` fetched during :meth:`prep`. Examples -------- >>> df = await layer.get_df(resolve_domains=True) # doctest: +SKIP >>> df["STATUS"].head().tolist() # doctest: +SKIP ['Active', 'Inactive', 'Active', ...] """ from restgdf.adapters.pandas import ( arows_to_dataframe, resolve_domains as _resolve_domains, ) df = await arows_to_dataframe(self.stream_rows()) if resolve_domains: fields = getattr(getattr(self, "metadata", None), "fields", None) df = _resolve_domains(df, fields) return df
[docs] async def row_dict_generator( self, **kwargs, ) -> AsyncIterable[dict]: """Asynchronously yield rows from a GeoDataFrame as dictionaries. .. deprecated:: 2.0 Use :meth:`stream_rows` instead. This method emits a :class:`DeprecationWarning` and continues to delegate to the module-level ``row_dict_generator`` helper for backwards compatibility with existing ``unittest.mock.patch`` targets. Scheduled for removal in a future release. """ _warn_deprecated( "FeatureLayer.row_dict_generator is deprecated; " "use FeatureLayer.stream_rows instead.", ) merged_kwargs = {**self.kwargs, **kwargs} if "data" in self.kwargs or "data" in kwargs: merged_kwargs["data"] = default_data( kwargs.get("data"), self.kwargs.get("data"), ) _gen = row_dict_generator(self.url, self.session, **merged_kwargs) async for row in _gen: yield row
[docs] async def get_unique_values( self, fields: tuple | str, sortby: str | None = None, ) -> list | DataFrame: """Get unique values for one or more fields. Results are cached per ``(fields, sortby)`` key for the lifetime of this instance. Parameters ---------- fields : str or tuple of str A single field name (returns a list) or a tuple of field names (returns a :class:`~pandas.DataFrame`; requires the geo extra). sortby : str or None, optional Field name to sort results by. When ``None``, the server's default ordering is used. Returns ------- list or DataFrame A plain list when *fields* is a single string, or a DataFrame when *fields* is a tuple. Raises ------ FieldDoesNotExistError If any requested field is not present in the layer schema. """ cache_key = (fields, sortby) if cache_key not in self.uniquevalues: if (isinstance(fields, str) and fields not in self.fields) or ( not isinstance(fields, str) and any(field not in self.fields for field in fields) ): raise FieldDoesNotExistError( fields, context="FeatureLayer.get_unique_values", ) self.uniquevalues[cache_key] = await get_unique_values( self.url, fields, self.session, sortby, **self.kwargs, ) return self.uniquevalues[cache_key]
[docs] async def get_value_counts(self, field: str) -> DataFrame: """Get value counts for a single field. Results are cached per field for the lifetime of this instance. Parameters ---------- field : str The field name to compute value counts for. Returns ------- DataFrame A pandas DataFrame with one row per distinct value and an associated count column. Raises ------ FieldDoesNotExistError If *field* is not present in the layer schema. """ if field not in self.valuecounts: if field not in self.fields: raise FieldDoesNotExistError( field, context="FeatureLayer.get_value_counts", ) self.valuecounts[field] = await get_value_counts( self.url, field, self.session, **self.kwargs, ) return self.valuecounts[field]
[docs] async def get_nested_count(self, fields: tuple) -> DataFrame: """Get nested (cross-tabulated) value counts for multiple fields. Results are cached per *fields* tuple for the lifetime of this instance. Parameters ---------- fields : tuple of str Two or more field names to cross-tabulate. Returns ------- DataFrame A pandas DataFrame with one row per unique combination of values across the requested fields and an associated count. Raises ------ FieldDoesNotExistError If any field in *fields* is not present in the layer schema. """ if fields not in self.nestedcount: if any(field not in self.fields for field in fields): raise FieldDoesNotExistError( fields, context="FeatureLayer.get_nested_count", ) self.nestedcount[fields] = await nested_count( self.url, fields, self.session, **self.kwargs, ) return self.nestedcount[fields]
# ----------------------------------------------------------------- # Deprecated legacy method names (Phase 6). Emit DeprecationWarning # and delegate to the canonical implementation. Kept for backward # compatibility; will be removed in a future release. # -----------------------------------------------------------------
[docs] async def getoids(self) -> list[int]: """Deprecated alias for :meth:`get_oids`.""" warnings.warn( "`FeatureLayer.getoids` is deprecated; use `get_oids` instead.", DeprecationWarning, stacklevel=2, ) return await self.get_oids()
[docs] async def samplegdf(self, n: int = 10) -> GeoDataFrame: """Deprecated alias for :meth:`sample_gdf`.""" warnings.warn( "`FeatureLayer.samplegdf` is deprecated; use `sample_gdf` instead.", DeprecationWarning, stacklevel=2, ) return await self.sample_gdf(n)
[docs] async def headgdf(self, n: int = 10) -> GeoDataFrame: """Deprecated alias for :meth:`head_gdf`.""" warnings.warn( "`FeatureLayer.headgdf` is deprecated; use `head_gdf` instead.", DeprecationWarning, stacklevel=2, ) return await self.head_gdf(n)
[docs] async def getgdf(self) -> GeoDataFrame: """Deprecated alias for :meth:`get_gdf`.""" warnings.warn( "`FeatureLayer.getgdf` is deprecated; use `get_gdf` instead.", DeprecationWarning, stacklevel=2, ) return await self.get_gdf()
[docs] async def getuniquevalues( self, fields: tuple | str, sortby: str | None = None, ) -> list | DataFrame: """Deprecated alias for :meth:`get_unique_values`.""" warnings.warn( "`FeatureLayer.getuniquevalues` is deprecated; use " "`get_unique_values` instead.", DeprecationWarning, stacklevel=2, ) return await self.get_unique_values(fields, sortby)
[docs] async def getvaluecounts(self, field: str) -> DataFrame: """Deprecated alias for :meth:`get_value_counts`.""" warnings.warn( "`FeatureLayer.getvaluecounts` is deprecated; use " "`get_value_counts` instead.", DeprecationWarning, stacklevel=2, ) return await self.get_value_counts(field)
[docs] async def getnestedcount(self, fields: tuple) -> DataFrame: """Deprecated alias for :meth:`get_nested_count`.""" warnings.warn( "`FeatureLayer.getnestedcount` is deprecated; use " "`get_nested_count` instead.", DeprecationWarning, stacklevel=2, ) return await self.get_nested_count(fields)
[docs] async def where(self, wherestr: str) -> FeatureLayer: """Create a refined ``FeatureLayer`` bound to ``wherestr``. BL-46: when the current instance has already resolved its schema via :meth:`prep`, the refined child reuses the parent's cached ``metadata`` / ``name`` / ``fields`` / ``object_id_field`` so the expensive metadata GET (``?f=json``) is not re-issued. The feature-count POST is still issued, but scoped to the refined ``where_clause`` so ``refined.count`` is correct for the refined filter. """ wherestr_plus = ( wherestr if self.wherestr == "1=1" else f"{self.wherestr} AND {wherestr}" ) if not hasattr(self, "metadata"): return await FeatureLayer.from_url( self.url, session=self.session, where=wherestr_plus, **self.kwargs, ) refined_kwargs = {k: v for k, v in self.kwargs.items() if k != "data"} refined_data = { k: v for k, v in self.kwargs.get("data", {}).items() if k != "where" } refined = FeatureLayer( self.url, session=self.session, where=wherestr_plus, data=refined_data, **refined_kwargs, ) refined.metadata = self.metadata refined.name = self.name refined.fields = self.fields refined._fieldtypes_frame = None refined.object_id_field = self.object_id_field refined.count = await get_feature_count( refined.url, refined.session, **refined.kwargs, ) return refined
def __repr__(self) -> str: """Return a string representation of the Rest object.""" kwargstr = ", ".join(f"{k}={v}" for k, v in self.kwargs.items()) return f"Rest({self.url}, {self.session}, {self.wherestr}, {kwargstr})" def __str__(self) -> str: """Return a string representation of the Rest object.""" return f"{self.name} ({self.url})"