"""A package for getting GeoDataFrames from ArcGIS FeatureLayers."""
from __future__ import annotations
import asyncio
from re import compile, IGNORECASE
from aiohttp import ClientSession
from pandas import DataFrame, concat
# Shared exception instance raised by get_max_record_count() and get_name()
# when a metadata dict does not contain exactly one matching key.
FIELDDOESNOTEXIST: IndexError = IndexError("Field does not exist")

# Baseline request parameters; default_data() uses these when it is not
# given any other set of defaults.
DEFAULTDICT: dict = dict(
    where="1=1",
    outFields="*",
    returnGeometry=True,
    returnCountOnly=False,
    f="json",
)
[docs]
def default_data(
    data: dict | None = None,
    default_dict: dict | None = None,
) -> dict:
    """Merge request parameters over a set of default values.

    Args:
        data: Caller-supplied parameters. On key collisions these win over
            the defaults. ``None`` (or an empty dict) means no overrides.
        default_dict: Base values to start from. ``None`` selects the
            module-level ``DEFAULTDICT``; an explicitly passed dict, even an
            empty one, is used as given.

    Returns:
        A new dict; neither input is modified.
    """
    # Compare against None rather than truthiness so an explicit empty dict
    # is honoured instead of being silently replaced by DEFAULTDICT.
    base = DEFAULTDICT if default_dict is None else default_dict
    return {**base, **(data or {})}
[docs]
async def get_feature_count(
    url: str,
    session: ClientSession,
    **kwargs,
) -> int:
    """Return the number of features a layer query matches.

    Posts a ``returnCountOnly`` query to ``{url}/query``. Only the ``where``
    and ``token`` entries of a caller-supplied ``data`` mapping are carried
    into the request; every other keyword argument is forwarded to
    ``session.post`` unchanged.

    Args:
        url: Layer URL without the trailing ``/query``.
        session: Session used to send the POST request.
        **kwargs: Extra keyword arguments for ``session.post``. ``data`` may
            be a mapping or ``None``.

    Returns:
        The ``count`` value from the JSON response.

    Raises:
        KeyError: If the JSON response has no ``count`` key.
    """
    # ``data`` is rebuilt instead of forwarded so that only the filter and the
    # token reach the count request. ``or {}`` tolerates an explicit data=None.
    caller_data = kwargs.pop("data", None) or {}
    datadict: dict = {
        "where": caller_data.get("where", "1=1"),
        "returnCountOnly": True,
        "f": "json",
    }
    if "token" in caller_data:
        datadict["token"] = caller_data["token"]
    response = await session.post(f"{url}/query", data=datadict, **kwargs)
    response_json = await response.json()
    return response_json["count"]
[docs]
def get_max_record_count(metadata: dict) -> int:
    """Return the value stored under the metadata's max-record-count key.

    Key names are compared case-insensitively against spellings such as
    ``maxRecordCount``, ``max_record_count`` or ``Maximum Record Count``.

    Args:
        metadata: Mapping whose keys are searched.

    Returns:
        The value of the single matching key.

    Raises:
        IndexError: The shared ``FIELDDOESNOTEXIST`` instance, unless exactly
            one key matches.
    """
    matcher = compile(
        r"max(imum)?(\s|_)?record(\s|_)?count$",
        flags=IGNORECASE,
    )
    hits = list(filter(matcher.match, metadata))
    if len(hits) == 1:
        return metadata[hits[0]]
    raise FIELDDOESNOTEXIST
[docs]
async def get_offset_range(
    url: str,
    session: ClientSession,
    **kwargs,
) -> range:
    """Build the sequence of starting offsets for paging through a layer.

    The feature count is fetched first with ``get_feature_count`` (which
    receives ``**kwargs``), then the layer metadata with ``get_metadata``.
    The step is the metadata's max-record-count value.

    Args:
        url: Layer URL.
        session: Session used for both requests.
        **kwargs: Passed through to ``get_feature_count`` only.

    Returns:
        ``range(0, feature_count, max_record_count)``.
    """
    total_features = await get_feature_count(url, session, **kwargs)
    page_size = get_max_record_count(await get_metadata(url, session))
    return range(0, total_features, page_size)
