Source code for pyccea.coevolution.ccea

import os
import logging
import threading
import numpy as np
from abc import ABC, abstractmethod
from ..utils.datasets import DataLoader
from ..utils.memory import force_memory_release
from concurrent.futures import ThreadPoolExecutor


class CCEA(ABC):
    """
    An abstract class for a Cooperative Co-Evolutionary-Based Feature Selection Algorithm.

    Attributes
    ----------
    subpop_sizes: list
        Subpopulation sizes, that is, the number of individuals in each subpopulation.
    decomposer: object of one of the decomposition classes
        Responsible for decomposing the problem into smaller subproblems.
    collaborator: object of one of the collaboration classes
        Responsible for selecting collaborators for individuals.
    fitness_function: object of one of the fitness classes
        Responsible for evaluating individuals, that is, subsets of features.
    initializer: object of one of the subpopulation initializers
        Responsible for initializing all individuals of all subpopulations.
    optimizers: list of objects of optimizer classes
        Responsible for evolving each of the subpopulations individually.
    subpops: list
        Individuals from all subpopulations. Each individual is represented by a binary
        n-dimensional array, where n is the number of features. A 1 in the i-th position
        of the array indicates that the i-th feature should be considered; a 0 indicates
        that it should not.
    fitness: list
        Evaluation of all context vectors from all subpopulations.
    context_vectors: list
        Complete problem solutions.
    convergence_curve: list
        Best global fitness in each generation.
    current_best: dict
        Current best individual of each subpopulation and its respective evaluation.
    best_context_vector: np.ndarray
        Best solution of the complete problem.
    best_fitness: float
        Evaluation of the best solution of the complete problem.
    feature_idxs: np.ndarray
        List of feature indexes.
    best_context_vectors: list
        Best context vector in each generation.
    """

    def __init__(self, data: DataLoader, conf: dict, verbose: bool = True):
        """
        Parameters
        ----------
        data: DataLoader
            Container with process data and training and test sets.
        conf: dict
            Configuration parameters of the cooperative co-evolutionary algorithm.
        verbose: bool, default True
            If True, show the improvements obtained from the optimization process.
        """
        # Seed
        self.seed = conf["coevolution"].get("seed")
        # Verbose
        self.verbose = verbose
        # Data
        self.data = data
        # Size of each subpopulation
        self.subpop_sizes = conf["coevolution"]["subpop_sizes"]
        # Number of subcomponents
        self.n_subcomps = conf["coevolution"].get("n_subcomps")
        if self.n_subcomps:
            if self.n_subcomps != len(self.subpop_sizes):
                if len(self.subpop_sizes) == 1:
                    # A single size is broadcast to all subpopulations
                    subpop_size = self.subpop_sizes[0]
                    logging.info(f"Considering all subpopulations with size {subpop_size}.")
                    self.subpop_sizes = [subpop_size] * self.n_subcomps
                else:
                    raise AssertionError(
                        f"The number of subcomponents ({self.n_subcomps}) is not equal to the "
                        f"number of subpopulations ({len(self.subpop_sizes)}). Check parameters "
                        "'n_subcomps' and 'subpop_sizes' in the configuration file."
                    )
        # Number of features in each subcomponent
        self.subcomp_sizes = conf["coevolution"].get("subcomp_sizes")
        if self.subcomp_sizes:
            if len(self.subcomp_sizes) != len(self.subpop_sizes):
                raise AssertionError(
                    f"The number of subcomponents ({len(self.subcomp_sizes)}) is not equal to the"
                    f" number of subpopulations ({len(self.subpop_sizes)}). Check parameters "
                    "'subcomp_sizes' and 'subpop_sizes' in the configuration file."
                )
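
        # A minimal sketch (illustrative values, not from the original source) of
        # the configuration keys consumed by this constructor; other sections of
        # `conf` are read by the component initializers of each subclass:
        #
        #   conf = {
        #       "coevolution": {
        #           "seed": 42,
        #           "subpop_sizes": [20, 20, 20],
        #           "n_subcomps": 3,
        #           "subcomp_sizes": [10, 10, 10],
        #           "max_best_context_vectors": 50,
        #           "memory_profile": False,
        #           "memory_log_every": 0,
        #       },
        #       "evaluation": {"n_workers": 1},
        #   }
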
        # Evaluation mode
        self.eval_mode = self.data.splitter_type
        # Configuration parameters
        self.conf = conf
        # Initializes the components of the cooperative co-evolutionary algorithm
        self._init_evaluator()
        self._init_decomposer()
        self._init_collaborator()
        # List to store the best global fitness in each generation
        self.convergence_curve = list()
        # List to store the best context vector in each generation
        self.best_context_vectors = list()
        # Maximum number of best context vectors to store
        self.max_best_context_vectors = conf["coevolution"].get("max_best_context_vectors")
        # Optional memory profiling
        self.memory_profile = conf["coevolution"].get("memory_profile", False)
        self.memory_log_every = conf["coevolution"].get("memory_log_every", 0)
        # Optional parallel evaluation workers
        self.evaluation_workers = conf["evaluation"].get("n_workers", 1)
        self._fitness_fn_tls = threading.local()
        # Initialize logger with info level
        logging.basicConfig(encoding="utf-8", level=logging.INFO)
        # Reset handlers
        logging.getLogger().handlers = []
        # Add a custom handler
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        logging.getLogger().addHandler(handler)

    @abstractmethod
    def _init_decomposer(self):
        """Instantiate feature grouping method."""
        pass

    @abstractmethod
    def _init_evaluator(self):
        """Instantiate evaluation method."""
        pass

    @abstractmethod
    def _init_collaborator(self):
        """Instantiate collaboration method."""
        pass

    @abstractmethod
    def _init_subpop_initializer(self):
        """Instantiate subpopulation initialization method."""
        pass

    @abstractmethod
    def _init_optimizers(self):
        """Instantiate evolutionary algorithms to evolve each subpopulation."""
        pass
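
    # A hypothetical sketch (not part of this module) of how a concrete subclass
    # is expected to fill in the abstract hooks above; the attribute names match
    # those used by this base class, but the right-hand sides are placeholders:
    #
    #   class MyCCEA(CCEA):
    #       def _init_decomposer(self):
    #           self.decomposer = ...       # provides decompose(X=...)
    #       def _init_evaluator(self):
    #           self.fitness_function = ... # provides evaluate(cv, data)
    #       def _init_collaborator(self):
    #           self.collaborator = ...
    #       def _init_subpop_initializer(self):
    #           self.initializer = ...      # provides build_subpopulations() etc.
    #       def _init_optimizers(self):
    #           self.optimizers = [...]     # one optimizer per subpopulation
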
    @abstractmethod
    def optimize(self):
        """Solve the feature selection problem through optimization."""
        pass
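
    # A minimal usage sketch, assuming `MyCCEA` is a concrete subclass such as
    # the one outlined above and `conf` follows the structure shown earlier:
    #
    #   data = DataLoader(...)
    #   ccea = MyCCEA(data, conf)
    #   ccea.optimize()
    #   mask = ccea.best_context_vector.astype(bool)  # boolean feature mask
    #   print(ccea.best_fitness, mask.sum(), "features selected")
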
    def _get_best_individuals(self, subpops: list, fitness: list, context_vectors: list):
        """
        Get the best individual from each subpopulation.

        Parameters
        ----------
        subpops: list
            Individuals from all subpopulations. Each individual is represented by a binary
            n-dimensional array, where n is the number of features. A 1 in the i-th position
            of the array indicates that the i-th feature should be considered; a 0 indicates
            that it should not.
        fitness: list
            Evaluation of all context vectors from all subpopulations.
        context_vectors: list
            Complete problem solutions.

        Returns
        -------
        current_best: dict
            Current best individual of each subpopulation and its respective evaluation.
        """
        # Current best individual of each subpopulation
        current_best = dict()
        # Number of subpopulations
        n_subpops = len(subpops)
        # For each subpopulation
        for i in range(n_subpops):
            best_ind_idx = np.argmax(fitness[i])
            current_best[i] = dict()
            current_best[i]["individual"] = subpops[i][best_ind_idx].copy()
            current_best[i]["context_vector"] = context_vectors[i].copy()
            current_best[i]["fitness"] = fitness[i][best_ind_idx]
        return current_best

    def _get_global_best(self):
        """Get the globally best context vector."""
        best_idx = np.argmax([best["fitness"] for best in self.current_best.values()])
        best_fitness = self.current_best[best_idx]["fitness"]
        best_context_vector = self.current_best[best_idx]["context_vector"].copy()
        return best_context_vector, best_fitness

    def _record_best_context_vector(self, context_vector: np.ndarray) -> None:
        """Store best context vectors while keeping memory bounded."""
        self.best_context_vectors.append(context_vector.copy())
        max_keep = self.max_best_context_vectors
        # A missing limit means the full history is kept
        if max_keep is None:
            return
        if isinstance(max_keep, int) and max_keep > 0:
            if len(self.best_context_vectors) > max_keep:
                # Keep only the most recent `max_keep` context vectors
                self.best_context_vectors = self.best_context_vectors[-max_keep:]
        if isinstance(max_keep, int) and max_keep <= 0:
            self.best_context_vectors.clear()

    def _get_rss_mb(self) -> float:
        """Get resident set size (RSS) in megabytes (MB) when available.

        Returns None when the information cannot be read on the current platform.
        """
        if os.name != "posix":
            return None
        try:
            with open("/proc/self/status", "r", encoding="utf-8") as status_file:
                for line in status_file:
                    if line.startswith("VmRSS:"):
                        parts = line.split()
                        # Convert from kB to MB
                        return round(int(parts[1]) / 1024, 2)
        except FileNotFoundError:
            return None
        return None

    def _evaluate_context_vectors(self, context_vectors: list) -> list:
        """Evaluate a batch of context vectors with optional parallel workers."""
        def _get_local_fitness_fn():
            # Reuse one fitness function per thread, cloning it when possible so
            # concurrent evaluations do not share mutable state
            fitness_fn = getattr(self._fitness_fn_tls, "fitness_fn", None)
            if fitness_fn is None:
                if hasattr(self.fitness_function, "clone"):
                    fitness_fn = self.fitness_function.clone()
                else:
                    fitness_fn = self.fitness_function
                self._fitness_fn_tls.fitness_fn = fitness_fn
            return fitness_fn

        if self.evaluation_workers and self.evaluation_workers > 1:
            with ThreadPoolExecutor(max_workers=self.evaluation_workers) as executor:
                return list(
                    executor.map(
                        lambda cv: _get_local_fitness_fn().evaluate(cv, self.data),
                        context_vectors
                    )
                )
        return [_get_local_fitness_fn().evaluate(cv, self.data) for cv in context_vectors]

    def _sum_nbytes(self, obj) -> int:
        """Recursively sum the number of bytes of a given object and its contents."""
        total_size = 0
        if isinstance(obj, np.ndarray):
            total_size += obj.nbytes
        elif isinstance(obj, (list, tuple, set)):
            for item in obj:
                total_size += self._sum_nbytes(item)
        elif isinstance(obj, dict):
            for key, value in obj.items():
                total_size += self._sum_nbytes(key)
                total_size += self._sum_nbytes(value)
        return total_size
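
    # A small worked example of `_sum_nbytes`: a float64 array of 100 elements
    # occupies 800 bytes, so a dict holding two such arrays totals 1600 bytes;
    # non-array leaves (e.g., the string keys) contribute 0 by design:
    #
    #   >>> obj = {"a": np.zeros(100), "b": np.zeros(100)}
    #   >>> ccea._sum_nbytes(obj)
    #   1600
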
    def _log_memory_usage(self, stage: str, n_gen: int = None) -> None:
        """Log memory usage of objects to help find bottlenecks."""
        if not self.memory_profile:
            return
        if n_gen is not None:
            # Only log on the configured generation interval
            if not self.memory_log_every or (n_gen % self.memory_log_every) != 0:
                return
        rss_mb = self._get_rss_mb()
        subpops_mb = round(self._sum_nbytes(getattr(self, "subpops", None)) / (1024 ** 2), 2)
        context_vectors_mb = round(self._sum_nbytes(getattr(self, "context_vectors", None)) / (1024 ** 2), 2)
        best_context_vectors_mb = round(self._sum_nbytes(getattr(self, "best_context_vectors", None)) / (1024 ** 2), 2)
        training_mb = round(self._sum_nbytes(getattr(self.data, "X_train", None)) / (1024 ** 2), 2)
        test_mb = round(self._sum_nbytes(getattr(self.data, "X_test", None)) / (1024 ** 2), 2)
        raw_training_mb = round(self._sum_nbytes(getattr(self.data, "_raw_X_train", None)) / (1024 ** 2), 2)
        message = (
            f"[Memory Usage] Stage: {stage} | "
            f"{'' if n_gen is None else f' Generation: {n_gen} | '}"
            f"RSS: {rss_mb} MB | "
            f"Subpopulations: {subpops_mb} MB | "
            f"Context Vectors: {context_vectors_mb} MB | "
            f"Best Context Vectors: {best_context_vectors_mb} MB | "
            f"Training data: {training_mb} MB | "
            f"Test data: {test_mb} MB | "
            f"Raw Training data: {raw_training_mb} MB"
        )
        # Log the message even if logging is disabled
        logger = logging.getLogger()
        was_disabled = logger.disabled
        logger.disabled = False
        logging.info(message)
        logger.disabled = was_disabled

    def _init_subpopulations(self):
        """Initialize all subpopulations according to their respective sizes."""
        # Instantiate subpopulation initialization method
        self._init_subpop_initializer()
        # Build subpopulations
        # Number of subpopulations is equal to the number of subcomponents
        self.initializer.build_subpopulations()
        # Evaluate all individuals in each subpopulation
        # Number of individuals in each subpopulation is in the list of subpopulation sizes
        self.initializer.evaluate_individuals()
        # Subpopulations
        self.subpops = self.initializer.subpops
        # Context vectors
        self.context_vectors = self.initializer.context_vectors
        # Evaluations of context vectors
        self.fitness = self.initializer.fitness
        # Drop the initializer once its products have been captured
        delattr(self, "initializer")
        force_memory_release()

    def _problem_decomposition(self):
        """Decompose the problem into smaller subproblems."""
        # Decompose only once to use the same feature indexes on all k-folds
        Xk_train, _, _, _ = self.data.get_fold(0, normalize=False)
        _, self.feature_idxs = self.decomposer.decompose(X=Xk_train)
        if hasattr(self, "feature_importances"):
            # Reorder feature importances according to the shuffling in the feature decomposition
            self.feature_importances = self.feature_importances[self.feature_idxs]
        # Reorder training set according to the shuffling in the feature decomposition
        self.data.X_train = self.data.X_train[:, self.feature_idxs]
        # Reorder test set according to the shuffling in the feature decomposition
        self.data.X_test = self.data.X_test[:, self.feature_idxs]
        # Keep raw training data aligned for fold normalization if it exists
        if hasattr(self.data, "_raw_X_train"):
            self.data._raw_X_train = self.data._raw_X_train[:, self.feature_idxs]
        # Update 'n_subcomps' when it starts as None
        self.n_subcomps = self.decomposer.n_subcomps
        # Update 'subcomp_sizes' when it starts as an empty list
        self.subcomp_sizes = self.decomposer.subcomp_sizes
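
# A minimal illustration (not from the original source) of the column reordering
# performed in `_problem_decomposition`: indexing every data matrix with the
# permutation returned by the decomposer keeps position j of an individual
# aligned with column j of the data.
#
#   >>> X = np.array([[1, 2, 3], [4, 5, 6]])
#   >>> feature_idxs = np.array([2, 0, 1])
#   >>> X[:, feature_idxs]
#   array([[3, 1, 2],
#          [6, 4, 5]])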