Source code for pyccea.utils.datasets

import gc
import importlib.resources
import logging
import os
import warnings
from typing import Tuple

import numpy as np
import pandas as pd
import toml
from sklearn.base import clone as sklearn_clone
from sklearn.model_selection import (GroupKFold, KFold, LeaveOneOut,
                                     StratifiedKFold, train_test_split)
from sklearn.preprocessing import MinMaxScaler, StandardScaler

from .preprocessing import Winsoriser


[docs] class DataLoader(): """Load dataset and preprocess it to train machine learning algorithms. Attributes ---------- data : pd.DataFrame Raw dataset. X : pd.DataFrame Raw input data. y : pd.Series Raw output data. X_train : np.ndarray Train input data. X_test : np.ndarray Test input data. y_train : np.ndarray Train output data. y_test : np.ndarray Test output data. n_examples : int Total number of examples. n_features : int Number of features in the dataset. n_classes : int Number of classes. classes : np.ndarray Class identifiers. train_size : int Number of examples in the training set. test_size : int Number of examples in the test set. seed : int, default None It controls the randomness of the data split. preset : bool, default False In some works, the training and testing sets have already been defined. To use them, just set this boolean variable to True. test_ratio : float Proportion of the dataset to include in the test set. It should be between 0 and 1. splitter_type : str Model selection strategy. It can be "k_fold" or "leave_one_out". kfolds : int or None Number of folds in the k-fold cross validation or in the leave-one-out cross validation. stratified : bool, default False If True, the folds are made by preserving the percentage of examples for each class. It is only used in case 'splitter_type' parameter is set to 'k_fold'. normalize : bool, default False If True, normalizes training and test sets generated by the split method. """ # Class parameters SPLITTER_TYPES = ["k_fold", "leave_one_out"] PRIMARY_CONF_KEYS = ["general", "splitter", "preprocessing", "normalization"] NORMALIZATION_METHODS = {"min_max": MinMaxScaler, "standard": StandardScaler} with importlib.resources.open_text("pyccea.parameters", "datasets.toml") as toml_file: DATASETS = toml.load(toml_file) def __init__(self, dataset: str, conf: dict): """ Parameters ---------- dataset : str Name of the dataset that will be loaded and processed. conf : dict Configuration parameters of the dataloader. """ self.dataset = dataset self.conf = conf # Check if the data configuration file passed as parameter is valid for primary_key in DataLoader.PRIMARY_CONF_KEYS: if primary_key not in self.conf: raise AssertionError( f"The '{primary_key}' section should be specified in the data configuration " "file." ) # Initialize logger with info level if self.conf["general"].get("verbose", True): logging.basicConfig(encoding="utf-8", level=logging.INFO) # Parse parameters self._parse_parameters() def _parse_general_parameters(self) -> None: """Parse parameters from the general section of the data configuration file.""" if "splitter_type" not in self.conf["general"]: raise AssertionError( "The 'splitter_type' parameter should be specified in the general section of the " "data configuration file." ) self.splitter_type = self.conf["general"]["splitter_type"] if self.splitter_type not in DataLoader.SPLITTER_TYPES: raise NotImplementedError( f"The splitter type '{self.splitter_type}' is not implemented." ) self.seed = self.conf["general"].get("seed") self.verbose = self.conf["general"].get("verbose", True) float_dtype = self.conf["general"].get("float_dtype") if float_dtype is None: self.float_dtype = None else: float_dtype = str(float_dtype).lower() if float_dtype not in ["float32", "float64"]: raise ValueError( "The 'float_dtype' parameter in the general section of the data " "configuration file should be either 'float32' or 'float64'." ) self.float_dtype = np.float32 if float_dtype == "float32" else np.float64 def _parse_splitter_parameters(self) -> None: """Parse parameters from the splitter section of the data configuration file.""" if self.splitter_type == "k_fold": if ("kfolds" not in self.conf["splitter"]) and (self.conf["splitter"].get("prefold", False) is False): raise AssertionError( "The parameter 'kfolds' should be specified in the splitter section of the " "data configuration file when 'splitter_type' is set to 'k_fold' and 'prefold' " "is set to False or is not defined in the splitter section." ) self.kfolds = self.conf["splitter"].get("kfolds") self.stratified = self.conf["splitter"].get("stratified", False) if self.splitter_type == "leave_one_out": if "kfolds" in self.conf["splitter"]: warnings.warn( "You specified the number of folds using Leave-One-Out (LOO). However, LOO is" " equivalent to K-Fold when K is equal to the number of examples. Therefore, " "the value of 'kfolds' parameter will be ignored in this case.", UserWarning ) if "stratified" in self.conf["splitter"]: warnings.warn( "You specified the 'stratified' parameter using Leave-One-Out (LOO). However," " the validation folds made by the LOO have only one sample. Therefore, the " "value of 'stratified' parameter will be ignored in this case.", UserWarning ) if self.conf["splitter"].get("prefold"): warnings.warn( "You specified the 'prefold' parameter using Leave-One-Out (LOO). However, " "the validation folds made by the LOO have only one sample. Therefore, the " "value of 'prefold' parameter will be ignored in this case.", UserWarning ) self.preset = self.conf["splitter"].get("preset", False) if self.preset: if self.conf["splitter"].get("test_ratio") is not None: logging.info( "After setting both the 'preset' and 'test_ratio' parameters, the predefined " "subsets will take precedence, rendering the 'test_ratio' parameter unused." ) self.test_ratio = None else: self.test_ratio = self.conf["splitter"].get("test_ratio") if self.test_ratio is None: raise ValueError( "The 'test_ratio' parameter should be specified in the splitter section of " "the data configuration file when the 'preset' parameter is set to False or " "is not defined in the splitter section." ) if (self.test_ratio) <= 0 or (self.test_ratio >= 1): raise ValueError( "The 'test_ratio' parameter should be within the range of 0 and 1, excluding " "extreme values (i.e., 0 < 'test_ratio' < 1)." ) self.prefold = self.conf["splitter"].get("prefold", False) def _parse_normalization_parameters(self) -> None: """Parse parameters from the normalization section of the data configuration file.""" self.normalize = self.conf["normalization"].get("normalize", False) self.normalization_method = self.conf["normalization"].get("method") if self.normalize: if "method" not in self.conf["normalization"]: raise AssertionError( "The 'method' parameter should be specified in the normalization section of " "the data configuration file when 'normalize' parameter is set to True." ) if self.normalization_method not in DataLoader.NORMALIZATION_METHODS.keys(): raise NotImplementedError( f"The normalization method '{self.normalization_method}' is not implemented." ) self.normalizer = DataLoader.NORMALIZATION_METHODS[self.normalization_method]() else: if self.normalization_method is not None: raise ValueError( "The 'normalize' parameter should be set to True in the normalization section" " of the data configuration file when 'method' parameter is specified." ) def _parse_preprocessing_parameters(self) -> None: """Parse parameters from the preprocessing section of the data configuration file.""" self._dropna = self.conf["preprocessing"].get("drop_na") self.winsorization = self.conf["preprocessing"].get("winsorization", False) if self.winsorization: if "quantiles" not in self.conf["preprocessing"]: raise AssertionError( "The Winsorization method is enable, but the required 'quantiles' key is" "missing in the 'preprocessing configuration section." ) lower, upper = self.conf["preprocessing"]["quantiles"] self.winsor = Winsoriser(lower=lower, upper=upper) def _parse_parameters(self) -> None: """Parse parameters, validate their values, and assign them to attributes.""" self._parse_general_parameters() self._parse_splitter_parameters() self._parse_normalization_parameters() self._parse_preprocessing_parameters() def _cast_array(self, X: np.ndarray, array_name: str) -> np.ndarray: """Cast input matrix to the configured float dtype.""" # Only cast numeric arrays; object arrays should fail earlier in preprocessing. if isinstance(X, np.ndarray) and np.issubdtype(X.dtype, np.number): if X.dtype != self.float_dtype: logging.info(f"Casting {array_name} from {X.dtype} to {self.float_dtype}.") return X.astype(self.float_dtype, copy=False) return X def _cast_float_dtype(self) -> None: """Cast stored feature matrices to the configured float dtype (if any).""" if self.float_dtype is None: return # Main subsets if hasattr(self, "X_train"): self.X_train = self._cast_array(self.X_train, "X_train") if hasattr(self, "X_test"): self.X_test = self._cast_array(self.X_test, "X_test") # Fold (if exists) if hasattr(self, "train_folds") and hasattr(self, "val_folds"): for k in range(len(self.train_folds)): self.train_folds[k][0] = self._cast_array(self.train_folds[k][0], f"{k}-th train fold") self.val_folds[k][0] = self._cast_array(self.val_folds[k][0], f"{k}-th val fold") # Raw training set used for fold normalization (if exists) if hasattr(self, "_raw_X_train"): self._raw_X_train = self._cast_array(self._raw_X_train, "_raw_X_train") def _delete_intermediate_data(self) -> None: """Delete intermediate data to free memory.""" if hasattr(self, "data"): delattr(self, "data") if hasattr(self, "X"): delattr(self, "X") if hasattr(self, "y"): delattr(self, "y") gc.collect() def _load(self) -> None: """Load dataset according to dataset given as a parameter.""" try: current_dir = os.path.dirname(__file__) path = os.path.join(current_dir, "..", "datasets", DataLoader.DATASETS[self.dataset]["file"]) except: # Check if the chosen dataset is available raise ValueError( f"The '{self.dataset}' dataset is not available. " f"The available datasets are {', '.join(DataLoader.DATASETS.keys())}." ) # Load dataset logging.info(f"Dataset: {self.dataset}") self.data = pd.read_parquet(path) def _get_input(self) -> pd.DataFrame: """Get the input data X from the dataset. Returns ------- : pd.DataFrame (n_examples, n_features) Input data (features). """ # Get all columns except 'label', 'subset', and 'fold' while preserving order excluded_cols = ["label", "subset", "fold"] selected_cols = [col for col in self.data.columns if col not in excluded_cols] return self.data.loc[:, selected_cols] def _get_output(self) -> pd.Series: """Get the output data y from the dataset. Returns ------- : pd.Series (n_examples, ) Output data (labels). """ return self.data.loc[:, "label"] def _preprocess(self) -> None: """Preprocess the dataset to be used by machine learning models.""" # Setting a default representation for NaN values pd.set_option("future.no_silent_downcasting", True) self.data.replace(to_replace="?", value=np.nan, inplace=True) # Remove rows with at least one NaN value if self._dropna: # Store the number of rows before dropping NaNs initial_row_count = self.data.shape[0] self.data.dropna(inplace=True) self.data.reset_index(drop=True, inplace=True) # Calculate the number of removed rows removed_rows = initial_row_count - self.data.shape[0] logging.info(f"Number of rows removed due to NaN values: {removed_rows}") # Split into input and output data self.X = self._get_input() self.y = self._get_output() # Set number of examples self.n_examples = self.X.shape[0] # Set number of features self.n_features = self.X.shape[1] # For classification tasks if DataLoader.DATASETS[self.dataset]["task"] == "classification": # Set number of classes self.n_classes = self.y.nunique() # Get class identifiers self.classes = sorted(self.y.unique()) # Compute imbalance ratio minority_class = self.y.value_counts().min() majority_class = self.y.value_counts().max() self.imbalance_ratio = round(majority_class/minority_class, 4) # Ensure labels are integer-encoded self.y = self.y.astype(np.int8) def _split(self) -> None: """Split dataset into training and test sets.""" if self.preset: logging.info("Using predefined sets...") # Get predefined training set self.X_train = ( self.data.query("subset == 'train'") .drop(columns=["label", "subset", "fold"], errors="ignore") .to_numpy() ) self.y_train = self.data.query("subset == 'train'")["label"].to_numpy() # Get predefined test set self.X_test = ( self.data.query("subset == 'test'") .drop(columns=["label", "subset", "fold"], errors="ignore") .to_numpy() ) self.y_test = self.data.query("subset == 'test'")["label"].to_numpy() else: logging.info("Splitting data...") # Split data into training and test sets self.X_train, self.X_test, self.y_train, self.y_test = ( train_test_split( self.X.to_numpy(), self.y.to_numpy(), test_size=self.test_ratio, random_state=self.seed ) ) # Set subset sizes self.train_size = self.X_train.shape[0] self.test_size = self.X_test.shape[0] self.test_ratio = round(self.test_size / (self.train_size + self.test_size), 4) logging.info(f"Training set with {self.train_size} observations.") logging.info(f"Test set with {self.test_size} observations.") def _truncate_subsets( self, X_train: pd.DataFrame, X_test: pd.DataFrame ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Truncate the outliers. Parameters ---------- X_train : pd.DataFrame Training input data. X_test : pd.DataFrame Test input data. Returns ------- X_truncated_train : pd.DataFrame Truncated training input data. X_truncated_test : pd.DataFrame Truncated test input data. """ # Winsorization across instances should be done after splitting the data between training # and test set to avoid leakage X_truncated_train = self.winsor.fit_transform(X=X_train) # When truncating the test set, it should apply the quantiles parameters previously # obtained from the training set as-is X_truncated_test = self.winsor.transform(X=X_test) return X_truncated_train, X_truncated_test def _normalize_subsets( self, X_train: pd.DataFrame, X_test: pd.DataFrame ) -> Tuple[pd.DataFrame, pd.DataFrame]: """Normalize feature of subsets. Parameters ---------- X_train : pd.DataFrame Training input data. X_test : pd.DataFrame Test input data. Returns ------- X_normalized_train : pd.DataFrame Normalized training input data. X_normalized_test : pd.DataFrame Normalized test input data. """ # Normalization across instances should be done after splitting the data between training # and test set to avoid leakage normalizer = sklearn_clone(self.normalizer) if self.winsorization: X_train, X_test = self._truncate_subsets(X_train, X_test) X_normalized_train = normalizer.fit_transform(X=X_train) # When normalizing the test set, it should apply the normalization parameters previously # obtained from the training set as-is X_normalized_test = normalizer.transform(X=X_test) return X_normalized_train, X_normalized_test def _model_selection(self) -> None: """Prepare data according to the specified splitter type.""" logging.info(f"Splitter type: {self.splitter_type}.") def _populate_folds(splitter, X, y, groups=None): "Populate the train and validation folds using the provided splitter." self.train_indices = [] self.val_indices = [] split_args = (X, y) if groups is None else (X, y, groups) for train_idx, val_idx in splitter.split(*split_args): # Store the train and validation indices self.train_indices.append(train_idx) self.val_indices.append(val_idx) if self.splitter_type == "k_fold": if self.prefold: if "fold" not in self.data.columns: raise AssertionError( "The 'fold' column should be specified in the dataset when 'prefold' " "is set to True." ) logging.info("Using predefined folds...") train_data = self.data.query("subset == 'train'") kfolds = train_data["fold"].nunique() if self.kfolds is not None and kfolds != self.kfolds: raise AssertionError( f"The number of folds in the training set ({kfolds}) does not match the " f"number of folds specified in the configuration file ({self.kfolds})." ) self.kfolds = kfolds self.splitter = GroupKFold(n_splits=self.kfolds, shuffle=True, random_state=self.seed) _populate_folds(self.splitter, self.X_train, self.y_train, groups=train_data["fold"]) else: is_classification = DataLoader.DATASETS[self.dataset]["task"] == "classification" if self.stratified and is_classification: self.splitter = StratifiedKFold( n_splits=self.kfolds, shuffle=True, random_state=self.seed ) else: self.splitter = KFold( n_splits=self.kfolds, shuffle=True, random_state=self.seed ) _populate_folds(self.splitter, self.X_train, self.y_train) elif self.splitter_type == "leave_one_out": self.splitter = LeaveOneOut() self.kfolds = self.train_size _populate_folds(self.splitter, self.X_train, self.y_train)
[docs] def get_fold( self, k: int, normalize: bool = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Get k-th train and validation folds. Parameters ---------- k : int Fold index. normalize : bool, default None If True, normalizes training and validation sets generated by the split method. If None, uses the value of the 'normalize' attribute. Returns ------- X_fold_train : np.ndarray k-th fold training input data. y_fold_train : np.ndarray k-th fold training output data. X_fold_val : np.ndarray k-th fold validation input data. y_fold_val : np.ndarray k-th fold validation output data. """ if not hasattr(self, "train_indices") or not hasattr(self, "val_indices"): raise AttributeError( "The train and validation folds have not been created yet. Please, run the " "'_model_selection()' method first." ) base_X = self._raw_X_train if hasattr(self, "_raw_X_train") else self.X_train train_idx = self.train_indices[k] val_idx = self.val_indices[k] X_fold_train = base_X[train_idx] y_fold_train = self.y_train[train_idx] X_fold_val = base_X[val_idx] y_fold_val = self.y_train[val_idx] if normalize is None: normalize = self.normalize if normalize: X_fold_train, X_fold_val = self._normalize_subsets(X_fold_train, X_fold_val) return X_fold_train, y_fold_train, X_fold_val, y_fold_val
[docs] def get_ready(self) -> None: """Prepare data for a machine learning algorithm to perform feature selection.""" self._load() self._preprocess() self._split() self._model_selection() if self.normalize: logging.info("Normalizing train and test subsets...") if self.splitter_type in ["k_fold", "leave_one_out"]: # No .copy() needed. X_train is replaced (not modified in-place), so this # keeps the pre-normalization array without duplicating memory. self._raw_X_train = self.X_train self.X_train, self.X_test = self._normalize_subsets( X_train=self.X_train, X_test=self.X_test ) self._cast_float_dtype() self._delete_intermediate_data() logging.info("Data is ready for use.")