Source code for restgdf.utils.crawl

"""Recursive ArcGIS Server directory crawling and service discovery.

Provides :func:`fetch_all_data` for raw-dict output and :func:`safe_crawl`
for structured :class:`~restgdf.CrawlReport` output with per-stage error
capture.
"""

from __future__ import annotations

import asyncio
from typing import Any

from pydantic import BaseModel

from restgdf._client._protocols import AsyncHTTPSession
from restgdf._models.crawl import CrawlError, CrawlReport, CrawlServiceEntry
from restgdf._models.responses import LayerMetadata
from restgdf._config import get_config
from restgdf.utils.getinfo import service_metadata, get_metadata


def _to_plain_dict(value: Any) -> dict:
    """Return a new, mutable ``dict`` built from *value*.

    Pydantic models are dumped with ``by_alias=True``; any other input is
    handed to the ``dict`` constructor, so it must be a mapping or an
    iterable of key/value pairs.
    """
    if not isinstance(value, BaseModel):
        return dict(value)
    return value.model_dump(by_alias=True)


async def fetch_all_data(
    session: AsyncHTTPSession,
    base_url: str,
    token: str | None = None,
    return_feature_count: bool = False,
) -> dict:
    """Fetch every service under *base_url* together with its metadata.

    The root directory and each folder it lists are read one after another
    with ``get_metadata``; the discovered services are then described
    concurrently with ``service_metadata``.

    Args:
        session: HTTP session forwarded to every metadata request.
        base_url: URL of the services directory to crawl.
        token: Optional token forwarded to every metadata request.
        return_feature_count: Forwarded to ``service_metadata``.

    Returns:
        ``{"error": exc}`` as soon as the root request or any folder request
        raises. Otherwise ``{"metadata": ..., "services": [...]}``, where
        ``metadata`` is the root listing (with an added ``"url"`` key) and
        each service is a dict with ``"name"``, ``"url"`` and ``"metadata"``.
        A service whose ``service_metadata`` call raised carries
        ``{"error": exc}`` as its ``"metadata"``.
    """
    # BL-01: a single BoundedSemaphore is created per top-level call and
    # handed to every nested ``service_metadata`` call, making the cap global
    # for the whole request (plan.md §3c R-18/R-44, kickoff §10.3). The outer
    # ``asyncio.gather`` tasks are deliberately NOT wrapped in it: the nested
    # fan-out acquires it already and ``asyncio.Semaphore`` is not
    # re-entrant, so wrapping would double-acquire.
    limiter = asyncio.BoundedSemaphore(
        get_config().concurrency.max_concurrent_requests,
    )

    async def _read_directory(url: str) -> dict:
        # Directory-level requests count against the shared cap.
        async with limiter:
            return _to_plain_dict(await get_metadata(url, session, token))

    def _service_stubs(listing: dict) -> list[dict]:
        # One {"name", "url"} stub per service named in a directory listing.
        return [
            {
                "name": svc["name"],
                "url": f"{base_url}/{svc['name']}/{svc['type']}",
            }
            for svc in listing.get("services") or []
        ]

    # Root listing: folders plus top-level services.
    try:
        root_listing = await _read_directory(base_url)
    except Exception as e:
        return {"error": e}
    root_listing["url"] = base_url

    discovered = _service_stubs(root_listing)

    # Folder listings add their services; the first failure aborts the crawl.
    for folder_name in root_listing.get("folders") or []:
        folder_url = f"{base_url}/{folder_name}"
        try:
            folder_listing = await _read_directory(folder_url)
        except Exception as e:
            return {"error": e}
        folder_listing["url"] = folder_url
        discovered.extend(_service_stubs(folder_listing))

    async def _describe(service_url: str):
        # A failing service yields an error dict instead of aborting the rest.
        try:
            return await service_metadata(
                session,
                service_url,
                token,
                return_feature_count=return_feature_count,
                _sem=limiter,
            )
        except Exception as e:
            return {"error": e}

    # Describe all services concurrently; gather keeps input order.
    described = await asyncio.gather(
        *(_describe(stub["url"]) for stub in discovered),
    )
    for stub, details in zip(discovered, described):
        stub["metadata"] = details

    return {
        "metadata": root_listing,
        "services": discovered,
    }
def _make_error(stage: str, url: str, exc: BaseException) -> CrawlError:
    """Build a ``CrawlError`` for *exc* raised at *stage* while requesting *url*.

    The error message is ``str(exc)``; the exception object itself is kept
    on the ``exception`` field.
    """
    message = str(exc)
    return CrawlError(stage=stage, url=url, message=message, exception=exc)


