import gc
import importlib.resources
import logging
import os
import warnings
from typing import Optional, Tuple
import numpy as np
import pandas as pd
import toml
from sklearn.base import clone as sklearn_clone
from sklearn.model_selection import (GroupKFold, KFold, LeaveOneOut,
StratifiedKFold, train_test_split)
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from .preprocessing import Winsoriser
class DataLoader:
    """Load dataset and preprocess it to train machine learning algorithms.

    Attributes
----------
data : pd.DataFrame
Raw dataset.
X : pd.DataFrame
Raw input data.
y : pd.Series
Raw output data.
X_train : np.ndarray
Train input data.
X_test : np.ndarray
Test input data.
y_train : np.ndarray
Train output data.
y_test : np.ndarray
Test output data.
n_examples : int
Total number of examples.
n_features : int
Number of features in the dataset.
n_classes : int
Number of classes.
    classes : list
        Sorted class identifiers.
train_size : int
Number of examples in the training set.
test_size : int
Number of examples in the test set.
seed : int, default None
It controls the randomness of the data split.
    preset : bool, default False
        In some datasets, the training and test subsets are already predefined. Set this
        flag to True to use them.
test_ratio : float
Proportion of the dataset to include in the test set. It should be between 0 and 1.
splitter_type : str
Model selection strategy. It can be "k_fold" or "leave_one_out".
    kfolds : int or None
        Number of folds in the k-fold cross-validation. When 'splitter_type' is
        'leave_one_out', it is set to the number of training examples.
stratified : bool, default False
If True, the folds are made by preserving the percentage of examples for each class. It is
only used in case 'splitter_type' parameter is set to 'k_fold'.
normalize : bool, default False
If True, normalizes training and test sets generated by the split method.
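
    Examples
    --------
    A minimal, hypothetical usage sketch (the dataset name and configuration values
    below are assumptions; valid dataset names come from 'datasets.toml'):

    >>> conf = {
    ...     "general": {"splitter_type": "k_fold", "seed": 42},
    ...     "splitter": {"kfolds": 5, "test_ratio": 0.2, "stratified": True},
    ...     "preprocessing": {"drop_na": True},
    ...     "normalization": {"normalize": True, "method": "min_max"},
    ... }
    >>> loader = DataLoader(dataset="my_dataset", conf=conf)  # doctest: +SKIP
    >>> loader.get_ready()  # doctest: +SKIP
    >>> X_train, y_train, X_val, y_val = loader.get_fold(k=0)  # doctest: +SKIP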
"""
# Class parameters
SPLITTER_TYPES = ["k_fold", "leave_one_out"]
PRIMARY_CONF_KEYS = ["general", "splitter", "preprocessing", "normalization"]
NORMALIZATION_METHODS = {"min_max": MinMaxScaler, "standard": StandardScaler}
with importlib.resources.open_text("pyccea.parameters", "datasets.toml") as toml_file:
DATASETS = toml.load(toml_file)
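    # Each entry in datasets.toml is expected to provide at least the "file" and
    # "task" keys used by '_load' and '_preprocess', e.g. (assumed sketch):
    #   [my_dataset]
    #   file = "my_dataset.parquet"
    #   task = "classification"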
def __init__(self, dataset: str, conf: dict):
"""
Parameters
----------
dataset : str
Name of the dataset that will be loaded and processed.
conf : dict
Configuration parameters of the dataloader.
"""
self.dataset = dataset
self.conf = conf
# Check if the data configuration file passed as parameter is valid
for primary_key in DataLoader.PRIMARY_CONF_KEYS:
if primary_key not in self.conf:
raise AssertionError(
f"The '{primary_key}' section should be specified in the data configuration "
"file."
)
# Initialize logger with info level
if self.conf["general"].get("verbose", True):
logging.basicConfig(encoding="utf-8", level=logging.INFO)
# Parse parameters
self._parse_parameters()
def _parse_general_parameters(self) -> None:
"""Parse parameters from the general section of the data configuration file."""
if "splitter_type" not in self.conf["general"]:
raise AssertionError(
"The 'splitter_type' parameter should be specified in the general section of the "
"data configuration file."
)
self.splitter_type = self.conf["general"]["splitter_type"]
if self.splitter_type not in DataLoader.SPLITTER_TYPES:
raise NotImplementedError(
f"The splitter type '{self.splitter_type}' is not implemented."
)
self.seed = self.conf["general"].get("seed")
self.verbose = self.conf["general"].get("verbose", True)
float_dtype = self.conf["general"].get("float_dtype")
if float_dtype is None:
self.float_dtype = None
else:
float_dtype = str(float_dtype).lower()
if float_dtype not in ["float32", "float64"]:
raise ValueError(
"The 'float_dtype' parameter in the general section of the data "
"configuration file should be either 'float32' or 'float64'."
)
self.float_dtype = np.float32 if float_dtype == "float32" else np.float64
def _parse_splitter_parameters(self) -> None:
"""Parse parameters from the splitter section of the data configuration file."""
if self.splitter_type == "k_fold":
if ("kfolds" not in self.conf["splitter"]) and (self.conf["splitter"].get("prefold", False) is False):
raise AssertionError(
"The parameter 'kfolds' should be specified in the splitter section of the "
"data configuration file when 'splitter_type' is set to 'k_fold' and 'prefold' "
"is set to False or is not defined in the splitter section."
)
self.kfolds = self.conf["splitter"].get("kfolds")
self.stratified = self.conf["splitter"].get("stratified", False)
if self.splitter_type == "leave_one_out":
if "kfolds" in self.conf["splitter"]:
warnings.warn(
"You specified the number of folds using Leave-One-Out (LOO). However, LOO is"
" equivalent to K-Fold when K is equal to the number of examples. Therefore, "
"the value of 'kfolds' parameter will be ignored in this case.",
UserWarning
)
if "stratified" in self.conf["splitter"]:
warnings.warn(
"You specified the 'stratified' parameter using Leave-One-Out (LOO). However,"
" the validation folds made by the LOO have only one sample. Therefore, the "
"value of 'stratified' parameter will be ignored in this case.",
UserWarning
)
if self.conf["splitter"].get("prefold"):
warnings.warn(
"You specified the 'prefold' parameter using Leave-One-Out (LOO). However, "
"the validation folds made by the LOO have only one sample. Therefore, the "
"value of 'prefold' parameter will be ignored in this case.",
UserWarning
)
self.preset = self.conf["splitter"].get("preset", False)
if self.preset:
if self.conf["splitter"].get("test_ratio") is not None:
logging.info(
"After setting both the 'preset' and 'test_ratio' parameters, the predefined "
"subsets will take precedence, rendering the 'test_ratio' parameter unused."
)
self.test_ratio = None
else:
self.test_ratio = self.conf["splitter"].get("test_ratio")
if self.test_ratio is None:
raise ValueError(
"The 'test_ratio' parameter should be specified in the splitter section of "
"the data configuration file when the 'preset' parameter is set to False or "
"is not defined in the splitter section."
)
            if self.test_ratio <= 0 or self.test_ratio >= 1:
raise ValueError(
"The 'test_ratio' parameter should be within the range of 0 and 1, excluding "
"extreme values (i.e., 0 < 'test_ratio' < 1)."
)
self.prefold = self.conf["splitter"].get("prefold", False)
def _parse_normalization_parameters(self) -> None:
"""Parse parameters from the normalization section of the data configuration file."""
self.normalize = self.conf["normalization"].get("normalize", False)
self.normalization_method = self.conf["normalization"].get("method")
if self.normalize:
if "method" not in self.conf["normalization"]:
raise AssertionError(
"The 'method' parameter should be specified in the normalization section of "
"the data configuration file when 'normalize' parameter is set to True."
)
if self.normalization_method not in DataLoader.NORMALIZATION_METHODS.keys():
raise NotImplementedError(
f"The normalization method '{self.normalization_method}' is not implemented."
)
self.normalizer = DataLoader.NORMALIZATION_METHODS[self.normalization_method]()
else:
if self.normalization_method is not None:
raise ValueError(
"The 'normalize' parameter should be set to True in the normalization section"
" of the data configuration file when 'method' parameter is specified."
)
def _parse_preprocessing_parameters(self) -> None:
"""Parse parameters from the preprocessing section of the data configuration file."""
self._dropna = self.conf["preprocessing"].get("drop_na")
self.winsorization = self.conf["preprocessing"].get("winsorization", False)
if self.winsorization:
if "quantiles" not in self.conf["preprocessing"]:
raise AssertionError(
"The Winsorization method is enable, but the required 'quantiles' key is"
"missing in the 'preprocessing configuration section."
)
lower, upper = self.conf["preprocessing"]["quantiles"]
self.winsor = Winsoriser(lower=lower, upper=upper)
def _parse_parameters(self) -> None:
"""Parse parameters, validate their values, and assign them to attributes."""
self._parse_general_parameters()
self._parse_splitter_parameters()
self._parse_normalization_parameters()
self._parse_preprocessing_parameters()
def _cast_array(self, X: np.ndarray, array_name: str) -> np.ndarray:
"""Cast input matrix to the configured float dtype."""
# Only cast numeric arrays; object arrays should fail earlier in preprocessing.
if isinstance(X, np.ndarray) and np.issubdtype(X.dtype, np.number):
if X.dtype != self.float_dtype:
logging.info(f"Casting {array_name} from {X.dtype} to {self.float_dtype}.")
return X.astype(self.float_dtype, copy=False)
return X
def _cast_float_dtype(self) -> None:
"""Cast stored feature matrices to the configured float dtype (if any)."""
if self.float_dtype is None:
return
# Main subsets
if hasattr(self, "X_train"):
self.X_train = self._cast_array(self.X_train, "X_train")
if hasattr(self, "X_test"):
self.X_test = self._cast_array(self.X_test, "X_test")
        # Per-fold arrays (if they exist)
if hasattr(self, "train_folds") and hasattr(self, "val_folds"):
for k in range(len(self.train_folds)):
self.train_folds[k][0] = self._cast_array(self.train_folds[k][0], f"{k}-th train fold")
self.val_folds[k][0] = self._cast_array(self.val_folds[k][0], f"{k}-th val fold")
# Raw training set used for fold normalization (if exists)
if hasattr(self, "_raw_X_train"):
self._raw_X_train = self._cast_array(self._raw_X_train, "_raw_X_train")
def _delete_intermediate_data(self) -> None:
"""Delete intermediate data to free memory."""
if hasattr(self, "data"):
delattr(self, "data")
if hasattr(self, "X"):
delattr(self, "X")
if hasattr(self, "y"):
delattr(self, "y")
gc.collect()
    def _load(self) -> None:
        """Load the dataset whose name was given at construction time."""
try:
current_dir = os.path.dirname(__file__)
path = os.path.join(current_dir, "..", "datasets", DataLoader.DATASETS[self.dataset]["file"])
        except KeyError:
            # The chosen dataset is not listed in datasets.toml
raise ValueError(
f"The '{self.dataset}' dataset is not available. "
f"The available datasets are {', '.join(DataLoader.DATASETS.keys())}."
)
# Load dataset
logging.info(f"Dataset: {self.dataset}")
self.data = pd.read_parquet(path)
def _get_input(self) -> pd.DataFrame:
"""Get the input data X from the dataset.
Returns
-------
: pd.DataFrame (n_examples, n_features)
Input data (features).
"""
# Get all columns except 'label', 'subset', and 'fold' while preserving order
excluded_cols = ["label", "subset", "fold"]
selected_cols = [col for col in self.data.columns if col not in excluded_cols]
return self.data.loc[:, selected_cols]
def _get_output(self) -> pd.Series:
"""Get the output data y from the dataset.
Returns
-------
: pd.Series (n_examples, )
Output data (labels).
"""
return self.data.loc[:, "label"]
def _preprocess(self) -> None:
"""Preprocess the dataset to be used by machine learning models."""
        # Opt in to pandas' future no-silent-downcasting behavior so 'replace' keeps dtypes stable
        pd.set_option("future.no_silent_downcasting", True)
        # Standardize missing values: replace "?" placeholders with NaN
        self.data.replace(to_replace="?", value=np.nan, inplace=True)
# Remove rows with at least one NaN value
if self._dropna:
# Store the number of rows before dropping NaNs
initial_row_count = self.data.shape[0]
self.data.dropna(inplace=True)
self.data.reset_index(drop=True, inplace=True)
# Calculate the number of removed rows
removed_rows = initial_row_count - self.data.shape[0]
logging.info(f"Number of rows removed due to NaN values: {removed_rows}")
# Split into input and output data
self.X = self._get_input()
self.y = self._get_output()
# Set number of examples
self.n_examples = self.X.shape[0]
# Set number of features
self.n_features = self.X.shape[1]
# For classification tasks
if DataLoader.DATASETS[self.dataset]["task"] == "classification":
# Set number of classes
self.n_classes = self.y.nunique()
# Get class identifiers
self.classes = sorted(self.y.unique())
# Compute imbalance ratio
minority_class = self.y.value_counts().min()
majority_class = self.y.value_counts().max()
self.imbalance_ratio = round(majority_class/minority_class, 4)
            # Ensure labels are integer-encoded (int8 assumes fewer than 128 classes)
            self.y = self.y.astype(np.int8)
def _split(self) -> None:
"""Split dataset into training and test sets."""
if self.preset:
logging.info("Using predefined sets...")
# Get predefined training set
self.X_train = (
self.data.query("subset == 'train'")
.drop(columns=["label", "subset", "fold"], errors="ignore")
.to_numpy()
)
self.y_train = self.data.query("subset == 'train'")["label"].to_numpy()
# Get predefined test set
self.X_test = (
self.data.query("subset == 'test'")
.drop(columns=["label", "subset", "fold"], errors="ignore")
.to_numpy()
)
self.y_test = self.data.query("subset == 'test'")["label"].to_numpy()
else:
logging.info("Splitting data...")
# Split data into training and test sets
self.X_train, self.X_test, self.y_train, self.y_test = (
train_test_split(
self.X.to_numpy(),
self.y.to_numpy(),
test_size=self.test_ratio,
random_state=self.seed
)
)
# Set subset sizes
self.train_size = self.X_train.shape[0]
self.test_size = self.X_test.shape[0]
self.test_ratio = round(self.test_size / (self.train_size + self.test_size), 4)
logging.info(f"Training set with {self.train_size} observations.")
logging.info(f"Test set with {self.test_size} observations.")
def _truncate_subsets(
self,
X_train: pd.DataFrame,
X_test: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Truncate the outliers.
Parameters
----------
X_train : pd.DataFrame
Training input data.
X_test : pd.DataFrame
            Test input data.

        Returns
-------
X_truncated_train : pd.DataFrame
Truncated training input data.
X_truncated_test : pd.DataFrame
Truncated test input data.
"""
# Winsorization across instances should be done after splitting the data between training
# and test set to avoid leakage
X_truncated_train = self.winsor.fit_transform(X=X_train)
        # The test set is truncated using the quantile parameters previously fitted on
        # the training set, again to avoid leakage
X_truncated_test = self.winsor.transform(X=X_test)
return X_truncated_train, X_truncated_test
def _normalize_subsets(
self,
X_train: pd.DataFrame,
X_test: pd.DataFrame
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Normalize feature of subsets.
Parameters
----------
X_train : pd.DataFrame
Training input data.
X_test : pd.DataFrame
            Test input data.

        Returns
-------
X_normalized_train : pd.DataFrame
Normalized training input data.
X_normalized_test : pd.DataFrame
Normalized test input data.
"""
# Normalization across instances should be done after splitting the data between training
# and test set to avoid leakage
normalizer = sklearn_clone(self.normalizer)
if self.winsorization:
X_train, X_test = self._truncate_subsets(X_train, X_test)
X_normalized_train = normalizer.fit_transform(X=X_train)
        # The test set is normalized using the parameters previously fitted on the
        # training set, again to avoid leakage
X_normalized_test = normalizer.transform(X=X_test)
return X_normalized_train, X_normalized_test
def _model_selection(self) -> None:
"""Prepare data according to the specified splitter type."""
logging.info(f"Splitter type: {self.splitter_type}.")
        def _populate_folds(splitter, X, y, groups=None):
            """Store the train and validation indices produced by the given splitter."""
self.train_indices = []
self.val_indices = []
split_args = (X, y) if groups is None else (X, y, groups)
for train_idx, val_idx in splitter.split(*split_args):
# Store the train and validation indices
self.train_indices.append(train_idx)
self.val_indices.append(val_idx)
if self.splitter_type == "k_fold":
if self.prefold:
if "fold" not in self.data.columns:
raise AssertionError(
"The 'fold' column should be specified in the dataset when 'prefold' "
"is set to True."
)
logging.info("Using predefined folds...")
train_data = self.data.query("subset == 'train'")
kfolds = train_data["fold"].nunique()
if self.kfolds is not None and kfolds != self.kfolds:
raise AssertionError(
f"The number of folds in the training set ({kfolds}) does not match the "
f"number of folds specified in the configuration file ({self.kfolds})."
)
self.kfolds = kfolds
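                # Note: GroupKFold only accepts the 'shuffle' and 'random_state'
                # arguments in scikit-learn >= 1.6 (assumed minimum version here).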
self.splitter = GroupKFold(n_splits=self.kfolds, shuffle=True, random_state=self.seed)
_populate_folds(self.splitter, self.X_train, self.y_train, groups=train_data["fold"])
else:
is_classification = DataLoader.DATASETS[self.dataset]["task"] == "classification"
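                # Stratification is only meaningful for classification tasks; fall back
                # to a plain KFold otherwise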
if self.stratified and is_classification:
self.splitter = StratifiedKFold(
n_splits=self.kfolds, shuffle=True, random_state=self.seed
)
else:
self.splitter = KFold(
n_splits=self.kfolds, shuffle=True, random_state=self.seed
)
_populate_folds(self.splitter, self.X_train, self.y_train)
elif self.splitter_type == "leave_one_out":
self.splitter = LeaveOneOut()
self.kfolds = self.train_size
_populate_folds(self.splitter, self.X_train, self.y_train)
    def get_fold(
        self,
        k: int,
        normalize: Optional[bool] = None
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Get k-th train and validation folds.
Parameters
----------
k : int
Fold index.
normalize : bool, default None
If True, normalizes training and validation sets generated by the split method.
If None, uses the value of the 'normalize' attribute.
Returns
-------
X_fold_train : np.ndarray
k-th fold training input data.
y_fold_train : np.ndarray
k-th fold training output data.
X_fold_val : np.ndarray
k-th fold validation input data.
y_fold_val : np.ndarray
k-th fold validation output data.
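
        Examples
        --------
        A hypothetical call, assuming 'get_ready' has already populated the folds:

        >>> X_fold_train, y_fold_train, X_fold_val, y_fold_val = loader.get_fold(k=0)  # doctest: +SKIP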
"""
if not hasattr(self, "train_indices") or not hasattr(self, "val_indices"):
raise AttributeError(
"The train and validation folds have not been created yet. Please, run the "
"'_model_selection()' method first."
)
base_X = self._raw_X_train if hasattr(self, "_raw_X_train") else self.X_train
train_idx = self.train_indices[k]
val_idx = self.val_indices[k]
X_fold_train = base_X[train_idx]
y_fold_train = self.y_train[train_idx]
X_fold_val = base_X[val_idx]
y_fold_val = self.y_train[val_idx]
if normalize is None:
normalize = self.normalize
if normalize:
X_fold_train, X_fold_val = self._normalize_subsets(X_fold_train, X_fold_val)
return X_fold_train, y_fold_train, X_fold_val, y_fold_val
def get_ready(self) -> None:
"""Prepare data for a machine learning algorithm to perform feature selection."""
self._load()
self._preprocess()
self._split()
self._model_selection()
if self.normalize:
logging.info("Normalizing train and test subsets...")
if self.splitter_type in ["k_fold", "leave_one_out"]:
# No .copy() needed. X_train is replaced (not modified in-place), so this
# keeps the pre-normalization array without duplicating memory.
self._raw_X_train = self.X_train
self.X_train, self.X_test = self._normalize_subsets(
X_train=self.X_train,
X_test=self.X_test
)
self._cast_float_dtype()
self._delete_intermediate_data()
logging.info("Data is ready for use.")