[docs]
def get_name(metadata: dict) -> str:
    """Return the value of the one metadata key that starts with "name".

    The comparison is case-insensitive and anchored only at the start of the
    key, so any key beginning with "name" counts as a match.

    Args:
        metadata: Mapping whose keys are searched.

    Returns:
        The value of the single matching key.

    Raises:
        IndexError: The shared ``FIELDDOESNOTEXIST`` instance, unless exactly
            one key matches.
    """
    name_prefix = compile("name", flags=IGNORECASE)
    hits = [key for key in metadata if name_prefix.match(key) is not None]
    if len(hits) == 1:
        (only_key,) = hits
        return metadata[only_key]
    raise FIELDDOESNOTEXIST
[docs]
def getfields(layer_metadata: dict, types: bool = False):
    """List the field names of a layer, optionally with their types.

    Args:
        layer_metadata: Mapping with a ``"fields"`` entry; each field is a
            mapping with ``"name"`` (and ``"type"`` when ``types`` is true).
        types: When true, return a name -> type dict instead of a name list.

    Returns:
        A list of field names, or a dict mapping each field name to its type
        string with the ``"esriFieldType"`` substring removed.
    """
    field_defs = layer_metadata["fields"]
    if not types:
        return [field_def["name"] for field_def in field_defs]
    return {
        field_def["name"]: field_def["type"].replace("esriFieldType", "")
        for field_def in field_defs
    }
[docs]
def getfields_df(layer_metadata: dict) -> DataFrame:
    """Tabulate a layer's fields as a two-column DataFrame.

    Args:
        layer_metadata: Mapping with a ``"fields"`` entry; each field is a
            mapping with ``"name"`` and ``"type"``.

    Returns:
        A DataFrame with columns ``name`` and ``type``, one row per field.
        The ``"esriFieldType"`` substring is removed from each type string.
    """
    rows = []
    for field_def in layer_metadata["fields"]:
        field_name = field_def["name"]
        rows.append((field_name, field_def["type"].replace("esriFieldType", "")))
    return DataFrame(rows, columns=["name", "type"])
[docs]
async def getuniquevalues(
    url: str,
    fields: tuple | str,
    session: ClientSession,
    sortby: str | None = None,
    **kwargs,
) -> list | DataFrame:
    """Fetch the distinct values of one or more fields of a layer.

    Posts a ``returnDistinctValues`` query to ``{url}/query``. Only the
    ``where`` and ``token`` entries of a caller-supplied ``data`` mapping are
    carried into the request; every other keyword argument is forwarded to
    ``session.post`` unchanged.

    Args:
        url: Layer URL without the trailing ``/query``.
        fields: A single field name, or a sequence of field names.
        session: Session used to send the POST request.
        sortby: Column to sort by; applied only when the result is a
            DataFrame (more than one field).
        **kwargs: Extra keyword arguments for ``session.post``. ``data`` may
            be a mapping or ``None``.

    Returns:
        A list of values when ``fields`` is a string or has exactly one
        element (an empty list when nothing matches); otherwise a DataFrame
        with one row per returned feature (empty, with ``fields`` as columns,
        when nothing matches).

    Raises:
        KeyError: If the JSON response has no ``features`` key, or a feature
            lacks a requested attribute.
    """
    # ``or {}`` tolerates an explicit data=None.
    caller_data = kwargs.pop("data", None) or {}
    datadict: dict = {
        "where": caller_data.get("where", "1=1"),
        "f": "json",
        "returnGeometry": False,
        "returnDistinctValues": True,
        "outFields": fields if isinstance(fields, str) else ",".join(fields),
    }
    if "token" in caller_data:
        datadict["token"] = caller_data["token"]
    response = await session.post(f"{url}/query", data=datadict, **kwargs)
    metadata = await response.json()
    features = metadata["features"]

    # Return list results directly: funnelling them through ``list or frame``
    # turned an empty (falsy) list into None.
    if isinstance(fields, str):
        return [feature["attributes"][fields] for feature in features]
    if len(fields) == 1:
        return [feature["attributes"][fields[0]] for feature in features]

    # concat() raises on an empty sequence, so answer "no rows" explicitly.
    if not features:
        return DataFrame(columns=list(fields))
    res_df = concat(
        [DataFrame(feature).T.reset_index(drop=True) for feature in features],
        ignore_index=True,
    )
    if sortby:
        res_df = res_df.sort_values(sortby).reset_index(drop=True)
    return res_df
