Source code for pyccea.utils.datasets

import os
import toml
import copy
import logging
import warnings
import numpy as np
import pandas as pd
import importlib.resources
from typing import Tuple
from sklearn.model_selection import (
    GroupKFold,
    KFold,
    LeaveOneOut,
    StratifiedKFold,
    train_test_split,
)
from sklearn.preprocessing import MinMaxScaler, StandardScaler


class DataLoader:
    """Load a dataset and preprocess it to train machine learning algorithms.

    Attributes
    ----------
    data : pd.DataFrame
        Raw dataset.
    X : pd.DataFrame
        Raw input data.
    y : pd.Series
        Raw output data.
    X_train : np.ndarray
        Train input data.
    X_test : np.ndarray
        Test input data.
    y_train : np.ndarray
        Train output data.
    y_test : np.ndarray
        Test output data.
    n_examples : int
        Total number of examples.
    n_features : int
        Number of features in the dataset.
    n_classes : int
        Number of classes.
    classes : np.ndarray
        Class identifiers.
    train_size : int
        Number of examples in the training set.
    test_size : int
        Number of examples in the test set.
    seed : int, default None
        It controls the randomness of the data split.
    preset : bool, default False
        In some works, the training and testing sets have already been defined. To use them,
        just set this boolean variable to True.
    test_ratio : float
        Proportion of the dataset to include in the test set. It should be between 0 and 1.
    splitter_type : str
        Model selection strategy. It can be "k_fold" or "leave_one_out".
    kfolds : int or None
        Number of folds in the k-fold cross-validation. When 'splitter_type' is set to
        'leave_one_out', it is set to the number of training examples.
    stratified : bool, default False
        If True, the folds are made by preserving the percentage of examples for each class.
        It is only used when the 'splitter_type' parameter is set to 'k_fold'.
    normalize : bool, default False
        If True, normalizes the training and test sets generated by the split method.
    """

    # Class parameters
    SPLITTER_TYPES = ["k_fold", "leave_one_out"]
    PRIMARY_CONF_KEYS = ["general", "splitter", "normalization"]
    NORMALIZATION_METHODS = {"min_max": MinMaxScaler, "standard": StandardScaler}
    with importlib.resources.open_text("pyccea.parameters", "datasets.toml") as toml_file:
        DATASETS = toml.load(toml_file)

    def __init__(self, dataset: str, conf: dict):
        """
        Parameters
        ----------
        dataset : str
            Name of the dataset that will be loaded and processed.
        conf : dict
            Configuration parameters of the dataloader.
        """
        self.dataset = dataset
        self.conf = conf
        # Check whether the data configuration file passed as a parameter is valid
        for primary_key in DataLoader.PRIMARY_CONF_KEYS:
            if primary_key not in self.conf:
                raise AssertionError(
                    f"The '{primary_key}' section should be specified in the data configuration "
                    "file."
                )
        # Initialize logger with info level
        if self.conf["general"].get("verbose", True):
            logging.basicConfig(encoding="utf-8", level=logging.INFO)
        # Parse parameters
        self._parse_parameters()

    def _parse_general_parameters(self) -> None:
        """Parse parameters from the general section of the data configuration file."""
        if "splitter_type" not in self.conf["general"]:
            raise AssertionError(
                "The 'splitter_type' parameter should be specified in the general section of the "
                "data configuration file."
            )
        self.splitter_type = self.conf["general"]["splitter_type"]
        if self.splitter_type not in DataLoader.SPLITTER_TYPES:
            raise NotImplementedError(
                f"The splitter type '{self.splitter_type}' is not implemented."
            )
        self.seed = self.conf["general"].get("seed")
        self.verbose = self.conf["general"].get("verbose", True)

    def _parse_splitter_parameters(self) -> None:
        """Parse parameters from the splitter section of the data configuration file."""
        if self.splitter_type == "k_fold":
            if ("kfolds" not in self.conf["splitter"]) and (
                self.conf["splitter"].get("prefold", False) is False
            ):
                raise AssertionError(
                    "The 'kfolds' parameter should be specified in the splitter section of the "
                    "data configuration file when 'splitter_type' is set to 'k_fold' and "
                    "'prefold' is set to False or is not defined in the splitter section."
                )
            self.kfolds = self.conf["splitter"].get("kfolds")
            self.stratified = self.conf["splitter"].get("stratified", False)
        if self.splitter_type == "leave_one_out":
            if "kfolds" in self.conf["splitter"]:
                warnings.warn(
                    "You specified the number of folds while using Leave-One-Out (LOO). "
                    "However, LOO is equivalent to k-fold when k is equal to the number of "
                    "examples. Therefore, the 'kfolds' parameter will be ignored in this case.",
                    UserWarning
                )
            if "stratified" in self.conf["splitter"]:
                warnings.warn(
                    "You specified the 'stratified' parameter while using Leave-One-Out (LOO). "
                    "However, the validation folds made by LOO have only one sample. Therefore, "
                    "the 'stratified' parameter will be ignored in this case.",
                    UserWarning
                )
            if self.conf["splitter"].get("prefold"):
                warnings.warn(
                    "You specified the 'prefold' parameter while using Leave-One-Out (LOO). "
                    "However, the validation folds made by LOO have only one sample. Therefore, "
                    "the 'prefold' parameter will be ignored in this case.",
                    UserWarning
                )
        self.preset = self.conf["splitter"].get("preset", False)
        if self.preset:
            if self.conf["splitter"].get("test_ratio") is not None:
                logging.info(
                    "Since both the 'preset' and 'test_ratio' parameters were set, the "
                    "predefined subsets take precedence and the 'test_ratio' parameter is "
                    "unused."
                )
            self.test_ratio = None
        else:
            self.test_ratio = self.conf["splitter"].get("test_ratio")
            if self.test_ratio is None:
                raise ValueError(
                    "The 'test_ratio' parameter should be specified in the splitter section of "
                    "the data configuration file when the 'preset' parameter is set to False or "
                    "is not defined in the splitter section."
                )
            if (self.test_ratio <= 0) or (self.test_ratio >= 1):
                raise ValueError(
                    "The 'test_ratio' parameter should be within the range of 0 and 1, "
                    "excluding the extreme values (i.e., 0 < 'test_ratio' < 1)."
                )
        self.prefold = self.conf["splitter"].get("prefold", False)

    def _parse_normalization_parameters(self) -> None:
        """Parse parameters from the normalization section of the data configuration file."""
        self.normalize = self.conf["normalization"].get("normalize", False)
        self.normalization_method = self.conf["normalization"].get("method")
        if self.normalize:
            if "method" not in self.conf["normalization"]:
                raise AssertionError(
                    "The 'method' parameter should be specified in the normalization section of "
                    "the data configuration file when the 'normalize' parameter is set to True."
                )
            if self.normalization_method not in DataLoader.NORMALIZATION_METHODS:
                raise NotImplementedError(
                    f"The normalization method '{self.normalization_method}' is not implemented."
                )
            self.normalizer = DataLoader.NORMALIZATION_METHODS[self.normalization_method]()
        else:
            if self.normalization_method is not None:
                raise ValueError(
                    "The 'normalize' parameter should be set to True in the normalization "
                    "section of the data configuration file when the 'method' parameter is "
                    "specified."
                )

    def _parse_parameters(self) -> None:
        """Parse parameters, validate their values, and assign them to attributes."""
        self._parse_general_parameters()
        self._parse_splitter_parameters()
        self._parse_normalization_parameters()
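
    # A minimal sketch of a data configuration that the parsers above would accept. The
    # section and key names come from the checks in this module; the concrete values are
    # illustrative assumptions, not defaults shipped with PyCCEA.
    #
    #   [general]
    #   splitter_type = "k_fold"    # one of DataLoader.SPLITTER_TYPES
    #   seed = 42
    #   verbose = true
    #
    #   [splitter]
    #   kfolds = 5
    #   stratified = true
    #   preset = false
    #   test_ratio = 0.2            # required because 'preset' is false
    #
    #   [normalization]
    #   normalize = true
    #   method = "min_max"          # one of DataLoader.NORMALIZATION_METHODS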
    def get_ready(self) -> None:
        """
        Prepare the data for a Cooperative Co-Evolutionary Algorithm to perform feature
        selection.
        """
        self._load()
        self._preprocess()
        self._split()
        self._model_selection()
        if self.normalize:
            self.X_train, self.X_test = self._normalize_subsets(
                X_train=self.X_train, X_test=self.X_test
            )
        logging.info("Data is ready for use.")
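
    # A minimal usage sketch, assuming "iris" is one of the datasets listed in
    # pyccea/parameters/datasets.toml and "data.toml" is a hypothetical data configuration
    # file such as the one sketched above:
    #
    #   conf = toml.load("data.toml")
    #   dataloader = DataLoader(dataset="iris", conf=conf)
    #   dataloader.get_ready()
    #   X_train, y_train = dataloader.X_train, dataloader.y_train
    #   X_test, y_test = dataloader.X_test, dataloader.y_test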
    def _load(self) -> None:
        """Load the dataset given as a parameter."""
        try:
            current_dir = os.path.dirname(__file__)
            path = os.path.join(
                current_dir, "..", "datasets", DataLoader.DATASETS[self.dataset]["file"]
            )
        except KeyError:
            # The chosen dataset is not listed in the datasets configuration file
            raise ValueError(
                f"The '{self.dataset}' dataset is not available. "
                f"The available datasets are {', '.join(DataLoader.DATASETS.keys())}."
            )
        # Load dataset
        logging.info(f"Dataset: {self.dataset}")
        self.data = pd.read_parquet(path)

    def _get_input(self) -> pd.DataFrame:
        """Get the input data X from the dataset.

        Returns
        -------
        X : pd.DataFrame (n_examples, n_features)
            Input data (features).
        """
        # Get all columns except 'label', 'subset', and 'fold' while preserving order
        excluded_cols = ["label", "subset", "fold"]
        selected_cols = [col for col in self.data.columns if col not in excluded_cols]
        X = self.data.loc[:, selected_cols].copy()
        return X

    def _get_output(self) -> pd.Series:
        """Get the output data y from the dataset.

        Returns
        -------
        y : pd.Series (n_examples,)
            Output data (labels).
        """
        y = self.data.loc[:, "label"].copy()
        return y

    def _preprocess(self, dropna: bool = True) -> None:
        """Preprocess the dataset to be used by machine learning models.

        Parameters
        ----------
        dropna : bool, default True
            Remove rows that contain NaN values.
        """
        # Set a default representation for NaN values
        self.data.replace(to_replace="?", value=np.nan, inplace=True)
        # Remove rows with at least one NaN value
        if dropna:
            # Store the number of rows before dropping NaNs
            initial_row_count = self.data.shape[0]
            self.data.dropna(inplace=True)
            self.data.reset_index(drop=True, inplace=True)
            # Compute the number of removed rows
            removed_rows = initial_row_count - self.data.shape[0]
            logging.info(f"Number of rows removed due to NaN values: {removed_rows}")
        # Split into input and output data
        self.X = self._get_input()
        self.y = self._get_output()
        # Set the number of examples and features
        self.n_examples = self.X.shape[0]
        self.n_features = self.X.shape[1]
        # For classification tasks
        if DataLoader.DATASETS[self.dataset]["task"] == "classification":
            # Set the number of classes and get the class identifiers
            self.n_classes = self.y.nunique()
            self.classes = sorted(self.y.unique())
            # Compute the imbalance ratio
            minority_class = self.y.value_counts().min()
            majority_class = self.y.value_counts().max()
            self.imbalance_ratio = round(majority_class / minority_class, 4)
            # Ensure labels are integer-encoded
            self.y = self.y.astype(int)

    def _split(self) -> None:
        """Split the dataset into training and test sets."""
        if self.preset:
            logging.info("Using predefined sets...")
            # Get the predefined training set
            self.X_train = (
                self.data.query("subset == 'train'")
                .drop(columns=["label", "subset", "fold"], errors="ignore")
                .to_numpy()
            )
            self.y_train = self.data.query("subset == 'train'")["label"].to_numpy()
            # Get the predefined test set
            self.X_test = (
                self.data.query("subset == 'test'")
                .drop(columns=["label", "subset", "fold"], errors="ignore")
                .to_numpy()
            )
            self.y_test = self.data.query("subset == 'test'")["label"].to_numpy()
        else:
            logging.info("Splitting data...")
            # Split data into training and test sets
            self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
                self.X.to_numpy(),
                self.y.to_numpy(),
                test_size=self.test_ratio,
                random_state=self.seed
            )
        # Set subset sizes
        self.train_size = self.X_train.shape[0]
        self.test_size = self.X_test.shape[0]
        self.test_ratio = round(self.test_size / (self.train_size + self.test_size), 4)
        logging.info(f"Training set with {self.train_size} observations.")
        logging.info(f"Test set with {self.test_size} observations.")

    def _normalize_subsets(
        self,
        X_train: np.ndarray,
        X_test: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Normalize the features of the given subsets.

        Parameters
        ----------
        X_train : np.ndarray
            Training input data.
        X_test : np.ndarray
            Test input data.

        Returns
        -------
        X_normalized_train : np.ndarray
            Normalized training input data.
        X_normalized_test : np.ndarray
            Normalized test input data.
        """
        # Normalization across instances should be done after splitting the data into training
        # and test sets to avoid leakage
        normalizer = copy.deepcopy(self.normalizer)
        X_normalized_train = normalizer.fit_transform(X=X_train)
        # When normalizing the test set, apply the normalization parameters previously fitted
        # on the training set as-is
        X_normalized_test = normalizer.transform(X=X_test)
        return X_normalized_train, X_normalized_test

    def _model_selection(self) -> None:
        """Prepare data according to the specified splitter type."""
        logging.info(f"Splitter type: {self.splitter_type}.")

        def _populate_folds(splitter, X, y, groups=None):
            """Populate the train and validation folds using the provided splitter."""
            self.train_folds = []
            self.val_folds = []
            self.train_indices = []
            self.val_indices = []
            split_args = (X, y) if groups is None else (X, y, groups)
            for train_idx, val_idx in splitter.split(*split_args):
                X_train_fold, X_val_fold = X[train_idx].copy(), X[val_idx].copy()
                y_train_fold, y_val_fold = y[train_idx].copy(), y[val_idx].copy()
                if self.normalize:
                    X_train_fold, X_val_fold = self._normalize_subsets(
                        X_train_fold, X_val_fold
                    )
                # Store the train and validation folds and their indices
                self.train_folds.append([X_train_fold, y_train_fold])
                self.val_folds.append([X_val_fold, y_val_fold])
                self.train_indices.append(train_idx)
                self.val_indices.append(val_idx)

        if self.splitter_type == "k_fold":
            if self.prefold:
                if "fold" not in self.data.columns:
                    raise AssertionError(
                        "The 'fold' column should be specified in the dataset when 'prefold' "
                        "is set to True."
                    )
                logging.info("Using predefined folds...")
                train_data = self.data.query("subset == 'train'")
                kfolds = train_data["fold"].nunique()
                if self.kfolds is not None and kfolds != self.kfolds:
                    raise AssertionError(
                        f"The number of folds in the training set ({kfolds}) does not match "
                        f"the number of folds specified in the configuration file "
                        f"({self.kfolds})."
                    )
                self.kfolds = kfolds
                self.splitter = GroupKFold(
                    n_splits=self.kfolds, shuffle=True, random_state=self.seed
                )
                _populate_folds(
                    self.splitter, self.X_train, self.y_train, groups=train_data["fold"]
                )
            else:
                is_classification = (
                    DataLoader.DATASETS[self.dataset]["task"] == "classification"
                )
                if self.stratified and is_classification:
                    self.splitter = StratifiedKFold(
                        n_splits=self.kfolds, shuffle=True, random_state=self.seed
                    )
                else:
                    self.splitter = KFold(
                        n_splits=self.kfolds, shuffle=True, random_state=self.seed
                    )
                _populate_folds(self.splitter, self.X_train, self.y_train)
        elif self.splitter_type == "leave_one_out":
            # LOO is equivalent to k-fold with k equal to the number of training examples
            self.splitter = LeaveOneOut()
            self.kfolds = self.train_size
            _populate_folds(self.splitter, self.X_train, self.y_train)