Skip to content
Snippets Groups Projects
Select Git revision
  • 3de4b35ffa23c1deb5b28426d6c53f73811c00f3
  • master default protected
2 results

clustering-checkpoint.ipynb

Blame
  • binary_classification_workflow.py 17.73 KiB
    # -*- coding: utf-8 -*-
    """
    Spyder Editor
    
    This file contains the functions needed to clean and prepare the data
    (preprocessing), as well as the helpers used to train, tune and evaluate
    the binary classification models.
    """
    
    import seaborn as sns
    import matplotlib.pyplot as plt
    import pandas as pd
    import numpy as np
    from scipy import stats
    from tqdm import tqdm
    from sklearn.preprocessing import MinMaxScaler, StandardScaler, OrdinalEncoder, OneHotEncoder
    from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
    from sklearn.decomposition import PCA
    from sklearn.metrics import f1_score
    from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, confusion_matrix
    from sklearn.model_selection import learning_curve
    from sklearn.model_selection import GridSearchCV
    
    
    ######### Main function for preprocessing ############
    def pre_process_data(data):
        """
        Run the whole preprocessing pipeline on a dataset.

        The steps are applied in this order: scaling of the numerical columns,
        cleaning / imputation / encoding of the categorical columns (only when
        there are any), then imputation of the missing numerical values.

        Parameters:
        -----------
        data (DataFrame): The dataset. The helper functions modify it in place.

        Returns:
        ----------

        data: preprocessed data.
        """
        # Keep this call first: get_numerical_columns also removes the 'id'
        # column from `data` in place.
        num_features = get_numerical_columns(data)
        cat_features = get_categorical_columns(data)

        scale_normalize(data, num_features)

        # Categorical handling is skipped entirely for all-numerical datasets.
        if cat_features:
            fill_categorical_kidney(data, cat_features)
            data = convert_categorical_feats(data, cat_features)

        fill_numerical_columns(data, skew_threshold=0.5)
        return data
    
    
    ######### Main function for preparing the data for the training phase ############
    def prepare_training_data(data, target, use_pca=True, threshold_variance_ratio=0.90):
        """
        Prepare the data for training phase.

        Optionally reduces the features with PCA (and plots the projection onto
        the first two components plus the cumulative explained variance), then
        performs the stratified train/test split.

        Parameters:
        -----------
        data (DataFrame): The preprocessed dataset, target column included.
        target (str): Name of the target column.
        use_pca (bool): Whether to reduce the features with PCA before the
            split. Default is True.
        threshold_variance_ratio (float): Cumulative explained variance ratio
            that the retained PCA components must reach. Default is 0.90.

        Returns:
        ----------
        X_train, X_test, y_train, y_test, cv: the values returned by `split`
            (train/test features and targets, and the cross-validation object).
        """
        # Without PCA the data is split as is. (`df` used to be assigned only
        # inside the PCA branch, so use_pca=False raised UnboundLocalError.)
        df = data

        if use_pca:
            df, explainable_ratios = feature_selection(data, target, threshold_variance_ratio)

            #Explainable plots
            _, axes = plt.subplots(1, 2, figsize=(10, 4))

            # The 2D projection can only be drawn when at least two components
            # were retained by the variance threshold.
            if 'PCA2' in df.columns:
                for class_value, color, label in ((0, 'red', "CKD"), (1, 'green', "NOT CKD")):
                    # Select the rows by the actual target column, not a
                    # hard-coded column name.
                    in_class = df[target] == class_value
                    axes[0].scatter(df.loc[in_class, 'PCA1'], df.loc[in_class, 'PCA2'],
                                    color=color, label=label)
                axes[0].legend()
            axes[0].set_xlabel("PCA1: First component")
            axes[0].set_ylabel("PCA2: Second component")
            axes[0].set_title('Data projection onto the first two PCA components')

            axes[1].plot(range(1, len(explainable_ratios) + 1), explainable_ratios)
            # Draw the threshold that was actually used for the selection.
            axes[1].axhline(y=threshold_variance_ratio, linestyle='--', color='red', label='Threshold Ratio')
            axes[1].set_xlabel('Number of components')
            axes[1].set_ylabel('Cumulative Explainable variance')
            axes[1].set_title("Cumulative Explainable variance vs Number of components")
            axes[1].legend()

            plt.tight_layout()
            plt.subplots_adjust(wspace=0.4)
            plt.show()

        X_train, X_test, y_train, y_test, cv = split(df, target, alpha=0.2, n=5)
        return X_train, X_test, y_train, y_test, cv
    
    
    ######### Main function for training the data ############
    def train_and_tune_model(X_train, y_train, model, param_grid, cv, scoring = "f1", verbose=1):
        """
        Fine-tune a binary classification model with an exhaustive grid search.

        Parameters
        ----------
        X_train : DataFrame
            Training data features.
        y_train : Series
            Training data target.
        model : Estimator object
            The binary classification model to be tuned.
        param_grid : dict
            The hyperparameter grid explored by the search.
        cv : Cross-validation strategy
            The cross-validation splitting strategy.
        scoring : str or list of str, optional
            The scoring metric(s) used to rank the candidates. Default is 'f1'.
        verbose : int, optional
            The verbosity level of the search. Default is 1.

        Returns
        -------
        best_model : Estimator object
            The best estimator found by the grid search.
        """
        search_options = {
            "estimator": model,
            "param_grid": param_grid,
            "cv": cv,
            "scoring": scoring,
            "n_jobs": -1,        # use every available core
            "verbose": verbose,  # amount of messages printed during the search
        }
        tuner = GridSearchCV(**search_options)
        tuner.fit(X_train, y_train)

        return tuner.best_estimator_
    
    # Example usage
    # from sklearn.ensemble import RandomForestClassifier
    # model = RandomForestClassifier()
    # param_grid = {'n_estimators': [100, 200], 'max_depth': [10, 20]}
    # best_rf_model = train_and_tune_model(X_train, y_train, model, param_grid, cv)
    
    
    ######### Main function to display the results for comparison purposes ############
    def display_results(dict_models, X_train, y_train, X_test, y_test, cv, disp_col):
        """
        Tune, evaluate and compare several models.

        For every model: runs the grid search, computes the test F1 score, shows
        the confusion matrix, prints accuracy / precision / recall / ROC-AUC and
        plots the learning curves.

        Parameters
        ----------
        dict_models : dict
            Maps a model name to a dict with the keys "model" (the estimator)
            and "param_grid" (its hyperparameter grid).
        X_train : DataFrame
            Training data features.
        y_train : Series
            Training data target
        X_test : DataFrame
            Test data features
        y_test : Series
            Test data target
        cv : StratifiedKFold
            Cross-validation strategy
        disp_col : str
            Name of the column holding the score in the returned table

        Returns
        -------
        df_results : pandas Styler
            Styled table with one row per model ("Model Name", disp_col); the
            score is the test F1 score in percent, the best one is highlighted.
        """
        # Rows are collected in a list and turned into a DataFrame once, instead
        # of concatenating onto an empty frame at every iteration.
        result_rows = []

        for model_name, model_details in tqdm(dict_models.items(), desc="Going through each model defined in the dictionnary..."):
            #extract the details related to every model from the dict
            model_params = model_details["param_grid"]
            model = model_details["model"]
            best_model = train_and_tune_model(X_train, y_train, model, model_params, cv)
            score = test_model(X_test, y_test, best_model) #evaluate f1 score on test data
            result_rows.append({"Model Name": model_name, disp_col: np.round(score*100, 2)})

            # Predict once and reuse the predictions for every metric below.
            y_pred = best_model.predict(X_test)

            conf_matrix = confusion_matrix(y_test, y_pred)
            plt.figure(figsize=(8, 6))
            sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
            plt.title(f'Confusion Matrix - {model_name}')
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.show()

            # Print and analyze additional evaluation metrics
            print(f'Model: {model_name}')
            print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
            print(f'Precision: {precision_score(y_test, y_pred)}')
            print(f'Recall: {recall_score(y_test, y_pred)}')

            # ROC-AUC needs continuous scores. Not every estimator exposes
            # predict_proba, so fall back to decision_function when needed.
            if hasattr(best_model, "predict_proba"):
                y_score = best_model.predict_proba(X_test)[:, 1]
            elif hasattr(best_model, "decision_function"):
                y_score = best_model.decision_function(X_test)
            else:
                y_score = None

            if y_score is None:
                print('ROC-AUC: not available (no predict_proba / decision_function)')
            else:
                print(f'ROC-AUC: {roc_auc_score(y_test, y_score)}')
            print('\n')

            # Plot learning curves
            train_sizes, train_scores, valid_scores = learning_curve(best_model, X_train, y_train, cv=cv, scoring='f1', n_jobs=-1)
            plt.figure(figsize=(8, 6))
            plt.plot(train_sizes, np.mean(train_scores, axis=1), label='Training F1 Score')
            plt.plot(train_sizes, np.mean(valid_scores, axis=1), label='Validation F1 Score')
            plt.xlabel('Training Examples')
            plt.ylabel('F1 Score')
            plt.legend()
            plt.title(f'Learning Curves - {model_name}')
            plt.show()

        df_results = pd.DataFrame(result_rows, columns=["Model Name", disp_col])

        # Apply styling after creating the DataFrame
        df_results = df_results.style.highlight_max(subset=[disp_col], color='salmon') #highlight the model with the higher f1 score
        return df_results
    
    
    # Sub-Function to Identify Categorical Columns
    def get_categorical_columns(data):
        """
        List the categorical columns of a DataFrame.

        A column counts as categorical when its dtype is 'object' or 'category'.

        Parameters:
        -----------
        data (DataFrame): The dataset.

        Returns:
        -----------
        list: Names of the categorical columns, in their DataFrame order.
        """
        categorical_dtypes = ['object', 'category']
        categorical_part = data.select_dtypes(include=categorical_dtypes)
        return categorical_part.columns.tolist()
    
    # Sub-Function to Identify Numerical Columns
    def get_numerical_columns(data):
        """
        List the numerical feature columns of a DataFrame.

        Only 'int64' and 'float64' columns are considered, and the last column
        of the DataFrame is always left out of the search.
        Side effect: an 'id' column, when present, is removed from `data` in place.

        Parameters:
        -----------
        data (DataFrame): The dataset.

        Returns:
        -----------
        list: Names of the numerical columns, in their DataFrame order.
        """
        # 'id' is not a relevant column for the classification; nothing happens
        # when the column does not exist.
        data.drop(columns=['id'], inplace=True, errors='ignore')

        # presumably the last column is the target - TODO confirm with callers
        all_but_last = data.iloc[:, :-1]
        numerical_part = all_but_last.select_dtypes(include=['int64', 'float64'])
        return numerical_part.columns.tolist()
    
    
    def visualise_numerical_data(df,columns=None):
        """
        Plot the distribution of numerical columns.

        Parameters
        ----------
        df : Pandas.DataFrame
            the dataframe containing the data of interest.
        columns : List, optional
            The columns to plot, one histogram (with KDE) per column. When it is
            omitted or empty, the histograms of the whole dataframe are drawn
            in a single figure.

        Returns
        -------
        None. Plots the distributions.

        """
        if columns:
            # One figure per requested column.
            for feature in columns:
                feature_label = str(feature)
                sns.histplot(df[feature], kde=True)
                plt.title(f'Histogram of the feature {feature_label}')
                plt.xlabel(feature_label)
                plt.ylabel('Frequency')
                plt.show()
            return

        # No selection: overview of every column pandas can draw a histogram for.
        df.hist(bins=10, figsize=(15, 10))
        plt.suptitle('Distribution of Numerical Columns', y=0.92)
        plt.show()
    
    
    def fill_median(df,col):
        """
        Fill the missing values of a column with the column's median.

        Parameters
        ----------
        df : Pandas.DataFrame
            the dataframe containing the data of interest.
        col : str
            Column name we want to fill.

        Returns
        -------
        None. fills inplace the missing values with the median

        """
        median = df[col].median()
        # Assign the filled column back to the frame: an in-place fillna on the
        # `df[col]` selection is a chained assignment, which does not update
        # `df` when pandas Copy-on-Write is active.
        df[col] = df[col].fillna(median)
        print(f"{col} done !")
    
        
    def fill_mean(df,col):
        """
        Fill the missing values of a column with the column's mean.

        Parameters
        ----------
        df : Pandas.DataFrame
            the dataframe containing the data of interest.
        col : str
            Column name we want to fill.

        Returns
        -------
        None. fills inplace the missing values with the mean

        """
        mean = df[col].mean()
        # Assign the filled column back to the frame: an in-place fillna on the
        # `df[col]` selection is a chained assignment, which does not update
        # `df` when pandas Copy-on-Write is active.
        df[col] = df[col].fillna(mean)
        print(f"{col} done !")
    
    
    def is_skewed(df, col, skew_threshold=0.5):
        """
        Tell whether the distribution of a column is skewed.

        Parameters
        ----------
        df : Pandas.DataFrame
            The DataFrame containing the data of interest.
        col : str
            Column name to check for skewness.
        skew_threshold : float, optional
            Absolute skewness above which the column is considered skewed.
            Default is 0.5.

        Returns
        -------
        bool: True if the column is skewed, False otherwise.
        """
        skewness = df[col].skew()
        # The sign only gives the direction of the tail; the magnitude decides.
        magnitude = abs(skewness)
        return magnitude > skew_threshold
    
    
    def has_outliers(df, col, z_score_threshold=3):
        """
        Tell whether a column contains outliers, using Z-scores.

        Missing values are ignored. A value is an outlier when the absolute
        value of its Z-score is strictly greater than the threshold.

        Parameters
        ----------
        df : Pandas.DataFrame
            The DataFrame containing the data of interest.
        col : str
            Column name to check for outliers.
        z_score_threshold : float, optional
            The Z-score threshold to identify outliers. Default is 3.

        Returns
        -------
        bool: True if outliers are present, False otherwise.
        """
        observed = df[col].dropna()
        abs_z_scores = np.abs(stats.zscore(observed))
        beyond_threshold = abs_z_scores > z_score_threshold
        return bool(beyond_threshold.any())
    
    
    def fill_numerical_columns(df, skew_threshold=0.5):
        """
        Impute the missing values of every numerical column of a DataFrame.

        The median is used for skewed columns and for columns containing
        outliers; the mean is used for the remaining columns.

        Parameters
        ----------
        df : Pandas.DataFrame
            The DataFrame to process.
        skew_threshold : float, optional
            Threshold to determine skewness. Default is 0.5.

        Returns
        -------
        None: Modifies the DataFrame in place.
        """
        for column in get_numerical_columns(df):
            # The outlier check only runs for columns that are not skewed
            # (short-circuit evaluation of `or`).
            if is_skewed(df, column, skew_threshold) or has_outliers(df, column):
                fill_median(df, column)
            else:
                fill_mean(df, column)
    
    
    def fill_categorical_kidney(df,columns):
        """
        Clean the categorical values of the kidney dataset and impute the
        missing ones.

        The cleaning fixes known typos in the 'dm', 'cad' and 'classification'
        columns, removes tab characters everywhere and turns '?' into NaN.
        Missing categories are then drawn at random, following the frequencies
        of the categories observed in the same column.

        Parameters
        ----------
        df : Pandas.DataFrame
            must be the kidney data frame (it has to contain the 'dm', 'cad'
            and 'classification' columns).
        columns : List
            The list of categorical features.


        Returns
        -------
        None. This function fills the categorical columns of the kidney df.

        Raises
        ------
        ValueError
            If a column has missing values but no observed category at all.

        """
        # Rename correctly. The cleaned columns are assigned back to the frame:
        # an in-place replace on a `df[col]` selection is a chained assignment,
        # which does not update `df` when pandas Copy-on-Write is active.
        df['dm'] = df['dm'].replace({'\tno': 'no', '\tyes': 'yes', ' yes': 'yes'})
        df['cad'] = df['cad'].replace({'\tno': 'no'})
        df['classification'] = df['classification'].replace({'ckd\t': 'ckd'})
        df.replace(to_replace=r'\t', value='', regex=True, inplace=True)
        df.replace(to_replace='?', value=np.nan, inplace=True)

        # Iterate over categorical columns
        for col in tqdm(columns, desc="Going through each categorical feature..."):
            print(f"\nProcessing column: {col}")

            # Compute the frequency of each category (missing values excluded)
            category_frequencies = df[col].value_counts(normalize=True)
            print("Possible categories and their frequencies:")
            print(category_frequencies)

            missing_mask = df[col].isnull()
            n_missing = int(missing_mask.sum())
            if n_missing == 0:
                continue  # nothing to impute in this column

            if category_frequencies.empty:
                # Sampling from an empty set of categories is impossible; fail
                # with an explicit message instead of NumPy's generic error.
                raise ValueError(f"Column '{col}' has no observed category to impute from.")

            # Impute missing values based on observed frequencies
            df.loc[missing_mask, col] = np.random.choice(category_frequencies.index,
                                                         size=n_missing,
                                                         p=category_frequencies.values)
    
    
    def scale_normalize(df,columns):
        """
        Apply Min-Max scaling, then Z-score normalization, to numerical columns.

        The summary statistics of the dataframe are printed before and after
        the transformation.

        Parameters
        ----------
        df : Pandas.Dataframe
            The dataframe containing the numerical values to scale and normalize.
        columns : List
            List of the numerical columns to scale and normalize.

        Returns
        -------
        None. The modifications are done inplace

        """
        print('#######BEFORE SCALING AND NORMALIZING########')
        print(df.describe())

        # The two scalers are applied one after the other, each one being
        # fitted on the output of the previous step.
        # NOTE(review): standardizing after a min-max rescale presumably gives
        # the same result as standardizing alone - confirm before simplifying.
        for scaler in (MinMaxScaler(), StandardScaler()):
            df[columns] = scaler.fit_transform(df[columns])

        print('#######AFTER SCALING AND NORMALIZING########')
        print(df.describe())
    
    
    def split(df, target,alpha=0.2,n=5):
        """
        Perform a stratified train/test split and build the cross-validation
        strategy used during training.

        Parameters
        ----------
        df : Pandas.DataFrame
            The data frame we want to split.
        target : str
            the name of the target feature.
        alpha : float, optional
            test size. The default is 0.2.
        n : int, optional
            number of splits for cross validation. The default is 5.

        Returns
        -------
        tuple: (X_train, X_test, y_train, y_test, cv) where cv is the
               StratifiedKFold object.

        """
        features = df.drop(columns=target)
        labels = df[target]

        # Stratifying on the labels keeps the class proportions in both parts;
        # the fixed seed makes the split reproducible.
        train_test_parts = train_test_split(features, labels,
                                            test_size=alpha,
                                            random_state=42,
                                            stratify=labels)

        folds = StratifiedKFold(n_splits=n, shuffle=True, random_state=42)

        return (*train_test_parts, folds)
    
    
    def convert_categorical_feats(df, categorical_cols):
        """
        Encode the categorical features of the dataset using OrdinalEncoder
        and OneHotEncoder.

        Columns with at most two distinct values are replaced by integer codes;
        the other columns are replaced by one indicator column per category,
        appended at the end of the DataFrame.

        Parameters:
        ----------
        df : pandas.DataFrame
            The dataset (Kidney OR Banknote).
        categorical_cols : list[str]
            The list of the dataset's categorical features.

        Returns
        -------
        df : pandas.DataFrame
            The encoded dataset. Ordinal encoding is written into the DataFrame
            it is applied to, while one-hot encoding creates a new DataFrame:
            always use the returned object.

        """
        ord_encoder = OrdinalEncoder()
        # `sparse` was renamed to `sparse_output` in scikit-learn 1.2 and removed
        # in 1.4; fall back to the old name on older versions.
        try:
            one_hot_encoder = OneHotEncoder(sparse_output=False)
        except TypeError:
            one_hot_encoder = OneHotEncoder(sparse=False)

        for col in categorical_cols:

            if df[col].nunique() <= 2:
                # Binary feature: a single integer column is enough.
                df[col] = ord_encoder.fit_transform(df[[col]]).astype(int).ravel()

            else:
                one_hot = one_hot_encoder.fit_transform(df[[col]])
                names_cols = one_hot_encoder.get_feature_names_out([col])
                # Reuse the index of `df`: with a fresh RangeIndex the concat
                # below would misalign the rows of a frame whose index is not
                # 0..n-1 and introduce NaN values.
                encoded_df = pd.DataFrame(one_hot, columns=names_cols, index=df.index).astype(int)
                df = pd.concat([df.drop(columns=col), encoded_df], axis=1)

        return df
    
    
    
    def feature_selection(df, target, threshold_variance_ratio=0.99):
        """
        Apply PCA method to reduce the dimensionality of the dataset.

        Parameters:
        ----------
        df : pandas.DataFrame
            The dataset (Kidney OR Banknote).
        target : str
            The name of the label column.
        threshold_variance_ratio: float, optional
            The desired explained variance ratio. Defaults to 0.99.

        Returns
        -------
        result_df : pandas.DataFrame
            The retained components ('PCA1', 'PCA2', ...) along with the target
            column, indexed like `df`.
        explained_variance_ratios : numpy.ndarray
            Cumulative explained variance ratio for every PCA component.

        """
        # Select the features by name: the target is not necessarily the last
        # column (one-hot encoded columns are appended after it), and it must
        # never be part of the PCA input.
        X = df.drop(columns=[target])
        pca = PCA()
        projected = pca.fit_transform(X) #project onto the PCA's feat space

        explained_variance_ratios = np.cumsum(pca.explained_variance_ratio_)

        # Number of eigenvectors needed to achieve the desired level of explainability
        reached = explained_variance_ratios >= threshold_variance_ratio
        if reached.any():
            nb_feats = int(np.argmax(reached)) + 1
        else:
            # argmax of an all-False mask is 0, which would silently keep a
            # single component; keep every component instead.
            nb_feats = len(explained_variance_ratios)

        # Keep the index of `df` so that the target values line up with the rows.
        result_df = pd.DataFrame(projected[:, :nb_feats],
                                 columns=[f'PCA{i+1}' for i in range(nb_feats)],
                                 index=df.index)
        result_df[target] = df[target]

        return result_df, explained_variance_ratios
    
    
    def test_model(X_test, y_test, model):
        """
        Compute the F1 score of a trained model on the test set.

        Parameters
        ----------
        X_test : DataFrame
            Test data features.
        y_test : Series
            Test data target.
        model : trained model
            The model to be evaluated; only its `predict` method is used.

        Returns
        -------
        score : float
            The F1 score of the model on the test set.
        """
        return f1_score(y_test, model.predict(X_test))