# SPDX-License-Identifier: GPL-3.0-or-later
"""Top-level ``lmrob`` entry points.
End-to-end pipeline: formula -> design matrix -> fast-S init -> MM
iteration -> covariance -> :class:`pylmrob.results.LmRobResults`.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import replace
from typing import TYPE_CHECKING, Any
import numpy as np
from pylmrob import psi as _psi
from pylmrob._fast_s import FastSConfig, fast_s
from pylmrob._mm import mm_iterate
from pylmrob.control import Control
from pylmrob.d_scale import d_scale
from pylmrob.formula import model_matrix
from pylmrob.inference import vcov_avar1, vcov_w
from pylmrob.ms_estimator import m_s_fit
from pylmrob.results import LmRobResults
if TYPE_CHECKING:
import pandas as pd
def _to_chi_psi_family(psi_family: str) -> str:
"""The chi family used in S iteration is the same as the user's psi family."""
return psi_family
# Cache Cython kernel handles at module load. Avoids per-call importlib
# overhead. Each handle is ``None`` if the compiled module is unavailable.
# ``Callable[..., Any]`` is loose on purpose: the Cython signatures are
# more complex than what's worth replicating in Python type hints, and
# the actual checking happens at call time.
_CyFn = Callable[..., Any]
def _load_cy() -> tuple[_CyFn | None, _CyFn | None, _CyFn | None, _CyFn | None]:
try:
import importlib
mod = importlib.import_module("pylmrob._core._lmrob")
return (
getattr(mod, "cy_lmrob_fit", None),
getattr(mod, "cy_lmrob_d_scale", None),
getattr(mod, "cy_lmrob_vcov_avar1", None),
getattr(mod, "cy_lmrob_mm", None),
)
except Exception:
return None, None, None, None
_CY_LMROB_FIT, _CY_LMROB_D_SCALE, _CY_LMROB_VCOV, _CY_LMROB_MM = _load_cy()
# Map family names to the integer enum used by the Cython kernel.
_CY_FAMILY_IDS = {
"bisquare": 0,
"biweight": 0,
"hampel": 1,
"optimal": 2,
"lqq": 3,
"ggw": 4,
}
[docs]
def lmrob(
formula: str,
data: pd.DataFrame,
control: Control | None = None,
weights: np.ndarray | None = None,
na_action: str = "drop",
seed: int | np.random.Generator | None = None,
**kwargs: Any,
) -> LmRobResults:
"""Fit a robust MM linear regression.
Parameters
----------
formula :
R-style formula, e.g. ``"y ~ x1 + x2 + x3"``. Parsed by
:mod:`formulaic`.
data :
DataFrame containing the columns referenced by ``formula``.
control :
Algorithm parameters; defaults to ``Control()`` (KS2014 preset,
``engine_c=True``).
weights :
Optional non-negative per-case weights (length ``len(data)``).
Implemented via the ``sqrt(w)``-transform that R's lmrob uses:
the transformed design ``(sqrt(w)*X, sqrt(w)*y)`` goes through
the unweighted fit. Zero-weight rows are dropped. Compatible
with both the default Cython engine and the legacy NumPy path
(the transform is applied before any path dispatch, so the
Cython kernel never needs to know about weights itself).
na_action :
``"drop"`` (default) drops rows with any NA before fitting.
seed :
Seed for the resampling RNG.
Returns
-------
LmRobResults
"""
if kwargs:
raise TypeError(f"unexpected keyword arguments: {sorted(kwargs)!r}")
if control is None:
control = Control()
if weights is not None:
weights = np.asarray(weights, dtype=np.float64).ravel()
if not np.isfinite(weights).all() or (weights < 0).any():
raise ValueError("weights must be non-negative and finite")
if weights.size != len(data):
raise ValueError(f"weights length {weights.size} != number of rows {len(data)}")
# Drop zero-weight rows. R does the same: zero-weight observations
# contribute nothing to the M-scale or IRWLS.
keep = weights > 0
if not keep.all():
data = data.loc[keep].reset_index(drop=True)
weights = weights[keep]
# Non-trivial weights flow through the engine_c block too:
# ``_lmrob_impl`` applies the sqrt(w)-transform to (X, y) before
# any path dispatch, so the Cython kernel sees a transformed
# design and never needs to know about weights itself. One
# exception: ``cov=".vcov.w"`` with non-trivial weights on small
# n can land the engine_c basin in a corner where every
# transformed residual is past the psi-redescending cutoff,
# making ``mean(psi'(r/s)) = 0`` and dividing by zero downstream.
# Force the NumPy path for that combo.
if not np.allclose(weights, 1.0) and control.cov == ".vcov.w":
control = replace(control, engine_c=False)
try:
return _lmrob_impl(formula, data, control, na_action, seed, weights)
except FloatingPointError as exc:
# ``engine_c=True`` uses an internal Floyd subset-draw which is
# not byte-identical to ``np.random.choice``. On a few small
# classical datasets that puts fast-S in a basin where the
# final ``X' W X`` is singular for ``vcov_avar1``. Retry once
# with engine_c off (numpy choice -- different basin) so the
# default path stays robust on those datasets.
if not control.engine_c or "singular" not in str(exc):
raise
return _lmrob_impl(
formula, data, replace(control, engine_c=False), na_action, seed, weights
)
def _lmrob_impl(
formula: str,
data: pd.DataFrame,
control: Control,
na_action: str,
seed: int | np.random.Generator | None,
case_weights: np.ndarray | None = None,
) -> LmRobResults:
# Control.__post_init__ guarantees these are populated; narrow for the
# type checker.
assert control.psi is not None
assert control.method is not None
assert control.cov is not None
psi_family: str = control.psi
cov_kind: str = control.cov
# ------------------------------------------------------------------
# Design matrix
# ------------------------------------------------------------------
if na_action == "drop":
# ``dropna`` always allocates a new DataFrame even when no rows
# are dropped. Check for NAs first; the check is much cheaper than
# the copy for the typical NA-free path.
if data.isna().any(axis=None):
data = data.dropna()
elif na_action == "raise":
if data.isna().any().any():
raise ValueError("data contains NA values; pass na_action='drop' to skip them")
else:
raise ValueError(f"unknown na_action: {na_action!r}")
design = model_matrix(formula, data)
y, X, term_names = design.y, design.X, design.term_names
n, p = X.shape
if n <= p:
raise ValueError(f"need n > p; got n={n}, p={p}")
# Per-case weights: R's lmrob handles them with a sqrt(w) transform
# at the top level (robustbase/R/lmrob.R:96-98). The transformed
# design (X_t, y_t) goes through the unweighted fit. After fitting,
# ``residuals_`` / ``fitted_`` are reported on the original scale
# (y - X @ coef), while ``rweights_`` / ``scale_`` / ``cov_`` come
# from the transformed fit (matching R's reporting).
X_orig: np.ndarray | None = None
y_orig: np.ndarray | None = None
if case_weights is not None and not np.allclose(case_weights, 1.0):
X_orig = X
y_orig = y
sqrt_w = np.sqrt(case_weights)
X = sqrt_w[:, None] * X
y = sqrt_w * y
# ------------------------------------------------------------------
# Pick init method. ``init="auto"`` chooses M-S when factor columns
# exist and S otherwise. Anything else honors the explicit user request.
# ------------------------------------------------------------------
init_method = control.init
if init_method == "auto":
init_method = "M-S" if design.is_factor_col.any() else "S"
s_seed = seed if seed is not None else control.seed
k_chi_tuple = tuple(np.atleast_1d(np.asarray(control.tuning_chi, dtype=float)).ravel())
psi_k_eff = tuple(np.atleast_1d(np.asarray(control.tuning_psi, dtype=float)).ravel())
# ------------------------------------------------------------------
# Monolithic Cython engine: fast-S + MM in one nogil call. Sets the
# ``_engine_c_done`` flag so the per-step Python blocks below skip
# themselves.
# ------------------------------------------------------------------
_engine_c_done = False
# Predeclare variables that are populated in one of the engine_c
# / S / M-S branches below; pyright can't see that the branches
# are exhaustive for the cases that reach them.
beta_init = np.empty(p, dtype=np.float64)
sigma_init = 0.0
init_info: dict[str, object] = {}
_engine_mm: Any = None
_engine_residuals: np.ndarray | None = None
_engine_rweights: np.ndarray | None = None
_engine_cov_buf: np.ndarray | None = None
# engine_c now parallelises internally via OpenMP prange on the
# resample loop, so the previous ``n*p^2 >= 100k`` auto-fallback to
# the threaded NumPy path is no longer needed. ``control.n_workers``
# is honoured directly by the monolithic kernel.
_engine_c_too_big = False
_eff_n_workers = control.n_workers
if (
control.engine_c
and init_method == "S"
and psi_family in _CY_FAMILY_IDS
and _CY_LMROB_FIT is not None
):
_tuning_chi = np.zeros(3, dtype=np.float64)
_tuning_psi = np.zeros(3, dtype=np.float64)
# Tuples are short (1-3 entries); a manual loop avoids the cost of
# np.asarray + ravel that ``np.atleast_1d`` would incur.
for _i in range(min(3, len(k_chi_tuple))):
_tuning_chi[_i] = k_chi_tuple[_i]
for _i in range(min(3, len(psi_k_eff))):
_tuning_psi[_i] = psi_k_eff[_i]
# One ``np.empty`` for all output buffers instead of 4-5.
# ``np.empty`` is ~10 µs each at small sizes; bundling saves
# ~40 µs/fit.
_want_inline_vcov_alloc = cov_kind == ".vcov.avar1"
_cov_size = p * p if _want_inline_vcov_alloc else 0
_big = np.empty(2 * p + 2 * n + _cov_size, dtype=np.float64)
beta_out = _big[:p]
residuals_out = _big[p : p + n]
rweights_out = _big[p + n : p + 2 * n]
beta_init_out = _big[p + 2 * n : 2 * p + 2 * n]
from pylmrob._fast_s import make_generator as _make_gen
rng_e = _make_gen(s_seed, kind=control.rng)
# X and y are already float64 C-contiguous coming from the design
# builder; skip np.ascontiguousarray if so to avoid a Python call.
X_c = (
X
if (X.dtype == np.float64 and X.flags.c_contiguous)
else np.ascontiguousarray(X, dtype=np.float64)
)
y_c = (
y
if (y.dtype == np.float64 and y.flags.c_contiguous)
else np.ascontiguousarray(y, dtype=np.float64)
)
# When we'll also need vcov_avar1, ask the kernel to compute it
# inline (saves a Python call + a separate np.zeros + a re-pass
# through the LAPACK setup). Only useful for cov=".vcov.avar1"
# which is the default MM path.
# Cov buffer is the tail of ``_big`` (zero-initialized later if
# the kernel actually writes to it). ``_big`` is uninitialized
# which is fine: the Cython kernel writes the full p*p block.
_want_inline_vcov = cov_kind == ".vcov.avar1"
_cov_buf = _big[2 * p + 2 * n :].reshape((p, p)) if _want_inline_vcov else None
# Resolve effective worker count and pre-spawn per-thread bitgens
# for the engine_c parallel resample loop. The resolution mirrors
# ``_fast_s._resolve_n_workers``.
from pylmrob._fast_s import _resolve_n_workers as _resolve_nw
from pylmrob._fast_s import _to_seed_sequence
_ec_workers = _resolve_nw(_eff_n_workers, control.nResample)
if _ec_workers > 1:
_ec_caps = [
_make_gen(_s, kind=control.rng).bit_generator.capsule
for _s in _to_seed_sequence(s_seed).spawn(_ec_workers)
]
else:
_ec_caps = None
scale_e, status_e, n_iter_s, _conv_s, n_iter_mm, conv_mm, _vcov_status = _CY_LMROB_FIT(
X_c,
y_c,
rng_e.bit_generator.capsule,
_CY_FAMILY_IDS[psi_family],
_tuning_chi,
_tuning_psi,
control.bb,
control.nResample,
control.mts,
control.k_fast_s,
control.best_r_s,
control.max_it,
control.refine_tol,
control.max_it,
control.rel_tol,
control.k_max,
control.scale_tol,
beta_out,
residuals_out,
rweights_out,
beta_init_out,
_cov_buf,
_tuning_chi,
control.bb,
bitgen_capsules=_ec_caps,
n_workers=_ec_workers,
)
if status_e == 1:
raise RuntimeError("lmrob: no non-singular subsamples found")
beta_init = beta_init_out
sigma_init = float(scale_e)
init_info = {
"coef": beta_out,
"scale": float(scale_e),
"n_iter": int(n_iter_s),
"method": "S",
}
from types import SimpleNamespace
_engine_mm = SimpleNamespace(coef=beta_out, converged=bool(conv_mm), n_iter=int(n_iter_mm))
_engine_c_done = True
_engine_residuals = residuals_out
_engine_rweights = rweights_out
# Stash the inline-computed vcov (if any) so the vcov block below
# can skip its own call to cy_lmrob_vcov_avar1.
_engine_cov_buf = _cov_buf if (_cov_buf is not None and _vcov_status == 0) else None
if _engine_c_done:
pass # cy_lmrob_fit already populated beta_init / sigma_init / init_info
elif init_method == "S":
# ------------------------------------------------------------------
# Initial S estimate via fast-S resampling
# ------------------------------------------------------------------
cfg = FastSConfig(
psi_chi=psi_family,
k_chi=k_chi_tuple,
b0=control.bb,
nResample=control.nResample,
k_fast_s=control.k_fast_s,
best_r=control.best_r_s,
max_it=control.max_it,
refine_tol=control.refine_tol,
scale_tol=control.scale_tol,
max_iter_scale=control.k_max,
mts=control.mts,
n_workers=_eff_n_workers,
fast_rng=control.fast_rng,
# If we already decided ``engine_c`` is too expensive for
# this problem size (n*p^2 >= 100k), don't let ``fast_s``
# take its own engine_c branch either; that branch is a
# single Cython call and disables threading.
engine_c=control.engine_c and not _engine_c_too_big,
rng=control.rng,
subsampling=control.subsampling,
)
s_result = fast_s(X, y, cfg=cfg, seed=s_seed)
beta_init = s_result.coef
sigma_init = s_result.scale
init_info: dict[str, object] = {
"coef": s_result.coef.copy(),
"scale": s_result.scale,
"n_iter": s_result.n_iter,
"method": "S",
}
elif init_method == "M-S":
if not design.is_factor_col.any():
# No factor cols; M-S degenerates to S.
cfg = FastSConfig(
psi_chi=psi_family,
k_chi=k_chi_tuple,
b0=control.bb,
nResample=control.nResample,
k_fast_s=control.k_fast_s,
best_r=control.best_r_s,
max_it=control.max_it,
refine_tol=control.refine_tol,
scale_tol=control.scale_tol,
max_iter_scale=control.k_max,
mts=control.mts,
rng=control.rng,
subsampling=control.subsampling,
)
s_result = fast_s(X, y, cfg=cfg, seed=s_seed)
beta_init = s_result.coef
sigma_init = s_result.scale
init_info = {
"coef": s_result.coef.copy(),
"scale": s_result.scale,
"n_iter": s_result.n_iter,
"method": "S",
}
else:
cat_cols = design.is_factor_col
X_cat = X[:, cat_cols]
X_cont = X[:, ~cat_cols]
ms_result = m_s_fit(
X_cat=X_cat,
X_cont=X_cont,
y=y,
psi_chi=psi_family,
k_chi=k_chi_tuple,
b0=control.bb,
k_m_s=control.k_m_s,
nResample=control.nResample,
max_it=control.max_it,
rel_tol=control.rel_tol,
seed=s_seed,
)
# Re-stitch into the original column order.
beta_init = np.empty(p, dtype=np.float64)
beta_init[cat_cols] = ms_result.coef_cat
beta_init[~cat_cols] = ms_result.coef_cont
sigma_init = ms_result.scale
init_info = {
"coef": beta_init.copy(),
"scale": ms_result.scale,
"n_iter": ms_result.n_iter,
"method": "M-S",
}
elif init_method == "L1":
# L1 (least absolute deviation) initial estimate. High-breakdown
# but lower Gaussian efficiency than S; useful when fast-S
# resampling is unstable. R's ``lmrob(init="L1")``.
from pylmrob._l1 import l1_fit
l1_result = l1_fit(X, y)
beta_init = l1_result.coef
sigma_init = l1_result.scale
init_info = {
"coef": l1_result.coef.copy(),
"scale": l1_result.scale,
"n_iter": 0,
"method": "L1",
}
else:
raise NotImplementedError(f"init={init_method!r} not implemented")
# ------------------------------------------------------------------
# MM step holding sigma fixed (skipped if the monolithic engine
# already produced post-MM beta + residuals + rweights).
# ------------------------------------------------------------------
if _engine_c_done:
# The engine_c block populated these; the asserts narrow the
# ``| None`` predeclared types for the static checker.
assert _engine_residuals is not None
assert _engine_rweights is not None
mm = _engine_mm
coef = mm.coef
sigma = sigma_init
residuals = _engine_residuals
fitted = y - residuals
rweights = _engine_rweights
else:
# Cython MM fast path when engine_c was requested (the user has
# signalled they want speed; engine_c block above ran or fell
# back, but in either case we should use the Cython MM here too).
# rng="R" also routes through this path because the Cython MM
# uses LAPACK dgels (same QR-based solver as robustbase's rwls),
# which matches R's IRWLS step bit-for-bit. The pure-Python
# mm_iterate uses np.linalg.lstsq (gelsd, SVD-based) and
# diverges in the last few ULPs.
_use_cy_mm = control.engine_c or control.rng == "R"
if _use_cy_mm and _CY_LMROB_MM is not None and psi_family in _CY_FAMILY_IDS:
_tuning_psi_mm = np.zeros(3, dtype=np.float64)
for _i in range(min(3, len(psi_k_eff))):
_tuning_psi_mm[_i] = float(psi_k_eff[_i])
beta_mm = np.ascontiguousarray(beta_init, dtype=np.float64).copy()
n_iter_mm, converged_mm, _mm_status = _CY_LMROB_MM(
np.ascontiguousarray(X, dtype=np.float64),
np.ascontiguousarray(y, dtype=np.float64),
beta_mm,
float(sigma_init),
_CY_FAMILY_IDS[psi_family],
_tuning_psi_mm,
control.max_it,
control.rel_tol,
)
from types import SimpleNamespace
mm = SimpleNamespace(coef=beta_mm, converged=bool(converged_mm), n_iter=int(n_iter_mm))
else:
mm = mm_iterate(
X=X,
y=y,
beta_init=beta_init,
sigma=sigma_init,
psi_family=psi_family,
psi_k=psi_k_eff,
max_it=control.max_it,
rel_tol=control.rel_tol,
)
coef = mm.coef
sigma = sigma_init
residuals = y - X @ coef
fitted = X @ coef
z = residuals / sigma if sigma != 0 else residuals
rweights = _psi.wgt(z, psi_family, psi_k_eff)
# ------------------------------------------------------------------
# D-step (used by methods SMD and SMDM): refines scale via design-
# adaptive weighting. SMDM also re-fits MM with the new scale.
# ``tau_vec`` is also stashed for ``vcov_w(corrfact='tau' | 'hybrid' | 'tauold')``.
# ------------------------------------------------------------------
method = control.method or "MM"
tau_vec: np.ndarray | None = None
if "D" in method:
sigma_d = None
d_converged = False
# Cython D-step when engine_c is on and tabulated coefficients exist.
if control.engine_c and _CY_LMROB_D_SCALE is not None and psi_family in _CY_FAMILY_IDS:
_cy_d = _CY_LMROB_D_SCALE
if True:
_tau_buf = np.empty(n, dtype=np.float64)
_tuning_psi = np.zeros(3, dtype=np.float64)
for _i, _v in enumerate(
tuple(np.atleast_1d(np.asarray(control.tuning_psi, dtype=float)).ravel())[:3]
):
_tuning_psi[_i] = float(_v)
_sigma_d, _d_conv, _d_status = _cy_d(
np.ascontiguousarray(X, dtype=np.float64),
np.ascontiguousarray(residuals, dtype=np.float64),
np.ascontiguousarray(rweights, dtype=np.float64),
float(sigma),
_CY_FAMILY_IDS[psi_family],
_tuning_psi,
control.k_max,
control.rel_tol,
_tau_buf,
)
if _d_status == 0:
sigma_d = float(_sigma_d)
d_converged = bool(_d_conv)
tau_vec = _tau_buf
if sigma_d is None:
sigma_d, d_converged, tau_vec, _h = d_scale(
X=X,
residuals=residuals,
rweights=rweights,
init_scale=sigma,
family=psi_family,
c_psi=psi_k_eff,
max_iter=control.k_max,
tol=control.rel_tol,
)
if d_converged and sigma_d > 0:
sigma = sigma_d
init_info["d_scale"] = sigma_d
init_info["d_converged"] = True
# The trailing M in SMDM: re-fit MM with the new D-scale.
if method.endswith("M"):
mm2 = mm_iterate(
X=X,
y=y,
beta_init=coef,
sigma=sigma,
psi_family=psi_family,
psi_k=psi_k_eff,
max_it=control.max_it,
rel_tol=control.rel_tol,
)
coef = mm2.coef
residuals = y - X @ coef
fitted = X @ coef
z = residuals / sigma if sigma != 0 else residuals
rweights = _psi.wgt(z, psi_family, psi_k_eff)
# ------------------------------------------------------------------
# Covariance
# ------------------------------------------------------------------
init_residuals = y - X @ beta_init
cov = None
if cov_kind == ".vcov.avar1":
# If the monolithic engine already produced the cov inline,
# use it (Cython already posdefified).
if _engine_c_done and _engine_cov_buf is not None:
cov = _engine_cov_buf
# Otherwise Cython vcov fast path when engine_c is on. The
# matrix products now go through BLAS dgemm so the kernel is
# competitive with NumPy at all sizes; the previous large-n
# gate is no longer needed.
elif control.engine_c and _CY_LMROB_VCOV is not None and psi_family in _CY_FAMILY_IDS:
_cy_vcov = _CY_LMROB_VCOV
_tuning_psi = np.zeros(3, dtype=np.float64)
_tuning_chi = np.zeros(3, dtype=np.float64)
for _i, _v in enumerate(psi_k_eff[:3]):
_tuning_psi[_i] = float(_v)
for _i, _v in enumerate(k_chi_tuple[:3]):
_tuning_chi[_i] = float(_v)
cov_buf = np.zeros((p, p), dtype=np.float64)
_vcov_status = _cy_vcov(
np.ascontiguousarray(X, dtype=np.float64),
np.ascontiguousarray(residuals, dtype=np.float64),
np.ascontiguousarray(init_residuals, dtype=np.float64),
float(sigma),
_CY_FAMILY_IDS[psi_family],
_tuning_psi,
_tuning_chi,
control.bb,
cov_buf,
)
if _vcov_status == 0:
# Cython already posdefified.
cov = cov_buf
if cov is None:
cov = vcov_avar1(
X=X,
residuals=residuals,
sigma=sigma,
psi_family=psi_family,
psi_k=psi_k_eff,
init_residuals=init_residuals,
chi_family=psi_family,
chi_k=k_chi_tuple,
bb=control.bb,
)
elif cov_kind == ".vcov.w":
# Pre-compute tau if it isn't already populated by the D-step.
# vcov_w with corrfact="tau"/"hybrid"/"tauold" needs it.
if tau_vec is None:
from pylmrob.d_scale import tau as _compute_tau
sw = np.sqrt(np.maximum(rweights, 0.0))
Xw = X * sw[:, None]
Q, _ = np.linalg.qr(Xw, mode="reduced")
h = np.minimum(1.0, np.sum(Q * Q, axis=1))
tau_vec = _compute_tau(h, psi_family, psi_k_eff)
s_init_scale_raw = init_info.get("scale", sigma)
s_init_scale = (
float(s_init_scale_raw) if isinstance(s_init_scale_raw, (int, float)) else float(sigma)
) # type: ignore[arg-type]
cov = vcov_w(
X=X,
residuals=residuals,
sigma=sigma,
psi_family=psi_family,
psi_k=psi_k_eff,
rweights=rweights,
init_residuals=init_residuals,
init_scale=s_init_scale,
chi_k=k_chi_tuple,
method=method,
tau=tau_vec,
)
else:
# Fallback: legacy / unknown
cov = vcov_avar1(
X=X,
residuals=residuals,
sigma=sigma,
psi_family=psi_family,
psi_k=psi_k_eff,
)
# When weights were applied via sqrt-transform, R reports residuals
# and fitted on the *original* scale (y - X @ coef), but rweights /
# scale / cov come from the transformed fit. The design returned to
# the user is also the original X / y.
if X_orig is not None and y_orig is not None:
residuals_out = y_orig - X_orig @ coef
fitted_out = X_orig @ coef
design_x_out = X_orig
design_y_out = y_orig
else:
residuals_out = residuals
fitted_out = fitted
design_x_out = X
design_y_out = y
return LmRobResults(
coef_=np.asarray(coef, dtype=np.float64),
scale_=float(sigma),
weights_=(
np.asarray(case_weights, dtype=np.float64)
if case_weights is not None
else np.ones(n, dtype=np.float64)
),
rweights_=np.asarray(rweights, dtype=np.float64),
residuals_=np.asarray(residuals_out, dtype=np.float64),
fitted_=np.asarray(fitted_out, dtype=np.float64),
cov_=np.asarray(cov, dtype=np.float64),
df_residual_=n - p,
converged_=mm.converged,
n_iter_=mm.n_iter,
nobs_=n,
term_names_=term_names,
control=control,
init_=init_info,
rhs_spec_=design.rhs_spec,
design_x_=np.asarray(design_x_out, dtype=np.float64),
design_y_=np.asarray(design_y_out, dtype=np.float64),
)
# sklearn integration. LmRob inherits BaseEstimator + RegressorMixin when
# sklearn is installed; sklearn 1.8+ relies on __sklearn_tags__ from
# BaseEstimator, which means a bare class no longer interoperates with
# Pipeline. The import is wrapped so non-sklearn users still get a working
# LmRob class (it just defines its own get_params / set_params below).
#
# We do the conditional via __init_subclass__-style runtime mixing because
# static type checkers (ty, pyright) refuse to follow a try/except class
# definition. The fallback class is the static base; we patch sklearn's
# BaseEstimator + RegressorMixin onto the MRO at module load when present.
class _LmRobBase:
"""Static base for ``LmRob``. Replaced at runtime with the sklearn-mixed
class when scikit-learn is installed."""
try:
from sklearn.base import BaseEstimator as _SkBaseEstimator
from sklearn.base import RegressorMixin as _SkRegressorMixin
_HAS_SKLEARN = True
class _LmRobBaseSklearn(_SkRegressorMixin, _SkBaseEstimator):
# RegressorMixin must come first so its score() doesn't override ours.
pass
_LmRobBase = _LmRobBaseSklearn # ty: ignore[invalid-assignment] # type: ignore[misc,assignment]
except ImportError:
_HAS_SKLEARN = False
[docs]
class LmRob(_LmRobBase):
"""scikit-learn-style estimator wrapper around :func:`lmrob`.
Inherits ``BaseEstimator`` + ``RegressorMixin`` when scikit-learn is
installed. Drops to a bare class otherwise so non-sklearn callers
don't pay the import cost.
"""
def __init__(self, control: Control | None = None) -> None:
self.control = control or Control()
self._result: LmRobResults | None = None
def fit(self, X: np.ndarray, y: np.ndarray, seed: int | None = None) -> LmRob:
# Build a tiny pandas DataFrame so we can reuse the formula path.
# Constructing the DataFrame from a single dict-of-arrays is faster
# than ``pd.DataFrame(X, columns=...)`` + ``df["y"] = y`` (one
# block-manager build instead of two).
import pandas as pd
X = np.asarray(X, dtype=np.float64)
y = np.asarray(y, dtype=np.float64)
if X.ndim != 2:
raise ValueError("X must be 2-D")
cols = [f"x{i}" for i in range(X.shape[1])]
data_dict: dict[str, np.ndarray] = {c: X[:, i] for i, c in enumerate(cols)}
data_dict["y"] = y
df = pd.DataFrame(data_dict)
formula = "y ~ " + " + ".join(cols)
self._result = lmrob(formula, df, control=self.control, seed=seed)
return self
[docs]
def predict(self, X: np.ndarray) -> np.ndarray:
"""Predict on a new design matrix (raw, without intercept column).
``LmRob`` always fits with an intercept, so we wrap ``X`` in a
DataFrame with the same column names used at fit time and let the
stored formula spec re-add the intercept.
"""
if self._result is None:
raise RuntimeError("call .fit before .predict")
import pandas as pd
X = np.asarray(X, dtype=np.float64)
cols = [f"x{i}" for i in range(X.shape[1])]
return self._result.predict(pd.DataFrame(X, columns=cols))
[docs]
def score(self, X: np.ndarray, y: np.ndarray) -> float:
"""Standard ``R^2`` on a test set, sklearn convention.
``1 - SS_res / SS_tot`` where SS_res = sum((y - y_hat)^2). This is
the OLS coefficient of determination, not the robust R^2 reported
by ``summary()`` (use ``self.result_.summary().r_squared`` for
that). Returning OLS R^2 keeps ``LmRob`` compatible with sklearn
utilities (``cross_val_score``, ``GridSearchCV``) that assume the
regressor scorer contract.
"""
y = np.asarray(y, dtype=np.float64)
y_hat = self.predict(X)
ss_res = float(np.sum((y - y_hat) ** 2))
ss_tot = float(np.sum((y - np.mean(y)) ** 2))
if ss_tot <= 0.0:
return 0.0
return 1.0 - ss_res / ss_tot
# When sklearn is installed, BaseEstimator provides canonical
# get_params/set_params via __init__-signature inspection. The block
# below only fires for the no-sklearn fallback path.
if not _HAS_SKLEARN:
def get_params(self, deep: bool = True) -> dict:
return {"control": self.control}
def set_params(self, **params: object) -> LmRob:
for key, value in params.items():
if not hasattr(self, key):
raise ValueError(f"Invalid parameter {key!r} for LmRob")
setattr(self, key, value)
return self
def __sklearn_is_fitted__(self) -> bool:
return self._result is not None
@property
def result_(self) -> LmRobResults:
if self._result is None:
raise RuntimeError("call .fit before accessing result_")
return self._result