Source code for pyccea.evaluation.wrapper

import os
import pickle
import logging
import warnings
import numpy as np
from collections import OrderedDict
from ..utils.datasets import DataLoader
from ..utils.models import ClassificationModel, RegressionModel
from ..utils.metrics import ClassificationMetrics, RegressionMetrics
from ..utils.memory import force_memory_release

warnings.filterwarnings(action="ignore", category=UserWarning, message="y_pred contains classes")


class WrapperEvaluation():
    """Evaluate selected features based on the predictive performance of a machine
    learning model.

    Attributes
    ----------
    model_evaluator : object of one of the metrics classes
        Responsible for computing performance metrics to evaluate models.
    base_model : sklearn model object
        Model that has not been fitted. Works as a template to avoid multiple model
        initializations. As each model evaluates a subset of features (individual),
        the base model is copied and fitted for each individual.
    model : sklearn model object
        Model that has been fitted to evaluate the current individual.
    estimators : list of sklearn model objects, optional
        Estimators used in the current evaluation. The list holds one estimator when
        'eval_mode' is set to "hold_out" and k estimators when 'eval_mode' is set to
        "k_fold" or "leave_one_out". If 'store_estimators' is False, this attribute
        is not created.
    _cache : collections.OrderedDict
        Internal LRU cache mapping a packed-bit representation of 'solution' to the
        evaluation dict. The cache is bounded by 'cache_size'.
    """

    models = {
        "classification": ClassificationModel,
        "regression": RegressionModel
    }
    metrics = {
        "classification": ClassificationMetrics,
        "regression": RegressionMetrics
    }
    eval_modes = ["hold_out", "k_fold", "leave_one_out"]

    def __init__(
            self,
            task: str,
            model_type: str,
            eval_function: str,
            eval_mode: str,
            n_classes: int = None,
            store_estimators: bool = True,
            cache_size: int = 0,
            use_subprocess: bool = False
    ):
        """
        Parameters
        ----------
        task : str
            Name of the supervised learning task (e.g., regression, classification).
        model_type : str
            Name of the machine learning model that will be fitted for the task.
        eval_function : str
            Metric that will be used to evaluate the performance of the model trained
            with the selected subset of features (it makes up the fitness of the
            individual).
        eval_mode : str
            Evaluation mode. It can be 'hold_out', 'leave_one_out', or 'k_fold'.
        n_classes : int, default None
            Number of classes when the task parameter is set to 'classification'.
        store_estimators : bool, default True
            Whether to store the estimators used in the evaluation.
        cache_size : int, default 0
            Maximum number of distinct feature-subset evaluations to keep in an
            in-memory LRU cache. Set to 0 or None to disable caching. This is useful
            when the evolutionary loop revisits the same solution multiple times
            (e.g., due to elitism/crossover), avoiding redundant model fits.
            Note: caching assumes evaluation is deterministic for a given solution.
        use_subprocess : bool, default False
            Whether to evaluate in a subprocess to release native memory back to the OS.
        """
        # Check if the chosen task is available
        if task not in WrapperEvaluation.metrics.keys():
            raise NotImplementedError(
                f"Task '{task}' is not implemented. "
                f"The available tasks are {', '.join(WrapperEvaluation.metrics.keys())}."
            )
        # Initialize the model evaluator according to the task
        task_kwargs = {
            "classification": {"n_classes": n_classes},
            "regression": {},
        }
        self.model_evaluator = WrapperEvaluation.metrics[task](**task_kwargs[task])
        self.task = task
        # Check if the chosen evaluation function is available
        if eval_function not in self.model_evaluator.metrics:
            raise NotImplementedError(
                f"Evaluation function '{eval_function}' is not implemented. "
                f"The available {task} metrics are "
                f"{', '.join(self.model_evaluator.metrics)}."
            )
        self.eval_function = eval_function
        # Initialize the model present in the wrapper model_evaluator
        self.base_model = WrapperEvaluation.models[task](model_type=model_type)
        self.model_type = model_type
        # Check if the chosen evaluation mode is available
        if eval_mode not in WrapperEvaluation.eval_modes:
            raise NotImplementedError(
                f"Evaluation mode '{eval_mode}' is not implemented. "
                f"The available evaluation modes are {', '.join(WrapperEvaluation.eval_modes)}."
            )
        self.eval_mode = eval_mode
        self.store_estimators = store_estimators
        self.use_subprocess = use_subprocess
        self._init_kwargs = {
            "task": task,
            "model_type": model_type,
            "eval_function": eval_function,
            "eval_mode": eval_mode,
            "n_classes": n_classes,
            "store_estimators": store_estimators,
            "cache_size": cache_size,
            "use_subprocess": use_subprocess,
        }
        # Optional bounded cache: avoids refitting identical feature subsets many times
        self.cache_size = int(cache_size) if cache_size is not None else 0
        self._cache = OrderedDict() if self.cache_size > 0 else None
        # Initialize logger with info level
        logging.basicConfig(encoding="utf-8", level=logging.INFO)
        if self.use_subprocess and self.store_estimators:
            raise ValueError("Subprocess evaluation does not support storing estimators.")
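    # Illustrative construction sketch (not part of the library). The argument
    # values below are assumptions for demonstration only; valid 'model_type' and
    # 'eval_function' strings depend on what the ClassificationModel and
    # ClassificationMetrics classes actually register.
    #
    #   evaluator = WrapperEvaluation(
    #       task="classification",
    #       model_type="random_forest",   # placeholder model name
    #       eval_function="accuracy",     # placeholder metric name
    #       eval_mode="k_fold",
    #       n_classes=3,
    #       store_estimators=False,
    #       cache_size=128,
    #   )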
    def clone(self) -> "WrapperEvaluation":
        """Create a new evaluator with the same configuration."""
        return WrapperEvaluation(**self._init_kwargs)
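    # clone() rebuilds the evaluator from the keyword arguments captured in
    # '_init_kwargs', so the copy starts with an empty cache and no fitted
    # estimators. A hypothetical sketch (variable names are illustrative):
    #
    #   worker_evaluator = evaluator.clone()
    #   assert worker_evaluator is not evaluator
    #   assert worker_evaluator.cache_size == evaluator.cache_size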
    def _hold_out_validation(self, solution_mask: np.ndarray, data: DataLoader) -> None:
        """Evaluate an individual using hold-out validation (train/test)."""
        # Get model that has not been previously fitted
        model = self.base_model.clone()
        # Train model with the current subset of features
        model.train(
            X_train=data.X_train[:, solution_mask],
            y_train=data.y_train,
            optimize=False,
            verbose=False
        )
        if self.store_estimators:
            self.estimators.append(model.estimator)
        # Evaluate the individual
        self.model_evaluator.compute(
            estimator=model.estimator,
            X_test=data.X_test[:, solution_mask],
            y_test=data.y_test,
            verbose=False
        )
        # Get evaluation in the test set
        self.evaluations = self.model_evaluator.values
        del model
        force_memory_release()

    def _cross_validation(self, solution_mask: np.ndarray, data: DataLoader) -> None:
        """Evaluate an individual using cross-validation (leave-one-out or k-fold)."""
        for k in range(data.kfolds):
            # Get training and validation subsets built from the full training set
            X_train, y_train, X_val, y_val = data.get_fold(k)
            # Get model that has not been previously fitted
            model = self.base_model.clone()
            # Train model with the current subset of features
            model.train(
                X_train=X_train[:, solution_mask],
                y_train=y_train,
                optimize=False,
                verbose=False
            )
            if self.store_estimators:
                self.estimators.append(model.estimator)
            # Evaluate the individual
            self.model_evaluator.compute(
                estimator=model.estimator,
                X_test=X_val[:, solution_mask],
                y_test=y_val,
                verbose=False
            )
            for metric in self.evaluations.keys():
                self.evaluations[metric] += self.model_evaluator.values[metric]
            del model
        # Calculate average performance over k folds
        for metric in self.evaluations.keys():
            self.evaluations[metric] = round(self.evaluations[metric]/data.kfolds, 4)
        del X_train, X_val, y_train, y_val
        force_memory_release()

    def _evaluate_core(self, solution: np.ndarray, data: DataLoader) -> dict:
        """Evaluate an individual without cache or subprocess."""
        # If no feature is selected
        self.evaluations = {metric: 0 for metric in self.model_evaluator.metrics}
        if solution.sum() == 0:
            return self.evaluations
        # Boolean array used to filter which features will be used to fit the model
        solution_mask = solution.astype(bool)
        # Hold-out validation
        if self.eval_mode == "hold_out":
            self._hold_out_validation(
                solution_mask=solution_mask,
                data=data,
            )
        # K-fold cross-validation or leave-one-out cross-validation
        elif self.eval_mode in ["k_fold", "leave_one_out"]:
            self._cross_validation(
                solution_mask=solution_mask,
                data=data,
            )
        return self.evaluations

    def _evaluate_in_subprocess(self, solution: np.ndarray, data: DataLoader) -> dict:
        """Evaluate in a forked subprocess and return evaluations."""
        if os.name != "posix":
            return self._evaluate_core(solution=solution, data=data)
        read_fd, write_fd = os.pipe()
        pid = os.fork()
        if pid == 0:
            try:
                os.close(read_fd)
                result = self._evaluate_core(solution=solution, data=data)
                payload = pickle.dumps(result)
                os.write(write_fd, payload)
            except Exception as e:
                payload = pickle.dumps({"__error__": repr(e)})
                os.write(write_fd, payload)
            finally:
                os.close(write_fd)
                os._exit(0)
        os.close(write_fd)
        chunks = list()
        while True:
            chunk = os.read(read_fd, 1024 * 1024)
            if not chunk:
                break
            chunks.append(chunk)
        os.close(read_fd)
        os.waitpid(pid, 0)
        data_bytes = b"".join(chunks)
        if not data_bytes:
            return {}
        result = pickle.loads(data_bytes)
        if "__error__" in result:
            raise RuntimeError(result["__error__"])
        self.evaluations = result
        return result
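    # The fork-and-pipe logic above follows the standard POSIX recipe: the child
    # pickles its result into the write end of the pipe and terminates with
    # os._exit(0), while the parent drains the read end until EOF and reaps the
    # child with os.waitpid(). A stripped-down sketch of the same pattern,
    # independent of this class (Python 3.8+ for the walrus operator):
    #
    #   read_fd, write_fd = os.pipe()
    #   pid = os.fork()
    #   if pid == 0:                              # child process
    #       os.close(read_fd)
    #       os.write(write_fd, pickle.dumps({"ok": True}))
    #       os.close(write_fd)
    #       os._exit(0)
    #   os.close(write_fd)                        # parent process
    #   chunks = []
    #   while chunk := os.read(read_fd, 1024 * 1024):
    #       chunks.append(chunk)
    #   os.close(read_fd)
    #   os.waitpid(pid, 0)
    #   result = pickle.loads(b"".join(chunks))   # {"ok": True}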
    def evaluate(self, solution: np.ndarray, data: DataLoader) -> dict:
        """Evaluate an individual represented by a complete solution through the
        predictive performance of a machine learning model.

        Parameters
        ----------
        solution : np.ndarray
            Solution represented by a binary n-dimensional array, where n is the
            number of features.
        data : DataLoader
            Container with the processed data and the training and test sets.

        Returns
        -------
        : dict
            Evaluation metrics.
        """
        # Cache lookup (key is unique for fixed-length solutions)
        cache_key = None
        if self.cache_size > 0:
            packed = np.packbits(solution.astype(np.uint8, copy=False))
            cache_key = (solution.shape[0], packed.tobytes())
            cached = self._cache.get(cache_key)
            if cached is not None:
                self._cache.move_to_end(cache_key)
                return cached.copy()
        # Estimator(s) used for the current evaluation
        if self.store_estimators:
            self.estimators = list()
        # Evaluate in a subprocess (if enabled)
        if self.use_subprocess:
            result = self._evaluate_in_subprocess(solution=solution, data=data)
        else:
            result = self._evaluate_core(solution=solution, data=data)
        # Cache store (bounded)
        if cache_key is not None and self.cache_size > 0:
            self._cache[cache_key] = result.copy()
            self._cache.move_to_end(cache_key)
            while len(self._cache) > self.cache_size:
                self._cache.popitem(last=False)
        return result
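A minimal usage sketch of the evaluator in an evolutionary loop is shown below. It is not taken from the package itself: it assumes a DataLoader instance named 'data' has already been prepared elsewhere (its constructor arguments are omitted), and the 'model_type' and 'eval_function' values are placeholders rather than guaranteed registry keys. Repeated evaluations of the same solution are served from the LRU cache whenever 'cache_size' is greater than zero.

    import numpy as np
    from pyccea.evaluation.wrapper import WrapperEvaluation

    evaluator = WrapperEvaluation(
        task="classification",
        model_type="random_forest",      # placeholder model name
        eval_function="accuracy",        # placeholder metric name
        eval_mode="hold_out",
        n_classes=2,
        store_estimators=False,
        cache_size=64,
    )

    # 'data' is a prepared DataLoader exposing X_train, y_train, X_test, and y_test
    solution = np.random.randint(0, 2, size=data.X_train.shape[1])
    first = evaluator.evaluate(solution=solution, data=data)
    second = evaluator.evaluate(solution=solution, data=data)  # cache hit: no refit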