import copy
import gc
import logging

from tqdm import tqdm

from ..coevolution.ccga import CCGA
from ..decomposition.static import SequentialFeatureGrouping
class CCEAFS(CCGA):
"""Cooperative Co-Evolutionary-Based Feature Selection (CCEAFS).
Rashid, A. N. M., et al. "A novel penalty-based wrapper objective function for feature
selection in Big Data using cooperative co-evolution." IEEE Access 8 (2020): 150113-150129.
"""
    def _init_decomposer(self):
        """Instantiate the sequential feature grouping method."""
        self.decomposer = SequentialFeatureGrouping(
            n_subcomps=self.n_subcomps,
            subcomp_sizes=self.subcomp_sizes
        )
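        # Illustrative example (assuming `SequentialFeatureGrouping` splits the
        # feature indices into contiguous blocks, as its name suggests): with
        # 100 features and n_subcomps=4, the subcomponents would cover features
        # [0..24], [25..49], [50..74] and [75..99].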
    def optimize(self):
        """Solve the feature selection problem through optimization."""
        # Decompose the problem
        self._problem_decomposition()
        # Initialize subpopulations
        self._init_subpopulations()
        # Instantiate optimizers
        self._init_optimizers()
        # Get the best individual and context vector from each subpopulation
        self.current_best = self._get_best_individuals(
            subpops=self.subpops,
            fitness=self.fitness,
            context_vectors=self.context_vectors
        )
        # Select the globally best context vector
        self.best_context_vector, self.best_fitness = self._get_global_best()
        self.best_context_vectors.append(self.best_context_vector.copy())
        # Save the order in which features were assigned to subcomponents by
        # the sequential feature grouping
        self.best_feature_idxs = self.feature_idxs.copy()
        # Generation counter
        n_gen = 0
        # Number of consecutive generations without improvement in the best fitness
        stagnation_counter = 0
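        # Early stopping below triggers when this counter reaches
        # conf["coevolution"]["max_gen_without_improvement"]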
        # Initialize the optimization progress bar
        progress_bar = tqdm(total=self.conf["coevolution"]["max_gen"],
                            desc="Generations",
                            leave=False)
        # Iterate up to the maximum number of generations
        while n_gen < self.conf["coevolution"]["max_gen"]:
            # Append the current best fitness to the convergence curve
            self.convergence_curve.append(self.best_fitness)
            # Evolve each subpopulation using a genetic algorithm
            current_subpops = list()
            for i in range(self.n_subcomps):
                current_subpop = self.optimizers[i].evolve(
                    subpop=self.subpops[i],
                    fitness=self.fitness[i]
                )
                current_subpops.append(current_subpop)
            # Evaluate each individual of the evolved subpopulations
            current_fitness = list()
            current_context_vectors = list()
            for i in range(self.n_subcomps):
                current_fitness.append(list())
                current_context_vectors.append(list())
                # Use the best individuals from the previous generation
                # (`self.current_best`) as collaborators for each individual in
                # the evolved subpopulations (`current_subpops`)
                for j in range(self.subpop_sizes[i]):
                    collaborators = self.best_collaborator.get_collaborators(
                        subpop_idx=i,
                        indiv_idx=j,
                        current_subpops=current_subpops,
                        current_best=self.current_best
                    )
                    context_vector = self.best_collaborator.build_context_vector(collaborators)
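                    # The context vector is a complete candidate solution: the
                    # j-th individual of subpopulation i combined with the best
                    # individuals of every other subpopulation, so it can be
                    # evaluated as a full feature mask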
                    # Store the context vector
                    current_context_vectors[i].append(context_vector.copy())
                    # Store its fitness
                    current_fitness[i].append(self.fitness_function.evaluate(context_vector, self.data))
            # Update subpopulations, context vectors and evaluations
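            # Deep copies decouple the stored state from the per-generation
            # temporaries, which are then deleted and garbage-collected to keep
            # the memory footprint of long runs in check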
            self.subpops = copy.deepcopy(current_subpops)
            self.fitness = copy.deepcopy(current_fitness)
            self.context_vectors = copy.deepcopy(current_context_vectors)
            del current_subpops, current_fitness, current_context_vectors
            gc.collect()
            # Get the best individual and context vector from each subpopulation
            self.current_best = self._get_best_individuals(
                subpops=self.subpops,
                fitness=self.fitness,
                context_vectors=self.context_vectors
            )
            # Select the globally best context vector
            best_context_vector, best_fitness = self._get_global_best()
            # Update the best context vector if the fitness improved
            if self.best_fitness < best_fitness:
                # Reset the stagnation counter because the best fitness improved
                stagnation_counter = 0
                # Enable the logger only in verbose mode
                logging.getLogger().disabled = not self.verbose
                # Objective weight
                w1 = self.conf["evaluation"]["weights"][0]
                # Penalty weight
                w2 = self.conf["evaluation"]["weights"][1]
                # Current fitness, predictive performance and penalty
                current_best_fitness = round(self.best_fitness, 4)
                current_penalty = round(self.best_context_vector.sum()/self.data.n_features, 4)
                current_eval = round((self.best_fitness + w2*current_penalty)/w1, 4)
                # New fitness, predictive performance and penalty
                new_best_fitness = round(best_fitness, 4)
                new_penalty = round(best_context_vector.sum()/self.data.n_features, 4)
                new_eval = round((best_fitness + w2*new_penalty)/w1, 4)
                # Log the improvement
                logging.info(
                    f"\nFitness improved from {current_best_fitness} to {new_best_fitness}.\n"
                    f"Predictive performance changed from {current_eval} to {new_eval}.\n"
                    f"Penalty changed from {current_penalty} to {new_penalty}.\n"
                )
                # Update the best context vector
                self.best_context_vector = best_context_vector.copy()
                self.best_context_vectors.append(self.best_context_vector.copy())
                # Update the best fitness
                self.best_fitness = best_fitness
            else:
                # Increase the stagnation counter because the best fitness has not improved
                stagnation_counter += 1
                # Check whether the optimization has stagnated for too long
                if stagnation_counter >= self.conf["coevolution"]["max_gen_without_improvement"]:
                    # Enable the logger
                    logging.getLogger().disabled = False
                    logging.info(
                        "\nEarly stopping because the best fitness has been stagnant for "
                        f"{stagnation_counter} generations in a row."
                    )
                    break
            # Increase the number of generations
            n_gen += 1
            # Update the progress bar
            progress_bar.update(1)
        # Close the progress bar after optimization
        progress_bar.close()
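
# Minimal usage sketch (hypothetical: the constructor signature and the `data`
# and `conf` objects below are assumptions, not part of this module):
#
#     selector = CCEAFS(data=data, conf=conf)  # hypothetical signature
#     selector.optimize()
#     mask = selector.best_context_vector      # binary mask over all features
#     print(f"Selected {int(mask.sum())} of {data.n_features} features "
#           f"with fitness {selector.best_fitness:.4f}")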