Source code for lsapy.lsa

"""Land Suitability definition."""

from __future__ import annotations

import warnings
from collections.abc import Mapping
from typing import Any

import xarray as xr

from lsapy.aggregate import aggregate
from lsapy.core.formatting import lsa_repr
from lsapy.criteria import SuitabilityCriteria

__all__ = ["LandSuitabilityAnalysis"]


[docs] class LandSuitabilityAnalysis: """ Data structure to define and run land suitability analysis. The land suitability analysis is defined by a set of suitability criteria that are combined to compute the suitability. Parameters ---------- land_use : str A name for the land use. criteria : dict[str, SuitabilityCriteria] A dictionary of suitability criteria where the key is the name of the criteria. short_name : str, optional A short name for the land suitability analysis. The default is None. If provided, it will be stored as an attribute. long_name : str, optional A long name for the land suitability analysis. The default is None. If provided, it will be stored as an attribute. description : str, optional A description for the land suitability analysis. The default is None. If provided, it will be stored as an attribute. comment : str, optional Additional information about the land suitability analysis. The default is None. If provided, it will be stored as an attribute. attrs : Mapping[Any, Any], optional Arbitrary metadata to store with the land suitability analysis, in addition to the attributes `short_name`, `long_name`, `description`, and `comment`. The default is None. Examples -------- Let first define the ``SuitabilityCriteria`` (we use `xclim` package for the GDD computation): >>> from lsapy.utils import open_data >>> import lsapy.standardize as lstd >>> from xclim.indicators.atmos import growing_degree_days >>> drainage = open_data("land", variables="drainage") >>> tas = open_data("climate", variables="tas") >>> sc = { ... "drainage_class": SuitabilityCriteria( ... name="drainage_class", ... long_name="Drainage Class Suitability", ... weight=3, ... category="soilTerrain", ... indicator=drainage, ... func=lstd.discrete, ... fparams={"rules": {0: 0, 1: 0.1, 2: 0.5, 3: 0.9, 4: 1}}, ... ), ... "growing_degree_days": SuitabilityCriteria( ... name="growing_degree_days", ... long_name="Growing Degree Days Suitability", ... weight=1, ... category="climate", ... indicator=growing_degree_days(tas, thresh="10 degC", freq="YS-JUL"), ... func=lstd.vetharaniam2022_eq5, ... fparams={"a": -1.41, "b": 801}, ... ), ... } Now we can define the ``LandSuitabilityAnalysis`` : >>> lsa = LandSuitabilityAnalysis( ... land_use="land_use", ... short_name="land_suitability_analysis", ... long_name="Land Suitability Analysis", ... criteria=sc, ... ) The land suitability analysis can now be run: >>> lsa.run(inplace=True) """
[docs] def __init__( self, land_use: str, criteria: dict[str, SuitabilityCriteria], short_name: str | None = None, long_name: str | None = None, description: str | None = None, comment: str | None = None, attrs: Mapping[Any, Any] | None = None, ) -> None: self.land_use = land_use self.criteria = criteria _attrs = {} if short_name: _attrs.update({"short_name": short_name}) if long_name: _attrs.update({"long_name": long_name}) if description: _attrs.update({"description": description}) if comment: _attrs.update({"comment": comment}) if attrs: _attrs.update(dict(attrs)) self.attrs = _attrs self._sort_criteria_by_weight() # important if suitability as limited factor self._criteria_list = [sc.name for sc in self.criteria.values()] self.category = list(dict.fromkeys([sc.category for sc in self.criteria.values()])) self._get_params_by_category()
def __repr__(self) -> str: """Return a string representation of the land suitability.""" return lsa_repr(self) @property def land_use(self) -> str: """ Name of the land use. Returns ------- str Name of the land use. """ return self._land_use @land_use.setter def land_use(self, value: str) -> None: """ Set the name of the land use. Parameters ---------- value : str Name of the land use. """ self._land_use = value @property def criteria(self) -> dict[str, SuitabilityCriteria]: """ Dictionary of the suitability criteria. Returns ------- dict Dictionary containing the suitability criteria. """ return self._criteria @criteria.setter def criteria(self, value: dict[str, SuitabilityCriteria]) -> None: """ Set the suitability criteria. Parameters ---------- value : dict[str, SuitabilityCriteria] Dictionary of suitability criteria where the key is the name of the criteria. """ self._criteria = value @property def data(self) -> xr.Dataset: """ Dataset containing the computed suitability. Returns ------- xr.Dataset Dataset containing the computed suitability. """ return self._data @data.setter def data(self, value: xr.Dataset) -> None: """ Set the computed suitability dataset. Parameters ---------- value : xr.Dataset Dataset containing the computed suitability. """ self._data = value @property def category(self) -> list[str | None]: """ List of categories defined in the suitability criteria. Returns ------- list[str | None] List of categories defined in the suitability criteria. """ return self._category @category.setter def category(self, value: list[str | None]) -> None: """ Set the list of categories defined in the suitability criteria. Parameters ---------- value : list[str | None] List of categories defined in the suitability criteria. """ self._category = value @property def criteria_by_category(self) -> dict[str | None, list[str]]: """ Dictionary of criteria names grouped by category. Returns ------- dict Dictionary where keys are categories and values are lists of criteria names. """ return self._criteria_by_category @criteria_by_category.setter def criteria_by_category(self, value: dict[str | None, list[str]]) -> None: """ Set the dictionary of criteria names grouped by category. Parameters ---------- value : dict[str | None, list[str]] Dictionary where keys are categories and values are lists of criteria names. """ self._criteria_by_category = value @property def weights_by_category(self) -> dict[str | None, float]: """ Dictionary of total weights grouped by category. Returns ------- dict Dictionary where keys are categories and values are total weights. """ return self._weights_by_category @weights_by_category.setter def weights_by_category(self, value: dict[str | None, float]) -> None: """ Set the dictionary of total weights grouped by category. Parameters ---------- value : dict[str | None, float] Dictionary where keys are categories and values are total weights. """ self._weights_by_category = value @property def attrs(self) -> dict[Any, Any]: """ Dictionary of the Land Suitability Analysis attributes. Returns ------- dict Dictionary containing the attributes of the Land Suitability Analysis. """ return self._attrs @attrs.setter def attrs(self, value: Mapping[Any, Any]) -> None: """ Set the attributes of the Land Suitability Analysis. Parameters ---------- value : Mapping[Any, Any] Mapping of attributes to set for the Land Suitability Analysis. """ self._attrs = dict(value)
[docs] def run( self, suitability_type: str = "overall", agg_methods: str | dict[str, str] = "mean", by_category: bool | None = None, keep_vars: bool | None = True, inplace=False, **kwargs, ): """ Run the land suitability analysis. Parameters ---------- suitability_type : str, optional The type of suitability to compute. Options are 'criteria', 'category', or 'overall'. The default is 'overall'. agg_methods : str | dict[str, str], optional The aggregation method to use for the suitability computation. If a string, it applies the same method to compute the category and overall suitability. If a dictionary, the keys 'category' and 'overall' are used to specify the aggregation method to use for each type of suitability. The default is 'mean'. by_category : bool | None, optional If True, compute the overall suitability aggregating categories suitability. If False, use the criteria suitability. The default behavior uses categories suitability if categories are found in criteria, otherwise it uses the criteria suitability. keep_vars : bool | None, optional If True, return all the variables computed as part of the computation process, otherwise return only the data defined by the `suitability_type`. The default is True. inplace : bool, optional If True, compute the suitability in place. The default is False. **kwargs : dict Additional keyword arguments to pass to the suitability criteria compute method. Returns ------- None | xr.Dataset If `inplace` is False, return the computed suitability as a Dataset. If `inplace` is True, return None. Notes ----- To avoid biais in LSA categories outputs, it was decided to apply the same aggregation method to all categories. Examples -------- Let first define the ``SuitabilityCriteria`` (we use `xclim` package for the GDD computation): >>> from lsapy.utils import open_data >>> import lsapy.standardize as lstd >>> from xclim.indicators.atmos import growing_degree_days >>> drainage = open_data("land", variables="drainage") >>> tas = open_data("climate", variables="tas") >>> sc = { ... "drainage_class": SuitabilityCriteria( ... name="drainage_class", ... long_name="Drainage Class Suitability", ... weight=3, ... category="soilTerrain", ... indicator=drainage, ... func=lstd.discrete, ... fparams={"rules": {0: 0, 1: 0.1, 2: 0.5, 3: 0.9, 4: 1}}, ... ), ... "growing_degree_days": SuitabilityCriteria( ... name="growing_degree_days", ... long_name="Growing Degree Days Suitability", ... weight=1, ... category="climate", ... indicator=growing_degree_days(tas, thresh="10 degC", freq="YS-JUL"), ... func=lstd.vetharaniam2022_eq5, ... fparams={"a": -1.41, "b": 801}, ... ), ... } Now we can define the ``LandSuitabilityAnalysis`` : >>> lsa = LandSuitabilityAnalysis( ... land_use="land_use", ... short_name="land_suitability_analysis", ... long_name="Land Suitability Analysis", ... criteria=sc, ... ) The land suitability analysis can now be run: >>> lsa.run(inplace=True) """ def _pre_agg(suitability_type, by_category): """Prepare the aggregation variables and methods based on defined parameters.""" agg_on = {} if suitability_type == "overall": if by_category is None and self.category == [None]: by_category = False elif by_category is None: by_category = True if by_category: agg_on = {"suitability": self.category} else: agg_on = {"suitability": self._criteria_list} if suitability_type == "category" or by_category: if self.category == [None] and suitability_type == "overall": warnings.warn( "No categories defined. Computing suitability on criteria instead.", UserWarning, stacklevel=2, ) agg_on.update({"suitability": self._criteria_list}) elif self.category == [None]: warnings.warn( "No categories defined. Skipping category suitability computation.", UserWarning, stacklevel=2, ) else: agg_on = {**self.criteria_by_category, **agg_on} return agg_on, by_category suitability_type = suitability_type.lower() if suitability_type not in ["criteria", "category", "overall"]: raise ValueError( f"'suitability_type' must be one of 'criteria', 'category', or 'overall'. Got '{suitability_type}'." ) ds = self._run_criteria(**kwargs) if suitability_type in ["category", "overall"]: agg_on, by_category = _pre_agg(suitability_type, by_category) if agg_on != {}: if isinstance(agg_methods, str): agg_methods = {"category": agg_methods, "suitability": agg_methods} elif not isinstance(agg_methods, dict): raise TypeError(f"'agg_methods' must be a string or a dictionary. Got {type(agg_methods)}.") elif "overall" in agg_methods.keys(): suit_method = agg_methods.pop("overall") agg_methods = {**agg_methods, "suitability": suit_method} if "category" not in agg_methods.keys() and (by_category or suitability_type == "category"): raise ValueError("No aggregation method provided for 'category'.") elif suitability_type == "category" or by_category: cat_method = agg_methods.pop("category") agg_methods = { **{k: cat_method for k in self.category if k is not None}, **agg_methods, } if "suitability" not in agg_methods.keys() and suitability_type == "overall": raise ValueError("No aggregation method provided for 'overall'.") ds = self._aggregate( ds, agg_on=agg_on, methods=agg_methods, keep_vars=keep_vars, kwargs=self._format_agg_kwargs(agg_methods, agg_on), ) if not inplace: return ds else: if hasattr(self, "data"): warnings.warn("Existing data found and will be overwritten.", UserWarning, stacklevel=2) self.data = ds
def _run_criteria( self, **kwargs, ) -> xr.Dataset: """ Compute the suitability of each criteria. Parameters ---------- **kwargs : dict Additional keyword arguments to pass to the suitability criteria compute method. Returns ------- xr.Dataset A Dataset containing the computed suitability for each criteria. """ out: Any = [] attrs: dict[str, dict[Any, Any]] = {} for sc in self.criteria.values(): da = sc.compute(**kwargs) out.append(da) attrs[sc.name] = da.attrs out = xr.merge(out, compat="override", combine_attrs="drop") # Reassign attributes to each criteria for sc in out.data_vars: out[sc].attrs = attrs.get(sc, out[sc].attrs) out.attrs["land_use"] = self.land_use out.attrs["criteria"] = self._criteria_list out.attrs.update(self.attrs) return out def _sort_criteria_by_weight(self) -> None: self.criteria = dict( sorted( self.criteria.items(), key=lambda item: item[1].weight if item[1].weight is not None else 0, reverse=True, ) ) def _get_params_by_category(self): self.criteria_by_category = {category: [] for category in self.category} for sc in self.criteria.values(): self.criteria_by_category[sc.category].append(sc.name) self.weights_by_category = {category: [] for category in self.category} for category in self.category: self.weights_by_category[category] = sum( [sc.weight for sc in self.criteria.values() if sc.category == category] ) def _format_agg_kwargs(self, agg_methods: str | dict[str, str], agg_on: dict[str, list[str]]) -> dict[str, Any]: """ Format the keyword arguments for the reduction function based on the LandSuitabilityAnalysis object and dataset. Parameters ---------- agg_methods : str | dict[str, str] The aggregation methods to apply. If a string, it applies the same method for all aggregations. If a dictionary, keys are aggregated variable names and values are the associated aggregation methods. agg_on : dict[str, list[str]] A dictionary where keys are new variable names and values are lists of variable names to aggregate. Returns ------- kwargs : dict[str, Any] A dictionary of keyword arguments to pass to the aggregation function for each variable. """ kwargs = {} for k, v in agg_on.items(): k_method = agg_methods if isinstance(agg_methods, str) else agg_methods[k] if k_method in ["wmean", "wgmean"]: kwargs[k] = { "weights": [ self.weights_by_category[_v] if _v in self.category else self.criteria[_v].weight for _v in v ] } else: kwargs[k] = {} return kwargs @staticmethod def _aggregate( ds: xr.Dataset, agg_on: dict[str, list[str]], methods: str | dict[str, str], keep_vars: bool | None = False, kwargs: dict[str, Any] | None = None, ): """ Aggregate variables based on specified methods. Parameters ---------- ds : xr.Dataset The input dataset. agg_on : dict[str, list[str]] A dictionary where keys are new variable names and values are lists of variable names to aggregate. methods : str or dict[str, str] The aggregation methods to apply. If a string, it applies the same method for all aggregations. If a dictionary, keys are aggregated variable names and values are the associated aggregation methods. keep_vars : bool, optional If True, keeps the original variables in the output dataset. kwargs : dict[str, Any], optional Additional keyword arguments to pass to the aggregation function for each variable. Returns ------- xr.Dataset A dataset with the aggregated variables. If `keep_vars` is True, all original variables are kept. Otherwise, only the aggregated variables are returned. """ if isinstance(methods, str): methods = {k: methods for k in agg_on.keys()} elif not isinstance(methods, dict): raise TypeError("'methods' must be a string or a dictionary of strings.") for k, v in agg_on.items(): if kwargs and k in kwargs: kwargs_k = kwargs[k] else: kwargs_k = {} out = aggregate(ds, method=methods[k], variables=v, **kwargs_k) if methods[k] == "limfactor" and isinstance(out, xr.Dataset): ds[k] = out["limiting_factor"] ds[f"{k}_limvar"] = out["limiting_variable"].assign_attrs( {"long_name": f"{k.capitalize()} Limiting Factor Variable"} ) else: ds[k] = out ds[k].attrs.update({"long_name": f"{k.capitalize()} Suitability"}) if keep_vars: return ds vars_to_keep = [k for k in agg_on.keys() if k not in [i for e in agg_on.values() for i in e]] return ds[[i for v in vars_to_keep for i in ([v, f"{v}_limvar"] if methods[v] == "limfactor" else [v])]]