Source code for pyccea.coevolution.ccsufg

import gc
import copy
import logging
import numpy as np
from math import log
from tqdm import tqdm
from ..coevolution.ccga import CCGA
from ..decomposition.ranking import RankingFeatureGrouping


class CCSUFG(CCGA):
    """Cooperative Co-Evolutionary Algorithm with Symmetric Uncertainty Feature Grouping (CCSUFG).

    Symmetric Uncertainty computation based on:
    Li, Jundong, et al. "Feature selection: A data perspective." ACM Computing Surveys (CSUR)
    50.6 (2017): 1-45.

    Decomposition strategy proposed in:
    Song, Xian-Fang, et al. "Variable-size cooperative coevolutionary particle swarm optimization
    for feature selection on high-dimensional data." IEEE Transactions on Evolutionary Computation
    24.5 (2020): 882-895.
    """

    def _hist(self, samples: list):
        """Build a histogram from a list of samples.

        Parameters
        ----------
        samples : list
            Samples to build a histogram.

        Returns
        -------
        histogram : iterator
            Histogram made from the samples.
        """
        d = dict()
        for s in samples:
            d[s] = d.get(s, 0) + 1
        histogram = map(lambda z: float(z)/len(samples), d.values())
        return histogram

    def _elog(self, x: float) -> float:
        """Compute the product of the logarithm of a value with the value itself.

        For entropy, 0 * log(0) is 0. However, log(0) raises an error. This function handles
        this specific case.

        Parameters
        ----------
        x : float
            Target value.

        Returns
        -------
        : float
            Product of the logarithm of the input value with the value itself.
        """
        if x <= 0. or x >= 1.:
            return 0
        else:
            return x*log(x)

    def _entropy_from_probs(self, probs: list, base: int = 2) -> float:
        """Compute entropy from a normalized list of probabilities of discrete outcomes.

        Parameters
        ----------
        probs : list
            Probabilities of discrete outcomes.
        base : int, default 2
            Logarithm base.

        Returns
        -------
        : float
            Entropy.
        """
        return -sum(map(self._elog, probs))/log(base)

    def _compute_discrete_entropy(self, samples: list, base: int = 2) -> float:
        """Compute discrete entropy given a list of samples (any hashable object).

        Parameters
        ----------
        samples : list
            Samples to calculate discrete entropy.
        base : int, default 2
            Logarithm base.

        Returns
        -------
        : float
            Discrete entropy.
        """
        return self._entropy_from_probs(self._hist(samples), base=base)

    def _midd(self, X: list, Y: list) -> float:
        """Compute discrete mutual information given a list of samples (any hashable object).

        Parameters
        ----------
        X : np.ndarray (n_examples,)
            First random variable.
        Y : np.ndarray (n_examples,)
            Second random variable.

        Returns
        -------
        : float
            Discrete mutual information.
        """
        return (
            -self._compute_discrete_entropy(list(zip(X, Y)))
            + self._compute_discrete_entropy(X)
            + self._compute_discrete_entropy(Y)
        )

    def _compute_conditional_entropy(self, X: np.ndarray, Y: np.ndarray) -> float:
        """Compute the conditional entropy (CE).

        The CE value between two variables, X and Y, can be defined as follows:
        CE(X, Y) = H(X) - I(X; Y), which measures the uncertainty of X when Y is given.

        Parameters
        ----------
        X : np.ndarray (n_examples,)
            First random variable.
        Y : np.ndarray (n_examples,)
            Second random variable.

        Returns
        -------
        conditional_entropy : float
            Conditional entropy of X and Y.
        """
        conditional_entropy = self._compute_discrete_entropy(X) - self._midd(X, Y)
        return conditional_entropy

    def _compute_information_gain(self, X: np.ndarray, Y: np.ndarray) -> float:
        """Compute the information gain (IG).

        The IG value between two random variables, X and Y, can be defined as follows:
        IG(X, Y) = H(X) - H(X|Y), which represents the decrease in uncertainty of X when Y
        is known.

        Parameters
        ----------
        X : np.ndarray (n_examples,)
            First random variable.
        Y : np.ndarray (n_examples,)
            Second random variable.

        Returns
        -------
        information_gain : float
            Information gain between the two inputs.
        """
        information_gain = (
            self._compute_discrete_entropy(X) - self._compute_conditional_entropy(X, Y)
        )
        return information_gain

    def _compute_symmetric_uncertainty(self, X: np.ndarray, Y: np.ndarray) -> float:
        """Compute the symmetric uncertainty (SU).

        It is considered a scale-insensitive correlation measure as it normalizes mutual
        information. The SU value between two random variables, X and Y, can be defined as
        follows: SU(X, Y) = 2*IG(X|Y)/(H(X)+H(Y)), where H(X) is the entropy of X, IG(X|Y)
        refers to the information gain, and H(X|Y) is the conditional entropy.

        Parameters
        ----------
        X : np.ndarray (n_examples,)
            First random variable.
        Y : np.ndarray (n_examples,)
            Second random variable.

        Returns
        -------
        symmetric_uncertainty : float
            Symmetric uncertainty between the two inputs.
        """
        # Calculate information gain of X and Y
        t1 = self._compute_information_gain(X, Y)
        # Calculate entropy of X
        t2 = self._compute_discrete_entropy(X)
        # Calculate entropy of Y
        t3 = self._compute_discrete_entropy(Y)
        # Compute symmetric uncertainty
        symmetric_uncertainty = 2.0*t1/(t2+t3)
        return symmetric_uncertainty

    def _remove_unimportant_features(self, importances: np.ndarray) -> np.ndarray:
        """Remove irrelevant or weakly relevant features from folds and subsets.

        Parameters
        ----------
        importances : np.ndarray (n_features,)
            Importance of each feature based on symmetric uncertainty.

        Returns
        -------
        importances : np.ndarray
            Importance of the remaining features.
        """
        logging.info(f"Removing features with SU less than {round(self.su_threshold, 4)}...")
        features_to_keep = importances >= self.su_threshold
        self.removed_features = np.where(features_to_keep == False)[0]
        logging.info(f"{len(self.removed_features)} features were removed.")
        # Remove features from subsets and folds
        self.data.X_train = self.data.X_train[:, features_to_keep].copy()
        self.data.X_test = self.data.X_test[:, features_to_keep].copy()
        for k in range(self.data.kfolds):
            self.data.train_folds[k][0] = self.data.train_folds[k][0][:, features_to_keep].copy()
            self.data.val_folds[k][0] = self.data.val_folds[k][0][:, features_to_keep].copy()
        # Importance of the remaining features
        importances = importances[features_to_keep].copy()
        return importances

    def _compute_variable_importances(self) -> list:
        """Compute symmetric uncertainty between each feature and the class labels.

        Returns
        -------
        importances : np.ndarray
            Importance of each feature based on symmetric uncertainty.
        """
        importances = list()
        for i in range(self.data.n_features):
            importances.append(
                self._compute_symmetric_uncertainty(
                    self.data.X_train[:, i],
                    self.data.y_train
                )
            )
        return np.array(importances)

    def _init_decomposer(self) -> None:
        """Instantiate feature grouping method."""
        # Compute feature importances
        importances = self._compute_variable_importances()
        # Compute symmetric uncertainty threshold
        # The threshold is 10% of the maximal feature importance of the current dataset
        self.su_threshold = 0.10*np.max(importances)
        # Remove irrelevant or weakly relevant features
        importances = self._remove_unimportant_features(importances)
        # Ranking feature grouping using variable importances as scores
        self.decomposer = RankingFeatureGrouping(
            n_subcomps=self.n_subcomps,
            subcomp_sizes=self.subcomp_sizes,
            scores=importances,
            method="elitist",
            ascending=False
        )
    def optimize(self) -> None:
        """Solve the feature selection problem through optimization."""
        # Decompose problem
        self._problem_decomposition()
        # Initialize subpopulations
        self._init_subpopulations()
        # Instantiate optimizers
        self._init_optimizers()
        # Get the best individual and context vector from each subpopulation
        self.current_best = self._get_best_individuals(
            subpops=self.subpops,
            fitness=self.fitness,
            context_vectors=self.context_vectors
        )
        # Select the globally best context vector
        self.best_context_vector, self.best_fitness = self._get_global_best()
        self.best_context_vectors.append(self.best_context_vector.copy())
        # Save the order of features considered in the random feature grouping
        self.best_feature_idxs = self.feature_idxs.copy()
        # Set the number of generations counter
        n_gen = 0
        # Number of generations that the best fitness has not improved
        stagnation_counter = 0
        # Initialize the optimization progress bar
        progress_bar = tqdm(total=self.conf["coevolution"]["max_gen"], desc="Generations", leave=False)
        # Iterate up to the maximum number of generations
        while n_gen <= self.conf["coevolution"]["max_gen"]:
            # Append current best fitness
            self.convergence_curve.append(self.best_fitness)
            # Evolve each subpopulation using a genetic algorithm
            current_subpops = list()
            for i in range(self.n_subcomps):
                current_subpop = self.optimizers[i].evolve(
                    subpop=self.subpops[i],
                    fitness=self.fitness[i]
                )
                current_subpops.append(current_subpop)
            # Evaluate each individual of the evolved subpopulations
            current_fitness = list()
            current_context_vectors = list()
            for i in range(self.n_subcomps):
                current_fitness.append(list())
                current_context_vectors.append(list())
                # Use best individuals from the previous generation (`self.current_best`) as
                # collaborators for each individual in the current generation after evolution
                # (`current_subpops`)
                for j in range(self.subpop_sizes[i]):
                    collaborators = self.best_collaborator.get_collaborators(
                        subpop_idx=i,
                        indiv_idx=j,
                        current_subpops=current_subpops,
                        current_best=self.current_best
                    )
                    context_vector = self.best_collaborator.build_context_vector(collaborators)
                    # Update the context vector
                    current_context_vectors[i].append(context_vector.copy())
                    # Update fitness
                    current_fitness[i].append(self.fitness_function.evaluate(context_vector, self.data))
            # Update subpopulations, context vectors and evaluations
            self.subpops = copy.deepcopy(current_subpops)
            self.fitness = copy.deepcopy(current_fitness)
            self.context_vectors = copy.deepcopy(current_context_vectors)
            del current_subpops, current_fitness, current_context_vectors
            gc.collect()
            # Get the best individual and context vector from each subpopulation
            self.current_best = self._get_best_individuals(
                subpops=self.subpops,
                fitness=self.fitness,
                context_vectors=self.context_vectors
            )
            # Select the globally best context vector
            best_context_vector, best_fitness = self._get_global_best()
            # Update best context vector
            if self.best_fitness < best_fitness:
                # Reset stagnation counter because best fitness has improved
                stagnation_counter = 0
                # Enable logger if specified
                logging.getLogger().disabled = False if self.verbose else True
                # Current fitness
                current_best_fitness = round(self.best_fitness, 4)
                # New fitness
                new_best_fitness = round(best_fitness, 4)
                # Show improvement
                logging.info(
                    f"\nUpdate fitness from {current_best_fitness} to {new_best_fitness}.\n"
                )
                # Update best context vector
                self.best_context_vector = best_context_vector.copy()
                self.best_context_vectors.append(self.best_context_vector.copy())
                # Update best fitness
                self.best_fitness = best_fitness
            else:
                # Increase stagnation counter because best fitness has not improved
                stagnation_counter += 1
                # Check whether the optimization has been stagnant for a long time
                if stagnation_counter >= self.conf["coevolution"]["max_gen_without_improvement"]:
                    # Enable logger
                    logging.getLogger().disabled = False
                    logging.info(
                        "\nEarly stopping because fitness has been stagnant for "
                        f"{stagnation_counter} generations in a row."
                    )
                    break
            # Increase number of generations
            n_gen += 1
            # Update progress bar
            progress_bar.update(1)
        # Close progress bar after optimization
        progress_bar.close()
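

# ---------------------------------------------------------------------------
# Minimal self-contained sketch (illustrative only, not used by the class
# above): it mirrors the symmetric uncertainty pipeline and the 10%-of-max
# threshold from `_init_decomposer` with plain NumPy, assuming discrete
# features and labels. The helper names `entropy` and `symmetric_uncertainty`
# are hypothetical and exist only for this example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(seed=0)

    def entropy(samples: np.ndarray) -> float:
        # Discrete entropy (base 2) from empirical frequencies.
        _, counts = np.unique(np.asarray(samples), axis=0, return_counts=True)
        probs = counts / counts.sum()
        return float(-np.sum(probs * np.log2(probs)))

    def symmetric_uncertainty(x: np.ndarray, y: np.ndarray) -> float:
        # SU(X, Y) = 2*I(X; Y)/(H(X) + H(Y)), with I(X; Y) = H(X) + H(Y) - H(X, Y).
        mutual_information = entropy(x) + entropy(y) - entropy(np.column_stack((x, y)))
        return 2.0 * mutual_information / (entropy(x) + entropy(y))

    # Toy data: feature 0 is a noisy copy of the labels, feature 1 is pure noise.
    y = rng.integers(0, 2, size=500)
    x_informative = np.where(rng.random(500) < 0.9, y, 1 - y)
    x_noise = rng.integers(0, 2, size=500)

    importances = np.array([
        symmetric_uncertainty(x_informative, y),
        symmetric_uncertainty(x_noise, y),
    ])
    # Keep features whose SU is at least 10% of the maximal importance,
    # as done in `_init_decomposer` and `_remove_unimportant_features`.
    su_threshold = 0.10 * importances.max()
    print("SU importances:", np.round(importances, 4))
    print("Features kept:", np.where(importances >= su_threshold)[0])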