[docs]
async def getvaluecounts(
    url: str,
    field: str,
    session: ClientSession,
    **kwargs,
) -> DataFrame:
    """Count how many features carry each value of a field.

    Posts a grouped ``count`` statistics query to ``{url}/query``. Entries of
    a caller-supplied ``data`` mapping are merged last, so they override the
    parameters built here; every other keyword argument is forwarded to
    ``session.post`` unchanged.

    Args:
        url: Layer URL without the trailing ``/query``.
        field: Field to group and count by.
        session: Session used to send the POST request.
        **kwargs: Extra keyword arguments for ``session.post``. ``data`` may
            be a mapping or ``None``.

    Returns:
        A DataFrame built from each feature's attributes, sorted by
        ``{field}_count`` in descending order with a fresh 0..n-1 index.
        When the response contains no features, an empty DataFrame with
        columns ``field`` and ``{field}_count``.

    Raises:
        KeyError: If the JSON response has no ``features`` key.
    """
    count_col = f"{field}_count"
    statstr = f'[{{"statisticType":"count","onStatisticField":"{field}","outStatisticFieldName":"{count_col}"}}]'
    # ``or {}`` tolerates an explicit data=None, which ``**`` cannot unpack.
    overrides = kwargs.pop("data", None) or {}
    data = {
        "where": "1=1",
        "f": "json",
        "returnGeometry": False,
        "outFields": field,
        "outStatistics": statstr,
        "groupByFieldsForStatistics": field,
        **overrides,
    }
    response = await session.post(f"{url}/query", data=data, **kwargs)
    metadata = await response.json()
    features = metadata["features"]
    # concat() raises on an empty sequence, so answer "no rows" explicitly.
    if not features:
        return DataFrame(columns=[field, count_col])
    cc = concat(
        [DataFrame(feature["attributes"], index=[0]) for feature in features],
        ignore_index=True,
    )
    return cc.sort_values(count_col, ascending=False).reset_index(drop=True)
[docs]
async def nestedcount(
    url: str,
    fields,
    session: ClientSession,
    **kwargs,
) -> DataFrame:
    """Count features per combination of field values.

    Posts a statistics query to ``{url}/query`` that requests a ``count``
    statistic for every entry of ``fields`` and groups by all of them.
    Entries of a caller-supplied ``data`` mapping override the parameters
    built here; every other keyword argument is forwarded to
    ``session.post`` unchanged.

    Post-processing uses only ``fields[0]`` and ``fields[1]``: the first
    column whose name starts with ``{fields[0]}_count`` is dropped and the
    first column whose name starts with ``{fields[1]}_count`` is renamed to
    ``Count``.

    Returns:
        A DataFrame sorted by ``fields[0]`` ascending, then ``Count``
        descending, with a fresh 0..n-1 index.
    """
    stat_entries = [
        f'{{"statisticType":"count","onStatisticField":"{name}","outStatisticFieldName":"{name}_count"}}'
        for name in fields
    ]
    statstr = "[" + ",".join(stat_entries) + "]"
    overrides = kwargs.pop("data", {})
    defaults = {
        "where": "1=1",
        "f": "json",
        "returnGeometry": False,
        "outFields": ",".join(fields),
        "outStatistics": statstr,
        "groupByFieldsForStatistics": ",".join(fields),
    }
    # Caller-supplied entries are merged last so they take precedence.
    request_data = {**defaults, **overrides}
    response = await session.post(f"{url}/query", data=request_data, **kwargs)
    body = await response.json()
    feature_rows = body["features"]
    # Each feature mapping becomes a one-row frame; the rows are then stacked.
    frames = [DataFrame(row).T.reset_index(drop=True) for row in feature_rows]
    counts = concat(frames, ignore_index=True)
    dropcol = [col for col in counts.columns if col.startswith(f"{fields[0]}_count")][0]
    rencol = [col for col in counts.columns if col.startswith(f"{fields[1]}_count")][0]
    trimmed = counts.drop(columns=dropcol)
    renamed = trimmed.rename(columns={rencol: "Count"})
    ordered = renamed.sort_values([fields[0], "Count"], ascending=[True, False])
    return ordered.reset_index(drop=True)