Source code for nestkit.conformal.regressor_conformal

"""CV+ Mondrian conformal prediction intervals for regression."""

from __future__ import annotations

import warnings

import numpy as np

from nestkit.conformal.results import RegressorConformalResult


[docs] class MondrianRegressorConformal: """Mondrian conformal prediction intervals conditioned on predicted value. Bins OOF predictions into equal-frequency groups and computes per-bin residual quantiles, yielding tighter intervals in easy-to-predict regions and wider intervals elsewhere. """
[docs] @staticmethod def fit( oof_predictions: np.ndarray, oof_residuals: np.ndarray, alpha: float = 0.05, n_bins: int = 5, min_bin_size: int = 20, ) -> RegressorConformalResult: """Compute per-bin residual quantiles from OOF data. Parameters ---------- oof_predictions : ndarray of shape (n_cal,) Out-of-fold point predictions. oof_residuals : ndarray of shape (n_cal,) Signed residuals ``y_true - y_pred``. alpha : float Significance level (default 0.05 for 95% coverage). n_bins : int Number of equal-frequency bins for Mondrian conditioning. min_bin_size : int Minimum calibration samples per bin; smaller bins are merged. Returns ------- RegressorConformalResult """ n_cal = len(oof_predictions) # Auto-reduce n_bins if insufficient data max_bins = max(1, n_cal // min_bin_size) if n_bins > max_bins: warnings.warn( f"n_bins={n_bins} would give < {min_bin_size} samples per bin " f"with {n_cal} calibration points; reducing to n_bins={max_bins}.", UserWarning, stacklevel=2, ) n_bins = max_bins # Global fallback quantiles (exact order statistics, finite-sample corrected) fallback_quantiles = _corrected_residual_quantiles(oof_residuals, alpha) if n_bins == 1: # Single bin = global conformal return RegressorConformalResult( alpha=alpha, n_bins=1, bin_edges=np.array([-np.inf, np.inf]), bin_quantiles=[fallback_quantiles], bin_counts=np.array([n_cal]), fallback_quantiles=fallback_quantiles, ) # Equal-frequency bin edges from quantiles of OOF predictions quantile_fracs = np.linspace(0, 1, n_bins + 1) bin_edges = np.quantile(oof_predictions, quantile_fracs) # Ensure extreme edges capture all data bin_edges[0] = -np.inf bin_edges[-1] = np.inf # Assign calibration points to bins assignments = np.digitize(oof_predictions, bin_edges[1:-1], right=False) # digitize returns 0..n_bins-1 with this setup # Merge small bins with nearest neighbor bin_edges, assignments = _merge_small_bins(bin_edges, assignments, n_bins, min_bin_size) final_n_bins = len(bin_edges) - 1 # Compute per-bin quantiles bin_quantiles = [] bin_counts = np.empty(final_n_bins, dtype=int) for b in range(final_n_bins): mask = assignments == b n_b = int(mask.sum()) bin_counts[b] = n_b if n_b == 0: bin_quantiles.append(fallback_quantiles) continue resid_b = oof_residuals[mask] bin_quantiles.append(_corrected_residual_quantiles(resid_b, alpha)) return RegressorConformalResult( alpha=alpha, n_bins=final_n_bins, bin_edges=bin_edges, bin_quantiles=bin_quantiles, bin_counts=bin_counts, fallback_quantiles=fallback_quantiles, )
[docs] @staticmethod def predict( test_predictions: np.ndarray, conformal_result: RegressorConformalResult, ) -> dict: """Generate per-bin prediction intervals for test predictions. Parameters ---------- test_predictions : ndarray of shape (n_test,) Point predictions for test data. conformal_result : RegressorConformalResult Result from :meth:`fit`. Returns ------- dict ``lower``: ndarray of lower bounds. ``upper``: ndarray of upper bounds. ``bin_assignments``: ndarray of int bin indices. """ edges = conformal_result.bin_edges n_bins = conformal_result.n_bins # Assign to bins (clamp to valid range) assignments = np.digitize(test_predictions, edges[1:-1], right=False) assignments = np.clip(assignments, 0, n_bins - 1) lower = np.empty_like(test_predictions) upper = np.empty_like(test_predictions) for b in range(n_bins): mask = assignments == b if not mask.any(): continue q_lo, q_hi = conformal_result.bin_quantiles[b] lower[mask] = test_predictions[mask] + q_lo upper[mask] = test_predictions[mask] + q_hi return { "lower": lower, "upper": upper, "bin_assignments": assignments, }
def _corrected_quantile_levels(n: int, alpha: float) -> tuple[float, float]: """Finite-sample corrected quantile levels for conformal intervals. .. deprecated:: Prefer :func:`_corrected_residual_quantiles` which uses exact order statistics instead of ``np.quantile`` interpolation. """ if n == 0: return 0.0, 1.0 q_lo = max(0.0, np.floor((alpha / 2) * (n + 1)) / n) q_hi = min(1.0, np.ceil((1 - alpha / 2) * (n + 1)) / n) return float(q_lo), float(q_hi) def _corrected_residual_quantiles(residuals: np.ndarray, alpha: float) -> tuple[float, float]: """Exact order-statistic residual quantiles with finite-sample correction. Uses the same correction as :func:`_corrected_quantile_levels` but returns exact order statistics from the sorted residual array instead of relying on ``np.quantile`` interpolation. This matches the approach used in :class:`MondrianClassifierConformal` and is required for formal conformal coverage guarantees. """ n = len(residuals) if n == 0: return 0.0, 0.0 sorted_resid = np.sort(residuals) # Lower: floor-based index (conservative lower bound) k_lo = int(np.floor((alpha / 2) * (n + 1))) - 1 # 0-indexed k_lo = max(0, min(k_lo, n - 1)) # Upper: ceil-based index (conservative upper bound) k_hi = int(np.ceil((1 - alpha / 2) * (n + 1))) - 1 # 0-indexed k_hi = max(0, min(k_hi, n - 1)) return float(sorted_resid[k_lo]), float(sorted_resid[k_hi]) def _merge_small_bins( bin_edges: np.ndarray, assignments: np.ndarray, n_bins: int, min_bin_size: int, ) -> tuple[np.ndarray, np.ndarray]: """Merge bins with fewer than ``min_bin_size`` samples into neighbours.""" while True: unique_bins = np.arange(len(bin_edges) - 1) counts = np.array([int((assignments == b).sum()) for b in unique_bins]) small = np.where(counts < min_bin_size)[0] if len(small) == 0 or len(unique_bins) <= 1: break # Pick the smallest bin victim = small[np.argmin(counts[small])] current_n_bins = len(unique_bins) if victim == 0: merge_with = 1 elif victim == current_n_bins - 1: merge_with = current_n_bins - 2 else: # Merge with the neighbour that has fewer samples (balance) merge_with = victim - 1 if counts[victim - 1] <= counts[victim + 1] else victim + 1 # Merge: absorb victim into merge_with lo, hi = sorted([victim, merge_with]) # Remove the interior edge between lo and hi bin_edges = np.delete(bin_edges, lo + 1) # Reassign: anything in hi becomes lo assignments[assignments == hi] = lo # Shift higher bin indices down assignments[assignments > hi] -= 1 return bin_edges, assignments