Select Git revision
clustering-checkpoint.ipynb
binary_classification_workflow.py 17.73 KiB
# -*- coding: utf-8 -*-
"""
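Spyder Editor
This file contains the helper functions of a binary classification
workflow: the preprocessing functions needed to clean and prepare the
data, plus the functions used to split the data, reduce its dimension
with PCA, tune the models and display their results.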
"""
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from scipy import stats
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler, StandardScaler, OrdinalEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.decomposition import PCA
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, confusion_matrix
from sklearn.model_selection import learning_curve
from sklearn.model_selection import GridSearchCV
######### Main function for preprocessing ############
def pre_process_data(data):
    """
    Run the whole preprocessing pipeline on a dataset.

    The steps are applied in this order: scaling of the numerical features,
    cleaning/imputation and encoding of the categorical features (only when
    the dataset has some), then imputation of the numerical features.

    Parameters:
    -----------
    data (DataFrame): The dataset. The helpers work in place on it (for
        instance, get_numerical_columns drops the 'id' column if present).

    Returns:
    ----------
    data: preprocessed data.
    """
    # Numerical columns are looked up first: that call also removes 'id'.
    num_features = get_numerical_columns(data)
    cat_features = get_categorical_columns(data)

    scale_normalize(data, num_features)

    if cat_features:
        fill_categorical_kidney(data, cat_features)
        # The encoder returns the frame to keep using from here on.
        data = convert_categorical_feats(data, cat_features)

    fill_numerical_columns(data, skew_threshold=0.5)
    return data
######### Main function for preparing the data for the training phase ############
def prepare_training_data(data, target, use_pca=True, threshold_variance_ratio=0.90):
    """
    Prepare the data for training phase.

    When `use_pca` is True the features are projected with PCA (see
    feature_selection) and two explanatory plots are shown: the data on the
    first two components and the cumulative explained variance. The
    (possibly reduced) data is then split with `split`.

    Parameters:
    -----------
    data (DataFrame): The dataset, including the target column.
    target (str): The name of the target column.
    use_pca (bool): Whether to reduce the features with PCA. Default is True.
    threshold_variance_ratio (float): Cumulative explained variance ratio
        used to choose the number of PCA components. Default is 0.90.

    Returns:
    ----------
    tuple: (X_train, X_test, y_train, y_test, cv) as returned by `split`.
    """
    if use_pca:
        df, explainable_ratios = feature_selection(data, target, threshold_variance_ratio)

        # Explainable plots
        _, axes = plt.subplots(1, 2, figsize=(10, 4))

        # Legend labels are the ones of the original code: 0 -> CKD, 1 -> NOT CKD.
        for class_value, color, label in ((0, 'red', "CKD"), (1, 'green', "NOT CKD")):
            class_points = df.loc[df[target] == class_value]
            if 'PCA2' in class_points.columns:
                second_component = class_points['PCA2']
            else:
                # A single component was kept: draw the points on a flat line.
                second_component = np.zeros(len(class_points))
            axes[0].scatter(class_points['PCA1'], second_component, color=color, label=label)
        axes[0].set_xlabel("PCA1: First component")
        axes[0].set_ylabel("PCA2: Second component")
        axes[0].set_title('Data projection onto the first two PCA components')
        axes[0].legend()

        axes[1].plot(range(1, len(explainable_ratios) + 1), explainable_ratios)
        # Draw the threshold that was actually used to pick the components.
        axes[1].axhline(y=threshold_variance_ratio, linestyle='--', color='red', label='Threshold Ratio')
        axes[1].set_xlabel('Number of components')
        axes[1].set_ylabel('Cumulative Explainable variance')
        axes[1].set_title("Cumulative Explainable variance vs Number of components")
        axes[1].legend()

        plt.tight_layout()
        plt.subplots_adjust(wspace=0.4)
        plt.show()
    else:
        # Without PCA the data is split as it is.
        df = data

    X_train, X_test, y_train, y_test, cv = split(df, target, alpha=0.2, n=5)
    return X_train, X_test, y_train, y_test, cv
######### Main function for training the data ############
def train_and_tune_model(X_train, y_train, model, param_grid, cv, scoring="f1", verbose=1):
    """
    Tune a binary classification model with an exhaustive grid search.

    Parameters
    ----------
    X_train : DataFrame
        Training data features.
    y_train : Series
        Training data target.
    model : Estimator object
        The binary classification model to be tuned.
    param_grid : dict
        The hyperparameter grid explored by the search.
    cv : Cross-validation strategy
        The cross-validation splitting strategy.
    scoring : str or list of str, optional
        The scoring metric(s) passed to GridSearchCV. Default is 'f1'.
    verbose : int, optional
        The verbosity level passed to GridSearchCV.

    Returns
    -------
    best_model : Estimator object
        The `best_estimator_` of the fitted GridSearchCV.
    """
    search_options = {
        "estimator": model,
        "param_grid": param_grid,
        "cv": cv,
        "scoring": scoring,
        "n_jobs": -1,        # use every available core
        "verbose": verbose,  # amount of messaging output
    }
    tuner = GridSearchCV(**search_options)
    tuner.fit(X_train, y_train)
    return tuner.best_estimator_
# Example usage
# from sklearn.ensemble import RandomForestClassifier
# model = RandomForestClassifier()
# param_grid = {'n_estimators': [100, 200], 'max_depth': [10, 20]}
# best_rf_model = train_and_tune_model(X_train, y_train, model, param_grid, cv)
######### Main function to display the results for comparison purposes ############
def display_results(dict_models, X_train, y_train, X_test, y_test, cv, disp_col):
    """
    Tune, evaluate and compare several models.

    For every model: runs train_and_tune_model, computes the F1 score on the
    test set, shows the confusion matrix, prints accuracy / precision /
    recall / ROC-AUC and plots the learning curves.

    Parameters
    ----------
    dict_models : dict
        Maps a model name to a dict with the keys "model" (the estimator)
        and "param_grid" (its hyperparameter grid).
    X_train : DataFrame
        Training data features.
    y_train : Series
        Training data target
    X_test : DataFrame
        Test data features
    y_test : Series
        Test data target
    cv : StratifiedKFold
        Cross-validation strategy
    disp_col : str
        Name of the column holding the scores in the result table

    Returns
    -------
    df_results : pandas Styler
        Styled table with one row per model ("Model Name", `disp_col`); the
        score is the test F1 score in percent, rounded to 2 decimals, and
        the highest score is highlighted.
    """
    # Rows are gathered in a list and turned into a DataFrame once, instead
    # of concatenating onto an empty DataFrame at every iteration.
    result_rows = []

    for model_name, model_details in tqdm(dict_models.items(), desc="Going through each model defined in the dictionnary..."):
        # extract the details related to every model from the dict
        model_params = model_details["param_grid"]
        model = model_details["model"]
        best_model = train_and_tune_model(X_train, y_train, model, model_params, cv)

        # Predict once and reuse the predictions for every metric below.
        y_pred = best_model.predict(X_test)

        score = f1_score(y_test, y_pred)  # f1 score on test data
        rounded_score = np.round(score * 100, 2)
        result_rows.append({"Model Name": model_name, disp_col: rounded_score})

        conf_matrix = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', cbar=False)
        plt.title(f'Confusion Matrix - {model_name}')
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.show()

        # Print and analyze additional evaluation metrics
        print(f'Model: {model_name}')
        print(f'Accuracy: {accuracy_score(y_test, y_pred)}')
        print(f'Precision: {precision_score(y_test, y_pred)}')
        print(f'Recall: {recall_score(y_test, y_pred)}')
        # Not every classifier exposes predict_proba; fall back to the
        # decision function, and skip ROC-AUC when neither is available.
        if hasattr(best_model, "predict_proba"):
            y_score = best_model.predict_proba(X_test)[:, 1]
        elif hasattr(best_model, "decision_function"):
            y_score = best_model.decision_function(X_test)
        else:
            y_score = None
        if y_score is None:
            print('ROC-AUC: not available for this model')
        else:
            print(f'ROC-AUC: {roc_auc_score(y_test, y_score)}')
        print('\n')

        # Plot learning curves
        train_sizes, train_scores, valid_scores = learning_curve(best_model, X_train, y_train, cv=cv, scoring='f1', n_jobs=-1)
        plt.figure(figsize=(8, 6))
        plt.plot(train_sizes, np.mean(train_scores, axis=1), label='Training F1 Score')
        plt.plot(train_sizes, np.mean(valid_scores, axis=1), label='Validation F1 Score')
        plt.xlabel('Training Examples')
        plt.ylabel('F1 Score')
        plt.legend()
        plt.title(f'Learning Curves - {model_name}')
        plt.show()

    df_results = pd.DataFrame(result_rows, columns=["Model Name", disp_col])
    # Apply styling after creating the DataFrame: highlight the best score.
    df_results = df_results.style.highlight_max(subset=[disp_col], color='salmon')
    return df_results
# Sub-Function to Identify Categorical Columns
def get_categorical_columns(data):
    """
    List the columns of a DataFrame whose dtype is 'object' or 'category'.

    Parameters:
    -----------
    data (DataFrame): The dataset.

    Returns:
    -----------
    list: Names of the categorical columns, in the DataFrame's column order.
    """
    categorical_dtypes = ['object', 'category']
    categorical_part = data.select_dtypes(include=categorical_dtypes)
    return categorical_part.columns.tolist()
# Sub-Function to Identify Numerical Columns
def get_numerical_columns(data):
    """
    List the int64/float64 feature columns of a DataFrame.

    The last column is left out of the search (the code treats it as the
    target). Side effect: an 'id' column, when present, is removed from
    `data` in place.

    Parameters:
    -----------
    data (DataFrame): The dataset.

    Returns:
    -----------
    list: Names of the numerical columns.
    """
    has_id = 'id' in data.columns
    if has_id:
        # Not a relevant column for the classification
        data.drop('id', axis=1, inplace=True)

    all_but_last = data.iloc[:, :-1]
    numeric_part = all_but_last.select_dtypes(include=['int64', 'float64'])
    return numeric_part.columns.tolist()
def visualise_numerical_data(df, columns=None):
    """
    Plot the distribution of numerical features.

    Parameters
    ----------
    df : Pandas.DataFrame
        the dataframe containing the data of interest.
    columns : List, optional
        The columns to visualize, one histogram (with KDE) per column.
        When omitted or empty, `df.hist` is used on the whole dataframe.

    Returns
    -------
    None. Plots the distributions.
    """
    if columns:
        for feature in columns:
            feature_name = str(feature)
            # Plot a histogram
            sns.histplot(df[feature], kde=True)
            plt.title(f'Histogram of the feature {feature_name}')
            plt.xlabel(feature_name)
            plt.ylabel('Frequency')
            plt.show()
        return

    # No explicit selection: one grid with every histogram.
    df.hist(bins=10, figsize=(15, 10))
    plt.suptitle('Distribution of Numerical Columns', y=0.92)
    plt.show()
def fill_median(df, col):
    """
    Fill the missing values of a column with the column's median.

    Parameters
    ----------
    df : Pandas.DataFrame
        the dataframe containing the data of interest.
    col : str
        Column name we want to fill.

    Returns
    -------
    None. fills inplace the missing values with the median
    """
    median = df[col].median()
    # Assign the filled column back to the DataFrame: calling
    # fillna(inplace=True) on `df[col]` is a chained assignment, which may
    # leave `df` untouched (pandas Copy-on-Write).
    df[col] = df[col].fillna(median)
    # f-string so that non-string column labels do not raise.
    print(f"{col} done !")
def fill_mean(df, col):
    """
    Fill the missing values of a column with the column's mean.

    Parameters
    ----------
    df : Pandas.DataFrame
        the dataframe containing the data of interest.
    col : str
        Column name we want to fill.

    Returns
    -------
    None. fills inplace the missing values with the mean
    """
    mean = df[col].mean()
    # Assign the filled column back to the DataFrame: calling
    # fillna(inplace=True) on `df[col]` is a chained assignment, which may
    # leave `df` untouched (pandas Copy-on-Write).
    df[col] = df[col].fillna(mean)
    # f-string so that non-string column labels do not raise.
    print(f"{col} done !")
def is_skewed(df, col, skew_threshold=0.5):
    """
    Tell whether a column's skewness exceeds a threshold in absolute value.

    Parameters
    ----------
    df : Pandas.DataFrame
        The DataFrame containing the data of interest.
    col : str
        Column name to check for skewness.
    skew_threshold : float, optional
        Threshold to determine skewness. Default is 0.5.

    Returns
    -------
    bool: True if the column is skewed, False otherwise.
    """
    skewness = df[col].skew()
    magnitude = abs(skewness)
    return magnitude > skew_threshold
def has_outliers(df, col, z_score_threshold=3):
    """
    Detect outliers in a column from the Z-scores of its non-missing values.

    Parameters
    ----------
    df : Pandas.DataFrame
        The DataFrame containing the data of interest.
    col : str
        Column name to check for outliers.
    z_score_threshold : float, optional
        The Z-score threshold to identify outliers. Default is 3.

    Returns
    -------
    bool: True if outliers are present, False otherwise.
    """
    observed = df[col].dropna()
    abs_z = np.abs(stats.zscore(observed))
    beyond_threshold = abs_z > z_score_threshold
    return bool(np.any(beyond_threshold))
def fill_numerical_columns(df, skew_threshold=0.5):
    """
    Impute the numerical columns of a DataFrame.

    A column is filled with its median when it is skewed or, failing that,
    when it contains outliers; otherwise it is filled with its mean.

    Parameters
    ----------
    df : Pandas.DataFrame
        The DataFrame to process.
    skew_threshold : float, optional
        Threshold to determine skewness. Default is 0.5.

    Returns
    -------
    None: Modifies the DataFrame in place.
    """
    for feature in get_numerical_columns(df):
        # Outliers are only looked for when the column is not skewed.
        needs_median = is_skewed(df, feature, skew_threshold) or has_outliers(df, feature)
        filler = fill_median if needs_median else fill_mean
        filler(df, feature)
def fill_categorical_kidney(df, columns, random_state=None):
    """
    Clean and impute the categorical columns of the kidney dataset.

    Known mis-typed labels are fixed, tab characters are removed and '?' is
    turned into a missing value. Then, for each listed column, missing
    values are drawn at random among the observed categories, with
    probabilities equal to their observed frequencies.

    Parameters
    ----------
    df : Pandas.DataFrame
        must be the kidney data frame.
    columns : List
        The list of categorical features.
    random_state : int, optional
        Seed for the random imputation. When None (default), the global
        numpy random state is used, as before.

    Returns
    -------
    None. This function fills the categorical columns of the kidney df.
    """
    # Rename correctly
    known_typos = {
        'dm': {'\tno': 'no', '\tyes': 'yes', ' yes': 'yes'},
        'cad': {'\tno': 'no'},
        'classification': {'ckd\t': 'ckd'},
    }
    for typo_col, mapping in known_typos.items():
        if typo_col in df.columns:
            # Assign back instead of a chained `inplace=True` replace, which
            # may not update `df` (pandas Copy-on-Write).
            df[typo_col] = df[typo_col].replace(mapping)
    df.replace(to_replace=r'\t', value='', regex=True, inplace=True)
    df.replace(to_replace='?', value=np.nan, inplace=True)

    # np.random and RandomState expose the same `choice` method.
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    # Iterate over categorical columns
    for col in tqdm(columns, desc="Going through each categorical feature..."):
        print(f"\nProcessing column: {col}")
        # Compute the frequency of each category
        category_frequencies = df[col].value_counts(normalize=True)
        print("Possible categories and their frequencies:")
        print(category_frequencies)

        # Impute missing values based on observed frequencies
        missing_mask = df[col].isnull()
        n_missing = int(missing_mask.sum())
        if n_missing == 0 or category_frequencies.empty:
            # Nothing to fill, or no observed category to draw from.
            continue
        df.loc[missing_mask, col] = rng.choice(category_frequencies.index,
                                               size=n_missing,
                                               p=category_frequencies.values)
def scale_normalize(df, columns):
    """
    Scale (min-max) then standardize (z-score) numerical columns.

    Parameters
    ----------
    df : Pandas.Dataframe
        The dataframe containing the numerical values to scale and normalize.
    columns : List
        List of the numerical columns to scale and normalize. When empty,
        the function returns without doing anything.

    Returns
    -------
    None. The modifications are done inplace
    """
    if len(columns) == 0:
        # The scalers cannot be fitted on zero features.
        return

    print('#######BEFORE SCALING AND NORMALIZING########')
    print(df.describe())

    # Min-Max scaling
    min_max_scaler = MinMaxScaler()
    df[columns] = min_max_scaler.fit_transform(df[columns])

    # Z-score normalization
    # NOTE(review): standardization should give the same values with or
    # without the min-max step above (up to rounding) - confirm before
    # removing either step.
    standard_scaler = StandardScaler()
    df[columns] = standard_scaler.fit_transform(df[columns])

    print('#######AFTER SCALING AND NORMALIZING########')
    print(df.describe())
def split(df, target, alpha=0.2, n=5, random_state=42):
    """
    Splits the DataFrame into training and testing sets using stratified sampling,
    and prepares cross-validation setup.

    Parameters
    ----------
    df : Pandas.DataFrame
        The data frame we want to split.
    target : str
        the name of the target feature.
    alpha : float, optional
        test size. The default is 0.2.
    n : int, optional
        number of splits for cross validation. The default is 5.
    random_state : int, optional
        Seed used for both the train/test split and the shuffling of the
        cross-validation folds. The default is 42 (the value that was
        previously hard-coded).

    Returns
    -------
    tuple: Contains the split datasets (X_train, X_test, y_train, y_test) and
    the cross-validation object (cv).
    """
    # Separate features and target variable
    X = df.drop(target, axis=1)
    y = df[target]

    # Stratified train-test split
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=alpha,
        random_state=random_state,
        stratify=y,
    )

    # Prepare stratified cross-validation setup
    cv = StratifiedKFold(n_splits=n, shuffle=True, random_state=random_state)
    return X_train, X_test, y_train, y_test, cv
def convert_categorical_feats(df, categorical_cols):
    """
    Encode the categorical features of the dataset using OrdinalEncoder
    and OneHotEncoder.

    Columns with at most two distinct values are ordinal-encoded in place;
    the other ones are replaced by their one-hot columns, appended at the
    end of the DataFrame.

    Parameters:
    ----------
    df : pandas.DataFrame
        The dataset (Kidney OR Banknote). Binary columns are overwritten
        directly in this object.
    categorical_cols : list[str]
        The list of the dataset's categorical features.

    Returns
    -------
    df : pandas.DataFrame
        The encoded dataset. Use this return value: it is a new object as
        soon as one column has been one-hot encoded.
    """
    ord_encoder = OrdinalEncoder()
    try:
        # `sparse` was renamed `sparse_output` in scikit-learn 1.2 and the
        # old name was removed afterwards.
        one_hot_encoder = OneHotEncoder(sparse_output=False)
    except TypeError:
        # Older scikit-learn versions only know the `sparse` argument.
        one_hot_encoder = OneHotEncoder(sparse=False)

    for col in categorical_cols:
        if len(df[col].value_counts()) <= 2:
            df[col] = ord_encoder.fit_transform(df[[col]]).astype(int)
        else:
            one_hot_values = one_hot_encoder.fit_transform(df[[col]])
            names_cols = one_hot_encoder.get_feature_names_out([col])
            # Reuse df's index so that concat aligns the rows correctly even
            # when the index is not the default 0..n-1 range.
            encoded_df = pd.DataFrame(one_hot_values,
                                      columns=names_cols,
                                      index=df.index).astype(int)
            df = pd.concat([df.drop(col, axis=1), encoded_df], axis=1)
    return df
def feature_selection(df, target, threshold_variance_ratio=0.99):
    """
    Apply PCA method to reduce the dimensionality of the dataset.

    Parameters:
    ----------
    df : pandas.DataFrame
        The dataset (Kidney OR Banknote).
    target : str
        The name of the label column.
    threshold_variance_ratio: float, optional
        The desired explained variance ratio. Defaults to 0.99.

    Returns
    -------
    result_df : pandas.DataFrame
        The kept PCA components (columns 'PCA1', 'PCA2', ...), along with
        the target column. It has the same index as `df`.
    explained_variance_ratios : numpy.ndarray
        Cumulative explained variance ratio of all the PCA components.
    """
    # Select the features by name: the target is not guaranteed to be the
    # last column (one-hot columns are appended after it).
    X = df.drop(columns=[target])
    pca = PCA()
    X = pca.fit_transform(X)  # project onto the PCA's feat space
    explained_variance_ratios = np.cumsum(pca.explained_variance_ratio_)

    # Number of eigenvectors needed to achieve the desired level of explainability
    reached = explained_variance_ratios >= threshold_variance_ratio
    if reached.any():
        nb_feats = int(np.argmax(reached)) + 1
    else:
        # argmax would return 0 here; keep every component instead of
        # silently keeping a single one.
        nb_feats = len(explained_variance_ratios)

    # Keep df's index so that the target values stay aligned with their rows.
    result_df = pd.DataFrame(X[:, :nb_feats],
                             columns=[f'PCA{i+1}' for i in range(nb_feats)],
                             index=df.index)
    result_df[target] = df[target]
    return result_df, explained_variance_ratios
def test_model(X_test, y_test, model):
    """
    Compute the F1 score of a trained model on the test set.

    Parameters
    ----------
    X_test : DataFrame
        Test data features.
    y_test : Series
        Test data target.
    model : trained model
        The model to be evaluated; its `predict` method is called on X_test.

    Returns
    -------
    score : float
        The F1 score of the model on the test set.
    """
    predictions = model.predict(X_test)
    return f1_score(y_test, predictions)