Source code for pyccea.coevolution.ccilfg

import gc
import copy
import logging
import numpy as np
from math import log
from tqdm import tqdm
from scipy.stats import entropy
from tqdm_joblib import tqdm_joblib
from joblib import Parallel, delayed
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import KBinsDiscretizer
from ..coevolution.ccga import CCGA
from ..decomposition.dummy import DummyFeatureGrouping


class CCILFG(CCGA):
    """Cooperative Co-Evolutionary Algorithm with Interaction Learning Feature Grouping (CCILFG).

    Decomposition strategy proposed in:
    Hou, Yaqing, et al. "A correlation-guided cooperative coevolutionary method for feature
    selection via interaction learning-based space division." Swarm and Evolutionary
    Computation 93 (2025): 101846.
    """

    def _symmetric_uncertainty(self, x: np.ndarray, y: np.ndarray) -> float:
        """Compute the symmetric uncertainty (SU).

        SU is considered a scale-insensitive correlation measure because it normalizes
        mutual information. The SU value between two random variables, X and Y, is defined as
        SU(X, Y) = 2*IG(X|Y)/(H(X)+H(Y)), where H(X) is the entropy of X and
        IG(X|Y) = H(X) - H(X|Y) is the information gain (i.e., the mutual information
        between X and Y), with H(X|Y) being the conditional entropy.

        Parameters
        ----------
        x : np.ndarray (n_examples,)
            First random variable.
        y : np.ndarray (n_examples,)
            Second random variable.

        Returns
        -------
        su : float
            Symmetric uncertainty between the two inputs.
        """
        # Compute mutual information
        mi = mutual_info_score(x, y)
        # Compute entropy of each variable
        hx = entropy(np.bincount(x))  # Entropy of x
        hy = entropy(np.bincount(y))  # Entropy of y
        # Symmetric uncertainty formula
        su = 2.0 * mi / (hx + hy) if hx + hy > 0 else 0.0
        return su

    def _compute_symmetric_uncertainty_class(
        self, X: np.ndarray, y: np.ndarray, n_bins: int = 10
    ) -> np.ndarray:
        """Compute symmetric uncertainty between each feature in X and the class labels y.

        Parameters
        ----------
        X : np.ndarray (n_samples, n_features)
            Input matrix (features).
        y : np.ndarray (n_samples,)
            Target vector (class labels).
        n_bins : int
            Number of bins for discretization of features.

        Returns
        -------
        su_values : np.ndarray (n_features,)
            Array of SU values between each feature and the class labels.
        """
        # Discretize features
        discretizer = KBinsDiscretizer(n_bins=n_bins, encode="ordinal", strategy="uniform")
        X_discretized = discretizer.fit_transform(X).astype(int)
        # Discretize class labels if necessary
        if not np.issubdtype(y.dtype, np.integer):
            y = np.digitize(y, bins=np.histogram_bin_edges(y, bins=n_bins))
        # Compute SU for each feature with the class labels
        su_values = np.array(
            [
                self._symmetric_uncertainty(X_discretized[:, i], y)
                for i in range(X.shape[1])
            ]
        )
        return su_values

    def _compute_symmetric_uncertainty_features_parallel(
        self, X: np.ndarray, n_bins: int = 10
    ) -> np.ndarray:
        """Optimized (parallel) computation of the symmetric uncertainty matrix."""
        # Discretize features
        discretizer = KBinsDiscretizer(n_bins=n_bins, encode="ordinal", strategy="uniform")
        X_discretized = discretizer.fit_transform(X).astype(int)
        n_features = X.shape[1]
        su_matrix = np.zeros((n_features, n_features))
        # Fill diagonal with 1s (SU of a feature with itself)
        np.fill_diagonal(su_matrix, 1.0)

        def compute_su(i, j):
            su = self._symmetric_uncertainty(X_discretized[:, i], X_discretized[:, j])
            return (i, j, su)

        # Compute SU for the upper triangular matrix only (i < j)
        task_count = (n_features * (n_features - 1)) // 2  # Number of unique pairs (i < j)
        with tqdm_joblib(tqdm(desc="Computing SU between features", total=task_count)) as progress_bar:
            results = Parallel(n_jobs=-1)(
                delayed(compute_su)(i, j)
                for i in range(n_features)
                for j in range(i + 1, n_features)
            )
        # Populate the symmetric matrix
        for i, j, su in results:
            su_matrix[i, j] = su
            su_matrix[j, i] = su  # Exploit symmetry
        return su_matrix

    def _compute_symmetric_uncertainty_features(
        self, X: np.ndarray, n_bins: int = 10
    ) -> np.ndarray:
        """Compute a symmetric uncertainty matrix for all features in dataset X.

        Parameters
        ----------
        X : np.ndarray (n_samples, n_features)
            Input matrix (features).
        n_bins : int
            Number of bins for discretization of features.

        Returns
        -------
        su_matrix : np.ndarray (n_features, n_features)
            Symmetric uncertainty matrix between all features in X.
        """
        # Discretize features
        discretizer = KBinsDiscretizer(n_bins=n_bins, encode="ordinal", strategy="uniform")
        X_discretized = discretizer.fit_transform(X).astype(int)
        n_features = X.shape[1]
        su_matrix = np.zeros((n_features, n_features))
        # Fill diagonal with 1s (SU of a feature with itself)
        np.fill_diagonal(su_matrix, 1.0)
        for i in tqdm(range(n_features), desc="Computing SU between features"):
            for j in range(i + 1, n_features):
                su = self._symmetric_uncertainty(X_discretized[:, i], X_discretized[:, j])
                su_matrix[i, j] = su
                su_matrix[j, i] = su  # Symmetry: SU(i, j) == SU(j, i)
        return su_matrix

    def _remove_unimportant_features(self, importances: np.ndarray) -> np.ndarray:
        """Remove irrelevant or weakly relevant features from folds and subsets.

        Parameters
        ----------
        importances : np.ndarray (n_features,)
            Importance of each feature based on symmetric uncertainty.

        Returns
        -------
        importances : np.ndarray
            Importance of the remaining features.
        """
        logging.info(f"Removing features with SU less than {round(self.su_threshold, 4)}...")
        features_to_keep = importances >= self.su_threshold
        self.removed_features = np.where(~features_to_keep)[0]
        logging.info(f"{len(self.removed_features)} features were removed.")
        # Remove features from subsets and folds
        self.data.X_train = self.data.X_train[:, features_to_keep].copy()
        self.data.X_test = self.data.X_test[:, features_to_keep].copy()
        for k in range(self.data.kfolds):
            self.data.train_folds[k][0] = self.data.train_folds[k][0][:, features_to_keep].copy()
            self.data.val_folds[k][0] = self.data.val_folds[k][0][:, features_to_keep].copy()
        # Importance of the remaining features
        importances = importances[features_to_keep].copy()
        return importances

    def _find_knee_point(self, x: np.ndarray, y: np.ndarray) -> float:
        """Find the knee point in a curve.

        The knee point is the point of the curve that lies farthest from the straight
        line connecting its first and last points.

        Parameters
        ----------
        x : np.ndarray
            x-axis values.
        y : np.ndarray
            y-axis values.

        Returns
        -------
        gamma : float
            y-value of the curve at the knee point.
        """
        # Define the line connecting the first and last points
        x0, y0 = x[0], y[0]
        xn, yn = x[-1], y[-1]
        ang_coef = (yn - y0) / (xn - x0)  # Slope
        lin_coef = y0 - ang_coef * x0  # Intercept
        # Calculate distances from the curve points to the line
        distances = np.abs(ang_coef * x - y + lin_coef) / np.sqrt(ang_coef**2 + 1)
        # Find the index of the maximum distance (knee point)
        knee_index = np.argmax(distances)
        # Return the y-value at the knee point (gamma)
        gamma = y[knee_index]
        return gamma

    def _space_division_based_on_interaction_learning(
        self, su_matrix: np.ndarray, promising_features: np.ndarray, K: int = 3
    ) -> np.ndarray:
        """Space division based on interaction learning.

        Parameters
        ----------
        su_matrix : np.ndarray (n_features, n_features)
            Symmetric uncertainty matrix between all features in X.
        promising_features : np.ndarray (n_features,)
            Indices of promising features.
        K : int
            Number of neighbors to consider for each feature.

        Returns
        -------
        ranking : np.ndarray (n_features,)
            Ranking of features based on the space division.
        """
        # Initialize subspaces
        self.boundary_subspace = []  # B
        self.high_correlation_subspace = []  # H
        self.low_correlation_subspace = []  # L
        # Classify each feature based on its neighbors
        for feature_idx in tqdm(range(su_matrix.shape[0]), desc="Classifying features based on neighbors"):
            # Get SU_F values for this feature with all others
            su_values = su_matrix[feature_idx]
            # Get indices of top K neighbors (excluding itself)
            neighbor_indices = np.argsort(su_values)[-K - 1:-1][::-1]
            # Determine the group of the feature and its neighbors
            feature_group = "Fp" if feature_idx in promising_features else "Fr"
            neighbor_groups = [
                "Fp" if neighbor in promising_features else "Fr"
                for neighbor in neighbor_indices
            ]
            # Check if all neighbors belong to the same group as the feature
            if all(group == feature_group for group in neighbor_groups):
                if feature_group == "Fp":
                    self.high_correlation_subspace.append(feature_idx)  # Add to H
                else:
                    self.low_correlation_subspace.append(feature_idx)  # Add to L
            else:
                self.boundary_subspace.append(feature_idx)  # Add to B
        logging.info(f"Number of features in B: {len(self.boundary_subspace)}")
        logging.info(f"Number of features in H: {len(self.high_correlation_subspace)}")
        logging.info(f"Number of features in L: {len(self.low_correlation_subspace)}")
        ranking = np.array(
            self.high_correlation_subspace
            + self.low_correlation_subspace
            + self.boundary_subspace
        )
        return ranking

    def _init_decomposer(self) -> None:
        """Instantiate feature grouping method."""
        # Compute feature importances
        SU_c = self._compute_symmetric_uncertainty_class(self.data.X_train, self.data.y_train)
        # Compute symmetric uncertainty threshold
        # The threshold is 5% of the maximal feature importance of the current dataset
        self.su_threshold = 0.05 * np.max(SU_c)
        # Remove irrelevant or weakly relevant features
        SU_c = self._remove_unimportant_features(SU_c)
        self.importances = SU_c.copy()
        # Find knee point
        sorted_indices = np.argsort(SU_c)[::-1]
        sorted_SU_c = np.array(SU_c)[sorted_indices]
        ranking = np.arange(1, len(sorted_SU_c) + 1)
        gamma = self._find_knee_point(ranking, sorted_SU_c)
        # Select promising and remaining features
        promising_features = np.where(SU_c >= gamma)[0]
        remaining_features = np.where(SU_c < gamma)[0]
        # Compute symmetric uncertainty matrix for all features
        if self.data.n_features > 100000:
            logging.info("Computing SU matrix in parallel...")
            SU_f = self._compute_symmetric_uncertainty_features_parallel(self.data.X_train)
        else:
            logging.info("Computing SU matrix sequentially...")
            SU_f = self._compute_symmetric_uncertainty_features(self.data.X_train)
        # Space division based on interaction learning
        ranking = self._space_division_based_on_interaction_learning(SU_f, promising_features)
        # Set the number of subcomponents and their sizes
        self.n_subcomps = 3
        self.subcomp_sizes = [
            len(self.high_correlation_subspace),
            len(self.low_correlation_subspace),
            len(self.boundary_subspace)
        ]
        # Ranking feature grouping using variable importances as scores
        self.decomposer = DummyFeatureGrouping(
            subcomp_sizes=self.subcomp_sizes,
            feature_idxs=ranking
        )
    def optimize(self) -> None:
        """Solve the feature selection problem through optimization."""
        # Decompose problem
        self._problem_decomposition()
        # Initialize subpopulations
        self._init_subpopulations()
        # Instantiate optimizers
        self._init_optimizers()
        # Get the best individual and context vector from each subpopulation
        self.current_best = self._get_best_individuals(
            subpops=self.subpops,
            fitness=self.fitness,
            context_vectors=self.context_vectors
        )
        # Select the globally best context vector
        self.best_context_vector, self.best_fitness = self._get_global_best()
        self.best_context_vectors.append(self.best_context_vector.copy())
        # Save the order of features considered in the feature grouping
        self.best_feature_idxs = self.feature_idxs.copy()
        # Set the number of generations counter
        n_gen = 0
        # Number of generations that the best fitness has not improved
        stagnation_counter = 0
        # Initialize the optimization progress bar
        progress_bar = tqdm(total=self.conf["coevolution"]["max_gen"], desc="Generations", leave=False)
        # Iterate up to the maximum number of generations
        while n_gen <= self.conf["coevolution"]["max_gen"]:
            # Append current best fitness
            self.convergence_curve.append(self.best_fitness)
            # Evolve each subpopulation using a genetic algorithm
            current_subpops = list()
            for i in range(self.n_subcomps):
                current_subpop = self.optimizers[i].evolve(
                    subpop=self.subpops[i],
                    fitness=self.fitness[i]
                )
                current_subpops.append(current_subpop)
            # Evaluate each individual of the evolved subpopulations
            current_fitness = list()
            current_context_vectors = list()
            for i in range(self.n_subcomps):
                current_fitness.append(list())
                current_context_vectors.append(list())
                # Use best individuals from the previous generation (`self.current_best`) as
                # collaborators for each individual in the current generation after evolution
                # (`current_subpops`)
                for j in range(self.subpop_sizes[i]):
                    collaborators = self.best_collaborator.get_collaborators(
                        subpop_idx=i,
                        indiv_idx=j,
                        current_subpops=current_subpops,
                        current_best=self.current_best
                    )
                    context_vector = self.best_collaborator.build_context_vector(collaborators)
                    # Update the context vector
                    current_context_vectors[i].append(context_vector.copy())
                    # Update fitness
                    current_fitness[i].append(self.fitness_function.evaluate(context_vector, self.data))
            # Update subpopulations, context vectors and evaluations
            self.subpops = copy.deepcopy(current_subpops)
            self.fitness = copy.deepcopy(current_fitness)
            self.context_vectors = copy.deepcopy(current_context_vectors)
            del current_subpops, current_fitness, current_context_vectors
            gc.collect()
            # Get the best individual and context vector from each subpopulation
            self.current_best = self._get_best_individuals(
                subpops=self.subpops,
                fitness=self.fitness,
                context_vectors=self.context_vectors
            )
            # Select the globally best context vector
            best_context_vector, best_fitness = self._get_global_best()
            # Update best context vector
            if self.best_fitness < best_fitness:
                # Reset stagnation counter because best fitness has improved
                stagnation_counter = 0
                # Enable logger if specified
                logging.getLogger().disabled = False if self.verbose else True
                # Current fitness
                current_best_fitness = round(self.best_fitness, 4)
                # New fitness
                new_best_fitness = round(best_fitness, 4)
                # Show improvement
                logging.info(
                    f"\nUpdate fitness from {current_best_fitness} to {new_best_fitness}.\n"
                )
                # Update best context vector
                self.best_context_vector = best_context_vector.copy()
                self.best_context_vectors.append(self.best_context_vector.copy())
                # Update best fitness
                self.best_fitness = best_fitness
            else:
                # Increase stagnation counter because best fitness has not improved
                stagnation_counter += 1
                # Check whether the optimization has been stagnant for a long time
                if stagnation_counter >= self.conf["coevolution"]["max_gen_without_improvement"]:
                    # Enable logger
                    logging.getLogger().disabled = False
                    logging.info(
                        "\nEarly stopping because fitness has been stagnant for "
                        f"{stagnation_counter} generations in a row."
                    )
                    break
            # Increase number of generations
            n_gen += 1
            # Update progress bar
            progress_bar.update(1)
        # Close progress bar after optimization
        progress_bar.close()
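
As a quick sanity check of the SU formula used in `_symmetric_uncertainty`, the same quantity can be reproduced outside the class with the identical scikit-learn and SciPy calls. This is a minimal sketch; the toy arrays are illustrative data, not part of the package.

import numpy as np
from scipy.stats import entropy
from sklearn.metrics import mutual_info_score

# Two toy discrete variables: y is a noisy copy of x (illustrative data only)
x = np.array([0, 0, 1, 1, 2, 2, 0, 1, 2, 2])
y = np.array([0, 0, 1, 1, 2, 2, 1, 1, 2, 0])

mi = mutual_info_score(x, y)   # IG(X|Y), i.e. the mutual information between X and Y
hx = entropy(np.bincount(x))   # H(X); scipy.stats.entropy normalizes the counts internally
hy = entropy(np.bincount(y))   # H(Y)
su = 2.0 * mi / (hx + hy)      # SU(X, Y) = 2*IG(X|Y) / (H(X) + H(Y))

print(round(su, 4))  # Value in [0, 1]: 0 means independent, 1 means perfectly dependent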
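
The threshold gamma used in `_init_decomposer` comes from `_find_knee_point`, which picks the point of the sorted importance curve farthest from the chord joining its first and last points. The standalone sketch below reproduces that geometric idea; the exponentially decaying curve is an assumed toy input chosen only for illustration.

import numpy as np

def find_knee_point(x: np.ndarray, y: np.ndarray) -> float:
    """Return the y-value of the point farthest from the chord between the curve's endpoints."""
    slope = (y[-1] - y[0]) / (x[-1] - x[0])
    intercept = y[0] - slope * x[0]
    # Perpendicular distance of every point to the line y = slope*x + intercept
    distances = np.abs(slope * x - y + intercept) / np.sqrt(slope**2 + 1)
    return y[np.argmax(distances)]

# Synthetic, monotonically decreasing importance curve (illustrative only)
ranks = np.arange(1, 101)
importances = np.exp(-ranks / 15.0)
gamma = find_knee_point(ranks, importances)
# Features with SU_c >= gamma would be treated as "promising" in _init_decomposer
print(round(gamma, 4))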