def _as_layer_metadata(raw: Any) -> LayerMetadata:
    """Return *raw* as a ``LayerMetadata`` instance.

    An existing ``LayerMetadata`` is passed through unchanged; anything else
    goes through ``LayerMetadata.model_validate``.
    """
    return raw if isinstance(raw, LayerMetadata) else LayerMetadata.model_validate(raw)
async def safe_crawl(
    session: AsyncHTTPSession,
    base_url: str,
    token: str | None = None,
    return_feature_count: bool = False,
) -> CrawlReport:
    """Crawl an ArcGIS REST directory, collecting results and errors together.

    In contrast to :func:`fetch_all_data`, a failing request does not abort
    the crawl. Exceptions raised by the metadata requests are recorded as
    :class:`~restgdf._models.crawl.CrawlError` entries in
    :attr:`CrawlReport.errors`, tagged with one of three stages:

    * ``"base_metadata"`` -- the root ``get_metadata`` call. Nothing can be
      discovered without it, so the report comes back with no services and
      ``metadata=None``.
    * ``"folder_metadata"`` -- a per-folder ``get_metadata`` call. The folder
      is skipped; services found in the root and in other folders are kept.
    * ``"service_metadata"`` -- a per-service ``service_metadata`` call. The
      service still gets an entry in :attr:`CrawlReport.services`, with
      ``metadata=None``.

    Args:
        session: HTTP session forwarded to every metadata request.
        base_url: URL of the services directory to crawl.
        token: Optional token forwarded to every metadata request.
        return_feature_count: Forwarded to ``service_metadata``.

    Returns:
        A :class:`~restgdf.CrawlReport` holding the service entries, the
        captured errors and the root directory metadata.
    """
    # BL-01: a single BoundedSemaphore is created per top-level call and
    # handed to every nested ``service_metadata`` call, making the cap global
    # for the whole request (plan.md §3c R-18/R-44, kickoff §10.3). The outer
    # ``asyncio.gather`` is NOT wrapped in it because the nested orchestrators
    # re-acquire it and ``asyncio.Semaphore`` is not re-entrant.
    limiter = asyncio.BoundedSemaphore(
        get_config().concurrency.max_concurrent_requests,
    )
    failures: list[CrawlError] = []

    async def _read_directory(url: str) -> dict[str, Any]:
        # Directory-level requests count against the shared cap.
        async with limiter:
            return _to_plain_dict(await get_metadata(url, session, token))

    def _service_stubs(listing: dict[str, Any]) -> list[dict[str, Any]]:
        # One {"name", "type", "url"} stub per service in a directory listing.
        return [
            {
                "name": svc["name"],
                "type": svc.get("type"),
                "url": f"{base_url}/{svc['name']}/{svc['type']}",
            }
            for svc in listing.get("services") or []
        ]

    # Stage 1: root listing. Without it there is nothing to crawl.
    try:
        root_listing = await _read_directory(base_url)
    except Exception as exc:
        failures.append(_make_error("base_metadata", base_url, exc))
        return CrawlReport(services=[], errors=failures, metadata=None)
    root_listing["url"] = base_url

    pending = _service_stubs(root_listing)

    # Stage 2: folder listings. A failing folder is recorded and skipped.
    for folder_name in root_listing.get("folders") or []:
        folder_url = f"{base_url}/{folder_name}"
        try:
            folder_listing = await _read_directory(folder_url)
        except Exception as exc:
            failures.append(_make_error("folder_metadata", folder_url, exc))
            continue
        folder_listing["url"] = folder_url
        pending.extend(_service_stubs(folder_listing))

    # Stage 3: per-service metadata, fetched concurrently.
    async def _describe(url: str) -> Any:
        try:
            return await service_metadata(
                session,
                url,
                token,
                return_feature_count=return_feature_count,
                _sem=limiter,
            )
        except Exception as exc:
            failures.append(_make_error("service_metadata", url, exc))
            return None

    outcomes = await asyncio.gather(*[_describe(stub["url"]) for stub in pending])

    # gather keeps input order, so stubs and outcomes line up pairwise.
    entries: list[CrawlServiceEntry] = [
        CrawlServiceEntry(
            name=stub["name"],
            url=stub["url"],
            type=stub.get("type"),
            metadata=None if outcome is None else _as_layer_metadata(outcome),
        )
        for stub, outcome in zip(pending, outcomes)
    ]

    return CrawlReport(
        services=entries,
        errors=failures,
        metadata=_as_layer_metadata(root_listing),
    )