diff --git a/lmo/_lm.py b/lmo/_lm.py index f92dcfbb..bf8c6fd5 100644 --- a/lmo/_lm.py +++ b/lmo/_lm.py @@ -12,7 +12,6 @@ from . import ostats, pwm_beta from ._utils import ( clean_order, - clean_orders, clean_trim, ensure_axis_at, l_stats_orders, @@ -22,7 +21,6 @@ sort_maybe, ) from .linalg import ir_pascal, sandwich, sh_legendre, trim_matrix -from .typing import AnyOrder, AnyOrderND if sys.version_info >= (3, 13): from typing import TypeVar @@ -34,6 +32,7 @@ import lmo.typing as lmt import lmo.typing.np as lnpt + from .typing import AnyOrder, AnyOrderND __all__ = ( @@ -248,9 +247,9 @@ def l_moment( a: lnpt.AnyArrayFloat, r: AnyOrder, /, - trim: lmt.AnyTrim = ..., + trim: lmt.AnyTrim = 0, *, - axis: None = ..., + axis: None = None, dtype: _DType[_SCT_f] = np.float64, **kwds: Unpack[lmt.LMomentOptions], ) -> _SCT_f: ... @@ -259,7 +258,7 @@ def l_moment( a: lnpt.AnyMatrixFloat | lnpt.AnyTensorFloat, r: AnyOrder | AnyOrderND, /, - trim: lmt.AnyTrim = ..., + trim: lmt.AnyTrim = 0, *, axis: int, dtype: _DType[_SCT_f] = np.float64, @@ -270,7 +269,7 @@ def l_moment( a: lnpt.AnyVectorFloat, r: AnyOrder, /, - trim: lmt.AnyTrim = ..., + trim: lmt.AnyTrim = 0, *, axis: int, dtype: _DType[_SCT_f] = np.float64, @@ -281,9 +280,9 @@ def l_moment( a: lnpt.AnyArrayFloat, r: AnyOrderND, /, - trim: lmt.AnyTrim = ..., + trim: lmt.AnyTrim = 0, *, - axis: int | None = ..., + axis: int | None = None, dtype: _DType[_SCT_f] = np.float64, **kwds: Unpack[lmt.LMomentOptions], ) -> onpt.Array[Any, _SCT_f]: ... @@ -410,11 +409,8 @@ def l_moment( x_k = ensure_axis_at(x_k, axis, -1) n = x_k.shape[-1] - if np.isscalar(r): - _r = np.array(clean_order(cast(AnyOrder, r))) - else: - _r = clean_orders(cast(AnyOrderND, r)) - r_min, r_max = np.min(_r), int(np.max(_r)) + _r = clean_order(r) + r_min, r_max = np.min(_r), np.max(_r) # TODO @jorenham: nan handling, see: # https://github.com/jorenham/Lmo/issues/70 @@ -422,7 +418,7 @@ def l_moment( (s, t) = st = clean_trim(trim) # ensure that any inf's (not nan's) are properly trimmed - if (s or t) and isinstance(s, int): + if isinstance(s, int): if s: x_k[..., :s] = np.nan_to_num(x_k[..., :s], nan=np.nan) if t: diff --git a/lmo/_utils.py b/lmo/_utils.py index 58a4695b..adeefb69 100644 --- a/lmo/_utils.py +++ b/lmo/_utils.py @@ -8,10 +8,8 @@ import numpy.typing as npt if sys.version_info >= (3, 13): - from typing import LiteralString, TypeVar + from typing import TypeVar else: - from typing import LiteralString - from typing_extensions import TypeVar if TYPE_CHECKING: @@ -37,15 +35,15 @@ _SCT = TypeVar("_SCT", bound=np.generic) _SCT_uifc = TypeVar("_SCT_uifc", bound="lnpt.Number") -_SCT_ui = TypeVar("_SCT_ui", bound="lnpt.Int", default=np.int_) -_SCT_f = TypeVar("_SCT_f", bound="lnpt.Float", default=np.float64) +_SCT_ui = TypeVar("_SCT_ui", bound=np.integer[Any], default=np.intp) +_SCT_f = TypeVar("_SCT_f", bound=np.floating[Any], default=np.float64) -_DT_f = TypeVar("_DT_f", bound=np.dtype["lnpt.Float"]) -_AT_f = TypeVar("_AT_f", bound="npt.NDArray[lnpt.Float] | lnpt.Float") +_DT_f = TypeVar("_DT_f", bound=np.dtype[np.floating[Any]]) +_AT_f = TypeVar("_AT_f", bound=np.floating[Any] | npt.NDArray[np.floating[Any]]) _SizeT = TypeVar("_SizeT", bound=int) -_ShapeT = TypeVar("_ShapeT", bound="onpt.AtLeast0D") +_ShapeT = TypeVar("_ShapeT", bound=tuple[int, ...]) _ShapeT1 = TypeVar("_ShapeT1", bound="onpt.AtLeast1D") _ShapeT2 = TypeVar("_ShapeT2", bound="onpt.AtLeast2D") @@ -261,18 +259,30 @@ def ordered( # noqa: C901 return x_kk +@overload +def clean_order(r: lmt.AnyOrder, /, name: str = 
"r", rmin: int = 0) -> int: ... +@overload +def clean_order( + r: lmt.AnyOrderND, + /, + name: str = "r", + rmin: int = 0, +) -> npt.NDArray[np.intp]: ... def clean_order( - r: lmt.AnyOrder, + r: lmt.AnyOrder | lmt.AnyOrderND, /, - name: LiteralString = "r", + name: str = "r", rmin: int = 0, -) -> int: +) -> int | npt.NDArray[np.intp]: """Validates and cleans an single (L-)moment order.""" - if (_r := int(r)) < rmin: - msg = f"expected {name} >= {rmin}, got {_r}" + if not isinstance(r, int | np.integer): + return clean_orders(r, name=name, rmin=rmin) + + if r < rmin: + msg = f"expected {name} >= {rmin}, got {r}" raise TypeError(msg) - return _r + return int(r) def clean_orders( @@ -280,8 +290,8 @@ def clean_orders( /, name: str = "r", rmin: int = 0, - dtype: _DType[_SCT_ui] = np.int_, -) -> onpt.Array[Any, _SCT_ui]: + dtype: _DType[_SCT_ui] = np.intp, +) -> npt.NDArray[_SCT_ui]: """Validates and cleans an array-like of (L-)moment orders.""" _r = np.asarray_chkfinite(r, dtype=dtype) diff --git a/lmo/contrib/scipy_stats.py b/lmo/contrib/scipy_stats.py index 2bdaedcb..a99c6e81 100644 --- a/lmo/contrib/scipy_stats.py +++ b/lmo/contrib/scipy_stats.py @@ -38,10 +38,11 @@ moments_to_ratio, round0, ) -from lmo.distributions._lm import get_lm_func, has_lm_func +from lmo.distributions._lm import get_lm_func, has_lm_func, prefers_ppf from lmo.theoretical import ( l_moment_cov_from_cdf, l_moment_from_cdf, + l_moment_from_ppf, l_moment_influence_from_cdf, l_ratio_influence_from_cdf, l_stats_cov_from_cdf, @@ -150,61 +151,6 @@ class l_rv_generic(PatchClass): ppf: _Fn1 std: Callable[..., float] - def _get_xxf( - self, - *args: Any, - loc: float = 0, - scale: float = 1, - ) -> _Tuple2[_Fn1]: - assert scale > 0 - - _cdf, _ppf = self._cdf, self._ppf - - def cdf(x: _T_x, /) -> _T_x: - return _cdf(np.array([(x - loc) / scale], dtype=float), *args)[0] - - def ppf(q: _T_x, /) -> _T_x: - return _ppf(np.array([q], dtype=float), *args)[0] * scale + loc - - return cdf, ppf - - def _l_moment( - self, - r: npt.NDArray[np.intp], - *args: Any, - trim: _Tuple2[int] | _Tuple2[float] = (0, 0), - quad_opts: lspt.QuadOptions | None = None, - ) -> _ArrF8: - """ - Population L-moments of the standard distribution (i.e. assuming - `loc=0` and `scale=1`). - - Todo: - - Sparse caching; key as `(self, args, r, trim)`, using a - priority queue. Prefer small `r` and `sum(trim)`, skip fractional - trim. - """ - name = self.name - if quad_opts is None and has_lm_func(name): - with contextlib.suppress(NotImplementedError): - return get_lm_func(name)(r, trim[0], trim[1], *args) - - cdf, ppf = self._get_xxf(*args) - - # TODO: use ppf when appropriate (e.g. 
genextreme, tukeylambda, kappa4) - with np.errstate(over="ignore", under="ignore"): - lmbda_r = l_moment_from_cdf( - cdf, - r, - trim=trim, - support=self._get_support(*args), - ppf=ppf, - quad_opts=quad_opts, - ) - - # re-wrap scalars in 0-d arrays (lmo.theoretical unpacks them) - return np.asarray(lmbda_r) - @np.errstate(divide="ignore") def _logqdf(self, u: _ArrF8, *args: Any) -> _ArrF8: """Overridable log quantile distribution function (QDF).""" @@ -337,7 +283,7 @@ def l_moment( return np.full(_r.shape, np.nan)[()] # L-moments of the standard distribution (loc=0, scale=scale0) - l0_r = self._l_moment(_r, *shapes, trim=_trim, quad_opts=quad_opts) + l0_r = _l_moment(self, _r, *shapes, trim=_trim, quad_opts=quad_opts) # shift (by loc) and scale shift_r = loc * (_r == 1) @@ -688,7 +634,7 @@ def l_moments_cov( self._parse_args(*args, **kwds), ) support = self._get_support(*args) - cdf, _ = self._get_xxf(*args) + cdf, _ = _get_xxf(self, *args) cov = l_moment_cov_from_cdf( cdf, @@ -799,7 +745,7 @@ def l_stats_cov( """ args, _, scale = self._parse_args(*args, **kwds) support = self._get_support(*args) - cdf, ppf = self._get_xxf(*args) + cdf, ppf = _get_xxf(self, *args) cov = l_stats_cov_from_cdf( cdf, @@ -889,7 +835,7 @@ def l_moment_influence( lm = self.l_moment(r, *args, trim=trim, quad_opts=quad_opts, **kwds) args, loc, scale = self._parse_args(*args, **kwds) - cdf = self._get_xxf(*args, loc=loc, scale=scale)[0] + cdf, _ = _get_xxf(self, *args, loc=loc, scale=scale) return l_moment_influence_from_cdf( cdf, @@ -983,7 +929,7 @@ def l_ratio_influence( ) args, loc, scale = self._parse_args(*args, **kwds) - cdf = self._get_xxf(*args, loc=loc, scale=scale)[0] + cdf = _get_xxf(self, *args, loc=loc, scale=scale)[0] return l_ratio_influence_from_cdf( cdf, @@ -1219,7 +1165,7 @@ def l_fit( r = np.arange(1, len(args0) + n_extra + 1) _lmo_cache: dict[tuple[float, ...], _ArrF8] = {} - _lmo_fn = self._l_moment + _lmo_fn = _l_moment # temporary cache to speed up L-moment calculations with the same # shape args @@ -1234,7 +1180,7 @@ def lmo_fn( if shapes in _lmo_cache: lmbda_r = _lmo_cache[shapes] else: - lmbda_r = _lmo_fn(_r, *shapes, **kwds) + lmbda_r = _lmo_fn(self, _r, *shapes, **kwds) lmbda_r.setflags(write=False) # prevent cache corruption _lmo_cache[shapes] = lmbda_r @@ -1480,6 +1426,66 @@ def l_ratio_influence( # noqa: D102 ) +def _l_moment( + self: l_rv_generic | rv_continuous, + r: npt.NDArray[np.intp], + /, + *args: Any, + trim: _Tuple2[int] | _Tuple2[float] = (0, 0), + quad_opts: lspt.QuadOptions | None = None, +) -> _ArrF8: + """ + Population L-moments of the standard distribution (i.e. `loc, scale = 0, 1`). + + Todo: + Sparse caching: Use a priority queue with key `(self, args, r, trim)`. + Prefer small `r` and small `sum(trim)`, skip fractional trim. 
+ """ + name = self.name + if quad_opts is None and has_lm_func(name): + with contextlib.suppress(NotImplementedError): + return get_lm_func(name)(r, *trim, *args) + + cdf, ppf = _get_xxf(self, *args) + + if prefers_ppf(name): + lmbda_r = l_moment_from_ppf(ppf, r, trim=trim, quad_opts=quad_opts) + else: + a, b = self._get_support(*args) + with np.errstate(over="ignore", under="ignore"): + lmbda_r = l_moment_from_cdf( + cdf, + r, + trim=trim, + support=(float(a), float(b)), + ppf=ppf, + quad_opts=quad_opts, + ) + + # re-wrap scalars in 0-d arrays (lmo.theoretical unpacks them) + return np.asarray(lmbda_r) + + +def _get_xxf( + self: l_rv_generic | rv_continuous, + /, + *shape: float, + loc: float = 0, + scale: float = 1, +) -> _Tuple2[_Fn1]: + assert scale > 0 + + _cdf, _ppf = self._cdf, self._ppf + + def cdf(x: _T_x, /) -> _T_x: + return _cdf(np.array([(x - loc) / scale], dtype=np.float64), *shape)[0] + + def ppf(q: _T_x, /) -> _T_x: + return _ppf(np.array([q], dtype=np.float64), *shape)[0] * scale + loc + + return cdf, ppf + + def install() -> None: """ Add the public methods from diff --git a/lmo/distributions/__init__.py b/lmo/distributions/__init__.py index b9f8fe84..0294fc2f 100644 --- a/lmo/distributions/__init__.py +++ b/lmo/distributions/__init__.py @@ -77,7 +77,4 @@ [Distributions - GLD](../distributions.md#gld). """ else: - genlambda: Final = cast( - lspt.RVContinuous, - genlambda_gen(name="genlambda"), - ) + genlambda: Final = cast(lspt.RVContinuous, genlambda_gen(name="genlambda")) diff --git a/lmo/distributions/_genlambda.py b/lmo/distributions/_genlambda.py index 43b373b3..c7e2a59e 100644 --- a/lmo/distributions/_genlambda.py +++ b/lmo/distributions/_genlambda.py @@ -7,19 +7,17 @@ import functools import math import sys -from typing import TYPE_CHECKING, Final, TypeAlias, TypeVar, cast +from typing import TYPE_CHECKING, Final, TypeAlias, TypeVar import numpy as np import numpy.typing as npt import scipy.special as sps -from scipy.stats._distn_infrastructure import ( - _ShapeInfo, # noqa: PLC2701 # pyright: ignore[reportPrivateUsage] -) from scipy.stats.distributions import rv_continuous from lmo.special import harmonic from lmo.theoretical import entropy_from_qdf, l_moment_from_ppf from ._lm import get_lm_func +from ._utils import ShapeInfo if sys.version_info >= (3, 13): from typing import override @@ -64,7 +62,7 @@ def _genlambda_ppf0(q: float, b: float, d: float, f: float) -> float: @np.errstate(divide="ignore") def _genlambda_qdf(q: _XT, b: float, d: float, f: float) -> _XT: - return cast(_XT, (1 + f) * q ** (b - 1) + (1 - f) * (1 - q) ** (d - 1)) + return (1 + f) * q ** (b - 1) + (1 - f) * (1 - q) ** (d - 1) # pyright: ignore[reportReturnType] def _genlambda_cdf0( # noqa: C901 @@ -73,7 +71,7 @@ def _genlambda_cdf0( # noqa: C901 d: float, f: float, *, - ptol: float = 1e-4, + ptol: float = 1e-04, xtol: float = 1e-14, maxiter: int = 60, ) -> float: @@ -143,20 +141,14 @@ def _argcheck(self, /, b: _F8, d: _F8, f: _F8) -> np.bool_: # pyright: ignore[r return np.isfinite(b) & np.isfinite(d) & (f >= -1) & (f <= 1) @override - def _shape_info(self, /) -> list[_ShapeInfo]: - ibeta = _ShapeInfo("b", False, (-np.inf, np.inf), (False, False)) - idelta = _ShapeInfo("d", False, (-np.inf, np.inf), (False, False)) - iphi = _ShapeInfo("f", False, (-1, 1), (True, True)) + def _shape_info(self, /) -> list[ShapeInfo]: + ibeta = ShapeInfo("b", False, (-np.inf, np.inf), (False, False)) + idelta = ShapeInfo("d", False, (-np.inf, np.inf), (False, False)) + iphi = ShapeInfo("f", False, (-1, 1), 
(True, True)) return [ibeta, idelta, iphi] @override - def _get_support( - self, - /, - b: float, - d: float, - f: float, - ) -> tuple[float, float]: + def _get_support(self, /, b: float, d: float, f: float) -> tuple[float, float]: return _genlambda_support(b, d, f) @override @@ -167,14 +159,11 @@ def _fitstart( args: tuple[float, float, float] | None = None, ) -> tuple[float, float, float, float, float]: # Arbitrary, but the default f=1 is a bad start - return cast( - tuple[float, float, float, float, float], - super()._fitstart(data, args or (1.0, 1.0, 0.0)), - ) + return super()._fitstart(data, args or (1.0, 1.0, 0.0)) # pyright: ignore[reportReturnType] @override def _pdf(self, /, x: _XT, b: float, d: float, f: float) -> _XT: # pyright: ignore[reportIncompatibleMethodOverride] - return cast(_XT, 1 / self._qdf(self._cdf(x, b, d, f), b, d, f)) + return 1 / self._qdf(self._cdf(x, b, d, f), b, d, f) # pyright: ignore[reportReturnType] @override def _cdf(self, /, x: _XT, b: float, d: float, f: float) -> _XT: # pyright: ignore[reportIncompatibleMethodOverride] @@ -201,7 +190,7 @@ def _stats( a, c = 1 + f, 1 - f b1, d1 = 1 + b, 1 + d - m1 = 0 if b == d and f == 0 else cast(float, _genlambda_lm(1, 0, 0, b, d, f)) + m1: float = 0 if b == d and f == 0 else _genlambda_lm(1, 0, 0, b, d, f).item() if b <= -1 / 2 or d <= -1 / 2: return m1, math.nan, math.nan, math.nan @@ -256,14 +245,11 @@ def _l_moment( if quad_opts is not None: # only do numerical integration when quad_opts is passed - lmbda_r = cast( - float | _ArrF8, - l_moment_from_ppf( - functools.partial(self._ppf, b=b, d=d, f=f), - r, - trim=trim, - quad_opts=quad_opts, - ), + lmbda_r = l_moment_from_ppf( + functools.partial(self._ppf, b=b, d=d, f=f), + r, + trim=trim, + quad_opts=quad_opts, ) return np.asarray(lmbda_r) diff --git a/lmo/distributions/_kumaraswamy.py b/lmo/distributions/_kumaraswamy.py index 09e5367c..64f5fd3b 100644 --- a/lmo/distributions/_kumaraswamy.py +++ b/lmo/distributions/_kumaraswamy.py @@ -1,38 +1,29 @@ from __future__ import annotations -import functools import math import sys -from typing import TYPE_CHECKING, Any, TypeAlias, TypeVar, cast, final +from typing import TypeAlias, TypeVar, final import numpy as np import numpy.typing as npt import optype.numpy as onpt import scipy.special as sc -from scipy.stats._distn_infrastructure import ( - _ShapeInfo, # noqa: PLC2701 # pyright: ignore[reportPrivateUsage] -) -from scipy.stats.distributions import rv_continuous as _rv_continuous +from scipy.stats.distributions import rv_continuous from lmo.special import harmonic -from lmo.theoretical import l_moment_from_ppf from ._lm import get_lm_func +from ._utils import ShapeInfo if sys.version_info >= (3, 13): from typing import override else: from typing_extensions import override -if TYPE_CHECKING: - import lmo.typing.scipy as lspt - - __all__ = ("kumaraswamy_gen",) -_F8: TypeAlias = float | np.float64 _ArrF8: TypeAlias = onpt.Array[tuple[int, ...], np.float64] -_XT = TypeVar("_XT", _F8, _ArrF8) +_XT = TypeVar("_XT", float | np.float64, _ArrF8) _lm_kumaraswamy = get_lm_func("kumaraswamy") @@ -42,87 +33,53 @@ @final -class kumaraswamy_gen(_rv_continuous): +class kumaraswamy_gen(rv_continuous): @override - def _argcheck(self, /, a: _F8, b: _F8) -> bool | np.bool_: + def _argcheck(self, /, a: float, b: float) -> bool | np.bool_: return (a > 0) & (b > 0) @override - def _shape_info(self, /) -> list[_ShapeInfo]: - ia = _ShapeInfo("a", False, (0, np.inf), (False, False)) - ib = _ShapeInfo("b", False, (0, np.inf), (False, False)) 
+ def _shape_info(self, /) -> list[ShapeInfo]: + ia = ShapeInfo("a", False, (0, np.inf), (False, False)) + ib = ShapeInfo("b", False, (0, np.inf), (False, False)) return [ia, ib] @override - def _get_support(self, /, a: _F8, b: _F8) -> tuple[_F8, _F8]: + def _get_support(self, /, a: float, b: float) -> tuple[float, float]: return 0.0, 1.0 @override def _pdf(self, /, x: _XT, a: float, b: float) -> _XT: - return cast(_XT, a * b * x ** (a - 1) * (1 - x**a) ** (b - 1)) + return a * b * x ** (a - 1) * (1 - x**a) ** (b - 1) # pyright: ignore[reportReturnType] @override def _logpdf(self, /, x: _XT, a: float, b: float) -> _XT: - return cast( - _XT, - np.log(a * b) + (a - 1) * np.log(x) + (b - 1) * np.log(1 - x**a), - ) + return np.log(a * b) + (a - 1) * np.log(x) + (b - 1) * np.log(1 - x**a) @override def _cdf(self, /, x: _XT, a: float, b: float) -> _XT: - return cast(_XT, 1 - (1 - x**a) ** b) + return 1 - (1 - x**a) ** b # pyright: ignore[reportReturnType] @override def _sf(self, /, x: _XT, a: float, b: float) -> _XT: - return cast(_XT, (1 - x**a) ** (b - 1)) + return (1 - x**a) ** (b - 1) # pyright: ignore[reportReturnType] @override def _isf(self, /, q: _XT, a: float, b: float) -> _XT: - return cast(_XT, (1 - q ** (1 / b)) ** (1 / a)) + return (1 - q ** (1 / b)) ** (1 / a) # pyright: ignore[reportReturnType] def _qdf(self, /, q: _XT, a: float, b: float) -> _XT: p = 1 - q - return cast( - _XT, - p ** (1 / (b - 1)) * (1 - p ** (1 / b)) ** (1 / (a - 1)) / (a * b), - ) + return p ** (1 / (b - 1)) * (1 - p ** (1 / b)) ** (1 / (a - 1)) / (a * b) # pyright: ignore[reportReturnType] @override def _ppf(self, /, q: _XT, a: float, b: float) -> _XT: - return cast(_XT, (1 - (1 - q) ** (1 / b)) ** (1 / a)) + return (1 - (1 - q) ** (1 / b)) ** (1 / a) # pyright: ignore[reportReturnType] def _entropy(self, a: float, b: float) -> float: # https://wikipedia.org/wiki/Kumaraswamy_distribution return (1 - 1 / b) + (1 - 1 / a) * harmonic(b) - math.log(a * b) @override - def _munp( - self, - /, - n: int | np.integer[Any] | npt.NDArray[np.integer[Any]], - a: float, - b: float, - ) -> _ArrF8: + def _munp(self, /, n: int | npt.NDArray[np.intp], a: float, b: float) -> _ArrF8: return b * sc.beta(1 + n / a, b) - - def _l_moment( - self, - r: npt.NDArray[np.int_], - a: float, - b: float, - *, - trim: tuple[int, int] | tuple[float, float], - quad_opts: lspt.QuadOptions | None = None, - ) -> _ArrF8: - s, t = trim - if quad_opts is not None or isinstance(trim[0], float): - out = l_moment_from_ppf( - functools.partial(self._ppf, a=a, b=b), - r, - trim=trim, - quad_opts=quad_opts, - ) - else: - out = _lm_kumaraswamy(r, s, t, a, b) - - return np.atleast_1d(out) diff --git a/lmo/distributions/_lm.py b/lmo/distributions/_lm.py index f82bf96e..073086ff 100644 --- a/lmo/distributions/_lm.py +++ b/lmo/distributions/_lm.py @@ -69,6 +69,31 @@ "genlambda", ] _LM_REGISTRY: Final[dict[DistributionName, _LmVFunc[...]]] = {} +_PPF_REGISTRY: Final[set[str]] = { + "arcsine", + "cauchy", + "halfcauchy", + "exponweib", + "exponpow", + "genlogistic", + "genhalflogistic", + "gompertz", + "gumbel_l", + "hypsecant", + "invweibull", + "kappa3", + "kappa4", + "laplace", + "laplace_asymmetric", + "pareto", + "truncpareto", + "lomax", + "powerlaw", + "rayleigh", + "tukeylambda", + "weibull_min", + "weibull_max", +} _ArrF8: TypeAlias = onpt.Array[tuple[int, ...], np.float64] @@ -125,10 +150,15 @@ def __call__( def register_lm_func( name: DistributionName, /, + *, + prefer_ppf: bool = True, ) -> Callable[[_LmFunc[_Tss]], _LmVFunc[_Tss]]: # TODO: vectorize 
decorator (only for `r`), with correct signature assert name not in _LM_REGISTRY + if prefer_ppf: + _PPF_REGISTRY.add(name) + def _wrapper(func: _LmFunc[_Tss], /) -> _LmVFunc[_Tss]: vfunc = np.vectorize(func, [float], excluded={1, 2}) _LM_REGISTRY[name] = vfunc @@ -149,7 +179,15 @@ def get_lm_func(name: DistributionName, /) -> _LmVFunc[...]: return _LM_REGISTRY[name] -@register_lm_func("uniform") +def prefers_ppf(name: str, /) -> bool: + """ + Whether the distribution with the given name prefers `ppf` over `cdf` for + numerical calculation of the L-moments. + """ + return name in _PPF_REGISTRY + + +@register_lm_func("uniform", prefer_ppf=False) def lm_uniform(r: int, s: float, t: float, /) -> float: """ Exact generalized* trimmed L-moments of the standard uniform distribution diff --git a/lmo/distributions/_nonparametric.py b/lmo/distributions/_nonparametric.py index 2a5801a0..050f3e5f 100644 --- a/lmo/distributions/_nonparametric.py +++ b/lmo/distributions/_nonparametric.py @@ -8,7 +8,6 @@ ClassVar, Final, Literal, - LiteralString, Protocol, Self, TypeAlias, @@ -33,11 +32,15 @@ if TYPE_CHECKING: from collections.abc import Callable - import optype.numpy as onpt + import optype as op +_F8: TypeAlias = np.float64 +_ArrF8: TypeAlias = npt.NDArray[_F8] +_VecF8: TypeAlias = np.ndarray[tuple[int], np.dtype[_F8]] + _T = TypeVar("_T") -_T_x = TypeVar("_T_x", float, npt.NDArray[np.float64]) +_T_x = TypeVar("_T_x", float, _ArrF8) class _Fn1(Protocol): @@ -68,10 +71,8 @@ def __call__(self, x: _T_x, /) -> _T_x: ... _Tuple4m: TypeAlias = tuple[()] | _Tuple1[_T] | _Tuple2[_T] | _Tuple3[_T] | _Tuple4[_T] -def _get_rng(seed: lnpt.Seed | None = None) -> np.random.Generator: - if isinstance(seed, np.random.Generator): - return seed - return np.random.default_rng(seed) +def _get_rng(s: lnpt.Seed | None = None, /) -> np.random.Generator: + return s if isinstance(s, np.random.Generator) else np.random.default_rng(s) class l_poly: # noqa: N801 @@ -79,15 +80,15 @@ class l_poly: # noqa: N801 Polynomial quantile distribution with (only) the given L-moments. """ - name: ClassVar[LiteralString] = "l_poly" + name: ClassVar[Literal["l_poly"]] = "l_poly" badvalue: ClassVar[float] = np.nan moment_type: ClassVar[_MomentType] = 1 - numargs: ClassVar[int] = 2 - shapes: ClassVar[LiteralString | None] = "lmbda, trim" + numargs: ClassVar[Literal[2]] = 2 + shapes: ClassVar[Literal["lmbda, trim"]] = "lmbda, trim" - _l_moments: Final[onpt.Array[tuple[int], np.float64]] + _l_moments: Final[_VecF8] _trim: Final[_Trim] - _support: Final[tuple[np.float64, np.float64]] + _support: Final[_Tuple2[_F8]] _cdf: Final[_Fn1] _ppf: Final[_Fn1] @@ -138,12 +139,12 @@ def __init__( super().__init__() @property - def a(self, /) -> np.float64: + def a(self, /) -> _F8: """Lower bound of the support.""" return self._support[0] @property - def b(self, /) -> np.float64: + def b(self, /) -> _F8: """Upper bound of the support.""" return self._support[1] @@ -153,11 +154,7 @@ def random_state(self, /) -> np.random.Generator: return self._random_state @random_state.setter - def random_state( - self, - seed: lnpt.Seed, # pyright: ignore[reportPropertyTypeMismatch] - /, - ) -> None: + def random_state(self, seed: lnpt.Seed, /) -> None: # pyright: ignore[reportPropertyTypeMismatch] self._random_state = _get_rng(seed) @classmethod @@ -221,20 +218,20 @@ def rvs( /, size: Literal[1] | None = None, random_state: lnpt.Seed | None = None, - ) -> np.float64: ... + ) -> _F8: ... 
@overload def rvs( self, /, size: int | tuple[int, ...], random_state: lnpt.Seed | None = None, - ) -> npt.NDArray[np.float64]: ... + ) -> _ArrF8: ... def rvs( self, /, size: int | tuple[int, ...] | None = None, random_state: lnpt.Seed | None = None, - ) -> np.float64 | npt.NDArray[np.float64]: + ) -> _F8 | _ArrF8: """ Draw random variates from the relevant distribution. @@ -493,7 +490,7 @@ def entropy(self, /) -> float: """ return self._entropy - def support(self, /) -> tuple[np.float64, np.float64]: + def support(self, /) -> tuple[_F8, _F8]: r""" The support \( (Q(0), Q(1)) \) of the distribution, where \( Q(p) \) is the [PPF][lmo.distributions.l_poly.ppf]. @@ -501,21 +498,14 @@ def support(self, /) -> tuple[np.float64, np.float64]: return self._support @overload - def interval(self, confidence: _AnyReal0D, /) -> tuple[np.float64, np.float64]: ... + def interval(self, confidence: _AnyReal0D, /) -> _Tuple2[_F8]: ... @overload - def interval( - self, - confidence: _AnyRealND, - /, - ) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]: ... + def interval(self, confidence: _AnyRealND, /) -> _Tuple2[_ArrF8]: ... def interval( self, confidence: _AnyReal0D | _AnyRealND, /, - ) -> ( - tuple[np.float64, np.float64] - | tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]] - ): + ) -> _Tuple2[_F8] | _Tuple2[_ArrF8]: r""" [Confidence interval](https://w.wiki/3kdb) with equal areas around the [median][lmo.distributions.l_poly.median]. @@ -545,7 +535,7 @@ def interval( return self._ppf((1 - alpha) / 2), self._ppf((1 + alpha) / 2) - def moment(self, n: int | np.integer[Any], /) -> float: + def moment(self, n: op.CanIndex, /) -> float: r""" Non-central product moment \( \E[X^n] \) of \( X \) of specified order \( n \). @@ -566,6 +556,7 @@ def moment(self, n: int | np.integer[Any], /) -> float: - For n>=2, attempt tot infer from `_l_moments` if the 2nd moment condition holds, using `diagnostics.l_moment_bounds`. """ + n = int(n) if n < 0: msg = f"expected n >= 0, got {n}" raise ValueError(msg) @@ -651,7 +642,7 @@ def expect(self, g: Callable[[float], float], /) -> float: ppf = self._ppf def i(u: float, /) -> float: - # the cast is safe, since `np.float64 <: float` (at runtime) + # the cast is safe, since `_F8 <: float` (at runtime) return g(ppf(u)) from scipy.integrate import quad @@ -660,25 +651,20 @@ def i(u: float, /) -> float: return quad(i, a, b)[0] + quad(i, b, c)[0] + quad(i, c, d)[0] @overload - def l_moment( - self, - r: lmt.AnyOrder, - /, - trim: lmt.AnyTrim | None = None, - ) -> np.float64: ... + def l_moment(self, r: lmt.AnyOrder, /, trim: lmt.AnyTrim | None = None) -> _F8: ... @overload def l_moment( self, r: lmt.AnyOrderND, /, trim: lmt.AnyTrim | None = None, - ) -> npt.NDArray[np.float64]: ... + ) -> _ArrF8: ... def l_moment( self, r: lmt.AnyOrder | lmt.AnyOrderND, /, trim: lmt.AnyTrim | None = None, - ) -> np.float64 | npt.NDArray[np.float64]: + ) -> _F8 | _ArrF8: r""" Evaluate the population L-moment(s) $\lambda^{(s,t)}_r$. @@ -700,7 +686,7 @@ def l_ratio( k: lmt.AnyOrder, /, trim: lmt.AnyTrim | None = None, - ) -> np.float64: ... + ) -> _F8: ... @overload def l_ratio( self, @@ -708,7 +694,7 @@ def l_ratio( k: lmt.AnyOrder | lmt.AnyOrderND, /, trim: lmt.AnyTrim | None = None, - ) -> npt.NDArray[np.float64]: ... + ) -> _ArrF8: ... @overload def l_ratio( self, @@ -716,14 +702,14 @@ def l_ratio( k: lmt.AnyOrderND, /, trim: lmt.AnyTrim | None = None, - ) -> npt.NDArray[np.float64]: ... + ) -> _ArrF8: ... 
def l_ratio( self, r: lmt.AnyOrder | lmt.AnyOrderND, k: lmt.AnyOrder | lmt.AnyOrderND, /, trim: lmt.AnyTrim | None = None, - ) -> np.float64 | npt.NDArray[np.float64]: + ) -> _F8 | _ArrF8: r""" Evaluate the population L-moment ratio('s) $\tau^{(s,t)}_{r,k}$. @@ -741,12 +727,7 @@ def l_ratio( lms = self.l_moment(rs, trim=trim) return moments_to_ratio(rs, lms) - def l_stats( - self, - /, - trim: lmt.AnyTrim | None = None, - moments: int = 4, - ) -> npt.NDArray[np.float64]: + def l_stats(self, /, trim: lmt.AnyTrim | None = None, moments: int = 4) -> _ArrF8: r""" Evaluate the L-moments (for $r \le 2$) and L-ratio's (for $r > 2$). @@ -811,7 +792,7 @@ def dist(self, /) -> type[Self]: return type(self) @property - def args(self, /) -> tuple[onpt.Array[tuple[int], np.float64], _Trim]: + def args(self, /) -> tuple[_VecF8, _Trim]: return self._l_moments, self._trim @property @@ -829,12 +810,7 @@ def freeze( return cls(lmbda, trim, **kwds) @classmethod - def nnlf( - cls, - /, - theta: _LPolyParams, - x: npt.NDArray[np.float64], - ) -> float | npt.NDArray[np.float64]: + def nnlf(cls, /, theta: _LPolyParams, x: _ArrF8) -> float | _ArrF8: """ Negative loglikelihood function. diff --git a/lmo/distributions/_utils.py b/lmo/distributions/_utils.py new file mode 100644 index 00000000..f35a0c08 --- /dev/null +++ b/lmo/distributions/_utils.py @@ -0,0 +1,5 @@ +from scipy.stats._distn_infrastructure import ( + _ShapeInfo as ShapeInfo, # noqa: PLC2701 # pyright: ignore[reportPrivateUsage] +) + +__all__ = ("ShapeInfo",) diff --git a/lmo/distributions/_wakeby.py b/lmo/distributions/_wakeby.py index 59f41540..cc2027f5 100644 --- a/lmo/distributions/_wakeby.py +++ b/lmo/distributions/_wakeby.py @@ -1,31 +1,23 @@ from __future__ import annotations -import functools import math import sys import warnings -from typing import TYPE_CHECKING, Final, TypeAlias, TypeVar, cast +from typing import Final, TypeAlias, TypeVar import numpy as np -import numpy.typing as npt import optype.numpy as onpt -import scipy.special as sc -from scipy.stats._distn_infrastructure import ( - _ShapeInfo, # noqa: PLC2701 # pyright: ignore[reportPrivateUsage] -) +import scipy.special as sps from scipy.stats.distributions import rv_continuous -from lmo.theoretical import l_moment_from_ppf from ._lm import get_lm_func +from ._utils import ShapeInfo if sys.version_info >= (3, 13): from typing import override else: from typing_extensions import override -if TYPE_CHECKING: - import lmo.typing.scipy as lspt - __all__ = ("wakeby_gen",) @@ -52,7 +44,7 @@ def _wakeby_ub(b: _F8, d: _F8, f: _F8) -> _F8: return _INF -def _wakeby_isf0(q: _F8, b: _F8, d: _F8, f: _F8) -> _F8: +def _wakeby_isf0(q: _F8, /, b: _F8, d: _F8, f: _F8) -> _F8: """Inverse survival function, does not validate params.""" if q <= 0: return _wakeby_ub(b, d, f) @@ -79,15 +71,15 @@ def _wakeby_isf0(q: _F8, b: _F8, d: _F8, f: _F8) -> _F8: _wakeby_isf = np.vectorize(_wakeby_isf0, [float]) -def _wakeby_qdf(p: _XT, b: _F8, d: _F8, f: _F8) -> _XT: +def _wakeby_qdf(p: _XT, /, b: _F8, d: _F8, f: _F8) -> _XT: """Quantile density function (QDF), the derivative of the PPF.""" q = 1 - p lhs = f * q ** (b - 1) rhs = (1 - f) / q ** (d + 1) - return cast(_XT, lhs + rhs) + return lhs + rhs # pyright: ignore[reportReturnType] -def _wakeby_sf0(x: _F8, b: _F8, d: _F8, f: _F8) -> _F8: # noqa: C901 +def _wakeby_sf0(x: _F8, /, b: _F8, d: _F8, f: _F8) -> _F8: # noqa: C901 """ Numerical approximation of Wakeby's survival function. 
@@ -122,7 +114,7 @@ def _wakeby_sf0(x: _F8, b: _F8, d: _F8, f: _F8) -> _F8: # noqa: C901 # https://wikipedia.org/wiki/Lambert_W_function # it's easy to show that this is valid for all x, f, and d w = (1 - f) / f - return float((w / sc.lambertw(w * math.exp((1 + d * x) / f - 1))) ** (1 / d)) + return float((w / sps.lambertw(w * math.exp((1 + d * x) / f - 1))) ** (1 / d)) z: _F8 if x < _wakeby_isf0(0.9, b, d, f): @@ -197,10 +189,10 @@ def _argcheck(self, /, b: _F8, d: _F8, f: _F8) -> bool | np.bool_: # pyright: i ) @override - def _shape_info(self) -> list[_ShapeInfo]: - ibeta = _ShapeInfo("b", False, (-np.inf, np.inf), (False, False)) - idelta = _ShapeInfo("d", False, (-np.inf, np.inf), (False, False)) - iphi = _ShapeInfo("f", False, (0, 1), (True, True)) + def _shape_info(self) -> list[ShapeInfo]: + ibeta = ShapeInfo("b", False, (-np.inf, np.inf), (False, False)) + idelta = ShapeInfo("d", False, (-np.inf, np.inf), (False, False)) + iphi = ShapeInfo("f", False, (0, 1), (True, True)) return [ibeta, idelta, iphi] @override @@ -218,10 +210,7 @@ def _fitstart( args: tuple[float, float, float] | None = None, ) -> tuple[float, float, float, float, float]: # Arbitrary, but the default f=1 is a bad start - return cast( - tuple[float, float, float, float, float], - super()._fitstart(data, args or (1.0, 1.0, 0.5)), - ) + return super()._fitstart(data, args or (1.0, 1.0, 0.5)) # pyright: ignore[reportReturnType] @override def _pdf(self, /, x: _XT, b: float, d: float, f: float) -> _XT: # pyright: ignore[reportIncompatibleMethodOverride] @@ -246,6 +235,7 @@ def _isf(self, /, q: _XT, b: float, d: float, f: float) -> _XT: # pyright: igno @override def _stats( self, + /, b: float, d: float, f: float, @@ -275,7 +265,7 @@ def _stats( return m1, m2, m3, m4 - def _entropy(self, b: float, d: float, f: float) -> float: + def _entropy(self, /, b: float, d: float, f: float) -> float: """ Entropy can be calculated from the QDF (PPF derivative) as the Integrate[Log[QDF[u]], {u, 0, 1}]. 
This is the (semi) closed-form @@ -294,30 +284,4 @@ def _entropy(self, b: float, d: float, f: float) -> float: assert bd > 0 ibd = 1 / bd - return 1 - b + bd * float(sc.hyp2f1(1, ibd, 1 + ibd, f / (f - 1))) - - def _l_moment( - self, - r: npt.NDArray[np.int64], - b: float, - d: float, - f: float, - *, - trim: tuple[int, int] | tuple[float, float], - quad_opts: lspt.QuadOptions | None = None, - ) -> _ArrF8: - s, t = trim - - if quad_opts is not None: - # only do numerical integration when quad_opts is passed - lmbda_r = l_moment_from_ppf( - functools.partial(self._ppf, b=b, d=d, f=f), - r, - trim=trim, - quad_opts=quad_opts, - ) - out = lmbda_r - else: - out = _wakeby_lm(r, s, t, b, d, f) - - return np.atleast_1d(out) + return 1 - b + bd * float(sps.hyp2f1(1, ibd, 1 + ibd, f / (f - 1))) diff --git a/lmo/typing/__init__.py b/lmo/typing/__init__.py index 3b891cd0..d73693b3 100644 --- a/lmo/typing/__init__.py +++ b/lmo/typing/__init__.py @@ -3,11 +3,13 @@ from __future__ import annotations import sys -from typing import TYPE_CHECKING, Any, TypeAlias +from typing import TYPE_CHECKING, Any, Protocol, TypeAlias +# pyright: reportPrivateUsage=false import numpy as np +import numpy.typing as npt import optype.numpy as onpt -from optype import CanSequence +from numpy._typing import _NestedSequence # noqa: PLC2701 if sys.version_info >= (3, 13): from typing import TypedDict @@ -24,6 +26,7 @@ "AnyOrder", "AnyOrderND", "AnyTrim", + "AnyTrimFloat", "AnyTrimInt", "LComomentOptions", "LMomentOptions", @@ -38,11 +41,14 @@ def __dir__() -> list[str]: AnyTrimFloat: TypeAlias = float | tuple[float, float] AnyTrim: TypeAlias = AnyTrimInt | AnyTrimFloat + +class _CanIntegerArray(Protocol): + def __len__(self, /) -> int: ... # this excludes scalar types + def __array__(self, /) -> npt.NDArray[np.integer[Any]]: ... + + AnyOrder: TypeAlias = int | np.integer[Any] -AnyOrderND: TypeAlias = ( - CanSequence[int, int, int] - | onpt.CanArray[tuple[int, ...], np.dtype[np.integer[Any]]] -) +AnyOrderND: TypeAlias = _CanIntegerArray | _NestedSequence[int | np.integer[Any]] AnyFWeights: TypeAlias = onpt.Array[tuple[int], np.integer[Any]] AnyAWeights: TypeAlias = onpt.Array[onpt.AtLeast1D, np.floating[Any]]
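
For quick reference, here is a minimal usage sketch of the consolidated helpers this patch introduces: the overloaded `clean_order` in `lmo._utils` (which now dispatches scalar vs. array-like orders itself) and the new `prefers_ppf` registry in `lmo.distributions._lm`. Both modules are private, so this is illustrative only; the expected outputs follow directly from the hunks above, not from a stable public API.

```python
# Sketch against the private modules changed in this diff; paths and behaviour
# may shift in later releases.
import numpy as np

from lmo._utils import clean_order
from lmo.distributions._lm import has_lm_func, prefers_ppf

# `clean_order` now accepts both scalar and array-like orders: scalars come
# back as plain `int`, array-likes are forwarded to `clean_orders` and
# returned as an `np.intp` array.
assert clean_order(3) == 3
assert clean_order(np.uint8(2)) == 2
assert clean_order([1, 2, 4]).dtype == np.intp

# Orders below `rmin` are still rejected with a TypeError, as before.
try:
    clean_order(-1)
except TypeError as exc:
    print(exc)  # expected r >= 0, got -1

# `uniform` is registered with an exact L-moment function and opts out of the
# PPF route (`prefer_ppf=False`); `cauchy` is listed in the new PPF registry,
# so `_l_moment` will integrate it via `l_moment_from_ppf` when no exact
# function or quad_opts override applies.
print(has_lm_func("uniform"), prefers_ppf("uniform"))  # True False
print(prefers_ppf("cauchy"))                           # True
```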