import copy
import importlib.resources
import logging
import os
import warnings
from typing import Tuple

import numpy as np
import pandas as pd
import toml
from sklearn.model_selection import (
    GroupKFold,
    KFold,
    LeaveOneOut,
    StratifiedKFold,
    train_test_split,
)
from sklearn.preprocessing import MinMaxScaler, StandardScaler
class DataLoader:
"""Load dataset and preprocess it to train machine learning algorithms.
Attributes
----------
data : pd.DataFrame
Raw dataset.
X : pd.DataFrame
Raw input data.
y : pd.Series
Raw output data.
X_train : np.ndarray
Train input data.
X_test : np.ndarray
Test input data.
y_train : np.ndarray
Train output data.
y_test : np.ndarray
Test output data.
n_examples : int
Total number of examples.
n_features : int
Number of features in the dataset.
n_classes : int
Number of classes.
classes : np.ndarray
Class identifiers.
train_size : int
Number of examples in the training set.
test_size : int
Number of examples in the test set.
seed : int, default None
It controls the randomness of the data split.
    preset : bool, default False
        If True, use the training and test sets predefined in the dataset (through the
        'subset' column) instead of splitting the data randomly.
    test_ratio : float
        Proportion of the dataset to include in the test set. It should be strictly
        between 0 and 1.
    splitter_type : str
        Model selection strategy. It can be "k_fold" or "leave_one_out".
    kfolds : int or None
        Number of folds in the cross-validation. When 'splitter_type' is set to
        "leave_one_out", it equals the number of training examples.
    stratified : bool, default False
        If True, the folds preserve the percentage of examples of each class. It is only
        used when the 'splitter_type' parameter is set to "k_fold".
normalize : bool, default False
If True, normalizes training and test sets generated by the split method.
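
    Examples
    --------
    A minimal usage sketch. The dataset name and the configuration file below are
    illustrative placeholders rather than fixtures guaranteed to ship with the package.

    >>> import toml
    >>> conf = toml.load("dataloader.toml")  # hypothetical data configuration file
    >>> dataloader = DataLoader(dataset="iris", conf=conf)
    >>> dataloader.get_ready()
    >>> dataloader.X_train.shape[1] == dataloader.n_features
    True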
"""
# Class parameters
SPLITTER_TYPES = ["k_fold", "leave_one_out"]
PRIMARY_CONF_KEYS = ["general", "splitter", "normalization"]
NORMALIZATION_METHODS = {"min_max": MinMaxScaler, "standard": StandardScaler}
with importlib.resources.open_text("pyccea.parameters", "datasets.toml") as toml_file:
DATASETS = toml.load(toml_file)
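
    # Each entry in 'datasets.toml' is expected to provide at least the keys read
    # elsewhere in this class ("file" and "task"). An illustrative (hypothetical) entry:
    #
    #   [iris]
    #   file = "iris.parquet"
    #   task = "classification"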
def __init__(self, dataset: str, conf: dict):
"""
Parameters
----------
dataset : str
Name of the dataset that will be loaded and processed.
conf : dict
Configuration parameters of the dataloader.
"""
self.dataset = dataset
self.conf = conf
# Check if the data configuration file passed as parameter is valid
for primary_key in DataLoader.PRIMARY_CONF_KEYS:
if primary_key not in self.conf:
raise AssertionError(
f"The '{primary_key}' section should be specified in the data configuration "
"file."
)
# Initialize logger with info level
if self.conf["general"].get("verbose", True):
logging.basicConfig(encoding="utf-8", level=logging.INFO)
# Parse parameters
self._parse_parameters()
def _parse_general_parameters(self) -> None:
"""Parse parameters from the general section of the data configuration file."""
if "splitter_type" not in self.conf["general"]:
raise AssertionError(
"The 'splitter_type' parameter should be specified in the general section of the "
"data configuration file."
)
self.splitter_type = self.conf["general"]["splitter_type"]
if self.splitter_type not in DataLoader.SPLITTER_TYPES:
raise NotImplementedError(
f"The splitter type '{self.splitter_type}' is not implemented."
)
self.seed = self.conf["general"].get("seed")
self.verbose = self.conf["general"].get("verbose", True)
def _parse_splitter_parameters(self) -> None:
"""Parse parameters from the splitter section of the data configuration file."""
if self.splitter_type == "k_fold":
if ("kfolds" not in self.conf["splitter"]) and (self.conf["splitter"].get("prefold", False) is False):
raise AssertionError(
"The parameter 'kfolds' should be specified in the splitter section of the "
"data configuration file when 'splitter_type' is set to 'k_fold' and 'prefold' "
"is set to False or is not defined in the splitter section."
)
self.kfolds = self.conf["splitter"].get("kfolds")
self.stratified = self.conf["splitter"].get("stratified", False)
if self.splitter_type == "leave_one_out":
if "kfolds" in self.conf["splitter"]:
                warnings.warn(
                    "You specified the number of folds using Leave-One-Out (LOO). "
                    "However, LOO is equivalent to K-Fold when K is equal to the number "
                    "of examples. Therefore, the value of the 'kfolds' parameter will be "
                    "ignored in this case.",
                    UserWarning
                )
if "stratified" in self.conf["splitter"]:
                warnings.warn(
                    "You specified the 'stratified' parameter using Leave-One-Out (LOO). "
                    "However, the validation folds made by LOO have only one sample. "
                    "Therefore, the value of the 'stratified' parameter will be ignored "
                    "in this case.",
                    UserWarning
                )
if self.conf["splitter"].get("prefold"):
                warnings.warn(
                    "You specified the 'prefold' parameter using Leave-One-Out (LOO). "
                    "However, the validation folds made by LOO have only one sample. "
                    "Therefore, the value of the 'prefold' parameter will be ignored in "
                    "this case.",
                    UserWarning
                )
self.preset = self.conf["splitter"].get("preset", False)
if self.preset:
if self.conf["splitter"].get("test_ratio") is not None:
                logging.info(
                    "Both the 'preset' and 'test_ratio' parameters were set. The "
                    "predefined subsets take precedence, so the 'test_ratio' parameter "
                    "will be ignored."
                )
self.test_ratio = None
else:
self.test_ratio = self.conf["splitter"].get("test_ratio")
if self.test_ratio is None:
raise ValueError(
"The 'test_ratio' parameter should be specified in the splitter section of "
"the data configuration file when the 'preset' parameter is set to False or "
"is not defined in the splitter section."
)
if (self.test_ratio) <= 0 or (self.test_ratio >= 1):
raise ValueError(
"The 'test_ratio' parameter should be within the range of 0 and 1, excluding "
"extreme values (i.e., 0 < 'test_ratio' < 1)."
)
self.prefold = self.conf["splitter"].get("prefold", False)
def _parse_normalization_parameters(self) -> None:
"""Parse parameters from the normalization section of the data configuration file."""
self.normalize = self.conf["normalization"].get("normalize", False)
self.normalization_method = self.conf["normalization"].get("method")
if self.normalize:
if "method" not in self.conf["normalization"]:
raise AssertionError(
"The 'method' parameter should be specified in the normalization section of "
"the data configuration file when 'normalize' parameter is set to True."
)
            if self.normalization_method not in DataLoader.NORMALIZATION_METHODS:
raise NotImplementedError(
f"The normalization method '{self.normalization_method}' is not implemented."
)
self.normalizer = DataLoader.NORMALIZATION_METHODS[self.normalization_method]()
else:
            if self.normalization_method is not None:
                raise ValueError(
                    "The 'normalize' parameter should be set to True in the "
                    "normalization section of the data configuration file when the "
                    "'method' parameter is specified."
                )
def _parse_parameters(self) -> None:
"""Parse parameters, validate their values, and assign them to attributes."""
self._parse_general_parameters()
self._parse_splitter_parameters()
self._parse_normalization_parameters()
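
    # An illustrative data configuration covering the three required sections. The key
    # names come from the parsing logic above; the values shown are assumptions:
    #
    #   [general]
    #   splitter_type = "k_fold"   # or "leave_one_out"
    #   seed = 42
    #   verbose = true
    #
    #   [splitter]
    #   kfolds = 5
    #   stratified = true
    #   preset = false
    #   test_ratio = 0.2
    #   prefold = false
    #
    #   [normalization]
    #   normalize = true
    #   method = "min_max"         # or "standard"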
def get_ready(self) -> None:
"""
Prepare the data for a Cooperative Co-Evolutionary Algorithm to perform feature selection.
"""
self._load()
self._preprocess()
self._split()
self._model_selection()
if self.normalize:
self.X_train, self.X_test = self._normalize_subsets(
X_train=self.X_train,
X_test=self.X_test
)
logging.info("Data is ready for use.")
def _load(self) -> None:
"""Load dataset according to dataset given as a parameter."""
        try:
            current_dir = os.path.dirname(__file__)
            path = os.path.join(
                current_dir, "..", "datasets", DataLoader.DATASETS[self.dataset]["file"]
            )
        except KeyError:
            # The chosen dataset is not registered in 'datasets.toml'
            raise ValueError(
                f"The '{self.dataset}' dataset is not available. "
                f"The available datasets are {', '.join(DataLoader.DATASETS.keys())}."
            )
# Load dataset
logging.info(f"Dataset: {self.dataset}")
self.data = pd.read_parquet(path)
def _get_input(self) -> pd.DataFrame:
"""Get the input data X from the dataset.
Returns
-------
X : pd.DataFrame (n_examples, n_features)
Input data (features).
"""
# Get all columns except 'label', 'subset', and 'fold' while preserving order
excluded_cols = ["label", "subset", "fold"]
selected_cols = [col for col in self.data.columns if col not in excluded_cols]
X = self.data.loc[:, selected_cols].copy()
return X
def _get_output(self) -> pd.Series:
"""Get the output data y from the dataset.
Returns
-------
y : pd.Series (n_examples, )
Output data (labels).
"""
y = self.data.loc[:, "label"].copy()
return y
def _preprocess(self, dropna: bool = True) -> None:
"""Preprocess the dataset to be used by machine learning models.
Parameters
----------
        dropna : bool, default True
            Remove rows that contain NaN values.
"""
# Setting a default representation for NaN values
self.data.replace(to_replace="?", value=np.nan, inplace=True)
# Remove rows with at least one NaN value
if dropna:
# Store the number of rows before dropping NaNs
initial_row_count = self.data.shape[0]
self.data.dropna(inplace=True)
self.data.reset_index(drop=True, inplace=True)
# Calculate the number of removed rows
removed_rows = initial_row_count - self.data.shape[0]
logging.info(f"Number of rows removed due to NaN values: {removed_rows}")
# Split into input and output data
self.X = self._get_input()
self.y = self._get_output()
# Set number of examples
self.n_examples = self.X.shape[0]
# Set number of features
self.n_features = self.X.shape[1]
# For classification tasks
if DataLoader.DATASETS[self.dataset]["task"] == "classification":
# Set number of classes
self.n_classes = self.y.nunique()
# Get class identifiers
self.classes = sorted(self.y.unique())
            # Compute the imbalance ratio (majority class count over minority class count)
            class_counts = self.y.value_counts()
            self.imbalance_ratio = round(class_counts.max() / class_counts.min(), 4)
# Ensure labels are integer-encoded
self.y = self.y.astype(int)
def _split(self) -> None:
"""Split dataset into training and test sets."""
if self.preset:
logging.info("Using predefined sets...")
# Get predefined training set
self.X_train = (
self.data.query("subset == 'train'")
.drop(columns=["label", "subset", "fold"], errors="ignore")
.to_numpy()
)
self.y_train = self.data.query("subset == 'train'")["label"].to_numpy()
# Get predefined test set
self.X_test = (
self.data.query("subset == 'test'")
.drop(columns=["label", "subset", "fold"], errors="ignore")
.to_numpy()
)
self.y_test = self.data.query("subset == 'test'")["label"].to_numpy()
else:
logging.info("Splitting data...")
# Split data into training and test sets
self.X_train, self.X_test, self.y_train, self.y_test = (
train_test_split(
self.X.to_numpy(),
self.y.to_numpy(),
test_size=self.test_ratio,
random_state=self.seed
)
)
# Set subset sizes
self.train_size = self.X_train.shape[0]
self.test_size = self.X_test.shape[0]
self.test_ratio = round(self.test_size / (self.train_size + self.test_size), 4)
logging.info(f"Training set with {self.train_size} observations.")
logging.info(f"Test set with {self.test_size} observations.")
    def _normalize_subsets(
        self,
        X_train: np.ndarray,
        X_test: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Normalize the features of the training and test subsets.

        Parameters
        ----------
        X_train : np.ndarray
            Training input data.
        X_test : np.ndarray
            Test input data.

        Returns
        -------
        X_normalized_train : np.ndarray
            Normalized training input data.
        X_normalized_test : np.ndarray
            Normalized test input data.
        """
        # Fit the normalization statistics only after the train/test split to avoid
        # data leakage across subsets
        normalizer = copy.deepcopy(self.normalizer)
        X_normalized_train = normalizer.fit_transform(X=X_train)
        # Transform the test set using the parameters fitted on the training set
        X_normalized_test = normalizer.transform(X=X_test)
return X_normalized_train, X_normalized_test
def _model_selection(self) -> None:
"""Prepare data according to the specified splitter type."""
logging.info(f"Splitter type: {self.splitter_type}.")
def _populate_folds(splitter, X, y, groups=None):
"Populate the train and validation folds using the provided splitter."
self.train_folds = []
self.val_folds = []
self.train_indices = []
self.val_indices = []
split_args = (X, y) if groups is None else (X, y, groups)
for train_idx, val_idx in splitter.split(*split_args):
X_train_fold, X_val_fold = X[train_idx].copy(), X[val_idx].copy()
y_train_fold, y_val_fold = y[train_idx].copy(), y[val_idx].copy()
if self.normalize:
X_train_fold, X_val_fold = self._normalize_subsets(
X_train_fold, X_val_fold
)
# Store the train and validation folds
self.train_folds.append([X_train_fold, y_train_fold])
self.val_folds.append([X_val_fold, y_val_fold])
self.train_indices.append(train_idx)
self.val_indices.append(val_idx)
if self.splitter_type == "k_fold":
if self.prefold:
if "fold" not in self.data.columns:
raise AssertionError(
"The 'fold' column should be specified in the dataset when 'prefold' "
"is set to True."
)
logging.info("Using predefined folds...")
train_data = self.data.query("subset == 'train'")
kfolds = train_data["fold"].nunique()
if self.kfolds is not None and kfolds != self.kfolds:
raise AssertionError(
f"The number of folds in the training set ({kfolds}) does not match the "
f"number of folds specified in the configuration file ({self.kfolds})."
)
self.kfolds = kfolds
                # GroupKFold accepts 'shuffle' and 'random_state' only in recent
                # scikit-learn releases
                self.splitter = GroupKFold(
                    n_splits=self.kfolds, shuffle=True, random_state=self.seed
                )
                _populate_folds(
                    self.splitter, self.X_train, self.y_train, groups=train_data["fold"]
                )
else:
is_classification = DataLoader.DATASETS[self.dataset]["task"] == "classification"
if self.stratified and is_classification:
self.splitter = StratifiedKFold(
n_splits=self.kfolds, shuffle=True, random_state=self.seed
)
else:
self.splitter = KFold(
n_splits=self.kfolds, shuffle=True, random_state=self.seed
)
_populate_folds(self.splitter, self.X_train, self.y_train)
elif self.splitter_type == "leave_one_out":
self.splitter = LeaveOneOut()
self.kfolds = self.train_size
_populate_folds(self.splitter, self.X_train, self.y_train)