Source code for pyccea.coevolution.ccpstfg

import gc
import copy
import time
import logging
import numpy as np
import pandas as pd
from tqdm import tqdm
from kneed import KneeLocator
from ..projection.vip import VIP
from ..coevolution.ccga import CCGA
from ..projection.cipls import CIPLS
from sklearn.metrics import silhouette_score
from sklearn.cross_decomposition import PLSRegression
from ..decomposition.ranking import RankingFeatureGrouping
from sklearn.cluster import KMeans, AgglomerativeClustering
from ..decomposition.clustering import ClusteringFeatureGrouping


class CCPSTFG(CCGA):
    """Cooperative Co-Evolutionary Algorithm with Projection-based Self-Tuning Feature Grouping (CCPSTFG).

    Attributes
    ----------
    n_components : int
        Number of components to keep after dimensionality reduction.
    method : str
        Projection-based decomposition method. It can be 'clustering', 'elitist' or 'distributed'.
    vip_threshold : float
        All features whose importance is less than or equal to this threshold will be removed.
    removed_features : np.ndarray
        Indexes of features that were removed due to their low importance.
    """

    CLUSTERING_METHODS = {
        "k_means": (KMeans, {}),
        "agglomerative_clustering": (AgglomerativeClustering, {"linkage": "ward", "metric": "euclidean"})
    }

    def _feature_clustering(self, projection_model) -> np.ndarray:
        """Cluster the features according to their contribution to the components of the
        low-dimensional latent space.

        Parameters
        ----------
        projection_model : sklearn model object
            Partial Least Squares regression model. It can be the traditional version (PLS)
            or the covariance-free version (CIPLS).

        Returns
        -------
        feature_clusters : np.ndarray
            Index of the cluster each feature belongs to.
        """
        projection_model = copy.deepcopy(projection_model)
        X_train_normalized = self.data.X_train - self.data.X_train.mean(axis=0)
        y_train_encoded = pd.get_dummies(self.data.y_train).astype(int)
        projection_model.fit(X=X_train_normalized, Y=y_train_encoded)
        # Get the loadings of features on PLS components
        feature_loadings = abs(projection_model.x_loadings_)
        if self.conf["coevolution"].get("n_subcomps"):
            self.n_subcomps = self.conf["coevolution"]["n_subcomps"]
            logging.info(f"User-defined number of subcomponents: {self.n_subcomps}")
        else:
            logging.info("Automatically choosing the number of subcomponents...")
            start_time = time.time()
            self.n_subcomps = self._get_best_number_of_subcomponents(feature_loadings)
            self._n_subcomponents_tuning_time = time.time() - start_time
        # Update the subpopulation sizes after updating the number of subcomponents
        self.subpop_sizes = [self.subpop_sizes[0]] * self.n_subcomps
        # Cluster features based on loadings.
        # Loadings indicate how strongly each feature contributes to each component.
        # Features with similar loadings on the same components are likely to be related.
        clustering_model_class, clustering_params = CCPSTFG.CLUSTERING_METHODS[self.clustering_model_type]
        clustering_model = clustering_model_class(n_clusters=self.n_subcomps, **clustering_params)
        feature_clusters = clustering_model.fit_predict(feature_loadings)
        return feature_clusters

    def _get_best_number_of_components(
        self,
        projection_class,
        X_train: np.ndarray,
        y_train: np.ndarray
    ):
        """Get the best number of components to keep by PLS decomposition.

        The number of components will occur at the elbow point, where the coefficient of
        determination (r2-score) starts to level off.

        Parameters
        ----------
        projection_class : sklearn model class
            Partial Least Squares regression class. It can be the traditional version (PLS)
            or the covariance-free version (CIPLS).
        X_train : np.ndarray
            Train input data.
        y_train : np.ndarray
            Train output data.

        Returns
        -------
        n_components : int
            Best number of components to keep after the decomposition into a latent space.
""" max_n_pls_components = self.conf["decomposition"].get("max_n_pls_components", 30) n_components_range = range(2, min(max_n_pls_components, X_train.shape[1])) logging.info(f"Search space (PLS components): {n_components_range}") r_squared_values = list() for n_components in n_components_range: projection_model = projection_class(n_components=n_components, copy=True) X_train_normalized = X_train - X_train.mean(axis=0) y_train_encoded = pd.get_dummies(y_train).astype(int) projection_model.fit(X_train_normalized, y_train_encoded) # Sum the coefficient of determination of the prediction r_squared_values.append(np.sum(projection_model.score(X_train_normalized, y_train_encoded))) del projection_model gc.collect() # Use kneed to find the knee/elbow point kneedle = KneeLocator( n_components_range, r_squared_values, curve="concave", direction="increasing" ) n_components = kneedle.knee logging.info(f"Optimized number of PLS components: {n_components}.") return n_components def _get_best_number_of_subcomponents(self, feature_loadings: np.ndarray) -> int: """Get the best number of subcomponents. The number of subcomponents (clusters) will be the one that maximizes the silhouette coefficient of the clustering of X loadings. Parameters ---------- feature_loadings : np.ndarray The absolute importance of terms to components. Returns ------- n_subcomps : int Best number of subcomponents to decompose the original problem. """ max_n_clusters = self.conf["decomposition"].get("max_n_clusters", 10) n_clusters_range = range(2, min(max_n_clusters, feature_loadings.shape[0])) logging.info(f"Search space (clusters): {n_clusters_range}") silhouette_scores = list() for n_clusters in n_clusters_range: clustering_model_class, clustering_params = CCPSTFG.CLUSTERING_METHODS[self.clustering_model_type] clustering_model = clustering_model_class(n_clusters=n_clusters, **clustering_params) cluster_labels = clustering_model.fit_predict(feature_loadings) silhouette_avg = silhouette_score(feature_loadings, cluster_labels) silhouette_scores.append(silhouette_avg) silhouette_scores = pd.DataFrame( list(zip(n_clusters_range, silhouette_scores)), columns=["n_clusters", "silhouette_score"] ) n_subcomps = silhouette_scores.loc[ silhouette_scores["silhouette_score"].idxmax(), "n_clusters" ] logging.info(f"Optimized number of subcomponents: {n_subcomps}.") return n_subcomps def _get_best_quantile_to_remove(self, importances: np.ndarray) -> float: """ Gets the best quantile of the feature importance distribution to remove weak features. The best quantile will be the one that, when its features are removed from a context vector filled with 1's (selecting all remaining features), gives the best fitness value. Parameters ---------- importances : np.ndarray (n_features,) Importance of each feature based on its contribution to yield the latent space. Returns ------- best_quantile : float Best quantile. All features that have their importance in this quantile of the feature importance distribution will be removed. 
""" metrics = dict() max_removal_quantile = self.conf["decomposition"].get("max_removal_quantile", 0.50) remove_quantile_step_size = self.conf["decomposition"].get("removal_quantile_step_size", 0.05) quantiles = np.arange( start=0, stop=(max_removal_quantile+remove_quantile_step_size), step=remove_quantile_step_size ).round(2) logging.info(f"Search space (quantile): {quantiles}") for quantile in quantiles: data_q = copy.deepcopy(self.data) vip_threshold = round(np.quantile(importances, quantile), 4) features_to_keep = importances > vip_threshold # Removing features from subsets and folds data_q.X_train = data_q.X_train[:, features_to_keep].copy() data_q.X_test = data_q.X_test[:, features_to_keep].copy() for k in range(data_q.kfolds): data_q.train_folds[k][0] = data_q.train_folds[k][0][:, features_to_keep].copy() data_q.val_folds[k][0] = data_q.val_folds[k][0][:, features_to_keep].copy() # Build context vector with all remaining features context_vector = np.ones((data_q.X_train.shape[1],)).astype(bool) metrics[quantile] = self.fitness_function.evaluate(context_vector, data_q) logging.getLogger().disabled = False # Get the quantile that gives the best fitness value metrics = pd.DataFrame(list(metrics.items()), columns=["quantile", "fitness"]) best_quantile = metrics.loc[metrics["fitness"].idxmax(), "quantile"] logging.info(f"Best quantile: {best_quantile}.") return best_quantile def _remove_unimportant_features( self, importances: np.ndarray, ) -> np.ndarray: """Remove irrelevant or weaken features from folds and subsets. Parameters ---------- importances : np.ndarray Importance of each feature based on its contribution to yield the latent space. Returns ------- importances : np.ndarray Importance of the remaining features. """ start_time = time.time() self.quantile_to_remove = self._get_best_quantile_to_remove(importances) self._quantile_tuning_time = time.time() - start_time self.vip_threshold = round(np.quantile(importances, self.quantile_to_remove), 4) logging.info(f"Removing features with VIP less than or equal to {self.vip_threshold}...") features_to_keep = importances > self.vip_threshold self.removed_features = np.where(features_to_keep == False)[0] logging.info(f"{len(self.removed_features)} features were removed.") # Removing features from subsets and folds self.data.X_train = self.data.X_train[:, features_to_keep].copy() self.data.X_test = self.data.X_test[:, features_to_keep].copy() for k in range(self.data.kfolds): self.data.train_folds[k][0] = self.data.train_folds[k][0][:, features_to_keep].copy() self.data.val_folds[k][0] = self.data.val_folds[k][0][:, features_to_keep].copy() # Importance of the remaining features importances = importances[features_to_keep].copy() return importances def _compute_variable_importances(self, projection_model) -> np.ndarray: """Compute variable importance in projection (VIP). Parameters ---------- projection_model : sklearn model object Partial Least Squares regression model. It can be the traditional version (PLS) or the Covariance-free version (CIPLS). Returns ------- importances : np.ndarray (n_features,) Importance of each feature based on its contribution to yield the latent space. 
""" projection_model = copy.deepcopy(projection_model) X_train_normalized = self.data.X_train - self.data.X_train.mean(axis=0) y_train_encoded = pd.get_dummies(self.data.y_train).astype(int) projection_model.fit(X=X_train_normalized, Y=y_train_encoded) vip = VIP(model=projection_model) vip.compute() importances = vip.importances.copy() return importances def _init_decomposer(self) -> None: """Instantiate feature grouping method.""" # Method used to distribute features into subcomponents self.method = self.conf["decomposition"]["method"] logging.info(f"Decomposition approach: {self.method}.") # Define projection model according to the number of features high_dim = self.data.n_features > 100000 # TODO CIPLS breaks when running for too many dimensions projection_class = CIPLS if high_dim else PLSRegression if self.conf["decomposition"].get("n_components"): self.n_components = self.conf["decomposition"]["n_components"] logging.info(f"User-defined number of PLS components: {self.n_components}") else: logging.info("Automatically choosing the number of PLS components...") start_time = time.time() # Get the best number of components to keep after projection using the Elbow method self.n_components = self._get_best_number_of_components( projection_class, self.data.X_train, self.data.y_train ) self._n_components_tuning_time = time.time() - start_time # Instantiate projection model object projection_model = projection_class(n_components=self.n_components, copy=True) # Compute feature importances importances = self._compute_variable_importances(projection_model=projection_model) # Remove irrelevant or weaken relevant features if self.conf["decomposition"].get("drop", False): importances = self._remove_unimportant_features(importances) # Instantiate feature grouping if self.method == "clustering": self.clustering_model_type = self.conf["decomposition"]["clustering_model_type"] logging.info(f"Clustering model type: {self.clustering_model_type}") feature_clusters = self._feature_clustering(projection_model=projection_model) self.decomposer = ClusteringFeatureGrouping( n_subcomps=self.n_subcomps, clusters=feature_clusters ) else: # Ranking feature grouping using variable importances as scores self.decomposer = RankingFeatureGrouping( n_subcomps=self.n_subcomps, subcomp_sizes=self.subcomp_sizes, scores=importances, method=self.method, ascending=False ) self.feature_importances = importances.copy() self._tuning_time = ( getattr(self, "_n_components_tuning_time", 0) + getattr(self, "_quantile_tuning_time", 0) + getattr(self, "_n_subcomponents_tuning_time", 0) ) def _allocate_subproblem_resources(self) -> None: """Allocate resources to subproblems based on feature importances and subcomponent sizes.""" # Compute cumulative sum of subcomponent sizes and remove the last element to use as split indices indices = np.cumsum(self.subcomp_sizes)[:-1] # Split the feature importances array into subcomponents based on the indices importances = np.split(self.feature_importances, indices) # Calculate the average importance of each subcomponent subcomp_importances = np.array([np.mean(subcomp) for subcomp in importances]) # Normalize the subcomponent importances normalized_subcomp_importances = subcomp_importances / np.sum(subcomp_importances) logging.info(f"Subcomponent importances: {normalized_subcomp_importances}") # Calculate the normalized subcomponent sizes normalized_subcomp_sizes = np.array(self.subcomp_sizes) / np.sum(self.subcomp_sizes) logging.info(f"Normalized subcomponent sizes: {normalized_subcomp_sizes}") # 
Calculate the allocation factor allocation_factor = normalized_subcomp_importances * normalized_subcomp_sizes # Normalize the allocation factor normalized_allocation_factor = allocation_factor / np.sum(allocation_factor) # Update the subpopulation sizes based on the normalized allocation factor logging.info(f"Subpopulation sizes in Round-Robin strategy: {self.subpop_sizes}") self.subpop_sizes = np.round(normalized_allocation_factor * sum(self.subpop_sizes)).astype(int) logging.info(f"Subpopulation sizes after resource allocation: {self.subpop_sizes}")
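
    # Worked example of the allocation factor (hypothetical numbers, not part of the
    # original module): with subcomp_sizes = [10, 30, 60] and mean VIP importances
    # [0.6, 0.3, 0.1] per subcomponent, the normalized importances are [0.6, 0.3, 0.1]
    # and the normalized sizes are [0.1, 0.3, 0.6]. The allocation factor is then
    # [0.06, 0.09, 0.06], which normalizes to roughly [0.286, 0.428, 0.286]. With a
    # Round-Robin budget of 3 x 50 = 150 individuals, the subpopulation sizes become
    # approximately [43, 64, 43]: large but weakly important groups and small but highly
    # important groups end up with comparable shares of the evaluation budget.
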
    def optimize(self) -> None:
        """Solve the feature selection problem through optimization."""
        # Decompose problem
        self._problem_decomposition()
        if self.conf["coevolution"].get("optimized_resource_allocation", False):
            logging.info("Optimizing resource allocation...")
            self._allocate_subproblem_resources()
        # Initialize subpopulations
        self._init_subpopulations()
        # Instantiate optimizers
        self._init_optimizers()
        # Get the best individual and context vector from each subpopulation
        self.current_best = self._get_best_individuals(
            subpops=self.subpops,
            fitness=self.fitness,
            context_vectors=self.context_vectors
        )
        # Select the globally best context vector
        self.best_context_vector, self.best_fitness = self._get_global_best()
        self.best_context_vectors.append(self.best_context_vector.copy())
        # Save the order of features considered in the feature grouping
        self.best_feature_idxs = self.feature_idxs.copy()
        # Set the number of generations counter
        n_gen = 0
        # Number of generations that the best fitness has not improved
        stagnation_counter = 0
        # Initialize the optimization progress bar
        progress_bar = tqdm(total=self.conf["coevolution"]["max_gen"], desc="Generations", leave=False)
        # Iterate up to the maximum number of generations
        while n_gen <= self.conf["coevolution"]["max_gen"]:
            # Append current best fitness
            self.convergence_curve.append(self.best_fitness)
            # Evolve each subpopulation using a genetic algorithm
            current_subpops = list()
            for i in range(self.n_subcomps):
                current_subpop = self.optimizers[i].evolve(
                    subpop=self.subpops[i],
                    fitness=self.fitness[i]
                )
                current_subpops.append(current_subpop)
            # Evaluate each individual of the evolved subpopulations
            current_fitness = list()
            current_context_vectors = list()
            for i in range(self.n_subcomps):
                current_fitness.append(list())
                current_context_vectors.append(list())
                # Use the best individuals from the previous generation (`self.current_best`) as
                # collaborators for each individual in the current generation after evolution
                # (`current_subpops`)
                for j in range(self.subpop_sizes[i]):
                    collaborators = self.best_collaborator.get_collaborators(
                        subpop_idx=i,
                        indiv_idx=j,
                        current_subpops=current_subpops,
                        current_best=self.current_best
                    )
                    context_vector = self.best_collaborator.build_context_vector(collaborators)
                    # Update the context vector
                    current_context_vectors[i].append(context_vector.copy())
                    # Update fitness
                    current_fitness[i].append(self.fitness_function.evaluate(context_vector, self.data))
            # Update subpopulations, context vectors and evaluations
            self.subpops = copy.deepcopy(current_subpops)
            self.fitness = copy.deepcopy(current_fitness)
            self.context_vectors = copy.deepcopy(current_context_vectors)
            del current_subpops, current_fitness, current_context_vectors
            gc.collect()
            # Get the best individual and context vector from each subpopulation
            self.current_best = self._get_best_individuals(
                subpops=self.subpops,
                fitness=self.fitness,
                context_vectors=self.context_vectors
            )
            # Select the globally best context vector
            best_context_vector, best_fitness = self._get_global_best()
            # Update best context vector
            if self.best_fitness < best_fitness:
                # Reset stagnation counter because the best fitness has improved
                stagnation_counter = 0
                # Enable logger if specified
                logging.getLogger().disabled = not self.verbose
                # Current fitness
                current_best_fitness = round(self.best_fitness, 4)
                # New fitness
                new_best_fitness = round(best_fitness, 4)
                # Show improvement
                logging.info(
                    f"\nUpdate fitness from {current_best_fitness} to {new_best_fitness}.\n"
                )
                # Update best context vector
                self.best_context_vector = best_context_vector.copy()
                self.best_context_vectors.append(self.best_context_vector.copy())
                # Update best fitness
                self.best_fitness = best_fitness
            else:
                # Increase stagnation counter because the best fitness has not improved
                stagnation_counter += 1
                # Check whether the optimization has been stagnant for a long time
                if stagnation_counter >= self.conf["coevolution"]["max_gen_without_improvement"]:
                    # Enable logger
                    logging.getLogger().disabled = False
                    logging.info(
                        "\nEarly stopping because fitness has been stagnant for "
                        f"{stagnation_counter} generations in a row."
                    )
                    break
            # Increase number of generations
            n_gen += 1
            # Update progress bar
            progress_bar.update(1)
        # Close progress bar after optimization
        progress_bar.close()
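

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module). The configuration keys
# below are the ones read by the methods above; everything else (the dataset
# object, the constructor signature inherited from CCGA, and any GA/fitness
# settings) is an assumption and only indicated in comments. The dictionary is
# deliberately partial.
# ---------------------------------------------------------------------------
example_conf = {
    "decomposition": {
        "method": "clustering",              # or a ranking method: "elitist" / "distributed"
        "clustering_model_type": "k_means",  # a key of CCPSTFG.CLUSTERING_METHODS
        "drop": True,                        # remove low-VIP features before grouping
        "max_n_pls_components": 30,          # search space for the elbow on PLS components
        "max_n_clusters": 10,                # search space for the silhouette-based grouping
        "max_removal_quantile": 0.50,
        "removal_quantile_step_size": 0.05,
    },
    "coevolution": {
        "max_gen": 100,
        "max_gen_without_improvement": 20,
        "optimized_resource_allocation": True,
        # "n_subcomps": 5,  # optional: skips the silhouette-based self-tuning
    },
}
# Hypothetical construction; the actual signature and data loading come from CCGA
# and the pyccea data utilities, which are not shown in this file:
# ccpstfg = CCPSTFG(data=dataset, conf=example_conf)
# ccpstfg.optimize()
# selected_mask = ccpstfg.best_context_vector  # boolean mask over the remaining features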