import time
import logging
import fastcluster
import numpy as np
import pandas as pd
from tqdm import tqdm
from kneed import KneeLocator
from sklearn.cluster import KMeans
from scipy.cluster.hierarchy import fcluster
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import balanced_accuracy_score, silhouette_score
from ..projection.vip import VIP
from ..coevolution.ccga import CCGA
from ..projection.cipls import CIPLS
from ..utils.datasets import DataLoader
from ..utils.memory import force_memory_release
from ..decomposition.ranking import RankingFeatureGrouping
from ..decomposition.clustering import ClusteringFeatureGrouping
class CCPSTFG(CCGA):
"""Cooperative Co-Evolutionary Algorithm with Projection-based Self-Tuning Feature Grouping (CCPSTFG).
Attributes
----------
n_components : int
Number of components to keep after dimensionality reduction.
method : str
        Projection-based decomposition method. It can be 'clustering', 'elitist', or 'distributed'.
vip_threshold : float
All features whose importance is less than or equal to this threshold will be removed.
removed_features : np.ndarray
Indexes of features that were removed due to their low importance.
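
    Examples
    --------
    A minimal sketch of how the class might be instantiated. Only the ``conf``
    keys read directly by this class are shown; the full schema expected by the
    parent ``CCGA`` is not reproduced here, and ``data`` is assumed to be an
    already-built :class:`DataLoader`::

        conf = {
            "wrapper": {"task": "classification"},
            "decomposition": {"method": "clustering",
                              "clustering_model_type": "k_means"},
            "coevolution": {"max_gen": 100,
                            "max_gen_without_improvement": 20},
        }
        model = CCPSTFG(data=data, conf=conf, verbose=True)
        model.optimize()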
"""
CLUSTERING_METHODS = {
"k_means": (KMeans, {}),
"agglomerative_clustering": (
fastcluster.linkage, {"method": "ward", "metric": "euclidean"}
)
}
def __init__(self, data: DataLoader, conf: dict, verbose: bool = True):
"""Initialize the CCPSTFG algorithm."""
self.clustering_model_type = conf["decomposition"].get("clustering_model_type")
if self.clustering_model_type not in CCPSTFG.CLUSTERING_METHODS:
raise NotImplementedError(
f"The clustering model type '{self.clustering_model_type}' is not supported. "
f"Supported methods are: {list(CCPSTFG.CLUSTERING_METHODS.keys())}."
)
super().__init__(data, conf, verbose)
def _feature_clustering(self, projection_model) -> np.ndarray:
"""Cluster the features according to their contribution to the components of the
low-dimensional latent space.
Parameters
----------
projection_model : sklearn model object
Partial Least Squares regression model. It can be the traditional version (PLS) or the
Covariance-free version (CIPLS).
Returns
-------
feature_clusters : np.ndarray
Index of the cluster each feature belongs to.
"""
        X_train_centered = self.data.X_train - self.data.X_train.mean(axis=0)
        y_train_encoded = pd.get_dummies(self.data.y_train).astype(np.int8)
        projection_model.fit(X=X_train_centered, Y=y_train_encoded)
# Get the loadings of features on PLS components
feature_loadings = abs(projection_model.x_loadings_)
linkage_matrix = None
# Precompute the linkage matrix to avoid redundant distance calculations
if self.clustering_model_type == "agglomerative_clustering":
logging.info("Computing linkage matrix using fastcluster...")
clustering_method, clustering_params = CCPSTFG.CLUSTERING_METHODS[self.clustering_model_type]
start_time = time.time()
linkage_matrix = clustering_method(feature_loadings, **clustering_params)
logging.info(f"Done. Elapsed time: {time.time() - start_time:.2f}s.")
if self.conf["coevolution"].get("n_subcomps"):
self.n_subcomps = self.conf["coevolution"]["n_subcomps"]
logging.info(f"User-defined number of subcomponents: {self.n_subcomps}")
else:
logging.info("Automatically choosing the number of subcomponents...")
start_time = time.time()
self.n_subcomps = self._get_best_number_of_subcomponents(feature_loadings, linkage_matrix)
self._n_subcomponents_tuning_time = time.time() - start_time
        # Update the subpopulation sizes after updating the number of subcomponents
self.subpop_sizes = [self.subpop_sizes[0]] * self.n_subcomps
# Cluster features based on loadings.
# Loadings indicate how strongly each feature contributes to each component.
# Features with similar loadings on the same components are likely to be related.
if self.clustering_model_type == "agglomerative_clustering":
# Instead of fit and predict, simply prune the precomputed linkage matrix
feature_clusters = fcluster(linkage_matrix, t=self.n_subcomps, criterion="maxclust")
else:
clustering_method, clustering_params = CCPSTFG.CLUSTERING_METHODS[self.clustering_model_type]
clustering_model = clustering_method(n_clusters=self.n_subcomps, **clustering_params)
feature_clusters = clustering_model.fit_predict(feature_loadings)
return feature_clusters
def _get_best_number_of_components(self, projection_class):
"""Get the best number of components to keep by PLS decomposition.
The number of components will occur at the elbow point where the coefficient of
determination (r2-score) starts to level off.
Parameters
----------
projection_class : sklearn model class
Partial Least Squares regression class. It can be the traditional version (PLS) or the
Covariance-free version (CIPLS).
Returns
-------
n_components : int
Best number of components to keep after the decomposition into a latent space.
"""
task_type = self.conf["wrapper"]["task"]
max_n_pls_components = self.conf["decomposition"].get("max_n_pls_components", 30)
# Search space
n_components_range = range(2, min(max_n_pls_components, self.data.X_train.shape[1]))
logging.info(f"Search space (PLS components for {task_type}): {n_components_range}")
performance_values = list()
for n_components in n_components_range:
metric_folds = list()
for k in range(self.data.kfolds):
# Retrieve training and validation inputs and outputs
X_train_fold, y_train_fold, X_val_fold, y_val_fold = self.data.get_fold(k)
# Apply centering to both training and validation folds
X_train_fold_mean = X_train_fold.mean(axis=0)
X_train_fold_centered = X_train_fold - X_train_fold_mean
X_val_fold_centered = X_val_fold - X_train_fold_mean # Use train mean on val data
# Handle multiclass classification target encoding (if needed)
if (task_type.lower() == "classification") and (len(np.unique(y_train_fold)) > 2):
y_train_pls = pd.get_dummies(y_train_fold).astype(np.int8)
else:
y_train_pls = y_train_fold
# Fit projection model
projection_model = projection_class(n_components=n_components, copy=True)
projection_model.fit(X_train_fold_centered, y_train_pls)
if task_type.lower() == "regression":
r2_score = projection_model.score(X_val_fold_centered, y_val_fold)
metric_folds.append(r2_score)
elif task_type.lower() == "classification":
# Transform data to the PLS space
X_train_projected = projection_model.transform(X_train_fold_centered)
X_val_projected = projection_model.transform(X_val_fold_centered)
# Fit classifier on the projected training data
model = self.fitness_function.evaluator.base_model.clone()
model.estimator.fit(X_train_projected, y_train_fold)
# Predict and evaluate on the projected validation data
y_pred_val = model.estimator.predict(X_val_projected)
accuracy = balanced_accuracy_score(y_val_fold, y_pred_val)
metric_folds.append(accuracy)
del model, X_train_projected, X_val_projected, y_pred_val
performance_values.append(np.mean(metric_folds))
del metric_folds, projection_model, X_train_fold_centered, X_val_fold_centered
force_memory_release()
logging.info(
[
f"{n_components}: {performance_value:.4f}"
for n_components, performance_value in zip(n_components_range, performance_values)
]
)
# Use kneed to find the knee/elbow point
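        # The performance curve is treated as concave and increasing: it rises as
        # components are added and then plateaus, so the knee marks the point of
        # diminishing returns.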
kneedle = KneeLocator(
x=n_components_range,
y=performance_values,
curve="concave",
direction="increasing"
)
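        # Fall back to the smallest candidate when no knee is detected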
n_components = kneedle.knee or n_components_range.start
logging.info(f"Optimized number of PLS components: {n_components}.")
return n_components
def _get_best_number_of_subcomponents(
self,
feature_loadings: np.ndarray,
linkage_matrix: np.ndarray = None
) -> int:
"""Get the best number of subcomponents.
The number of subcomponents (clusters) will be the one that maximizes the silhouette
coefficient of the clustering of X loadings.
Parameters
----------
feature_loadings : np.ndarray
            The absolute loadings of the features on the PLS components.
linkage_matrix : np.ndarray, optional
The linkage matrix used for hierarchical clustering.
Returns
-------
n_subcomps : int
Best number of subcomponents to decompose the original problem.
"""
max_n_clusters = self.conf["decomposition"].get("max_n_clusters", 10)
n_features = feature_loadings.shape[0]
n_clusters_range = range(2, min(max_n_clusters, n_features))
logging.info(f"Search space (clusters): {n_clusters_range}")
silhouette_scores = list()
for n_clusters in n_clusters_range:
if linkage_matrix is not None:
cluster_labels = fcluster(linkage_matrix, t=n_clusters, criterion="maxclust")
else:
clustering_method, clustering_params = CCPSTFG.CLUSTERING_METHODS[self.clustering_model_type]
clustering_model = clustering_method(n_clusters=n_clusters, **clustering_params)
cluster_labels = clustering_model.fit_predict(feature_loadings)
del clustering_model
force_memory_release()
silhouette_avg = silhouette_score(
X=feature_loadings,
labels=cluster_labels,
                sample_size=min(n_features, 10000)  # Sample to keep the silhouette computation tractable on large datasets
)
silhouette_scores.append(silhouette_avg)
# Get the number of subcomponents that maximizes the silhouette score
best_idx = np.argmax(silhouette_scores)
n_subcomps = n_clusters_range[best_idx]
logging.info(f"Optimized number of subcomponents: {n_subcomps}.")
return n_subcomps
def _get_best_quantile_to_remove(self, importances: np.ndarray) -> float:
"""Get the best quantile of the feature importance distribution to remove weak features.
The best quantile will be the one that, when its features are removed from a context
vector filled with 1's (selecting all remaining features), gives the best fitness value.
Parameters
----------
importances : np.ndarray (n_features,)
            Importance of each feature based on its contribution to forming the latent space.
Returns
-------
best_quantile : float
Best quantile. All features that have their importance in this quantile of the
feature importance distribution will be removed.
"""
metrics = dict()
max_removal_quantile = self.conf["decomposition"].get("max_removal_quantile", 0.50)
remove_quantile_step_size = self.conf["decomposition"].get("removal_quantile_step_size", 0.05)
quantiles = np.arange(
start=0,
stop=(max_removal_quantile+remove_quantile_step_size),
step=remove_quantile_step_size
).round(2)
logging.info(f"Search space (quantile): {quantiles}")
for quantile in quantiles:
vip_threshold = round(np.quantile(importances, quantile), 4)
features_to_keep = importances > vip_threshold
            # Evaluate a context vector that selects only the remaining features
metrics[quantile] = self.fitness_function.evaluate(features_to_keep, self.data)
logging.getLogger().disabled = False
del features_to_keep
force_memory_release()
# Get the quantile that gives the best fitness value
metrics = pd.DataFrame(list(metrics.items()), columns=["quantile", "fitness"])
best_quantile = metrics.loc[metrics["fitness"].idxmax(), "quantile"]
logging.info(f"Best quantile: {best_quantile}.")
return best_quantile
def _remove_unimportant_features(
self,
importances: np.ndarray,
) -> np.ndarray:
"""Remove irrelevant or weaken features from folds and subsets.
Parameters
----------
importances : np.ndarray
            Importance of each feature based on its contribution to forming the latent space.
Returns
-------
importances : np.ndarray
Importance of the remaining features.
"""
start_time = time.time()
self.quantile_to_remove = self._get_best_quantile_to_remove(importances)
self._quantile_tuning_time = time.time() - start_time
self.vip_threshold = round(np.quantile(importances, self.quantile_to_remove), 4)
logging.info(f"Removing features with VIP less than or equal to {self.vip_threshold}...")
features_to_keep = importances > self.vip_threshold
        self.removed_features = np.where(~features_to_keep)[0]
logging.info(f"{len(self.removed_features)} features were removed.")
logging.info(f"Remaining features: {np.sum(features_to_keep)}.")
# Removing features from subsets and folds
self.data.X_train = self.data.X_train[:, features_to_keep]
self.data.X_test = self.data.X_test[:, features_to_keep]
if hasattr(self.data, "_raw_X_train"):
self.data._raw_X_train = self.data._raw_X_train[:, features_to_keep]
# Importance of the remaining features
importances = importances[features_to_keep]
return importances
def _compute_variable_importances(self, projection_model) -> np.ndarray:
"""Compute variable importance in projection (VIP).
Parameters
----------
projection_model : sklearn model object
Partial Least Squares regression model. It can be the traditional version (PLS) or the
Covariance-free version (CIPLS).
Returns
-------
        importances : np.ndarray (n_features,)
            Importance of each feature based on its contribution to forming the latent space.
"""
        X_train_centered = self.data.X_train - self.data.X_train.mean(axis=0)
        y_train_encoded = pd.get_dummies(self.data.y_train).astype(np.int8)
        projection_model.fit(X=X_train_centered, Y=y_train_encoded)
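        # VIP scores aggregate each feature's weighted contribution across the
        # fitted PLS components into a single importance value per feature.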
vip = VIP(model=projection_model)
vip.compute()
return vip.importances
def _init_decomposer(self) -> None:
"""Instantiate feature grouping method."""
# Method used to distribute features into subcomponents
self.method = self.conf["decomposition"]["method"]
logging.info(f"Decomposition approach: {self.method}.")
# Define projection model according to the number of features
high_dim = self.data.n_features > 100000
        # TODO: CIPLS breaks when running with too many dimensions
projection_class = CIPLS if high_dim else PLSRegression
if self.conf["decomposition"].get("n_components"):
self.n_components = self.conf["decomposition"]["n_components"]
logging.info(f"User-defined number of PLS components: {self.n_components}")
else:
logging.info("Automatically choosing the number of PLS components...")
start_time = time.time()
# Get the best number of components to keep after projection using the Elbow method
self.n_components = self._get_best_number_of_components(projection_class)
self._n_components_tuning_time = time.time() - start_time
# Instantiate projection model object
projection_model = projection_class(n_components=self.n_components, copy=True)
# Compute feature importances
importances = self._compute_variable_importances(projection_model=projection_model)
        # Remove irrelevant or weakly relevant features
if self.conf["decomposition"].get("drop", False):
importances = self._remove_unimportant_features(importances)
# Instantiate feature grouping
if self.method == "clustering":
logging.info(f"Clustering model type: {self.clustering_model_type}")
feature_clusters = self._feature_clustering(projection_model=projection_model)
self.decomposer = ClusteringFeatureGrouping(
n_subcomps=self.n_subcomps,
clusters=feature_clusters
)
else:
# Ranking feature grouping using variable importances as scores
self.decomposer = RankingFeatureGrouping(
n_subcomps=self.n_subcomps,
subcomp_sizes=self.subcomp_sizes,
scores=importances,
method=self.method,
ascending=False
)
self.feature_importances = importances
self._tuning_time = (
getattr(self, "_n_components_tuning_time", 0)
+ getattr(self, "_quantile_tuning_time", 0)
+ getattr(self, "_n_subcomponents_tuning_time", 0)
)
def _allocate_subproblem_resources(self) -> None:
"""Allocate resources to subproblems based on feature importances and subcomponent sizes."""
# Compute cumulative sum of subcomponent sizes and remove the last element to use as split indices
indices = np.cumsum(self.subcomp_sizes)[:-1]
# Split the feature importances array into subcomponents based on the indices
importances = np.split(self.feature_importances, indices)
# Calculate the average importance of each subcomponent
subcomp_importances = np.array([np.mean(subcomp) for subcomp in importances])
# Normalize the subcomponent importances
normalized_subcomp_importances = subcomp_importances / np.sum(subcomp_importances)
logging.info(f"Subcomponent importances: {normalized_subcomp_importances}")
# Calculate the normalized subcomponent sizes
normalized_subcomp_sizes = np.array(self.subcomp_sizes) / np.sum(self.subcomp_sizes)
logging.info(f"Normalized subcomponent sizes: {normalized_subcomp_sizes}")
# Calculate the allocation factor
allocation_factor = normalized_subcomp_importances * normalized_subcomp_sizes
# Normalize the allocation factor
normalized_allocation_factor = allocation_factor / np.sum(allocation_factor)
# Update the subpopulation sizes based on the normalized allocation factor
logging.info(f"Subpopulation sizes in Round-Robin strategy: {self.subpop_sizes}")
self.subpop_sizes = np.round(normalized_allocation_factor * sum(self.subpop_sizes)).astype(np.int32)
logging.info(f"Subpopulation sizes after resource allocation: {self.subpop_sizes}")
def optimize(self) -> None:
"""Solve the feature selection problem through optimization."""
# Decompose problem
self._problem_decomposition()
if self.conf["coevolution"].get("optimized_resource_allocation", False):
logging.info("Optimizing resource allocation...")
self._allocate_subproblem_resources()
self._log_memory_usage(stage="After problem decomposition")
# Initialize subpopulations
self._init_subpopulations()
self._log_memory_usage(stage="After subpopulation initialization")
# Instantiate optimizers
self._init_optimizers()
# Get the best individual and context vector from each subpopulation
self.current_best = self._get_best_individuals(
subpops=self.subpops,
fitness=self.fitness,
context_vectors=self.context_vectors
)
# Select the globally best context vector
self.best_context_vector, self.best_fitness = self._get_global_best()
self._record_best_context_vector(self.best_context_vector)
        # Save the order of features considered in the feature grouping
self.best_feature_idxs = self.feature_idxs.copy()
# Set the number of generations counter
n_gen = 0
# Number of generations that the best fitness has not improved
stagnation_counter = 0
# Initialize the optimization progress bar
progress_bar = tqdm(total=self.conf["coevolution"]["max_gen"],
desc="Generations",
leave=False)
# Iterate up to the maximum number of generations
        while n_gen < self.conf["coevolution"]["max_gen"]:
# Append current best fitness
self.convergence_curve.append(self.best_fitness)
# Evolve each subpopulation using a genetic algorithm
current_subpops = list()
for i in range(self.n_subcomps):
current_subpop = self.optimizers[i].evolve(
subpop=self.subpops[i],
fitness=self.fitness[i]
)
current_subpops.append(current_subpop)
self._log_memory_usage(stage="After subpopulation evolution", n_gen=n_gen)
# Evaluate each individual of the evolved subpopulations
current_fitness, current_best_context_vectors = self._evaluate_evolved_subpopulations(
collaborators_getter=lambda subpop_idx, indiv_idx: self.best_collaborator.get_collaborators(
subpop_idx=subpop_idx,
indiv_idx=indiv_idx,
current_subpops=current_subpops,
current_best=self.current_best
),
build_context_vector=self.best_collaborator.build_context_vector
)
# Update subpopulations, context vectors and evaluations
self.subpops = current_subpops
self.fitness = current_fitness
self.context_vectors = current_best_context_vectors
del current_subpops, current_fitness, current_best_context_vectors
force_memory_release()
self._log_memory_usage(stage="After subpopulation evaluation", n_gen=n_gen)
# Get the best individual and context vector from each subpopulation
self.current_best = self._get_best_individuals(
subpops=self.subpops,
fitness=self.fitness,
context_vectors=self.context_vectors
)
# Select the globally best context vector
best_context_vector, best_fitness = self._get_global_best()
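            # Fitness is maximized, so only a strict improvement replaces the incumbent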
# Update best context vector
if self.best_fitness < best_fitness:
# Reset stagnation counter because best fitness has improved
stagnation_counter = 0
# Enable logger if specified
                logging.getLogger().disabled = not self.verbose
# Current fitness
current_best_fitness = round(self.best_fitness, 4)
# New fitness
new_best_fitness = round(best_fitness, 4)
# Show improvement
logging.info(
f"\nUpdate fitness from {current_best_fitness} to {new_best_fitness}.\n"
)
# Update best context vector
self.best_context_vector = best_context_vector.copy()
self._record_best_context_vector(self.best_context_vector)
# Update best fitness
self.best_fitness = best_fitness
else:
# Increase stagnation counter because best fitness has not improved
stagnation_counter += 1
# Checks whether the optimization has been stagnant for a long time
if stagnation_counter >= self.conf["coevolution"]["max_gen_without_improvement"]:
# Enable logger
logging.getLogger().disabled = False
logging.info(
"\nEarly stopping because fitness has been stagnant for "
f"{stagnation_counter} generations in a row."
)
break
self._log_memory_usage(stage="After generation", n_gen=n_gen)
# Increase number of generations
n_gen += 1
# Update progress bar
progress_bar.update(1)
# Close progress bar after optimization
progress_bar.close()