# Source code for nestkit.results.classifier_results

"""Classifier-specific results containers."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.metrics import classification_report

from nestkit._constants import _EPS
from nestkit.conformal.results import ClassifierConformalResult
from nestkit.results._base import _BaseNestedCVResults
from nestkit.thresholding.results import ThresholdResult


@dataclass
class ClassifierOuterFoldResult:
    """Record of one outer-fold evaluation in a classification run.

    The first twelve fields have no default and must always be supplied.
    Every later field has a default, so a fold produced without calibration,
    threshold optimization or conformal prediction simply leaves the
    corresponding group at ``None``.  ``ClassifierResults`` inspects
    ``calibration_method``, ``threshold_result`` and ``conformal_result`` of
    the first fold to decide which optional aggregates to compute.
    """

    # ------------------------------------------------------------------
    # Core bookkeeping (required)
    # ------------------------------------------------------------------
    fold_idx: int
    train_indices: np.ndarray
    # Used by ClassifierResults to index the pooled predictions frame.
    test_indices: np.ndarray
    best_params: dict
    best_inner_score: float
    inner_cv_results: dict
    fit_time: float
    score_time: float
    fitted_estimator: BaseEstimator | None

    # ------------------------------------------------------------------
    # Predictions on the outer test split (required)
    # ------------------------------------------------------------------
    y_true: np.ndarray
    # May be 1-D or 2-D; ClassifierResults expands a 2-D array into one
    # column per class.
    y_proba_raw: np.ndarray
    y_pred_default: np.ndarray

    # Scores / confusion matrix for the default predictions.  Factories are
    # used so that instances never share a mutable default.
    outer_scores_default: dict = field(default_factory=dict)
    confusion_matrix_default: np.ndarray = field(default_factory=lambda: np.empty(0))

    # ------------------------------------------------------------------
    # [OPT-CAL] Calibration outputs
    # ------------------------------------------------------------------
    y_proba_calibrated: np.ndarray | None = None
    calibration_method: str | None = None
    calibrator: Any | None = None
    oof_calibration_diagnostics: dict | None = None

    # ------------------------------------------------------------------
    # [OPT-THR] Threshold-optimization outputs
    # ------------------------------------------------------------------
    y_pred_optimized: np.ndarray | None = None
    outer_scores_optimized: dict | None = None
    confusion_matrix_optimized: np.ndarray | None = None
    threshold_result: ThresholdResult | None = None

    # ------------------------------------------------------------------
    # [OPT-CONFORMAL] Conformal-prediction outputs
    # ------------------------------------------------------------------
    conformal_result: ClassifierConformalResult | None = None
    conformal_prediction_sets: list[list[int]] | None = None
    conformal_set_sizes: np.ndarray | None = None
    conformal_coverage: float | None = None
class ClassifierResults(_BaseNestedCVResults):
    """Aggregated nested CV results for classification.

    Per-fold records are read from ``self.fold_results_`` (populated outside
    this class body).  :meth:`finalize` turns them into aggregate attributes:
    score tables, confusion matrices, a pooled predictions frame and, when the
    first fold carries them, calibration / threshold / conformal summaries.
    """

    def __init__(
        self,
        n_outer_folds: int,
        feature_names: list[str] | None = None,
        original_index: Any | None = None,
    ):
        """Forward all arguments unchanged to the base-class constructor."""
        super().__init__(n_outer_folds, feature_names, original_index)

    @property
    def has_calibration(self) -> bool:
        """True when the first recorded fold has a ``calibration_method``."""
        if not self.fold_results_:
            return False
        return self.fold_results_[0].calibration_method is not None

    @property
    def has_threshold_optimization(self) -> bool:
        """True when the first recorded fold has a ``threshold_result``."""
        if not self.fold_results_:
            return False
        return self.fold_results_[0].threshold_result is not None

    @property
    def has_conformal(self) -> bool:
        """True when the first recorded fold has a ``conformal_result``."""
        if not self.fold_results_:
            return False
        return self.fold_results_[0].conformal_result is not None

    def finalize(self) -> None:
        """Compute every aggregate attribute from the recorded fold results.

        The call is idempotent: once finalization has succeeded, further calls
        return immediately.  If any aggregation step raises, the finalized
        flag is cleared before the exception propagates, so a later call
        retries instead of silently returning with half-populated attributes.
        """
        if self._finalized:
            return

        # The flag is raised first (as before) and rolled back on failure.
        self._finalized = True
        try:
            self._run_finalize_steps()
        except BaseException:
            self._finalized = False
            raise

    def _run_finalize_steps(self) -> None:
        """Run the individual aggregation steps of :meth:`finalize` in order."""
        self.best_params_per_fold_ = [fr.best_params for fr in self.fold_results_]

        # Param stability
        self._compute_param_stability()

        # Inner reports.  NOTE(review): imported locally rather than at module
        # top -- presumably to avoid an import cycle; confirm before moving.
        from nestkit.inner.tuning_report import InnerCVReport

        self.inner_reports_ = [
            InnerCVReport(fr.inner_cv_results, fr.fold_idx) for fr in self.fold_results_
        ]

        # Default scores
        self.outer_scores_default_ = pd.DataFrame(
            [fr.outer_scores_default for fr in self.fold_results_]
        )
        self.summary_default_ = self._compute_summary(self.outer_scores_default_)

        # Confusion matrices (element-wise sum over folds)
        self.confusion_matrices_default_ = [
            fr.confusion_matrix_default for fr in self.fold_results_
        ]
        self.confusion_matrix_aggregate_default_ = sum(self.confusion_matrices_default_)

        # Predictions DataFrame
        self._build_predictions_df()

        # Generalization gap
        self._compute_generalization_gap()

        # Optional feature groups
        if self.has_calibration:
            self._compute_calibration_attributes()
        if self.has_threshold_optimization:
            self._compute_threshold_attributes()
        if self.has_conformal:
            self._compute_conformal_attributes()

    @staticmethod
    def _stability_key(value: Any) -> Any:
        """Return a hashable stand-in for *value*, used only for counting.

        Hashable values are returned unchanged.  Unhashable ones (dicts,
        lists, arrays, ...) are represented by a tagged ``repr`` so that
        frequency counting does not raise ``TypeError``.
        """
        try:
            hash(value)
        except TypeError:
            return ("__unhashable__", repr(value))
        return value

    def _compute_param_stability(self) -> None:
        """Build ``param_stability_``: one row per tuned parameter.

        Columns are ``parameter``, ``mode`` (most frequent best value across
        folds), ``nunique`` (number of distinct values) and
        ``agreement_rate`` (share of folds that chose the mode).  A parameter
        missing from a fold's ``best_params`` counts as ``None`` for that fold.
        """
        from collections import Counter

        if not self.best_params_per_fold_:
            self.param_stability_ = pd.DataFrame()
            return

        all_params = set()
        for fold_params in self.best_params_per_fold_:
            all_params.update(fold_params.keys())

        rows = []
        for param in sorted(all_params):
            values = [fold_params.get(param) for fold_params in self.best_params_per_fold_]
            # Count via hashable keys so dict/list-valued params do not crash.
            keys = [self._stability_key(v) for v in values]
            counts = Counter(keys)
            mode_key, mode_count = counts.most_common(1)[0]
            # Report the original value (first occurrence), not the key.
            mode_val = values[keys.index(mode_key)]
            rows.append(
                {
                    "parameter": param,
                    "mode": mode_val,
                    "nunique": len(counts),
                    "agreement_rate": mode_count / len(values),
                }
            )
        self.param_stability_ = pd.DataFrame(rows)

    @staticmethod
    def _add_probability_columns(frame: pd.DataFrame, prefix: str, proba: np.ndarray) -> None:
        """Write *proba* into *frame* in place.

        A 2-D array becomes one column per class (``{prefix}_{c}``); any other
        dimensionality is stored as a single ``{prefix}`` column.
        """
        if proba.ndim == 2:
            for c in range(proba.shape[1]):
                frame[f"{prefix}_{c}"] = proba[:, c]
        else:
            frame[prefix] = proba

    def _build_predictions_df(self) -> None:
        """Build ``predictions_``: pooled per-sample predictions of all folds.

        Each fold contributes ``y_true``, ``y_pred_default``, ``fold_idx`` and
        raw probabilities, plus calibrated probabilities, optimized
        predictions and conformal columns when the fold has them.  Rows are
        indexed by ``original_index[test_indices]`` when an original index was
        given, otherwise by ``test_indices``, and the result is sorted by index.
        With no recorded folds ``predictions_`` is an empty DataFrame.
        """
        frames = []
        for fr in self.fold_results_:
            fold_df = pd.DataFrame(
                {
                    "y_true": fr.y_true,
                    "y_pred_default": fr.y_pred_default,
                    "fold_idx": fr.fold_idx,
                }
            )

            # Raw probabilities
            self._add_probability_columns(fold_df, "y_proba_raw", fr.y_proba_raw)

            # Calibrated probabilities
            if fr.y_proba_calibrated is not None:
                self._add_probability_columns(fold_df, "y_proba_cal", fr.y_proba_calibrated)

            # Optimized predictions
            if fr.y_pred_optimized is not None:
                fold_df["y_pred_optimized"] = fr.y_pred_optimized

            # Conformal prediction sets
            if fr.conformal_set_sizes is not None:
                fold_df["conformal_set_size"] = fr.conformal_set_sizes
                # Membership needs the sets themselves; skip it if only the
                # sizes were recorded instead of failing on ``None[i]``.
                if fr.conformal_prediction_sets is not None:
                    fold_df["conformal_in_set"] = [
                        label in pred_set
                        for label, pred_set in zip(fr.y_true, fr.conformal_prediction_sets)
                    ]

            # Set original index if available
            if self._original_index is not None:
                fold_df.index = self._original_index[fr.test_indices]
            else:
                fold_df.index = fr.test_indices

            frames.append(fold_df)

        if not frames:
            # pd.concat raises on an empty list.
            self.predictions_ = pd.DataFrame()
            return

        self.predictions_ = pd.concat(frames).sort_index()

    def _compute_generalization_gap(self) -> None:
        """Build ``generalization_gap_``: inner score next to outer scores.

        One row per fold with ``fold_idx``, ``best_inner_score`` and one
        ``outer_<metric>`` column per entry of ``outer_scores_default``.
        """
        rows = []
        for fr in self.fold_results_:
            row = {"fold_idx": fr.fold_idx, "best_inner_score": fr.best_inner_score}
            for metric, val in fr.outer_scores_default.items():
                row[f"outer_{metric}"] = val
            rows.append(row)
        self.generalization_gap_ = pd.DataFrame(rows)

    def _compute_calibration_attributes(self) -> None:
        """Build ``calibration_summary_`` and ``calibration_improvement_``.

        The summary has one row per fold with non-empty
        ``oof_calibration_diagnostics`` (``fold_idx`` plus the diagnostics
        keys).  The improvement table holds ``fold_idx`` and, when both source
        columns exist, ``delta_ece = ece_raw - ece_calibrated`` and
        ``delta_brier = brier_raw - brier_calibrated``.
        """
        rows = [
            {"fold_idx": fr.fold_idx, **fr.oof_calibration_diagnostics}
            for fr in self.fold_results_
            if fr.oof_calibration_diagnostics
        ]
        self.calibration_summary_ = pd.DataFrame(rows) if rows else pd.DataFrame()

        summary = self.calibration_summary_
        if summary.empty:
            self.calibration_improvement_ = pd.DataFrame()
            return

        # Column arithmetic instead of iterrows(): iterrows upcasts each row
        # to a common dtype, which turned the integer fold_idx into a float.
        improvement = summary[["fold_idx"]].copy()
        delta_specs = (
            ("delta_ece", "ece_raw", "ece_calibrated"),
            ("delta_brier", "brier_raw", "brier_calibrated"),
        )
        for delta_col, raw_col, cal_col in delta_specs:
            if raw_col in summary.columns and cal_col in summary.columns:
                improvement[delta_col] = summary[raw_col] - summary[cal_col]
        self.calibration_improvement_ = improvement

    def _compute_threshold_attributes(self) -> None:
        """Build the optimized-threshold aggregates.

        Sets ``outer_scores_optimized_``, ``summary_optimized_``,
        ``thresholds_per_fold_``, ``threshold_stability_`` (mean, sample std,
        coefficient of variation, range; empty dict when no thresholds),
        ``confusion_matrices_optimized_`` and
        ``confusion_matrix_aggregate_optimized_`` (empty array when no fold
        has an optimized confusion matrix).
        """
        self.outer_scores_optimized_ = pd.DataFrame(
            [fr.outer_scores_optimized for fr in self.fold_results_ if fr.outer_scores_optimized]
        )
        self.summary_optimized_ = self._compute_summary(self.outer_scores_optimized_)

        # ``is not None`` matches has_threshold_optimization and does not
        # depend on the truthiness of the result object.
        thresholds = np.array(
            [
                fr.threshold_result.optimal_threshold
                for fr in self.fold_results_
                if fr.threshold_result is not None
            ]
        )
        self.thresholds_per_fold_ = thresholds

        if len(thresholds) > 0:
            mean = float(np.mean(thresholds))
            # Sample std (ddof=1) is undefined for a single fold; report 0.0
            # like the conformal q-hat stability instead of NaN + a warning.
            std = float(np.std(thresholds, ddof=1)) if len(thresholds) > 1 else 0.0
            self.threshold_stability_ = {
                "mean": mean,
                "std": std,
                # _EPS keeps the ratio finite when the mean is zero.
                "cv": float(std / (mean + _EPS)),
                "range": float(np.ptp(thresholds)),
            }
        else:
            self.threshold_stability_ = {}

        self.confusion_matrices_optimized_ = [
            fr.confusion_matrix_optimized
            for fr in self.fold_results_
            if fr.confusion_matrix_optimized is not None
        ]
        if self.confusion_matrices_optimized_:
            self.confusion_matrix_aggregate_optimized_ = sum(self.confusion_matrices_optimized_)
        else:
            self.confusion_matrix_aggregate_optimized_ = np.array([])

    def threshold_comparison(self) -> pd.DataFrame:
        """Side-by-side comparison of default vs optimized threshold performance.

        Returns
        -------
        pd.DataFrame
            Columns ``metric``, ``mean_default``, ``std_default``,
            ``mean_optimized``, ``std_optimized``; the two summaries are
            merged on ``metric``, so only metrics present in both appear.

        Raises
        ------
        ValueError
            If threshold optimization was not enabled.
        """
        if not self.has_threshold_optimization:
            raise ValueError("Threshold optimization was not enabled for this run.")

        default = self.summary_default_.copy().rename(
            columns={"mean": "mean_default", "std": "std_default"}
        )
        optimized = self.summary_optimized_.copy().rename(
            columns={"mean": "mean_optimized", "std": "std_optimized"}
        )
        return pd.merge(
            default[["metric", "mean_default", "std_default"]],
            optimized[["metric", "mean_optimized", "std_optimized"]],
            on="metric",
        )

    def calibration_report(self) -> pd.DataFrame:
        """Detailed calibration diagnostics per fold.

        Returns
        -------
        pd.DataFrame
            A copy of ``calibration_summary_``.

        Raises
        ------
        ValueError
            If calibration was not enabled.
        """
        if not self.has_calibration:
            raise ValueError("Calibration was not enabled for this run.")
        return self.calibration_summary_.copy()

    def _compute_conformal_attributes(self) -> None:
        """Build the conformal-prediction aggregates.

        Sets ``conformal_coverage_`` (mean and per-fold list),
        ``conformal_set_size_stats_`` (computed over the set sizes of all
        folds pooled together), ``conformal_qhat_per_fold_`` and
        ``conformal_qhat_stability_`` (per-class mean and sample std across
        folds; std is reported as 0.0 when only one fold has a result).
        """
        coverages = [
            fr.conformal_coverage
            for fr in self.fold_results_
            if fr.conformal_coverage is not None
        ]
        set_sizes = [
            fr.conformal_set_sizes
            for fr in self.fold_results_
            if fr.conformal_set_sizes is not None
        ]

        self.conformal_coverage_ = {
            "mean": float(np.mean(coverages)),
            "per_fold": coverages,
        }

        all_sizes = np.concatenate(set_sizes)
        self.conformal_set_size_stats_ = {
            "mean": float(np.mean(all_sizes)),
            "median": float(np.median(all_sizes)),
            "frac_singleton": float(np.mean(all_sizes == 1)),
            "frac_empty": float(np.mean(all_sizes == 0)),
            "frac_multi": float(np.mean(all_sizes > 1)),
        }

        qhats = np.array(
            [
                fr.conformal_result.qhat_per_class
                for fr in self.fold_results_
                if fr.conformal_result is not None
            ]
        )
        self.conformal_qhat_per_fold_ = qhats
        if qhats.shape[0] > 1:
            self.conformal_qhat_stability_ = {
                "mean_per_class": np.mean(qhats, axis=0).tolist(),
                "std_per_class": np.std(qhats, axis=0, ddof=1).tolist(),
            }
        else:
            # Single fold: the sample std is undefined, so report zeros.
            self.conformal_qhat_stability_ = {
                "mean_per_class": qhats[0].tolist(),
                "std_per_class": [0.0] * qhats.shape[1],
            }

    def conformal_report(self) -> pd.DataFrame:
        """Per-fold conformal coverage and set size statistics.

        Returns
        -------
        pd.DataFrame
            One row per outer fold with coverage, mean set size, fraction of
            singleton and empty prediction sets.  Folds without a recorded
            coverage are omitted.

        Raises
        ------
        ValueError
            If conformal prediction was not enabled.
        """
        if not self.has_conformal:
            raise ValueError("Conformal prediction was not enabled for this run.")

        rows = []
        for fr in self.fold_results_:
            if fr.conformal_coverage is not None:
                sizes = fr.conformal_set_sizes
                rows.append(
                    {
                        "fold_idx": fr.fold_idx,
                        "coverage": fr.conformal_coverage,
                        "mean_set_size": float(np.mean(sizes)),
                        "frac_singleton": float(np.mean(sizes == 1)),
                        "frac_empty": float(np.mean(sizes == 0)),
                    }
                )
        return pd.DataFrame(rows)

    def classification_report_pooled(self, threshold: str = "default") -> str:
        """sklearn-style classification report on pooled OOF predictions.

        Parameters
        ----------
        threshold : str, default "default"
            ``"optimized"`` reports on ``y_pred_optimized``; any other value
            reports on ``y_pred_default``.

        Returns
        -------
        str
            The text produced by ``sklearn.metrics.classification_report``.

        Raises
        ------
        ValueError
            If ``threshold="optimized"`` is requested but threshold
            optimization was not enabled.
        """
        y_true = self.predictions_["y_true"].values
        if threshold == "optimized":
            if not self.has_threshold_optimization:
                raise ValueError("Threshold optimization was not enabled.")
            y_pred = self.predictions_["y_pred_optimized"].values
        else:
            y_pred = self.predictions_["y_pred_default"].values
        return classification_report(y_true, y_pred)